From d21fd724feb3c038c956c98e43f4a50ab34fc191 Mon Sep 17 00:00:00 2001 From: Lukas Bindreiter Date: Fri, 22 May 2026 16:42:12 +0200 Subject: [PATCH 1/3] Add dynamic datapoint parsing --- CHANGELOG.md | 7 + datasets/v1/datapoints.go | 189 +++++++++++--- datasets/v1/datapoints_test.go | 90 ++++++- datasets/v1/datasets.go | 110 +++++++- datasets/v1/datasets_test.go | 6 +- datasets/v1/dynamic.go | 319 ++++++++++++++++++++++++ datasets/v1/dynamic_test.go | 124 +++++++++ datasets/v1/field/field.go | 23 ++ datasets/v1/service.go | 26 +- examples/datasets/query-dynamic/main.go | 78 ++++++ query/cursor.go | 45 ++++ workflows/v1/automations.go | 253 +++++++++++++++++++ workflows/v1/automations_test.go | 45 ++++ workflows/v1/client.go | 12 +- workflows/v1/job/options.go | 29 +-- 15 files changed, 1260 insertions(+), 96 deletions(-) create mode 100644 datasets/v1/dynamic.go create mode 100644 datasets/v1/dynamic_test.go create mode 100644 examples/datasets/query-dynamic/main.go create mode 100644 query/cursor.go create mode 100644 workflows/v1/automations.go create mode 100644 workflows/v1/automations_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index ce0c06b..dcd27b8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- `datasets`: Added `client.Datasets.Create` and `client.Datasets.Update` with optional summary and markdown description options. +- `datasets`: Added datapoint query pagination and dynamic datapoint decoding helpers for converting raw protobuf datapoints to maps. +- `examples`: Added a dynamic dataset query example that decodes datapoints without generated Go types. +- `workflows`: Added `client.Automations` to list and inspect automations and storage locations. 
+ ## [0.6.0] - 2026-05-20 ### Added diff --git a/datasets/v1/datapoints.go b/datasets/v1/datapoints.go index 7f74318..d6a86d0 100644 --- a/datasets/v1/datapoints.go +++ b/datasets/v1/datapoints.go @@ -35,8 +35,10 @@ type DatapointClient interface { // - WithTemporalExtent: specifies the time or data point interval for which data should be loaded. (Required) // - WithSpatialExtent: specifies the spatial extent for which data should be loaded. (Optional) // - WithSkipData: can be used to skip the actual data when loading datapoints, only returning required datapoint fields. (Optional) + // - WithCursor: starts the query after a cursor returned by a previous page. (Optional) + // - WithLimit: limits the total number of datapoints returned. Defaults to unlimited. // - // The datapoints are lazily loaded and returned as a sequence of bytes. + // Pagination is handled automatically under the hood. The datapoints are lazily loaded and returned as a sequence of bytes. // The output sequence can be transformed into a proto.Message using CollectAs / As. // // Example usage: @@ -56,6 +58,17 @@ type DatapointClient interface { // Documentation: https://docs.tilebox.com/datasets/query/querying-data Query(ctx context.Context, datasetID uuid.UUID, options ...QueryOption) iter.Seq2[[]byte, error] + // QueryPage returns a single page of datapoints from one or more collections of the same dataset. + // + // Options: + // - WithCollections / WithCollectionIDs: specifies the collections to query. If no collections are specified, all collections of the dataset will be queried. (Optional) + // - WithTemporalExtent: specifies the time or data point interval for which data should be loaded. (Required) + // - WithSpatialExtent: specifies the spatial extent for which data should be loaded. (Optional) + // - WithSkipData: can be used to skip the actual data when loading datapoints, only returning required datapoint fields. 
(Optional) + // - WithCursor: starts the query after a cursor returned by a previous page. (Optional) + // - WithLimit: limits the number of datapoints returned in this page. Defaults to the server default. + QueryPage(ctx context.Context, datasetID uuid.UUID, options ...QueryOption) (*DatapointPage, error) + // QueryInto queries datapoints from one or more collections of the same dataset into a slice of datapoints of a // compatible proto.Message type. // @@ -90,6 +103,27 @@ type DatapointClient interface { DeleteIDs(ctx context.Context, collectionID uuid.UUID, datapointIDs []uuid.UUID) (int64, error) } +// DatapointPage is a single page of datapoints returned by a manual page query. +type DatapointPage struct { + // Datapoints is the raw protobuf-encoded datapoint data in this page. + Datapoints [][]byte + // NextCursor can be used to request the next page. Nil means there are no more pages. + NextCursor *Cursor +} + +// Cursor identifies where to continue a paginated datapoint query. +type Cursor = query.Cursor + +// NewCursor creates a cursor that starts after the datapoint with the given ID. +func NewCursor(startingAfter uuid.UUID) *Cursor { + return query.NewCursor(startingAfter) +} + +// ParseCursor parses a cursor string returned by Cursor.String. +func ParseCursor(value string) (*Cursor, error) { + return query.ParseCursor(value) +} + var _ DatapointClient = &datapointClient{} type datapointClient struct { @@ -103,6 +137,8 @@ type queryOptions struct { spatialExtent query.SpatialExtent skipData bool collectionIDs []uuid.UUID + cursor *Cursor + limit int64 } // QueryOption is an interface for configuring a Query request. @@ -156,6 +192,27 @@ func WithSkipData() QueryOption { } } +// WithCursor starts the query after a cursor returned by a previous page. +func WithCursor(cursor *Cursor) QueryOption { + return func(cfg *queryOptions) { + cfg.cursor = cursor + } +} + +// WithLimit limits the number of query results returned. 
+// +// For auto-paginated query methods, the limit applies to the total number of datapoints yielded. For page query +// methods, the limit applies to the single page returned. +// +// Defaults to unlimited for auto-paginated query methods and to the server default for page query methods. +func WithLimit(limit int64) QueryOption { + return func(cfg *queryOptions) { + if limit > 0 { + cfg.limit = limit + } + } +} + func (d datapointClient) GetInto(ctx context.Context, datasetID uuid.UUID, datapointID uuid.UUID, datapoint proto.Message, options ...QueryOption) error { cfg := &queryOptions{ skipData: false, @@ -185,60 +242,122 @@ func (d datapointClient) Query(ctx context.Context, datasetID uuid.UUID, options option(cfg) } - if cfg.temporalExtent == nil { - return func(yield func([]byte, error) bool) { - // right now we return an error, in the future we might want to support queries without a temporal extent - yield(nil, errors.New("temporal extent is required")) - } - } - return func(yield func([]byte, error) bool) { - var page *tileboxv1.Pagination // nil for the first request - - // we already validated that temporalExtent is not nil - timeInterval := cfg.temporalExtent.ToProtoTimeInterval() - datapointInterval := cfg.temporalExtent.ToProtoIDInterval() - - if timeInterval == nil && datapointInterval == nil { - yield(nil, errors.New("invalid temporal extent")) - return - } - - filters := datasetsv1.QueryFilters_builder{ - TimeInterval: timeInterval, - DatapointInterval: datapointInterval, - }.Build() + cursor := cfg.cursor + remaining := cfg.limit - if cfg.spatialExtent != nil { - spatialExtent, err := cfg.spatialExtent.ToProtoSpatialFilter() - if err != nil { - yield(nil, err) - return + for { + pageOpts := *cfg + pageOpts.cursor = cursor + if cfg.limit > 0 { + if remaining == 0 { + break + } + pageOpts.limit = remaining } - filters.SetSpatialExtent(spatialExtent) - } - for { - datapointsMessage, err := d.dataAccessService.Query(ctx, datasetID, cfg.collectionIDs, filters, page, cfg.skipData) + datapointsPage, err := d.queryPage(ctx, datasetID, 
&pageOpts) if err != nil { yield(nil, err) return } - for _, data := range datapointsMessage.GetData().GetValue() { + for _, data := range datapointsPage.Datapoints { + if cfg.limit > 0 && remaining == 0 { + return + } + if !yield(data, nil) { return } + if cfg.limit > 0 { + remaining-- + } } - page = datapointsMessage.GetNextPage() - if page == nil { + cursor = datapointsPage.NextCursor + if cursor == nil || (cfg.limit > 0 && remaining == 0) { break } } } } +func (d datapointClient) QueryPage(ctx context.Context, datasetID uuid.UUID, options ...QueryOption) (*DatapointPage, error) { + cfg := &queryOptions{ + skipData: false, + } + for _, option := range options { + option(cfg) + } + return d.queryPage(ctx, datasetID, cfg) +} + +func (d datapointClient) queryPage(ctx context.Context, datasetID uuid.UUID, cfg *queryOptions) (*DatapointPage, error) { + if cfg.temporalExtent == nil { + // right now we return an error, in the future we might want to support queries without a temporal extent + return nil, errors.New("temporal extent is required") + } + + timeInterval := cfg.temporalExtent.ToProtoTimeInterval() + datapointInterval := cfg.temporalExtent.ToProtoIDInterval() + if timeInterval == nil && datapointInterval == nil { + return nil, errors.New("invalid temporal extent") + } + + filters := datasetsv1.QueryFilters_builder{ + TimeInterval: timeInterval, + DatapointInterval: datapointInterval, + }.Build() + + if cfg.spatialExtent != nil { + spatialExtent, err := cfg.spatialExtent.ToProtoSpatialFilter() + if err != nil { + return nil, err + } + filters.SetSpatialExtent(spatialExtent) + } + + datapointsMessage, err := d.dataAccessService.Query(ctx, datasetID, cfg.collectionIDs, filters, paginationFromOptions(cfg.limit, cfg.cursor), cfg.skipData) + if err != nil { + return nil, err + } + + return &DatapointPage{ + Datapoints: datapointsMessage.GetData().GetValue(), + NextCursor: cursorFromPagination(datapointsMessage.GetNextPage()), + }, nil +} + +func 
paginationFromOptions(limit int64, cursor *Cursor) *tileboxv1.Pagination { + if limit <= 0 && cursor == nil { + return nil + } + + var startingAfter *tileboxv1.ID + if cursor != nil { + startingAfter = tileboxv1.NewUUID(cursor.StartingAfter()) + } + + if limit <= 0 { + return tileboxv1.Pagination_builder{ + StartingAfter: startingAfter, + }.Build() + } + + return tileboxv1.Pagination_builder{ + Limit: &limit, + StartingAfter: startingAfter, + }.Build() +} + +func cursorFromPagination(page *tileboxv1.Pagination) *Cursor { + if page == nil || page.GetStartingAfter() == nil { + return nil + } + return NewCursor(page.GetStartingAfter().AsUUID()) +} + func (d datapointClient) QueryInto(ctx context.Context, datasetID uuid.UUID, datapoints any, options ...QueryOption) error { err := validateDatapoints(datapoints) if err != nil { diff --git a/datasets/v1/datapoints_test.go b/datasets/v1/datapoints_test.go index ae3e0a3..dcae9bc 100644 --- a/datasets/v1/datapoints_test.go +++ b/datasets/v1/datapoints_test.go @@ -40,9 +40,16 @@ type mockDataAccessService struct { n int } -func (m mockDataAccessService) Query(_ context.Context, _ uuid.UUID, _ []uuid.UUID, _ *datasetsv1.QueryFilters, _ *tileboxv1.Pagination, _ bool) (*datasetsv1.QueryResultPage, error) { - data := make([][]byte, m.n) - for i := range m.n { +var mockNextCursorID = uuid.MustParse("01994da4-255e-740d-9df7-b8c1aa41c75b") + +func (m mockDataAccessService) Query(_ context.Context, _ uuid.UUID, _ []uuid.UUID, _ *datasetsv1.QueryFilters, page *tileboxv1.Pagination, _ bool) (*datasetsv1.QueryResultPage, error) { + count := m.n + if page.GetLimit() > 0 { + count = min(m.n, int(page.GetLimit())) + } + + data := make([][]byte, count) + for i := range count { datapoint := examplesv1.Sentinel2Msi_builder{ GranuleName: pointer(uuid.New().String()), ProcessingLevel: pointer(datasetsv1.ProcessingLevel_PROCESSING_LEVEL_L1), @@ -58,11 +65,16 @@ func (m mockDataAccessService) Query(_ context.Context, _ uuid.UUID, _ []uuid.UU 
data[i] = message } + var nextPage *tileboxv1.Pagination + if page.GetLimit() > 0 && int(page.GetLimit()) < m.n { + nextPage = tileboxv1.Pagination_builder{StartingAfter: tileboxv1.NewUUID(mockNextCursorID)}.Build() + } + return datasetsv1.QueryResultPage_builder{ Data: datasetsv1.RepeatedAny_builder{ Value: data, }.Build(), - NextPage: nil, + NextPage: nextPage, }.Build(), nil } @@ -70,6 +82,7 @@ func Test_QueryOptions(t *testing.T) { now := time.Now() collectionID1 := uuid.New() collectionID2 := uuid.New() + cursor := NewCursor(uuid.New()) colorado := orb.Polygon{ {{-109.05, 37.09}, {-102.06, 37.09}, {-102.06, 41.59}, {-109.05, 41.59}, {-109.05, 37.09}}, } @@ -142,6 +155,24 @@ func Test_QueryOptions(t *testing.T) { skipData: true, }, }, + { + name: "with cursor", + options: []QueryOption{ + WithCursor(cursor), + }, + want: queryOptions{ + cursor: cursor, + }, + }, + { + name: "with limit", + options: []QueryOption{ + WithLimit(10), + }, + want: queryOptions{ + limit: 10, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -154,6 +185,18 @@ func Test_QueryOptions(t *testing.T) { } } +func TestCursor(t *testing.T) { + id := uuid.New() + cursor := NewCursor(id) + + assert.Equal(t, id, cursor.StartingAfter()) + assert.Equal(t, id.String(), cursor.String()) + + parsed, err := ParseCursor(cursor.String()) + require.NoError(t, err) + assert.Equal(t, id, parsed.StartingAfter()) +} + func Test_datapointClient_GetInto(t *testing.T) { ctx := context.Background() client := NewReplayClient(t, "datapoint_getinto") @@ -262,6 +305,45 @@ func Test_datapointClient_QueryInto(t *testing.T) { } } +func Test_datapointClient_Query_WithLimit(t *testing.T) { + ctx := context.Background() + client := NewDatapointClient(10) + + datasetID := uuid.New() + timeInterval := query.NewTimeInterval(time.Now(), time.Now().Add(time.Hour)) + + datapoints, err := Collect(client.Query(ctx, datasetID, WithTemporalExtent(timeInterval), WithLimit(2))) + require.NoError(t, err) + 
+ assert.Len(t, datapoints, 2) +} + +func Test_datapointClient_QueryPage(t *testing.T) { + ctx := context.Background() + client := NewDatapointClient(10) + + datasetID := uuid.New() + cursorID := uuid.New() + timeInterval := query.NewTimeInterval(time.Now(), time.Now().Add(time.Hour)) + + page, err := client.QueryPage(ctx, datasetID, WithTemporalExtent(timeInterval), WithCursor(NewCursor(cursorID)), WithLimit(3)) + require.NoError(t, err) + + assert.Len(t, page.Datapoints, 3) + require.NotNil(t, page.NextCursor) + assert.Equal(t, mockNextCursorID, page.NextCursor.StartingAfter()) +} + +func Test_paginationFromOptions(t *testing.T) { + cursorID := uuid.New() + + page := paginationFromOptions(10, NewCursor(cursorID)) + + require.NotNil(t, page) + assert.Equal(t, int64(10), page.GetLimit()) + assert.Equal(t, cursorID, page.GetStartingAfter().AsUUID()) +} + // resultQueryInto is used to avoid the compiler optimizing away the benchmark output var resultQueryInto []*examplesv1.Sentinel2Msi diff --git a/datasets/v1/datasets.go b/datasets/v1/datasets.go index feb8af6..a0a7388 100644 --- a/datasets/v1/datasets.go +++ b/datasets/v1/datasets.go @@ -24,7 +24,9 @@ type Dataset struct { Type *datasetsv1.AnnotatedType // Name is the name of the dataset. Name string - // Description is a short description of the dataset. + // Summary is a short description of the dataset. + Summary string + // Description is the dataset's larger markdown documentation. Description string // Slug is the unique slug of the dataset. Slug string @@ -40,16 +42,61 @@ func (d Dataset) String() string { case datasetsv1.DatasetKind_DATASET_KIND_UNSPECIFIED: } - return fmt.Sprintf("%s [%s Dataset]: %s", d.Name, kind, d.Description) + return fmt.Sprintf("%s [%s Dataset]: %s", d.Name, kind, d.Summary) +} + +type datasetOptions struct { + summary *string + description *string +} + +// DatasetOption configures dataset create and update requests. 
+type DatasetOption func(*datasetOptions) + +// WithSummary sets a short dataset description. +func WithSummary(summary string) DatasetOption { + return func(options *datasetOptions) { + options.summary = &summary + } +} + +// WithDescription sets the dataset's larger markdown documentation. +func WithDescription(description string) DatasetOption { + return func(options *datasetOptions) { + options.description = &description + } +} + +func optionalString(value *string) string { + if value == nil { + return "" + } + return *value +} + +func applyDatasetOptions(options ...DatasetOption) datasetOptions { + var applied datasetOptions + for _, option := range options { + option(&applied) + } + return applied } type DatasetClient interface { + // Create creates a new dataset with the given name, codeName, schema kind, and custom fields. + // + // Documentation: https://docs.tilebox.com/datasets/concepts/datasets#creating-a-dataset + Create(ctx context.Context, kind DatasetKind, codeName string, name string, fields []Field, options ...DatasetOption) (*Dataset, error) + + // Update updates an existing dataset with the given id, name, codeName, schema kind, and custom fields. + Update(ctx context.Context, id uuid.UUID, kind DatasetKind, codeName string, name string, fields []Field, options ...DatasetOption) (*Dataset, error) + // CreateOrUpdate creates a new dataset with the given name and codeName, or updates the schema of an existing dataset by adding new fields. // If the dataset is empty, the schema can also be altered in a backwards incompatible way, by removing fields or changing their type. // Otherwise, any attempt to such an update operation will result in an error. 
// // Documentation: https://docs.tilebox.com/datasets/concepts/datasets#creating-a-dataset - CreateOrUpdate(ctx context.Context, kind DatasetKind, codeName string, name string, fields []Field) (*Dataset, error) + CreateOrUpdate(ctx context.Context, kind DatasetKind, codeName string, name string, fields []Field, options ...DatasetOption) (*Dataset, error) // Get returns a dataset by its full slug, e.g. "open_data.copernicus.sentinel1_sar". Get(ctx context.Context, slug string) (*Dataset, error) @@ -145,32 +192,68 @@ type Field interface { Descriptor() *field.Descriptor } -// CreateOrUpdate creates a new dataset with the given name and codeName, or updates the schema of an existing dataset by adding new fields. -// If the dataset is empty, the schema can also be altered in a backwards incompatible way, by removing fields or changing their type. -// Otherwise, any attempt to such an update operation will result in an error. -func (d datasetClient) CreateOrUpdate(ctx context.Context, kind DatasetKind, codeName string, name string, fields []Field) (*Dataset, error) { +func datasetType(kind DatasetKind, fields []Field) (*datasetsv1.DatasetType, error) { // make sure our dataset type contains all the fixed fields for the given kind requiredFields, ok := requiredFieldsPerDatasetKind[kind] if !ok { return nil, fmt.Errorf("unknown dataset kind: %d", kind) } - datasetFields := make([]*datasetsv1.Field, 0, len(requiredFields)) + datasetFields := make([]*datasetsv1.Field, 0, len(requiredFields)+len(fields)) datasetFields = append(datasetFields, requiredFields...) for _, f := range fields { datasetFields = append(datasetFields, f.Descriptor().ToProto()) } - datasetType := datasetsv1.DatasetType_builder{ + return datasetsv1.DatasetType_builder{ Kind: datasetsv1.DatasetKind(kind), Fields: datasetFields, - }.Build() + }.Build(), nil +} + +// Create creates a new dataset with the given name, codeName, schema kind, and custom fields. 
+func (d datasetClient) Create(ctx context.Context, kind DatasetKind, codeName string, name string, fields []Field, options ...DatasetOption) (*Dataset, error) { + datasetType, err := datasetType(kind, fields) + if err != nil { + return nil, err + } + + response, err := d.service.CreateDataset(ctx, codeName, name, datasetType, applyDatasetOptions(options...)) + if err != nil { + return nil, fmt.Errorf("failed to create dataset: %w", err) + } + return protoToDataset(response), nil +} + +// Update updates an existing dataset with the given id, name, codeName, schema kind, and custom fields. +func (d datasetClient) Update(ctx context.Context, id uuid.UUID, kind DatasetKind, codeName string, name string, fields []Field, options ...DatasetOption) (*Dataset, error) { + datasetType, err := datasetType(kind, fields) + if err != nil { + return nil, err + } + + response, err := d.service.UpdateDataset(ctx, id, codeName, name, datasetType, applyDatasetOptions(options...)) + if err != nil { + return nil, fmt.Errorf("failed to update dataset: %w", err) + } + return protoToDataset(response), nil +} + +// CreateOrUpdate creates a new dataset with the given name and codeName, or updates the schema of an existing dataset by adding new fields. +// If the dataset is empty, the schema can also be altered in a backwards incompatible way, by removing fields or changing their type. +// Otherwise, any attempt to such an update operation will result in an error. +func (d datasetClient) CreateOrUpdate(ctx context.Context, kind DatasetKind, codeName string, name string, fields []Field, options ...DatasetOption) (*Dataset, error) { + datasetType, err := datasetType(kind, fields) + if err != nil { + return nil, err + } + datasetOptions := applyDatasetOptions(options...) 
// check whether the dataset already exists, in which case we update it existingDataset, err := d.service.GetDataset(ctx, codeName) if err != nil { if connect.CodeOf(err) == connect.CodeNotFound { - response, err := d.service.CreateDataset(ctx, codeName, name, datasetType) + response, err := d.service.CreateDataset(ctx, codeName, name, datasetType, datasetOptions) if err != nil { return nil, fmt.Errorf("failed to create dataset: %w", err) } @@ -180,7 +263,7 @@ func (d datasetClient) CreateOrUpdate(ctx context.Context, kind DatasetKind, cod } // we found an existing dataset, so let's update it - response, err := d.service.UpdateDataset(ctx, existingDataset.GetId().AsUUID(), codeName, name, datasetType) + response, err := d.service.UpdateDataset(ctx, existingDataset.GetId().AsUUID(), codeName, name, datasetType, datasetOptions) if err != nil { return nil, fmt.Errorf("failed to update dataset: %w", err) } @@ -216,7 +299,8 @@ func protoToDataset(d *datasetsv1.Dataset) *Dataset { ID: d.GetId().AsUUID(), Type: d.GetType(), Name: d.GetName(), - Description: d.GetSummary(), + Summary: d.GetSummary(), + Description: d.GetDescription(), Slug: d.GetSlug(), } } diff --git a/datasets/v1/datasets_test.go b/datasets/v1/datasets_test.go index d884d8c..34bd77f 100644 --- a/datasets/v1/datasets_test.go +++ b/datasets/v1/datasets_test.go @@ -78,8 +78,8 @@ func TestDataset_String(t *testing.T) { rapid.Just(datasetsv1.DatasetKind_DATASET_KIND_SPATIOTEMPORAL), ).Draw(t, "Kind"), }.Build(), - Name: rapid.String().Draw(t, "Name"), - Description: rapid.String().Draw(t, "Description"), + Name: rapid.String().Draw(t, "Name"), + Summary: rapid.String().Draw(t, "Summary"), } }) @@ -89,7 +89,7 @@ func TestDataset_String(t *testing.T) { assert.Contains(t, got, input.Name) assert.NotContains(t, got, input.ID.String()) - assert.Contains(t, got, input.Description) + assert.Contains(t, got, input.Summary) if input.Type.GetKind() == datasetsv1.DatasetKind_DATASET_KIND_TEMPORAL { assert.Contains(t, 
got, "Temporal") diff --git a/datasets/v1/dynamic.go b/datasets/v1/dynamic.go new file mode 100644 index 0000000..bed5e6b --- /dev/null +++ b/datasets/v1/dynamic.go @@ -0,0 +1,319 @@ +package datasets + +import ( + "encoding/base64" + "encoding/json" + "errors" + "fmt" + "strings" + "time" + "unicode" + + "github.com/google/uuid" + datasetsv1 "github.com/tilebox/tilebox-go/protogen/datasets/v1" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/reflect/protodesc" + "google.golang.org/protobuf/reflect/protoreflect" + "google.golang.org/protobuf/reflect/protoregistry" + "google.golang.org/protobuf/types/descriptorpb" + "google.golang.org/protobuf/types/dynamicpb" + "google.golang.org/protobuf/types/known/durationpb" + "google.golang.org/protobuf/types/known/timestamppb" +) + +// DatapointDescriptor contains the resolved protobuf descriptor needed to decode raw datapoint bytes. +type DatapointDescriptor struct { + // MessageDescriptor is the protobuf message descriptor for datapoints in a dataset. + MessageDescriptor protoreflect.MessageDescriptor + resolver *dynamicpb.Types +} + +// NewDatapointDescriptor creates a reusable datapoint descriptor from a loaded dataset. 
+func NewDatapointDescriptor(dataset *Dataset) (*DatapointDescriptor, error) { + if dataset == nil || dataset.Type == nil || dataset.Type.GetDescriptorSet() == nil || len(dataset.Type.GetDescriptorSet().GetFile()) == 0 { + return nil, errors.New("dataset does not include a protobuf descriptor") + } + + descriptorSet := descriptorSetWithWellKnownTypes(dataset.Type.GetDescriptorSet()) + files, err := protodesc.NewFiles(descriptorSet) + if err != nil { + return nil, fmt.Errorf("failed to build dataset protobuf descriptors: %w", err) + } + + messageFullName, err := datasetMessageFullName(dataset.Type.GetDescriptorSet()) + if err != nil { + return nil, err + } + descriptor, err := files.FindDescriptorByName(messageFullName) + if err != nil { + return nil, fmt.Errorf("failed to find dataset protobuf message descriptor %q: %w", messageFullName, err) + } + messageDescriptor, ok := descriptor.(protoreflect.MessageDescriptor) + if !ok { + return nil, fmt.Errorf("dataset protobuf descriptor %q is not a message descriptor", messageFullName) + } + + return &DatapointDescriptor{ + MessageDescriptor: messageDescriptor, + resolver: dynamicpb.NewTypes(files), + }, nil +} + +// DatapointDecoder decodes raw protobuf datapoints into JSON-like maps. +// +// The options mirror protojson.UnmarshalOptions. +type DatapointDecoder struct { + // If AllowPartial is set, input for messages that will result in missing required fields will not return an error. + AllowPartial bool + // If DiscardUnknown is set, unknown fields are ignored. + DiscardUnknown bool + // Resolver is used for looking up message and extension types. If nil, the resolver from the DatapointDescriptor is used. + Resolver interface { + protoregistry.MessageTypeResolver + protoregistry.ExtensionTypeResolver + } + // RecursionLimit limits how deeply messages may be nested. If zero, a default limit is applied. + RecursionLimit int +} + +// UnmarshalDatapoint decodes raw protobuf datapoint bytes into a JSON-like map. 
+func UnmarshalDatapoint(descriptor *DatapointDescriptor, data []byte) (map[string]any, error) { + return DatapointDecoder{}.Unmarshal(descriptor, data) +} + +// Unmarshal decodes raw protobuf datapoint bytes into a JSON-like map. +func (d DatapointDecoder) Unmarshal(descriptor *DatapointDescriptor, data []byte) (map[string]any, error) { + if descriptor == nil || descriptor.MessageDescriptor == nil { + return nil, errors.New("datapoint descriptor is required") + } + + resolver := d.resolver(descriptor) + message := dynamicpb.NewMessage(descriptor.MessageDescriptor) + err := proto.UnmarshalOptions{ + AllowPartial: d.AllowPartial, + DiscardUnknown: d.DiscardUnknown, + Resolver: resolver, + RecursionLimit: d.RecursionLimit, + }.Unmarshal(data, message) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal datapoint: %w", err) + } + + jsonData, err := protojson.MarshalOptions{ + AllowPartial: d.AllowPartial, + Resolver: resolver, + UseProtoNames: true, + }.Marshal(message) + if err != nil { + return nil, fmt.Errorf("failed to marshal datapoint as JSON: %w", err) + } + + var datapoint map[string]any + if err := json.Unmarshal(jsonData, &datapoint); err != nil { + return nil, fmt.Errorf("failed to unmarshal datapoint JSON: %w", err) + } + if err := convertSpecialTypes(descriptor.MessageDescriptor, datapoint); err != nil { + return nil, err + } + return datapoint, nil +} + +func (d DatapointDecoder) resolver(descriptor *DatapointDescriptor) interface { + protoregistry.MessageTypeResolver + protoregistry.ExtensionTypeResolver +} { + if d.Resolver != nil { + return d.Resolver + } + return descriptor.resolver +} + +func descriptorSetWithWellKnownTypes(datasetDescriptorSet *descriptorpb.FileDescriptorSet) *descriptorpb.FileDescriptorSet { + descriptorSet := &descriptorpb.FileDescriptorSet{} + seen := map[string]bool{} + addFile := func(file *descriptorpb.FileDescriptorProto) { + if file == nil || seen[file.GetName()] { + return + } + seen[file.GetName()] = true + 
descriptorSet.File = append(descriptorSet.File, file) + } + + addFile(protodesc.ToFileDescriptorProto(durationpb.File_google_protobuf_duration_proto)) + addFile(protodesc.ToFileDescriptorProto(timestamppb.File_google_protobuf_timestamp_proto)) + addFile(protodesc.ToFileDescriptorProto(datasetsv1.File_datasets_v1_well_known_types_proto)) + for _, file := range datasetDescriptorSet.GetFile() { + addFile(file) + } + return descriptorSet +} + +func datasetMessageFullName(descriptorSet *descriptorpb.FileDescriptorSet) (protoreflect.FullName, error) { + for _, file := range descriptorSet.GetFile() { + if len(file.GetMessageType()) == 0 { + continue + } + + messageName := file.GetMessageType()[0].GetName() + if messageName == "" { + return "", errors.New("dataset protobuf descriptor includes a message type without a name") + } + if file.GetPackage() == "" { + return protoreflect.FullName(messageName), nil + } + return protoreflect.FullName(strings.Join([]string{file.GetPackage(), messageName}, ".")), nil + } + return "", errors.New("dataset protobuf descriptor does not include a message type") +} + +func convertSpecialTypes(descriptor protoreflect.MessageDescriptor, datapoint map[string]any) error { + fields := descriptor.Fields() + for i := range fields.Len() { + field := fields.Get(i) + if field.IsMap() || (field.Kind() != protoreflect.MessageKind && field.Kind() != protoreflect.EnumKind) { + continue + } + + name := string(field.Name()) + value, ok := datapoint[name] + if !ok { + continue + } + + converted, err := convertSpecialType(field, value) + if err != nil { + return fmt.Errorf("failed to convert field %q: %w", name, err) + } + datapoint[name] = converted + } + return nil +} + +func convertSpecialType(field protoreflect.FieldDescriptor, value any) (any, error) { + if field.IsList() { + values, ok := value.([]any) + if !ok { + return nil, fmt.Errorf("expected repeated field to be an array, got %T", value) + } + + converted := make([]any, len(values)) + for i, item 
:= range values { + convertedItem, err := convertSpecialScalar(field, item) + if err != nil { + return nil, fmt.Errorf("failed to convert item %d: %w", i, err) + } + converted[i] = convertedItem + } + return converted, nil + } + + return convertSpecialScalar(field, value) +} + +func convertSpecialScalar(field protoreflect.FieldDescriptor, value any) (any, error) { + if field.Kind() == protoreflect.EnumKind { + return convertEnum(field.Enum(), value) + } + + switch field.Message().FullName() { + case "datasets.v1.Geometry": + return convertGeometry(value) + case "datasets.v1.UUID": + return convertUUID(value) + case "google.protobuf.Duration": + return convertDuration(value) + case "google.protobuf.Timestamp": + return convertTimestamp(value) + default: + return value, nil + } +} + +func convertEnum(descriptor protoreflect.EnumDescriptor, value any) (any, error) { + name, ok := value.(string) + if !ok { + return nil, fmt.Errorf("expected enum to be a string, got %T", value) + } + prefix := screamingSnakeCase(string(descriptor.Name())) + "_" + return strings.TrimPrefix(name, prefix), nil +} + +func screamingSnakeCase(value string) string { + var builder strings.Builder + for i, r := range value { + if i > 0 && unicode.IsUpper(r) { + builder.WriteByte('_') + } + builder.WriteRune(unicode.ToUpper(r)) + } + return builder.String() +} + +func convertGeometry(value any) (any, error) { + message, ok := value.(map[string]any) + if !ok { + return nil, fmt.Errorf("expected Geometry to be an object, got %T", value) + } + wkb, err := bytesField(message, "wkb") + if err != nil { + return nil, err + } + return datasetsv1.Geometry_builder{Wkb: wkb}.Build().AsGeometry(), nil +} + +func convertUUID(value any) (any, error) { + message, ok := value.(map[string]any) + if !ok { + return nil, fmt.Errorf("expected UUID to be an object, got %T", value) + } + data, err := bytesField(message, "uuid") + if err != nil { + return nil, err + } + id, err := uuid.FromBytes(data) + if err != nil { 
+ return nil, fmt.Errorf("invalid UUID: %w", err) + } + return id, nil +} + +func convertDuration(value any) (any, error) { + duration, ok := value.(string) + if !ok { + return nil, fmt.Errorf("expected Duration to be a string, got %T", value) + } + parsed, err := time.ParseDuration(duration) + if err != nil { + return nil, fmt.Errorf("invalid Duration: %w", err) + } + return parsed, nil +} + +func convertTimestamp(value any) (any, error) { + timestamp, ok := value.(string) + if !ok { + return nil, fmt.Errorf("expected Timestamp to be a string, got %T", value) + } + parsed, err := time.Parse(time.RFC3339Nano, timestamp) + if err != nil { + return nil, fmt.Errorf("invalid Timestamp: %w", err) + } + return parsed, nil +} + +func bytesField(message map[string]any, name string) ([]byte, error) { + value, ok := message[name] + if !ok { + return nil, fmt.Errorf("missing %q", name) + } + encoded, ok := value.(string) + if !ok { + return nil, fmt.Errorf("expected %q to be a base64 string, got %T", name, value) + } + data, err := base64.StdEncoding.DecodeString(encoded) + if err != nil { + return nil, fmt.Errorf("invalid %q: %w", name, err) + } + return data, nil +} diff --git a/datasets/v1/dynamic_test.go b/datasets/v1/dynamic_test.go new file mode 100644 index 0000000..ee115c7 --- /dev/null +++ b/datasets/v1/dynamic_test.go @@ -0,0 +1,124 @@ +package datasets + +import ( + "testing" + "time" + + "github.com/google/uuid" + "github.com/paulmach/orb" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + datasetsv1 "github.com/tilebox/tilebox-go/protogen/datasets/v1" + examplesv1 "github.com/tilebox/tilebox-go/protogen/examples/v1" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/reflect/protodesc" + "google.golang.org/protobuf/reflect/protoreflect" + "google.golang.org/protobuf/types/descriptorpb" + "google.golang.org/protobuf/types/dynamicpb" + "google.golang.org/protobuf/types/known/durationpb" + 
"google.golang.org/protobuf/types/known/timestamppb" +) + +func TestNewDatapointDescriptor(t *testing.T) { + descriptor, err := NewDatapointDescriptor(exampleDataset()) + + require.NoError(t, err) + require.NotNil(t, descriptor) + assert.Equal(t, "tilebox.v1.Sentinel2Msi", string(descriptor.MessageDescriptor.FullName())) +} + +func TestUnmarshalDatapoint(t *testing.T) { + descriptor, err := NewDatapointDescriptor(exampleDataset()) + require.NoError(t, err) + + datapointID := uuid.MustParse("01941f29-c650-202f-6495-c71dd2118fb1") + geometry := orb.Point{16, 48} + timestamp := time.Date(2025, time.January, 1, 0, 0, 19, 24_000_000, time.UTC) + datapoint := examplesv1.Sentinel2Msi_builder{ + Id: datasetsv1.NewUUID(datapointID), + Time: timestamppb.New(timestamp), + Geometry: datasetsv1.NewGeometry(geometry), + GranuleName: pointer("S2B_MSIL1C_20250101T000019_N0511_R073_T57QWV_20250101T010340.SAFE"), + ProcessingLevel: pointer(datasetsv1.ProcessingLevel_PROCESSING_LEVEL_L1C), + FlightDirection: pointer(datasetsv1.FlightDirection_FLIGHT_DIRECTION_ASCENDING), + AcquisitionMode: pointer(datasetsv1.AcquisitionMode_ACQUISITION_MODE_NOBS), + }.Build() + rawDatapoint, err := proto.Marshal(datapoint) + require.NoError(t, err) + + got, err := UnmarshalDatapoint(descriptor, rawDatapoint) + + require.NoError(t, err) + assert.Equal(t, timestamp, got["time"]) + assert.Equal(t, geometry, got["geometry"]) + assert.Equal(t, "S2B_MSIL1C_20250101T000019_N0511_R073_T57QWV_20250101T010340.SAFE", got["granule_name"]) + assert.Equal(t, "L1C", got["processing_level"]) + assert.Equal(t, "ASCENDING", got["flight_direction"]) + assert.Equal(t, "NOBS", got["acquisition_mode"]) + assert.Equal(t, datapointID, got["id"]) + assert.NotContains(t, got, "cloud_cover") + assert.NotContains(t, got, "file_size") + assert.NotContains(t, got, "updated") +} + +func TestUnmarshalDatapointConvertsDuration(t *testing.T) { + descriptor, err := NewDatapointDescriptor(durationDataset()) + require.NoError(t, err) + 
+ duration := 1500 * time.Millisecond + message := dynamicpb.NewMessage(descriptor.MessageDescriptor) + field := descriptor.MessageDescriptor.Fields().ByName("elapsed") + message.Set(field, protoreflect.ValueOfMessage(durationpb.New(duration).ProtoReflect())) + rawDatapoint, err := proto.Marshal(message) + require.NoError(t, err) + + got, err := UnmarshalDatapoint(descriptor, rawDatapoint) + + require.NoError(t, err) + assert.Equal(t, duration, got["elapsed"]) +} + +func TestDatapointDecoderUnmarshalValidatesDescriptor(t *testing.T) { + _, err := DatapointDecoder{}.Unmarshal(nil, nil) + + require.Error(t, err) + assert.Contains(t, err.Error(), "datapoint descriptor is required") +} + +func exampleDataset() *Dataset { + return &Dataset{ + Type: datasetsv1.AnnotatedType_builder{ + DescriptorSet: &descriptorpb.FileDescriptorSet{File: []*descriptorpb.FileDescriptorProto{ + protodesc.ToFileDescriptorProto(examplesv1.File_tilebox_v1_Sentinel2Msi_proto), + }}, + }.Build(), + } +} + +func durationDataset() *Dataset { + return &Dataset{ + Type: datasetsv1.AnnotatedType_builder{ + DescriptorSet: &descriptorpb.FileDescriptorSet{File: []*descriptorpb.FileDescriptorProto{ + { + Name: proto.String("tilebox/v1/DurationDatapoint.proto"), + Package: proto.String("tilebox.v1"), + Dependency: []string{"google/protobuf/duration.proto"}, + MessageType: []*descriptorpb.DescriptorProto{ + { + Name: proto.String("DurationDatapoint"), + Field: []*descriptorpb.FieldDescriptorProto{ + { + Name: proto.String("elapsed"), + Number: proto.Int32(1), + Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(), + Type: descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum(), + TypeName: proto.String(".google.protobuf.Duration"), + }, + }, + }, + }, + }, + }}, + }.Build(), + } +} diff --git a/datasets/v1/field/field.go b/datasets/v1/field/field.go index 2091605..337b2fd 100644 --- a/datasets/v1/field/field.go +++ b/datasets/v1/field/field.go @@ -412,6 +412,29 @@ type Descriptor struct { repeated 
bool } +// Description can be used to provide more context and details about the data. Optional. +func (d *Descriptor) Description(description string) *Descriptor { + d.description = description + return d +} + +// ExampleValue can be used to provide an example value for documentation purposes. Optional. +func (d *Descriptor) ExampleValue(exampleValue string) *Descriptor { + d.exampleValue = exampleValue + return d +} + +// Repeated indicates that this field is an array. Defaults to false. +func (d *Descriptor) Repeated() *Descriptor { + d.repeated = true + return d +} + +// Descriptor implements the datasets.Field interface by returning itself. +func (d *Descriptor) Descriptor() *Descriptor { + return d +} + func (d *Descriptor) ToProto() *datasetsv1.Field { label := descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL if d.repeated { diff --git a/datasets/v1/service.go b/datasets/v1/service.go index c1c355e..b0846d8 100644 --- a/datasets/v1/service.go +++ b/datasets/v1/service.go @@ -16,8 +16,8 @@ import ( ) type DatasetService interface { - CreateDataset(ctx context.Context, codeName string, name string, datasetType *datasetsv1.DatasetType) (*datasetsv1.Dataset, error) - UpdateDataset(ctx context.Context, id uuid.UUID, codeName string, name string, datasetType *datasetsv1.DatasetType) (*datasetsv1.Dataset, error) + CreateDataset(ctx context.Context, codeName string, name string, datasetType *datasetsv1.DatasetType, options datasetOptions) (*datasetsv1.Dataset, error) + UpdateDataset(ctx context.Context, id uuid.UUID, codeName string, name string, datasetType *datasetsv1.DatasetType, options datasetOptions) (*datasetsv1.Dataset, error) GetDataset(ctx context.Context, slug string) (*datasetsv1.Dataset, error) ListDatasets(ctx context.Context) (*datasetsv1.ListDatasetsResponse, error) } @@ -36,13 +36,15 @@ func newDatasetsService(datasetClient datasetsv1connect.DatasetServiceClient, tr } } -func (s *datasetService) CreateDataset(ctx context.Context, codeName string, 
name string, datasetType *datasetsv1.DatasetType) (*datasetsv1.Dataset, error) { +func (s *datasetService) CreateDataset(ctx context.Context, codeName string, name string, datasetType *datasetsv1.DatasetType, options datasetOptions) (*datasetsv1.Dataset, error) { return observability.WithSpanResult(ctx, s.tracer, "datasets/create", func(ctx context.Context) (*datasetsv1.Dataset, error) { res, err := s.datasetClient.CreateDataset(ctx, connect.NewRequest( datasetsv1.CreateDatasetRequest_builder{ - CodeName: codeName, - Name: name, - Type: datasetType, + CodeName: codeName, + Name: name, + Type: datasetType, + Summary: optionalString(options.summary), + Description: optionalString(options.description), }.Build(), )) if err != nil { @@ -53,13 +55,15 @@ func (s *datasetService) CreateDataset(ctx context.Context, codeName string, nam }) } -func (s *datasetService) UpdateDataset(ctx context.Context, id uuid.UUID, codeName string, name string, datasetType *datasetsv1.DatasetType) (*datasetsv1.Dataset, error) { +func (s *datasetService) UpdateDataset(ctx context.Context, id uuid.UUID, codeName string, name string, datasetType *datasetsv1.DatasetType, options datasetOptions) (*datasetsv1.Dataset, error) { return observability.WithSpanResult(ctx, s.tracer, "datasets/update", func(ctx context.Context) (*datasetsv1.Dataset, error) { res, err := s.datasetClient.UpdateDataset(ctx, connect.NewRequest(datasetsv1.UpdateDatasetRequest_builder{ - Id: tileboxv1.NewUUID(id), - CodeName: &codeName, - Name: &name, - Type: datasetType, + Id: tileboxv1.NewUUID(id), + CodeName: &codeName, + Name: &name, + Type: datasetType, + Summary: options.summary, + Description: options.description, }.Build())) if err != nil { return nil, fmt.Errorf("failed to update dataset: %w", err) diff --git a/examples/datasets/query-dynamic/main.go b/examples/datasets/query-dynamic/main.go new file mode 100644 index 0000000..c9a5fb7 --- /dev/null +++ b/examples/datasets/query-dynamic/main.go @@ -0,0 +1,78 @@ 
+package main + +import ( + "context" + "encoding/json" + "log/slog" + "os" + "time" + + "github.com/paulmach/orb" + "github.com/tilebox/tilebox-go/datasets/v1" + "github.com/tilebox/tilebox-go/query" +) + +func main() { + ctx := context.Background() + + // Create a Tilebox Datasets client. + client := datasets.NewClient() + + // Select a dataset. The dataset metadata contains the protobuf descriptor needed to decode datapoints dynamically. + dataset, err := client.Datasets.Get(ctx, "open_data.copernicus.sentinel2_msi") + if err != nil { + slog.ErrorContext(ctx, "Failed to get dataset", slog.Any("error", err)) + return + } + + // Build the descriptor once and reuse it for every datapoint returned by the query. + descriptor, err := datasets.NewDatapointDescriptor(dataset) + if err != nil { + slog.ErrorContext(ctx, "Failed to build datapoint descriptor", slog.Any("error", err)) + return + } + + // Select a collection. + collection, err := client.Collections.Get(ctx, dataset.ID, "S2A_S2MSI1C") + if err != nil { + slog.ErrorContext(ctx, "Failed to get collection", slog.Any("error", err)) + return + } + + // Select a temporal extent. + startDate := time.Date(2025, 4, 2, 0, 0, 0, 0, time.UTC) + endDate := time.Date(2025, 4, 3, 0, 0, 0, 0, time.UTC) + + // Select a spatial extent. + area := orb.Polygon{ // area roughly covering the state of Colorado + {{-109.05, 41.00}, {-102.05, 41.00}, {-102.05, 37.0}, {-109.045, 37.0}, {-109.05, 41.00}}, + } + + // Query raw protobuf datapoint bytes and decode them dynamically into maps. 
+ datapoints := make([]map[string]any, 0) + for rawDatapoint, err := range client.Datapoints.Query(ctx, + dataset.ID, + datasets.WithCollections(collection), + datasets.WithTemporalExtent(query.NewTimeInterval(startDate, endDate)), + datasets.WithSpatialExtent(area), + datasets.WithLimit(5), + ) { + if err != nil { + slog.ErrorContext(ctx, "Failed to query datapoints", slog.Any("error", err)) + return + } + + datapoint, err := datasets.UnmarshalDatapoint(descriptor, rawDatapoint) + if err != nil { + slog.ErrorContext(ctx, "Failed to decode datapoint", slog.Any("error", err)) + return + } + datapoints = append(datapoints, datapoint) + } + + encoder := json.NewEncoder(os.Stdout) + encoder.SetIndent("", " ") + if err := encoder.Encode(datapoints); err != nil { + slog.ErrorContext(ctx, "Failed to write datapoints as JSON", slog.Any("error", err)) + } +} diff --git a/query/cursor.go b/query/cursor.go new file mode 100644 index 0000000..28b2cc6 --- /dev/null +++ b/query/cursor.go @@ -0,0 +1,45 @@ +package query + +import ( + "fmt" + + "github.com/google/uuid" +) + +// Cursor identifies where to continue a paginated query. +// +// Cursors are returned as NextCursor values from page query methods and should only be reused with the same endpoint, +// filters and sort direction that produced them. +type Cursor struct { + startingAfter uuid.UUID +} + +// NewCursor creates a cursor that starts after the entry with the given ID. +func NewCursor(startingAfter uuid.UUID) *Cursor { + return &Cursor{startingAfter: startingAfter} +} + +// ParseCursor parses a cursor string returned by Cursor.String. +func ParseCursor(value string) (*Cursor, error) { + startingAfter, err := uuid.Parse(value) + if err != nil { + return nil, fmt.Errorf("invalid cursor: %w", err) + } + return NewCursor(startingAfter), nil +} + +// String returns the cursor as a string that can be passed to ParseCursor. 
+func (c *Cursor) String() string { + if c == nil { + return "" + } + return c.startingAfter.String() +} + +// StartingAfter returns the ID after which the next page should start. +func (c *Cursor) StartingAfter() uuid.UUID { + if c == nil { + return uuid.Nil + } + return c.startingAfter +} diff --git a/workflows/v1/automations.go b/workflows/v1/automations.go new file mode 100644 index 0000000..4be06f1 --- /dev/null +++ b/workflows/v1/automations.go @@ -0,0 +1,253 @@ +package workflows // import "github.com/tilebox/tilebox-go/workflows/v1" + +import ( + "context" + "encoding/json" + + "github.com/google/uuid" + tileboxv1 "github.com/tilebox/tilebox-go/protogen/tilebox/v1" + workflowsv1 "github.com/tilebox/tilebox-go/protogen/workflows/v1" +) + +// StorageType is the kind of storage location used by automation triggers. +type StorageType string + +const ( + StorageTypeUnspecified StorageType = "unspecified" + StorageTypeGCS StorageType = "gcs" + StorageTypeS3 StorageType = "s3" + StorageTypeFS StorageType = "fs" +) + +func (t StorageType) String() string { + return string(t) +} + +func (t StorageType) MarshalJSON() ([]byte, error) { + return json.Marshal(t.String()) +} + +// StorageLocation is a storage location available for automation storage event triggers. +type StorageLocation struct { + // ID is the unique identifier of the storage location. + ID uuid.UUID + // Location is the storage-system-specific location identifier. + Location string + // Type is the kind of storage location. + Type StorageType +} + +// Automation represents an automation prototype that can submit tasks from storage or cron triggers. +type Automation struct { + // ID is the unique identifier of the automation. + ID uuid.UUID + // Name is the human-readable name of the automation. + Name string + // Prototype is the task submission prototype that the automation submits. 
+ Prototype *AutomationTaskPrototype + // StorageEventTriggers are triggers that submit the task for matching storage events. + StorageEventTriggers []*StorageEventTrigger + // CronTriggers are triggers that submit the task on a schedule. + CronTriggers []*CronTrigger + // Disabled reports whether the automation is paused. + Disabled bool +} + +// AutomationTaskPrototype is the single task submitted by an automation. +type AutomationTaskPrototype struct { + // ClusterSlug is the cluster where the task should run. + ClusterSlug string + // Identifier identifies the task implementation. + Identifier TaskIdentifier + // Display is a human-readable task label. + Display string + // Dependencies are task dependency indexes. + Dependencies []int64 + // MaxRetries is the maximum number of automatic retries for the task. + MaxRetries int64 + // Input is the serialized task input. + Input []byte +} + +// StorageEventTrigger submits an automation task when a matching object is created in a storage location. +type StorageEventTrigger struct { + // ID is the unique identifier of the trigger. + ID uuid.UUID + // StorageLocation is the storage location watched by this trigger. + StorageLocation *StorageLocation + // GlobPattern matches objects/files in the storage location. + GlobPattern string +} + +// CronTrigger submits an automation task on a cron schedule. +type CronTrigger struct { + // ID is the unique identifier of the trigger. + ID uuid.UUID + // Schedule is the cron schedule for the trigger. + Schedule string +} + +type AutomationClient interface { + // List returns all automations. + List(ctx context.Context) ([]*Automation, error) + + // Get returns an automation by ID. + Get(ctx context.Context, automationID uuid.UUID) (*Automation, error) + + // GetStorageLocation returns a storage location by ID. 
+ GetStorageLocation(ctx context.Context, storageLocationID uuid.UUID) (*StorageLocation, error) + + // ListStorageLocations returns all storage locations available for automation triggers. + ListStorageLocations(ctx context.Context) ([]*StorageLocation, error) +} + +var _ AutomationClient = &automationClient{} + +type automationClient struct { + service _automationService +} + +func (c automationClient) List(ctx context.Context) ([]*Automation, error) { + response, err := c.service.ListAutomations(ctx) + if err != nil { + return nil, err + } + + automations := make([]*Automation, len(response.GetAutomations())) + for i, automation := range response.GetAutomations() { + automations[i] = protoToAutomation(automation) + } + + return automations, nil +} + +func (c automationClient) Get(ctx context.Context, automationID uuid.UUID) (*Automation, error) { + response, err := c.service.GetAutomation(ctx, automationID) + if err != nil { + return nil, err + } + + return protoToAutomation(response), nil +} + +func (c automationClient) GetStorageLocation(ctx context.Context, storageLocationID uuid.UUID) (*StorageLocation, error) { + response, err := c.service.GetStorageLocation(ctx, storageLocationID) + if err != nil { + return nil, err + } + + return protoToStorageLocation(response), nil +} + +func (c automationClient) ListStorageLocations(ctx context.Context) ([]*StorageLocation, error) { + response, err := c.service.ListStorageLocations(ctx) + if err != nil { + return nil, err + } + + locations := make([]*StorageLocation, len(response.GetLocations())) + for i, location := range response.GetLocations() { + locations[i] = protoToStorageLocation(location) + } + + return locations, nil +} + +func protoToAutomation(automation *workflowsv1.AutomationPrototype) *Automation { + if automation == nil { + return nil + } + + storageEventTriggers := make([]*StorageEventTrigger, len(automation.GetStorageEventTriggers())) + for i, trigger := range automation.GetStorageEventTriggers() { + 
storageEventTriggers[i] = protoToStorageEventTrigger(trigger) + } + + cronTriggers := make([]*CronTrigger, len(automation.GetCronTriggers())) + for i, trigger := range automation.GetCronTriggers() { + cronTriggers[i] = protoToCronTrigger(trigger) + } + + return &Automation{ + ID: protoIDToUUID(automation.GetId()), + Name: automation.GetName(), + Prototype: protoToAutomationTaskPrototype(automation.GetPrototype()), + StorageEventTriggers: storageEventTriggers, + CronTriggers: cronTriggers, + Disabled: automation.GetDisabled(), + } +} + +func protoToAutomationTaskPrototype(prototype *workflowsv1.SingleTaskSubmission) *AutomationTaskPrototype { + if prototype == nil { + return nil + } + + var identifier TaskIdentifier + if protoIdentifier := prototype.GetIdentifier(); protoIdentifier != nil { + identifier = NewTaskIdentifier(protoIdentifier.GetName(), protoIdentifier.GetVersion()) + } + + return &AutomationTaskPrototype{ + ClusterSlug: prototype.GetClusterSlug(), + Identifier: identifier, + Display: prototype.GetDisplay(), + Dependencies: prototype.GetDependencies(), + MaxRetries: prototype.GetMaxRetries(), + Input: prototype.GetInput(), + } +} + +func protoToStorageEventTrigger(trigger *workflowsv1.StorageEventTrigger) *StorageEventTrigger { + if trigger == nil { + return nil + } + return &StorageEventTrigger{ + ID: protoIDToUUID(trigger.GetId()), + StorageLocation: protoToStorageLocation(trigger.GetStorageLocation()), + GlobPattern: trigger.GetGlobPattern(), + } +} + +func protoToCronTrigger(trigger *workflowsv1.CronTrigger) *CronTrigger { + if trigger == nil { + return nil + } + return &CronTrigger{ + ID: protoIDToUUID(trigger.GetId()), + Schedule: trigger.GetSchedule(), + } +} + +func protoToStorageLocation(location *workflowsv1.StorageLocation) *StorageLocation { + if location == nil { + return nil + } + return &StorageLocation{ + ID: protoIDToUUID(location.GetId()), + Location: location.GetLocation(), + Type: protoToStorageType(location.GetType()), + } +} + 
+func protoToStorageType(storageType workflowsv1.StorageType) StorageType { + switch storageType { + case workflowsv1.StorageType_STORAGE_TYPE_UNSPECIFIED: + return StorageTypeUnspecified + case workflowsv1.StorageType_STORAGE_TYPE_GCS: + return StorageTypeGCS + case workflowsv1.StorageType_STORAGE_TYPE_S3: + return StorageTypeS3 + case workflowsv1.StorageType_STORAGE_TYPE_FS: + return StorageTypeFS + default: + return StorageTypeUnspecified + } +} + +func protoIDToUUID(id *tileboxv1.ID) uuid.UUID { + if id == nil { + return uuid.Nil + } + return id.AsUUID() +} diff --git a/workflows/v1/automations_test.go b/workflows/v1/automations_test.go new file mode 100644 index 0000000..828dc50 --- /dev/null +++ b/workflows/v1/automations_test.go @@ -0,0 +1,45 @@ +package workflows + +import ( + "context" + "testing" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + tileboxv1 "github.com/tilebox/tilebox-go/protogen/tilebox/v1" + workflowsv1 "github.com/tilebox/tilebox-go/protogen/workflows/v1" +) + +type mockAutomationService struct { + _automationService + + getStorageLocationID uuid.UUID + storageLocation *workflowsv1.StorageLocation +} + +func (m *mockAutomationService) GetStorageLocation(_ context.Context, storageLocationID uuid.UUID) (*workflowsv1.StorageLocation, error) { + m.getStorageLocationID = storageLocationID + return m.storageLocation, nil +} + +func TestAutomationClientGetStorageLocation(t *testing.T) { + storageLocationID := uuid.MustParse("019e4f3c-4646-7312-b8fe-2e7fa83c1546") + service := &mockAutomationService{ + storageLocation: workflowsv1.StorageLocation_builder{ + Id: tileboxv1.NewUUID(storageLocationID), + Location: "gs://bucket", + Type: workflowsv1.StorageType_STORAGE_TYPE_GCS, + }.Build(), + } + client := &automationClient{service: service} + + location, err := client.GetStorageLocation(context.Background(), storageLocationID) + + require.NoError(t, err) + require.NotNil(t, location) + 
assert.Equal(t, storageLocationID, service.getStorageLocationID) + assert.Equal(t, storageLocationID, location.ID) + assert.Equal(t, "gs://bucket", location.Location) + assert.Equal(t, StorageTypeGCS, location.Type) +} diff --git a/workflows/v1/client.go b/workflows/v1/client.go index 31c72c1..39c8714 100644 --- a/workflows/v1/client.go +++ b/workflows/v1/client.go @@ -23,8 +23,9 @@ const ( // Client is a Tilebox Workflows client. type Client struct { - Jobs JobClient - Clusters ClusterClient + Jobs JobClient + Clusters ClusterClient + Automations AutomationClient // Used by NewTaskRunner taskService TaskService @@ -49,12 +50,15 @@ func NewClient(options ...ClientOption) *Client { telemetryConnectClient := newConnectClient(workflowsv1connect.NewTelemetryQueryServiceClient, cfg) taskConnectClient := newConnectClient(workflowsv1connect.NewTaskServiceClient, cfg) workflowConnectClient := newConnectClient(workflowsv1connect.NewWorkflowsServiceClient, cfg) + automationConnectClient := newConnectClient(workflowsv1connect.NewAutomationServiceClient, cfg) tracer := cfg.tracerProvider.Tracer(otelTracerName) + automationService := &automationService{automationClient: automationConnectClient, tracer: tracer} return &Client{ - Jobs: &jobClient{service: newJobService(jobConnectClient, tracer), telemetryService: newTelemetryService(telemetryConnectClient, tracer)}, - Clusters: &clusterClient{service: newWorkflowService(workflowConnectClient, tracer)}, + Jobs: &jobClient{service: newJobService(jobConnectClient, tracer), telemetryService: newTelemetryService(telemetryConnectClient, tracer)}, + Clusters: &clusterClient{service: newWorkflowService(workflowConnectClient, tracer)}, + Automations: &automationClient{service: automationService}, taskService: newTaskService(taskConnectClient, tracer), tracer: tracer, diff --git a/workflows/v1/job/options.go b/workflows/v1/job/options.go index 74cc8fb..919d27c 100644 --- a/workflows/v1/job/options.go +++ b/workflows/v1/job/options.go @@ 
-2,7 +2,6 @@ package job // import "github.com/tilebox/tilebox-go/workflows/v1/job" import ( "encoding/json" - "fmt" "github.com/google/uuid" "github.com/tilebox/tilebox-go/query" @@ -182,38 +181,16 @@ type CursorOption = sharedQueryOption // // Cursors are returned as NextCursor values from page query methods and should only be reused with the same endpoint, // filters and sort direction that produced them. -type Cursor struct { - startingAfter uuid.UUID -} +type Cursor = query.Cursor // NewCursor creates a cursor that starts after the entry with the given ID. func NewCursor(startingAfter uuid.UUID) *Cursor { - return &Cursor{startingAfter: startingAfter} + return query.NewCursor(startingAfter) } // ParseCursor parses a cursor string returned by Cursor.String. func ParseCursor(value string) (*Cursor, error) { - startingAfter, err := uuid.Parse(value) - if err != nil { - return nil, fmt.Errorf("invalid cursor: %w", err) - } - return NewCursor(startingAfter), nil -} - -// String returns the cursor as a string that can be passed to ParseCursor. -func (c *Cursor) String() string { - if c == nil { - return "" - } - return c.startingAfter.String() -} - -// StartingAfter returns the ID after which the next page should start. 
-func (c *Cursor) StartingAfter() uuid.UUID { - if c == nil { - return uuid.Nil - } - return c.startingAfter + return query.ParseCursor(value) } type cursorOption struct { From 5ec19f801d35a23c6278c057eaf56849a20e2262 Mon Sep 17 00:00:00 2001 From: Lukas Bindreiter Date: Fri, 22 May 2026 16:42:59 +0200 Subject: [PATCH 2/3] prepare release --- CHANGELOG.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index dcd27b8..2a2423b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.7.0] - 2026-05-22 + ### Added - `datasets`: Added `client.Datasets.Create` and `client.Datasets.Update` with optional summary and markdown description options. @@ -110,7 +112,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added support for Tilebox Observability, including logging and tracing helpers. - Added examples for using the library. 
-[Unreleased]: https://github.com/tilebox/tilebox-go/compare/v0.6.0...HEAD +[Unreleased]: https://github.com/tilebox/tilebox-go/compare/v0.7.0...HEAD +[0.7.0]: https://github.com/tilebox/tilebox-go/compare/v0.6.0...v0.7.0 [0.6.0]: https://github.com/tilebox/tilebox-go/compare/v0.5.0...v0.6.0 [0.5.0]: https://github.com/tilebox/tilebox-go/compare/v0.4.0...v0.5.0 [0.4.0]: https://github.com/tilebox/tilebox-go/compare/v0.3.2...v0.4.0 From 04b7bf3b385535097b3c1130723ae28370d09647 Mon Sep 17 00:00:00 2001 From: Lukas Bindreiter Date: Fri, 22 May 2026 16:46:50 +0200 Subject: [PATCH 3/3] Fix lint issues --- datasets/v1/datapoints.go | 72 +++++++++++++++++++-------------------- 1 file changed, 36 insertions(+), 36 deletions(-) diff --git a/datasets/v1/datapoints.go b/datasets/v1/datapoints.go index d6a86d0..e2a2074 100644 --- a/datasets/v1/datapoints.go +++ b/datasets/v1/datapoints.go @@ -293,42 +293,6 @@ func (d datapointClient) QueryPage(ctx context.Context, datasetID uuid.UUID, opt return d.queryPage(ctx, datasetID, cfg) } -func (d datapointClient) queryPage(ctx context.Context, datasetID uuid.UUID, cfg *queryOptions) (*DatapointPage, error) { - if cfg.temporalExtent == nil { - // right now we return an error, in the future we might want to support queries without a temporal extent - return nil, errors.New("temporal extent is required") - } - - timeInterval := cfg.temporalExtent.ToProtoTimeInterval() - datapointInterval := cfg.temporalExtent.ToProtoIDInterval() - if timeInterval == nil && datapointInterval == nil { - return nil, errors.New("invalid temporal extent") - } - - filters := datasetsv1.QueryFilters_builder{ - TimeInterval: timeInterval, - DatapointInterval: datapointInterval, - }.Build() - - if cfg.spatialExtent != nil { - spatialExtent, err := cfg.spatialExtent.ToProtoSpatialFilter() - if err != nil { - return nil, err - } - filters.SetSpatialExtent(spatialExtent) - } - - datapointsMessage, err := d.dataAccessService.Query(ctx, datasetID, cfg.collectionIDs, 
filters, paginationFromOptions(cfg.limit, cfg.cursor), cfg.skipData) - if err != nil { - return nil, err - } - - return &DatapointPage{ - Datapoints: datapointsMessage.GetData().GetValue(), - NextCursor: cursorFromPagination(datapointsMessage.GetNextPage()), - }, nil -} - func paginationFromOptions(limit int64, cursor *Cursor) *tileboxv1.Pagination { if limit <= 0 && cursor == nil { return nil @@ -506,6 +470,42 @@ func (d datapointClient) DeleteIDs(ctx context.Context, collectionID uuid.UUID, return numDeleted, nil } +func (d datapointClient) queryPage(ctx context.Context, datasetID uuid.UUID, cfg *queryOptions) (*DatapointPage, error) { + if cfg.temporalExtent == nil { + // right now we return an error, in the future we might want to support queries without a temporal extent + return nil, errors.New("temporal extent is required") + } + + timeInterval := cfg.temporalExtent.ToProtoTimeInterval() + datapointInterval := cfg.temporalExtent.ToProtoIDInterval() + if timeInterval == nil && datapointInterval == nil { + return nil, errors.New("invalid temporal extent") + } + + filters := datasetsv1.QueryFilters_builder{ + TimeInterval: timeInterval, + DatapointInterval: datapointInterval, + }.Build() + + if cfg.spatialExtent != nil { + spatialExtent, err := cfg.spatialExtent.ToProtoSpatialFilter() + if err != nil { + return nil, err + } + filters.SetSpatialExtent(spatialExtent) + } + + datapointsMessage, err := d.dataAccessService.Query(ctx, datasetID, cfg.collectionIDs, filters, paginationFromOptions(cfg.limit, cfg.cursor), cfg.skipData) + if err != nil { + return nil, err + } + + return &DatapointPage{ + Datapoints: datapointsMessage.GetData().GetValue(), + NextCursor: cursorFromPagination(datapointsMessage.GetNextPage()), + }, nil +} + // CollectAs converts a sequence of bytes into a slice of proto.Message. func CollectAs[T proto.Message](seq iter.Seq2[[]byte, error]) ([]T, error) { return Collect(As[T](seq))