diff --git a/BACKLOG.md b/BACKLOG.md index 8b871d9..93c62f3 100644 --- a/BACKLOG.md +++ b/BACKLOG.md @@ -251,26 +251,6 @@ Each item: --- -## SCTP / Data Channel - -> Same 2026-05-29 audit; **unverified leads.** - -### DataChannel close doesn't send SCTP stream reset (RECONFIG) - -- **Found:** 2026-05-29, RFC/W3C divergence audit. **Unverified.** -- **Detail:** RFC 6525 / RFC 8831 §6.7. `DataChannel.close()` flips local state - only; no `OUTGOING_SSN_RESET_REQUEST` is sent, and the SCTP layer doesn't - parse the `reconfig` chunk (declared but unhandled). Also the W3C `closing` - state is effectively skipped (set + cleared in one tick; `onclosing` never - fires). - [dart/lib/sctp/state_machine.dart](dart/lib/sctp/state_machine.dart), - [dart/lib/peer_connection/data_channel.dart](dart/lib/peer_connection/data_channel.dart). -- **Why deferred:** Needs RECONFIG send + parse + handler and an E2E close test. -- **Acceptance:** Close initiates a stream reset, awaits the peer's reset, - fires `onclosing` then `onclose`; RECONFIG round-trips against Chrome. - ---- - ## RTP / RTCP / SDP > Same 2026-05-29 audit; **unverified leads.** diff --git a/dart/lib/core/types.dart b/dart/lib/core/types.dart index d5786cd..139a572 100644 --- a/dart/lib/core/types.dart +++ b/dart/lib/core/types.dart @@ -56,6 +56,11 @@ final class SctpT3RtxToken extends TimerToken { SctpT3RtxToken(this.tsn); } +/// Retransmit timer for an outstanding RE-CONFIG request (RFC 6525 §5.1). +final class SctpReconfigToken extends TimerToken { + SctpReconfigToken(); +} + /// Fires the periodic STUN consent-freshness check on the selected pair /// (RFC 7675 §5.1) — also serves as the keepalive (§6). final class IceConsentToken extends TimerToken { diff --git a/dart/lib/peer_connection/data_channel.dart b/dart/lib/peer_connection/data_channel.dart index dc80065..42d4505 100644 --- a/dart/lib/peer_connection/data_channel.dart +++ b/dart/lib/peer_connection/data_channel.dart @@ -30,12 +30,18 @@ final class DataChannel { final _messageController = StreamController.broadcast(); final _openController = StreamController.broadcast(); + final _closingController = StreamController.broadcast(); final _closeController = StreamController.broadcast(); final _errorController = StreamController.broadcast(); // Callback set by PeerConnection to send data via SCTP. void Function(Uint8List data, {bool binary})? _sendCallback; + // Callback set by PeerConnection to initiate an SCTP stream reset when the + // channel is closed (RFC 8831 §6.7). Null until wired or when there is no + // association to reset. + void Function()? _closeCallback; + DataChannel({ required this.label, this.ordered = true, @@ -54,6 +60,10 @@ final class DataChannel { /// Fired when the channel opens. Stream get onOpen => _openController.stream; + /// Fired when the channel begins closing (W3C `onclosing`) — i.e. the SCTP + /// stream reset has been initiated but not yet completed. + Stream get onClosing => _closingController.stream; + /// Fired when the channel closes. Stream get onClose => _closeController.stream; @@ -78,9 +88,29 @@ final class DataChannel { _bytesSent += data.length; } + /// Begin closing the channel (W3C close procedure). Transitions to + /// `closing`, fires `onClosing`, and initiates the SCTP stream reset + /// (RFC 8831 §6.7). The transition to `closed` (and `onClose`) happens in + /// [_finalizeClose] once the reset completes. With no association to reset + /// (e.g. SCTP not established) it closes immediately. void close() { - if (_readyState == DataChannelState.closed) return; + if (_readyState == DataChannelState.closed || + _readyState == DataChannelState.closing) { + return; + } _readyState = DataChannelState.closing; + _closingController.add(null); + final cb = _closeCallback; + if (cb != null) { + cb(); + } else { + _finalizeClose(); + } + } + + /// Complete the close once the SCTP stream reset finishes. Idempotent. + void _finalizeClose() { + if (_readyState == DataChannelState.closed) return; _readyState = DataChannelState.closed; _closeController.add(null); _disposeControllers(); @@ -109,6 +139,7 @@ final class DataChannel { void _disposeControllers() { _messageController.close(); _openController.close(); + _closingController.close(); _closeController.close(); _errorController.close(); } diff --git a/dart/lib/peer_connection/peer_connection.dart b/dart/lib/peer_connection/peer_connection.dart index 07e0a9c..63b336e 100644 --- a/dart/lib/peer_connection/peer_connection.dart +++ b/dart/lib/peer_connection/peer_connection.dart @@ -461,6 +461,7 @@ final class PeerConnection { _transport.scheduleSctpTimeout(result.value.nextTimeout); } }; + channel._closeCallback = () => _resetSctpStream(id); _dataChannels[id] = channel; // Send DCEP OPEN when SCTP is established @@ -849,7 +850,9 @@ final class PeerConnection { _rtcpTimer?.cancel(); for (final r in _receivers.values) { r._close(); } _receivers.clear(); - for (final ch in _dataChannels.values) { ch.close(); } + // Tear down channels immediately — the whole transport is going away, so + // a graceful SCTP stream reset would never complete. + for (final ch in _dataChannels.values) { ch._finalizeClose(); } _dataChannels.clear(); await _transport.stop(); unawaited(_iceCandidateController.close()); @@ -915,6 +918,7 @@ final class PeerConnection { _sctp.onEstablished = _notifySctpEstablished; _sctp.onDataChannelOpen = _onRemoteDataChannelOpen; _sctp.onData = _onSctpData; + _sctp.onStreamReset = _onSctpStreamReset; _transport.attachIce(_ice); _transport.attachDtls(_dtls); @@ -1114,6 +1118,7 @@ final class PeerConnection { ordered: ordered, ppid: _dataChannelPpid(data, binary)); }; + channel._closeCallback = () => _resetSctpStream(streamId); channel._open(); _dataChannels[streamId] = channel; _dataChannelController.add(DataChannelEvent(channel)); @@ -1123,6 +1128,22 @@ final class PeerConnection { _dataChannels[streamId]?._deliverMessage(data, isBinary); } + /// Initiate an SCTP stream reset to close a data channel (RFC 8831 §6.7). + void _resetSctpStream(int streamId) { + final result = _sctp.resetStreams([streamId]); + if (result.isOk) { + for (final pkt in result.value.outputPackets) { + _transport.sendSctp(pkt.data); + } + _transport.scheduleSctpTimeout(result.value.nextTimeout); + } + } + + /// A stream reset completed (RFC 6525) — finalize the channel's close. + void _onSctpStreamReset(int streamId) { + _dataChannels[streamId]?._finalizeClose(); + } + /// SCTP PPID for a data-channel message (RFC 8831 §6.6). An empty /// message uses the "Empty" PPID so the SCTP layer can carry it as a /// single padding byte instead of an invalid zero-length DATA chunk. diff --git a/dart/lib/sctp/chunk.dart b/dart/lib/sctp/chunk.dart index 0757d1e..2ceb2b8 100644 --- a/dart/lib/sctp/chunk.dart +++ b/dart/lib/sctp/chunk.dart @@ -266,6 +266,119 @@ final class SctpShutdownCompleteChunk extends SctpChunk { Uint8List encode() => _wrapChunk(type, flags, Uint8List(0)); } +/// RE-CONFIG chunk (RFC 6525 §3.1) — carries one or two reconfiguration +/// parameters. Used by WebRTC data channels to reset (close) streams +/// (RFC 8831 §6.7). +final class SctpReconfigChunk extends SctpChunk { + final List parameters; + const SctpReconfigChunk(this.parameters) : super(SctpChunkType.reconfig, 0); + + @override + Uint8List encode() => + _wrapChunk(type, flags, _concatBytes([for (final p in parameters) p.encode()])); +} + +// ── RE-CONFIG parameters (RFC 6525 §4) ────────────────────────────────────────── + +/// RE-CONFIG parameter types (RFC 6525 §4). +abstract final class SctpReconfigParamType { + SctpReconfigParamType._(); + static const int outgoingSsnReset = 13; // 0x000D — §4.1 + static const int incomingSsnReset = 14; // 0x000E — §4.2 + static const int reconfigResponse = 16; // 0x0010 — §4.4 +} + +sealed class SctpReconfigParameter { + final int type; + const SctpReconfigParameter(this.type); + + /// The parameter value (everything after the 4-byte TLV header). + Uint8List encodeValue(); + + /// Encode as a TLV, padded to a 4-byte boundary (padding not counted in + /// the length field, RFC 6525 §4). + Uint8List encode() => _encodeTlv(type, encodeValue()); +} + +/// Outgoing SSN Reset Request Parameter (RFC 6525 §4.1) — asks the peer to +/// reset the sender's outgoing streams (i.e. the peer's incoming streams). +/// An empty [streams] list means "all streams". +final class SctpOutgoingSsnResetRequest extends SctpReconfigParameter { + final int requestSeq; + final int responseSeq; + final int lastAssignedTsn; + final List streams; + const SctpOutgoingSsnResetRequest({ + required this.requestSeq, + required this.responseSeq, + required this.lastAssignedTsn, + this.streams = const [], + }) : super(SctpReconfigParamType.outgoingSsnReset); + + @override + Uint8List encodeValue() { + final out = Uint8List(12 + streams.length * 2); + _writeU32(out, 0, requestSeq); + _writeU32(out, 4, responseSeq); + _writeU32(out, 8, lastAssignedTsn); + var offset = 12; + for (final s in streams) { + _writeU16(out, offset, s); + offset += 2; + } + return out; + } +} + +/// Incoming SSN Reset Request Parameter (RFC 6525 §4.2) — asks the peer to +/// reset its outgoing streams (our incoming). An empty [streams] list means +/// "all streams". +final class SctpIncomingSsnResetRequest extends SctpReconfigParameter { + final int requestSeq; + final List streams; + const SctpIncomingSsnResetRequest({ + required this.requestSeq, + this.streams = const [], + }) : super(SctpReconfigParamType.incomingSsnReset); + + @override + Uint8List encodeValue() { + final out = Uint8List(4 + streams.length * 2); + _writeU32(out, 0, requestSeq); + var offset = 4; + for (final s in streams) { + _writeU16(out, offset, s); + offset += 2; + } + return out; + } +} + +/// Re-configuration Response Parameter (RFC 6525 §4.4). +final class SctpReconfigResponse extends SctpReconfigParameter { + // Result codes (RFC 6525 §4.4). + static const int resultSuccessNop = 0; + static const int resultSuccessPerformed = 1; + static const int resultDenied = 2; + static const int resultErrorWrongSsn = 3; + static const int resultErrorRequestInProgress = 4; + static const int resultErrorBadSequence = 5; + static const int resultInProgress = 6; + + final int responseSeq; + final int result; + const SctpReconfigResponse({required this.responseSeq, required this.result}) + : super(SctpReconfigParamType.reconfigResponse); + + @override + Uint8List encodeValue() { + final out = Uint8List(8); + _writeU32(out, 0, responseSeq); + _writeU32(out, 4, result); + return out; + } +} + // ── Parameters ──────────────────────────────────────────────────────────────── sealed class SctpParameter { @@ -383,11 +496,61 @@ SctpChunk? _parseChunk(int type, int flags, Uint8List body) { return const SctpShutdownAckChunk(); case SctpChunkType.shutdownComplete: return const SctpShutdownCompleteChunk(); + case SctpChunkType.reconfig: + return SctpReconfigChunk(_parseReconfigParams(body)); default: return null; } } +List _parseReconfigParams(Uint8List body) { + final params = []; + var offset = 0; + while (offset + 4 <= body.length) { + final type = _u16(body, offset); + final len = _u16(body, offset + 2); + if (len < 4 || offset + len > body.length) break; // malformed + final end = offset + len; + switch (type) { + case SctpReconfigParamType.outgoingSsnReset: + if (len >= 16) { + final streams = []; + for (var o = offset + 16; o + 2 <= end; o += 2) { + streams.add(_u16(body, o)); + } + params.add(SctpOutgoingSsnResetRequest( + requestSeq: _u32(body, offset + 4), + responseSeq: _u32(body, offset + 8), + lastAssignedTsn: _u32(body, offset + 12), + streams: streams, + )); + } + case SctpReconfigParamType.incomingSsnReset: + if (len >= 8) { + final streams = []; + for (var o = offset + 8; o + 2 <= end; o += 2) { + streams.add(_u16(body, o)); + } + params.add(SctpIncomingSsnResetRequest( + requestSeq: _u32(body, offset + 4), + streams: streams, + )); + } + case SctpReconfigParamType.reconfigResponse: + if (len >= 12) { + params.add(SctpReconfigResponse( + responseSeq: _u32(body, offset + 4), + result: _u32(body, offset + 8), + )); + } + default: + break; // ignore unknown reconfig parameters + } + offset += (len + 3) & ~3; + } + return params; +} + Uint8List _extractCookie(Uint8List params) { var offset = 0; while (offset + 4 <= params.length) { @@ -419,28 +582,29 @@ Uint8List _wrapChunk(int type, int flags, Uint8List body) { return out; } -Uint8List _encodeParams(List params) { - final parts = []; - for (final p in params) { - final val = p.encodeValue(); - final len = 4 + val.length; - final padded = (len + 3) & ~3; - final out = Uint8List(padded); - out[0] = (p.type >> 8) & 0xFF; - out[1] = p.type & 0xFF; - out[2] = (len >> 8) & 0xFF; - out[3] = len & 0xFF; - out.setRange(4, 4 + val.length, val); - parts.add(out); - } - final total = parts.fold(0, (s, p) => s + p.length); - final result = Uint8List(total); +Uint8List _encodeParams(List params) => + _concatBytes([for (final p in params) _encodeTlv(p.type, p.encodeValue())]); + +/// Encode a `type`/`length`/`value` parameter, padded to a 4-byte boundary +/// (padding is not counted in the length field — RFC 4960 §3.2.1, RFC 6525 §4). +Uint8List _encodeTlv(int type, Uint8List value) { + final len = 4 + value.length; + final out = Uint8List((len + 3) & ~3); + _writeU16(out, 0, type); + _writeU16(out, 2, len); + out.setRange(4, 4 + value.length, value); + return out; +} + +Uint8List _concatBytes(List parts) { + final total = parts.fold(0, (s, p) => s + p.length); + final out = Uint8List(total); var offset = 0; for (final p in parts) { - result.setRange(offset, offset + p.length, p); + out.setRange(offset, offset + p.length, p); offset += p.length; } - return result; + return out; } void _writeU16(Uint8List d, int o, int v) { diff --git a/dart/lib/sctp/state_machine.dart b/dart/lib/sctp/state_machine.dart index 91e7c30..573c29b 100644 --- a/dart/lib/sctp/state_machine.dart +++ b/dart/lib/sctp/state_machine.dart @@ -45,6 +45,37 @@ final class SctpStateMachine implements ProtocolStateMachine { // Retransmission queue keyed by TSN final Map _retransmitQueue = {}; + // ── Stream reconfiguration (RFC 6525 / RFC 8831 §6.7) ────────────────────── + // Our next outgoing Re-config Request Sequence Number. + int _reconfigReqSeq = 0; + // The request sequence currently awaiting a response (null = none in flight). + int? _outstandingReqSeq; + List _outstandingStreams = const []; + SctpReconfigChunk? _outstandingReconfigChunk; + int _reconfigRetransmitCount = 0; + static const int _maxReconfigRetransmit = 10; + + /// Parameters advertised in every INIT / INIT-ACK: ForwardTSN (RFC 3758) + /// and the Supported Extensions list (RFC 5061 §4.2.7) enabling RE-CONFIG + /// stream resets (RFC 6525). Without the latter Chrome's dcSCTP rejects + /// close() with UNSUPPORTED_OPERATION. Both handshake sides advertise these + /// identically. + static const List _handshakeParameters = [ + SctpForwardTsnSupportedParameter(), + SctpSupportedExtensionsParameter( + [SctpChunkType.reconfig, SctpChunkType.forwardTsn]), + ]; + // Last peer request sequence we processed (for duplicate detection). + int _lastRemoteReqSeq = 0; + // Cached response to the last request, resent if the peer retransmits. + SctpReconfigChunk? _lastReconfigResponseChunk; + // Streams already reported via [onStreamReset] (fire once per stream). + final Set _resetNotified = {}; + // Streams whose outgoing side we have already reset. + final Set _localOutgoingResetDone = {}; + // Streams queued for an outgoing reset (sent one request at a time). + final Set _pendingResetStreams = {}; + int _retransmitCount = 0; // Set when we yield to a simultaneous-open INIT — prevents the peer's // INIT-ACK (for our abandoned INIT) from overwriting remote tags. @@ -77,6 +108,10 @@ final class SctpStateMachine implements ProtocolStateMachine { /// Called when data arrives on a stream. void Function(int streamId, Uint8List data, bool isBinary)? onData; + /// Called when a stream has been reset (RFC 6525) — i.e. the corresponding + /// data channel has closed. Fired once per stream. + void Function(int streamId)? onStreamReset; + SctpStateMachine({bool isClient = true}) : _isClient = isClient; /// Update the SCTP role (must be called before connect/processInput). @@ -251,6 +286,9 @@ final class SctpStateMachine implements ProtocolStateMachine { if (token is SctpT3RtxToken) { return _retransmit(token.tsn); } + if (token is SctpReconfigToken) { + return _retransmitReconfig(); + } return const Ok(ProcessResult.empty); } @@ -270,6 +308,7 @@ final class SctpStateMachine implements ProtocolStateMachine { case SctpShutdownChunk(): return _handleShutdown(chunk); case SctpShutdownAckChunk(): return _handleShutdownAck(); case SctpShutdownCompleteChunk(): return _handleShutdownComplete(); + case SctpReconfigChunk(): return _handleReconfig(chunk); } } @@ -326,6 +365,7 @@ final class SctpStateMachine implements ProtocolStateMachine { numInboundStreams: 1024, initialTsn: _localInitialTsn, cookie: cookie, + parameters: _handshakeParameters, ); _state = SctpState.cookieWait; // server waits for COOKIE-ECHO return Ok(ProcessResult(outputPackets: [_buildPacket([ack])])); @@ -362,7 +402,10 @@ final class SctpStateMachine implements ProtocolStateMachine { // paths may complete). final wasEstablished = _state == SctpState.established; _state = SctpState.established; - if (!wasEstablished) onEstablished?.call(); + if (!wasEstablished) { + _initReconfigState(); + onEstablished?.call(); + } final ack = const SctpCookieAckChunk(); return Ok(ProcessResult(outputPackets: [_buildPacket([ack])])); } @@ -370,6 +413,7 @@ final class SctpStateMachine implements ProtocolStateMachine { Result _handleCookieAck() { if (_state != SctpState.cookieEchoed) { return const Ok(ProcessResult.empty); } _state = SctpState.established; + _initReconfigState(); onEstablished?.call(); return const Ok(ProcessResult.empty); } @@ -596,6 +640,180 @@ final class SctpStateMachine implements ProtocolStateMachine { return const Ok(ProcessResult.empty); } + // ── Stream reconfiguration (RFC 6525 / RFC 8831 §6.7) ────────────────────── + + /// Initialise RE-CONFIG sequence numbers once the association is up. The + /// initial Re-config Request Sequence Number is the initial TSN (RFC 6525 + /// §4.1); the expected first remote request sequence is the peer's. + void _initReconfigState() { + _reconfigReqSeq = _localInitialTsn; + _lastRemoteReqSeq = (_remoteInitialTsn - 1) & 0xFFFFFFFF; + } + + /// Reset (close) the given outgoing streams (RFC 8831 §6.7). Sends an + /// Outgoing SSN Reset Request; each stream's reset is reported via + /// [onStreamReset] once it completes in both directions. + Result resetStreams(List streamIds) { + if (_state != SctpState.established) { + return Err(const StateError('SCTP: not established')); + } + _queueOutgoingReset(streamIds); + return Ok(_maybeSendReconfig()); + } + + /// Queue [streams] for an outgoing reset, skipping any already reset or in + /// flight (so a peer-triggered reset can't loop back on itself). + void _queueOutgoingReset(Iterable streams) { + for (final s in streams) { + if (!_localOutgoingResetDone.contains(s) && + !_outstandingStreams.contains(s)) { + _pendingResetStreams.add(s); + } + } + } + + /// Send a RE-CONFIG for any queued streams, if no request is in flight + /// (RFC 6525 allows only one outstanding outgoing reset request). Returns + /// the packet(s) to emit, or [ProcessResult.empty] when nothing is sent. + ProcessResult _maybeSendReconfig() { + if (_outstandingReqSeq != null || _pendingResetStreams.isEmpty) { + return ProcessResult.empty; + } + final streams = _pendingResetStreams.toList()..sort(); + _pendingResetStreams.clear(); + final reqSeq = _reconfigReqSeq; + _reconfigReqSeq = (_reconfigReqSeq + 1) & 0xFFFFFFFF; + final chunk = SctpReconfigChunk([ + SctpOutgoingSsnResetRequest( + requestSeq: reqSeq, + responseSeq: _lastRemoteReqSeq, + lastAssignedTsn: (_localTsn - 1) & 0xFFFFFFFF, + streams: streams, + ), + ]); + _outstandingReqSeq = reqSeq; + _outstandingStreams = streams; + _outstandingReconfigChunk = chunk; + _reconfigRetransmitCount = 0; + return ProcessResult( + outputPackets: [_buildPacket([chunk])], + nextTimeout: _reconfigTimeout(0), + ); + } + + Timeout _reconfigTimeout(int retransmitCount) => Timeout( + at: DateTime.now().add(Duration( + milliseconds: (_t3RtxMs * (1 << retransmitCount)).clamp(0, 60000))), + token: SctpReconfigToken(), + ); + + Result _handleReconfig(SctpReconfigChunk chunk) { + final out = []; + Timeout? timeout; + for (final param in chunk.parameters) { + final ProcessResult r; + switch (param) { + case SctpOutgoingSsnResetRequest(): + r = _handleOutgoingResetRequest(param); + case SctpReconfigResponse(): + r = _handleReconfigResponse(param); + case SctpIncomingSsnResetRequest(): + // Peer asks us to reset our outgoing streams. + _queueOutgoingReset(param.streams); + r = _maybeSendReconfig(); + } + out.addAll(r.outputPackets); + timeout = r.nextTimeout ?? timeout; + } + return Ok(ProcessResult(outputPackets: out, nextTimeout: timeout)); + } + + /// Handle a peer's Outgoing SSN Reset Request — reset our incoming state + /// for the listed streams, respond, and (for a full data-channel close) + /// reset our own outgoing side too. + ProcessResult _handleOutgoingResetRequest(SctpOutgoingSsnResetRequest req) { + // Duplicate request → resend the cached response. + if (req.requestSeq == _lastRemoteReqSeq && _lastReconfigResponseChunk != null) { + return ProcessResult( + outputPackets: [_buildPacket([_lastReconfigResponseChunk!])]); + } + + // Empty stream list means "all streams" (RFC 6525 §4.1). + final streams = req.streams.isNotEmpty + ? req.streams + : {..._recvSsn.keys, ..._channelLabels.keys}.toList(); + for (final s in streams) { + _recvSsn.remove(s); + _reassemblyBuffer.remove(s); + _notifyStreamReset(s); + } + _lastRemoteReqSeq = req.requestSeq; + + final response = SctpReconfigChunk([ + SctpReconfigResponse( + responseSeq: req.requestSeq, + result: SctpReconfigResponse.resultSuccessPerformed, + ), + ]); + _lastReconfigResponseChunk = response; + + // Reset our outgoing side for these streams too, so the channel closes + // both ways (_queueOutgoingReset skips streams already reset or in + // flight, avoiding a reset loop). + _queueOutgoingReset(streams); + final sent = _maybeSendReconfig(); + return ProcessResult( + outputPackets: [_buildPacket([response]), ...sent.outputPackets], + nextTimeout: sent.nextTimeout, + ); + } + + /// Handle a Re-config Response to our outstanding outgoing reset request. + ProcessResult _handleReconfigResponse(SctpReconfigResponse resp) { + if (_outstandingReqSeq == null || resp.responseSeq != _outstandingReqSeq) { + return ProcessResult.empty; + } + // The peer is still working on it — keep the retransmit timer running. + if (resp.result == SctpReconfigResponse.resultInProgress) { + return ProcessResult.empty; + } + // Success (or a terminal error): consider the outgoing reset finished so + // the channel never hangs. + _completeOutstandingReset(); + return _maybeSendReconfig(); + } + + void _completeOutstandingReset() { + for (final s in _outstandingStreams) { + _sendSsn.remove(s); + _localOutgoingResetDone.add(s); + _notifyStreamReset(s); + } + _outstandingReqSeq = null; + _outstandingStreams = const []; + _outstandingReconfigChunk = null; + _reconfigRetransmitCount = 0; + } + + void _notifyStreamReset(int streamId) { + if (_resetNotified.add(streamId)) { onStreamReset?.call(streamId); } + } + + Result _retransmitReconfig() { + final chunk = _outstandingReconfigChunk; + if (chunk == null) { return const Ok(ProcessResult.empty); } + if (_reconfigRetransmitCount >= _maxReconfigRetransmit) { + // Give up after Max.Retrans so the channel doesn't hang forever. + _completeOutstandingReset(); + return const Ok(ProcessResult.empty); + } + _reconfigRetransmitCount++; + return Ok(ProcessResult( + outputPackets: [_buildPacket([chunk])], + nextTimeout: _reconfigTimeout(_reconfigRetransmitCount), + )); + } + // ── Init / connect ──────────────────────────────────────────────────────── Result _sendInit() { @@ -606,7 +824,7 @@ final class SctpStateMachine implements ProtocolStateMachine { numOutboundStreams: 1024, numInboundStreams: 1024, initialTsn: _localInitialTsn, - parameters: const [SctpForwardTsnSupportedParameter()], + parameters: _handshakeParameters, ); final timeout = Timeout( at: DateTime.now().add(const Duration(milliseconds: 1000)), diff --git a/dart/test/e2e/e2e_test.dart b/dart/test/e2e/e2e_test.dart index 3c7617a..59c5fb6 100644 --- a/dart/test/e2e/e2e_test.dart +++ b/dart/test/e2e/e2e_test.dart @@ -154,6 +154,57 @@ void main() { final received = await browserState(d, 'receivedCount'); expect(received, greaterThanOrEqualTo(2)); }); + + test('webdartc closes the data channel → both peers observe onclose ' + '(RFC 6525 stream reset)', () async { + final d = driver!; + final sig = sigServer!; + + final url = + 'http://127.0.0.1:$htmlPort/?port=${sig.port}' + '&role=answerer&scenario=data'; + await d.navigateTo(url); + + await waitFor( + () async => await browserState(d, 'ready') == true, + timeout: const Duration(seconds: 10), + ); + + // webdartc offerer: send echoes, then close the channel and wait for + // its own onClose (which only fires once the SCTP stream reset + // round-trips with Chrome). + final offererFuture = _runWebdartcOfferer(sig.port, closeChannel: true); + + try { + await waitFor( + () async { + final v = await browserState(d, 'iceState'); + return v == 'connected' || v == 'completed'; + }, + timeout: const Duration(seconds: 30), + interval: const Duration(seconds: 3), + ); + } catch (e) { + _printChromeLog(); + rethrow; + } + + await waitFor( + () async => await browserState(d, 'dcOpen') == true, + timeout: const Duration(seconds: 15), + ); + + // webdartc peer fires onClose → the helper exits 0. + await offererFuture.timeout(const Duration(seconds: 30)); + + // Chrome peer must observe its data channel closing too (its onclose + // handler sets dcClosed). Give the RE-CONFIG response a moment to land. + await waitFor( + () async => await browserState(d, 'dcClosed') == true, + timeout: const Duration(seconds: 10), + ); + expect(await browserState(d, 'dcClosed'), isTrue); + }); }); // ── Scenario 1b: Data channel (Chrome offerer ↔ webdartc answerer) ──────── @@ -1085,7 +1136,8 @@ Future _runWebdartcEcho(int signalingPort) async { /// Runs the webdartc offerer as a subprocess, streaming stderr to the console. /// Returns when the exchange is complete (exit 0) or throws on failure. -Future _runWebdartcOfferer(int signalingPort, {int timeoutSec = 30}) async { +Future _runWebdartcOfferer(int signalingPort, + {int timeoutSec = 30, bool closeChannel = false}) async { final proc = await Process.start( Platform.resolvedExecutable, [ @@ -1093,6 +1145,7 @@ Future _runWebdartcOfferer(int signalingPort, {int timeoutSec = 30}) async 'test/e2e/webdartc_offerer_helper.dart', '--port=$signalingPort', '--timeout=$timeoutSec', + if (closeChannel) '--close-dc', ], environment: {...Platform.environment, 'WEBDARTC_DEBUG': '1'}, ); diff --git a/dart/test/e2e/webdartc_offerer_helper.dart b/dart/test/e2e/webdartc_offerer_helper.dart index b4297c0..fc4cbb3 100644 --- a/dart/test/e2e/webdartc_offerer_helper.dart +++ b/dart/test/e2e/webdartc_offerer_helper.dart @@ -166,19 +166,22 @@ final class _WsClient { void main(List args) async { int port = 8080; int timeoutSec = 30; + var closeChannel = false; for (final arg in args) { if (arg.startsWith('--port=')) { port = int.parse(arg.substring('--port='.length)); } else if (arg.startsWith('--timeout=')) { timeoutSec = int.parse(arg.substring('--timeout='.length)); + } else if (arg == '--close-dc') { + closeChannel = true; } } - final exitCode = await _run(port, timeoutSec: timeoutSec); + final exitCode = await _run(port, timeoutSec: timeoutSec, closeChannel: closeChannel); exit(exitCode); } -Future _run(int sigPort, {int timeoutSec = 30}) async { +Future _run(int sigPort, {int timeoutSec = 30, bool closeChannel = false}) async { final ws = await _WsClient.connect(sigPort); ws.sendJson({'type': 'register', 'role': 'offerer'}); @@ -195,6 +198,15 @@ Future _run(int sigPort, {int timeoutSec = 30}) async { var binaryEchoed = false; final done = Completer(); + // When --close-dc is set, after both echoes the offerer closes the data + // channel and waits for its own onClose to fire (RFC 8831 §6.7). onClose + // only fires once the SCTP stream reset round-trips with Chrome, so a + // clean exit 0 proves both peers tore the channel down. + dc.onClose.listen((_) { + stdout.writeln('[offerer] DataChannel onClose fired (stream reset complete)'); + if (!done.isCompleted) done.complete(0); + }); + // ICE candidates → relay. pc.onIceCandidate.listen((evt) { stderr.writeln('[offerer] local ICE candidate: ${evt.candidate}'); @@ -254,8 +266,13 @@ Future _run(int sigPort, {int timeoutSec = 30}) async { } } if (textEchoed && binaryEchoed && !done.isCompleted) { - stdout.writeln('[offerer] PASS'); - done.complete(0); + if (closeChannel) { + stdout.writeln('[offerer] echoes OK — closing data channel'); + dc.close(); // exit 0 deferred to dc.onClose (stream reset complete) + } else { + stdout.writeln('[offerer] PASS'); + done.complete(0); + } } }); diff --git a/dart/test/peer_connection/data_channel_test.dart b/dart/test/peer_connection/data_channel_test.dart index e6f68dd..e02ceb8 100644 --- a/dart/test/peer_connection/data_channel_test.dart +++ b/dart/test/peer_connection/data_channel_test.dart @@ -34,4 +34,36 @@ void main() { expect(evt.text, contains('B')); }); }); + + group('DataChannel.close lifecycle', () { + test('with no SCTP association: closing → closed, both events fire', + () async { + final dc = DataChannel(label: 'x', id: 0); + final events = []; + dc.onClosing.listen((_) => events.add('closing')); + dc.onClose.listen((_) => events.add('close')); + + expect(dc.readyState, DataChannelState.connecting); + dc.close(); + // No close callback wired → the close finalizes immediately. + expect(dc.readyState, DataChannelState.closed); + + await Future.delayed(Duration.zero); + expect(events, ['closing', 'close']); + }); + + test('close() is idempotent', () { + final dc = DataChannel(label: 'x', id: 0); + dc.close(); + expect(dc.readyState, DataChannelState.closed); + expect(dc.close, returnsNormally); // second call is a no-op + expect(dc.readyState, DataChannelState.closed); + }); + + test('send after close throws', () { + final dc = DataChannel(label: 'x', id: 0); + dc.close(); + expect(() => dc.send('hi'), throwsStateError); + }); + }); } diff --git a/dart/test/sctp/reconfig_test.dart b/dart/test/sctp/reconfig_test.dart new file mode 100644 index 0000000..d9a21de --- /dev/null +++ b/dart/test/sctp/reconfig_test.dart @@ -0,0 +1,149 @@ +import 'dart:typed_data'; + +import 'package:test/test.dart'; +import 'package:webdartc/webdartc.dart'; +import 'package:webdartc/sctp/chunk.dart'; + +import 'sctp_test_helpers.dart'; + +void main() { + group('RE-CONFIG chunk (RFC 6525)', () { + test('Outgoing SSN Reset Request round-trips', () { + final chunk = SctpReconfigChunk([ + const SctpOutgoingSsnResetRequest( + requestSeq: 0x11223344, + responseSeq: 0x55667788, + lastAssignedTsn: 0x99AABBCC, + streams: [3, 5, 7], + ), + ]); + + final parsed = parseChunks(chunk.encode(), 0); + expect(parsed, hasLength(1)); + final rc = parsed.first as SctpReconfigChunk; + expect(rc.type, 0x82); + final p = rc.parameters.single as SctpOutgoingSsnResetRequest; + expect(p.requestSeq, 0x11223344); + expect(p.responseSeq, 0x55667788); + expect(p.lastAssignedTsn, 0x99AABBCC); + expect(p.streams, [3, 5, 7]); + }); + + test('Outgoing request with no streams means "all streams"', () { + final chunk = SctpReconfigChunk([ + const SctpOutgoingSsnResetRequest( + requestSeq: 1, + responseSeq: 0, + lastAssignedTsn: 0, + ), + ]); + final rc = parseChunks(chunk.encode(), 0).single as SctpReconfigChunk; + final p = rc.parameters.single as SctpOutgoingSsnResetRequest; + expect(p.streams, isEmpty); + }); + + test('Re-config Response round-trips', () { + final chunk = SctpReconfigChunk([ + const SctpReconfigResponse( + responseSeq: 0xDEADBEEF, + result: SctpReconfigResponse.resultSuccessPerformed, + ), + ]); + final rc = parseChunks(chunk.encode(), 0).single as SctpReconfigChunk; + final p = rc.parameters.single as SctpReconfigResponse; + expect(p.responseSeq, 0xDEADBEEF); + expect(p.result, SctpReconfigResponse.resultSuccessPerformed); + }); + + test('two parameters in one chunk round-trip', () { + final chunk = SctpReconfigChunk([ + const SctpReconfigResponse(responseSeq: 7, result: 1), + const SctpOutgoingSsnResetRequest( + requestSeq: 8, responseSeq: 7, lastAssignedTsn: 9, streams: [2]), + ]); + final rc = parseChunks(chunk.encode(), 0).single as SctpReconfigChunk; + expect(rc.parameters, hasLength(2)); + expect(rc.parameters[0], isA()); + expect(rc.parameters[1], isA()); + }); + }); + + group('SctpStateMachine stream reset', () { + test('resetStreams fails before the association is established', () { + final sctp = SctpStateMachine(isClient: true); + expect(sctp.resetStreams([0]).isErr, isTrue); + }); + + test('reset round-trips and both peers observe the stream reset', () { + final (client, server) = establishSctpPair(); + + final clientResets = []; + final serverResets = []; + client.onStreamReset = clientResets.add; + server.onStreamReset = serverResets.add; + + // Client closes stream 0 → emits an Outgoing SSN Reset Request. + final start = client.resetStreams([0]); + expect(start.isOk, isTrue); + final firstPkt = start.value.outputPackets.single; + expect(firstPkt.data[12], 0x82, reason: 'RE-CONFIG chunk type'); + + // Drive every resulting packet between the two peers until quiescent. + _pump(client, server, start.value.outputPackets); + + expect(clientResets, [0]); + expect(serverResets, [0]); + }); + + test('a duplicate reset request is answered without re-firing the reset', + () { + final (client, server) = establishSctpPair(); + final serverResets = []; + server.onStreamReset = serverResets.add; + + final start = client.resetStreams([0]); + final reconfig = start.value.outputPackets.single.data; + final ip = IpAddress.parse('127.0.0.1'); + + // Deliver the same request twice. + final first = + server.processInput(reconfig, remoteIp: ip, remotePort: 5000); + final second = + server.processInput(reconfig, remoteIp: ip, remotePort: 5000); + + expect(first.isOk, isTrue); + expect(second.isOk, isTrue); + // Both produce a response packet… + expect(first.value.outputPackets, isNotEmpty); + expect(second.value.outputPackets, isNotEmpty); + // …but the reset is reported exactly once. + expect(serverResets, [0]); + }); + }); +} + +/// Feed every output packet from one peer to the other, recursively, until no +/// peer produces further output. +void _pump( + SctpStateMachine client, + SctpStateMachine server, + List initial, +) { + final ip = IpAddress.parse('127.0.0.1'); + // Worklist of (receiver, bytes). `initial` came from the client, so it is + // destined for the server. + final queue = <(SctpStateMachine, Uint8List)>[ + for (final p in initial) (server, p.data), + ]; + var guard = 0; + while (queue.isNotEmpty) { + if (guard++ > 100) { fail('reset did not converge (possible loop)'); } + final (to, data) = queue.removeAt(0); + final r = to.processInput(data, remoteIp: ip, remotePort: 5000); + expect(r.isOk, isTrue); + final other = identical(to, client) ? server : client; + for (final p in r.value.outputPackets) { + queue.add((other, p.data)); + } + } +} diff --git a/dart/test/sctp/sctp_test.dart b/dart/test/sctp/sctp_test.dart index 83f6e3f..7778777 100644 --- a/dart/test/sctp/sctp_test.dart +++ b/dart/test/sctp/sctp_test.dart @@ -3,6 +3,8 @@ import 'dart:typed_data'; import 'package:test/test.dart'; import 'package:webdartc/webdartc.dart'; +import 'sctp_test_helpers.dart'; + void main() { group('SctpStateMachine', () { test('connect sends INIT packet', () { @@ -77,7 +79,7 @@ void main() { // RFC 8831 §6.6: an empty application message rides as a single // padding byte with a "WebRTC {String,Binary} Empty" PPID — a // zero-length SCTP DATA chunk is illegal (RFC 9260). - final (client, server) = _establish(); + final (client, server) = establishSctpPair(); final delivered = <(Uint8List, bool)>[]; server.onData = (int _, Uint8List data, bool isBinary) => delivered.add((data, isBinary)); @@ -123,27 +125,3 @@ void main() { }); }); } - -/// Run the four-way SCTP handshake and return both ends in the -/// `established` state (client at port 5000, server at 5001). -(SctpStateMachine, SctpStateMachine) _establish() { - final client = SctpStateMachine(isClient: true); - final server = SctpStateMachine(isClient: false); - final clientIp = IpAddress.parse('127.0.0.1'); - final serverIp = IpAddress.parse('127.0.0.1'); - const clientPort = 5000; - const serverPort = 5001; - - final init = client.connect(remoteIp: serverIp, remotePort: serverPort); - final initAck = server.processInput(init.value.outputPackets.first.data, - remoteIp: clientIp, remotePort: clientPort); - final cookieEcho = client.processInput(initAck.value.outputPackets.first.data, - remoteIp: serverIp, remotePort: serverPort); - final cookieAck = server.processInput( - cookieEcho.value.outputPackets.first.data, - remoteIp: clientIp, - remotePort: clientPort); - client.processInput(cookieAck.value.outputPackets.first.data, - remoteIp: serverIp, remotePort: serverPort); - return (client, server); -} diff --git a/dart/test/sctp/sctp_test_helpers.dart b/dart/test/sctp/sctp_test_helpers.dart new file mode 100644 index 0000000..39c4855 --- /dev/null +++ b/dart/test/sctp/sctp_test_helpers.dart @@ -0,0 +1,24 @@ +import 'package:webdartc/webdartc.dart'; + +/// Drive two [SctpStateMachine]s through the full INIT / INIT-ACK / +/// COOKIE-ECHO / COOKIE-ACK handshake and return the established pair. +(SctpStateMachine, SctpStateMachine) establishSctpPair() { + final client = SctpStateMachine(isClient: true); + final server = SctpStateMachine(isClient: false); + final ip = IpAddress.parse('127.0.0.1'); + const clientPort = 5000; + const serverPort = 5001; + + final init = client.connect(remoteIp: ip, remotePort: serverPort); + final initAck = server.processInput(init.value.outputPackets.first.data, + remoteIp: ip, remotePort: clientPort); + final cookieEcho = client.processInput(initAck.value.outputPackets.first.data, + remoteIp: ip, remotePort: serverPort); + final cookieAck = server.processInput( + cookieEcho.value.outputPackets.first.data, + remoteIp: ip, + remotePort: clientPort); + client.processInput(cookieAck.value.outputPackets.first.data, + remoteIp: ip, remotePort: serverPort); + return (client, server); +}