Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ target_sources(viam-trajex-totg
src/viam/trajex/trajex.cpp
src/viam/trajex/totg/observers.cpp
src/viam/trajex/totg/path.cpp
src/viam/trajex/totg/streaming/session.cpp
src/viam/trajex/totg/trajectory.cpp
src/viam/trajex/totg/uniform_sampler.cpp
src/viam/trajex/totg/waypoint_accumulator.cpp
Expand Down Expand Up @@ -515,6 +516,55 @@ if (VIAM_TRAJEX_BUILD_TESTS)
)


# Separate binary for streaming session tests so they can be looped fast
# without paying the integration suite's runtime.
add_executable(viam-trajex-totg-streaming-test)

target_sources(viam-trajex-totg-streaming-test
PRIVATE
src/viam/trajex/totg/streaming/test/main.cpp
src/viam/trajex/totg/streaming/test/session.cpp
)

target_link_libraries(viam-trajex-totg-streaming-test
PRIVATE
viam::trajex::totg
Boost::boost
)

target_compile_definitions(viam-trajex-totg-streaming-test PRIVATE
VIAM_TRAJEX_STREAMING_TEST_DATA_DIR="${CMAKE_CURRENT_SOURCE_DIR}/src/viam/trajex/totg/streaming/test/data"
)

apply_wall_werror(viam-trajex-totg-streaming-test)

add_test(
NAME viam-trajex-totg-streaming-test
COMMAND viam-trajex-totg-streaming-test
)


# Streaming-session simulator. Standalone binary, not wired into ctest --
# see orbsanding-streaming-session-simulation-plan.md.
add_executable(viam-trajex-totg-streaming-sim-exe)

target_sources(viam-trajex-totg-streaming-sim-exe
PRIVATE
src/viam/trajex/totg/streaming/test/sim/streaming_sim_main.cpp
)

set_target_properties(viam-trajex-totg-streaming-sim-exe PROPERTIES
OUTPUT_NAME viam-trajex-totg-streaming-sim
)

target_link_libraries(viam-trajex-totg-streaming-sim-exe
PRIVATE
viam::trajex::totg::tools
)

apply_wall_werror(viam-trajex-totg-streaming-sim-exe)


add_executable(viam-trajex-totg-tools-test)

target_sources(viam-trajex-totg-tools-test
Expand Down
2 changes: 1 addition & 1 deletion go/totg/rdk/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (s *Service) Metadata(_ context.Context) (mlmodel.MLMetadata, error) {
{
Name: totg.KeyTrajectorySamplingFreqHz,
Description: "Output sample rate, in Hz [scalar]",
DataType: "int64",
DataType: "float64",
Shape: []int{1},
},
},
Expand Down
8 changes: 4 additions & 4 deletions go/totg/rdk/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func buildInputs() ml.Tensors {
),
totg.KeyTrajectorySamplingFreqHz: tensor.New(
tensor.WithShape(1),
tensor.WithBacking([]int64{100}),
tensor.WithBacking([]float64{100.0}),
),
}
}
Expand Down Expand Up @@ -170,7 +170,7 @@ func TestMinimalTrajectoryLatency(t *testing.T) {
),
totg.KeyTrajectorySamplingFreqHz: tensor.New(
tensor.WithShape(1),
tensor.WithBacking([]int64{100}),
tensor.WithBacking([]float64{100.0}),
),
}

Expand Down Expand Up @@ -283,7 +283,7 @@ func TestHypercubeTortureLatency(t *testing.T) {
),
totg.KeyTrajectorySamplingFreqHz: tensor.New(
tensor.WithShape(1),
tensor.WithBacking([]int64{10}),
tensor.WithBacking([]float64{10.0}),
),
}

