From 8ad373a8af2de73d04c7b2ee619d25bc0324e7b0 Mon Sep 17 00:00:00 2001 From: akrivi <20168326+akrivi@users.noreply.github.com> Date: Sun, 17 May 2026 20:15:29 -0600 Subject: [PATCH 1/2] add infrastructure for partitioned sample accumulators --- PRASCore.jl/src/Results/Results.jl | 69 +++++++++++++++++++++++++++--- 1 file changed, 62 insertions(+), 7 deletions(-) diff --git a/PRASCore.jl/src/Results/Results.jl b/PRASCore.jl/src/Results/Results.jl index 3e4e3300..b1ff251a 100644 --- a/PRASCore.jl/src/Results/Results.jl +++ b/PRASCore.jl/src/Results/Results.jl @@ -5,6 +5,7 @@ import OnlineStats: Series import OnlineStatsBase: EqualWeight, Mean, Variance, value import Printf: @sprintf import StatsBase: mean, std, stderror +import Dates: Date import ..Systems: SystemModel, ZonedDateTime, Period, PowerUnit, EnergyUnit, conversionfactor, @@ -12,7 +13,8 @@ import ..Systems: SystemModel, ZonedDateTime, Period, export # Metrics - ReliabilityMetric, LOLE, EUE, NEUE, + ReliabilityMetric, LOLE, EUE, NEUE, LOLD, LOLEv, + MeanEventDuration, MaxEventDuration, MeanEventEnergy, MaxEventEnergy, val, stderror, # Result specifications @@ -25,7 +27,7 @@ export DemandResponseEnergy, DemandResponseEnergySamples, GeneratorAvailability, StorageAvailability, GeneratorStorageAvailability,DemandResponseAvailability, - LineAvailability + LineAvailability, ShortfallEvents include("utils.jl") include("metrics.jl") @@ -34,6 +36,8 @@ abstract type ResultSpec end abstract type ResultAccumulator{R<:ResultSpec} end +issamplebased(::ResultSpec) = false + abstract type Result{ N, # Number of timesteps simulated L, # Length of each simulation timestep @@ -166,12 +170,29 @@ getindex(x::AbstractEnergyResult, name::String, ::Colon) = getindex(x::AbstractEnergyResult, ::Colon, ::Colon) = getindex.(x, names(x), permutedims(x.timestamps)) +abstract type AbstractShortfallEventResult{N,L,T} <: Result{N,L,T} end + include("StorageEnergy.jl") include("GeneratorStorageEnergy.jl") include("DemandResponseEnergy.jl") include("StorageEnergySamples.jl") include("GeneratorStorageEnergySamples.jl") include("DemandResponseEnergySamples.jl") +include("ShortfallEvents.jl") + +issamplebased(::ShortfallSamples) = true +issamplebased(::DemandResponseShortfallSamples) = true +issamplebased(::SurplusSamples) = true +issamplebased(::FlowSamples) = true +issamplebased(::UtilizationSamples) = true +issamplebased(::StorageEnergySamples) = true +issamplebased(::GeneratorStorageEnergySamples) = true +issamplebased(::DemandResponseEnergySamples) = true +issamplebased(::GeneratorAvailability) = true +issamplebased(::StorageAvailability) = true +issamplebased(::GeneratorStorageAvailability) = true +issamplebased(::DemandResponseAvailability) = true +issamplebased(::LineAvailability) = true function resultchannel( results::T, threads::Int @@ -185,18 +206,52 @@ end merge!(xs::T, ys::T) where T <: Tuple{Vararg{ResultAccumulator}} = foreach(merge!, xs, ys) +function copy_sample_partition!( + x::A, + y::A, + sampleids::UnitRange{Int}, +) where {A<:ResultAccumulator} + + field = fieldnames(A)[1] + xarr = getfield(x, field) + yarr = getfield(y, field) + + @views xarr[:, :, sampleids] .= yarr + return +end + function finalize( - results::Channel{<:Tuple{Vararg{ResultAccumulator}}}, + results::Channel, system::SystemModel{N,L,T,P,E}, - threads::Int + threads::Int, + nsamples::Int, + resultspecs::Tuple{Vararg{ResultSpec}}, ) where {N,L,T,P,E} - total_result = take!(results) + first_recorders, first_sampleids = take!(results) + + total_result = map(resultspecs, first_recorders) do spec, recorder + issamplebased(spec) ? accumulator(system, nsamples, spec) : recorder + end + + for i in eachindex(total_result) + if issamplebased(resultspecs[i]) + copy_sample_partition!(total_result[i], first_recorders[i], first_sampleids) + end + end for _ in 2:threads - thread_result = take!(results) - merge!(total_result, thread_result) + thread_recorders, sampleids = take!(results) + + for i in eachindex(total_result) + if issamplebased(resultspecs[i]) + copy_sample_partition!(total_result[i], thread_recorders[i], sampleids) + else + merge!(total_result[i], thread_recorders[i]) + end + end end + close(results) return finalize.(total_result, system) From f6313c9f4a4db99be326055c8b2df519a0dfb60f Mon Sep 17 00:00:00 2001 From: akrivi <20168326+akrivi@users.noreply.github.com> Date: Sun, 17 May 2026 21:21:56 -0600 Subject: [PATCH 2/2] partition threaded sample accumulators by sample range --- PRASCore.jl/src/Results/Results.jl | 9 +- PRASCore.jl/src/Simulations/Simulations.jl | 120 ++++++++++++++------- PRASCore.jl/test/Simulations/runtests.jl | 18 ++++ 3 files changed, 100 insertions(+), 47 deletions(-) diff --git a/PRASCore.jl/src/Results/Results.jl b/PRASCore.jl/src/Results/Results.jl index b1ff251a..1ca980d4 100644 --- a/PRASCore.jl/src/Results/Results.jl +++ b/PRASCore.jl/src/Results/Results.jl @@ -5,7 +5,6 @@ import OnlineStats: Series import OnlineStatsBase: EqualWeight, Mean, Variance, value import Printf: @sprintf import StatsBase: mean, std, stderror -import Dates: Date import ..Systems: SystemModel, ZonedDateTime, Period, PowerUnit, EnergyUnit, conversionfactor, @@ -13,8 +12,7 @@ import ..Systems: SystemModel, ZonedDateTime, Period, export # Metrics - ReliabilityMetric, LOLE, EUE, NEUE, LOLD, LOLEv, - MeanEventDuration, MaxEventDuration, MeanEventEnergy, MaxEventEnergy, + ReliabilityMetric, LOLE, EUE, NEUE, val, stderror, # Result specifications @@ -27,7 +25,7 @@ export DemandResponseEnergy, DemandResponseEnergySamples, GeneratorAvailability, StorageAvailability, GeneratorStorageAvailability,DemandResponseAvailability, - LineAvailability, ShortfallEvents + LineAvailability include("utils.jl") include("metrics.jl") @@ -170,15 +168,12 @@ getindex(x::AbstractEnergyResult, name::String, ::Colon) = getindex(x::AbstractEnergyResult, ::Colon, ::Colon) = getindex.(x, names(x), permutedims(x.timestamps)) -abstract type AbstractShortfallEventResult{N,L,T} <: Result{N,L,T} end - include("StorageEnergy.jl") include("GeneratorStorageEnergy.jl") include("DemandResponseEnergy.jl") include("StorageEnergySamples.jl") include("GeneratorStorageEnergySamples.jl") include("DemandResponseEnergySamples.jl") -include("ShortfallEvents.jl") issamplebased(::ShortfallSamples) = true issamplebased(::DemandResponseShortfallSamples) = true diff --git a/PRASCore.jl/src/Simulations/Simulations.jl b/PRASCore.jl/src/Simulations/Simulations.jl index 99b70e96..f4b26e0b 100644 --- a/PRASCore.jl/src/Simulations/Simulations.jl +++ b/PRASCore.jl/src/Simulations/Simulations.jl @@ -4,8 +4,8 @@ import ..Systems: SystemModel, AbstractAssets, Generators, Lines, conversionfactor, energytopower import ..Results -import ..Results: ResultSpec, ResultAccumulator, - accumulator, resultchannel, finalize +import ..Results: ResultSpec, + accumulator, finalize, issamplebased import Base: broadcastable import Base.Threads: nthreads, @spawn @@ -40,7 +40,11 @@ It it recommended that you fix the random seed for reproducibility. - `samples::Int=10_000`: Number of samples - `seed::Integer=rand(UInt64)`: Random seed - `verbose::Bool=false`: Print progress - - `threaded::Bool=true`: Use multi-threading + - `threaded::Bool=true`: Enable threaded Monte Carlo simulation + +When `threaded=true`, sample-level result specifications are internally +partitioned across threads during simulation and assembled into dense +result arrays during finalization. # Returns @@ -65,6 +69,36 @@ end broadcastable(x::SequentialMonteCarlo) = Ref(x) +function sample_ranges(nsamples::Int, nworkers::Int) + base, rem = divrem(nsamples, nworkers) + + ranges = UnitRange{Int}[] + start = 1 + + for i in 1:nworkers + len = base + (i <= rem ? 1 : 0) + + if len > 0 + stop = start + len - 1 + push!(ranges, start:stop) + start = stop + 1 + end + end + + return ranges +end + +function partition_recorders( + system::SystemModel, + nsamples::Int, + local_nsamples::Int, + resultspecs::Tuple{Vararg{ResultSpec}}, +) + return map(resultspecs) do spec + accumulator(system, issamplebased(spec) ? local_nsamples : nsamples, spec) + end +end + """ assess(system::SystemModel, method::SequentialMonteCarlo, resultspecs::ResultSpec...) @@ -79,7 +113,7 @@ and return `resultspecs`. # Returns - - `results::Tuple{Vararg{ResultAccumulator{SequentialMonteCarlo}}}`: PRAS metric results + - `results::Tuple`: PRAS metric results """ function assess( system::SystemModel, @@ -87,75 +121,81 @@ function assess( resultspecs::ResultSpec... ) - threads = nthreads() - sampleseeds = Channel{Int}(2*threads) - results = resultchannel(resultspecs, threads) + threads = method.threaded ? nthreads() : 1 - @spawn makeseeds(sampleseeds, method.nsamples) - - if method.threaded - - if (threads == 1) - @warn "It looks like you haven't configured JULIA_NUM_THREADS before you started the julia repl. \n If you want to use multi-threading, stop the execution and start your julia repl using : \n julia --project --threads auto" - end - - for _ in 1:threads - @spawn assess(system, method, sampleseeds, results, resultspecs...) - end - else - assess(system, method, sampleseeds, results, resultspecs...) + if method.threaded && threads == 1 + @warn "It looks like you haven't configured JULIA_NUM_THREADS before you started the julia repl. \n If you want to use multi-threading, stop the execution and start your julia repl using : \n julia --project --threads auto" end - return finalize(results, system, method.threaded ? threads : 1) + ranges = sample_ranges(method.nsamples, threads) + actual_threads = length(ranges) -end - -function makeseeds(sampleseeds::Channel{Int}, nsamples::Int) + results = Channel{Any}(actual_threads) - for s in 1:nsamples - put!(sampleseeds, s) + for sampleids in ranges + if method.threaded + @spawn assess(system, method, sampleids, results, resultspecs...) + else + assess(system, method, sampleids, results, resultspecs...) + end end - close(sampleseeds) + return finalize(results, system, actual_threads, method.nsamples, resultspecs) end function assess( - system::SystemModel{N}, method::SequentialMonteCarlo, - sampleseeds::Channel{Int}, - results::Channel{<:Tuple{Vararg{ResultAccumulator}}}, + system::SystemModel{N}, + method::SequentialMonteCarlo, + sampleids::UnitRange{Int}, + results::Channel, resultspecs::ResultSpec... ) where N dispatchproblem = DispatchProblem(system) systemstate = SystemState(system) - recorders = accumulator.(system, method.nsamples, resultspecs) + + recorders = partition_recorders( + system, + method.nsamples, + length(sampleids), + resultspecs, + ) # TODO: Test performance of Philox vs Threefry, choice of rounds # Also consider implementing an efficient Bernoulli trial with direct # mantissa comparison rng = Philox4x((0, 0), 10) - for s in sampleseeds + for (local_sampleid, global_sampleid) in enumerate(sampleids) - seed!(rng, (method.seed, s)) + seed!(rng, (method.seed, global_sampleid)) initialize!(rng, systemstate, system) for t in 1:N - advance!(rng, systemstate, dispatchproblem, system, t) solve!(dispatchproblem, systemstate, system, t) - foreach(recorder -> record!( - recorder, system, systemstate, dispatchproblem, s, t - ), recorders) + foreach(recorders) do recorder + record!( + recorder, + system, + systemstate, + dispatchproblem, + local_sampleid, + t, + ) + end end - foreach(recorder -> reset!(recorder, s), recorders) - + foreach(recorders) do recorder + reset!(recorder, local_sampleid) + end end - put!(results, recorders) + put!(results, (recorders, sampleids)) + + return end diff --git a/PRASCore.jl/test/Simulations/runtests.jl b/PRASCore.jl/test/Simulations/runtests.jl index 3c518634..d337cc55 100644 --- a/PRASCore.jl/test/Simulations/runtests.jl +++ b/PRASCore.jl/test/Simulations/runtests.jl @@ -590,5 +590,23 @@ @test isapprox(sum(dr_shortfall_samples["Region 1",dts[5]])/1e4,TestData.threenode_dr_shortfall_samples/1e4, rtol=0.01) end + @testset "Threaded sample result partitioning" begin + simspec_serial = SequentialMonteCarlo(samples=10, seed=123, threaded=false) + simspec_threaded = SequentialMonteCarlo(samples=10, seed=123, threaded=true) + + serial_shortfall, serial_samples = + assess(TestData.singlenode_a, simspec_serial, + Shortfall(), ShortfallSamples()) + + threaded_shortfall, threaded_samples = + assess(TestData.singlenode_a, simspec_threaded, + Shortfall(), ShortfallSamples()) + + @test size(threaded_samples.shortfall) == size(serial_samples.shortfall) + @test length(threaded_samples[]) == simspec_threaded.nsamples + + @test LOLE(threaded_shortfall) ≈ LOLE(threaded_samples) + @test EUE(threaded_shortfall) ≈ EUE(threaded_samples) + end end