A high-performance, vectorized scene graph implementation for Python, designed for robotics and 3D scene understanding. It supports multiple graph backends (NetworkX, Rustworkx) and data backends (NumPy, PyTorch), enabling efficient batch updates with a particular focus on GPU acceleration.
- Vectorized Operations: Optimized for batch updates of contiguous node and edge attribute buffers using NumPy or PyTorch.
- Flexible Backends:
- Graph: Choose between
rustworkx(high-performance Rust-based; default)ornetworkx(pure Python; fallback implementation) backends. - Data: Store attributes in
numpyarrays ortorchtensors for seamless integration with deep learning pipelines.
- Graph: Choose between
- Layered Architecture: Organize nodes into logical layers (e.g., "objects", "rooms", "agents") with specific attribute schemas.
- Efficient Memory Management: Uses continuous memory buffers (
DenseBuffer,RaggedBuffer) with a "Swap-and-Pop" strategy for consistent removals. - NVIDIA Warp Integration: Core implementation using NVIDIA Warp to enable high-performance GPU kernels and vectorized operations.
- Python >= 3.9
- NVIDIA Warp: Custom CPU/GPU kernels and backend
- Rustworkx or Networkx (Optional): backend graph operations.
- NumPy: CPU attribute storage
- PyTorch (Optional): GPU-accelerated attribute storage.
git clone https://github.com/your-repo/scene-graph.git
cd scene-graph
pip install .For development:
pip install -e ".[dev]"import numpy as np
from scene_graph.scene_graph import SceneGraph
# Initialize SceneGraph with rustworkx and numpy backends
sg = SceneGraph(graph_backend="rustworkx", default_buffer_capacity=100)
# Add a layer for objects
sg.add_layer("objects", backend="numpy")
# Add a buffer for 3D positions
from scene_graph.buffer import DenseBuffer
mgr = sg._node_layers["objects"]
pos_buffer = DenseBuffer(mgr.index_manager, shape=(3,), dtype="float32")
mgr.add_buffer("position", pos_buffer)
# Reserve space for a new node and set its data
node_id, local_idx = sg.reserve_instance("objects")
data = sg.get_node_data(node_id, "objects")
data["position"] = np.array([1.0, 2.0, 3.0], dtype=np.float32)
data["label"] = "chair" # Fallback to standard dict storage for non-buffered data
print(f"Node {node_id} position: {data['position']}")# Reserve multiple instances at once
global_ids, local_indices = sg.reserve_instances("objects", 10)
# Access the full buffer as a NumPy view/Torch tensor
pos_view = sg.get_layer_node_buffer("objects", "position")
# Batch update positions
new_positions = np.random.rand(10, 3).astype(np.float32)
pos_view[local_indices] = new_positions# Add a layer for connections between objects (intra-layer)
sg.add_layer_connection("objects", "objects")
# Reserve a connection
u, v = global_ids[0], global_ids[1]
sg.reserve_connection(u, v, u_layer="objects", v_layer="objects")
# Access edge data
edge_data = sg.get_edge_data(u, v, u_layer="objects", v_layer="objects")
edge_data["distance"] = 1.5scene_graph/: Core library code.scene_graph.py: MainSceneGraphclass.graph_backend.py: Graph library abstractions (NetworkX/Rustworkx).buffer/: Memory management and vectorized storage.attribute_proxy.py: Transparent access to buffered and non-buffered attributes.index_manager/: Warp implementation of the index management using parallelized hashmaps.
tests/: Comprehensive test suite usingpytest.
pytestThis project is licensed under the Apache License 2.0 - see the LICENSE file for details.