diff --git a/client/client.go b/client/client.go index 74798bef..b64dfd5b 100644 --- a/client/client.go +++ b/client/client.go @@ -74,11 +74,11 @@ type Client struct { func New(optionalSetters ...ClientOption) Client { headers := map[string]string{ "X-Appwrite-Response-Format" : "1.9.5", - "user-agent" : fmt.Sprintf("AppwriteGoSDK/v4.0.0 (%s; %s)", runtime.GOOS, runtime.GOARCH), + "user-agent" : fmt.Sprintf("AppwriteGoSDK/v4.1.0 (%s; %s)", runtime.GOOS, runtime.GOARCH), "x-sdk-name": "Go", "x-sdk-platform": "server", "x-sdk-language": "go", - "x-sdk-version": "v4.0.0", + "x-sdk-version": "v4.1.0", } httpClient, err := GetDefaultClient(defaultTimeout) if err != nil { @@ -148,6 +148,22 @@ func isFileUpload(headers map[string]interface{}) bool { return false } +func copyHeaders(headers map[string]interface{}) map[string]interface{} { + clone := make(map[string]interface{}, len(headers)) + for key, value := range headers { + clone[key] = value + } + return clone +} + +func copyParams(params map[string]interface{}) map[string]interface{} { + clone := make(map[string]interface{}, len(params)) + for key, value := range params { + clone[key] = value + } + return clone +} + func (client *Client) FileUpload(url string, headers map[string]interface{}, params map[string]interface{}, paramName string, uploadId string) (*ClientResponse, error) { inputFile, ok := params[paramName].(file.InputFile) if !ok { @@ -178,6 +194,7 @@ func (client *Client) FileUpload(url string, headers map[string]interface{}, par if uploadId != "" { resp, err := client.Call("GET", url+"/"+uploadId, nil, nil) if err == nil { + result = resp var result map[string]interface{} if resStr, ok := resp.Result.(string); ok { _ = json.Unmarshal([]byte(resStr), &result) @@ -217,40 +234,148 @@ func (client *Client) FileUpload(url string, headers map[string]interface{}, par return result, nil } - for i := currentChunk; i < numChunks; i++ { + parseUploadId := func(resp *ClientResponse) string { + var parsed map[string]interface{} + if resp != nil && strings.HasPrefix(resp.Type, "application/json") { + if unmarshalErr := json.Unmarshal([]byte(resp.Result.(string)), &parsed); unmarshalErr == nil { + id, _ := parsed["$id"].(string) + return id + } + } + return "" + } + + isUploadComplete := func(resp *ClientResponse) bool { + if resp == nil || !strings.HasPrefix(resp.Type, "application/json") { + return false + } + + var parsed map[string]interface{} + if resStr, ok := resp.Result.(string); ok { + _ = json.Unmarshal([]byte(resStr), &parsed) + } else { + parsed, _ = resp.Result.(map[string]interface{}) + } + if parsed == nil || parsed["chunksUploaded"] == nil { + return false + } + + chunksUploaded, ok := parsed["chunksUploaded"].(float64) + if !ok { + return false + } + chunksTotal := float64(numChunks) + if value, ok := parsed["chunksTotal"].(float64); ok { + chunksTotal = value + } + + return chunksUploaded >= chunksTotal + } + + uploadChunk := func(i int64, id string) (*ClientResponse, error) { chunkSize := client.ChunkSize - offset := int64(i) * chunkSize + offset := i * client.ChunkSize if i == numChunks-1 { chunkSize = fileInfo.Size() - offset - inputFile.Data = make([]byte, chunkSize) } - _, err := file.ReadAt(inputFile.Data, offset) + + chunkFile := inputFile + chunkFile.Data = make([]byte, chunkSize) + _, err := file.ReadAt(chunkFile.Data, offset) if err != nil && err != io.EOF { return nil, err } - params[paramName] = inputFile - if uploadId != "" { - headers["x-appwrite-id"] = uploadId + + chunkParams := copyParams(params) + chunkParams[paramName] = chunkFile + chunkHeaders := copyHeaders(headers) + if id != "" { + chunkHeaders["x-appwrite-id"] = id } + totalSize := fileInfo.Size() start := offset - end := offset + client.ChunkSize - 1 - if end >= totalSize { - end = totalSize - 1 - } - headers["content-range"] = fmt.Sprintf("bytes %d-%d/%d", start, end, totalSize) - result, err = client.Call("POST", url, headers, params) + end := offset + chunkSize - 1 + chunkHeaders["content-range"] = fmt.Sprintf("bytes %d-%d/%d", start, end, totalSize) + + return client.Call("POST", url, chunkHeaders, chunkParams) + } + + if currentChunk == 0 { + result, err = uploadChunk(0, uploadId) if err != nil { return nil, err } + if id := parseUploadId(result); id != "" { + uploadId = id + } + currentChunk = 1 + } - var parsed map[string]interface{} - if strings.HasPrefix(result.Type, "application/json") { - err = json.Unmarshal([]byte(result.Result.(string)), &parsed) - if err == nil { - uploadId, _ = parsed["$id"].(string) + type chunkResult struct { + index int64 + resp *ClientResponse + err error + } + + remainingChunks := numChunks - currentChunk + if remainingChunks <= 0 { + return result, nil + } + + concurrency := int64(8) + if remainingChunks < concurrency { + concurrency = remainingChunks + } + + jobs := make(chan int64) + results := make(chan chunkResult, int(remainingChunks)) + done := make(chan struct{}) + + for worker := int64(0); worker < concurrency; worker++ { + go func() { + for { + select { + case <-done: + return + case i, ok := <-jobs: + if !ok { + return + } + resp, err := uploadChunk(i, uploadId) + results <- chunkResult{index: i, resp: resp, err: err} + } + } + }() + } + + go func() { + defer close(jobs) + for i := currentChunk; i < numChunks; i++ { + select { + case <-done: + return + case jobs <- i: } } + }() + + var lastResult *ClientResponse + completeFound := false + for i := int64(0); i < remainingChunks; i++ { + chunk := <-results + if chunk.err != nil { + close(done) + return nil, chunk.err + } + lastResult = chunk.resp + if isUploadComplete(chunk.resp) { + result = chunk.resp + completeFound = true + } + } + if !completeFound && lastResult != nil { + result = lastResult } return result, nil }