Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
302 changes: 256 additions & 46 deletions lib/services/launch_service.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,158 @@ import 'package:thirds/blake3.dart';
import '../models/project.dart';
import 'task_service.dart';

/// Parallel uploader with multiple HTTP clients for concurrent uploads.
class ParallelUploader {
final List<http.Client> _clients;
final String _baseUrl;
final int _concurrency;
int _clientIdx = 0;

ParallelUploader._(this._clients, this._baseUrl, this._concurrency);

/// Create a new parallel uploader.
static ParallelUploader create(String baseUrl, {int concurrency = 3}) {
final clients = List.generate(concurrency, (_) => http.Client());
return ParallelUploader._(clients, baseUrl, concurrency);
}

/// Get the next client using round-robin selection.
http.Client _nextClient() {
final client = _clients[_clientIdx % _clients.length];
_clientIdx++;
return client;
}

/// The base URL for uploads.
String get baseUrl => _baseUrl;

/// Upload chunks in parallel with controlled concurrency.
/// Returns a stream of results as each chunk completes.
Stream<_ChunkUploadResult> uploadChunksParallel({
required List<List<Uint8List>> chunks,
required Future<Uint8List> Function(List<Uint8List> chunk) encodeChunk,
required String url,
}) async* {
// Semaphore for concurrency control
final semaphore = _Semaphore(_concurrency);
final results = <Future<_ChunkUploadResult>>[];

for (var i = 0; i < chunks.length; i++) {
final chunkIndex = i;
final chunk = chunks[i];

final future = () async {
await semaphore.acquire();
final stopwatch = Stopwatch()..start();
try {
final client = _nextClient();
final compressedBody = await encodeChunk(chunk);

final response = await client
.post(
Uri.parse(url),
headers: {
'Content-Type': 'application/cbor+gzip',
'Accept': 'application/cbor+gzip',
},
body: compressedBody,
).timeout(const Duration(minutes: 10));

stopwatch.stop();
return _ChunkUploadResult(
chunkIndex: chunkIndex,
success: response.statusCode == 200,
statusCode: response.statusCode,
responseBody: response.bodyBytes,
duration: stopwatch.elapsed,
);
} catch (e) {
stopwatch.stop();
return _ChunkUploadResult(
chunkIndex: chunkIndex,
success: false,
statusCode: 0,
error: e.toString(),
duration: stopwatch.elapsed,
);
} finally {
semaphore.release();
}
}();

results.add(future);
}

// Yield results as they complete
for (final future in results) {
yield await future;
}
}

/// Make a single request using round-robin client selection.
Future<http.Response> post(
String url, {
required Map<String, String> headers,
required Uint8List body,
}) {
final client = _nextClient();
return client.post(Uri.parse(url), headers: headers, body: body);
}

/// Close all clients.
void close() {
for (final client in _clients) {
client.close();
}
}
}

class _ChunkUploadResult {
final int chunkIndex;
final bool success;
final int statusCode;
final Uint8List? responseBody;
final String? error;
final Duration duration;

_ChunkUploadResult({
required this.chunkIndex,
required this.success,
required this.statusCode,
required this.duration,
this.responseBody,
this.error,
});
}

/// Simple semaphore for concurrency control.
class _Semaphore {
final int _maxCount;
int _currentCount = 0;
final _waitQueue = <Completer<void>>[];

_Semaphore(this._maxCount);

Future<void> acquire() async {
if (_currentCount < _maxCount) {
_currentCount++;
return;
}
final completer = Completer<void>();
_waitQueue.add(completer);
await completer.future;
}

void release() {
if (_waitQueue.isNotEmpty) {
final completer = _waitQueue.removeAt(0);
completer.complete();
} else {
_currentCount--;
}
}
}