Expand Down Expand Up @@ -346,7 +346,7 @@ func TestPhase4Gate(t *testing.T) {
totg.KeyVelocityLimitsRadsPerSec: tensor.New(tensor.WithShape(nDOF), tensor.WithBacking(velLimits)),
totg.KeyAccelerationLimitsRadsPerSec2: tensor.New(tensor.WithShape(nDOF), tensor.WithBacking(accLimits)),
totg.KeyPathToleranceDeltaRads: tensor.New(tensor.WithShape(1), tensor.WithBacking([]float64{0.05})),
totg.KeyTrajectorySamplingFreqHz: tensor.New(tensor.WithShape(1), tensor.WithBacking([]int64{200})),
totg.KeyTrajectorySamplingFreqHz: tensor.New(tensor.WithShape(1), tensor.WithBacking([]float64{200.0})),
}

s := newTestService(t)
Expand Down
185 changes: 185 additions & 0 deletions go/totg/streaming/session.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
//go:build !windows && !no_cgo

// Package streaming exposes the trajex TOTG streaming session as a Go API,
// layered on top of github.com/viam-modules/trajex's TensorMap. Sessions are
// stateful: callers construct a session with a fixed configuration, extend it
// with waypoint batches over time, and pull samples incrementally.
//
// The schema-key constants (KeyWaypointsRads, KeyVelocityLimitsRadsPerSec,
// etc.) are re-exported from the parent totg package so callers using both
// stateless Generate and streaming sessions need to import only one set of
// key names.
package streaming

/*
#cgo CFLAGS: -I${SRCDIR}/../../../src/viam/trajex/capi

#include "capi.h"
*/
import "C"

import (
"context"
"time"

"github.com/pkg/errors"

trajex "github.com/viam-modules/trajex/go"
"github.com/viam-modules/trajex/go/totg"
)

// Re-export the shared schema-key constants from totg so streaming callers
// have a single source of truth. The values are identical to totg.Key* by
// construction (both initialized from the same C externs at package init).
var (
KeyVelocityLimitsRadsPerSec = totg.KeyVelocityLimitsRadsPerSec //nolint:revive
KeyAccelerationLimitsRadsPerSec2 = totg.KeyAccelerationLimitsRadsPerSec2 //nolint:revive
KeyPathToleranceDeltaRads = totg.KeyPathToleranceDeltaRads //nolint:revive
KeyPathColinearizationRatio = totg.KeyPathColinearizationRatio //nolint:revive
KeyTrajectorySamplingFreqHz = totg.KeyTrajectorySamplingFreqHz //nolint:revive
KeyWaypointsRads = totg.KeyWaypointsRads //nolint:revive
KeySampleTimesSec = totg.KeySampleTimesSec //nolint:revive
KeyConfigurationsRads = totg.KeyConfigurationsRads //nolint:revive
KeyVelocitiesRadsPerSec = totg.KeyVelocitiesRadsPerSec //nolint:revive
KeyAccelerationsRadsPerSec2 = totg.KeyAccelerationsRadsPerSec2 //nolint:revive
)

// Session is a Go-owned handle to a CAPI streaming session. Close must be
// called (typically via defer) to release the underlying C resource. Session
// is not safe for concurrent use; concurrent operations on the same handle
// are the caller's responsibility, matching the C ABI's contract.
type Session struct {
handle *C.viam_trajex_totg_streaming_session_t
}

// New constructs a streaming session from a configuration tensor map. The
// options map carries the velocity / acceleration limits, path tolerance,
// sample rate, and optional path colinearization ratio; see the C ABI header
// for the full schema. The options map is read-only during construction and
// may be closed by the caller as soon as New returns.
func New(options *trajex.TensorMap) (*Session, error) {
optHandle := (*C.viam_trajex_tensor_map_t)(options.UnsafeHandle())
var errOut *C.char
h := C.viam_trajex_totg_streaming_session_create(optHandle, &errOut)
if h == nil {
msg := C.GoString(errOut)
C.viam_trajex_string_destroy(errOut)
return nil, errors.Errorf("trajex/totg/streaming: New failed: %s", msg)
}
return &Session{handle: h}, nil
}

// Close releases the underlying C session. Safe to call on a nil receiver
// and idempotent: subsequent calls are no-ops.
func (s *Session) Close() {
if s == nil || s.handle == nil {
return
}
C.viam_trajex_totg_streaming_session_destroy(s.handle)
s.handle = nil
}

