Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions dimos/memory2/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ Supporting Systems:
| `transform.py` | Transformer ABC, FnTransformer, FnIterTransformer, QualityWindow |
| `buffer.py` | Backpressure buffers for live mode (KeepLast, Bounded, Unbounded) |
| `embed.py` | EmbedImages / EmbedText transformers |
| `tick.py` | Tick assembly — samplers (tick/latest/interpolate/window) + TickMachine |
| `puremodule.py`| PureModule — pure `step` over aligned inputs; live ports or offline `over()` ([usage](/docs/usage/pure_modules.md), [design notes](puremodule.md)) |
| `health.py` | HealthMonitor — drop counters, contracts, `_health` stream for PureModule |

## Subpackages

Expand Down
15 changes: 15 additions & 0 deletions dimos/memory2/buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,15 @@ def close(self) -> None:
@abstractmethod
def __len__(self) -> int: ...

def clone(self) -> BackpressureBuffer[T]:
"""Fresh, empty buffer with the same policy.

Lets a buffer instance serve as a *template* (e.g. a class-level
``backpressure = Bounded(8)``) while each consumer gets its own
independent state. Subclasses with constructor args must override.
"""
return type(self)()

def __iter__(self) -> Iterator[T]:
"""Yield items until the buffer is closed."""
while True:
Expand Down Expand Up @@ -138,6 +147,9 @@ def __init__(self, maxlen: int) -> None:
self._closed = False
self._cond = threading.Condition()

def clone(self) -> Bounded[T]:
return Bounded(self._buf.maxlen or 0)

def put(self, item: T) -> bool:
with self._cond:
if self._closed:
Expand Down Expand Up @@ -178,6 +190,9 @@ def __init__(self, maxlen: int) -> None:
self._closed = False
self._cond = threading.Condition()

def clone(self) -> DropNew[T]:
return DropNew(self._maxlen)

def put(self, item: T) -> bool:
with self._cond:
if self._closed or len(self._buf) >= self._maxlen:
Expand Down
Loading
Loading