Перейти к содержимому

Custom Transport Toolkit

Это содержимое пока не доступно на вашем языке.

RPC Dart is intentionally transport-neutral. When the built-in transports are not enough you can use the toolkit found in rpc_dart/src/rpc/transports/transport_toolkit.dart to create bespoke transports without rewriting the protocol layer.

RpcBaseTransport implements IRpcTransport and handles the heavy lifting:

  • Stream ID management via RpcStreamIdManager (including client/server semantics and reuse).
  • Tracking active streams and releasing resources when streams end.
  • Broadcasting incoming messages to listeners through incomingMessages.
  • Default implementations of close(), health(), and reconnect().

You only implement how messages are sent to the underlying medium.

class UnixSocketTransport extends RpcBaseTransport {
UnixSocketTransport(this._socket)
: super(isClient: true, logger: RpcLogger('UnixSocketTransport')) {
_listen();
}
final StreamChannel<List<int>> _socket;
void _listen() {
_socket.stream.listen((frame) {
final parsed = RpcTransportUtils.parseLengthPrefixedFrame(frame);
if (parsed == null) return;
addIncomingMessage(RpcTransportMessage(
payload: Uint8List.fromList(parsed),
streamId: generateStreamId(), // or decode from frame headers
));
});
}
@override
Future<void> sendMetadata(int streamId, RpcMetadata metadata, {bool endStream = false}) async {
final headers = RpcTransportUtils.metadataToHeaders(metadata);
final frame = RpcTransportUtils.createLengthPrefixedFrame(
utf8.encode(jsonEncode({'streamId': streamId, 'headers': headers})),
);
_socket.sink.add(frame);
}
@override
Future<void> sendMessage(int streamId, Uint8List data, {bool endStream = false}) async {
final frame = RpcTransportUtils.createLengthPrefixedFrame(data);
_socket.sink.add(frame);
}
@override
Future<void> finishSending(int streamId) async {
// Optional: send FIN frame or mark completion in your protocol
}
}

addIncomingMessage pushes data into the RPC runtime once you decode it from your protocol. You can also override supportsZeroCopy to expose zero-copy capabilities when appropriate.

The toolkit ships with mixins that you can opt into:

  • RpcZeroCopySupport — exposes sendDirectObject so zero-copy contracts work with your transport. Perfect for shared-memory or isolate-based transports.
  • RpcAutoReconnect — implements exponential backoff reconnection logic via attemptReconnect() and performReconnect().
  • RpcMessageBuffering — queues outgoing messages while the connection is down and flushes them when it recovers.

Mixins compose naturally:

class ResilientWebSocketTransport extends RpcBaseTransport
with RpcAutoReconnect, RpcMessageBuffering {
ResilientWebSocketTransport(this._connect)
: super(isClient: true, logger: RpcLogger('WsTransport'));
final Future<WebSocket> Function() _connect;
WebSocket? _socket;
Future<void> connect() async {
_socket = await _connect();
_socket!.listen(_handleFrame, onDone: () => attemptReconnect());
resetReconnectAttempts();
}
@override
Future<void> performReconnect() async {
await connect();
}
void _handleFrame(dynamic frame) {
if (frame is! List<int>) return;
final parsed = RpcTransportUtils.parseWebSocketFrame(frame);
if (parsed == null) return;
addIncomingMessage(RpcTransportMessage(
payload: Uint8List.fromList(parsed['data'] as List<int>),
streamId: parsed['streamId'] as int,
));
}
@override
Future<void> sendMessage(int streamId, Uint8List data, {bool endStream = false}) async {
if (_socket == null) {
await super.sendMessage(streamId, data, endStream: endStream); // buffers until ready
return;
}
final frame = RpcTransportUtils.createWebSocketFrame(streamId, data);
_socket!.add(frame);
}
@override
Future<void> sendMetadata(int streamId, RpcMetadata metadata, {bool endStream = false}) async {
// Convert metadata to headers and reuse sendMessage
final headers = jsonEncode(RpcTransportUtils.metadataToHeaders(metadata));
await sendMessage(streamId, Uint8List.fromList(utf8.encode(headers)), endStream: endStream);
}
@override
Future<void> finishSending(int streamId) async {
_socket?.add(RpcTransportUtils.createWebSocketFrame(streamId, const []));
}
}

RpcTransportUtils includes helpers for:

  • Creating and parsing length-prefixed frames.
  • Converting metadata between header maps and RpcMetadata.
  • Building WebSocket frames with embedded stream IDs.

These utilities keep the hot path lean so you can focus on your transport-specific logic.

RpcBaseTransport.close() releases active stream IDs, shuts down incoming controllers, and calls onClose() for transport-specific cleanup. Override onClose() to dispose sockets or timers.

Health checks are built in—health() reports whether the transport is closed and how many streams are active, while reconnect() provides a default implementation that signals unsupported behaviour. Mix it with RpcAutoReconnect to attempt actual reconnection.

With the transport toolkit you can integrate RPC Dart with bespoke protocols (Unix sockets, named pipes, message queues) while keeping contract code and endpoints untouched.