Custom Transport Toolkit
RPC Dart is intentionally transport-neutral. When the built-in transports are not
enough, you can use the toolkit in rpc_dart/src/rpc/transports/transport_toolkit.dart
to create bespoke transports without rewriting the protocol layer.
RpcBaseTransport essentials
RpcBaseTransport implements IRpcTransport and handles the heavy lifting:
- Stream ID management via RpcStreamIdManager (including client/server semantics and reuse).
- Tracking active streams and releasing resources when streams end.
- Broadcasting incoming messages to listeners through incomingMessages.
- Default implementations of close(), health(), and reconnect().
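To make the stream ID bookkeeping more concrete, here is a minimal standalone sketch. It is not the library's RpcStreamIdManager: the class name SketchStreamIds is hypothetical, and the odd/even split between client and server is an assumption borrowed from HTTP/2-style protocols. ID reuse is left out to keep the sketch short.

```dart
/// Hypothetical stand-in for a stream ID allocator; not the library's
/// RpcStreamIdManager. Assumes HTTP/2-style parity: clients use odd IDs,
/// servers use even IDs, so the two sides never collide.
class SketchStreamIds {
  SketchStreamIds({required bool isClient}) : _next = isClient ? 1 : 2;

  int _next;
  final Set<int> _active = <int>{};

  /// Returns a fresh ID and marks it as active.
  int generate() {
    final id = _next;
    _next += 2; // stay on this side's parity
    _active.add(id);
    return id;
  }

  /// Marks a stream as finished; returns false if it was not active.
  bool release(int id) => _active.remove(id);

  /// Number of streams that have been generated but not yet released.
  int get activeCount => _active.length;
}
```

Tracking the active set separately from the counter is what allows a base class to release resources when a stream ends and to report how many streams are still open.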
You only implement how messages are sent to the underlying medium.
import 'dart:convert';
import 'dart:typed_data';

import 'package:rpc_dart/rpc_dart.dart'; // assumed entry point exporting the toolkit
import 'package:stream_channel/stream_channel.dart';

class UnixSocketTransport extends RpcBaseTransport {
  UnixSocketTransport(this._socket)
      : super(isClient: true, logger: RpcLogger('UnixSocketTransport')) {
    _listen();
  }

  final StreamChannel<List<int>> _socket;

  // Decode each incoming frame and hand it to the RPC runtime.
  void _listen() {
    _socket.stream.listen((frame) {
      final parsed = RpcTransportUtils.parseLengthPrefixedFrame(frame);
      if (parsed == null) return;

      addIncomingMessage(RpcTransportMessage(
        payload: Uint8List.fromList(parsed),
        streamId: generateStreamId(), // or decode from frame headers
      ));
    });
  }

  // Metadata travels as a JSON envelope inside a length-prefixed frame.
  @override
  Future<void> sendMetadata(int streamId, RpcMetadata metadata,
      {bool endStream = false}) async {
    final headers = RpcTransportUtils.metadataToHeaders(metadata);
    final frame = RpcTransportUtils.createLengthPrefixedFrame(
      utf8.encode(jsonEncode({'streamId': streamId, 'headers': headers})),
    );
    _socket.sink.add(frame);
  }

  @override
  Future<void> sendMessage(int streamId, Uint8List data,
      {bool endStream = false}) async {
    final frame = RpcTransportUtils.createLengthPrefixedFrame(data);
    _socket.sink.add(frame);
  }

  @override
  Future<void> finishSending(int streamId) async {
    // Optional: send FIN frame or mark completion in your protocol
  }
}
addIncomingMessage pushes data into the RPC runtime once you decode it from
your protocol. You can also override supportsZeroCopy to expose zero-copy
capabilities when appropriate.
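Decoding the stream ID from the frame, instead of generating one on receipt, requires the frame to carry it. The following is a minimal sketch under an assumed layout (a 4-byte big-endian stream ID followed by the payload). The names DecodedFrame, encodeStreamFrame and decodeStreamFrame are hypothetical and do not come from the toolkit.

```dart
import 'dart:typed_data';

/// Hypothetical decoded frame; the field names are illustrative only.
class DecodedFrame {
  DecodedFrame(this.streamId, this.payload);
  final int streamId;
  final Uint8List payload;
}

/// Assumed layout: 4-byte big-endian stream ID followed by the payload.
Uint8List encodeStreamFrame(int streamId, Uint8List payload) {
  final frame = Uint8List(4 + payload.length);
  ByteData.sublistView(frame).setUint32(0, streamId, Endian.big);
  frame.setRange(4, frame.length, payload);
  return frame;
}

/// Returns null when the frame is too short to carry the header.
DecodedFrame? decodeStreamFrame(Uint8List frame) {
  if (frame.length < 4) return null;
  final streamId = ByteData.sublistView(frame).getUint32(0, Endian.big);
  // sublistView shares the underlying buffer, so the payload is not copied.
  return DecodedFrame(streamId, Uint8List.sublistView(frame, 4));
}
```

In a listener like _listen, the decoded streamId would be passed to RpcTransportMessage in place of generateStreamId(), so that responses are routed to the stream that the peer actually addressed.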
Mixins for advanced behaviour
The toolkit ships with mixins that you can opt into:
- RpcZeroCopySupport: exposes sendDirectObject so zero-copy contracts work with your transport. Perfect for shared-memory or isolate-based transports.
- RpcAutoReconnect: implements exponential backoff reconnection logic via attemptReconnect() and performReconnect().
- RpcMessageBuffering: queues outgoing messages while the connection is down and flushes them when it recovers.
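Exponential backoff, as mentioned for RpcAutoReconnect, can be sketched in a few lines. This is a generic illustration, not the mixin's implementation: backoffDelay, reconnectWithBackoff, the 200 ms base delay, the 30 s cap and the attempt limit are all assumptions chosen for the example.

```dart
import 'dart:math' as math;

/// Delay before reconnect attempt number [attempt] (0-based). The delay
/// doubles on each attempt and is capped at [cap]. Defaults are illustrative.
Duration backoffDelay(
  int attempt, {
  Duration base = const Duration(milliseconds: 200),
  Duration cap = const Duration(seconds: 30),
}) {
  // Clamp the exponent so the shift cannot overflow during long outages.
  final exponent = math.min(attempt, 20);
  final millis = base.inMilliseconds * (1 << exponent);
  return Duration(milliseconds: math.min(millis, cap.inMilliseconds));
}

Future<void> _defaultSleep(Duration d) => Future<void>.delayed(d);

/// Calls [connect] until it succeeds or [maxAttempts] is exhausted.
/// Returns true on success. [sleep] is injectable so tests can skip waiting.
Future<bool> reconnectWithBackoff(
  Future<void> Function() connect, {
  int maxAttempts = 5,
  Future<void> Function(Duration) sleep = _defaultSleep,
}) async {
  for (var attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      await connect();
      return true;
    } on Exception catch (_) {
      // Wait before the next try, but not after the final failure.
      if (attempt < maxAttempts - 1) {
        await sleep(backoffDelay(attempt));
      }
    }
  }
  return false;
}
```

Capping the delay keeps a long outage from pushing the retry interval to impractical lengths, and production code would often add random jitter so that many clients do not reconnect in lockstep.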
Mixins compose naturally:
import 'dart:convert';
import 'dart:io';
import 'dart:typed_data';

import 'package:rpc_dart/rpc_dart.dart'; // assumed entry point exporting the toolkit

class ResilientWebSocketTransport extends RpcBaseTransport
    with RpcAutoReconnect, RpcMessageBuffering {
  ResilientWebSocketTransport(this._connect)
      : super(isClient: true, logger: RpcLogger('WsTransport'));

  final Future<WebSocket> Function() _connect;
  WebSocket? _socket;

  Future<void> connect() async {
    final socket = await _connect();
    _socket = socket;
    socket.listen(_handleFrame, onDone: () {
      _socket = null; // lets sendMessage fall back to buffering while disconnected
      attemptReconnect();
    });
    resetReconnectAttempts();
  }

  @override
  Future<void> performReconnect() async {
    await connect();
  }

  // Only binary frames carry RPC data; anything else is ignored.
  void _handleFrame(dynamic frame) {
    if (frame is! List<int>) return;
    final parsed = RpcTransportUtils.parseWebSocketFrame(frame);
    if (parsed == null) return;

    addIncomingMessage(RpcTransportMessage(
      payload: Uint8List.fromList(parsed['data'] as List<int>),
      streamId: parsed['streamId'] as int,
    ));
  }

  @override
  Future<void> sendMessage(int streamId, Uint8List data,
      {bool endStream = false}) async {
    if (_socket == null) {
      await super.sendMessage(streamId, data, endStream: endStream); // buffers until ready
      return;
    }

    final frame = RpcTransportUtils.createWebSocketFrame(streamId, data);
    _socket!.add(frame);
  }

  @override
  Future<void> sendMetadata(int streamId, RpcMetadata metadata,
      {bool endStream = false}) async {
    // Convert metadata to headers and reuse sendMessage
    final headers = jsonEncode(RpcTransportUtils.metadataToHeaders(metadata));
    await sendMessage(streamId, Uint8List.fromList(utf8.encode(headers)),
        endStream: endStream);
  }

  // An empty frame marks the end of the stream in this example protocol.
  @override
  Future<void> finishSending(int streamId) async {
    _socket?.add(RpcTransportUtils.createWebSocketFrame(streamId, const []));
  }
}
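The buffering behaviour described for RpcMessageBuffering (queue while disconnected, flush on recovery) can be pictured with a small standalone queue. This is a sketch under assumptions, not the mixin's code: OutgoingBuffer, its capacity limit and the flush callback signature are hypothetical.

```dart
import 'dart:collection';
import 'dart:typed_data';

/// Hypothetical outgoing buffer; not the library's RpcMessageBuffering.
class OutgoingBuffer {
  OutgoingBuffer({this.capacity = 1024});

  /// Upper bound on queued messages (an assumption, to avoid unbounded growth).
  final int capacity;
  final Queue<MapEntry<int, Uint8List>> _pending = Queue();

  int get length => _pending.length;

  /// Queues a message; returns false when the buffer is full.
  bool enqueue(int streamId, Uint8List data) {
    if (_pending.length >= capacity) return false;
    _pending.add(MapEntry(streamId, data));
    return true;
  }

  /// Sends queued messages in FIFO order and returns how many were sent.
  /// A message is removed only after [send] completes, so a failure
  /// mid-flush leaves it and everything behind it in the queue.
  Future<int> flush(
    Future<void> Function(int streamId, Uint8List data) send,
  ) async {
    var sent = 0;
    while (_pending.isNotEmpty) {
      final next = _pending.first;
      await send(next.key, next.value);
      _pending.removeFirst();
      sent++;
    }
    return sent;
  }
}
```

A transport would call enqueue from sendMessage while the socket is unavailable and call flush once the connection is re-established. Bounding the queue is a design choice: without a limit, a long outage could exhaust memory.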
Utility helpers
RpcTransportUtils includes helpers for:
- Creating and parsing length-prefixed frames.
- Converting metadata between header maps and RpcMetadata.
- Building WebSocket frames with embedded stream IDs.
These utilities keep the hot path lean so you can focus on your transport-specific logic.
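For readers unfamiliar with length-prefixed framing, the idea can be sketched independently of the toolkit. The layout below (a 4-byte big-endian length followed by the payload) is an assumption. The actual wire format used by RpcTransportUtils may differ, and encodeLengthPrefixed and decodeLengthPrefixed are hypothetical names.

```dart
import 'dart:typed_data';

/// Assumed layout: 4-byte big-endian payload length, then the payload.
Uint8List encodeLengthPrefixed(List<int> payload) {
  final frame = Uint8List(4 + payload.length);
  ByteData.sublistView(frame).setUint32(0, payload.length, Endian.big);
  frame.setRange(4, frame.length, payload);
  return frame;
}

/// Returns the payload, or null if the frame is truncated.
Uint8List? decodeLengthPrefixed(List<int> bytes) {
  if (bytes.length < 4) return null; // header incomplete
  final frame = bytes is Uint8List ? bytes : Uint8List.fromList(bytes);
  final length = ByteData.sublistView(frame).getUint32(0, Endian.big);
  if (frame.length - 4 < length) return null; // body not fully received
  return Uint8List.sublistView(frame, 4, 4 + length);
}
```

The prefix lets a receiver on a byte-stream medium such as a Unix socket know where one message ends and the next begins, which is why a parser returns null on a partial frame rather than guessing.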
Health and lifecycle hooks
RpcBaseTransport.close() releases active stream IDs, shuts down incoming
controllers, and calls onClose() for transport-specific cleanup. Override
onClose() to dispose sockets or timers.
Health checks are built in: health() reports whether the transport is closed and
how many streams are active, while reconnect() provides a default
implementation that signals unsupported behaviour. Mix in RpcAutoReconnect
to attempt actual reconnection.
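The close()/onClose() split follows the template-method pattern: the base class owns the shutdown order and the subclass only supplies cleanup. The sketch below models that lifecycle with hypothetical classes (SketchTransport, TimerTransport). It is not RpcBaseTransport, and the shape of the health() result is an assumption.

```dart
import 'dart:async';

/// Hypothetical base class mirroring the described lifecycle; it only
/// models close(), onClose() and health().
abstract class SketchTransport {
  final Set<int> _activeStreams = <int>{};
  final StreamController<List<int>> _incoming =
      StreamController<List<int>>.broadcast();
  bool _closed = false;

  Stream<List<int>> get incoming => _incoming.stream;

  void openStream(int id) => _activeStreams.add(id);

  /// Transport-specific cleanup hook; subclasses dispose sockets or timers.
  Future<void> onClose() async {}

  /// Idempotent shutdown: release streams, close the controller, run the hook.
  Future<void> close() async {
    if (_closed) return;
    _closed = true;
    _activeStreams.clear();
    await _incoming.close();
    await onClose();
  }

  /// Snapshot of whether the transport is closed and how many streams are open.
  Map<String, Object> health() =>
      {'closed': _closed, 'activeStreams': _activeStreams.length};
}

/// Example subclass that owns a keep-alive timer and cancels it on close.
class TimerTransport extends SketchTransport {
  final Timer _keepAlive = Timer.periodic(const Duration(seconds: 30), (_) {});
  bool disposed = false;

  @override
  Future<void> onClose() async {
    _keepAlive.cancel();
    disposed = true;
  }
}
```

Keeping the state changes in close() ahead of the first await means health() reflects the closed state immediately, even while the subclass cleanup is still pending.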
With the transport toolkit you can integrate RPC Dart with bespoke protocols (Unix sockets, named pipes, message queues) while keeping contract code and endpoints untouched.