diff --git a/lib/services/launch_service.dart b/lib/services/launch_service.dart index 77c70eb..8884b74 100644 --- a/lib/services/launch_service.dart +++ b/lib/services/launch_service.dart @@ -9,6 +9,158 @@ import 'package:thirds/blake3.dart'; import '../models/project.dart'; import 'task_service.dart'; +/// Parallel uploader with multiple HTTP clients for concurrent uploads. +class ParallelUploader { + final List _clients; + final String _baseUrl; + final int _concurrency; + int _clientIdx = 0; + + ParallelUploader._(this._clients, this._baseUrl, this._concurrency); + + /// Create a new parallel uploader. + static ParallelUploader create(String baseUrl, {int concurrency = 3}) { + final clients = List.generate(concurrency, (_) => http.Client()); + return ParallelUploader._(clients, baseUrl, concurrency); + } + + /// Get the next client using round-robin selection. + http.Client _nextClient() { + final client = _clients[_clientIdx % _clients.length]; + _clientIdx++; + return client; + } + + /// The base URL for uploads. + String get baseUrl => _baseUrl; + + /// Upload chunks in parallel with controlled concurrency. + /// Returns a stream of results as each chunk completes. + Stream<_ChunkUploadResult> uploadChunksParallel({ + required List> chunks, + required Future Function(List chunk) encodeChunk, + required String url, + }) async* { + // Semaphore for concurrency control + final semaphore = _Semaphore(_concurrency); + final results = >[]; + + for (var i = 0; i < chunks.length; i++) { + final chunkIndex = i; + final chunk = chunks[i]; + + final future = () async { + await semaphore.acquire(); + final stopwatch = Stopwatch()..start(); + try { + final client = _nextClient(); + final compressedBody = await encodeChunk(chunk); + + final response = await client + .post( + Uri.parse(url), + headers: { + 'Content-Type': 'application/cbor+gzip', + 'Accept': 'application/cbor+gzip', + }, + body: compressedBody, + ).timeout(const Duration(minutes: 10)); + + stopwatch.stop(); + return _ChunkUploadResult( + chunkIndex: chunkIndex, + success: response.statusCode == 200, + statusCode: response.statusCode, + responseBody: response.bodyBytes, + duration: stopwatch.elapsed, + ); + } catch (e) { + stopwatch.stop(); + return _ChunkUploadResult( + chunkIndex: chunkIndex, + success: false, + statusCode: 0, + error: e.toString(), + duration: stopwatch.elapsed, + ); + } finally { + semaphore.release(); + } + }(); + + results.add(future); + } + + // Yield results as they complete + for (final future in results) { + yield await future; + } + } + + /// Make a single request using round-robin client selection. + Future post( + String url, { + required Map headers, + required Uint8List body, + }) { + final client = _nextClient(); + return client.post(Uri.parse(url), headers: headers, body: body); + } + + /// Close all clients. + void close() { + for (final client in _clients) { + client.close(); + } + } +} + +class _ChunkUploadResult { + final int chunkIndex; + final bool success; + final int statusCode; + final Uint8List? responseBody; + final String? error; + final Duration duration; + + _ChunkUploadResult({ + required this.chunkIndex, + required this.success, + required this.statusCode, + required this.duration, + this.responseBody, + this.error, + }); +} + +/// Simple semaphore for concurrency control. +class _Semaphore { + final int _maxCount; + int _currentCount = 0; + final _waitQueue = >[]; + + _Semaphore(this._maxCount); + + Future acquire() async { + if (_currentCount < _maxCount) { + _currentCount++; + return; + } + final completer = Completer(); + _waitQueue.add(completer); + await completer.future; + } + + void release() { + if (_waitQueue.isNotEmpty) { + final completer = _waitQueue.removeAt(0); + completer.complete(); + } else { + _currentCount--; + } + } +} + class LaunchService { final TaskService _taskService = TaskService(); static const String _apiPrefix = '/api/0'; @@ -97,7 +249,7 @@ class LaunchService { return _cancellationTokens[launchId] == true; } - /// Make an HTTP request with timeout and cancellation support + /// Make an HTTP request with cancellation support Future _makeRequest( String launchId, Future Function() requestFn, @@ -107,7 +259,11 @@ class LaunchService { throw Exception('Launch cancelled'); } - final responseFuture = requestFn(); + // Add 10 minute timeout to prevent hanging forever on stalled connections + final responseFuture = requestFn().timeout( + const Duration(minutes: 10), + onTimeout: () => throw TimeoutException('Request timed out'), + ); // Poll for cancellation while waiting for response final cancelCheckFuture = Future.doWhile(() async { @@ -119,21 +275,11 @@ class LaunchService { }); try { - // Race between response, timeout, and cancellation - final response = - await Future.any([ - responseFuture, - cancelCheckFuture.then( - (_) => throw Exception('Launch cancelled'), - ), - ]).timeout( - _httpTimeout, - onTimeout: () { - throw TimeoutException( - 'Request timed out after ${_httpTimeout.inSeconds} seconds', - ); - }, - ); + // Race between response and cancellation + final response = await Future.any([ + responseFuture, + cancelCheckFuture.then((_) => throw Exception('Launch cancelled')), + ]); return response; } catch (e) { @@ -1063,7 +1209,8 @@ class LaunchService { ); } - /// Upload missing parts with chunking (like xmit) + /// Upload missing parts with chunking and parallel uploads (like xmit) + /// Uses DNS resolution to discover multiple IPs and round-robins across them. Future _uploadMissingParts( String launchId, String url, @@ -1115,46 +1262,109 @@ class LaunchService { chunks.add(currentChunk); } + // Create parallel uploader with IP discovery and round-robin + final uploader = ParallelUploader.create(url, concurrency: 3); + + // Calculate total size for display + final totalBytes = toUpload.fold(0, (sum, f) => sum + f.length); + final totalSizeStr = _formatBytes(totalBytes); + _logToStep( - 'Uploading ${toUpload.length} parts in ${chunks.length} chunk(s)', + '0/${chunks.length} chunks ($totalSizeStr)', onStepUpdate, ); - // Upload each chunk - for (var i = 0; i < chunks.length; i++) { - if (_isCancelled(launchId)) { - throw Exception('Upload cancelled'); + try { + // Encode chunk function for the parallel uploader + Future encodeChunk(List chunk) async { + final chunkBytes = chunk.map((bytes) => CborBytes(bytes)).toList(); + return _encodeRequest( + { + 5: CborString(domain), + 7: CborList(chunkBytes), + }, + authKey, + teamId, + ); } - final chunk = chunks[i]; - final chunkBytes = chunk.map((bytes) => CborBytes(bytes)).toList(); + // Upload chunks in parallel + var completedChunks = 0; + final errors = []; - _logToStep( - 'Uploading chunk ${i + 1}/${chunks.length} (${chunk.length} parts)', - onStepUpdate, - ); + await for (final result in uploader.uploadChunksParallel( + chunks: chunks, + encodeChunk: encodeChunk, + url: url, + )) { + if (_isCancelled(launchId)) { + throw Exception('Upload cancelled'); + } - final compressed = await _encodeRequest( - { - 5: CborString(domain), - 7: CborList(chunkBytes), - }, - authKey, - teamId, - ); + completedChunks++; - final (decoded, _) = await _makeApiRequest(launchId, url, compressed); + if (result.success && result.responseBody != null) { + // Decompress and decode response to log any messages + try { + final decompressed = gzip.decode(result.responseBody!); + final decoded = cbor.decode(decompressed) as CborMap; - // Log any response messages - _logResponseMessages( - errors: _parseCborStringList(decoded, 2), - warnings: _parseCborStringList(decoded, 3), - messages: _parseCborStringList(decoded, 4), - onStepUpdate: onStepUpdate, - ); + _logResponseMessages( + errors: _parseCborStringList(decoded, 2), + warnings: _parseCborStringList(decoded, 3), + messages: _parseCborStringList(decoded, 4), + onStepUpdate: onStepUpdate, + ); + } catch (_) { + // Ignore decode errors for response logging + } + + // Format duration + final durationStr = _formatDuration(result.duration); + + // Log completed chunk with timing + _logToStep( + 'Chunk ${result.chunkIndex + 1} done in $durationStr', + onStepUpdate, + ); + + // Update progress display + _logToStep( + '$completedChunks/${chunks.length} chunks ($totalSizeStr)', + onStepUpdate, + ); + } else { + final errorMsg = result.error ?? 'HTTP ${result.statusCode}'; + errors.add('Chunk ${result.chunkIndex + 1}: $errorMsg'); + _logToStep( + '❌ Chunk ${result.chunkIndex + 1} failed: $errorMsg', + onStepUpdate, + ); + } + } + + if (errors.isNotEmpty) { + throw Exception('Some chunks failed to upload: ${errors.join(', ')}'); + } + + _logToStep('✅ All ${chunks.length} chunks uploaded', onStepUpdate); + } finally { + uploader.close(); } + } + + /// Format bytes as human-readable string + String _formatBytes(int bytes) { + if (bytes < 1024) return '$bytes B'; + if (bytes < 1024 * 1024) return '${(bytes / 1024).toStringAsFixed(1)} KB'; + return '${(bytes / (1024 * 1024)).toStringAsFixed(1)} MB'; + } - _logToStep('✅ All missing parts uploaded', onStepUpdate); + /// Format duration as human-readable string + String _formatDuration(Duration d) { + if (d.inMilliseconds < 1000) return '${d.inMilliseconds}ms'; + if (d.inSeconds < 60) return '${(d.inMilliseconds / 1000).toStringAsFixed(1)}s'; + return '${d.inMinutes}m ${d.inSeconds % 60}s'; } /// Finalize launch