Custom Transport Toolkit¶
RPC Dart is intentionally transport-neutral. When the built-in transports are not
enough you can use the toolkit found in rpc_dart/src/rpc/transports/transport_toolkit.dart
to create bespoke transports without rewriting the protocol layer.
RpcBaseTransport essentials¶
RpcBaseTransport implements IRpcTransport and handles the heavy lifting:
- Stream ID management via
RpcStreamIdManager(including client/server semantics and reuse). - Tracking active streams and releasing resources when streams end.
- Broadcasting incoming messages to listeners through
incomingMessages. - Default implementations of
close(),health(), andreconnect().
You only implement how messages are sent to the underlying medium.
class UnixSocketTransport extends RpcBaseTransport {
UnixSocketTransport(this._socket)
: super(isClient: true, logger: RpcLogger('UnixSocketTransport')) {
_listen();
}
final StreamChannel<List<int>> _socket;
void _listen() {
_socket.stream.listen((frame) {
final parsed = RpcTransportUtils.parseLengthPrefixedFrame(frame);
if (parsed == null) return;
addIncomingMessage(RpcTransportMessage(
payload: Uint8List.fromList(parsed),
streamId: generateStreamId(), // or decode from frame headers
));
});
}
@override
Future<void> sendMetadata(int streamId, RpcMetadata metadata, {bool endStream = false}) async {
final headers = RpcTransportUtils.metadataToHeaders(metadata);
final frame = RpcTransportUtils.createLengthPrefixedFrame(
utf8.encode(jsonEncode({'streamId': streamId, 'headers': headers})),
);
_socket.sink.add(frame);
}
@override
Future<void> sendMessage(int streamId, Uint8List data, {bool endStream = false}) async {
final frame = RpcTransportUtils.createLengthPrefixedFrame(data);
_socket.sink.add(frame);
}
@override
Future<void> finishSending(int streamId) async {
// Optional: send FIN frame or mark completion in your protocol
}
}
addIncomingMessage pushes data into the RPC runtime once you decode it from
your protocol. You can also override supportsZeroCopy to expose zero-copy
capabilities when appropriate.
Mixins for advanced behaviour¶
The toolkit ships with mixins that you can opt into:
RpcZeroCopySupport— exposessendDirectObjectso zero-copy contracts work with your transport. Perfect for shared-memory or isolate-based transports.RpcAutoReconnect— implements exponential backoff reconnection logic viaattemptReconnect()andperformReconnect().RpcMessageBuffering— queues outgoing messages while the connection is down and flushes them when it recovers.
Mixins compose naturally:
class ResilientWebSocketTransport extends RpcBaseTransport
with RpcAutoReconnect, RpcMessageBuffering {
ResilientWebSocketTransport(this._connect)
: super(isClient: true, logger: RpcLogger('WsTransport'));
final Future<WebSocket> Function() _connect;
WebSocket? _socket;
Future<void> connect() async {
_socket = await _connect();
_socket!.listen(_handleFrame, onDone: () => attemptReconnect());
resetReconnectAttempts();
}
@override
Future<void> performReconnect() async {
await connect();
}
void _handleFrame(dynamic frame) {
if (frame is! List<int>) return;
final parsed = RpcTransportUtils.parseWebSocketFrame(frame);
if (parsed == null) return;
addIncomingMessage(RpcTransportMessage(
payload: Uint8List.fromList(parsed['data'] as List<int>),
streamId: parsed['streamId'] as int,
));
}
@override
Future<void> sendMessage(int streamId, Uint8List data, {bool endStream = false}) async {
if (_socket == null) {
await super.sendMessage(streamId, data, endStream: endStream); // buffers until ready
return;
}
final frame = RpcTransportUtils.createWebSocketFrame(streamId, data);
_socket!.add(frame);
}
@override
Future<void> sendMetadata(int streamId, RpcMetadata metadata, {bool endStream = false}) async {
// Convert metadata to headers and reuse sendMessage
final headers = jsonEncode(RpcTransportUtils.metadataToHeaders(metadata));
await sendMessage(streamId, Uint8List.fromList(utf8.encode(headers)), endStream: endStream);
}
@override
Future<void> finishSending(int streamId) async {
_socket?.add(RpcTransportUtils.createWebSocketFrame(streamId, const []));
}
}
Utility helpers¶
RpcTransportUtils includes helpers for:
- Creating and parsing length-prefixed frames.
- Converting metadata between header maps and
RpcMetadata. - Building WebSocket frames with embedded stream IDs.
These utilities keep the hot path lean so you can focus on your transport-specific logic.
Health and lifecycle hooks¶
RpcBaseTransport.close() releases active stream IDs, shuts down incoming
controllers, and calls onClose() for transport-specific cleanup. Override
onClose() to dispose sockets or timers.
Health checks are built in—health() reports whether the transport is closed and
how many streams are active, while reconnect() provides a default
implementation that signals unsupported behaviour. Mix it with
RpcAutoReconnect to attempt actual reconnection.
With the transport toolkit you can integrate RPC Dart with bespoke protocols (Unix sockets, named pipes, message queues) while keeping contract code and endpoints untouched.