// Extend appends a waypoint batch to the session. The batch must contain a
// waypoints_rads tensor of shape [n_waypoints, n_dof]; on calls after the
// first, batch[0] must compare bit-exactly equal to the session's most
// recently stored waypoint (the seam contract).
//
// Extend honors ctx at entry and exit: if ctx is already cancelled when
// Extend is called, it returns ctx.Err() without invoking the C ABI; if ctx
// is cancelled while the C call is in flight (which itself cannot be
// interrupted), the result is discarded and ctx.Err() is returned.
func (s *Session) Extend(ctx context.Context, batch *trajex.TensorMap) error {
if err := ctx.Err(); err != nil {
return err
}
batchHandle := (*C.viam_trajex_tensor_map_t)(batch.UnsafeHandle())
var errOut *C.char
rc := C.viam_trajex_totg_streaming_session_extend(s.handle, batchHandle, &errOut)
if rc != 0 {
msg := C.GoString(errOut)
C.viam_trajex_string_destroy(errOut)
return errors.Errorf("trajex/totg/streaming: Extend failed: %s", msg)
}
if err := ctx.Err(); err != nil {
return err
}
return nil
}

// SampleNext pulls up to n samples into outputs. The output map's prior
// contents are replaced. If the session is exhausted, outputs carries
// zero-length sample tensors.
//
// Honors ctx like Extend.
func (s *Session) SampleNext(ctx context.Context, n int, outputs *trajex.TensorMap) error {
if err := ctx.Err(); err != nil {
return err
}
outHandle := (*C.viam_trajex_tensor_map_t)(outputs.UnsafeHandle())
var errOut *C.char
rc := C.viam_trajex_totg_streaming_session_sample_next(s.handle, C.size_t(n), outHandle, &errOut)
if rc != 0 {
msg := C.GoString(errOut)
C.viam_trajex_string_destroy(errOut)
return errors.Errorf("trajex/totg/streaming: SampleNext failed: %s", msg)
}
if err := ctx.Err(); err != nil {
return err
}
return nil
}

// SampleAtLeast pulls samples until the most recent sample's time is at least
// CurrentTime + horizon, writing them into outputs.
//
// Honors ctx like Extend.
func (s *Session) SampleAtLeast(ctx context.Context, horizon time.Duration, outputs *trajex.TensorMap) error {
if err := ctx.Err(); err != nil {
return err
}
outHandle := (*C.viam_trajex_tensor_map_t)(outputs.UnsafeHandle())
var errOut *C.char
rc := C.viam_trajex_totg_streaming_session_sample_at_least(s.handle, C.double(horizon.Seconds()), outHandle, &errOut)
if rc != 0 {
msg := C.GoString(errOut)
C.viam_trajex_string_destroy(errOut)
return errors.Errorf("trajex/totg/streaming: SampleAtLeast failed: %s", msg)
}
if err := ctx.Err(); err != nil {
return err
}
return nil
}

// CurrentTime returns the global time of the most recently emitted sample,
// or zero if no samples have been emitted yet.
func (s *Session) CurrentTime() time.Duration {
var out C.double
C.viam_trajex_totg_streaming_session_current_time_sec(s.handle, &out)
return time.Duration(float64(out) * float64(time.Second))
}

// GenerationCount returns the cumulative number of trajectories the session
// has installed as active (first build + each pivot + each rebase). Zero for
// a fresh session before the first Extend.
func (s *Session) GenerationCount() int64 {
var out C.int64_t
C.viam_trajex_totg_streaming_session_generation_count(s.handle, &out)
return int64(out)
}

// HasActiveTrajectory reports whether the session has an active trajectory.
// False iff fresh (no successful Extend has occurred yet).
func (s *Session) HasActiveTrajectory() bool {
var out C.int
C.viam_trajex_totg_streaming_session_has_active_trajectory(s.handle, &out)
return out != 0
}

// ActiveDuration returns the duration of the active trajectory. Returns zero
// when no active trajectory is present.
func (s *Session) ActiveDuration() time.Duration {
var out C.double
C.viam_trajex_totg_streaming_session_active_duration_sec(s.handle, &out)
return time.Duration(float64(out) * float64(time.Second))
}
Loading
Loading