Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion genesis_forge/gamepads/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from .gamepad import Gamepad
from .logitech import LOGITECH_F710_CONFIG, LOGITECH_F310_CONFIG
from .dragonrise import DRAGONRISE_CONFIG
from .config import GamepadState

__all__ = ["Gamepad", "GamepadState", "LOGITECH_F710_CONFIG", "LOGITECH_F310_CONFIG"]
__all__ = ["Gamepad", "GamepadState", "LOGITECH_F710_CONFIG", "LOGITECH_F310_CONFIG", "DRAGONRISE_CONFIG"]
33 changes: 33 additions & 0 deletions genesis_forge/gamepads/dragonrise.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"""DragonRise Inc. Generic USB Joystick configuration.

Common cheap USB gamepad found on Amazon/AliExpress.
Vendor ID: 0x0079, Product ID: 0x0006
"""

from .config import GamepadConfig

VENDOR_ID = 0x0079
PRODUCT_ID = 0x0006

# HID data mapping (determined via testing):
# Byte 0: Left stick X (0=left, 128=center, 255=right)
# Byte 1: Left stick Y (0=up/forward, ~109=center, 255=down/back)
# Byte 2: Right stick X (0=left, ~125=center, 255=right)
# Byte 3: Right stick Y (128=center)
# Byte 4: Triggers/Z-axis
# Byte 5: D-pad and face buttons (lower nibble = d-pad, upper nibble = buttons)
# Byte 6: Shoulder buttons
# Byte 7: Mode/special buttons

DRAGONRISE_CONFIG: GamepadConfig = {
"name": "DragonRise Generic USB Joystick",
"vendor_id": VENDOR_ID,
"product_id": PRODUCT_ID,
"mapping": [
# Analog sticks
{"axis": 0, "data": 0}, # Left stick X
{"axis": 1, "data": 1}, # Left stick Y
{"axis": 2, "data": 2}, # Right stick X
{"axis": 3, "data": 3}, # Right stick Y
],
}
5 changes: 4 additions & 1 deletion genesis_forge/gamepads/gamepad.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@

from .config import GamepadConfig, GamepadState
from .logitech import LOGITECH_F710_CONFIG, LOGITECH_F310_CONFIG
from .dragonrise import DRAGONRISE_CONFIG

GAMEPAD_CONFIGS = [
LOGITECH_F710_CONFIG,
LOGITECH_F310_CONFIG,
DRAGONRISE_CONFIG,
]


Expand Down Expand Up @@ -131,7 +133,8 @@ def _read_loop(self):
"""
while self.is_running:
try:
data = self._device.read(64)
# Use timeout (10ms) to prevent blocking - important for macOS
data = self._device.read(64, timeout_ms=10)
if data:
try:
self._state = self._parse_data(data)
Expand Down
6 changes: 6 additions & 0 deletions genesis_forge/managers/command/command_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,10 +381,16 @@ def resample_command(self, env_ids: list[int]):
def _gamepad_axis_command(self, step_count: int) -> torch.Tensor:
"""
Get the command from the gamepad.
Uses per-step caching to avoid redundant reads (command may be accessed multiple times per step).
"""
if self._gamepad_cfg is None:
return self._gamepad_axis_command_buffer

# Cache: only read gamepad once per step
if hasattr(self, '_last_gamepad_step') and self._last_gamepad_step == step_count:
return self._gamepad_axis_command_buffer
self._last_gamepad_step = step_count

gamepad = self._gamepad_cfg["gamepad"]
axis_map = self._gamepad_cfg["axis_map"]

Expand Down
3 changes: 2 additions & 1 deletion genesis_forge/wrappers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from .video import VideoWrapper
from .video import VideoWrapper, capped_cubic_iteration_trigger
from .rsl_rl import RslRlWrapper
from .skrl import SkrlEnvWapper

__all__ = [
"VideoWrapper",
"capped_cubic_iteration_trigger",
"RslRlWrapper",
"SkrlEnvWapper",
]
104 changes: 92 additions & 12 deletions genesis_forge/wrappers/video.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import torch
from genesis_forge.genesis_env import GenesisEnv
from genesis_forge.wrappers.wrapper import Wrapper
from typing import Tuple, Any, Callable, Literal, TYPE_CHECKING
from typing import Tuple, Any, Callable, Literal, Optional, TYPE_CHECKING

if TYPE_CHECKING:
from genesis.vis.camera import Camera
Expand All @@ -30,30 +30,53 @@ def capped_cubic_episode_trigger(episode_id: int) -> bool:
return episode_id % 1000 == 0


def capped_cubic_iteration_trigger(iteration: int) -> bool:
"""The default iteration trigger.

