diff --git a/PRASCore.jl/src/Results/Results.jl b/PRASCore.jl/src/Results/Results.jl index 3e4e3300..1ca980d4 100644 --- a/PRASCore.jl/src/Results/Results.jl +++ b/PRASCore.jl/src/Results/Results.jl @@ -34,6 +34,8 @@ abstract type ResultSpec end abstract type ResultAccumulator{R<:ResultSpec} end +issamplebased(::ResultSpec) = false + abstract type Result{ N, # Number of timesteps simulated L, # Length of each simulation timestep @@ -173,6 +175,20 @@ include("StorageEnergySamples.jl") include("GeneratorStorageEnergySamples.jl") include("DemandResponseEnergySamples.jl") +issamplebased(::ShortfallSamples) = true +issamplebased(::DemandResponseShortfallSamples) = true +issamplebased(::SurplusSamples) = true +issamplebased(::FlowSamples) = true +issamplebased(::UtilizationSamples) = true +issamplebased(::StorageEnergySamples) = true +issamplebased(::GeneratorStorageEnergySamples) = true +issamplebased(::DemandResponseEnergySamples) = true +issamplebased(::GeneratorAvailability) = true +issamplebased(::StorageAvailability) = true +issamplebased(::GeneratorStorageAvailability) = true +issamplebased(::DemandResponseAvailability) = true +issamplebased(::LineAvailability) = true + function resultchannel( results::T, threads::Int ) where T <: Tuple{Vararg{ResultSpec}} @@ -185,18 +201,52 @@ end merge!(xs::T, ys::T) where T <: Tuple{Vararg{ResultAccumulator}} = foreach(merge!, xs, ys) +function copy_sample_partition!( + x::A, + y::A, + sampleids::UnitRange{Int}, +) where {A<:ResultAccumulator} + + field = fieldnames(A)[1] + xarr = getfield(x, field) + yarr = getfield(y, field) + + @views xarr[:, :, sampleids] .= yarr + return +end + function finalize( - results::Channel{<:Tuple{Vararg{ResultAccumulator}}}, + results::Channel, system::SystemModel{N,L,T,P,E}, - threads::Int + threads::Int, + nsamples::Int, + resultspecs::Tuple{Vararg{ResultSpec}}, ) where {N,L,T,P,E} - total_result = take!(results) + first_recorders, first_sampleids = take!(results) + + total_result = map(resultspecs, first_recorders) do spec, recorder + issamplebased(spec) ? accumulator(system, nsamples, spec) : recorder + end + + for i in eachindex(total_result) + if issamplebased(resultspecs[i]) + copy_sample_partition!(total_result[i], first_recorders[i], first_sampleids) + end + end for _ in 2:threads - thread_result = take!(results) - merge!(total_result, thread_result) + thread_recorders, sampleids = take!(results) + + for i in eachindex(total_result) + if issamplebased(resultspecs[i]) + copy_sample_partition!(total_result[i], thread_recorders[i], sampleids) + else + merge!(total_result[i], thread_recorders[i]) + end + end end + close(results) return finalize.(total_result, system) diff --git a/PRASCore.jl/src/Simulations/Simulations.jl b/PRASCore.jl/src/Simulations/Simulations.jl index 99b70e96..f4b26e0b 100644 --- a/PRASCore.jl/src/Simulations/Simulations.jl +++ b/PRASCore.jl/src/Simulations/Simulations.jl @@ -4,8 +4,8 @@ import ..Systems: SystemModel, AbstractAssets, Generators, Lines, conversionfactor, energytopower import ..Results -import ..Results: ResultSpec, ResultAccumulator, - accumulator, resultchannel, finalize +import ..Results: ResultSpec, + accumulator, finalize, issamplebased import Base: broadcastable import Base.Threads: nthreads, @spawn @@ -40,7 +40,11 @@ It it recommended that you fix the random seed for reproducibility. - `samples::Int=10_000`: Number of samples - `seed::Integer=rand(UInt64)`: Random seed - `verbose::Bool=false`: Print progress - - `threaded::Bool=true`: Use multi-threading + - `threaded::Bool=true`: Enable threaded Monte Carlo simulation + +When `threaded=true`, sample-level result specifications are internally +partitioned across threads during simulation and assembled into dense +result arrays during finalization. # Returns @@ -65,6 +69,36 @@ end broadcastable(x::SequentialMonteCarlo) = Ref(x) +function sample_ranges(nsamples::Int, nworkers::Int) + base, rem = divrem(nsamples, nworkers) + + ranges = UnitRange{Int}[] + start = 1 + + for i in 1:nworkers + len = base + (i <= rem ? 1 : 0) + + if len > 0 + stop = start + len - 1 + push!(ranges, start:stop) + start = stop + 1 + end + end + + return ranges +end + +function partition_recorders( + system::SystemModel, + nsamples::Int, + local_nsamples::Int, + resultspecs::Tuple{Vararg{ResultSpec}}, +) + return map(resultspecs) do spec + accumulator(system, issamplebased(spec) ? local_nsamples : nsamples, spec) + end +end + """ assess(system::SystemModel, method::SequentialMonteCarlo, resultspecs::ResultSpec...) @@ -79,7 +113,7 @@ and return `resultspecs`. # Returns - - `results::Tuple{Vararg{ResultAccumulator{SequentialMonteCarlo}}}`: PRAS metric results + - `results::Tuple`: PRAS metric results """ function assess( system::SystemModel, @@ -87,75 +121,81 @@ function assess( resultspecs::ResultSpec... ) - threads = nthreads() - sampleseeds = Channel{Int}(2*threads) - results = resultchannel(resultspecs, threads) + threads = method.threaded ? nthreads() : 1 - @spawn makeseeds(sampleseeds, method.nsamples) - - if method.threaded - - if (threads == 1) - @warn "It looks like you haven't configured JULIA_NUM_THREADS before you started the julia repl. \n If you want to use multi-threading, stop the execution and start your julia repl using : \n julia --project --threads auto" - end - - for _ in 1:threads - @spawn assess(system, method, sampleseeds, results, resultspecs...) - end - else - assess(system, method, sampleseeds, results, resultspecs...) + if method.threaded && threads == 1 + @warn "It looks like you haven't configured JULIA_NUM_THREADS before you started the julia repl. \n If you want to use multi-threading, stop the execution and start your julia repl using : \n julia --project --threads auto" end - return finalize(results, system, method.threaded ? threads : 1) + ranges = sample_ranges(method.nsamples, threads) + actual_threads = length(ranges) -end - -function makeseeds(sampleseeds::Channel{Int}, nsamples::Int) + results = Channel{Any}(actual_threads) - for s in 1:nsamples - put!(sampleseeds, s) + for sampleids in ranges + if method.threaded + @spawn assess(system, method, sampleids, results, resultspecs...) + else + assess(system, method, sampleids, results, resultspecs...) + end end - close(sampleseeds) + return finalize(results, system, actual_threads, method.nsamples, resultspecs) end function assess( - system::SystemModel{N}, method::SequentialMonteCarlo, - sampleseeds::Channel{Int}, - results::Channel{<:Tuple{Vararg{ResultAccumulator}}}, + system::SystemModel{N}, + method::SequentialMonteCarlo, + sampleids::UnitRange{Int}, + results::Channel, resultspecs::ResultSpec... ) where N dispatchproblem = DispatchProblem(system) systemstate = SystemState(system) - recorders = accumulator.(system, method.nsamples, resultspecs) + + recorders = partition_recorders( + system, + method.nsamples, + length(sampleids), + resultspecs, + ) # TODO: Test performance of Philox vs Threefry, choice of rounds # Also consider implementing an efficient Bernoulli trial with direct # mantissa comparison rng = Philox4x((0, 0), 10) - for s in sampleseeds + for (local_sampleid, global_sampleid) in enumerate(sampleids) - seed!(rng, (method.seed, s)) + seed!(rng, (method.seed, global_sampleid)) initialize!(rng, systemstate, system) for t in 1:N - advance!(rng, systemstate, dispatchproblem, system, t) solve!(dispatchproblem, systemstate, system, t) - foreach(recorder -> record!( - recorder, system, systemstate, dispatchproblem, s, t - ), recorders) + foreach(recorders) do recorder + record!( + recorder, + system, + systemstate, + dispatchproblem, + local_sampleid, + t, + ) + end end - foreach(recorder -> reset!(recorder, s), recorders) - + foreach(recorders) do recorder + reset!(recorder, local_sampleid) + end end - put!(results, recorders) + put!(results, (recorders, sampleids)) + + return end diff --git a/PRASCore.jl/test/Simulations/runtests.jl b/PRASCore.jl/test/Simulations/runtests.jl index 3c518634..d337cc55 100644 --- a/PRASCore.jl/test/Simulations/runtests.jl +++ b/PRASCore.jl/test/Simulations/runtests.jl @@ -590,5 +590,23 @@ @test isapprox(sum(dr_shortfall_samples["Region 1",dts[5]])/1e4,TestData.threenode_dr_shortfall_samples/1e4, rtol=0.01) end + @testset "Threaded sample result partitioning" begin + simspec_serial = SequentialMonteCarlo(samples=10, seed=123, threaded=false) + simspec_threaded = SequentialMonteCarlo(samples=10, seed=123, threaded=true) + + serial_shortfall, serial_samples = + assess(TestData.singlenode_a, simspec_serial, + Shortfall(), ShortfallSamples()) + + threaded_shortfall, threaded_samples = + assess(TestData.singlenode_a, simspec_threaded, + Shortfall(), ShortfallSamples()) + + @test size(threaded_samples.shortfall) == size(serial_samples.shortfall) + @test length(threaded_samples[]) == simspec_threaded.nsamples + + @test LOLE(threaded_shortfall) ≈ LOLE(threaded_samples) + @test EUE(threaded_shortfall) ≈ EUE(threaded_samples) + end end