RPC Dart treats streaming as a first-class citizen. The same contract-based API covers all gRPC patterns while giving you fine-grained control over serialisation, cancellation, and flow control.
Unary
Single request, single response. Returns Future<T> on both sides.
Server streaming
Single request, multiple responses. Returns Stream<T> to the caller.
Client streaming
Multiple requests, single response. Caller sends a Stream<T>.
Bidirectional
Full-duplex messaging. Both sides use Stream<T> concurrently.
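The four shapes above can be sketched with nothing but dart:async. This is an illustration of the type signatures only, not RPC Dart's API; the function names (square, countTo, sum, echoUpper) are hypothetical.

```dart
import 'dart:async';

// Unary: one request in, one response out.
Future<int> square(int x) async => x * x;

// Server streaming: one request in, a stream of responses out.
Stream<int> countTo(int n) async* {
  for (var i = 1; i <= n; i++) {
    yield i;
  }
}

// Client streaming: a stream of requests in, one response out.
Future<int> sum(Stream<int> values) => values.fold<int>(0, (a, b) => a + b);

// Bidirectional: a stream in, a stream out, processed as items arrive.
Stream<String> echoUpper(Stream<String> lines) =>
    lines.map((line) => line.toUpperCase());

Future<void> main() async {
  print(await square(7)); // 49
  print(await countTo(3).toList()); // [1, 2, 3]
  print(await sum(Stream.fromIterable([1, 2, 3]))); // 6
  print(await echoUpper(Stream.fromIterable(['a', 'b'])).toList()); // [A, B]
}
```

The return type alone tells you which pattern a method follows: Future<T> for a single response, Stream<T> for many.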
class ChatContract extends RpcResponderContract {
  ChatContract() : super('Chat');

  @override
  void setup() {
    // One request in, a stream of ChatMessage out.
    addServerStreamMethod<JoinRequest, ChatMessage>(
      methodName: 'JoinRoom',
      handler: _joinRoom,
    );

    // A stream of ClientEvent in, a stream of ChatMessage out.
    addBidirectionalMethod<ClientEvent, ChatMessage>(
      methodName: 'LiveChat',
      handler: _liveChat,
    );
  }

  // chatRooms and liveChatBroker are application-level objects defined elsewhere.
  Stream<ChatMessage> _joinRoom(JoinRequest request, {RpcContext? context}) {
    return chatRooms.openStream(request.roomId, request.userId);
  }

  Stream<ChatMessage> _liveChat(
    Stream<ClientEvent> events, {
    RpcContext? context,
  }) {
    return liveChatBroker.connect(events, context: context);
  }
}
On the caller side you get symmetrical helpers:
class ChatCaller extends RpcCallerContract {
  ChatCaller(RpcCallerEndpoint endpoint) : super('Chat', endpoint);

  Stream<ChatMessage> joinRoom(String roomId, String userId) {
    return callServerStream<JoinRequest, ChatMessage>(
      methodName: 'JoinRoom',
      request: JoinRequest(roomId, userId),
    );
  }

  Stream<ChatMessage> liveChat(Stream<ClientEvent> events) {
    return callBidirectionalStream<ClientEvent, ChatMessage>(
      methodName: 'LiveChat',
      requests: events,
    );
  }
}
Any RpcContext passed to streaming calls is propagated to processors. When the deadline expires or the cancellation token is triggered, the caller and responder terminate the stream gracefully.
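To make the deadline behaviour concrete, here is a minimal sketch in plain dart:async. It does not use RpcContext or any RPC Dart API; Ticker and collectUntil are hypothetical names. It shows the general mechanism such a feature can rest on: when the deadline fires, the consumer cancels its subscription, and the producer's onCancel callback releases its resources.

```dart
import 'dart:async';

/// Hypothetical producer: emits an increasing counter until the listener
/// cancels, then stops its timer and records that cleanup ran.
class Ticker {
  bool cleanedUp = false;

  Stream<int> start(Duration interval) {
    Timer? timer;
    var next = 0;
    late final StreamController<int> controller;
    controller = StreamController<int>(
      onListen: () {
        timer = Timer.periodic(interval, (_) => controller.add(next++));
      },
      onCancel: () {
        timer?.cancel();
        cleanedUp = true;
      },
    );
    return controller.stream;
  }
}

/// Collects events from [source] until it ends or [deadline] elapses,
/// whichever comes first, then cancels the subscription.
Future<List<int>> collectUntil(Stream<int> source, Duration deadline) async {
  final received = <int>[];
  final finished = Completer<void>();
  void finish() {
    if (!finished.isCompleted) finished.complete();
  }

  final subscription = source.listen(received.add, onDone: finish);
  final timer = Timer(deadline, finish);
  await finished.future;
  timer.cancel();
  await subscription.cancel(); // Triggers the producer's onCancel.
  return received;
}

Future<void> main() async {
  final ticker = Ticker();
  final events = await collectUntil(
    ticker.start(const Duration(milliseconds: 10)),
    const Duration(milliseconds: 100),
  );
  print('received ${events.length} events, cleanedUp=${ticker.cleanedUp}');
}
```

The exact number of events depends on timer scheduling, so the sketch prints the count rather than asserting it.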
Streams use Dart’s async primitives, so the responder honours pause/resume signals from the caller. The built-in processors buffer minimally, letting you compose with StreamTransformers for custom flow control.
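As one example of composing with a StreamTransformer, the sketch below batches events into fixed-size lists. It relies only on dart:async; the batch helper is a hypothetical name, not part of RPC Dart. Because it is written as an async* generator around await for, pausing the output stream suspends the generator, which in turn pauses its subscription to the source, so pause/resume signals pass through it.

```dart
import 'dart:async';

/// Groups events from the source stream into lists of up to [size] items.
/// A final, shorter list is emitted if the source ends mid-batch.
StreamTransformer<T, List<T>> batch<T>(int size) {
  return StreamTransformer<T, List<T>>.fromBind((source) async* {
    var buffer = <T>[];
    await for (final event in source) {
      buffer.add(event);
      if (buffer.length == size) {
        yield buffer;
        buffer = <T>[];
      }
    }
    if (buffer.isNotEmpty) yield buffer;
  });
}

Future<void> main() async {
  final batches = await Stream.fromIterable([1, 2, 3, 4, 5])
      .transform(batch<int>(2))
      .toList();
  print(batches); // [[1, 2], [3, 4], [5]]
}
```

A transformer like this could be applied to the stream returned by a streaming call, or to the request stream before it is handed to one.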
When you omit codecs in add*StreamMethod and the transport advertises supportsZeroCopy, RPC Dart forwards objects by reference even for streams – ideal for high-frequency in-process messaging.
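The difference between forwarding by reference and going through a codec can be shown in plain Dart. This sketch does not touch RPC Dart's transports; Tick, byReference, and viaCodec are hypothetical names, and JSON stands in for whatever codec a real transport would use.

```dart
import 'dart:async';
import 'dart:convert';

/// Hypothetical event type with a JSON representation.
class Tick {
  Tick(this.sequence);
  final int sequence;

  Map<String, dynamic> toJson() => {'sequence': sequence};

  static Tick fromJson(Map<String, dynamic> json) =>
      Tick(json['sequence'] as int);
}

/// In-process path: each object is handed over untouched.
Stream<Tick> byReference(Stream<Tick> source) => source;

/// Codec path: each object is encoded to JSON text and decoded into a copy.
Stream<Tick> viaCodec(Stream<Tick> source) => source
    .map((tick) => jsonEncode(tick.toJson()))
    .map((text) => Tick.fromJson(jsonDecode(text) as Map<String, dynamic>));

Future<void> main() async {
  final original = Tick(1);
  final same = await byReference(Stream.value(original)).first;
  final copy = await viaCodec(Stream.value(original)).first;

  print(identical(original, same)); // true
  print(identical(original, copy)); // false
  print(copy.sequence); // 1
}
```

One consequence of the by-reference path is that sender and receiver share the same instance, so immutable event types are the safer choice.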
StreamDistributor<T> simplifies building broadcast or multi-subscriber channels on top of streaming RPCs. Declare events that implement IRpcSerializable so they work with both zero-copy and codec-based transports.
final distributor = StreamDistributor<ChatMessage>();

Stream<ChatMessage> connectClient(String clientId) {
  return distributor.getOrCreateClientStream(clientId);
}

void publishToRoom(ChatMessage message) {
  distributor.publish(message, metadata: {
    'roomId': message.roomId,
    'sender': message.senderId,
  });
}
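For intuition about what a distributor does, here is a deliberately small stand-in built on dart:async. It is not StreamDistributor and does not mirror its signatures; MiniDistributor, streamFor, and publishWhere are hypothetical names chosen to avoid confusion with the real class. It keeps one StreamController per client and fans each message out to the clients that pass a filter.

```dart
import 'dart:async';

/// Hypothetical, simplified distributor: one single-subscription stream per
/// client id, with broadcast and filtered publishing.
class MiniDistributor<T> {
  final _clients = <String, StreamController<T>>{};
  int totalMessages = 0;

  /// Returns the stream for [clientId], creating it on first use.
  Stream<T> streamFor(String clientId) =>
      _clients.putIfAbsent(clientId, () => StreamController<T>()).stream;

  /// Delivers [message] to every connected client.
  int publish(T message) => publishWhere(message, (_) => true);

  /// Delivers [message] only to clients whose id passes [filter].
  /// Returns the number of clients that received it.
  int publishWhere(T message, bool Function(String clientId) filter) {
    var delivered = 0;
    for (final entry in _clients.entries) {
      if (filter(entry.key)) {
        entry.value.add(message);
        delivered++;
      }
    }
    totalMessages++;
    return delivered;
  }

  /// Ends every client stream.
  void close() {
    for (final controller in _clients.values) {
      controller.close();
    }
    _clients.clear();
  }
}

Future<void> main() async {
  final distributor = MiniDistributor<String>();
  final alice = distributor.streamFor('alice').toList();
  final bob = distributor.streamFor('bob').toList();

  distributor.publish('hello everyone');
  distributor.publishWhere('psst', (clientId) => clientId == 'alice');
  distributor.close();

  print(await alice); // [hello everyone, psst]
  print(await bob); // [hello everyone]
  print(distributor.totalMessages); // 2
}
```

A production distributor also has to handle slow or disconnected clients and clean up idle streams, which this sketch leaves out.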
Key capabilities:
Targeted delivery with publishFiltered to specific clients.
Metrics through distributor.metrics (total/current streams, total messages, average message size).
The RpcInMemoryTransport makes it trivial to test streaming handlers without a network stack:
final (clientTransport, serverTransport) = RpcInMemoryTransport.pair();
final responder = RpcResponderEndpoint(transport: serverTransport)
  ..registerServiceContract(ChatContract())
  ..start();
final caller = RpcCallerEndpoint(transport: clientTransport);

final chat = ChatCaller(caller);
final stream = chat.joinRoom('general', 'u-42');

expectLater(
  stream,
  emitsThrough(predicate<ChatMessage>((msg) => msg.roomId == 'general')),
);
Because the in-memory transport supports zero-copy, the exact objects your handler emits are the ones received by the client—perfect for verifying complex state transitions without serialisation noise.
Streaming RPCs let you build collaborative apps, background workers, and live updates without switching frameworks. RPC Dart keeps the API ergonomic while letting you opt into advanced features only when you need them.