diff --git a/ci/resources/stemcell-version-bump/go.mod b/ci/resources/stemcell-version-bump/go.mod index 367805bb2..80ba81f80 100644 --- a/ci/resources/stemcell-version-bump/go.mod +++ b/ci/resources/stemcell-version-bump/go.mod @@ -5,7 +5,7 @@ go 1.24.0 toolchain go1.24.1 require ( - cloud.google.com/go/storage v1.56.2 + cloud.google.com/go/storage v1.57.0 github.com/stretchr/testify v1.11.1 google.golang.org/api v0.249.0 ) diff --git a/ci/resources/stemcell-version-bump/go.sum b/ci/resources/stemcell-version-bump/go.sum index 2dd23e364..640746e4d 100644 --- a/ci/resources/stemcell-version-bump/go.sum +++ b/ci/resources/stemcell-version-bump/go.sum @@ -16,8 +16,8 @@ cloud.google.com/go/longrunning v0.6.7 h1:IGtfDWHhQCgCjwQjV9iiLnUta9LBCo8R9QmAFs cloud.google.com/go/longrunning v0.6.7/go.mod h1:EAFV3IZAKmM56TyiE6VAP3VoTzhZzySwI/YI1s/nRsY= cloud.google.com/go/monitoring v1.24.2 h1:5OTsoJ1dXYIiMiuL+sYscLc9BumrL3CarVLL7dd7lHM= cloud.google.com/go/monitoring v1.24.2/go.mod h1:x7yzPWcgDRnPEv3sI+jJGBkwl5qINf+6qY4eq0I9B4U= -cloud.google.com/go/storage v1.56.2 h1:DzxQ4ppJe4OSTtZLtCqscC3knyW919eNl0zLLpojnqo= -cloud.google.com/go/storage v1.56.2/go.mod h1:C9xuCZgFl3buo2HZU/1FncgvvOgTAs/rnh4gF4lMg0s= +cloud.google.com/go/storage v1.57.0 h1:4g7NB7Ta7KetVbOMpCqy89C+Vg5VE8scqlSHUPm7Rds= +cloud.google.com/go/storage v1.57.0/go.mod h1:329cwlpzALLgJuu8beyJ/uvQznDHpa2U5lGjWednkzg= cloud.google.com/go/trace v1.11.6 h1:2O2zjPzqPYAHrn3OKl029qlqG6W8ZdYaOWRyr8NgMT4= cloud.google.com/go/trace v1.11.6/go.mod h1:GA855OeDEBiBMzcckLPE2kDunIpC72N+Pq8WFieFjnI= github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.29.0 h1:UQUsRi8WTzhZntp5313l+CHIAT95ojUI2lpP/ExlZa4= diff --git a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/CHANGES.md b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/CHANGES.md index f91215387..c4872fbeb 100644 --- a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/CHANGES.md +++ b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/CHANGES.md @@ -1,12 +1,26 @@ # Changes -## [1.56.2](https://github.com/googleapis/google-cloud-go/compare/storage/v1.56.1...storage/v1.56.2) (2025-09-12) +## [1.57.0](https://github.com/googleapis/google-cloud-go/compare/storage/v1.56.1...storage/v1.57.0) (2025-09-23) + + +### Features + +* **storage/control:** Add new GetIamPolicy, SetIamPolicy, and TestIamPermissions RPCs ([d73f912](https://github.com/googleapis/google-cloud-go/commit/d73f9123be77bb3278f48d510cd0fb22feb605bc)) +* **storage:** Post support dynamic key name ([#12677](https://github.com/googleapis/google-cloud-go/issues/12677)) ([9e761f9](https://github.com/googleapis/google-cloud-go/commit/9e761f961a2c4351b3e0793ed655314ac5853903)) +* **storage:** WithMeterProvider allows custom meter provider configuration ([#12668](https://github.com/googleapis/google-cloud-go/issues/12668)) ([7f574b0](https://github.com/googleapis/google-cloud-go/commit/7f574b01e0b454c1ef5c13e6a58075e394ee990d)) ### Bug Fixes -* **storage:** Free buffers in Bidi Reader ([#12839](https://github.com/googleapis/google-cloud-go/issues/12839)) ([10c8fac](https://github.com/googleapis/google-cloud-go/commit/10c8faccc2dae2a8177ff30ab16d67413df9f536)) +* **storage:** Free buffers in Bidi Reader ([#12839](https://github.com/googleapis/google-cloud-go/issues/12839)) ([bc247fd](https://github.com/googleapis/google-cloud-go/commit/bc247fdc3f5234a8bd6934e58d5b0b578f1335cb)) +* **storage:** Make Writer thread-safe. ([#12753](https://github.com/googleapis/google-cloud-go/issues/12753)) ([9ea380b](https://github.com/googleapis/google-cloud-go/commit/9ea380bea5b980a9054d201be4f315a195da2182)) +* **storage:** No progress report for oneshot write ([#12746](https://github.com/googleapis/google-cloud-go/issues/12746)) ([b97c286](https://github.com/googleapis/google-cloud-go/commit/b97c286ec369a10a81b1a8a3a1aae18b46d2dfbc)) + + +### Performance Improvements + +* **storage:** Pipeline gRPC writes ([#12422](https://github.com/googleapis/google-cloud-go/issues/12422)) ([1f2c5fe](https://github.com/googleapis/google-cloud-go/commit/1f2c5fe2843724302086fe04cb8dab8b515969c5)) ## [1.56.1](https://github.com/googleapis/google-cloud-go/compare/storage/v1.56.0...storage/v1.56.1) (2025-08-19) diff --git a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/bucket.go b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/bucket.go index 60a5ffb5b..a8e56ed17 100644 --- a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/bucket.go +++ b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/bucket.go @@ -225,6 +225,11 @@ func (b *BucketHandle) SignedURL(object string, opts *SignedURLOptions) (string, // to be non-nil. You may need to set the GoogleAccessID and PrivateKey fields // in some cases. Read more on the [automatic detection of credentials] for this method. // +// To allow the unauthenticated client to upload to any object name in the +// bucket with a given prefix rather than a specific object name, you can pass +// an empty string for object and set [PostPolicyV4Options].Conditions to +// include [ConditionStartsWith]("$key", "prefix"). +// // [automatic detection of credentials]: https://pkg.go.dev/cloud.google.com/go/storage#hdr-Credential_requirements_for_signing func (b *BucketHandle) GenerateSignedPostPolicyV4(object string, opts *PostPolicyV4Options) (*PostPolicyV4, error) { // Make a copy of opts so we don't modify the pointer parameter. diff --git a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/doc.go b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/doc.go index 6a64c6c03..3726432c8 100644 --- a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/doc.go +++ b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/doc.go @@ -364,7 +364,7 @@ to add a [custom audit logging] header: This package includes support for the [Cloud Storage gRPC API]. This implementation uses gRPC rather than the default JSON & XML APIs to make requests to Cloud Storage. All methods on the [Client] support -the gRPC API, with the exception of [GetServiceAccount], [Notification], +the gRPC API, with the exception of the [Client.ServiceAccount], [Notification], and [HMACKey] methods. The Cloud Storage gRPC API is generally available. @@ -390,7 +390,10 @@ Requirements to use Direct Connectivity include: - Your client must use service account authentication. Additional requirements for Direct Connectivity are documented in the -[Cloud Storage gRPC docs]. +[Cloud Storage gRPC docs]. If all requirements are met, the client will +use Direct Connectivity by default without requiring any client options +or environment variables. To disable Direct Connectivity, you can set +the environment variable GOOGLE_CLOUD_DISABLE_DIRECT_PATH=true. Dependencies for the gRPC API may slightly increase the size of binaries for applications depending on this package. If you are not using gRPC, you can use diff --git a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/experimental/experimental.go b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/experimental/experimental.go index a178f9ebb..819e105d7 100644 --- a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/experimental/experimental.go +++ b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/experimental/experimental.go @@ -33,17 +33,25 @@ import ( // It sets how often to emit metrics [metric.WithInterval] when using // [metric.NewPeriodicReader] // When using Cloud Monitoring interval must be at minimum 1 [time.Minute]. +// This option is ignored if WithMeterProvider is also set. func WithMetricInterval(metricInterval time.Duration) option.ClientOption { return internal.WithMetricInterval.(func(time.Duration) option.ClientOption)(metricInterval) } // WithMetricExporter provides a [option.ClientOption] that may be passed to [storage.NewGRPCClient]. // Set an alternate client-side metric Exporter to emit metrics through. -// Must implement [metric.Exporter] +// Must implement [metric.Exporter]. This option is ignored if WithMeterProvider is also set. func WithMetricExporter(ex *metric.Exporter) option.ClientOption { return internal.WithMetricExporter.(func(*metric.Exporter) option.ClientOption)(ex) } +// WithMeterProvider provides a [option.ClientOption] that may be passed to [storage.NewGRPCClient]. +// Set an alternate client-side meter provider to emit metrics through. +// This option takes precedence over WithMetricExporter and WithMetricInterval. +func WithMeterProvider(mp *metric.MeterProvider) option.ClientOption { + return internal.WithMeterProvider.(func(*metric.MeterProvider) option.ClientOption)(mp) +} + // WithReadStallTimeout provides a [option.ClientOption] that may be passed to [storage.NewClient]. // It enables the client to retry stalled requests when starting a download from // Cloud Storage. If the timeout elapses with no response from the server, the request diff --git a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/grpc_client.go b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/grpc_client.go index 2ba4d5bbe..15de38df7 100644 --- a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/grpc_client.go +++ b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/grpc_client.go @@ -100,9 +100,10 @@ func defaultGRPCOptions() []option.ClientOption { } else { // Only enable DirectPath when the emulator is not being targeted. defaults = append(defaults, - internaloption.EnableDirectPath(true), internaloption.AllowNonDefaultServiceAccount(true), - internaloption.EnableDirectPathXds()) + internaloption.EnableDirectPath(true), + internaloption.EnableDirectPathXds(), + internaloption.AllowHardBoundTokens("ALTS")) } return defaults @@ -124,9 +125,11 @@ func enableClientMetrics(ctx context.Context, s *settings, config storageConfig) project = c.ProjectID } metricsContext, err := newGRPCMetricContext(ctx, metricsConfig{ - project: project, - interval: config.metricInterval, - manualReader: config.manualReader}, + project: project, + interval: config.metricInterval, + manualReader: config.manualReader, + meterProvider: config.meterProvider, + }, ) if err != nil { return nil, fmt.Errorf("gRPC Metrics: %w", err) diff --git a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/grpc_metrics.go b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/grpc_metrics.go index f7bebd1de..044210157 100644 --- a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/grpc_metrics.go +++ b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/grpc_metrics.go @@ -126,6 +126,7 @@ type metricsConfig struct { project string interval time.Duration customExporter *metric.Exporter + meterProvider *metric.MeterProvider manualReader *metric.ManualReader // used by tests disableExporter bool // used by tests disables exports resourceOpts []resource.Option // used by tests @@ -172,7 +173,10 @@ func newGRPCMetricContext(ctx context.Context, cfg metricsConfig) (*metricsConte meterOpts = append(meterOpts, metric.WithReader( metric.NewPeriodicReader(&exporterLogSuppressor{Exporter: exporter}, metric.WithInterval(interval)))) } - provider := metric.NewMeterProvider(meterOpts...) + provider := cfg.meterProvider + if provider == nil { + provider = metric.NewMeterProvider(meterOpts...) + } mo := opentelemetry.MetricsOptions{ MeterProvider: provider, Metrics: stats.NewMetrics( diff --git a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/grpc_writer.go b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/grpc_writer.go index 3d82e5951..b4fa4c855 100644 --- a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/grpc_writer.go +++ b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/grpc_writer.go @@ -19,7 +19,9 @@ import ( "errors" "fmt" "io" + "net/http" "net/url" + "strings" "time" gapic "cloud.google.com/go/storage/internal/apiv2" @@ -42,311 +44,745 @@ const ( maxPerMessageWriteSize int = int(storagepb.ServiceConstants_MAX_WRITE_CHUNK_BYTES) ) -func withBidiWriteObjectRedirectionErrorRetries(s *settings) (newr *retryConfig) { - oldr := s.retry - newr = oldr.clone() - if newr == nil { - newr = &retryConfig{} +func (w *gRPCWriter) Write(p []byte) (n int, err error) { + done := make(chan struct{}) + cmd := &gRPCWriterCommandWrite{p: p, done: done} + select { + case <-w.donec: + return 0, w.streamResult + case w.writesChan <- cmd: + // write command successfully delivered to sender. We no longer own cmd. + break } - if (oldr.policy == RetryIdempotent && !s.idempotent) || oldr.policy == RetryNever { - // We still retry redirection errors even when settings indicate not to - // retry. - // - // The protocol requires us to respect redirection errors, so RetryNever has - // to ignore them. - // - // Idempotency is always protected by redirection errors: they either - // contain a handle which can be used as idempotency information, or they do - // not contain a handle and are "affirmative failures" which indicate that - // no server-side action occurred. - newr.policy = RetryAlways - newr.shouldRetry = func(err error) bool { - return errors.Is(err, bidiWriteObjectRedirectionError{}) - } - return newr + + select { + case <-w.donec: + return 0, w.streamResult + case <-done: + return len(p), nil } - // If retry settings allow retries normally, fall back to that behavior. - newr.shouldRetry = func(err error) bool { - if errors.Is(err, bidiWriteObjectRedirectionError{}) { - return true - } - v := oldr.runShouldRetry(err) - return v +} + +func (w *gRPCWriter) Flush() (int64, error) { + done := make(chan int64) + cmd := &gRPCWriterCommandFlush{done: done} + select { + case <-w.donec: + return 0, w.streamResult + case w.writesChan <- cmd: + // flush command successfully delivered to sender. We no longer own cmd. + break + } + + select { + case <-w.donec: + return 0, w.streamResult + case f := <-done: + return f, nil } - return newr } -type flushResult struct { - err error - offset int64 +func (w *gRPCWriter) Close() error { + w.CloseWithError(nil) + return w.streamResult } -type gRPCInternalWriter struct { - flushSupported bool - flushInProgress bool +func (w *gRPCWriter) CloseWithError(err error) error { + // N.B. CloseWithError always returns nil! + select { + case <-w.donec: + return nil + case w.writesChan <- &gRPCWriterCommandClose{err: err}: + break + } + <-w.donec + return nil +} - pw *io.PipeWriter +func (c *grpcStorageClient) OpenWriter(params *openWriterParams, opts ...storageOption) (internalWriter, error) { + if params.attrs.Retention != nil { + // TO-DO: remove once ObjectRetention is available - see b/308194853 + return nil, status.Errorf(codes.Unimplemented, "storage: object retention is not supported in gRPC") + } - flushComplete chan flushResult -} + spec := &storagepb.WriteObjectSpec{ + Resource: params.attrs.toProtoObject(params.bucket), + Appendable: proto.Bool(params.append), + } + // WriteObject doesn't support the generation condition, so use default. + if err := applyCondsProto("WriteObject", defaultGen, params.conds, spec); err != nil { + return nil, err + } + + s := callSettings(c.settings, opts...) + + if s.retry == nil { + s.retry = defaultRetry.clone() + } + if params.append { + s.retry = withBidiWriteObjectRedirectionErrorRetries(s) + } + + chunkRetryDeadline := defaultWriteChunkRetryDeadline + if params.chunkRetryDeadline != 0 { + chunkRetryDeadline = params.chunkRetryDeadline + } + + ctx := params.ctx + if s.userProject != "" { + ctx = setUserProjectMetadata(ctx, s.userProject) + } + + chunkSize := gRPCChunkSize(params.chunkSize) + writeQuantum := maxPerMessageWriteSize + if writeQuantum > chunkSize { + writeQuantum = chunkSize + } + sendableUnits := chunkSize / writeQuantum + // There's no strict requirement that the chunk size be an exact multiple of + // the writeQuantum. In that case, there will be a tail segment of less than + // writeQuantum. + lastSegmentStart := sendableUnits * writeQuantum + if lastSegmentStart < chunkSize { + sendableUnits++ + } + + if params.append && params.appendGen >= 0 && params.setTakeoverOffset == nil { + return nil, errors.New("storage: no way to report offset for appendable takeover") + } + + w := &gRPCWriter{ + preRunCtx: ctx, + c: c, + settings: s, -func (giw *gRPCInternalWriter) Flush() (int64, error) { - if !giw.flushSupported { - return 0, errors.New("Flush is supported only if Writer.Append is set to true") + bucket: params.bucket, + attrs: params.attrs, + conds: params.conds, + spec: spec, + encryptionKey: params.encryptionKey, + + setError: params.setError, + progress: params.progress, + setObj: params.setObj, + setSize: params.setSize, + setTakeoverOffset: params.setTakeoverOffset, + + flushSupported: params.append, + sendCRC32C: params.sendCRC32C, + forceOneShot: params.chunkSize <= 0, + forceEmptyContentType: params.forceEmptyContentType, + append: params.append, + appendGen: params.appendGen, + finalizeOnClose: params.finalizeOnClose, + + buf: make([]byte, 0, chunkSize), + writeQuantum: writeQuantum, + lastSegmentStart: lastSegmentStart, + sendableUnits: sendableUnits, + bufUnsentIdx: 0, + bufFlushedIdx: -1, // Handle flushes to length 0 + bufBaseOffset: 0, + + chunkRetryDeadline: chunkRetryDeadline, + abandonRetriesTime: time.Time{}, + attempts: 0, + lastErr: nil, + streamSender: nil, + + writesChan: make(chan gRPCWriterCommand, 1), + currentCommand: nil, + streamResult: nil, + donec: params.donec, } - giw.flushInProgress = true - giw.pw.Close() + go func() { + if err := w.gatherFirstBuffer(); err != nil { + w.streamResult = err + w.setError(err) + close(w.donec) + return + } + + if w.attrs.ContentType == "" && !w.forceEmptyContentType { + w.spec.Resource.ContentType = w.detectContentType() + } + w.streamSender = w.pickBufferSender() + + w.streamResult = checkCanceled(run(w.preRunCtx, func(ctx context.Context) error { + w.lastErr = w.writeLoop(ctx) + return w.lastErr + }, w.settings.retry, w.settings.idempotent)) + w.setError(w.streamResult) + close(w.donec) + }() - // Return the offset based on flushComplete. - r := <-giw.flushComplete - return r.offset, r.err + return w, nil } -// Forward other methods to *io.PipeWriter -func (giw *gRPCInternalWriter) Write(p []byte) (int, error) { - return giw.pw.Write(p) +// gRPCWriter is a wrapper around the the gRPC client-stream API that manages +// sending chunks of data provided by the user over the stream. +type gRPCWriter struct { + preRunCtx context.Context + c *grpcStorageClient + settings *settings + + bucket string + attrs *ObjectAttrs + conds *Conditions + spec *storagepb.WriteObjectSpec + encryptionKey []byte + + setError func(error) + progress func(int64) + setObj func(*ObjectAttrs) + setSize func(int64) + setTakeoverOffset func(int64) + + flushSupported bool + sendCRC32C bool + forceOneShot bool + forceEmptyContentType bool + append bool + appendGen int64 + finalizeOnClose bool + + buf []byte + // A writeQuantum is the largest quantity of data which can be sent to the + // service in a single message. + writeQuantum int + lastSegmentStart int + sendableUnits int + bufUnsentIdx int + bufFlushedIdx int + bufBaseOffset int64 + + chunkRetryDeadline time.Duration + abandonRetriesTime time.Time + attempts int + lastErr error + streamSender gRPCBidiWriteBufferSender + + // Communication from the user goroutine to the stream management goroutines + writesChan chan gRPCWriterCommand + currentCommand gRPCWriterCommand + forcedStreamResult error + streamResult error + donec chan struct{} } -func (giw *gRPCInternalWriter) Close() error { - return giw.pw.Close() +func (w *gRPCWriter) pickBufferSender() gRPCBidiWriteBufferSender { + if w.append { + // Appendable object semantics + if w.appendGen >= 0 { + return w.newGRPCAppendTakeoverWriteBufferSender() + } + return w.newGRPCAppendableObjectBufferSender() + } + if w.forceOneShot { + // One shot semantics - no progress reports + w.progress = func(int64) {} + return w.newGRPCOneshotBidiWriteBufferSender() + } + // Resumable write semantics + return w.newGRPCResumableBidiWriteBufferSender() } -func (giw *gRPCInternalWriter) CloseWithError(err error) error { - return giw.pw.CloseWithError(err) +// sendBufferToTarget uses cs to send slices of buf, which starts at baseOffset +// bytes into the object. Slices are sent until flushAt bytes have sent, in +// which case the final request is a flush, or until len(buf) < w.writeQuantum. +// +// handleCompletion is called for any completions that arrive during sends. +// +// Returns the last byte offset sent. Returns true if all desired requests were +// delivered, and false if cs.completions was closed before all requests could +// be delivered. +func (w *gRPCWriter) sendBufferToTarget(cs gRPCWriterCommandHandleChans, buf []byte, baseOffset int64, flushAt int, handleCompletion func(gRPCBidiWriteCompletion)) (int64, bool) { + sent := 0 + if len(buf) > flushAt { + buf = buf[:flushAt] + } + for len(buf) > 0 && (len(buf) >= w.writeQuantum || len(buf) >= flushAt-sent) { + q := w.writeQuantum + if flushAt-sent < w.writeQuantum { + q = flushAt - sent + } + req := gRPCBidiWriteRequest{ + buf: buf[:q], + offset: baseOffset + int64(sent), + flush: q == flushAt-sent, + } + if !cs.deliverRequestUnlessCompleted(req, handleCompletion) { + return baseOffset + int64(sent), false + } + buf = buf[q:] + sent += q + } + return baseOffset + int64(sent), true } -func (c *grpcStorageClient) OpenWriter(params *openWriterParams, opts ...storageOption) (internalWriter, error) { - var offset int64 - errorf := params.setError - setObj := params.setObj - pr, pw := io.Pipe() +func (w *gRPCWriter) handleCompletion(c gRPCBidiWriteCompletion) { + if c.resource != nil { + w.setObj(newObjectFromProto(c.resource)) + } - s := callSettings(c.settings, opts...) + // Already handled this completion + if c.flushOffset <= w.bufBaseOffset+int64(w.bufFlushedIdx) { + return + } - retryDeadline := defaultWriteChunkRetryDeadline - if params.chunkRetryDeadline != 0 { - retryDeadline = params.chunkRetryDeadline + w.bufFlushedIdx = int(c.flushOffset - w.bufBaseOffset) + if w.bufFlushedIdx >= len(w.buf) { + // We can clear w.buf + w.bufBaseOffset = c.flushOffset + w.bufUnsentIdx = 0 + w.bufFlushedIdx = 0 + w.buf = w.buf[:0] } - if s.retry == nil { - s.retry = defaultRetry.clone() + w.setSize(c.flushOffset) + w.progress(c.flushOffset) +} + +func (w *gRPCWriter) withCommandRetryDeadline(f func() error) error { + w.abandonRetriesTime = time.Now().Add(w.chunkRetryDeadline) + err := f() + if err == nil { + w.abandonRetriesTime = time.Time{} } - if params.append { - s.retry = withBidiWriteObjectRedirectionErrorRetries(s) + return err +} + +// Gather write commands before starting the actual write. Returns nil if the +// stream should be started, and an error otherwise. +func (w *gRPCWriter) gatherFirstBuffer() error { + if w.append && w.appendGen >= 0 { + // For takeovers, kick off the stream immediately since we need to know the + // takeover offset to issue writes. + return nil } - s.retry.maxRetryDuration = retryDeadline - gw, err := newGRPCWriter(c, s, params, pr, pr, pw) - if err != nil { - errorf(err) - pr.CloseWithError(err) - close(params.donec) - return nil, err + for cmd := range w.writesChan { + switch v := cmd.(type) { + case *gRPCWriterCommandWrite: + if len(w.buf)+len(v.p) <= cap(w.buf) { + // We have not started sending yet, and we can stage all data without + // starting a send. Compare against cap(w.buf) instead of + // w.writeQuantum: that way we can perform a oneshot upload for objects + // which fit in one chunk, even though we will cut the request into + // w.writeQuantum units when we do start sending. + origLen := len(w.buf) + w.buf = w.buf[:origLen+len(v.p)] + copy(w.buf[origLen:], v.p) + close(v.done) + } else { + // Too large. Handle it in writeLoop. + w.currentCommand = cmd + return nil + } + break + case *gRPCWriterCommandClose: + // If we get here, data (if any) fits in w.buf, so we can force oneshot. + w.forceOneShot = true + w.currentCommand = cmd + // No need to start sending if v.err is not nil. + return v.err + default: + // Have to start sending! + w.currentCommand = cmd + return nil + } } + // Nothing should ever close w.writesChan, so we should never get here + return errors.New("storage.Writer: unexpectedly closed w.writesChan") +} - var o *storagepb.Object +func (w *gRPCWriter) writeLoop(ctx context.Context) error { + w.attempts++ + // Return an error if we've been waiting for a single operation for too long. + if !w.abandonRetriesTime.IsZero() && time.Now().After(w.abandonRetriesTime) { + return fmt.Errorf("storage: retry deadline of %s reached after %v attempts; last error: %w", w.chunkRetryDeadline, w.attempts, w.lastErr) + } + // Allow each request in w.buf to be sent and result in a completion without + // blocking. + requests := make(chan gRPCBidiWriteRequest, w.sendableUnits) + completions := make(chan gRPCBidiWriteCompletion, w.sendableUnits) + // Only one request ack will be outstanding at a time. + requestAcks := make(chan struct{}, 1) + chcs := gRPCWriterCommandHandleChans{requests, requestAcks, completions} + bscs := gRPCBufSenderChans{requests, requestAcks, completions} + ctx, cancel := context.WithCancel(ctx) + defer cancel() + w.streamSender.connect(ctx, bscs, w.settings.gax...) + + // Send any full quantum in w.buf, possibly including a flush + if err := w.withCommandRetryDeadline(func() error { + sentOffset, ok := w.sendBufferToTarget(chcs, w.buf, w.bufBaseOffset, cap(w.buf), + w.handleCompletion) + if !ok { + return w.streamSender.err() + } + w.bufUnsentIdx = int(sentOffset - w.bufBaseOffset) + // We may have observed a completion that is after all of w.buf if we also + // have a write command in w.currentCommand which sent a flush, but failed + // before the completion could be delivered. + if w.bufUnsentIdx < 0 { + w.bufUnsentIdx = 0 + } + return nil + }); err != nil { + return err + } - // If we are taking over an appendable object, send the first message here - // to get the append offset. - if params.append && params.appendGen >= 0 { - // Create the buffer sender. This opens a stream and blocks until we - // get a response that tells us what offset to write from. - wbs, err := gw.newGRPCAppendTakeoverWriteBufferSender(params.ctx) - if err != nil { - return nil, fmt.Errorf("storage: creating buffer sender: %w", err) + err := func() error { + for { + if w.currentCommand != nil { + if err := w.withCommandRetryDeadline(func() error { + return w.currentCommand.handle(w, chcs) + }); err != nil { + return err + } + w.currentCommand = nil + } + select { + case c, ok := <-completions: + if !ok { + return w.streamSender.err() + } + w.handleCompletion(c) + case cmd, ok := <-w.writesChan: + if !ok { + // Nothing should ever close w.writesChan, so we should never get here + return errors.New("storage.Writer: unexpectedly closed w.writesChan") + } + w.currentCommand = cmd + } } - // Propagate append offset to caller and buffer sending logic below. - params.setTakeoverOffset(wbs.takeoverOffset) - offset = wbs.takeoverOffset - gw.streamSender = wbs - o = wbs.objResource - setObj(newObjectFromProto(o)) + }() + if err == nil { + err = errors.New("storage.Writer: unexpected nil error from write loop") + } + var closeErr *gRPCWriterCommandClose + if !errors.As(err, &closeErr) { + // Not a shutdown. + return err } - // This function reads the data sent to the pipe and sends sets of messages - // on the gRPC client-stream as the buffer is filled. - go func() { - err := func() error { - // Unless the user told us the content type, we have to determine it from - // the first read. - if params.attrs.ContentType == "" && !params.forceEmptyContentType { - gw.reader, gw.spec.Resource.ContentType = gax.DetermineContentType(gw.reader) + if closeErr.err == nil { + // Clean shutdown. Send any remaining tail. + req := gRPCBidiWriteRequest{ + buf: w.buf[w.bufUnsentIdx:], + offset: w.bufBaseOffset + int64(w.bufUnsentIdx), + flush: true, + finishWrite: true, + } + if err := w.withCommandRetryDeadline(func() error { + if !chcs.deliverRequestUnlessCompleted(req, w.handleCompletion) { + return w.streamSender.err() } + return nil + }); err != nil { + return err + } + } else { + // Unclean shutdown. Cancel the context so we clean up expeditiously. + cancel() + } - // Loop until there is an error or the Object has been finalized. - for { - // Note: This blocks until either the buffer is full or EOF is read. - recvd, doneReading, err := gw.read() - if err != nil { - return err - } + close(requests) + for c := range completions { + w.handleCompletion(c) + } + if closeErr.err == nil { + return w.streamSender.err() + } + return closeErr.err +} - uploadBuff := func(ctx context.Context) error { - obj, err := gw.uploadBuffer(ctx, recvd, offset, doneReading) - if obj != nil { - o = obj - setObj(newObjectFromProto(o)) - } - return err - } +// gRPCWriterCommandHandleChans contains the channels that a gRPCWriterCommand +// implementation must use to send requests and get notified of completions. +// Requests are delivered on a write-only channel, request acks and completions +// arrive on read-only channels. +type gRPCWriterCommandHandleChans struct { + requests chan<- gRPCBidiWriteRequest + requestAcks <-chan struct{} + completions <-chan gRPCBidiWriteCompletion +} - // Add routing headers to the context metadata for single-shot and resumable - // writes. Append writes need to set this at a lower level to pass the routing - // token. - bctx := gw.ctx - if !gw.append { - bctx = bucketContext(bctx, gw.bucket) - } - err = run(bctx, uploadBuff, gw.settings.retry, s.idempotent) - offset += int64(recvd) - // If this buffer upload was triggered by a flush, reset and - // communicate back the result. - if gw.iw.flushInProgress { - gw.setSize(offset) - gw.iw.flushInProgress = false - gw.iw.flushComplete <- flushResult{offset: offset, err: err} - } - if err != nil { - return err - } - // When we are done reading data without errors, set the object and - // finish. - if doneReading { - // Build Object from server's response. - setObj(newObjectFromProto(o)) - return nil - } +// gRPCBufSenderChans contains the channels that a gRPCBidiWriteBufferSender +// must use to get notified of requests and deliver completions. Requests arrive +// on a read-only channel, request acks and completions are delivered on +// write-only channels. +type gRPCBufSenderChans struct { + requests <-chan gRPCBidiWriteRequest + requestAcks chan<- struct{} + completions chan<- gRPCBidiWriteCompletion +} + +// deliverRequestUnlessCompleted submits req to cs.requests, unless +// cs.completions is closed first. If a completion arrives before the request is +// enqueued, handleCompletion is called. +// +// Returns true if request was successfully enqueued, and false if completions +// was closed first. +func (cs gRPCWriterCommandHandleChans) deliverRequestUnlessCompleted(req gRPCBidiWriteRequest, handleCompletion func(gRPCBidiWriteCompletion)) bool { + for { + select { + case cs.requests <- req: + return true + case c, ok := <-cs.completions: + if !ok { + return false } - }() + handleCompletion(c) + } + } +} - // These calls are still valid if err is nil - err = checkCanceled(err) - errorf(err) - gw.pr.CloseWithError(err) - close(params.donec) - }() +// gRPCWriterCommand represents an operation on a gRPCWriter +type gRPCWriterCommand interface { + // handle applies the command to a gRPCWriter. + // + // Implementations may return an error. In that case, the command may be + // retried with a new gRPCWriterCommandHandleChans instance. + handle(*gRPCWriter, gRPCWriterCommandHandleChans) error +} - return gw.iw, nil +type gRPCWriterCommandWrite struct { + p []byte + done chan struct{} } -func newGRPCWriter(c *grpcStorageClient, s *settings, params *openWriterParams, r io.Reader, pr *io.PipeReader, pw *io.PipeWriter) (*gRPCWriter, error) { - if params.attrs.Retention != nil { - // TO-DO: remove once ObjectRetention is available - see b/308194853 - return nil, status.Errorf(codes.Unimplemented, "storage: object retention is not supported in gRPC") +func (c *gRPCWriterCommandWrite) handle(w *gRPCWriter, cs gRPCWriterCommandHandleChans) error { + if len(c.p) == 0 { + // No data to write. + close(c.done) + return nil } - size := googleapi.MinUploadChunkSize - // A completely bufferless upload (params.chunkSize <= 0) is not possible in - // gRPC because the buffer must be provided to the message. Use the minimum - // size possible. - if params.chunkSize > 0 { - size = params.chunkSize + wblen := len(w.buf) + allKnownBytes := wblen + len(c.p) + fullBufs := allKnownBytes / cap(w.buf) + partialBuf := allKnownBytes % cap(w.buf) + if partialBuf == 0 { + // If we would exactly fill some number of cap(w.buf) units, we don't need + // to block on the flush for the last one. We know that c.p is not empty, so + // allKnownBytes is not 0 and therefore if partialBuf is 0, fullBufs is not + // 0. + fullBufs-- + partialBuf = cap(w.buf) } - // Round up chunksize to nearest 256KiB - if size%googleapi.MinUploadChunkSize != 0 { - size += googleapi.MinUploadChunkSize - (size % googleapi.MinUploadChunkSize) + if fullBufs == 0 { + // Everything fits in w.buf. Copy in and send from there. + w.buf = w.buf[:allKnownBytes] + copied := copy(w.buf[wblen:], c.p) + // Now that it's in w.buf, clear it from the command in case we retry. + c.p = c.p[copied:] + sending := w.buf[w.bufUnsentIdx:] + sentOffset, ok := w.sendBufferToTarget(cs, sending, w.bufBaseOffset+int64(w.bufUnsentIdx), cap(sending), + w.handleCompletion) + if !ok { + return w.streamSender.err() + } + w.bufUnsentIdx = int(sentOffset - w.bufBaseOffset) + close(c.done) + return nil } - if s.userProject != "" { - params.ctx = setUserProjectMetadata(params.ctx, s.userProject) + // We have at least one full buffer, followed by a partial. The first full + // buffer is the interesting one. We don't actually have to copy all of c.p + // in: we can send from it in place, except for any partial quantum at the + // tail of w.buf. Send that quantum... + toNextWriteQuantum := func() int { + if wblen > w.lastSegmentStart { + return cap(w.buf) - wblen + } + if wblen%w.writeQuantum == 0 { + return 0 + } + return w.writeQuantum - (wblen % w.writeQuantum) + }() + w.buf = w.buf[:wblen+toNextWriteQuantum] + copied := copy(w.buf[wblen:], c.p) + c.p = c.p[copied:] + firstFullBufFromCmd := cap(w.buf) - len(w.buf) + + sending := w.buf[w.bufUnsentIdx:] + sentOffset, ok := w.sendBufferToTarget(cs, sending, w.bufBaseOffset+int64(w.bufUnsentIdx), cap(sending), + w.handleCompletion) + if !ok { + return w.streamSender.err() } - spec := &storagepb.WriteObjectSpec{ - Resource: params.attrs.toProtoObject(params.bucket), - Appendable: proto.Bool(params.append), + // ...then send the prefix of c.p which could fill w.buf + cmdBaseOffset := w.bufBaseOffset + int64(len(w.buf)) + cmdBuf := c.p + trimCommandBuf := func(cmp gRPCBidiWriteCompletion) { + w.handleCompletion(cmp) + // After a completion, keep c.p up to date with w.buf's tail. + bufTail := w.bufBaseOffset + int64(len(w.buf)) + if bufTail <= cmdBaseOffset { + return + } + trim := int(bufTail - cmdBaseOffset) + if len(c.p) < trim { + trim = len(c.p) + } + c.p = c.p[trim:] + cmdBaseOffset = bufTail + } + offset := cmdBaseOffset + sentOffset, ok = w.sendBufferToTarget(cs, cmdBuf, offset, firstFullBufFromCmd, + trimCommandBuf) + if !ok { + return w.streamSender.err() } - var appendSpec *storagepb.AppendObjectSpec - if params.append && params.appendGen >= 0 { - appendSpec = &storagepb.AppendObjectSpec{ - Bucket: bucketResourceName(globalProjectAlias, params.bucket), - Object: params.attrs.Name, - Generation: params.appendGen, + cmdBuf = cmdBuf[int(sentOffset-offset):] + offset = sentOffset + + // Remaining full buffers can be satisfied entirely from cmdBuf with no copies. + for i := 0; i < fullBufs-1; i++ { + sentOffset, ok = w.sendBufferToTarget(cs, cmdBuf, offset, cap(w.buf), + trimCommandBuf) + if !ok { + return w.streamSender.err() } + cmdBuf = cmdBuf[int(sentOffset-offset):] + offset = sentOffset } - // WriteObject doesn't support the generation condition, so use default. - if err := applyCondsProto("WriteObject", defaultGen, params.conds, spec); err != nil { - return nil, err + + // Send the last partial buffer. We need to flush to offset before we can copy + // the rest of cmdBuf into w.buf and complete this command. + sentOffset, ok = w.sendBufferToTarget(cs, cmdBuf, offset, cap(w.buf), + trimCommandBuf) + if !ok { + return w.streamSender.err() + } + // Finally, we need the sender to ack to let us know c.p can be released. + if !cs.deliverRequestUnlessCompleted(gRPCBidiWriteRequest{requestAck: true}, trimCommandBuf) { + return w.streamSender.err() + } + ackOutstanding := true + for ackOutstanding || (w.bufBaseOffset+int64(w.bufFlushedIdx)) < offset { + select { + case cmp, ok := <-cs.completions: + if !ok { + return w.streamSender.err() + } + trimCommandBuf(cmp) + case <-cs.requestAcks: + ackOutstanding = false + } } + toCopyIn := cmdBuf[int(w.bufBaseOffset-offset):] + w.buf = w.buf[:len(toCopyIn)] + copy(w.buf, toCopyIn) + w.bufUnsentIdx = int(sentOffset - w.bufBaseOffset) + close(c.done) + return nil +} - return &gRPCWriter{ - buf: make([]byte, size), - c: c, - ctx: params.ctx, - reader: r, - pr: pr, - iw: &gRPCInternalWriter{ - flushSupported: params.append, - flushInProgress: false, - pw: pw, - flushComplete: make(chan flushResult), - }, - bucket: params.bucket, - attrs: params.attrs, - conds: params.conds, - spec: spec, - appendSpec: appendSpec, - encryptionKey: params.encryptionKey, - settings: s, - progress: params.progress, - setSize: params.setSize, - sendCRC32C: params.sendCRC32C, - forceOneShot: params.chunkSize <= 0, - forceEmptyContentType: params.forceEmptyContentType, - append: params.append, - finalizeOnClose: params.finalizeOnClose, - }, nil +type gRPCWriterCommandFlush struct { + done chan int64 } -// gRPCWriter is a wrapper around the the gRPC client-stream API that manages -// sending chunks of data provided by the user over the stream. -type gRPCWriter struct { - c *grpcStorageClient - buf []byte - reader io.Reader - pr *io.PipeReader // Keep track of pr to update post-flush - iw *gRPCInternalWriter +func (c *gRPCWriterCommandFlush) handle(w *gRPCWriter, cs gRPCWriterCommandHandleChans) error { + flushTarget := w.bufBaseOffset + int64(len(w.buf)) + // We know that there are at most w.writeQuantum bytes in + // w.buf[w.bufUnsentIdx:], because we send anything more inline when handling + // a write. + req := gRPCBidiWriteRequest{ + buf: w.buf[w.bufUnsentIdx:], + offset: w.bufBaseOffset + int64(w.bufUnsentIdx), + flush: true, + finishWrite: false, + } + if !cs.deliverRequestUnlessCompleted(req, w.handleCompletion) { + return w.streamSender.err() + } + // Successful flushes will clear w.buf. + for (w.bufBaseOffset + int64(w.bufFlushedIdx)) < flushTarget { + c, ok := <-cs.completions + if !ok { + // Stream failure + return w.streamSender.err() + } + w.handleCompletion(c) + } + // handleCompletion has cleared w.buf and updated w.bufUnsentIdx by now. + c.done <- flushTarget + return nil +} - ctx context.Context +type gRPCWriterCommandClose struct { + err error +} - bucket string - attrs *ObjectAttrs - conds *Conditions - spec *storagepb.WriteObjectSpec - appendSpec *storagepb.AppendObjectSpec - encryptionKey []byte - settings *settings - progress func(int64) - setSize func(int64) +func (e *gRPCWriterCommandClose) Error() string { + return e.err.Error() +} - sendCRC32C bool - forceOneShot bool - forceEmptyContentType bool - append bool - finalizeOnClose bool +func (c *gRPCWriterCommandClose) handle(w *gRPCWriter, cs gRPCWriterCommandHandleChans) error { + // N.B. c is not nil, even if c.err is nil! + return c +} - streamSender gRPCBidiWriteBufferSender +// Detect content type using bytes first from baseBuf, then from pendingBuf if +// there are not enough bytes in baseBuf. +func (w *gRPCWriter) detectContentType() string { + wblen := len(w.buf) + // If the current command is a write, we want to be able to update it in + // place. If the + cmdbuf := &([]byte{}) + if c, ok := w.currentCommand.(*gRPCWriterCommandWrite); ok { + cmdbuf = &c.p + } + if wblen == 0 { + // Use the command in place + return http.DetectContentType(*cmdbuf) + } + if wblen >= w.writeQuantum { + // Use w.buf in place + return http.DetectContentType(w.buf) + } + + // We need to put bytes from the command onto w.buf. Try to fill a + // writeQuantum since we'll have to do that in order to send, anyway. + newSz := w.writeQuantum + if wblen+len(*cmdbuf) < newSz { + newSz = wblen + len(*cmdbuf) + } + w.buf = w.buf[:newSz] + copied := copy(w.buf[wblen:], *cmdbuf) + *cmdbuf = (*cmdbuf)[copied:] + return http.DetectContentType(w.buf) } -func bucketContext(ctx context.Context, bucket string) context.Context { - hds := []string{"x-goog-request-params", fmt.Sprintf("bucket=projects/_/buckets/%s", url.QueryEscape(bucket))} - return gax.InsertMetadataIntoOutgoingContext(ctx, hds...) +type gRPCBidiWriteRequest struct { + buf []byte + offset int64 + flush bool + finishWrite bool + // If requestAck is true, no other message fields may be set. Buffer senders + // must ack on the requestAcks channel if all prior messages on the requests + // channel have been delivered to gRPC. + requestAck bool } -// drainInboundStream calls stream.Recv() repeatedly until an error is returned. -// It returns the last Resource received on the stream, or nil if no Resource -// was returned. drainInboundStream always returns a non-nil error. io.EOF -// indicates all messages were successfully read. -func drainInboundStream(stream storagepb.Storage_BidiWriteObjectClient) (object *storagepb.Object, err error) { - for err == nil { - var resp *storagepb.BidiWriteObjectResponse - resp, err = stream.Recv() - // GetResource() returns nil on a nil response - if resp.GetResource() != nil { - object = resp.GetResource() - } +type gRPCBidiWriteCompletion struct { + flushOffset int64 + resource *storagepb.Object +} + +func completion(r *storagepb.BidiWriteObjectResponse) *gRPCBidiWriteCompletion { + switch c := r.WriteStatus.(type) { + case *storagepb.BidiWriteObjectResponse_PersistedSize: + return &gRPCBidiWriteCompletion{flushOffset: c.PersistedSize} + case *storagepb.BidiWriteObjectResponse_Resource: + return &gRPCBidiWriteCompletion{flushOffset: c.Resource.GetSize(), resource: c.Resource} + default: + return nil } - return object, err } func bidiWriteObjectRequest(buf []byte, offset int64, flush, finishWrite bool) *storagepb.BidiWriteObjectRequest { @@ -369,464 +805,445 @@ func bidiWriteObjectRequest(buf []byte, offset int64, flush, finishWrite bool) * } type gRPCBidiWriteBufferSender interface { - // sendBuffer implementations should upload buf, respecting flush and - // finishWrite. Callers must guarantee that buf is not too long to fit in a - // gRPC message. + // connect implementations may attempt to establish a connection for issuing + // writes. + // + // In case of an error, implementations must close the completion channel. The + // write loop will inspect err() after that channel is closed and before any + // subsequent calls to connect(). // - // If flush is true, implementations must not return until the data in buf is - // stable. If finishWrite is true, implementations must return the object on - // success. - sendBuffer(ctx context.Context, buf []byte, offset int64, flush, finishWrite bool) (*storagepb.Object, error) + // If a request is delivered with flush true, implementations must request + // that the service make the data stable. If a request is delivered with + // finishWrite true, no subsequent messages will be delivered on the channel, + // and implementations must attempt to tear down the connection cleanly after + // sending the request. In both cases, the write loop will stall unless the + // completion channel is closed or receives a completion indicating that + // offset+len(buf) is persisted. + connect(context.Context, gRPCBufSenderChans, ...gax.CallOption) + + // err implementations must return the error on the stream. err() must be safe + // to call after the completion channel provided to connect() is closed. The + // write loop will not make concurrent calls to connect() and err(). + err() error } type gRPCOneshotBidiWriteBufferSender struct { - firstMessage *storagepb.BidiWriteObjectRequest raw *gapic.Client - stream storagepb.Storage_BidiWriteObjectClient - settings *settings + bucket string + firstMessage *storagepb.BidiWriteObjectRequest + streamErr error } -func (w *gRPCWriter) newGRPCOneshotBidiWriteBufferSender() (*gRPCOneshotBidiWriteBufferSender, error) { - firstMessage := &storagepb.BidiWriteObjectRequest{ - FirstMessage: &storagepb.BidiWriteObjectRequest_WriteObjectSpec{ - WriteObjectSpec: w.spec, +func (w *gRPCWriter) newGRPCOneshotBidiWriteBufferSender() *gRPCOneshotBidiWriteBufferSender { + return &gRPCOneshotBidiWriteBufferSender{ + raw: w.c.raw, + bucket: w.bucket, + firstMessage: &storagepb.BidiWriteObjectRequest{ + FirstMessage: &storagepb.BidiWriteObjectRequest_WriteObjectSpec{ + WriteObjectSpec: w.spec, + }, + CommonObjectRequestParams: toProtoCommonObjectRequestParams(w.encryptionKey), + // For a non-resumable upload, checksums must be sent in this message. + // TODO: Currently the checksums are only sent on the first message + // of the stream, but in the future, we must also support sending it + // on the *last* message of the stream (instead of the first). + ObjectChecksums: toProtoChecksums(w.sendCRC32C, w.attrs), }, - CommonObjectRequestParams: toProtoCommonObjectRequestParams(w.encryptionKey), - // For a non-resumable upload, checksums must be sent in this message. - // TODO: Currently the checksums are only sent on the first message - // of the stream, but in the future, we must also support sending it - // on the *last* message of the stream (instead of the first). - ObjectChecksums: toProtoChecksums(w.sendCRC32C, w.attrs), } - - return &gRPCOneshotBidiWriteBufferSender{ - firstMessage: firstMessage, - raw: w.c.raw, - settings: w.settings, - }, nil } -func (s *gRPCOneshotBidiWriteBufferSender) sendBuffer(ctx context.Context, buf []byte, offset int64, flush, finishWrite bool) (obj *storagepb.Object, err error) { - var firstMessage *storagepb.BidiWriteObjectRequest - if s.stream == nil { - s.stream, err = s.raw.BidiWriteObject(ctx, s.settings.gax...) - if err != nil { - return +func (s *gRPCOneshotBidiWriteBufferSender) err() error { return s.streamErr } + +// drainInboundStream calls stream.Recv() repeatedly until an error is returned. +// It returns the last Resource received on the stream, or nil if no Resource +// was returned. drainInboundStream always returns a non-nil error. io.EOF +// indicates all messages were successfully read. +func drainInboundStream(stream storagepb.Storage_BidiWriteObjectClient) (object *storagepb.Object, err error) { + for err == nil { + var resp *storagepb.BidiWriteObjectResponse + resp, err = stream.Recv() + // GetResource() returns nil on a nil response + if resp.GetResource() != nil { + object = resp.GetResource() } - firstMessage = s.firstMessage - } - req := bidiWriteObjectRequest(buf, offset, flush, finishWrite) - if firstMessage != nil { - proto.Merge(req, firstMessage) } + return object, err +} - sendErr := s.stream.Send(req) - if sendErr != nil { - obj, err = drainInboundStream(s.stream) - s.stream = nil - if sendErr != io.EOF { - err = sendErr - } +func (s *gRPCOneshotBidiWriteBufferSender) connect(ctx context.Context, cs gRPCBufSenderChans, opts ...gax.CallOption) { + s.streamErr = nil + ctx = gRPCWriteRequestParams{bucket: s.bucket}.apply(ctx) + stream, err := s.raw.BidiWriteObject(ctx, opts...) + if err != nil { + s.streamErr = err + close(cs.completions) return } - // Oneshot uploads assume all flushes succeed - if finishWrite { - s.stream.CloseSend() - // Oneshot uploads only read from the response stream on completion or - // failure - obj, err = drainInboundStream(s.stream) - s.stream = nil - if err == io.EOF { - err = nil + go func() { + firstSend := true + for r := range cs.requests { + if r.requestAck { + cs.requestAcks <- struct{}{} + continue + } + + req := bidiWriteObjectRequest(r.buf, r.offset, r.flush, r.finishWrite) + if firstSend { + proto.Merge(req, s.firstMessage) + firstSend = false + } + + if err := stream.Send(req); err != nil { + _, s.streamErr = drainInboundStream(stream) + if err != io.EOF { + s.streamErr = err + } + close(cs.completions) + return + } + + if r.finishWrite { + stream.CloseSend() + // Oneshot uploads only read from the response stream on completion or + // failure + obj, err := drainInboundStream(stream) + if obj == nil || err != io.EOF { + s.streamErr = err + } else { + cs.completions <- gRPCBidiWriteCompletion{flushOffset: obj.GetSize(), resource: obj} + } + close(cs.completions) + return + } + + // Oneshot uploads assume all flushes succeed + if r.flush { + cs.completions <- gRPCBidiWriteCompletion{flushOffset: r.offset + int64(len(r.buf))} + } } - } - return + }() } type gRPCResumableBidiWriteBufferSender struct { - queryRetry *retryConfig - upid string - progress func(int64) - raw *gapic.Client - forceFirstMessage bool - stream storagepb.Storage_BidiWriteObjectClient - flushOffset int64 - settings *settings -} - -func (w *gRPCWriter) newGRPCResumableBidiWriteBufferSender(ctx context.Context) (*gRPCResumableBidiWriteBufferSender, error) { - req := &storagepb.StartResumableWriteRequest{ - WriteObjectSpec: w.spec, - CommonObjectRequestParams: toProtoCommonObjectRequestParams(w.encryptionKey), - // TODO: Currently the checksums are only sent on the request to initialize - // the upload, but in the future, we must also support sending it - // on the *last* message of the stream. - ObjectChecksums: toProtoChecksums(w.sendCRC32C, w.attrs), - } + raw *gapic.Client + bucket string - var upid string - err := run(ctx, func(ctx context.Context) error { - upres, err := w.c.raw.StartResumableWrite(ctx, req, w.settings.gax...) - upid = upres.GetUploadId() - return err - }, w.settings.retry, w.settings.idempotent) - if err != nil { - return nil, err - } + startWriteRequest *storagepb.StartResumableWriteRequest + upid string - // Set up an initial connection for the 0 offset, so we don't query state - // unnecessarily for the first buffer. If we fail, we'll just retry in the - // normal connect path. - stream, err := w.c.raw.BidiWriteObject(ctx, w.settings.gax...) - if err != nil { - stream = nil - } + streamErr error +} +func (w *gRPCWriter) newGRPCResumableBidiWriteBufferSender() *gRPCResumableBidiWriteBufferSender { return &gRPCResumableBidiWriteBufferSender{ - queryRetry: w.settings.retry, - upid: upid, - progress: w.progress, - raw: w.c.raw, - forceFirstMessage: true, - stream: stream, - settings: w.settings, - }, nil + raw: w.c.raw, + bucket: w.bucket, + startWriteRequest: &storagepb.StartResumableWriteRequest{ + WriteObjectSpec: w.spec, + CommonObjectRequestParams: toProtoCommonObjectRequestParams(w.encryptionKey), + // TODO: Currently the checksums are only sent on the request to initialize + // the upload, but in the future, we must also support sending it + // on the *last* message of the stream. + ObjectChecksums: toProtoChecksums(w.sendCRC32C, w.attrs), + }, + } } -// queryProgress is a helper that queries the status of the resumable upload -// associated with the given upload ID. -func (s *gRPCResumableBidiWriteBufferSender) queryProgress(ctx context.Context) (int64, error) { - var persistedSize int64 - err := run(ctx, func(ctx context.Context) error { - q, err := s.raw.QueryWriteStatus(ctx, &storagepb.QueryWriteStatusRequest{ - UploadId: s.upid, - }, s.settings.gax...) - // q.GetPersistedSize() will return 0 if q is nil. - persistedSize = q.GetPersistedSize() - return err - }, s.queryRetry, true) +func (s *gRPCResumableBidiWriteBufferSender) err() error { return s.streamErr } - return persistedSize, err -} +func (s *gRPCResumableBidiWriteBufferSender) connect(ctx context.Context, cs gRPCBufSenderChans, opts ...gax.CallOption) { + s.streamErr = nil + ctx = gRPCWriteRequestParams{bucket: s.bucket}.apply(ctx) -func (s *gRPCResumableBidiWriteBufferSender) sendBuffer(ctx context.Context, buf []byte, offset int64, flush, finishWrite bool) (obj *storagepb.Object, err error) { - if s.stream == nil { - // Determine offset and reconnect - s.flushOffset, err = s.queryProgress(ctx) + if s.startWriteRequest != nil { + upres, err := s.raw.StartResumableWrite(ctx, s.startWriteRequest, opts...) if err != nil { + s.streamErr = err + close(cs.completions) return } - s.stream, err = s.raw.BidiWriteObject(ctx, s.settings.gax...) + s.upid = upres.GetUploadId() + s.startWriteRequest = nil + } else { + q, err := s.raw.QueryWriteStatus(ctx, &storagepb.QueryWriteStatusRequest{UploadId: s.upid}, opts...) if err != nil { + s.streamErr = err + close(cs.completions) return } - s.forceFirstMessage = true - } - - // clean up buf. We'll still write the message if a flush/finishWrite was - // requested. - if offset < s.flushOffset { - trim := s.flushOffset - offset - if int64(len(buf)) <= trim { - trim = int64(len(buf)) - } - buf = buf[trim:] - offset += trim - } - if len(buf) == 0 && !flush && !finishWrite { - // no need to send anything - return nil, nil - } - - req := bidiWriteObjectRequest(buf, offset, flush, finishWrite) - if s.forceFirstMessage { - req.FirstMessage = &storagepb.BidiWriteObjectRequest_UploadId{UploadId: s.upid} - s.forceFirstMessage = false - } - - sendErr := s.stream.Send(req) - if sendErr != nil { - obj, err = drainInboundStream(s.stream) - s.stream = nil - if err == io.EOF { - // This is unexpected - we got an error on Send(), but not on Recv(). - // Bubble up the sendErr. - err = sendErr - } - return + cs.completions <- gRPCBidiWriteCompletion{flushOffset: q.GetPersistedSize()} } - if finishWrite { - s.stream.CloseSend() - obj, err = drainInboundStream(s.stream) - s.stream = nil - if err == io.EOF { - err = nil - if obj.GetSize() > s.flushOffset { - s.progress(obj.GetSize()) - } - } + stream, err := s.raw.BidiWriteObject(ctx, opts...) + if err != nil { + s.streamErr = err + close(cs.completions) return } - if flush { - resp, err := s.stream.Recv() - if err != nil { - return nil, err - } - persistedOffset := resp.GetPersistedSize() - if persistedOffset > s.flushOffset { - s.flushOffset = persistedOffset - s.progress(s.flushOffset) - } - } - return -} + go func() { + var sendErr, recvErr error + sendDone := make(chan struct{}) + recvDone := make(chan struct{}) + + go func() { + sendErr = func() error { + firstSend := true + for { + select { + case <-recvDone: + // Because `requests` is not connected to the gRPC machinery, we + // have to check for asynchronous termination on the receive side. + return nil + case r, ok := <-cs.requests: + if !ok { + stream.CloseSend() + return nil + } + if r.requestAck { + cs.requestAcks <- struct{}{} + continue + } + req := bidiWriteObjectRequest(r.buf, r.offset, r.flush, r.finishWrite) + if firstSend { + req.FirstMessage = &storagepb.BidiWriteObjectRequest_UploadId{UploadId: s.upid} + firstSend = false + } + if err := stream.Send(req); err != nil { + return err + } + if r.finishWrite { + stream.CloseSend() + return nil + } + } + } + }() + close(sendDone) + }() -// uploadBuffer uploads the buffer at the given offset using a bi-directional -// Write stream. It will open a new stream if necessary (on the first call or -// after resuming from failure) and chunk the buffer per maxPerMessageWriteSize. -// The final Object is returned on success if doneReading is true. -// -// Returns object and any error that is not retriable. -func (w *gRPCWriter) uploadBuffer(ctx context.Context, recvd int, start int64, doneReading bool) (obj *storagepb.Object, err error) { - if w.streamSender == nil { - if w.append { - // Appendable object semantics - w.streamSender, err = w.newGRPCAppendableObjectBufferSender() - } else if doneReading || w.forceOneShot { - // One shot semantics - w.streamSender, err = w.newGRPCOneshotBidiWriteBufferSender() - } else { - // Resumable write semantics - w.streamSender, err = w.newGRPCResumableBidiWriteBufferSender(ctx) - } - if err != nil { - return - } - } + go func() { + recvErr = func() error { + for { + resp, err := stream.Recv() + if err != nil { + return err + } + if c := completion(resp); c != nil { + cs.completions <- *c + } + } + }() + close(recvDone) + }() - data := w.buf[:recvd] - offset := start - // We want to go through this loop at least once, in case we have to - // finishWrite with an empty buffer. - for { - // Send as much as we can fit into a single gRPC message. Only flush once, - // when sending the very last message. - l := maxPerMessageWriteSize - flush := false - if len(data) <= l { - l = len(data) - flush = true - } - obj, err = w.streamSender.sendBuffer(ctx, data[:l], offset, flush, flush && doneReading) - if err != nil { - return nil, err - } - data = data[l:] - offset += int64(l) - if len(data) == 0 { - // Update object size to match persisted offset. - if obj != nil { - obj.Size = offset - } - break + <-sendDone + <-recvDone + // Prefer recvErr since that's where RPC errors are delivered + if recvErr != nil { + s.streamErr = recvErr + } else if sendErr != nil { + s.streamErr = sendErr } - } - return -} - -// read copies the data in the reader to the given buffer and reports how much -// data was read into the buffer and if there is no more data to read (EOF). -// read returns when either 1. the buffer is full, 2. Writer.Flush was called, -// or 3. Writer.Close was called. -func (w *gRPCWriter) read() (int, bool, error) { - // Set n to -1 to start the Read loop. - var n, recvd int = -1, 0 - var err error - for err == nil && n != 0 { - // The routine blocks here until data is received. - n, err = w.reader.Read(w.buf[recvd:]) - recvd += n - } - var done bool - if err == io.EOF { - err = nil - // EOF can come from Writer.Flush or Writer.Close. - if w.iw.flushInProgress { - // Reset pipe for additional writes after the flush. - pr, pw := io.Pipe() - w.reader = pr - w.pr = pr - w.iw.pw = pw - } else { - done = true + if s.streamErr == io.EOF { + s.streamErr = nil } - } - return recvd, done, err + close(cs.completions) + }() } -func checkCanceled(err error) error { - if status.Code(err) == codes.Canceled { - return context.Canceled - } +type gRPCAppendBidiWriteBufferSender struct { + raw *gapic.Client + bucket string + routingToken *string - return err -} + firstMessage *storagepb.BidiWriteObjectRequest -type gRPCAppendBidiWriteBufferSender struct { - bucket string - routingToken *string - raw *gapic.Client - settings *settings - stream storagepb.Storage_BidiWriteObjectClient - firstMessage *storagepb.BidiWriteObjectRequest objectChecksums *storagepb.ObjectChecksums - finalizeOnClose bool + objResource *storagepb.Object - forceFirstMessage bool - progress func(int64) - flushOffset int64 - takeoverOffset int64 - objResource *storagepb.Object // Captures received obj to set w.Attrs. - - // Fields used to report responses from the receive side of the stream - // recvs is closed when the current recv goroutine is complete. recvErr is set - // to the result of that stream (including io.EOF to indicate success) - recvs <-chan *storagepb.BidiWriteObjectResponse - recvErr error + streamErr error } +func (s *gRPCAppendBidiWriteBufferSender) err() error { return s.streamErr } + // Use for a newly created appendable object. -func (w *gRPCWriter) newGRPCAppendableObjectBufferSender() (*gRPCAppendBidiWriteBufferSender, error) { - s := &gRPCAppendBidiWriteBufferSender{ - bucket: w.spec.GetResource().GetBucket(), - raw: w.c.raw, - settings: w.settings, +func (w *gRPCWriter) newGRPCAppendableObjectBufferSender() *gRPCAppendBidiWriteBufferSender { + return &gRPCAppendBidiWriteBufferSender{ + raw: w.c.raw, + bucket: w.bucket, firstMessage: &storagepb.BidiWriteObjectRequest{ FirstMessage: &storagepb.BidiWriteObjectRequest_WriteObjectSpec{ WriteObjectSpec: w.spec, }, CommonObjectRequestParams: toProtoCommonObjectRequestParams(w.encryptionKey), }, - objectChecksums: toProtoChecksums(w.sendCRC32C, w.attrs), - finalizeOnClose: w.finalizeOnClose, - forceFirstMessage: true, - progress: w.progress, - flushOffset: -1, // We should ack flushes to length 0. + objectChecksums: toProtoChecksums(w.sendCRC32C, w.attrs), + finalizeOnClose: w.finalizeOnClose, } - return s, nil } -// Use for a takeover of an appendable object. -// Unlike newGRPCAppendableObjectBufferSender, this blocks until the stream is -// open because it needs to get the append offset from the server. -func (w *gRPCWriter) newGRPCAppendTakeoverWriteBufferSender(ctx context.Context) (*gRPCAppendBidiWriteBufferSender, error) { - s := &gRPCAppendBidiWriteBufferSender{ - bucket: w.spec.GetResource().GetBucket(), - raw: w.c.raw, - settings: w.settings, - firstMessage: &storagepb.BidiWriteObjectRequest{ - FirstMessage: &storagepb.BidiWriteObjectRequest_AppendObjectSpec{ - AppendObjectSpec: w.appendSpec, - }, - }, - objectChecksums: toProtoChecksums(w.sendCRC32C, w.attrs), - finalizeOnClose: w.finalizeOnClose, - forceFirstMessage: true, - progress: w.progress, - } - if err := s.connect(ctx); err != nil { - return nil, fmt.Errorf("storage: opening appendable write stream: %w", err) - } - _, err := s.sendOnConnectedStream(nil, 0, false, false, true) +func (s *gRPCAppendBidiWriteBufferSender) connect(ctx context.Context, cs gRPCBufSenderChans, opts ...gax.CallOption) { + s.streamErr = nil + ctx = gRPCWriteRequestParams{appendable: true, bucket: s.bucket, routingToken: s.routingToken}.apply(ctx) + + stream, err := s.raw.BidiWriteObject(ctx, opts...) if err != nil { - return nil, err - } - firstResp := <-s.recvs - // Check recvErr after getting the response. - if s.recvErr != nil { - return nil, s.recvErr + s.streamErr = err + close(cs.completions) + return } - // Object resource is returned in the first response on takeover, so capture - // this now. - s.objResource = firstResp.GetResource() - s.takeoverOffset = firstResp.GetResource().GetSize() - return s, nil + go s.handleStream(stream, cs, true) } -func (s *gRPCAppendBidiWriteBufferSender) connect(ctx context.Context) (err error) { - err = func() error { - // If this is a forced first message, we've already determined it's safe to - // send. - if s.forceFirstMessage { - s.forceFirstMessage = false - return nil - } - - // It's always ok to reconnect if there is a handle. This is the common - // case. - if s.firstMessage.GetAppendObjectSpec().GetWriteHandle() != nil { - return nil - } - // Also always okay to reconnect if there is a generation. - if s.firstMessage.GetAppendObjectSpec().GetGeneration() != 0 { - return nil - } - // Also always ok to reconnect if we've seen a redirect token - if s.routingToken != nil { - return nil - } - - // We can also reconnect if the first message has an if_generation_match or - // if_metageneration_match condition. Note that negative conditions like - // if_generation_not_match are not necessarily safe to retry. - aos := s.firstMessage.GetAppendObjectSpec() - wos := s.firstMessage.GetWriteObjectSpec() +func (s *gRPCAppendBidiWriteBufferSender) handleStream(stream storagepb.Storage_BidiWriteObjectClient, cs gRPCBufSenderChans, firstSend bool) { + var sendErr, recvErr error + sendDone := make(chan struct{}) + recvDone := make(chan struct{}) - if aos != nil && aos.IfMetagenerationMatch != nil { - return nil - } + go func() { + sendErr = func() error { + for { + select { + case <-recvDone: + // Because `requests` is not connected to the gRPC machinery, we + // have to check for asynchronous termination on the receive side. + return nil + case r, ok := <-cs.requests: + if !ok { + stream.CloseSend() + return nil + } + if r.requestAck { + cs.requestAcks <- struct{}{} + continue + } + err := s.send(stream, r.buf, r.offset, r.flush, r.finishWrite, firstSend) + firstSend = false + if err != nil { + return err + } + if r.finishWrite { + stream.CloseSend() + return nil + } + } + } + }() + close(sendDone) + }() - if wos != nil && wos.IfGenerationMatch != nil { - return nil - } - if wos != nil && wos.IfMetagenerationMatch != nil { - return nil - } + go func() { + recvErr = func() error { + for { + resp, err := stream.Recv() + if err != nil { + return s.maybeHandleRedirectionError(err) + } + s.maybeUpdateFirstMessage(resp) - // Otherwise, it is not safe to reconnect. - return errors.New("cannot safely reconnect; no write handle or preconditions") + if c := completion(resp); c != nil { + cs.completions <- *c + } + } + }() + close(recvDone) }() - if err != nil { - return err + + <-sendDone + <-recvDone + // Prefer recvErr since that's where RPC errors are delivered + if recvErr != nil { + s.streamErr = recvErr + } else if sendErr != nil { + s.streamErr = sendErr + } + if s.streamErr == io.EOF { + s.streamErr = nil } + close(cs.completions) +} + +type gRPCAppendTakeoverBidiWriteBufferSender struct { + gRPCAppendBidiWriteBufferSender + takeoverReported bool + setTakeoverOffset func(int64) +} - return s.startReceiver(ctx) +func writeObjectSpecAsAppendObjectSpec(s *storagepb.WriteObjectSpec, gen int64) *storagepb.AppendObjectSpec { + return &storagepb.AppendObjectSpec{ + Bucket: s.GetResource().GetBucket(), + Object: s.GetResource().GetName(), + Generation: gen, + IfMetagenerationMatch: s.IfMetagenerationMatch, + IfMetagenerationNotMatch: s.IfMetagenerationNotMatch, + } } -func (s *gRPCAppendBidiWriteBufferSender) withRequestParams(ctx context.Context) context.Context { - param := fmt.Sprintf("appendable=true&bucket=%s", s.bucket) - if s.routingToken != nil { - param = param + fmt.Sprintf("&routing_token=%s", *s.routingToken) +// Use for a takeover of an appendable object. +func (w *gRPCWriter) newGRPCAppendTakeoverWriteBufferSender() *gRPCAppendTakeoverBidiWriteBufferSender { + return &gRPCAppendTakeoverBidiWriteBufferSender{ + gRPCAppendBidiWriteBufferSender: gRPCAppendBidiWriteBufferSender{ + raw: w.c.raw, + bucket: w.bucket, + firstMessage: &storagepb.BidiWriteObjectRequest{ + FirstMessage: &storagepb.BidiWriteObjectRequest_AppendObjectSpec{ + AppendObjectSpec: writeObjectSpecAsAppendObjectSpec(w.spec, w.appendGen), + }, + }, + objectChecksums: toProtoChecksums(w.sendCRC32C, w.attrs), + finalizeOnClose: w.finalizeOnClose, + }, + takeoverReported: false, + setTakeoverOffset: w.setTakeoverOffset, } - return gax.InsertMetadataIntoOutgoingContext(ctx, "x-goog-request-params", param) } -func (s *gRPCAppendBidiWriteBufferSender) startReceiver(ctx context.Context) (err error) { - s.stream, err = s.raw.BidiWriteObject(s.withRequestParams(ctx), s.settings.gax...) +func (s *gRPCAppendTakeoverBidiWriteBufferSender) connect(ctx context.Context, cs gRPCBufSenderChans, opts ...gax.CallOption) { + s.streamErr = nil + ctx = gRPCWriteRequestParams{appendable: true, bucket: s.bucket, routingToken: s.routingToken}.apply(ctx) + + stream, err := s.raw.BidiWriteObject(ctx, opts...) if err != nil { + s.streamErr = err + close(cs.completions) return } - recvs := make(chan *storagepb.BidiWriteObjectResponse) - s.recvs = recvs - s.recvErr = nil - go s.receiveMessages(recvs) - return + // This blocks until we know the takeover offset from the server on first + // connection + firstSend := true + if !s.takeoverReported { + if err := s.send(stream, nil, 0, false, false, true); err != nil { + s.streamErr = err + close(cs.completions) + return + } + firstSend = false + + resp, err := stream.Recv() + if err != nil { + s.streamErr = err + close(cs.completions) + return + } + + c := completion(resp) + if c == nil { + s.streamErr = fmt.Errorf("storage: unexpectedly no size in initial takeover response %+v", resp) + close(cs.completions) + return + } + + s.setTakeoverOffset(c.flushOffset) + s.takeoverReported = true + cs.completions <- *c + } + + go s.handleStream(stream, cs, firstSend) } func (s *gRPCAppendBidiWriteBufferSender) ensureFirstMessageAppendObjectSpec() { @@ -865,72 +1282,40 @@ func (e bidiWriteObjectRedirectionError) Error() string { return "" } -func (s *gRPCAppendBidiWriteBufferSender) handleRedirectionError(e *storagepb.BidiWriteObjectRedirectedError) bool { - if e.RoutingToken == nil { - // This shouldn't happen, but we don't want to blindly retry here. Instead, - // surface the error to the caller. - return false - } - - if e.WriteHandle != nil { - // If we get back a write handle, we should use it. We can only use it - // on an append object spec. - s.ensureFirstMessageAppendObjectSpec() - s.firstMessage.GetAppendObjectSpec().WriteHandle = e.WriteHandle - // Generation is meant to only come with the WriteHandle, so ignore it - // otherwise. - if e.Generation != nil { - s.firstMessage.GetAppendObjectSpec().Generation = e.GetGeneration() - } - } - - s.routingToken = e.RoutingToken - return true -} - -func (s *gRPCAppendBidiWriteBufferSender) receiveMessages(resps chan<- *storagepb.BidiWriteObjectResponse) { - resp, err := s.stream.Recv() - for err == nil { - s.maybeUpdateFirstMessage(resp) - - if resp.WriteStatus != nil { - // We only get a WriteStatus if this was a solicited message (either - // state_lookup: true or finish_write: true). Unsolicited messages may - // arrive to update our handle if necessary. We don't want to block on - // this channel write if this was an unsolicited message. - resps <- resp - } - - resp, err = s.stream.Recv() - } - +func (s *gRPCAppendBidiWriteBufferSender) maybeHandleRedirectionError(err error) error { if st, ok := status.FromError(err); ok && st.Code() == codes.Aborted { for _, d := range st.Details() { if e, ok := d.(*storagepb.BidiWriteObjectRedirectedError); ok { - // If we can handle this error, wrap it with the sentinel so it gets - // retried. - if ok := s.handleRedirectionError(e); ok { - err = fmt.Errorf("%w%w", bidiWriteObjectRedirectionError{}, err) + if e.RoutingToken == nil { + // This shouldn't happen, but we don't want to blindly retry here. + // Instead, surface the error to the caller. + return err + } + + if e.WriteHandle != nil { + // If we get back a write handle, we should use it. We can only use it + // on an append object spec. + s.ensureFirstMessageAppendObjectSpec() + s.firstMessage.GetAppendObjectSpec().WriteHandle = e.WriteHandle + // Generation is meant to only come with the WriteHandle, so ignore it + // otherwise. + if e.Generation != nil { + s.firstMessage.GetAppendObjectSpec().Generation = e.GetGeneration() + } } + + s.routingToken = e.RoutingToken + return fmt.Errorf("%w%w", bidiWriteObjectRedirectionError{}, err) } } } - - // TODO: automatically reconnect on retriable recv errors, even if there are - // no sends occurring. - s.recvErr = err - close(resps) + return err } -func (s *gRPCAppendBidiWriteBufferSender) sendOnConnectedStream(buf []byte, offset int64, flush, finishWrite, sendFirstMessage bool) (obj *storagepb.Object, err error) { - var req *storagepb.BidiWriteObjectRequest +func (s *gRPCAppendBidiWriteBufferSender) send(stream storagepb.Storage_BidiWriteObjectClient, buf []byte, offset int64, flush, finishWrite, sendFirstMessage bool) error { finalizeObject := finishWrite && s.finalizeOnClose - if finishWrite { - // Always flush when finishing the Write, even if not finalizing. - req = bidiWriteObjectRequest(buf, offset, true, finalizeObject) - } else { - req = bidiWriteObjectRequest(buf, offset, flush, false) - } + flush = flush || finishWrite + req := bidiWriteObjectRequest(buf, offset, flush, finalizeObject) if finalizeObject { // appendable objects pass checksums on the finalize message only req.ObjectChecksums = s.objectChecksums @@ -939,90 +1324,84 @@ func (s *gRPCAppendBidiWriteBufferSender) sendOnConnectedStream(buf []byte, offs proto.Merge(req, s.firstMessage) } - if err = s.stream.Send(req); err != nil { - return nil, err - } + return stream.Send(req) +} - if finishWrite { - s.stream.CloseSend() - for resp := range s.recvs { - if resp.GetResource() != nil { - obj = resp.GetResource() - } - // When closing the stream, update the object resource to reflect - // the persisted size. We get a new object from the stream if - // the object was finalized, but not if it's unfinalized. - if s.objResource != nil && resp.GetPersistedSize() > 0 { - s.objResource.Size = resp.GetPersistedSize() - } - } - if s.recvErr != io.EOF { - return nil, s.recvErr - } - if obj.GetSize() > s.flushOffset { - s.flushOffset = obj.GetSize() - s.progress(s.flushOffset) - } - return +func checkCanceled(err error) error { + if status.Code(err) == codes.Canceled { + return context.Canceled } - if flush { - // We don't necessarily expect multiple responses for a single flush, but - // this allows the server to send multiple responses if it wants to. - flushOffset := s.flushOffset + return err +} - for flushOffset < offset+int64(len(buf)) { - resp, ok := <-s.recvs - if !ok { - return nil, s.recvErr - } - pSize := resp.GetPersistedSize() - rSize := resp.GetResource().GetSize() - if flushOffset < pSize { - flushOffset = pSize - } - if flushOffset < rSize { - flushOffset = rSize - } - // On the first flush, we expect to get an object resource back and - // should return it. - if resp.GetResource() != nil { - obj = resp.GetResource() - } - } - if s.flushOffset < flushOffset { - s.flushOffset = flushOffset - s.progress(s.flushOffset) - } +// gRPCChunkSize returns the chunk size to use based on the requested chunk +// size. +// +// The chunk size returned is always greater than 0 and a multiple of +// googleapi.MinUploadChunkSize +func gRPCChunkSize(requestSize int) int { + size := googleapi.MinUploadChunkSize + if requestSize > size { + size = requestSize + } + + if size%googleapi.MinUploadChunkSize != 0 { + size += googleapi.MinUploadChunkSize - (size % googleapi.MinUploadChunkSize) } - return + return size } -func (s *gRPCAppendBidiWriteBufferSender) sendBuffer(ctx context.Context, buf []byte, offset int64, flush, finishWrite bool) (obj *storagepb.Object, err error) { - for { - sendFirstMessage := false - if s.stream == nil { - sendFirstMessage = true - if err = s.connect(ctx); err != nil { - return - } - } +type gRPCWriteRequestParams struct { + appendable bool + bucket string + routingToken *string +} - obj, err = s.sendOnConnectedStream(buf, offset, flush, finishWrite, sendFirstMessage) - if obj != nil { - s.objResource = obj - } - if err == nil { - return - } +func (p gRPCWriteRequestParams) apply(ctx context.Context) context.Context { + hds := make([]string, 0, 3) + if p.appendable { + hds = append(hds, "appendable=true") + } + if p.bucket != "" { + hds = append(hds, fmt.Sprintf("bucket=projects/_/buckets/%s", url.QueryEscape(p.bucket))) + } + if p.routingToken != nil { + hds = append(hds, fmt.Sprintf("routing_token=%s", *p.routingToken)) + } + return gax.InsertMetadataIntoOutgoingContext(ctx, "x-goog-request-params", strings.Join(hds, "&")) +} - // await recv stream termination - for range s.recvs { +func withBidiWriteObjectRedirectionErrorRetries(s *settings) (newr *retryConfig) { + oldr := s.retry + newr = oldr.clone() + if newr == nil { + newr = &retryConfig{} + } + if (oldr.policy == RetryIdempotent && !s.idempotent) || oldr.policy == RetryNever { + // We still retry redirection errors even when settings indicate not to + // retry. + // + // The protocol requires us to respect redirection errors, so RetryNever has + // to ignore them. + // + // Idempotency is always protected by redirection errors: they either + // contain a handle which can be used as idempotency information, or they do + // not contain a handle and are "affirmative failures" which indicate that + // no server-side action occurred. + newr.policy = RetryAlways + newr.shouldRetry = func(err error) bool { + return errors.Is(err, bidiWriteObjectRedirectionError{}) } - if s.recvErr != io.EOF { - err = s.recvErr + return newr + } + // If retry settings allow retries normally, fall back to that behavior. + newr.shouldRetry = func(err error) bool { + if errors.Is(err, bidiWriteObjectRedirectionError{}) { + return true } - s.stream = nil - return + v := oldr.runShouldRetry(err) + return v } + return newr } diff --git a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/internal/experimental.go b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/internal/experimental.go index 4482eab8a..b8c5731fe 100644 --- a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/internal/experimental.go +++ b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/internal/experimental.go @@ -26,6 +26,10 @@ var ( // Set an alternate client-side metric Exporter to emit metrics through. WithMetricExporter any // func (*metric.Exporter) option.ClientOption + // WithMeterProvider is a function which is implemented by storage package. + // Set an alternate client-side meter provider to emit metrics through. + WithMeterProvider any // func (*metric.MeterProvider) option.ClientOption + // WithReadStallTimeout is a function which is implemented by storage package. // It takes ReadStallTimeoutConfig as inputs and returns a option.ClientOption. WithReadStallTimeout any // func (*ReadStallTimeoutConfig) option.ClientOption diff --git a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/internal/version.go b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/internal/version.go index c69bbfcc8..07cfc2811 100644 --- a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/internal/version.go +++ b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/internal/version.go @@ -15,4 +15,4 @@ package internal // Version is the current tagged release of the library. -const Version = "1.56.2" +const Version = "1.57.0" diff --git a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/option.go b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/option.go index 6548cc18e..6afc66938 100644 --- a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/option.go +++ b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/option.go @@ -39,6 +39,7 @@ func init() { // initialize experimental options storageinternal.WithMetricExporter = withMetricExporter storageinternal.WithMetricInterval = withMetricInterval + storageinternal.WithMeterProvider = withMeterProvider storageinternal.WithReadStallTimeout = withReadStallTimeout storageinternal.WithGRPCBidiReads = withGRPCBidiReads storageinternal.WithZonalBucketAPIs = withZonalBucketAPIs @@ -81,6 +82,7 @@ type storageConfig struct { disableClientMetrics bool metricExporter *metric.Exporter metricInterval time.Duration + meterProvider *metric.MeterProvider manualReader *metric.ManualReader readStallTimeoutConfig *experimental.ReadStallTimeoutConfig grpcBidiReads bool @@ -203,6 +205,20 @@ type withTestMetricReaderConfig struct { metricReader *metric.ManualReader } +type withMeterProviderConfig struct { + internaloption.EmbeddableAdapter + // meter provider override + meterProvider *metric.MeterProvider +} + +func withMeterProvider(provider *metric.MeterProvider) option.ClientOption { + return &withMeterProviderConfig{meterProvider: provider} +} + +func (w *withMeterProviderConfig) ApplyStorageOpt(c *storageConfig) { + c.meterProvider = w.meterProvider +} + func withTestMetricReader(ex *metric.ManualReader) option.ClientOption { return &withTestMetricReaderConfig{metricReader: ex} } diff --git a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/post_policy_v4.go b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/post_policy_v4.go index 6bc73fb7a..4b2feea95 100644 --- a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/post_policy_v4.go +++ b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/post_policy_v4.go @@ -252,9 +252,6 @@ func GenerateSignedPostPolicyV4(bucket, object string, opts *PostPolicyV4Options if bucket == "" { return nil, errors.New("storage: bucket must be non-empty") } - if object == "" { - return nil, errors.New("storage: object must be non-empty") - } now := utcNow() if err := validatePostPolicyV4Options(opts, now); err != nil { return nil, err diff --git a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/storage.go b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/storage.go index 41cf96072..2f4e093d8 100644 --- a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/storage.go +++ b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/storage.go @@ -1288,12 +1288,14 @@ func (o *ObjectHandle) NewWriterFromAppendableObject(ctx context.Context, opts * if o.gen < 0 { return nil, 0, errors.New("storage: ObjectHandle.Generation must be set to use NewWriterFromAppendableObject") } + toc := make(chan int64) w := &Writer{ - ctx: ctx, - o: o, - donec: make(chan struct{}), - ObjectAttrs: ObjectAttrs{Name: o.object}, - Append: true, + ctx: ctx, + o: o, + donec: make(chan struct{}), + ObjectAttrs: ObjectAttrs{Name: o.object}, + Append: true, + setTakeoverOffset: func(to int64) { toc <- to }, } opts.apply(w) if w.ChunkSize == 0 { @@ -1303,7 +1305,16 @@ func (o *ObjectHandle) NewWriterFromAppendableObject(ctx context.Context, opts * if err != nil { return nil, 0, err } - return w, w.takeoverOffset, nil + // Block until we discover the takeover offset, or the stream fails + select { + case to, ok := <-toc: + if !ok { + return nil, 0, errors.New("storage: unexpectedly did not discover takeover offset") + } + return w, to, nil + case <-w.donec: + return nil, 0, w.err + } } // AppendableWriterOpts provides options to set on a Writer initialized diff --git a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/writer.go b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/writer.go index bc0893eea..e7c8ed51b 100644 --- a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/writer.go +++ b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/writer.go @@ -159,9 +159,9 @@ type Writer struct { donec chan struct{} // closed after err and obj are set. obj *ObjectAttrs - mu sync.Mutex - err error - takeoverOffset int64 // offset from which the writer started appending to the object. + mu sync.Mutex + err error + setTakeoverOffset func(int64) } // Write appends to w. It implements the io.Writer interface. @@ -297,7 +297,7 @@ func (w *Writer) openWriter() (err error) { w.obj.Size = n } }, - setTakeoverOffset: func(n int64) { w.takeoverOffset = n }, + setTakeoverOffset: w.setTakeoverOffset, forceEmptyContentType: w.ForceEmptyContentType, } if err := w.ctx.Err(); err != nil { diff --git a/ci/resources/stemcell-version-bump/vendor/modules.txt b/ci/resources/stemcell-version-bump/vendor/modules.txt index 19b862a83..471e3539e 100644 --- a/ci/resources/stemcell-version-bump/vendor/modules.txt +++ b/ci/resources/stemcell-version-bump/vendor/modules.txt @@ -39,8 +39,8 @@ cloud.google.com/go/iam/apiv1/iampb cloud.google.com/go/monitoring/apiv3/v2 cloud.google.com/go/monitoring/apiv3/v2/monitoringpb cloud.google.com/go/monitoring/internal -# cloud.google.com/go/storage v1.56.2 -## explicit; go 1.23.0 +# cloud.google.com/go/storage v1.57.0 +## explicit; go 1.24.0 cloud.google.com/go/storage cloud.google.com/go/storage/experimental cloud.google.com/go/storage/internal