This function will trigger recordings at the iteration indices 0, 1, 8, 27, ..., :math:`k^3`, ..., 729, 1000, 2000, 3000, ...

Args:
iteration: The iteration number

Returns:
If to apply a video schedule number
"""
if iteration < 1000:
return int(round(iteration ** (1.0 / 3))) ** 3 == iteration
else:
return iteration % 1000 == 0


class VideoWrapper(Wrapper):
"""
Automatically record videos during training at a regular step or episode intervals.
Automatically record videos during training at a regular step, episode, or iteration intervals.

Based on the RecordVideo wrapper from Gymnasium: https://gymnasium.farama.org/main/api/wrappers/misc_wrappers/#gymnasium.wrappers.RecordVideo

Recordings will be made from a dedicated camera, which you need to add to your environment (see the example below).

To control how frequently recordings are made specify **either** ``episode_trigger`` **or** ``step_trigger`` (not both).
To control how frequently recordings are made specify **one of** ``episode_trigger``, ``step_trigger``, or ``iteration_trigger``.
They should be functions returning a boolean that indicates whether a recording should be started at the
current episode or step, respectively. If neither :attr:`episode_trigger` nor ``step_trigger`` is passed,
current episode, step, or iteration, respectively. If none are passed,
a default ``episode_trigger`` will be used, which records at the episode indices 0, 1, 8, 27, ..., :math:`k^3`, ..., 729, 1000, 2000, 3000,.

Args:
env: GenesisEnv
camera_attr: The attribute of the base environment that contains the camera to use for recording.
video_length_sec: Length of each video, in seconds. Use this OR ``video_length_iterations``.
video_length_iterations: Length of each video, in iterations. Useful for iteration_trigger to ensure
recording duration doesn't exceed trigger interval. Requires ``num_steps_per_env``.
episode_trigger: Function that accepts an episode count integer and returns ``True`` if a recording should be started at this episode
step_trigger: Function that accepts a step count integer and returns ``True`` if a recording should be started at this step
video_length_sec: Length of each video, in seconds.
iteration_trigger: Function that accepts an iteration count integer and returns ``True`` if a recording should be started at this iteration.
Requires ``num_steps_per_env`` to be set to calculate iteration from step count.
num_steps_per_env: Number of steps per environment per iteration. Required when using ``iteration_trigger`` or ``video_length_iterations``.
initial_iteration: Initial iteration offset (useful when resuming training).
out_dir: Directory to save the videos to.
fps: Frames per second for the video.
env_idx: If triggering on episode, this is the index of the environment to be counting episodes for.
filename: The filename for the video.
If None, the video will automatically be named for the current step.
If None, the video will automatically be named for the current step (or iteration if using iteration_trigger).
If defined, each video will overwrite the previous video with this name.

Example::
Expand Down Expand Up @@ -88,15 +111,31 @@ def train():
out_dir="./videos",
step_trigger=lambda step: step % 1500 == 0
)

Record every 50 iterations (useful for RL training)::

