From cefaf7e6083e748ca1eebe53235a05a2806b1ac7 Mon Sep 17 00:00:00 2001 From: Jack Feser Date: Wed, 26 Nov 2025 14:06:19 -0500 Subject: [PATCH 1/5] add sample operation for computing a batch of llm results in parallel --- effectful/handlers/llm/sampling.py | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/effectful/handlers/llm/sampling.py b/effectful/handlers/llm/sampling.py index effc197d..85d5874d 100644 --- a/effectful/handlers/llm/sampling.py +++ b/effectful/handlers/llm/sampling.py @@ -1,11 +1,12 @@ from collections import Counter from concurrent import futures from concurrent.futures.thread import ThreadPoolExecutor +from typing import Callable, Sequence from effectful.handlers.llm import Template from effectful.internals.runtime import get_interpretation, interpreter -from effectful.ops.semantics import fwd -from effectful.ops.syntax import ObjectInterpretation, implements +from effectful.ops.semantics import fwd, handler +from effectful.ops.syntax import ObjectInterpretation, defop, implements class KAheadSampler[**P, T](ObjectInterpretation): @@ -45,3 +46,22 @@ def n_votes_ahead(): tasks.append(executor.submit(interpreter(intp)(fwd), *args, **kwargs)) executor.shutdown() return self.votes.most_common(1)[0][0] + + +def sample[**P, T](template: Template[P, T], n: int) -> Callable[P, Sequence[T]]: + @defop + def in_nested_call() -> bool: + return False + + def _template_call(template, *args, **kwargs): + if in_nested_call(): + return fwd() + + with handler({in_nested_call: lambda: True}): + with ThreadPoolExecutor() as executor: + intp = get_interpretation() + tasks = [executor.submit(interpreter(intp)(fwd)) for _ in range(n)] + completed = futures.wait(tasks, return_when=futures.ALL_COMPLETED) + return [t.result() for t in completed.done] + + return handler({Template.__call__: _template_call})(template) From 27cdbed5e63a07ffadae5dda400b1dc640fb5765 Mon Sep 17 00:00:00 2001 From: Jack Feser Date: Wed, 26 Nov 2025 14:13:12 -0500 Subject: [PATCH 2/5] lint --- effectful/handlers/llm/sampling.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/effectful/handlers/llm/sampling.py b/effectful/handlers/llm/sampling.py index 85d5874d..30648942 100644 --- a/effectful/handlers/llm/sampling.py +++ b/effectful/handlers/llm/sampling.py @@ -1,7 +1,7 @@ from collections import Counter +from collections.abc import Callable, Sequence from concurrent import futures from concurrent.futures.thread import ThreadPoolExecutor -from typing import Callable, Sequence from effectful.handlers.llm import Template from effectful.internals.runtime import get_interpretation, interpreter @@ -48,7 +48,7 @@ def n_votes_ahead(): return self.votes.most_common(1)[0][0] -def sample[**P, T](template: Template[P, T], n: int) -> Callable[P, Sequence[T]]: +def sample(template, n): @defop def in_nested_call() -> bool: return False From 0d8532b3a43484ef8c8e22a0b12904f47bc67559 Mon Sep 17 00:00:00 2001 From: Jack Feser Date: Wed, 26 Nov 2025 14:13:23 -0500 Subject: [PATCH 3/5] lint --- effectful/handlers/llm/sampling.py | 1 - 1 file changed, 1 deletion(-) diff --git a/effectful/handlers/llm/sampling.py b/effectful/handlers/llm/sampling.py index 30648942..4c2d5f78 100644 --- a/effectful/handlers/llm/sampling.py +++ b/effectful/handlers/llm/sampling.py @@ -1,5 +1,4 @@ from collections import Counter -from collections.abc import Callable, Sequence from concurrent import futures from concurrent.futures.thread import ThreadPoolExecutor From 427a3491f0d5b81830005d4db52c2e5f49b3c4d2 Mon Sep 17 00:00:00 2001 From: Jack Feser Date: Mon, 1 Dec 2025 18:47:21 -0500 Subject: [PATCH 4/5] restrict parallel execution to `completion` calls --- effectful/handlers/llm/sampling.py | 41 +++++++++++++++++++++++++++--- 1 file changed, 38 insertions(+), 3 deletions(-) diff --git a/effectful/handlers/llm/sampling.py b/effectful/handlers/llm/sampling.py index 4c2d5f78..eb0e17f8 100644 --- a/effectful/handlers/llm/sampling.py +++ b/effectful/handlers/llm/sampling.py @@ -1,8 +1,10 @@ +import threading from collections import Counter from concurrent import futures from concurrent.futures.thread import ThreadPoolExecutor from effectful.handlers.llm import Template +from effectful.handlers.llm.providers import completion, tool_call from effectful.internals.runtime import get_interpretation, interpreter from effectful.ops.semantics import fwd, handler from effectful.ops.syntax import ObjectInterpretation, defop, implements @@ -52,15 +54,48 @@ def sample(template, n): def in_nested_call() -> bool: return False + lock = threading.Lock() + def _template_call(template, *args, **kwargs): if in_nested_call(): return fwd() with handler({in_nested_call: lambda: True}): with ThreadPoolExecutor() as executor: - intp = get_interpretation() - tasks = [executor.submit(interpreter(intp)(fwd)) for _ in range(n)] + + @interpreter(get_interpretation()) + def do_work(): + lock.acquire() + try: + result = fwd() + finally: + assert lock.locked() + lock.release() + return result + + tasks = [executor.submit(do_work) for _ in range(n)] completed = futures.wait(tasks, return_when=futures.ALL_COMPLETED) return [t.result() for t in completed.done] - return handler({Template.__call__: _template_call})(template) + def _completion(*args, **kwargs): + lock.release() + result = fwd() + lock.acquire() + return result + + def _tool_call(*args, **kwargs): + lock.acquire() + try: + result = fwd() + except Exception as e: + lock.release() + raise e + return result + + return handler( + { + Template.__call__: _template_call, + completion: _completion, + tool_call: _tool_call, + } + )(template) From 4dab7aaed9b47456ee30cdc672656b25506ca8e7 Mon Sep 17 00:00:00 2001 From: Jack Feser Date: Tue, 2 Dec 2025 18:37:36 -0500 Subject: [PATCH 5/5] cleanup --- effectful/handlers/llm/sampling.py | 64 +++++++++++++++--------------- 1 file changed, 31 insertions(+), 33 deletions(-) diff --git a/effectful/handlers/llm/sampling.py b/effectful/handlers/llm/sampling.py index eb0e17f8..dae040a2 100644 --- a/effectful/handlers/llm/sampling.py +++ b/effectful/handlers/llm/sampling.py @@ -1,5 +1,7 @@ +import functools import threading from collections import Counter +from collections.abc import Callable, Sequence from concurrent import futures from concurrent.futures.thread import ThreadPoolExecutor @@ -7,7 +9,7 @@ from effectful.handlers.llm.providers import completion, tool_call from effectful.internals.runtime import get_interpretation, interpreter from effectful.ops.semantics import fwd, handler -from effectful.ops.syntax import ObjectInterpretation, defop, implements +from effectful.ops.syntax import ObjectInterpretation, implements class KAheadSampler[**P, T](ObjectInterpretation): @@ -49,33 +51,16 @@ def n_votes_ahead(): return self.votes.most_common(1)[0][0] -def sample(template, n): - @defop - def in_nested_call() -> bool: - return False +def sample[**P, T](template: Template[P, T], n: int) -> Callable[P, Sequence[T]]: + """sample returns a function with the same signature as `template` except + that `n` samples are returned. - lock = threading.Lock() - - def _template_call(template, *args, **kwargs): - if in_nested_call(): - return fwd() - - with handler({in_nested_call: lambda: True}): - with ThreadPoolExecutor() as executor: + When computing a batch of samples, calls to `completion` (and handlers of + `completion`) proceed in parallel, but other calls (e.g. tool calls) proceed + synchronously. - @interpreter(get_interpretation()) - def do_work(): - lock.acquire() - try: - result = fwd() - finally: - assert lock.locked() - lock.release() - return result - - tasks = [executor.submit(do_work) for _ in range(n)] - completed = futures.wait(tasks, return_when=futures.ALL_COMPLETED) - return [t.result() for t in completed.done] + """ + lock = threading.Lock() def _completion(*args, **kwargs): lock.release() @@ -92,10 +77,23 @@ def _tool_call(*args, **kwargs): raise e return result - return handler( - { - Template.__call__: _template_call, - completion: _completion, - tool_call: _tool_call, - } - )(template) + @functools.wraps(template) + @handler({completion: _completion, tool_call: _tool_call}) + def wrapper(*args, **kwargs): + with ThreadPoolExecutor() as executor: + + @interpreter(get_interpretation()) + def do_work(): + lock.acquire() + try: + result = template(*args, **kwargs) + finally: + assert lock.locked() + lock.release() + return result + + tasks = [executor.submit(do_work) for _ in range(n)] + completed = futures.wait(tasks, return_when=futures.ALL_COMPLETED) + return [t.result() for t in completed.done] + + return wrapper