class LaunchService {
final TaskService _taskService = TaskService();
static const String _apiPrefix = '/api/0';
Expand Down Expand Up @@ -97,7 +249,7 @@ class LaunchService {
return _cancellationTokens[launchId] == true;
}

/// Make an HTTP request with timeout and cancellation support
/// Make an HTTP request with cancellation support
Future<http.Response> _makeRequest(
String launchId,
Future<http.Response> Function() requestFn,
Expand All @@ -107,7 +259,11 @@ class LaunchService {
throw Exception('Launch cancelled');
}

final responseFuture = requestFn();
// Add 10 minute timeout to prevent hanging forever on stalled connections
final responseFuture = requestFn().timeout(
const Duration(minutes: 10),
onTimeout: () => throw TimeoutException('Request timed out'),
);

// Poll for cancellation while waiting for response
final cancelCheckFuture = Future.doWhile(() async {
Expand All @@ -119,21 +275,11 @@ class LaunchService {
});

try {
// Race between response, timeout, and cancellation
final response =
await Future.any([
responseFuture,
cancelCheckFuture.then(
(_) => throw Exception('Launch cancelled'),
),
]).timeout(
_httpTimeout,
onTimeout: () {
throw TimeoutException(
'Request timed out after ${_httpTimeout.inSeconds} seconds',
);
},
);
// Race between response and cancellation
final response = await Future.any([
responseFuture,
cancelCheckFuture.then((_) => throw Exception('Launch cancelled')),
]);

return response;
} catch (e) {
Expand Down Expand Up @@ -1063,7 +1209,8 @@ class LaunchService {
);
}

/// Upload missing parts with chunking (like xmit)
/// Upload missing parts with chunking and parallel uploads (like xmit)
/// Uses DNS resolution to discover multiple IPs and round-robins across them.
Future<void> _uploadMissingParts(
String launchId,
String url,
Expand Down Expand Up @@ -1115,46 +1262,109 @@ class LaunchService {
chunks.add(currentChunk);
}

// Create parallel uploader with IP discovery and round-robin
final uploader = ParallelUploader.create(url, concurrency: 3);

// Calculate total size for display
final totalBytes = toUpload.fold<int>(0, (sum, f) => sum + f.length);
final totalSizeStr = _formatBytes(totalBytes);

_logToStep(
'Uploading ${toUpload.length} parts in ${chunks.length} chunk(s)',
'0/${chunks.length} chunks ($totalSizeStr)',
onStepUpdate,
);

// Upload each chunk
for (var i = 0; i < chunks.length; i++) {
if (_isCancelled(launchId)) {
throw Exception('Upload cancelled');
try {
// Encode chunk function for the parallel uploader
Future<Uint8List> encodeChunk(List<Uint8List> chunk) async {
final chunkBytes = chunk.map((bytes) => CborBytes(bytes)).toList();
return _encodeRequest(
{
5: CborString(domain),
7: CborList(chunkBytes),
},
authKey,
teamId,
);
}

final chunk = chunks[i];
final chunkBytes = chunk.map((bytes) => CborBytes(bytes)).toList();
// Upload chunks in parallel
var completedChunks = 0;
final errors = <String>[];

_logToStep(
'Uploading chunk ${i + 1}/${chunks.length} (${chunk.length} parts)',
onStepUpdate,
);
await for (final result in uploader.uploadChunksParallel(
chunks: chunks,
encodeChunk: encodeChunk,
url: url,
)) {
if (_isCancelled(launchId)) {
throw Exception('Upload cancelled');
}

final compressed = await _encodeRequest(
{
5: CborString(domain),
7: CborList(chunkBytes),
},
authKey,
teamId,
);
completedChunks++;

final (decoded, _) = await _makeApiRequest(launchId, url, compressed);
if (result.success && result.responseBody != null) {
// Decompress and decode response to log any messages
try {
final decompressed = gzip.decode(result.responseBody!);
final decoded = cbor.decode(decompressed) as CborMap;

// Log any response messages
_logResponseMessages(
errors: _parseCborStringList(decoded, 2),
warnings: _parseCborStringList(decoded, 3),
messages: _parseCborStringList(decoded, 4),
onStepUpdate: onStepUpdate,
);
_logResponseMessages(
errors: _parseCborStringList(decoded, 2),
warnings: _parseCborStringList(decoded, 3),
messages: _parseCborStringList(decoded, 4),
onStepUpdate: onStepUpdate,
);
} catch (_) {
// Ignore decode errors for response logging
}

// Format duration
final durationStr = _formatDuration(result.duration);

// Log completed chunk with timing
_logToStep(
'Chunk ${result.chunkIndex + 1} done in $durationStr',
onStepUpdate,
);

// Update progress display
_logToStep(
'$completedChunks/${chunks.length} chunks ($totalSizeStr)',
onStepUpdate,
);
} else {
final errorMsg = result.error ?? 'HTTP ${result.statusCode}';
errors.add('Chunk ${result.chunkIndex + 1}: $errorMsg');
_logToStep(
'❌ Chunk ${result.chunkIndex + 1} failed: $errorMsg',
onStepUpdate,
);
}
}

if (errors.isNotEmpty) {
throw Exception('Some chunks failed to upload: ${errors.join(', ')}');
}

_logToStep('✅ All ${chunks.length} chunks uploaded', onStepUpdate);
} finally {
uploader.close();
}
}

/// Format bytes as human-readable string
String _formatBytes(int bytes) {
if (bytes < 1024) return '$bytes B';
if (bytes < 1024 * 1024) return '${(bytes / 1024).toStringAsFixed(1)} KB';
return '${(bytes / (1024 * 1024)).toStringAsFixed(1)} MB';
}

_logToStep('✅ All missing parts uploaded', onStepUpdate);
/// Format duration as human-readable string
String _formatDuration(Duration d) {
if (d.inMilliseconds < 1000) return '${d.inMilliseconds}ms';
if (d.inSeconds < 60) return '${(d.inMilliseconds / 1000).toStringAsFixed(1)}s';
return '${d.inMinutes}m ${d.inSeconds % 60}s';
}

/// Finalize launch
Expand Down
Loading