From cfa2104c70b5911c0639b6a7c5de0c9b7613a8d5 Mon Sep 17 00:00:00 2001 From: Torsten Dittmann Date: Thu, 21 May 2026 17:20:32 +0400 Subject: [PATCH 1/2] feat: support concurrent chunk uploads --- client/client.go | 152 ++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 132 insertions(+), 20 deletions(-) diff --git a/client/client.go b/client/client.go index 74798bef..d91efc98 100644 --- a/client/client.go +++ b/client/client.go @@ -74,11 +74,11 @@ type Client struct { func New(optionalSetters ...ClientOption) Client { headers := map[string]string{ "X-Appwrite-Response-Format" : "1.9.5", - "user-agent" : fmt.Sprintf("AppwriteGoSDK/v4.0.0 (%s; %s)", runtime.GOOS, runtime.GOARCH), + "user-agent" : fmt.Sprintf("AppwriteGoSDK/v4.1.0 (%s; %s)", runtime.GOOS, runtime.GOARCH), "x-sdk-name": "Go", "x-sdk-platform": "server", "x-sdk-language": "go", - "x-sdk-version": "v4.0.0", + "x-sdk-version": "v4.1.0", } httpClient, err := GetDefaultClient(defaultTimeout) if err != nil { @@ -148,6 +148,22 @@ func isFileUpload(headers map[string]interface{}) bool { return false } +func copyHeaders(headers map[string]interface{}) map[string]interface{} { + clone := make(map[string]interface{}, len(headers)) + for key, value := range headers { + clone[key] = value + } + return clone +} + +func copyParams(params map[string]interface{}) map[string]interface{} { + clone := make(map[string]interface{}, len(params)) + for key, value := range params { + clone[key] = value + } + return clone +} + func (client *Client) FileUpload(url string, headers map[string]interface{}, params map[string]interface{}, paramName string, uploadId string) (*ClientResponse, error) { inputFile, ok := params[paramName].(file.InputFile) if !ok { @@ -178,6 +194,7 @@ func (client *Client) FileUpload(url string, headers map[string]interface{}, par if uploadId != "" { resp, err := client.Call("GET", url+"/"+uploadId, nil, nil) if err == nil { + result = resp var result map[string]interface{} if resStr, ok := resp.Result.(string); ok { _ = json.Unmarshal([]byte(resStr), &result) @@ -217,40 +234,135 @@ func (client *Client) FileUpload(url string, headers map[string]interface{}, par return result, nil } - for i := currentChunk; i < numChunks; i++ { + parseUploadId := func(resp *ClientResponse) string { + var parsed map[string]interface{} + if resp != nil && strings.HasPrefix(resp.Type, "application/json") { + err = json.Unmarshal([]byte(resp.Result.(string)), &parsed) + if err == nil { + id, _ := parsed["$id"].(string) + return id + } + } + return "" + } + + isUploadComplete := func(resp *ClientResponse) bool { + if resp == nil || !strings.HasPrefix(resp.Type, "application/json") { + return false + } + + var parsed map[string]interface{} + if resStr, ok := resp.Result.(string); ok { + _ = json.Unmarshal([]byte(resStr), &parsed) + } else { + parsed, _ = resp.Result.(map[string]interface{}) + } + if parsed == nil || parsed["chunksUploaded"] == nil { + return false + } + + chunksUploaded, ok := parsed["chunksUploaded"].(float64) + if !ok { + return false + } + chunksTotal := float64(numChunks) + if value, ok := parsed["chunksTotal"].(float64); ok { + chunksTotal = value + } + + return chunksUploaded >= chunksTotal + } + + uploadChunk := func(i int64, id string) (*ClientResponse, error) { chunkSize := client.ChunkSize - offset := int64(i) * chunkSize + offset := i * client.ChunkSize if i == numChunks-1 { chunkSize = fileInfo.Size() - offset - inputFile.Data = make([]byte, chunkSize) } - _, err := file.ReadAt(inputFile.Data, offset) + + chunkFile := inputFile + chunkFile.Data = make([]byte, chunkSize) + _, err := file.ReadAt(chunkFile.Data, offset) if err != nil && err != io.EOF { return nil, err } - params[paramName] = inputFile - if uploadId != "" { - headers["x-appwrite-id"] = uploadId + + chunkParams := copyParams(params) + chunkParams[paramName] = chunkFile + chunkHeaders := copyHeaders(headers) + if id != "" { + chunkHeaders["x-appwrite-id"] = id } + totalSize := fileInfo.Size() start := offset - end := offset + client.ChunkSize - 1 - if end >= totalSize { - end = totalSize - 1 - } - headers["content-range"] = fmt.Sprintf("bytes %d-%d/%d", start, end, totalSize) - result, err = client.Call("POST", url, headers, params) + end := offset + chunkSize - 1 + chunkHeaders["content-range"] = fmt.Sprintf("bytes %d-%d/%d", start, end, totalSize) + + return client.Call("POST", url, chunkHeaders, chunkParams) + } + + if currentChunk == 0 { + result, err = uploadChunk(0, uploadId) if err != nil { return nil, err } + if id := parseUploadId(result); id != "" { + uploadId = id + } + currentChunk = 1 + } - var parsed map[string]interface{} - if strings.HasPrefix(result.Type, "application/json") { - err = json.Unmarshal([]byte(result.Result.(string)), &parsed) - if err == nil { - uploadId, _ = parsed["$id"].(string) + type chunkResult struct { + index int64 + resp *ClientResponse + err error + } + + remainingChunks := numChunks - currentChunk + if remainingChunks <= 0 { + return result, nil + } + + concurrency := int64(8) + if remainingChunks < concurrency { + concurrency = remainingChunks + } + + jobs := make(chan int64) + results := make(chan chunkResult, int(remainingChunks)) + + for worker := int64(0); worker < concurrency; worker++ { + go func() { + for i := range jobs { + resp, err := uploadChunk(i, uploadId) + results <- chunkResult{index: i, resp: resp, err: err} } + }() + } + + go func() { + for i := currentChunk; i < numChunks; i++ { + jobs <- i } + close(jobs) + }() + + var firstErr error + for i := int64(0); i < remainingChunks; i++ { + chunk := <-results + if chunk.err != nil { + if firstErr == nil { + firstErr = chunk.err + } + continue + } + if isUploadComplete(chunk.resp) { + result = chunk.resp + } + } + if firstErr != nil { + return nil, firstErr } return result, nil } From 63b55bf9ef7018c769173fd0329e6e71b7f750be Mon Sep 17 00:00:00 2001 From: Torsten Dittmann Date: Thu, 21 May 2026 20:46:22 +0400 Subject: [PATCH 2/2] feat: support concurrent chunk uploads --- client/client.go | 41 +++++++++++++++++++++++++++-------------- 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/client/client.go b/client/client.go index d91efc98..b64dfd5b 100644 --- a/client/client.go +++ b/client/client.go @@ -237,8 +237,7 @@ func (client *Client) FileUpload(url string, headers map[string]interface{}, par parseUploadId := func(resp *ClientResponse) string { var parsed map[string]interface{} if resp != nil && strings.HasPrefix(resp.Type, "application/json") { - err = json.Unmarshal([]byte(resp.Result.(string)), &parsed) - if err == nil { + if unmarshalErr := json.Unmarshal([]byte(resp.Result.(string)), &parsed); unmarshalErr == nil { id, _ := parsed["$id"].(string) return id } @@ -331,38 +330,52 @@ func (client *Client) FileUpload(url string, headers map[string]interface{}, par jobs := make(chan int64) results := make(chan chunkResult, int(remainingChunks)) + done := make(chan struct{}) for worker := int64(0); worker < concurrency; worker++ { go func() { - for i := range jobs { - resp, err := uploadChunk(i, uploadId) - results <- chunkResult{index: i, resp: resp, err: err} + for { + select { + case <-done: + return + case i, ok := <-jobs: + if !ok { + return + } + resp, err := uploadChunk(i, uploadId) + results <- chunkResult{index: i, resp: resp, err: err} + } } }() } go func() { + defer close(jobs) for i := currentChunk; i < numChunks; i++ { - jobs <- i + select { + case <-done: + return + case jobs <- i: + } } - close(jobs) }() - var firstErr error + var lastResult *ClientResponse + completeFound := false for i := int64(0); i < remainingChunks; i++ { chunk := <-results if chunk.err != nil { - if firstErr == nil { - firstErr = chunk.err - } - continue + close(done) + return nil, chunk.err } + lastResult = chunk.resp if isUploadComplete(chunk.resp) { result = chunk.resp + completeFound = true } } - if firstErr != nil { - return nil, firstErr + if !completeFound && lastResult != nil { + result = lastResult } return result, nil }