Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [0.7.0] - 2026-05-22

### Added

- `datasets`: Added `client.Datasets.Create` and `client.Datasets.Update` with optional summary and markdown description options.
- `datasets`: Added datapoint query pagination and dynamic datapoint decoding helpers for converting raw protobuf datapoints to maps.
- `examples`: Added a dynamic dataset query example that decodes datapoints without generated Go types.
- `workflows`: Added `client.Automations` to list and inspect automations and storage locations.

## [0.6.0] - 2026-05-20

### Added
Expand Down Expand Up @@ -103,7 +112,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added support for Tilebox Observability, including logging and tracing helpers.
- Added examples for using the library.

[Unreleased]: https://github.com/tilebox/tilebox-go/compare/v0.6.0...HEAD
[Unreleased]: https://github.com/tilebox/tilebox-go/compare/v0.7.0...HEAD
[0.6.0]: https://github.com/tilebox/tilebox-go/compare/v0.6.0...v0.7.0
[0.6.0]: https://github.com/tilebox/tilebox-go/compare/v0.5.0...v0.6.0
[0.5.0]: https://github.com/tilebox/tilebox-go/compare/v0.4.0...v0.5.0
[0.4.0]: https://github.com/tilebox/tilebox-go/compare/v0.3.2...v0.4.0
Expand Down
189 changes: 154 additions & 35 deletions datasets/v1/datapoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ type DatapointClient interface {
// - WithTemporalExtent: specifies the time or data point interval for which data should be loaded. (Required)
// - WithSpatialExtent: specifies the spatial extent for which data should be loaded. (Optional)
// - WithSkipData: can be used to skip the actual data when loading datapoints, only returning required datapoint fields. (Optional)
// - WithCursor: starts the query after a cursor returned by a previous page. (Optional)
// - WithLimit: limits the total number of datapoints returned. Defaults to unlimited.
//
// The datapoints are lazily loaded and returned as a sequence of bytes.
// Pagination is handled automatically under the hood. The datapoints are lazily loaded and returned as a sequence of bytes.
// The output sequence can be transformed into a proto.Message using CollectAs / As.
//
// Example usage:
Expand All @@ -56,6 +58,17 @@ type DatapointClient interface {
// Documentation: https://docs.tilebox.com/datasets/query/querying-data
Query(ctx context.Context, datasetID uuid.UUID, options ...QueryOption) iter.Seq2[[]byte, error]

// QueryPage returns a single page of datapoints from one or more collections of the same dataset.
//
// Options:
// - WithCollections / WithCollectionIDs: specifies the collections to query. If no collections are specified, all collections of the dataset will be queried. (Optional)
// - WithTemporalExtent: specifies the time or data point interval for which data should be loaded. (Required)
// - WithSpatialExtent: specifies the spatial extent for which data should be loaded. (Optional)
// - WithSkipData: can be used to skip the actual data when loading datapoints, only returning required datapoint fields. (Optional)
// - WithCursor: starts the query after a cursor returned by a previous page. (Optional)
// - WithLimit: limits the number of datapoints returned in this page. Defaults to the server default.
QueryPage(ctx context.Context, datasetID uuid.UUID, options ...QueryOption) (*DatapointPage, error)

// QueryInto queries datapoints from one or more collections of the same dataset into a slice of datapoints of a
// compatible proto.Message type.
//
Expand Down Expand Up @@ -90,6 +103,27 @@ type DatapointClient interface {
DeleteIDs(ctx context.Context, collectionID uuid.UUID, datapointIDs []uuid.UUID) (int64, error)
}

// DatapointPage is a single page of datapoints returned by a manual page query.
type DatapointPage struct {
// Datapoints is the raw protobuf-encoded datapoint data in this page.
Datapoints [][]byte
// NextCursor can be used to request the next page. Nil means there are no more pages.
NextCursor *Cursor
}

// Cursor identifies where to continue a paginated datapoint query.
type Cursor = query.Cursor

// NewCursor creates a cursor that starts after the datapoint with the given ID.
func NewCursor(startingAfter uuid.UUID) *Cursor {
return query.NewCursor(startingAfter)
}

// ParseCursor parses a cursor string returned by Cursor.String.
func ParseCursor(value string) (*Cursor, error) {
return query.ParseCursor(value)
}

var _ DatapointClient = &datapointClient{}

type datapointClient struct {
Expand All @@ -103,6 +137,8 @@ type queryOptions struct {
spatialExtent query.SpatialExtent
skipData bool
collectionIDs []uuid.UUID
cursor *Cursor
limit int64
}

// QueryOption is an interface for configuring a Query request.
Expand Down Expand Up @@ -156,6 +192,27 @@ func WithSkipData() QueryOption {
}
}

// WithCursor starts the query after a cursor returned by a previous page.
func WithCursor(cursor *Cursor) QueryOption {
return func(cfg *queryOptions) {
cfg.cursor = cursor
}
}

// WithLimit limits the number of query results returned.
//
// For auto-paginated query methods, the limit applies to the total number of datapoints yielded. For page query
// methods, the limit applies to the single page returned.
//
// Defaults to unlimited.
func WithLimit(limit int64) QueryOption {
return func(cfg *queryOptions) {
if limit > 0 {
cfg.limit = limit
}
}
}

func (d datapointClient) GetInto(ctx context.Context, datasetID uuid.UUID, datapointID uuid.UUID, datapoint proto.Message, options ...QueryOption) error {
cfg := &queryOptions{
skipData: false,
Expand Down Expand Up @@ -185,60 +242,86 @@ func (d datapointClient) Query(ctx context.Context, datasetID uuid.UUID, options
option(cfg)
}

if cfg.temporalExtent == nil {
return func(yield func([]byte, error) bool) {
// right now we return an error, in the future we might want to support queries without a temporal extent
yield(nil, errors.New("temporal extent is required"))
}
}

return func(yield func([]byte, error) bool) {
var page *tileboxv1.Pagination // nil for the first request

// we already validated that temporalExtent is not nil
timeInterval := cfg.temporalExtent.ToProtoTimeInterval()
datapointInterval := cfg.temporalExtent.ToProtoIDInterval()

if timeInterval == nil && datapointInterval == nil {
yield(nil, errors.New("invalid temporal extent"))
return
}

filters := datasetsv1.QueryFilters_builder{
TimeInterval: timeInterval,
DatapointInterval: datapointInterval,
}.Build()
cursor := cfg.cursor
remaining := cfg.limit

if cfg.spatialExtent != nil {
spatialExtent, err := cfg.spatialExtent.ToProtoSpatialFilter()
if err != nil {
yield(nil, err)
return
for {
pageOpts := *cfg
pageOpts.cursor = cursor
if cfg.limit > 0 {
if remaining == 0 {
break
}
pageOpts.limit = remaining
}
filters.SetSpatialExtent(spatialExtent)
}

for {
datapointsMessage, err := d.dataAccessService.Query(ctx, datasetID, cfg.collectionIDs, filters, page, cfg.skipData)
datapointsPage, err := d.queryPage(ctx, datasetID, &pageOpts)
if err != nil {
yield(nil, err)
return
}

for _, data := range datapointsMessage.GetData().GetValue() {
for _, data := range datapointsPage.Datapoints {
if cfg.limit > 0 && remaining == 0 {
return
}

if !yield(data, nil) {
return
}
if cfg.limit > 0 {
remaining--
}
}

page = datapointsMessage.GetNextPage()
if page == nil {
cursor = datapointsPage.NextCursor
if cursor == nil || (cfg.limit > 0 && remaining == 0) {
break
}
}
}
}

func (d datapointClient) QueryPage(ctx context.Context, datasetID uuid.UUID, options ...QueryOption) (*DatapointPage, error) {
cfg := &queryOptions{
skipData: false,
}
for _, option := range options {
option(cfg)
}
return d.queryPage(ctx, datasetID, cfg)
}

func paginationFromOptions(limit int64, cursor *Cursor) *tileboxv1.Pagination {
if limit <= 0 && cursor == nil {
return nil
}

var startingAfter *tileboxv1.ID
if cursor != nil {
startingAfter = tileboxv1.NewUUID(cursor.StartingAfter())
}

if limit <= 0 {
return tileboxv1.Pagination_builder{
StartingAfter: startingAfter,
}.Build()
}

return tileboxv1.Pagination_builder{
Limit: &limit,
StartingAfter: startingAfter,
}.Build()
}

func cursorFromPagination(page *tileboxv1.Pagination) *Cursor {
if page == nil || page.GetStartingAfter() == nil {
return nil
}
return NewCursor(page.GetStartingAfter().AsUUID())
}

func (d datapointClient) QueryInto(ctx context.Context, datasetID uuid.UUID, datapoints any, options ...QueryOption) error {
err := validateDatapoints(datapoints)
if err != nil {
Expand Down Expand Up @@ -387,6 +470,42 @@ func (d datapointClient) DeleteIDs(ctx context.Context, collectionID uuid.UUID,
return numDeleted, nil
}

func (d datapointClient) queryPage(ctx context.Context, datasetID uuid.UUID, cfg *queryOptions) (*DatapointPage, error) {
if cfg.temporalExtent == nil {
// right now we return an error, in the future we might want to support queries without a temporal extent
return nil, errors.New("temporal extent is required")
}

timeInterval := cfg.temporalExtent.ToProtoTimeInterval()
datapointInterval := cfg.temporalExtent.ToProtoIDInterval()
if timeInterval == nil && datapointInterval == nil {
return nil, errors.New("invalid temporal extent")
}

filters := datasetsv1.QueryFilters_builder{
TimeInterval: timeInterval,
DatapointInterval: datapointInterval,
}.Build()

if cfg.spatialExtent != nil {
spatialExtent, err := cfg.spatialExtent.ToProtoSpatialFilter()
if err != nil {
return nil, err
}
filters.SetSpatialExtent(spatialExtent)
}

datapointsMessage, err := d.dataAccessService.Query(ctx, datasetID, cfg.collectionIDs, filters, paginationFromOptions(cfg.limit, cfg.cursor), cfg.skipData)
if err != nil {
return nil, err
}

return &DatapointPage{
Datapoints: datapointsMessage.GetData().GetValue(),
NextCursor: cursorFromPagination(datapointsMessage.GetNextPage()),
}, nil
}

// CollectAs converts a sequence of bytes into a slice of proto.Message.
func CollectAs[T proto.Message](seq iter.Seq2[[]byte, error]) ([]T, error) {
return Collect(As[T](seq))
Expand Down
Loading