diff --git a/src/client.ts b/src/client.ts index 37bb016..50724f6 100644 --- a/src/client.ts +++ b/src/client.ts @@ -415,85 +415,235 @@ class Client { return await this.call(method, url, headers, payload); } - let start = 0; - let response = null; + const totalChunks = Math.ceil(size / Client.CHUNK_SIZE); - while (start < size) { - let end = start + Client.CHUNK_SIZE; - if (end >= size) { - end = size; - } + // Upload first chunk alone to get the upload ID + const firstChunkEnd = Math.min(Client.CHUNK_SIZE, size); + const firstChunkHeaders = { ...headers, 'content-range': `bytes 0-${firstChunkEnd - 1}/${size}` }; + const firstChunk = await file.slice(0, firstChunkEnd); + const firstPayload = { ...originalPayload }; + firstPayload[fileParam] = new File([firstChunk], file.filename); - headers['content-range'] = `bytes ${start}-${end - 1}/${size}`; - const chunk = await file.slice(start, end); + let response = await this.call(method, url, firstChunkHeaders, firstPayload); + const uploadId = response?.$id; - const payload = { ...originalPayload }; - payload[fileParam] = new File([chunk], file.filename); + if (onProgress && typeof onProgress === 'function') { + onProgress({ + $id: uploadId, + progress: Math.round((firstChunkEnd / size) * 100), + sizeUploaded: firstChunkEnd, + chunksTotal: totalChunks, + chunksUploaded: 1 + }); + } + + if (totalChunks === 1) { + return response; + } + + // Prepare remaining chunks + const chunks: { start: number; end: number }[] = []; + for (let i = 1; i < totalChunks; i++) { + const start = i * Client.CHUNK_SIZE; + const end = Math.min(start + Client.CHUNK_SIZE, size); + chunks.push({ start, end }); + } - response = await this.call(method, url, headers, payload); + // Upload remaining chunks with max concurrency of 8 + const CONCURRENCY = 8; + let completedCount = 1; + let uploadedBytes = firstChunkEnd; + let lastResponse = response; + let finalResponse = null; + let failed = false; + + const isUploadComplete = (chunkResponse: any) => { + const chunksUploaded = chunkResponse?.chunksUploaded; + const chunksTotal = chunkResponse?.chunksTotal ?? totalChunks; + return typeof chunksUploaded === 'number' && typeof chunksTotal === 'number' && chunksUploaded >= chunksTotal; + }; + + const uploadChunk = async (chunk: typeof chunks[0]) => { + const chunkHeaders = { ...headers }; + if (uploadId) { + chunkHeaders['x-appwrite-id'] = uploadId; + } + chunkHeaders['content-range'] = `bytes ${chunk.start}-${chunk.end - 1}/${size}`; + + const chunkBlob = await file.slice(chunk.start, chunk.end); + const chunkPayload = { ...originalPayload }; + chunkPayload[fileParam] = new File([chunkBlob], file.filename); + + const chunkResponse = await this.call(method, url, chunkHeaders, chunkPayload); + + if (failed) { + return chunkResponse; + } + + completedCount++; + uploadedBytes += (chunk.end - chunk.start); + + lastResponse = chunkResponse; + if (isUploadComplete(chunkResponse)) { + finalResponse = chunkResponse; + } if (onProgress && typeof onProgress === 'function') { onProgress({ - $id: response.$id, - progress: Math.round((end / size) * 100), - sizeUploaded: end, - chunksTotal: Math.ceil(size / Client.CHUNK_SIZE), - chunksUploaded: Math.ceil(end / Client.CHUNK_SIZE) + $id: uploadId, + progress: Math.round((uploadedBytes / size) * 100), + sizeUploaded: uploadedBytes, + chunksTotal: totalChunks, + chunksUploaded: completedCount }); } - if (response && response.$id) { - headers['x-appwrite-id'] = response.$id; - } + return chunkResponse; + }; - start = end; + // Process with limited concurrency using a worker pool + const queue = [...chunks]; + const workers: Promise[] = []; + const workerCount = Math.min(CONCURRENCY, queue.length); + + for (let i = 0; i < workerCount; i++) { + workers.push( + (async () => { + while (!failed && queue.length > 0) { + const chunk = queue.shift()!; + try { + await uploadChunk(chunk); + } catch (error) { + failed = true; + throw error; + } + } + })() + ); } - return response; + await Promise.all(workers); + + return finalResponse ?? lastResponse; } if (file.size <= Client.CHUNK_SIZE) { return await this.call(method, url, headers, originalPayload); } - let start = 0; - let response = null; + const totalChunks = Math.ceil(file.size / Client.CHUNK_SIZE); + + // Upload first chunk alone to get the upload ID + const firstChunkEnd = Math.min(Client.CHUNK_SIZE, file.size); + const firstChunkHeaders = { ...headers, 'content-range': `bytes 0-${firstChunkEnd - 1}/${file.size}` }; + const firstChunk = file.slice(0, firstChunkEnd); + const firstPayload = { ...originalPayload }; + firstPayload[fileParam] = new File([firstChunk], file.name); + + let response = await this.call(method, url, firstChunkHeaders, firstPayload); + const uploadId = response?.$id; + + if (onProgress && typeof onProgress === 'function') { + onProgress({ + $id: uploadId, + progress: Math.round((firstChunkEnd / file.size) * 100), + sizeUploaded: firstChunkEnd, + chunksTotal: totalChunks, + chunksUploaded: 1 + }); + } - while (start < file.size) { - let end = start + Client.CHUNK_SIZE; // Prepare end for the next chunk - if (end >= file.size) { - end = file.size; // Adjust for the last chunk to include the last byte - } + if (totalChunks === 1) { + return response; + } - headers['content-range'] = `bytes ${start}-${end-1}/${file.size}`; - const chunk = file.slice(start, end); + // Prepare remaining chunks + const chunks: { start: number; end: number }[] = []; + for (let i = 1; i < totalChunks; i++) { + const start = i * Client.CHUNK_SIZE; + const end = Math.min(start + Client.CHUNK_SIZE, file.size); + chunks.push({ start, end }); + } - let payload = { ...originalPayload }; - payload[fileParam] = new File([chunk], file.name); + // Upload remaining chunks with max concurrency of 8 + const CONCURRENCY = 8; + let completedCount = 1; + let uploadedBytes = firstChunkEnd; + let lastResponse = response; + let finalResponse = null; + let failed = false; + + const isUploadComplete = (chunkResponse: any) => { + const chunksUploaded = chunkResponse?.chunksUploaded; + const chunksTotal = chunkResponse?.chunksTotal ?? totalChunks; + return typeof chunksUploaded === 'number' && typeof chunksTotal === 'number' && chunksUploaded >= chunksTotal; + }; - response = await this.call(method, url, headers, payload); + const uploadChunk = async (chunk: typeof chunks[0]) => { + const chunkHeaders = { ...headers }; + if (uploadId) { + chunkHeaders['x-appwrite-id'] = uploadId; + } + chunkHeaders['content-range'] = `bytes ${chunk.start}-${chunk.end - 1}/${file.size}`; + + const chunkBlob = file.slice(chunk.start, chunk.end); + const chunkPayload = { ...originalPayload }; + chunkPayload[fileParam] = new File([chunkBlob], file.name); + + const chunkResponse = await this.call(method, url, chunkHeaders, chunkPayload); + + if (failed) { + return chunkResponse; + } + + completedCount++; + uploadedBytes += (chunk.end - chunk.start); + + lastResponse = chunkResponse; + if (isUploadComplete(chunkResponse)) { + finalResponse = chunkResponse; + } if (onProgress && typeof onProgress === 'function') { onProgress({ - $id: response.$id, - progress: Math.round((end / file.size) * 100), - sizeUploaded: end, - chunksTotal: Math.ceil(file.size / Client.CHUNK_SIZE), - chunksUploaded: Math.ceil(end / Client.CHUNK_SIZE) + $id: uploadId, + progress: Math.round((uploadedBytes / file.size) * 100), + sizeUploaded: uploadedBytes, + chunksTotal: totalChunks, + chunksUploaded: completedCount }); } - if (response && response.$id) { - headers['x-appwrite-id'] = response.$id; - } + return chunkResponse; + }; - start = end; + // Process with limited concurrency using a worker pool + const queue = [...chunks]; + const workers: Promise[] = []; + const workerCount = Math.min(CONCURRENCY, queue.length); + + for (let i = 0; i < workerCount; i++) { + workers.push( + (async () => { + while (!failed && queue.length > 0) { + const chunk = queue.shift()!; + try { + await uploadChunk(chunk); + } catch (error) { + failed = true; + throw error; + } + } + })() + ); } - return response; + await Promise.all(workers); + + return finalResponse ?? lastResponse; } - async ping(): Promise { + async ping(): Promise { return this.call('GET', new URL(this.config.endpoint + '/ping')); }