Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 55 additions & 5 deletions PRASCore.jl/src/Results/Results.jl
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ abstract type ResultSpec end

abstract type ResultAccumulator{R<:ResultSpec} end

issamplebased(::ResultSpec) = false

abstract type Result{
N, # Number of timesteps simulated
L, # Length of each simulation timestep
Expand Down Expand Up @@ -173,6 +175,20 @@ include("StorageEnergySamples.jl")
include("GeneratorStorageEnergySamples.jl")
include("DemandResponseEnergySamples.jl")

issamplebased(::ShortfallSamples) = true
issamplebased(::DemandResponseShortfallSamples) = true
issamplebased(::SurplusSamples) = true
issamplebased(::FlowSamples) = true
issamplebased(::UtilizationSamples) = true
issamplebased(::StorageEnergySamples) = true
issamplebased(::GeneratorStorageEnergySamples) = true
issamplebased(::DemandResponseEnergySamples) = true
issamplebased(::GeneratorAvailability) = true
issamplebased(::StorageAvailability) = true
issamplebased(::GeneratorStorageAvailability) = true
issamplebased(::DemandResponseAvailability) = true
issamplebased(::LineAvailability) = true

function resultchannel(
results::T, threads::Int
) where T <: Tuple{Vararg{ResultSpec}}
Expand All @@ -185,18 +201,52 @@ end
merge!(xs::T, ys::T) where T <: Tuple{Vararg{ResultAccumulator}} =
foreach(merge!, xs, ys)

function copy_sample_partition!(
x::A,
y::A,
sampleids::UnitRange{Int},
) where {A<:ResultAccumulator}

field = fieldnames(A)[1]
xarr = getfield(x, field)
yarr = getfield(y, field)

@views xarr[:, :, sampleids] .= yarr
return
end

function finalize(
results::Channel{<:Tuple{Vararg{ResultAccumulator}}},
results::Channel,
system::SystemModel{N,L,T,P,E},
threads::Int
threads::Int,
nsamples::Int,
resultspecs::Tuple{Vararg{ResultSpec}},
) where {N,L,T,P,E}

total_result = take!(results)
first_recorders, first_sampleids = take!(results)

total_result = map(resultspecs, first_recorders) do spec, recorder
issamplebased(spec) ? accumulator(system, nsamples, spec) : recorder
end

for i in eachindex(total_result)
if issamplebased(resultspecs[i])
copy_sample_partition!(total_result[i], first_recorders[i], first_sampleids)
end
end

for _ in 2:threads
thread_result = take!(results)
merge!(total_result, thread_result)
thread_recorders, sampleids = take!(results)

for i in eachindex(total_result)
if issamplebased(resultspecs[i])
copy_sample_partition!(total_result[i], thread_recorders[i], sampleids)
else
merge!(total_result[i], thread_recorders[i])
end
end
end

close(results)

return finalize.(total_result, system)
Expand Down
120 changes: 80 additions & 40 deletions PRASCore.jl/src/Simulations/Simulations.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import ..Systems: SystemModel, AbstractAssets, Generators, Lines,
conversionfactor, energytopower

import ..Results
import ..Results: ResultSpec, ResultAccumulator,
accumulator, resultchannel, finalize
import ..Results: ResultSpec,
accumulator, finalize, issamplebased

import Base: broadcastable
import Base.Threads: nthreads, @spawn
Expand Down Expand Up @@ -40,7 +40,11 @@ It it recommended that you fix the random seed for reproducibility.
- `samples::Int=10_000`: Number of samples
- `seed::Integer=rand(UInt64)`: Random seed
- `verbose::Bool=false`: Print progress
- `threaded::Bool=true`: Use multi-threading
- `threaded::Bool=true`: Enable threaded Monte Carlo simulation

When `threaded=true`, sample-level result specifications are internally
partitioned across threads during simulation and assembled into dense
result arrays during finalization.

# Returns

Expand All @@ -65,6 +69,36 @@ end

broadcastable(x::SequentialMonteCarlo) = Ref(x)

function sample_ranges(nsamples::Int, nworkers::Int)
base, rem = divrem(nsamples, nworkers)

ranges = UnitRange{Int}[]
start = 1

for i in 1:nworkers
len = base + (i <= rem ? 1 : 0)

if len > 0
stop = start + len - 1
push!(ranges, start:stop)
start = stop + 1
end
end

return ranges
end

function partition_recorders(
system::SystemModel,
nsamples::Int,
local_nsamples::Int,
resultspecs::Tuple{Vararg{ResultSpec}},
)
return map(resultspecs) do spec
accumulator(system, issamplebased(spec) ? local_nsamples : nsamples, spec)
end
end

"""
assess(system::SystemModel, method::SequentialMonteCarlo, resultspecs::ResultSpec...)

Expand All @@ -79,83 +113,89 @@ and return `resultspecs`.

# Returns

- `results::Tuple{Vararg{ResultAccumulator{SequentialMonteCarlo}}}`: PRAS metric results
- `results::Tuple`: PRAS metric results
"""
function assess(
system::SystemModel,
method::SequentialMonteCarlo,
resultspecs::ResultSpec...
)

threads = nthreads()
sampleseeds = Channel{Int}(2*threads)
results = resultchannel(resultspecs, threads)
threads = method.threaded ? nthreads() : 1

@spawn makeseeds(sampleseeds, method.nsamples)

if method.threaded

if (threads == 1)
@warn "It looks like you haven't configured JULIA_NUM_THREADS before you started the julia repl. \n If you want to use multi-threading, stop the execution and start your julia repl using : \n julia --project --threads auto"
end

for _ in 1:threads
@spawn assess(system, method, sampleseeds, results, resultspecs...)
end
else
assess(system, method, sampleseeds, results, resultspecs...)
if method.threaded && threads == 1
@warn "It looks like you haven't configured JULIA_NUM_THREADS before you started the julia repl. \n If you want to use multi-threading, stop the execution and start your julia repl using : \n julia --project --threads auto"
end

return finalize(results, system, method.threaded ? threads : 1)
ranges = sample_ranges(method.nsamples, threads)
actual_threads = length(ranges)

end

function makeseeds(sampleseeds::Channel{Int}, nsamples::Int)
results = Channel{Any}(actual_threads)

for s in 1:nsamples
put!(sampleseeds, s)
for sampleids in ranges
if method.threaded
@spawn assess(system, method, sampleids, results, resultspecs...)
else
assess(system, method, sampleids, results, resultspecs...)
end
end

close(sampleseeds)
return finalize(results, system, actual_threads, method.nsamples, resultspecs)

end

function assess(
system::SystemModel{N}, method::SequentialMonteCarlo,
sampleseeds::Channel{Int},
results::Channel{<:Tuple{Vararg{ResultAccumulator}}},
system::SystemModel{N},
method::SequentialMonteCarlo,
sampleids::UnitRange{Int},
results::Channel,
resultspecs::ResultSpec...
) where N

dispatchproblem = DispatchProblem(system)
systemstate = SystemState(system)
recorders = accumulator.(system, method.nsamples, resultspecs)

recorders = partition_recorders(
system,
method.nsamples,
length(sampleids),
resultspecs,
)

# TODO: Test performance of Philox vs Threefry, choice of rounds
# Also consider implementing an efficient Bernoulli trial with direct
# mantissa comparison
rng = Philox4x((0, 0), 10)

for s in sampleseeds
for (local_sampleid, global_sampleid) in enumerate(sampleids)

seed!(rng, (method.seed, s))
seed!(rng, (method.seed, global_sampleid))
initialize!(rng, systemstate, system)

for t in 1:N

advance!(rng, systemstate, dispatchproblem, system, t)
solve!(dispatchproblem, systemstate, system, t)
foreach(recorder -> record!(
recorder, system, systemstate, dispatchproblem, s, t
), recorders)

foreach(recorders) do recorder
record!(
recorder,
system,
systemstate,
dispatchproblem,
local_sampleid,
t,
)
end
end

foreach(recorder -> reset!(recorder, s), recorders)

foreach(recorders) do recorder
reset!(recorder, local_sampleid)
end
end

put!(results, recorders)
put!(results, (recorders, sampleids))

return

end

Expand Down
18 changes: 18 additions & 0 deletions PRASCore.jl/test/Simulations/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -590,5 +590,23 @@
@test isapprox(sum(dr_shortfall_samples["Region 1",dts[5]])/1e4,TestData.threenode_dr_shortfall_samples/1e4, rtol=0.01)
end

@testset "Threaded sample result partitioning" begin
simspec_serial = SequentialMonteCarlo(samples=10, seed=123, threaded=false)
simspec_threaded = SequentialMonteCarlo(samples=10, seed=123, threaded=true)

serial_shortfall, serial_samples =
assess(TestData.singlenode_a, simspec_serial,
Shortfall(), ShortfallSamples())

threaded_shortfall, threaded_samples =
assess(TestData.singlenode_a, simspec_threaded,
Shortfall(), ShortfallSamples())

@test size(threaded_samples.shortfall) == size(serial_samples.shortfall)
@test length(threaded_samples[]) == simspec_threaded.nsamples

@test LOLE(threaded_shortfall) ≈ LOLE(threaded_samples)
@test EUE(threaded_shortfall) ≈ EUE(threaded_samples)
end

end
Loading