env = MyEnv()
env = VideoWrapper(
env,
camera_attr="camera",
out_dir="./videos",
iteration_trigger=lambda it: it % 50 == 0,
num_steps_per_env=24, # Should match your training config
video_length_iterations=1, # Record 1 iteration per video
)
"""

def __init__(
self,
env: GenesisEnv,
camera_attr: str = "camera",
video_length_sec: int = 8,
video_length_sec: Optional[int] = None,
video_length_iterations: Optional[int] = None,
episode_trigger: Callable[[int], bool] | None = None,
step_trigger: Callable[[int], bool] | None = None,
iteration_trigger: Callable[[int], bool] | None = None,
num_steps_per_env: Optional[int] = None,
initial_iteration: int = 0,
out_dir: str = "./videos",
fps: int = 60,
env_idx: int = 0,
Expand All @@ -109,7 +148,10 @@ def __init__(
self._logging: bool = logging
self._current_step: int = 0
self._current_episode: int = 0
self._current_iteration: int = initial_iteration
self._last_triggered_iteration: int = initial_iteration - 1
self._recording_start_step: int = 0
self._recording_start_iteration: int = 0
self._recording_stop_step: int = 0
self._record_final_episode = record_final_episode
self._has_recording_buffer = False
Expand All @@ -123,19 +165,39 @@ def __init__(
self._camera_attr = camera_attr
self._out_dir = out_dir
self._filename = filename
self._video_length_steps = math.ceil(video_length_sec / self.dt)
self._steps_per_frame = round(1.0 / fps / self.dt)
self._actual_fps = round(1.0 / self.dt / self._steps_per_frame)
self._env_idx = env_idx

if episode_trigger is None and step_trigger is None:
# Iteration tracking
self._num_steps_per_env = num_steps_per_env
self._initial_iteration = initial_iteration
self._video_length_iterations = video_length_iterations

# Calculate video length in steps
if video_length_iterations is not None:
if num_steps_per_env is None:
raise ValueError("num_steps_per_env must be specified when using video_length_iterations")
self._video_length_steps = video_length_iterations * num_steps_per_env
elif video_length_sec is not None:
self._video_length_steps = math.ceil(video_length_sec / self.dt)
else:
# Default: 8 seconds
self._video_length_steps = math.ceil(8 / self.dt)

# Validate trigger configuration
if iteration_trigger is not None and num_steps_per_env is None:
raise ValueError("num_steps_per_env must be specified when using iteration_trigger")

if episode_trigger is None and step_trigger is None and iteration_trigger is None:
episode_trigger = capped_cubic_episode_trigger

trigger_count = sum(x is not None for x in [episode_trigger, step_trigger])
assert trigger_count == 1, "Must specify only one trigger"
trigger_count = sum(x is not None for x in [episode_trigger, step_trigger, iteration_trigger])
assert trigger_count == 1, "Must specify only one trigger (episode_trigger, step_trigger, or iteration_trigger)"

self.episode_trigger = episode_trigger
self.step_trigger = step_trigger
self.iteration_trigger = iteration_trigger

os.makedirs(self._out_dir, exist_ok=True)

Expand Down Expand Up @@ -166,6 +228,10 @@ def step(
extras,
) = super().step(actions)

# Update iteration count if using iteration trigger
if self._num_steps_per_env is not None:
self._current_iteration = self._initial_iteration + (self._current_step // self._num_steps_per_env)

self._check_recording_trigger()
if self._current_step % self._steps_per_frame == 0:
self._cam.render()
Expand Down Expand Up @@ -208,6 +274,7 @@ def start_recording(self, type: RecordingType = "active"):
self._has_recording_buffer = False
self._recording_type = type
self._recording_start_step = self._current_step
self._recording_start_iteration = self._current_iteration
self._recording_stop_step = self._current_step + self._video_length_steps
self._cam.start_recording()

Expand All @@ -220,7 +287,13 @@ def finish_recording(self):

# Save recording
if self._recording_type == "active":
filename = self._filename or f"{self._recording_start_step}.mp4"
# Use iteration-based filename if iteration_trigger is active
if self._filename:
filename = self._filename
elif self.iteration_trigger is not None:
filename = f"iter_{self._recording_start_iteration}.mp4"
else:
filename = f"{self._recording_start_step}.mp4"
filepath = os.path.join(self._out_dir, filename)
if self._logging:
print(f"Saving recording to {filepath}")
Expand All @@ -237,12 +310,19 @@ def finish_recording(self):

def _check_recording_trigger(self) -> bool:
"""Check if a recording should be started"""
record = False
if self._is_recording and self._recording_type == "active":
record = False
elif self.episode_trigger is not None:
record = self.episode_trigger(self._current_episode)
elif self.step_trigger is not None:
record = self.step_trigger(self._current_step)
elif self.iteration_trigger is not None:
# Only trigger once per iteration (at the start of each new iteration)
if self._current_iteration > self._last_triggered_iteration:
if self.iteration_trigger(self._current_iteration):
record = True
self._last_triggered_iteration = self._current_iteration

if record:
self.start_recording()
Expand Down