From 2c6c1be3a2f7ec3d6bdbf9d660550f7d817355a5 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 13 Jan 2026 01:51:07 +0000 Subject: [PATCH] chore(deps): Bump cloud.google.com/go/storage Bumps [cloud.google.com/go/storage](https://github.com/googleapis/google-cloud-go) from 1.58.0 to 1.59.0. - [Release notes](https://github.com/googleapis/google-cloud-go/releases) - [Changelog](https://github.com/googleapis/google-cloud-go/blob/main/CHANGES.md) - [Commits](https://github.com/googleapis/google-cloud-go/compare/spanner/v1.58.0...spanner/v1.59.0) --- updated-dependencies: - dependency-name: cloud.google.com/go/storage dependency-version: 1.59.0 dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] --- ci/resources/stemcell-version-bump/go.mod | 2 +- ci/resources/stemcell-version-bump/go.sum | 4 +- .../cloud.google.com/go/storage/CHANGES.md | 10 + .../vendor/cloud.google.com/go/storage/doc.go | 4 + .../go/storage/emulator_test.sh | 13 +- .../go/storage/grpc_client.go | 545 ----------- .../go/storage/grpc_reader_multi_range.go | 901 +++++++++++++++++- .../go/storage/grpc_writer.go | 105 +- .../go/storage/internal/version.go | 4 +- .../vendor/cloud.google.com/go/storage/pcu.go | 364 +++++++ .../cloud.google.com/go/storage/reader.go | 62 +- .../cloud.google.com/go/storage/writer.go | 7 +- .../stemcell-version-bump/vendor/modules.txt | 2 +- 13 files changed, 1384 insertions(+), 639 deletions(-) create mode 100644 ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/pcu.go diff --git a/ci/resources/stemcell-version-bump/go.mod b/ci/resources/stemcell-version-bump/go.mod index 397da198f..f4f53c079 100644 --- a/ci/resources/stemcell-version-bump/go.mod +++ b/ci/resources/stemcell-version-bump/go.mod @@ -5,7 +5,7 @@ go 1.24.0 toolchain go1.24.1 require ( - cloud.google.com/go/storage v1.58.0 + cloud.google.com/go/storage v1.59.0 github.com/stretchr/testify v1.11.1 google.golang.org/api v0.257.0 ) diff --git a/ci/resources/stemcell-version-bump/go.sum b/ci/resources/stemcell-version-bump/go.sum index 2893ad8c5..4be2527c5 100644 --- a/ci/resources/stemcell-version-bump/go.sum +++ b/ci/resources/stemcell-version-bump/go.sum @@ -16,8 +16,8 @@ cloud.google.com/go/longrunning v0.7.0 h1:FV0+SYF1RIj59gyoWDRi45GiYUMM3K1qO51qob cloud.google.com/go/longrunning v0.7.0/go.mod h1:ySn2yXmjbK9Ba0zsQqunhDkYi0+9rlXIwnoAf+h+TPY= cloud.google.com/go/monitoring v1.24.2 h1:5OTsoJ1dXYIiMiuL+sYscLc9BumrL3CarVLL7dd7lHM= cloud.google.com/go/monitoring v1.24.2/go.mod h1:x7yzPWcgDRnPEv3sI+jJGBkwl5qINf+6qY4eq0I9B4U= -cloud.google.com/go/storage v1.58.0 h1:PflFXlmFJjG/nBeR9B7pKddLQWaFaRWx4uUi/LyNxxo= -cloud.google.com/go/storage v1.58.0/go.mod h1:cMWbtM+anpC74gn6qjLh+exqYcfmB9Hqe5z6adx+CLI= +cloud.google.com/go/storage v1.59.0 h1:9p3yDzEN9Vet4JnbN90FECIw6n4FCXcKBK1scxtQnw8= +cloud.google.com/go/storage v1.59.0/go.mod h1:cMWbtM+anpC74gn6qjLh+exqYcfmB9Hqe5z6adx+CLI= cloud.google.com/go/trace v1.11.6 h1:2O2zjPzqPYAHrn3OKl029qlqG6W8ZdYaOWRyr8NgMT4= cloud.google.com/go/trace v1.11.6/go.mod h1:GA855OeDEBiBMzcckLPE2kDunIpC72N+Pq8WFieFjnI= github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.30.0 h1:sBEjpZlNHzK1voKq9695PJSX2o5NEXl7/OL3coiIY0c= diff --git a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/CHANGES.md b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/CHANGES.md index 249035935..e0ce3d07d 100644 --- a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/CHANGES.md +++ b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/CHANGES.md @@ -1,6 +1,16 @@ # Changes +## [1.59.0](https://github.com/googleapis/google-cloud-go/releases/tag/storage%2Fv1.59.0) (2026-01-09) + +### Features + +* add default checksums for appendable writer (excludes appendable takeover writer) (#13379) ([647baf3](https://github.com/googleapis/google-cloud-go/commit/647baf3249b01e7d5eb5902197bb828706c4c08f)) + +### Bug Fixes + +* refactor MultiRangeDownloader to resolve deadlock and race conditions (#13524) ([1cfd100](https://github.com/googleapis/google-cloud-go/commit/1cfd10089f206bca0bdcef1e873574b552ae6abb) + ## [1.58.0](https://github.com/googleapis/google-cloud-go/releases/tag/storage%2Fv1.58.0) (2025-12-03) ### Features diff --git a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/doc.go b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/doc.go index 3726432c8..d0434db29 100644 --- a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/doc.go +++ b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/doc.go @@ -407,6 +407,10 @@ roles which must be enabled in order to do the export successfully. To disable this export, you can use the [WithDisabledClientMetrics] client option. +The gRPC client automatically computes and sends CRC32C checksums for uploads using [Writer], +which provides an additional layer of data integrity validation when compared to the HTTP client. +This behavior can optionally be disabled by using [Writer.DisableAutoChecksum]. + # Storage Control API Certain control plane and long-running operations for Cloud Storage (including Folder diff --git a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/emulator_test.sh b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/emulator_test.sh index 4d8da2eeb..6c49ed133 100644 --- a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/emulator_test.sh +++ b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/emulator_test.sh @@ -31,6 +31,7 @@ if [ "$minor_ver" -lt "$min_minor_ver" ]; then exit 0 fi +export DOCKER_API_VERSION=1.39 export STORAGE_EMULATOR_HOST="http://localhost:9000" export STORAGE_EMULATOR_HOST_GRPC="localhost:8888" @@ -66,6 +67,7 @@ function cleanup() { docker stop $CONTAINER_NAME unset STORAGE_EMULATOR_HOST; unset STORAGE_EMULATOR_HOST_GRPC; + unset DOCKER_API_VERSION } trap cleanup EXIT @@ -89,5 +91,12 @@ then fi # Run tests - -go test -v -timeout 17m ./ ./dataflux -run="^Test(RetryConformance|.*Emulated)$" -short -race 2>&1 | tee -a sponge_log.log +gotestsum --packages="./ ./dataflux" \ + --junitfile sponge_log_emulator.xml \ + --format standard-verbose \ + -- \ + -timeout 17m \ + -run="^Test(RetryConformance|.*Emulated)$" \ + -short \ + -race \ + 2>&1 | tee -a sponge_log.log \ No newline at end of file diff --git a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/grpc_client.go b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/grpc_client.go index 7dc3d0a18..dfe387037 100644 --- a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/grpc_client.go +++ b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/grpc_client.go @@ -24,7 +24,6 @@ import ( "log" "os" "strconv" - "sync" "cloud.google.com/go/iam/apiv1/iampb" gapic "cloud.google.com/go/storage/internal/apiv2" @@ -1093,550 +1092,6 @@ func contextMetadataFromBidiReadObject(req *storagepb.BidiReadObjectRequest) []s return []string{"x-goog-request-params", fmt.Sprintf("bucket=%s", req.GetReadObjectSpec().GetBucket())} } -func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params *newMultiRangeDownloaderParams, opts ...storageOption) (mr *MultiRangeDownloader, err error) { - if !c.config.grpcBidiReads { - return nil, errors.New("storage: MultiRangeDownloader requires the experimental.WithGRPCBidiReads option") - } - - ctx, _ = startSpan(ctx, "grpcStorageClient.NewMultiRangeDownloader") - defer func() { endSpan(ctx, err) }() - s := callSettings(c.settings, opts...) - // Force the use of the custom codec to enable zero-copy reads. - s.gax = append(s.gax, gax.WithGRPCOptions( - grpc.ForceCodecV2(bytesCodecV2{}), - )) - - if s.userProject != "" { - ctx = setUserProjectMetadata(ctx, s.userProject) - } - - b := bucketResourceName(globalProjectAlias, params.bucket) - object := params.object - bidiObject := &storagepb.BidiReadObjectSpec{ - Bucket: b, - Object: object, - CommonObjectRequestParams: toProtoCommonObjectRequestParams(params.encryptionKey), - } - - // The default is a negative value, which means latest. - if params.gen >= 0 { - bidiObject.Generation = params.gen - } - - if params.handle != nil && len(*params.handle) != 0 { - bidiObject.ReadHandle = &storagepb.BidiReadHandle{ - Handle: *params.handle, - } - } - req := &storagepb.BidiReadObjectRequest{ - ReadObjectSpec: bidiObject, - } - - openStream := func(readHandle ReadHandle) (*bidiReadStreamResponse, context.CancelFunc, error) { - if err := applyCondsProto("grpcStorageClient.BidiReadObject", params.gen, params.conds, bidiObject); err != nil { - return nil, nil, err - } - if len(readHandle) != 0 { - req.GetReadObjectSpec().ReadHandle = &storagepb.BidiReadHandle{ - Handle: readHandle, - } - } - databufs := mem.BufferSlice{} - - var stream storagepb.Storage_BidiReadObjectClient - var decoder *readResponseDecoder - cc, cancel := context.WithCancel(ctx) - err = run(cc, func(ctx context.Context) error { - openAndSendReq := func() error { - mdCtx := gax.InsertMetadataIntoOutgoingContext(ctx, contextMetadataFromBidiReadObject(req)...) - - stream, err = c.raw.BidiReadObject(mdCtx, s.gax...) - if err != nil { - return err - } - // If stream opened succesfully, send first message on the stream. - // First message to stream should contain read_object_spec - err = stream.Send(req) - if err != nil { - return err - } - // Use RecvMsg to get the raw buffer slice instead of Recv(). - err = stream.RecvMsg(&databufs) - if err != nil { - return err - } - return nil - } - - err := openAndSendReq() - - // We might get a redirect error here for an out-of-region request. - // Add the routing token and read handle to the request and do one - // retry. - if st, ok := status.FromError(err); ok && st.Code() == codes.Aborted { - // BidiReadObjectRedirectedError error is only returned on initial open in case of a redirect. - // The routing token that should be used when reopening the read stream. Needs to be exported. - for _, detail := range st.Details() { - if bidiError, ok := detail.(*storagepb.BidiReadObjectRedirectedError); ok { - bidiObject.ReadHandle = bidiError.ReadHandle - bidiObject.RoutingToken = bidiError.RoutingToken - databufs = mem.BufferSlice{} - err = openAndSendReq() - break - } - } - } - if err != nil { - databufs.Free() - return err - } - - // Use the custom decoder to parse the raw buffer without copying object data. - decoder = &readResponseDecoder{ - databufs: databufs, - } - err = decoder.readFullObjectResponse() - return err - }, s.retry, s.idempotent) - if err != nil { - // Close the stream context we just created to ensure we don't leak - // resources. - cancel() - return nil, nil, err - } - return &bidiReadStreamResponse{stream: stream, decoder: decoder}, cancel, nil - } - - // For the first time open stream without adding any range. - resp, cancel, err := openStream(nil) - if err != nil { - return nil, err - } - - // The first message was Recv'd on stream open, use it to populate the - // object metadata. - msg := resp.decoder.msg - obj := msg.GetMetadata() - - mrd := &gRPCBidiReader{ - stream: resp.stream, - cancel: cancel, - settings: s, - readHandle: msg.GetReadHandle().GetHandle(), - readIDGenerator: &readIDGenerator{}, - reopen: openStream, - readSpec: bidiObject, - rangesToRead: make(chan []mrdRange, 100), - ctx: ctx, - closeReceiver: make(chan bool, 10), - closeSender: make(chan bool, 10), - senderRetry: make(chan bool), // create unbuffered channel for closing the streamManager goroutine. - receiverRetry: make(chan bool), // create unbuffered channel for closing the streamReceiver goroutine. - activeRanges: make(map[int64]mrdRange), - done: false, - numActiveRanges: 0, - streamRecreation: false, - } - - // sender receives ranges from user adds and requests these ranges from GCS. - sender := func() { - var currentSpec []mrdRange - for { - select { - case <-mrd.ctx.Done(): - mrd.mu.Lock() - mrd.done = true - mrd.mu.Unlock() - return - case <-mrd.senderRetry: - return - case <-mrd.closeSender: - mrd.mu.Lock() - if len(mrd.activeRanges) != 0 { - for key := range mrd.activeRanges { - mrd.activeRanges[key].callback(mrd.activeRanges[key].offset, mrd.activeRanges[key].totalBytesWritten, fmt.Errorf("stream closed early")) - delete(mrd.activeRanges, key) - } - } - mrd.numActiveRanges = 0 - mrd.mu.Unlock() - return - case currentSpec = <-mrd.rangesToRead: - var readRanges []*storagepb.ReadRange - var err error - mrd.mu.Lock() - for _, v := range currentSpec { - mrd.activeRanges[v.readID] = v - readRanges = append(readRanges, &storagepb.ReadRange{ReadOffset: v.offset, ReadLength: v.limit, ReadId: v.readID}) - } - mrd.mu.Unlock() - // We can just send 100 request to gcs in one request. - // In case of Add we will send only one range request to gcs but in case of retry we can have more than 100 ranges. - // Hence be will divide the request in chunk of 100. - // For example with 457 ranges on stream we will have 5 request to gcs [0:99], [100:199], [200:299], [300:399], [400:456] - requestCount := len(readRanges) / 100 - if len(readRanges)%100 != 0 { - requestCount++ - } - for i := 0; i < requestCount; i++ { - start := i * 100 - end := (i + 1) * 100 - if end > len(readRanges) { - end = len(readRanges) - } - curReq := readRanges[start:end] - err = mrd.stream.Send(&storagepb.BidiReadObjectRequest{ - ReadRanges: curReq, - }) - if err != nil { - // cancel stream and reopen the stream again. - // Incase again an error is thrown close the streamManager goroutine. - mrd.retrier(err, "manager") - break - } - } - - } - } - } - - // receives ranges responses on the stream and executes the callback. - receiver := func() { - var err error - for { - select { - case <-mrd.ctx.Done(): - mrd.done = true - return - case <-mrd.receiverRetry: - return - case <-mrd.closeReceiver: - return - default: - // This function reads the data sent for a particular range request and has a callback - // to indicate that output buffer is filled. - databufs := mem.BufferSlice{} - err = mrd.stream.RecvMsg(&databufs) - if err == io.EOF { - err = nil - } else { - // Cancel stream and reopen the stream again. - // In case again an error is thrown, close the streamManager goroutine. - // TODO: special handling for not found error. - mrd.retrier(err, "receiver") - } - - if err == nil { - // Use the custom decoder to parse the message. - decoder := &readResponseDecoder{databufs: databufs} - if err := decoder.readFullObjectResponse(); err != nil { - mrd.retrier(err, "receiver") - continue // Move to next iteration after retry - } - msg := decoder.msg - - if msg.GetReadHandle().GetHandle() != nil { - mrd.readHandle = msg.GetReadHandle().GetHandle() - } - - mrd.mu.Lock() - if len(mrd.activeRanges) == 0 && mrd.numActiveRanges == 0 { - mrd.mu.Unlock() - mrd.closeReceiver <- true - mrd.closeSender <- true - return - } - mrd.mu.Unlock() - for _, val := range msg.GetObjectDataRanges() { - id := val.GetReadRange().GetReadId() - func() { - mrd.mu.Lock() - defer mrd.mu.Unlock() - currRange, ok := mrd.activeRanges[id] - if !ok { - // it's ok to ignore responses for read_id not in map as user would have been notified by callback. - return - } - - // The decoder holds the object content. writeToAndUpdateCRC writes - // it to the user's buffer without an intermediate copy. - written, _, err := decoder.writeToAndUpdateCRC(currRange.writer, id, func(b []byte) { - // crc update logic can be added here if needed - }) - - if err != nil { - currRange.callback(currRange.offset, currRange.totalBytesWritten, err) - mrd.numActiveRanges-- - delete(mrd.activeRanges, id) - } else { - currRange = mrdRange{ - readID: currRange.readID, - writer: currRange.writer, - offset: currRange.offset, - limit: currRange.limit, - currentBytesWritten: currRange.currentBytesWritten + written, - totalBytesWritten: currRange.totalBytesWritten + written, - callback: currRange.callback, - } - mrd.activeRanges[id] = currRange - } - if val.GetRangeEnd() { - currRange.callback(currRange.offset, currRange.totalBytesWritten, nil) - mrd.numActiveRanges-- - delete(mrd.activeRanges, id) - } - }() - } - // Free the buffers once the message has been processed. - decoder.databufs.Free() - } - } - } - } - - mrd.retrier = func(err error, thread string) { - mrd.mu.Lock() - if !mrd.streamRecreation { - mrd.streamRecreation = true - } else { - mrd.mu.Unlock() - return - } - mrd.mu.Unlock() - // close both the go routines to make the stream recreation syncronous. - if thread == "receiver" { - mrd.senderRetry <- true - } else { - mrd.receiverRetry <- true - } - err = mrd.retryStream(err) - if err != nil { - mrd.mu.Lock() - for key := range mrd.activeRanges { - mrd.activeRanges[key].callback(mrd.activeRanges[key].offset, mrd.activeRanges[key].totalBytesWritten, err) - delete(mrd.activeRanges, key) - } - // In case we hit an permanent error, delete entries from map and remove active tasks. - mrd.numActiveRanges = 0 - mrd.mu.Unlock() - mrd.close() - } else { - // If stream recreation happened successfully lets again start - // both the goroutine making the whole flow asynchronous again. - if thread == "receiver" { - go sender() - } else { - go receiver() - } - } - mrd.mu.Lock() - mrd.streamRecreation = false - mrd.mu.Unlock() - } - - go sender() - go receiver() - - return &MultiRangeDownloader{ - Attrs: ReaderObjectAttrs{ - Size: obj.GetSize(), // this is the size of the entire object, even if only a range was requested. - ContentType: obj.GetContentType(), - ContentEncoding: obj.GetContentEncoding(), - CacheControl: obj.GetCacheControl(), - LastModified: obj.GetUpdateTime().AsTime(), - Metageneration: obj.GetMetageneration(), - Generation: obj.GetGeneration(), - }, - reader: mrd, - }, nil -} - -type gRPCBidiReader struct { - ctx context.Context - stream storagepb.Storage_BidiReadObjectClient - cancel context.CancelFunc - settings *settings - readHandle ReadHandle - readIDGenerator *readIDGenerator - reopen func(ReadHandle) (*bidiReadStreamResponse, context.CancelFunc, error) - readSpec *storagepb.BidiReadObjectSpec - closeReceiver chan bool - closeSender chan bool - senderRetry chan bool - receiverRetry chan bool - // rangesToRead are ranges that have not yet been sent or have been sent but - // must be retried. - rangesToRead chan []mrdRange - // activeRanges are ranges that are currently being sent or are waiting for - // a response from GCS. - activeRanges map[int64]mrdRange // always use the mutex when accessing the map - numActiveRanges int64 // always use the mutex when accessing this variable - done bool // always use the mutex when accessing this variable, indicates whether stream is closed or not. - mu sync.Mutex // protects all vars in gRPCBidiReader from concurrent access - retrier func(error, string) - streamRecreation bool // This helps us identify if stream recreation is in progress or not. If stream recreation gets called from two goroutine then this will stop second one. -} - -func (mrd *gRPCBidiReader) activeRange() []mrdRange { - mrd.mu.Lock() - defer mrd.mu.Unlock() - var activeRange []mrdRange - for k, v := range mrd.activeRanges { - activeRange = append(activeRange, mrdRange{ - readID: k, - writer: v.writer, - offset: (v.offset + v.currentBytesWritten), - limit: v.limit - v.currentBytesWritten, - callback: v.callback, - currentBytesWritten: 0, - totalBytesWritten: v.totalBytesWritten, - }) - mrd.activeRanges[k] = activeRange[len(activeRange)-1] - } - return activeRange -} - -// retryStream cancel's stream and reopen the stream again. -func (mrd *gRPCBidiReader) retryStream(err error) error { - if mrd.settings.retry.runShouldRetry(err) { - // This will "close" the existing stream and immediately attempt to - // reopen the stream, but will backoff if further attempts are necessary. - // When Reopening the stream only failed readID will be added to stream. - return mrd.reopenStream(mrd.activeRange()) - } - return err -} - -// reopenStream "closes" the existing stream and attempts to reopen a stream and -// sets the Reader's stream and cancelStream properties in the process. -func (mrd *gRPCBidiReader) reopenStream(failSpec []mrdRange) error { - // Close existing stream and initialize new stream with updated offset. - if mrd.cancel != nil { - mrd.cancel() - } - - res, cancel, err := mrd.reopen(mrd.readHandle) - if err != nil { - return err - } - mrd.stream = res.stream - mrd.cancel = cancel - msg := res.decoder.msg - if msg.GetReadHandle().GetHandle() != nil { - mrd.readHandle = msg.GetReadHandle().GetHandle() - } - - // Process any data ranges that came back in the initial response. - // This prevents data loss from the first message on the new stream. - for _, val := range msg.GetObjectDataRanges() { - id := val.GetReadRange().GetReadId() - mrd.mu.Lock() - activeRange, ok := mrd.activeRanges[id] - if !ok { - mrd.mu.Unlock() - continue - } - - // Use the decoder's zero-copy write method. - written, _, writeErr := res.decoder.writeToAndUpdateCRC(activeRange.writer, id, nil) - if writeErr != nil { - activeRange.callback(activeRange.offset, activeRange.totalBytesWritten, writeErr) - mrd.numActiveRanges-- - delete(mrd.activeRanges, id) - } else { - activeRange.currentBytesWritten += written - activeRange.totalBytesWritten += written - mrd.activeRanges[id] = activeRange - } - - if val.GetRangeEnd() { - activeRange.callback(activeRange.offset, activeRange.totalBytesWritten, nil) - mrd.numActiveRanges-- - delete(mrd.activeRanges, id) - } - mrd.mu.Unlock() - } - // Once all data in the initial response has been read out, free buffers. - res.decoder.databufs.Free() - if failSpec != nil { - mrd.rangesToRead <- failSpec - } - return nil -} - -// add will add current range to stream. The size of the range is not validated -// by add; if the client requests more bytes than are available in the object -// the server will return an error. -func (mrd *gRPCBidiReader) add(output io.Writer, offset, limit int64, callback func(int64, int64, error)) { - if limit < 0 { - callback(offset, 0, errors.New("storage: cannot add range because the limit cannot be negative")) - return - } - - id := mrd.readIDGenerator.Next() - if !mrd.done { - spec := mrdRange{readID: id, writer: output, offset: offset, limit: limit, currentBytesWritten: 0, totalBytesWritten: 0, callback: callback} - mrd.mu.Lock() - mrd.numActiveRanges++ - mrd.mu.Unlock() - mrd.rangesToRead <- []mrdRange{spec} - } else { - callback(offset, 0, errors.New("storage: cannot add range because the stream is closed")) - } -} - -func (mrd *gRPCBidiReader) wait() { - mrd.mu.Lock() - // we should wait until there is active task or an entry in the map. - // there can be a scenario we have nothing in map for a moment or too but still have active task. - // hence in case we have permanent errors we reduce active task to 0 so that this does not block wait. - keepWaiting := len(mrd.activeRanges) != 0 || mrd.numActiveRanges != 0 - mrd.mu.Unlock() - - for keepWaiting { - mrd.mu.Lock() - keepWaiting = len(mrd.activeRanges) != 0 || mrd.numActiveRanges != 0 - mrd.mu.Unlock() - } -} - -// Close will notify stream manager goroutine that the reader has been closed, if it's still running. -func (mrd *gRPCBidiReader) close() error { - if mrd.cancel != nil { - mrd.cancel() - } - mrd.mu.Lock() - mrd.done = true - mrd.numActiveRanges = 0 - mrd.mu.Unlock() - mrd.closeReceiver <- true - mrd.closeSender <- true - return nil -} - -func (mrd *gRPCBidiReader) getHandle() []byte { - return mrd.readHandle -} - -func (mrd *gRPCBidiReader) error() error { - mrd.mu.Lock() - defer mrd.mu.Unlock() - if mrd.done { - return errors.New("storage: stream is permanently closed") - } - return nil -} - -type mrdRange struct { - readID int64 - writer io.Writer - offset int64 - limit int64 - currentBytesWritten int64 - totalBytesWritten int64 - callback func(int64, int64, error) -} - func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRangeReaderParams, opts ...storageOption) (r *Reader, err error) { // If bidi reads was not selected, use the legacy read object API. if !c.config.grpcBidiReads { diff --git a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/grpc_reader_multi_range.go b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/grpc_reader_multi_range.go index 14b78e1a1..b153989eb 100644 --- a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/grpc_reader_multi_range.go +++ b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/grpc_reader_multi_range.go @@ -14,27 +14,896 @@ package storage -import "sync" +import ( + "context" + "errors" + "fmt" + "io" + "sync" -// readIDGenerator generates unique read IDs for multi-range reads. -// Call readIDGenerator.Next to get the next ID. Safe to be called concurrently. -type readIDGenerator struct { - initOnce sync.Once - nextID chan int64 // do not use this field directly + "cloud.google.com/go/storage/internal/apiv2/storagepb" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/mem" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" + + gax "github.com/googleapis/gax-go/v2" +) + +const ( + mrdCommandChannelSize = 1 + mrdResponseChannelSize = 100 +) + +// --- internalMultiRangeDownloader Interface --- +// This provides an internal wrapper for the gRPC methods to avoid polluting +// reader.go with gRPC implementation details. The only implementation +// currently is for the gRPC transport with bidi APIs enabled. Creating +// a MultiRangeDownloader with any other client type will fail. +type internalMultiRangeDownloader interface { + add(output io.Writer, offset, length int64, callback func(int64, int64, error)) + close(err error) error + wait() + getHandle() []byte + getPermanentError() error + getSpanCtx() context.Context +} + +// --- grpcStorageClient method --- +// Top level entry point into the MultiRangeDownloader via the storageClient interface. +func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params *newMultiRangeDownloaderParams, opts ...storageOption) (*MultiRangeDownloader, error) { + if !c.config.grpcBidiReads { + return nil, errors.New("storage: MultiRangeDownloader requires the experimental.WithGRPCBidiReads option") + } + s := callSettings(c.settings, opts...) + if s.userProject != "" { + ctx = setUserProjectMetadata(ctx, s.userProject) + } + if s.retry == nil { + s.retry = defaultRetry + } + + b := bucketResourceName(globalProjectAlias, params.bucket) + readSpec := &storagepb.BidiReadObjectSpec{ + Bucket: b, + Object: params.object, + CommonObjectRequestParams: toProtoCommonObjectRequestParams(params.encryptionKey), + } + if params.gen >= 0 { + readSpec.Generation = params.gen + } + if params.handle != nil && len(*params.handle) > 0 { + readSpec.ReadHandle = &storagepb.BidiReadHandle{ + Handle: *params.handle, + } + } + + mCtx, cancel := context.WithCancel(ctx) + + // Create the manager + manager := &multiRangeDownloaderManager{ + ctx: mCtx, + cancel: cancel, + client: c, + settings: s, + params: params, + cmds: make(chan mrdCommand, mrdCommandChannelSize), + sessionResps: make(chan mrdSessionResult, mrdResponseChannelSize), + pendingRanges: make(map[int64]*rangeRequest), + readIDCounter: 1, + readSpec: readSpec, + attrsReady: make(chan struct{}), + spanCtx: ctx, + } + + mrd := &MultiRangeDownloader{ + impl: manager, + } + + manager.wg.Add(1) + go func() { + defer manager.wg.Done() + manager.eventLoop() + }() + + // Wait for attributes to be ready + select { + case <-manager.attrsReady: + if manager.permanentErr != nil { + cancel() + manager.wg.Wait() + return nil, manager.permanentErr + } + mrd.Attrs = manager.attrs + return mrd, nil + case <-ctx.Done(): + cancel() + manager.wg.Wait() + return nil, ctx.Err() + } +} + +// --- mrdCommand Interface and Implementations --- +// Used to pass commands from the user-facing code to the MRD manager. +// mrdCommand handlers are applied sequentially in the event loop. Therefore, it's okay +// for them to read/modify the manager state without concern for thread safety. +type mrdCommand interface { + apply(ctx context.Context, m *multiRangeDownloaderManager) +} +type mrdAddCmd struct { + output io.Writer + offset int64 + length int64 + callback func(int64, int64, error) +} + +func (c *mrdAddCmd) apply(ctx context.Context, m *multiRangeDownloaderManager) { + m.handleAddCmd(ctx, c) +} + +type mrdCloseCmd struct { + err error +} + +func (c *mrdCloseCmd) apply(ctx context.Context, m *multiRangeDownloaderManager) { + m.handleCloseCmd(ctx, c) +} + +type mrdWaitCmd struct { + doneC chan struct{} +} + +func (c *mrdWaitCmd) apply(ctx context.Context, m *multiRangeDownloaderManager) { + m.handleWaitCmd(ctx, c) +} + +type mrdGetHandleCmd struct { + respC chan []byte +} + +func (c *mrdGetHandleCmd) apply(ctx context.Context, m *multiRangeDownloaderManager) { + select { + case <-m.attrsReady: + select { + case c.respC <- m.lastReadHandle: + case <-m.ctx.Done(): + close(c.respC) + } + case <-m.ctx.Done(): + close(c.respC) + } +} + +type mrdErrorCmd struct { + respC chan error +} + +func (c *mrdErrorCmd) apply(ctx context.Context, m *multiRangeDownloaderManager) { + select { + case c.respC <- m.permanentErr: + case <-ctx.Done(): + close(c.respC) + } +} + +// --- mrdSessionResult --- +// This is used to pass the zero-copy decoded response from the recv stream +// back up to the multiRangeDownloadManager for processing, or to pass +// an error if the session failed. +type mrdSessionResult struct { + decoder *readResponseDecoder + err error + redirect *storagepb.BidiReadObjectRedirectedError +} + +var errClosed = errors.New("downloader closed") + +// --- multiRangeDownloaderManager --- +// Manages main event loop for MRD commands and processing responses. +// Spawns bidiStreamSession to deal with actual stream management, retries, etc. +type multiRangeDownloaderManager struct { + ctx context.Context + cancel context.CancelFunc + client *grpcStorageClient + settings *settings + params *newMultiRangeDownloaderParams + wg sync.WaitGroup // syncs completion of event loop. + cmds chan mrdCommand + sessionResps chan mrdSessionResult + + // State + currentSession *bidiReadStreamSession + readIDCounter int64 + pendingRanges map[int64]*rangeRequest + permanentErr error + waiters []chan struct{} + readSpec *storagepb.BidiReadObjectSpec + lastReadHandle []byte + attrs ReaderObjectAttrs + attrsReady chan struct{} + attrsOnce sync.Once + spanCtx context.Context + callbackWg sync.WaitGroup +} + +type rangeRequest struct { + output io.Writer + offset int64 + length int64 + callback func(int64, int64, error) + + origOffset int64 + origLength int64 + + readID int64 + bytesWritten int64 + completed bool +} + +// Methods implementing internalMultiRangeDownloader +func (m *multiRangeDownloaderManager) add(output io.Writer, offset, length int64, callback func(int64, int64, error)) { + if err := m.ctx.Err(); err != nil { + if m.permanentErr != nil { + err = m.permanentErr + } + m.runCallback(offset, length, err, callback) + return + } + if length < 0 { + m.runCallback(offset, length, fmt.Errorf("storage: MultiRangeDownloader.Add limit cannot be negative"), callback) + return + } + + cmd := &mrdAddCmd{output: output, offset: offset, length: length, callback: callback} + select { + case m.cmds <- cmd: + case <-m.ctx.Done(): + err := m.ctx.Err() + if m.permanentErr != nil { + err = m.permanentErr + } + m.runCallback(offset, length, err, callback) + } +} + +func (m *multiRangeDownloaderManager) close(err error) error { + cmd := &mrdCloseCmd{err: err} + select { + case m.cmds <- cmd: + <-m.ctx.Done() + m.wg.Wait() + if m.permanentErr != nil && !errors.Is(m.permanentErr, errClosed) { + return m.permanentErr + } + return nil + case <-m.ctx.Done(): + m.wg.Wait() + return m.ctx.Err() + } +} + +func (m *multiRangeDownloaderManager) wait() { + doneC := make(chan struct{}) + cmd := &mrdWaitCmd{doneC: doneC} + select { + case m.cmds <- cmd: + select { + case <-doneC: + m.callbackWg.Wait() + return + case <-m.ctx.Done(): + m.callbackWg.Wait() + return + } + case <-m.ctx.Done(): + m.callbackWg.Wait() + return + } +} + +func (m *multiRangeDownloaderManager) getHandle() []byte { + select { + case <-m.attrsReady: + case <-m.ctx.Done(): + return nil + } + + respC := make(chan []byte, 1) + cmd := &mrdGetHandleCmd{respC: respC} + select { + case m.cmds <- cmd: + select { + case h, ok := <-respC: + if !ok { + return nil + } + return h + case <-m.ctx.Done(): + return nil + } + case <-m.ctx.Done(): + return nil + } +} + +func (m *multiRangeDownloaderManager) getPermanentError() error { + return m.permanentErr +} + +func (m *multiRangeDownloaderManager) getSpanCtx() context.Context { + return m.spanCtx +} + +func (m *multiRangeDownloaderManager) runCallback(origOffset, numBytes int64, err error, cb func(int64, int64, error)) { + m.callbackWg.Add(1) + go func() { + defer m.callbackWg.Done() + cb(origOffset, numBytes, err) + }() +} + +func (m *multiRangeDownloaderManager) eventLoop() { + defer func() { + if m.currentSession != nil { + m.currentSession.Shutdown() + } + finalErr := m.permanentErr + if finalErr == nil { + if ctxErr := m.ctx.Err(); ctxErr != nil { + finalErr = ctxErr + } + } + if finalErr == nil { + finalErr = errClosed + } + m.failAllPending(finalErr) + for _, waiter := range m.waiters { + close(waiter) + } + m.attrsOnce.Do(func() { close(m.attrsReady) }) + m.callbackWg.Wait() + }() + + // Blocking call to establish the first session and get attributes. + if err := m.establishInitialSession(); err != nil { + // permanentErr is set within establishInitialSession if necessary. + return // Exit eventLoop if we can't start. + } + + for { + select { + case <-m.ctx.Done(): + return + case cmd := <-m.cmds: + cmd.apply(m.ctx, m) + if _, ok := cmd.(*mrdCloseCmd); ok { + return + } + case result := <-m.sessionResps: + m.processSessionResult(result) + } + + if len(m.pendingRanges) == 0 { + for _, waiter := range m.waiters { + close(waiter) + } + m.waiters = nil + } + } +} + +func (m *multiRangeDownloaderManager) establishInitialSession() error { + retry := m.settings.retry + + var firstResult mrdSessionResult + + openStreamAndReceiveFirst := func(ctx context.Context, spec *storagepb.BidiReadObjectSpec) (*bidiReadStreamSession, mrdSessionResult) { + session, err := newBidiReadStreamSession(m.ctx, m.sessionResps, m.client, m.settings, m.params, spec) + if err != nil { + return nil, mrdSessionResult{err: err} + } + + select { + case result := <-m.sessionResps: + return session, result + case <-ctx.Done(): + session.Shutdown() + return nil, mrdSessionResult{err: ctx.Err()} + } + } + + err := run(m.ctx, func(ctx context.Context) error { + if m.currentSession != nil { + m.currentSession.Shutdown() + m.currentSession = nil + } + + currentSpec := proto.Clone(m.readSpec).(*storagepb.BidiReadObjectSpec) + session, result := openStreamAndReceiveFirst(ctx, currentSpec) + + if result.err != nil { + if result.redirect != nil { + m.readSpec.RoutingToken = result.redirect.RoutingToken + m.readSpec.ReadHandle = result.redirect.ReadHandle + if session != nil { + session.Shutdown() + } + + // We might get a redirect error here for an out-of-region request. + // Add the routing token and read handle to the request and do one + // retry. + currentSpec = proto.Clone(m.readSpec).(*storagepb.BidiReadObjectSpec) + session, result = openStreamAndReceiveFirst(ctx, currentSpec) + + if result.err != nil { + if session != nil { + session.Shutdown() + } + return result.err + } + } else { + // Not a redirect error, return to run() + if session != nil { + session.Shutdown() + } + return result.err + } + } + + // Success + m.currentSession = session + firstResult = result + return nil + }, retry, true) + + if err != nil { + m.setPermanentError(err) + return m.permanentErr + } + + // Process the successful first result + m.processSessionResult(firstResult) + if m.permanentErr != nil { + return m.permanentErr + } + return nil +} + +func (m *multiRangeDownloaderManager) handleAddCmd(ctx context.Context, cmd *mrdAddCmd) { + if m.permanentErr != nil { + m.runCallback(cmd.offset, cmd.length, m.permanentErr, cmd.callback) + return + } + + req := &rangeRequest{ + output: cmd.output, + offset: cmd.offset, + length: cmd.length, + origOffset: cmd.offset, + origLength: cmd.length, + callback: cmd.callback, + readID: m.readIDCounter, + } + m.readIDCounter++ + + // Attributes should be ready if we are processing Add commands + if req.offset < 0 { + err := m.convertToPositiveOffset(req) + if err != nil { + return + } + } + + if m.currentSession == nil { + // This should not happen if establishInitialSession was successful + m.failRange(req, errors.New("storage: session not available")) + return + } + + m.pendingRanges[req.readID] = req + + protoReq := &storagepb.BidiReadObjectRequest{ + ReadRanges: []*storagepb.ReadRange{{ + ReadOffset: req.offset, + ReadLength: req.length, + ReadId: req.readID, + }}, + } + m.currentSession.SendRequest(protoReq) +} + +func (m *multiRangeDownloaderManager) convertToPositiveOffset(req *rangeRequest) error { + if req.offset >= 0 { + return nil + } + objSize := m.attrs.Size + if objSize <= 0 { + err := errors.New("storage: cannot resolve negative offset without object size") + m.failRange(req, err) + return err + } + if req.length != 0 { + err := fmt.Errorf("storage: negative offset with non-zero length is not supported (offset: %d, length: %d)", req.origOffset, req.origLength) + m.failRange(req, err) + return err + } + start := objSize + req.offset + if start < 0 { + start = 0 + } + req.offset = start + req.length = objSize - start + return nil } -func (g *readIDGenerator) init() { - g.nextID = make(chan int64, 1) - g.nextID <- 1 +func (m *multiRangeDownloaderManager) handleCloseCmd(ctx context.Context, cmd *mrdCloseCmd) { + var err error + if cmd.err != nil { + err = cmd.err + } else { + err = errClosed + + } + m.setPermanentError(err) + m.cancel() +} + +func (m *multiRangeDownloaderManager) handleWaitCmd(ctx context.Context, cmd *mrdWaitCmd) { + if len(m.pendingRanges) == 0 { + close(cmd.doneC) + } else { + m.waiters = append(m.waiters, cmd.doneC) + } +} + +func (m *multiRangeDownloaderManager) processSessionResult(result mrdSessionResult) { + if result.err != nil { + m.handleStreamEnd(result) + return + } + + resp := result.decoder.msg + if handle := resp.GetReadHandle().GetHandle(); len(handle) > 0 { + m.lastReadHandle = handle + if m.params.handle != nil { + *m.params.handle = handle + } + } + + m.attrsOnce.Do(func() { + if meta := resp.GetMetadata(); meta != nil { + obj := newObjectFromProto(meta) + attrs := readerAttrsFromObject(obj) + m.attrs = attrs + close(m.attrsReady) + + for _, req := range m.pendingRanges { + if req.offset < 0 { + _ = m.convertToPositiveOffset(req) + } + } + } else { + m.handleStreamEnd(mrdSessionResult{err: errors.New("storage: first response from BidiReadObject stream missing metadata")}) + } + }) + + for _, dataRange := range resp.GetObjectDataRanges() { + readID := dataRange.GetReadRange().GetReadId() + req, exists := m.pendingRanges[readID] + if !exists || req.completed { + continue + } + written, _, err := result.decoder.writeToAndUpdateCRC(req.output, readID, nil) + req.bytesWritten += written + if err != nil { + m.failRange(req, err) + continue + } + + if dataRange.GetRangeEnd() { + req.completed = true + delete(m.pendingRanges, req.readID) + m.runCallback(req.origOffset, req.bytesWritten, nil, req.callback) + } + } + // Once all data in the initial response has been read out, free buffers. + result.decoder.databufs.Free() +} + +// ensureSession is now only for reconnecting *after* the initial session is up. +func (m *multiRangeDownloaderManager) ensureSession(ctx context.Context) error { + if m.currentSession != nil { + return nil + } + if m.permanentErr != nil { + return m.permanentErr + } + + // Using run for retries + return run(ctx, func(ctx context.Context) error { + if m.currentSession != nil { + return nil + } + if m.permanentErr != nil { + return m.permanentErr + } + + session, err := newBidiReadStreamSession(m.ctx, m.sessionResps, m.client, m.settings, m.params, proto.Clone(m.readSpec).(*storagepb.BidiReadObjectSpec)) + if err != nil { + redirectErr, isRedirect := isRedirectError(err) + if isRedirect { + m.readSpec.RoutingToken = redirectErr.RoutingToken + m.readSpec.ReadHandle = redirectErr.ReadHandle + return fmt.Errorf("%w: %v", errBidiReadRedirect, err) + } + return err + } + m.currentSession = session + + var rangesToResend []*storagepb.ReadRange + for _, req := range m.pendingRanges { + if !req.completed { + readLength := req.length + if req.length > 0 { + readLength -= req.bytesWritten + } + if readLength < 0 { + readLength = 0 + } + + if req.length == 0 || readLength > 0 { + rangesToResend = append(rangesToResend, &storagepb.ReadRange{ + ReadOffset: req.offset + req.bytesWritten, + ReadLength: readLength, + ReadId: req.readID, + }) + } + } + } + if len(rangesToResend) > 0 { + m.currentSession.SendRequest(&storagepb.BidiReadObjectRequest{ReadRanges: rangesToResend}) + } + return nil + }, m.settings.retry, true) } -// Next returns the Next read ID. It initializes the readIDGenerator if needed. -func (g *readIDGenerator) Next() int64 { - g.initOnce.Do(g.init) +var errBidiReadRedirect = errors.New("bidi read object redirected") - id := <-g.nextID - n := id + 1 - g.nextID <- n +func (m *multiRangeDownloaderManager) handleStreamEnd(result mrdSessionResult) { + if m.currentSession != nil { + m.currentSession.Shutdown() + m.currentSession = nil + } + err := result.err + var ensureErr error + + if result.redirect != nil { + m.readSpec.RoutingToken = result.redirect.RoutingToken + m.readSpec.ReadHandle = result.redirect.ReadHandle + ensureErr = m.ensureSession(m.ctx) + } else if m.settings.retry != nil && m.settings.retry.runShouldRetry(err) { + ensureErr = m.ensureSession(m.ctx) + } else { + if !errors.Is(err, context.Canceled) && !errors.Is(err, errClosed) { + m.setPermanentError(err) + } else if m.permanentErr == nil { + m.setPermanentError(errClosed) + } + m.failAllPending(m.permanentErr) + } + + // Handle error from ensureSession. + if ensureErr != nil { + m.setPermanentError(ensureErr) + m.failAllPending(m.permanentErr) + } +} + +func (m *multiRangeDownloaderManager) failRange(req *rangeRequest, err error) { + if req.completed { + return + } + req.completed = true + delete(m.pendingRanges, req.readID) + m.runCallback(req.origOffset, req.bytesWritten, err, req.callback) +} + +func (m *multiRangeDownloaderManager) failAllPending(err error) { + for _, req := range m.pendingRanges { + if !req.completed { + req.completed = true + m.runCallback(req.origOffset, req.bytesWritten, err, req.callback) + } + } + m.pendingRanges = make(map[int64]*rangeRequest) +} + +// Set permanent error to the provided error, if it hasn't been set already. +func (m *multiRangeDownloaderManager) setPermanentError(err error) { + if m.permanentErr == nil { + m.permanentErr = err + } +} + +// --- bidiReadStreamSession --- +// Controls lifespan of an individual bi-directional gRPC stream to the +// object in GCS. Spins up goroutines for the read and write sides of the +// stream. +type bidiReadStreamSession struct { + ctx context.Context + cancel context.CancelFunc + + stream storagepb.Storage_BidiReadObjectClient + client *grpcStorageClient + settings *settings + params *newMultiRangeDownloaderParams + readSpec *storagepb.BidiReadObjectSpec + + reqC chan *storagepb.BidiReadObjectRequest + respC chan<- mrdSessionResult + wg sync.WaitGroup + + errOnce sync.Once + streamErr error +} + +func newBidiReadStreamSession(ctx context.Context, respC chan<- mrdSessionResult, client *grpcStorageClient, settings *settings, params *newMultiRangeDownloaderParams, readSpec *storagepb.BidiReadObjectSpec) (*bidiReadStreamSession, error) { + sCtx, cancel := context.WithCancel(ctx) + + s := &bidiReadStreamSession{ + ctx: sCtx, + cancel: cancel, + client: client, + settings: settings, + params: params, + readSpec: readSpec, + reqC: make(chan *storagepb.BidiReadObjectRequest, 100), + respC: respC, + } + + initialReq := &storagepb.BidiReadObjectRequest{ + ReadObjectSpec: s.readSpec, + } + reqCtx := gax.InsertMetadataIntoOutgoingContext(s.ctx, contextMetadataFromBidiReadObject(initialReq)...) + // Force the use of the custom codec to enable zero-copy reads. + s.settings.gax = append(s.settings.gax, gax.WithGRPCOptions( + grpc.ForceCodecV2(bytesCodecV2{}), + )) + + var err error + s.stream, err = client.raw.BidiReadObject(reqCtx, s.settings.gax...) + if err != nil { + cancel() + return nil, err + } + + if err := s.stream.Send(initialReq); err != nil { + s.stream.CloseSend() + cancel() + return nil, err + } + + s.wg.Add(2) + go s.sendLoop() + go s.receiveLoop() + + go func() { + s.wg.Wait() + s.cancel() + }() + + return s, nil +} +func (s *bidiReadStreamSession) SendRequest(req *storagepb.BidiReadObjectRequest) { + select { + case s.reqC <- req: + case <-s.ctx.Done(): + } +} +func (s *bidiReadStreamSession) Shutdown() { + s.cancel() + s.wg.Wait() +} +func (s *bidiReadStreamSession) setError(err error) { + s.errOnce.Do(func() { + s.streamErr = err + }) +} +func (s *bidiReadStreamSession) sendLoop() { + defer s.wg.Done() + defer s.stream.CloseSend() + for { + select { + case req, ok := <-s.reqC: + if !ok { + return + } + if err := s.stream.Send(req); err != nil { + s.setError(err) + s.cancel() + return + } + case <-s.ctx.Done(): + return + } + } +} +func (s *bidiReadStreamSession) receiveLoop() { + defer s.wg.Done() + defer s.cancel() + for { + if err := s.ctx.Err(); err != nil { + return + } + + // Receive message without a copy. + databufs := mem.BufferSlice{} + err := s.stream.RecvMsg(&databufs) + var decoder *readResponseDecoder + if err == nil { + // Use the custom decoder to parse the raw buffer without copying object data. + decoder = &readResponseDecoder{ + databufs: databufs, + } + err = decoder.readFullObjectResponse() + } + + if err != nil { + databufs.Free() + redirectErr, isRedirect := isRedirectError(err) + result := mrdSessionResult{err: err} + if isRedirect { + result.redirect = redirectErr + err = fmt.Errorf("%w: %v", errBidiReadRedirect, err) + result.err = err + } + s.setError(err) + + select { + case s.respC <- result: + case <-s.ctx.Done(): + } + return + } + + select { + case s.respC <- mrdSessionResult{decoder: decoder}: + case <-s.ctx.Done(): + return + } + } +} +func isRedirectError(err error) (*storagepb.BidiReadObjectRedirectedError, bool) { + st, ok := status.FromError(err) + if !ok { + return nil, false + } + if st.Code() != codes.Aborted { + return nil, false + } + for _, d := range st.Details() { + if bidiError, ok := d.(*storagepb.BidiReadObjectRedirectedError); ok { + if bidiError.RoutingToken != nil { + return bidiError, true + } + } + } + return nil, false +} - return id +func readerAttrsFromObject(o *ObjectAttrs) ReaderObjectAttrs { + if o == nil { + return ReaderObjectAttrs{} + } + return ReaderObjectAttrs{ + Size: o.Size, + ContentType: o.ContentType, + ContentEncoding: o.ContentEncoding, + CacheControl: o.CacheControl, + LastModified: o.Updated, + Generation: o.Generation, + Metageneration: o.Metageneration, + CRC32C: o.CRC32C, + } } diff --git a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/grpc_writer.go b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/grpc_writer.go index 2af970ecb..b8773b170 100644 --- a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/grpc_writer.go +++ b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/grpc_writer.go @@ -819,11 +819,12 @@ func bidiWriteObjectRequest(r gRPCBidiWriteRequest, bufChecksum *uint32, objectC } type getObjectChecksumsParams struct { - fullObjectChecksum func() uint32 - finishWrite bool sendCRC32C bool disableAutoChecksum bool - attrs *ObjectAttrs + objectAttrs *ObjectAttrs + fullObjectChecksum func() uint32 + finishWrite bool + takeoverWriter bool } // getObjectChecksums determines what checksum information to include in the final @@ -840,9 +841,10 @@ func getObjectChecksums(params *getObjectChecksumsParams) *storagepb.ObjectCheck // send user's checksum on last write op if available if params.sendCRC32C { - return toProtoChecksums(params.sendCRC32C, params.attrs) + return toProtoChecksums(params.sendCRC32C, params.objectAttrs) } - if params.disableAutoChecksum { + // TODO(b/461982277): Enable checksum validation for appendable takeover writer gRPC + if params.disableAutoChecksum || params.takeoverWriter { return nil } return &storagepb.ObjectChecksums{ @@ -879,8 +881,11 @@ type gRPCOneshotBidiWriteBufferSender struct { firstMessage *storagepb.BidiWriteObjectRequest streamErr error - checksumSettings func() (bool, bool, *ObjectAttrs) - fullObjectChecksum func() uint32 + // Checksum related settings. + sendCRC32C bool + disableAutoChecksum bool + objectAttrs *ObjectAttrs + fullObjectChecksum func() uint32 } func (w *gRPCWriter) newGRPCOneshotBidiWriteBufferSender() *gRPCOneshotBidiWriteBufferSender { @@ -894,9 +899,9 @@ func (w *gRPCWriter) newGRPCOneshotBidiWriteBufferSender() *gRPCOneshotBidiWrite CommonObjectRequestParams: toProtoCommonObjectRequestParams(w.encryptionKey), ObjectChecksums: toProtoChecksums(w.sendCRC32C, w.attrs), }, - checksumSettings: func() (bool, bool, *ObjectAttrs) { - return w.sendCRC32C, w.disableAutoChecksum, w.attrs - }, + sendCRC32C: w.sendCRC32C, + disableAutoChecksum: w.disableAutoChecksum, + objectAttrs: w.attrs, fullObjectChecksum: func() uint32 { return w.fullObjectChecksum }, @@ -939,17 +944,16 @@ func (s *gRPCOneshotBidiWriteBufferSender) connect(ctx context.Context, cs gRPCB continue } - sendCrc32C, disableAutoChecksum, attrs := s.checksumSettings() var bufChecksum *uint32 - if !disableAutoChecksum { + if !s.disableAutoChecksum { bufChecksum = proto.Uint32(crc32.Checksum(r.buf, crc32cTable)) } objectChecksums := getObjectChecksums(&getObjectChecksumsParams{ + sendCRC32C: s.sendCRC32C, + objectAttrs: s.objectAttrs, fullObjectChecksum: s.fullObjectChecksum, + disableAutoChecksum: s.disableAutoChecksum, finishWrite: r.finishWrite, - sendCRC32C: sendCrc32C, - disableAutoChecksum: disableAutoChecksum, - attrs: attrs, }) req := bidiWriteObjectRequest(r, bufChecksum, objectChecksums) @@ -996,8 +1000,11 @@ type gRPCResumableBidiWriteBufferSender struct { startWriteRequest *storagepb.StartResumableWriteRequest upid string - checksumSettings func() (bool, bool, *ObjectAttrs) - fullObjectChecksum func() uint32 + // Checksum related settings. + sendCRC32C bool + disableAutoChecksum bool + objectAttrs *ObjectAttrs + fullObjectChecksum func() uint32 streamErr error } @@ -1011,9 +1018,9 @@ func (w *gRPCWriter) newGRPCResumableBidiWriteBufferSender() *gRPCResumableBidiW CommonObjectRequestParams: toProtoCommonObjectRequestParams(w.encryptionKey), ObjectChecksums: toProtoChecksums(w.sendCRC32C, w.attrs), }, - checksumSettings: func() (bool, bool, *ObjectAttrs) { - return w.sendCRC32C, w.disableAutoChecksum, w.attrs - }, + sendCRC32C: w.sendCRC32C, + disableAutoChecksum: w.disableAutoChecksum, + objectAttrs: w.attrs, fullObjectChecksum: func() uint32 { return w.fullObjectChecksum }, @@ -1076,17 +1083,16 @@ func (s *gRPCResumableBidiWriteBufferSender) connect(ctx context.Context, cs gRP continue } - sendCrc32C, disableAutoChecksum, attrs := s.checksumSettings() var bufChecksum *uint32 - if !disableAutoChecksum { + if !s.disableAutoChecksum { bufChecksum = proto.Uint32(crc32.Checksum(r.buf, crc32cTable)) } objectChecksums := getObjectChecksums(&getObjectChecksumsParams{ + sendCRC32C: s.sendCRC32C, + objectAttrs: s.objectAttrs, fullObjectChecksum: s.fullObjectChecksum, + disableAutoChecksum: s.disableAutoChecksum, finishWrite: r.finishWrite, - sendCRC32C: sendCrc32C, - disableAutoChecksum: disableAutoChecksum, - attrs: attrs, }) req := bidiWriteObjectRequest(r, bufChecksum, objectChecksums) @@ -1142,12 +1148,18 @@ type gRPCAppendBidiWriteBufferSender struct { bucket string routingToken *string - firstMessage *storagepb.BidiWriteObjectRequest - - objectChecksums *storagepb.ObjectChecksums + firstMessage *storagepb.BidiWriteObjectRequest finalizeOnClose bool objResource *storagepb.Object + // Checksum related settings. + sendCRC32C bool + disableAutoChecksum bool + objectAttrs *ObjectAttrs + fullObjectChecksum func() uint32 + + takeoverWriter bool + streamErr error } @@ -1164,8 +1176,13 @@ func (w *gRPCWriter) newGRPCAppendableObjectBufferSender() *gRPCAppendBidiWriteB }, CommonObjectRequestParams: toProtoCommonObjectRequestParams(w.encryptionKey), }, - objectChecksums: toProtoChecksums(w.sendCRC32C, w.attrs), - finalizeOnClose: w.finalizeOnClose, + finalizeOnClose: w.finalizeOnClose, + sendCRC32C: w.sendCRC32C, + disableAutoChecksum: w.disableAutoChecksum, + objectAttrs: w.attrs, + fullObjectChecksum: func() uint32 { + return w.fullObjectChecksum + }, } } @@ -1278,8 +1295,14 @@ func (w *gRPCWriter) newGRPCAppendTakeoverWriteBufferSender() *gRPCAppendTakeove AppendObjectSpec: writeObjectSpecAsAppendObjectSpec(w.spec, w.appendGen), }, }, - objectChecksums: toProtoChecksums(w.sendCRC32C, w.attrs), - finalizeOnClose: w.finalizeOnClose, + finalizeOnClose: w.finalizeOnClose, + takeoverWriter: true, + sendCRC32C: w.sendCRC32C, + disableAutoChecksum: w.disableAutoChecksum, + objectAttrs: w.attrs, + fullObjectChecksum: func() uint32 { + return w.fullObjectChecksum + }, }, takeoverReported: false, handleTakeoverCompletion: func(c gRPCBidiWriteCompletion) { @@ -1409,12 +1432,20 @@ func (s *gRPCAppendBidiWriteBufferSender) send(stream storagepb.Storage_BidiWrit flush: flush, finishWrite: finalizeObject, } - // TODO(b/453869602): implement default checksumming for appendable writes - req := bidiWriteObjectRequest(r, nil, nil) - if finalizeObject { - // appendable objects pass checksums on the finalize message only - req.ObjectChecksums = s.objectChecksums + + var bufChecksum *uint32 + if !s.disableAutoChecksum { + bufChecksum = proto.Uint32(crc32.Checksum(r.buf, crc32cTable)) } + objectChecksums := getObjectChecksums(&getObjectChecksumsParams{ + sendCRC32C: s.sendCRC32C, + objectAttrs: s.objectAttrs, + fullObjectChecksum: s.fullObjectChecksum, + disableAutoChecksum: s.disableAutoChecksum, + finishWrite: finalizeObject, + takeoverWriter: s.takeoverWriter, + }) + req := bidiWriteObjectRequest(r, bufChecksum, objectChecksums) if sendFirstMessage { proto.Merge(req, s.firstMessage) } diff --git a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/internal/version.go b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/internal/version.go index 7d1d0dfd4..38a03ee4d 100644 --- a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/internal/version.go +++ b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/internal/version.go @@ -1,4 +1,4 @@ -// Copyright 2025 Google LLC +// Copyright 2026 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -17,4 +17,4 @@ package internal // Version is the current tagged release of the library. -const Version = "1.58.0" +const Version = "1.59.0" diff --git a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/pcu.go b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/pcu.go new file mode 100644 index 000000000..0078d2649 --- /dev/null +++ b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/pcu.go @@ -0,0 +1,364 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + "errors" + "fmt" + "maps" + "math/rand" + "path" + "runtime" + "strconv" + "sync" + "time" + + gax "github.com/googleapis/gax-go/v2" +) + +// parallelUploadConfig holds configuration for Parallel Composite Uploads. +// Setting this config and EnableParallelUpload flag on Writer enables PCU. +// +// **Note:** This feature is currently experimental and its API surface may change +// in future releases. It is not yet recommended for production use. +type parallelUploadConfig struct { + // minSize is the minimum size of an object in bytes to use PCU. + // If an object's size is less than this value, a simple upload is performed. + // If this is not set, a default of 64 MiB will be used. + // To enable PCU for all uploads regardless of size, set this to 0. + minSize *int64 + + // partSize is the size of each part to be uploaded in parallel. + // Defaults to 16MiB. Must be a multiple of 256KiB. + partSize int + + // numWorkers is the number of goroutines to use for uploading parts in parallel. + // Defaults to a dynamic value based on the number of CPUs (min(4 + NumCPU/2, 16)). + numWorkers int + + // bufferPoolSize is the number of PartSize buffers to pool. + // Defaults to NumWorkers + 1. + bufferPoolSize int + + // tmpObjectPrefix is the prefix for temporary object names. + // Defaults to "gcs-go-sdk-pcu-tmp/". + tmpObjectPrefix string + + // retryOptions defines the retry behavior for uploading parts. + // Defaults to a sensible policy for part uploads (e.g., max 3 retries). + retryOptions []RetryOption + + // cleanupStrategy dictates how temporary parts are cleaned up. + // Defaults to CleanupAlways. + cleanupStrategy partCleanupStrategy + + // namingStrategy provides a strategy for naming temporary part objects. + // Defaults to a strategy that includes a random element to avoid hotspotting. + namingStrategy partNamingStrategy + + // metadataDecorator allows adding custom metadata to temporary part objects. + metadataDecorator partMetadataDecorator +} + +// partCleanupStrategy defines when temporary objects are deleted. +type partCleanupStrategy int + +const ( + // cleanupAlways clean up temporary parts on both success and failure. + cleanupAlways partCleanupStrategy = iota + // cleanupOnSuccess clean up temporary parts only on successful final composition. + cleanupOnSuccess + // cleanupNever means the application is responsible for cleaning up temporary parts. + cleanupNever +) + +func (s partCleanupStrategy) String() string { + switch s { + case cleanupAlways: + return "always" + case cleanupOnSuccess: + return "on_success" + case cleanupNever: + return "never" + default: + return fmt.Sprintf("PartCleanupStrategy(%d)", s) + } +} + +// partNamingStrategy interface for generating temporary object names. +type partNamingStrategy interface { + newPartName(bucket, prefix, finalName string, partNumber int) string +} + +// defaultNamingStrategy provides a default implementation for naming temporary parts. +type defaultNamingStrategy struct{} + +// newPartName creates a unique name for a temporary part to avoid hotspotting. +func (d *defaultNamingStrategy) newPartName(bucket, prefix, finalName string, partNumber int) string { + rnd := rand.Uint64() + return path.Join(prefix, fmt.Sprintf("%x-%s-part-%d", rnd, finalName, partNumber)) +} + +// partMetadataDecorator interface for modifying temporary object metadata. +type partMetadataDecorator interface { + Decorate(attrs *ObjectAttrs) +} + +const ( + defaultPartSize = 16 * 1024 * 1024 // 16 MiB + defaultMinSize = 64 * 1024 * 1024 // 64 MiB + baseWorkers = 4 + maxWorkers = 16 + defaultTmpObjectPrefix = "gcs-go-sdk-pcu-tmp/" + maxComposeComponents = 32 + defaultMaxRetries = 3 + defaultBaseDelay = 100 * time.Millisecond + defaultMaxDelay = 5 * time.Second + pcuPartNumberMetadataKey = "x-goog-meta-gcs-pcu-part-number" + pcuFinalObjectMetadataKey = "x-goog-meta-gcs-pcu-final-object" +) + +func (c *parallelUploadConfig) defaults() { + if c.minSize == nil { + c.minSize = new(int64) + *c.minSize = defaultMinSize + } + if c.partSize == 0 { + c.partSize = defaultPartSize + } + // Use a heuristic for the number of workers: start with 4, add 1 for + // every 2 CPUs, but don't exceed a cap of 16. This provides a + // balance between parallelism and resource contention. + if c.numWorkers == 0 { + c.numWorkers = min(baseWorkers+(runtime.NumCPU()/2), maxWorkers) + } + if c.bufferPoolSize == 0 { + c.bufferPoolSize = c.numWorkers + 1 + } + if c.tmpObjectPrefix == "" { + c.tmpObjectPrefix = defaultTmpObjectPrefix + } + if c.retryOptions == nil { + c.retryOptions = []RetryOption{ + WithMaxAttempts(defaultMaxRetries), + WithBackoff(gax.Backoff{ + Initial: defaultBaseDelay, + Max: defaultMaxDelay, + }), + } + } + if c.cleanupStrategy == 0 { + c.cleanupStrategy = cleanupAlways + } + if c.namingStrategy == nil { + c.namingStrategy = &defaultNamingStrategy{} + } +} + +type pcuState struct { + ctx context.Context + cancel context.CancelFunc + w *Writer + config *parallelUploadConfig + + mu sync.Mutex + // Handles to the uploaded temporary parts, keyed by partNumber. + partMap map[int]*ObjectHandle + // Handles to intermediate composite objects, keyed by their object name. + intermediateMap map[string]*ObjectHandle + failedDeletes []*ObjectHandle + errOnce sync.Once + firstErr error + errors []error + partNum int + currentBuffer []byte + bytesBuffered int64 + + bufferCh chan []byte + uploadCh chan uploadTask + resultCh chan uploadResult + workerWG sync.WaitGroup + collectorWG sync.WaitGroup + started bool + + // Function to upload a part; can be overridden for testing. + uploadPartFn func(s *pcuState, task uploadTask) (*ObjectHandle, *ObjectAttrs, error) +} + +type uploadTask struct { + partNumber int + buffer []byte + size int64 +} + +type uploadResult struct { + partNumber int + obj *ObjectAttrs + handle *ObjectHandle + err error +} + +func (w *Writer) initPCU(ctx context.Context) error { + // TODO: Check if PCU is enabled on the Writer. + + // TODO: Get the config from the Writer. + cfg := ¶llelUploadConfig{} + cfg.defaults() + + // Ensure PartSize is a multiple of googleapi.MinUploadChunkSize. + cfg.partSize = gRPCChunkSize(cfg.partSize) + + pCtx, cancel := context.WithCancel(ctx) + + state := &pcuState{ + ctx: pCtx, + cancel: cancel, + w: w, + config: cfg, + bufferCh: make(chan []byte, cfg.bufferPoolSize), + uploadCh: make(chan uploadTask), + resultCh: make(chan uploadResult), + partMap: make(map[int]*ObjectHandle), + intermediateMap: make(map[string]*ObjectHandle), + uploadPartFn: (*pcuState).uploadPart, + } + // TODO: Assign the state to the Writer + + for i := 0; i < cfg.bufferPoolSize; i++ { + state.bufferCh <- make([]byte, cfg.partSize) + } + + state.workerWG.Add(cfg.numWorkers) + for i := 0; i < cfg.numWorkers; i++ { + go state.worker() + } + + state.collectorWG.Add(1) + go state.resultCollector() + + // Handle to get the first buffer. + select { + case <-state.ctx.Done(): + return state.ctx.Err() + case state.currentBuffer = <-state.bufferCh: + state.bytesBuffered = 0 + } + state.started = true + return nil +} + +// worker processes upload tasks from upload channel, reporting results +// and returning buffers to the pool. +func (s *pcuState) worker() { + defer s.workerWG.Done() + for { + select { + case <-s.ctx.Done(): + return + case task, ok := <-s.uploadCh: + if !ok { + return + } + func(t uploadTask) { + // Ensure the buffer is returned to the pool. + defer func() { s.bufferCh <- t.buffer }() + // This handles the case where cancellation happens before we begin upload. + select { + case <-s.ctx.Done(): + s.resultCh <- uploadResult{partNumber: t.partNumber, err: s.ctx.Err()} + return + default: + } + + handle, attrs, err := s.uploadPartFn(s, t) + + // Always send a result to the collector. + s.resultCh <- uploadResult{partNumber: t.partNumber, obj: attrs, handle: handle, err: err} + }(task) + } + } +} + +// TODO: add retry logic. +func (s *pcuState) uploadPart(task uploadTask) (*ObjectHandle, *ObjectAttrs, error) { + partName := s.config.namingStrategy.newPartName(s.w.o.bucket, s.config.tmpObjectPrefix, s.w.o.object, task.partNumber) + partHandle := s.w.o.c.Bucket(s.w.o.bucket).Object(partName) + + pw := partHandle.NewWriter(s.ctx) + pw.ObjectAttrs.Name = partName + pw.ObjectAttrs.Size = task.size + pw.SendCRC32C = s.w.SendCRC32C + pw.ChunkSize = 0 // Force single-shot upload for parts. + // Clear fields not applicable to parts or that are set by compose. + pw.ObjectAttrs.CRC32C = 0 + pw.ObjectAttrs.MD5 = nil + setPartMetadata(pw, s, task) + + _, err := pw.Write(task.buffer[:task.size]) + if err != nil { + pw.CloseWithError(err) + return nil, nil, fmt.Errorf("failed to write part %d: %w", task.partNumber, err) + } + + if err := pw.Close(); err != nil { + return nil, nil, fmt.Errorf("failed to close part %d: %w", task.partNumber, err) + } + + return partHandle, pw.Attrs(), nil +} + +func setPartMetadata(pw *Writer, s *pcuState, task uploadTask) { + partNumberStr := strconv.Itoa(task.partNumber) + var md map[string]string + if s.w.ObjectAttrs.Metadata != nil { + md = maps.Clone(s.w.ObjectAttrs.Metadata) + } else { + md = make(map[string]string) + } + pw.ObjectAttrs.Metadata = md + pw.ObjectAttrs.Metadata[pcuPartNumberMetadataKey] = partNumberStr + pw.ObjectAttrs.Metadata[pcuFinalObjectMetadataKey] = s.w.o.object + if s.config.metadataDecorator != nil { + s.config.metadataDecorator.Decorate(&pw.ObjectAttrs) + } +} + +func (s *pcuState) resultCollector() { + defer s.collectorWG.Done() + for result := range s.resultCh { + if result.err != nil { + s.setError(result.err) + } else if result.handle != nil { + s.mu.Lock() + s.partMap[result.partNumber] = result.handle + s.mu.Unlock() + } + } +} + +func (s *pcuState) setError(err error) { + if err == nil || errors.Is(err, context.Canceled) { + return + } + s.mu.Lock() + defer s.mu.Unlock() + s.errors = append(s.errors, err) + + s.errOnce.Do(func() { + s.firstErr = err + s.cancel() // Cancel context on first error. + }) +} diff --git a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/reader.go b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/reader.go index 0c5f5be0a..9be40dfac 100644 --- a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/reader.go +++ b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/reader.go @@ -161,11 +161,19 @@ func (o *ObjectHandle) NewRangeReader(ctx context.Context, offset, length int64) // preview; please contact your account manager if interested. The option // [experimental.WithGRPCBidiReads] or [experimental.WithZonalBucketAPIs] // must be selected in order to use this API. + +// NewMultiRangeDownloader creates a multi-range reader for an object. +// Must be called on a gRPC client created using [NewGRPCClient]. func (o *ObjectHandle) NewMultiRangeDownloader(ctx context.Context) (mrd *MultiRangeDownloader, err error) { - // This span covers the life of the reader. It is closed via the context - // in Reader.Close. - ctx, _ = startSpan(ctx, "Object.MultiRangeDownloader") - defer func() { endSpan(ctx, err) }() + // This span covers the life of the MRD. It is closed via the context + // in MultiRangeDownloader.Close. + var spanCtx context.Context + spanCtx, _ = startSpan(ctx, "Object.MultiRangeDownloader") + defer func() { + if err != nil { + endSpan(spanCtx, err) + } + }() if err := o.validate(); err != nil { return nil, err @@ -187,15 +195,8 @@ func (o *ObjectHandle) NewMultiRangeDownloader(ctx context.Context) (mrd *MultiR handle: &o.readHandle, } - r, err := o.c.tc.NewMultiRangeDownloader(ctx, params, opts...) - - // Pass the context so that the span can be closed in MultiRangeDownloader.Close(), or close the - // span now if there is an error. - if err == nil { - r.ctx = ctx - } - - return r, err + // This call will return the *MultiRangeDownloader with the .impl field set. + return o.c.tc.NewMultiRangeDownloader(spanCtx, params, opts...) } // decompressiveTranscoding returns true if the request was served decompressed @@ -387,17 +388,9 @@ func (r *Reader) ReadHandle() ReadHandle { // // This API is currently in preview and is not yet available for general use. type MultiRangeDownloader struct { - Attrs ReaderObjectAttrs - reader multiRangeDownloader - ctx context.Context -} - -type multiRangeDownloader interface { - add(output io.Writer, offset, limit int64, callback func(int64, int64, error)) - wait() - close() error - getHandle() []byte - error() error + // Attrs is populated when NewMultiRangeDownloader returns. + Attrs ReaderObjectAttrs + impl internalMultiRangeDownloader } // Add adds a new range to MultiRangeDownloader. @@ -407,8 +400,11 @@ type multiRangeDownloader interface { // // A negative offset value will be interpreted as the number of bytes from the // end of the object to be returned. Requesting a negative offset with magnitude -// larger than the size of the object will return the entire object. An offset -// larger than the size of the object will result in an OutOfRange error. +// larger than the size of the object will return the entire object. +// +// An offset larger than the size of the object returns an OutOfRange error via +// the callback and enters a permanent error state. All subsequent calls to Close +// will return this same error. // // A limit of zero indicates that there is no limit, and a negative limit will // cause an error. @@ -421,7 +417,7 @@ type multiRangeDownloader interface { // of the read. Note that the length of the data read may be less than the // requested length if the end of the object is reached. func (mrd *MultiRangeDownloader) Add(output io.Writer, offset, length int64, callback func(int64, int64, error)) { - mrd.reader.add(output, offset, length, callback) + mrd.impl.add(output, offset, length, callback) } // Close the MultiRangeDownloader. It must be called when done reading. @@ -430,9 +426,11 @@ func (mrd *MultiRangeDownloader) Add(output io.Writer, offset, length int64, cal // This will immediately close the stream and can result in a // "stream closed early" error if a response for a range is still not processed. // Call [MultiRangeDownloader.Wait] to avoid this error. +// +// If the downloader is in a permanent error state, this will return an error. func (mrd *MultiRangeDownloader) Close() error { - err := mrd.reader.close() - endSpan(mrd.ctx, err) + err := mrd.impl.close(nil) + endSpan(mrd.impl.getSpanCtx(), err) return err } @@ -440,18 +438,18 @@ func (mrd *MultiRangeDownloader) Close() error { // Adding new ranges after this has been called will cause an error. // Wait will wait for all callbacks to finish. func (mrd *MultiRangeDownloader) Wait() { - mrd.reader.wait() + mrd.impl.wait() } // GetHandle returns the read handle. This can be used to further speed up the // follow up read if the same object is read through a different stream. func (mrd *MultiRangeDownloader) GetHandle() []byte { - return mrd.reader.getHandle() + return mrd.impl.getHandle() // TODO: Consider plumbing context from caller } // Error returns an error if the MultiRangeDownloader is in a permanent failure // state. It returns a nil error if the MultiRangeDownloader is open and can be // used. func (mrd *MultiRangeDownloader) Error() error { - return mrd.reader.error() + return mrd.impl.getPermanentError() } diff --git a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/writer.go b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/writer.go index cba6184cb..e65e714d5 100644 --- a/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/writer.go +++ b/ci/resources/stemcell-version-bump/vendor/cloud.google.com/go/storage/writer.go @@ -65,9 +65,14 @@ type Writer struct { // from being sent. If SendCRC32C is true and the Writer's CRC32C field is // populated, that checksum will still be sent to GCS for validation. // + // Automatic CRC32C checksum calculation introduces increased CPU overhead + // because of checksum computation in gRPC writes. Use this field to disable + // it if needed. + // // Note: DisableAutoChecksum must be set before the first call to // Writer.Write(). Automatic checksumming is not enabled for writes - // using the HTTP client or for unfinalized writes to appendable objects in gRPC. + // using the HTTP client or for full object checksums for unfinalized writes to + // appendable objects in gRPC. DisableAutoChecksum bool // ChunkSize controls the maximum number of bytes of the object that the diff --git a/ci/resources/stemcell-version-bump/vendor/modules.txt b/ci/resources/stemcell-version-bump/vendor/modules.txt index 347044fd0..c33bf5b8b 100644 --- a/ci/resources/stemcell-version-bump/vendor/modules.txt +++ b/ci/resources/stemcell-version-bump/vendor/modules.txt @@ -42,7 +42,7 @@ cloud.google.com/go/iam/apiv1/iampb cloud.google.com/go/monitoring/apiv3/v2 cloud.google.com/go/monitoring/apiv3/v2/monitoringpb cloud.google.com/go/monitoring/internal -# cloud.google.com/go/storage v1.58.0 +# cloud.google.com/go/storage v1.59.0 ## explicit; go 1.24.0 cloud.google.com/go/storage cloud.google.com/go/storage/experimental