Skip to content

Streaming Patterns

RPC Dart treats streaming as a first-class citizen. The same contract-based API covers all gRPC patterns while giving you fine-grained control over serialisation, cancellation, and flow control.

Unary

Single request, single response. Returns Future<T> on both sides.

Server streaming

Single request, multiple responses. Returns Stream<T> to the caller.

Client streaming

Multiple requests, single response. Caller sends a Stream<T>.

Bidirectional

Full-duplex messaging. Both sides use Stream<T> concurrently.

class ChatContract extends RpcResponderContract {
ChatContract() : super('Chat');
@override
void setup() {
addServerStreamMethod<JoinRequest, ChatMessage>(
methodName: 'JoinRoom',
handler: _joinRoom,
);
addBidirectionalMethod<ClientEvent, ChatMessage>(
methodName: 'LiveChat',
handler: _liveChat,
);
}
Stream<ChatMessage> _joinRoom(JoinRequest request, {RpcContext? context}) {
return chatRooms.openStream(request.roomId, request.userId);
}
Stream<ChatMessage> _liveChat(
Stream<ClientEvent> events, {
RpcContext? context,
}) {
return liveChatBroker.connect(events, context: context);
}
}

On the caller side you get symmetrical helpers:

class ChatCaller extends RpcCallerContract {
ChatCaller(RpcCallerEndpoint endpoint) : super('Chat', endpoint);
Stream<ChatMessage> joinRoom(String roomId, String userId) {
return callServerStream<JoinRequest, ChatMessage>(
methodName: 'JoinRoom',
request: JoinRequest(roomId, userId),
);
}
Stream<ChatMessage> liveChat(Stream<ClientEvent> events) {
return callBidirectionalStream<ClientEvent, ChatMessage>(
methodName: 'LiveChat',
requests: events,
);
}
}

Any RpcContext passed to streaming calls is propagated to processors. When the deadline expires or the cancellation token is triggered the caller and responder terminate the stream gracefully.

Managing server-side fan-out with StreamDistributor

Section titled “Managing server-side fan-out with StreamDistributor”

StreamDistributor<T> simplifies building broadcast or multi-subscriber channels on top of streaming RPCs. Declare events that implement IRpcSerializable so they work with both zero-copy and codec-based transports.

final distributor = StreamDistributor<ChatMessage>();
Stream<ChatMessage> connectClient(String clientId) {
return distributor.getOrCreateClientStream(clientId);
}
void publishToRoom(ChatMessage message) {
distributor.publish(message, metadata: {
'roomId': message.roomId,
'sender': message.senderId,
});
}

Key capabilities:

  • Automatic cleanup of inactive client streams.
  • Targeted delivery via publishFiltered to specific clients.
  • Metrics via distributor.metrics (total/current streams, total messages, average message size).
  • Optional logging hooks for visibility.

The RpcInMemoryTransport makes it trivial to test streaming handlers without a network stack:

final (clientTransport, serverTransport) = RpcInMemoryTransport.pair();
final responder = RpcResponderEndpoint(transport: serverTransport)
..registerServiceContract(ChatContract())
..start();
final caller = RpcCallerEndpoint(transport: clientTransport);
final chat = ChatCaller(caller);
final stream = chat.joinRoom('general', 'u-42');
expectLater(stream, emitsThrough(predicate<ChatMessage>((msg) => msg.roomId == 'general')));

Because the in-memory transport supports zero-copy, the exact objects your handler emits are the ones received by the client—perfect for verifying complex state transitions without serialisation noise.

Streaming RPCs let you build collaborative apps, background workers, and live updates without switching frameworks. RPC Dart keeps the API ergonomic while letting you opt into advanced features only when you need them.