Real-time Communication
Full-duplex communication with instant message delivery, perfect for chat applications, live updates, and collaborative tools.
The WebSocket Transport provides real-time, bidirectional communication between RPC Dart clients and servers. It’s ideal for web applications, real-time systems, and scenarios where persistent connections are preferred over request-response patterns.
Real-time Communication
Full-duplex communication with instant message delivery, perfect for chat applications, live updates, and collaborative tools.
Cross-platform Support
Works seamlessly in browsers, Flutter apps, and server applications with consistent behavior across platforms.
Connection Management
Automatic reconnection, heartbeat monitoring, and graceful degradation for robust network handling.
Stream Support
Native support for all RPC streaming patterns with efficient message multiplexing over a single connection.
WebSocket transport enables real-time communication in both directions:
// Server can push updates to clientsclass NotificationService extends RpcResponderContract { final StreamController<NotificationEvent> _notifications = StreamController.broadcast();
NotificationService() : super('NotificationService');
@override void setup() { addServerStreamMethod<Empty, NotificationEvent>( methodName: 'subscribe', handler: _subscribe, );
addUnaryMethod<NotificationRequest, Empty>( methodName: 'notify', handler: _notify, ); }
Stream<NotificationEvent> _subscribe(Empty request, {RpcContext? context}) { return _notifications.stream; }
Future<Empty> _notify(NotificationRequest request, {RpcContext? context}) async { _notifications.add(NotificationEvent( type: request.type, message: request.message, timestamp: DateTime.now(), )); return Empty(); }}
Unlike HTTP requests, WebSocket connections remain open:
// Connection stays alive for multiple operationsfinal transport = RpcWebSocketTransport( url: 'ws://localhost:8080/rpc', options: WebSocketTransportOptions( heartbeatInterval: Duration(seconds: 30), reconnectInterval: Duration(seconds: 5), maxReconnectAttempts: 10, ),);
final caller = RpcCallerEndpoint(transport: transport);
// Multiple calls over the same connectionfor (int i = 0; i < 100; i++) { final result = await service.process(DataRequest(id: i)); print('Processed: ${result.id}');}
Setting up a WebSocket server for RPC Dart:
import 'dart:io';import 'package:rpc_dart/rpc_dart.dart';import 'package:rpc_dart_transports/rpc_dart_transports.dart';
class WebSocketRpcServer { late HttpServer _server; final List<RpcResponderEndpoint> _endpoints = [];
Future<void> start({ String host = 'localhost', int port = 8080, String path = '/rpc', }) async { _server = await HttpServer.bind(host, port); print('WebSocket RPC Server listening on ws://$host:$port$path');
await for (HttpRequest request in _server) { if (request.uri.path == path) { await _handleWebSocketConnection(request); } else { request.response.statusCode = HttpStatus.notFound; await request.response.close(); } } }
Future<void> _handleWebSocketConnection(HttpRequest request) async { try { final socket = await WebSocketTransformer.upgrade(request); final transport = RpcWebSocketServerTransport(socket);
final endpoint = RpcResponderEndpoint(transport: transport);
// Register your service contracts endpoint.registerServiceContract(CalculatorResponder()); endpoint.registerServiceContract(NotificationService()); endpoint.registerServiceContract(FileService());
_endpoints.add(endpoint);
// Start handling RPC calls await endpoint.start();
// Clean up when connection closes socket.done.then((_) { _endpoints.remove(endpoint); endpoint.close(); });
} catch (e) { print('Error handling WebSocket connection: $e'); request.response.statusCode = HttpStatus.badRequest; await request.response.close(); } }
Future<void> stop() async { await _server.close(); await Future.wait(_endpoints.map((e) => e.close())); _endpoints.clear(); }}
class CalculatorResponder extends RpcResponderContract { CalculatorResponder() : super('Calculator');
@override void setup() { addUnaryMethod<MathRequest, MathResponse>( methodName: 'add', handler: _add, );
addServerStreamMethod<RangeRequest, NumberValue>( methodName: 'generateRange', handler: _generateRange, );
addClientStreamMethod<NumberValue, SumResponse>( methodName: 'sum', handler: _sum, );
addBidirectionalStreamMethod<NumberValue, ProcessedNumber>( methodName: 'processNumbers', handler: _processNumbers, ); }
Future<MathResponse> _add(MathRequest request, {RpcContext? context}) async { return MathResponse(result: request.a + request.b); }
Stream<NumberValue> _generateRange(RangeRequest request, {RpcContext? context}) async* { for (int i = request.start; i <= request.end; i++) { yield NumberValue(value: i); await Future.delayed(Duration(milliseconds: 100)); } }
Future<SumResponse> _sum(Stream<NumberValue> numbers, {RpcContext? context}) async { double sum = 0; await for (final number in numbers) { sum += number.value; } return SumResponse(total: sum); }
Stream<ProcessedNumber> _processNumbers( Stream<NumberValue> numbers, {RpcContext? context} ) async* { await for (final number in numbers) { yield ProcessedNumber( original: number.value, squared: number.value * number.value, timestamp: DateTime.now(), ); } }}
void main() async { final server = WebSocketRpcServer();
// Handle graceful shutdown ProcessSignal.sigint.watch().listen((_) async { print('Shutting down server...'); await server.stop(); exit(0); });
// Start the server await server.start( host: 'localhost', port: 8080, path: '/rpc', );}
Connecting to a WebSocket RPC server:
import 'package:rpc_dart/rpc_dart.dart';import 'package:rpc_dart_transports/rpc_dart_transports.dart';
void main() async { // Create WebSocket transport final transport = RpcWebSocketTransport( url: 'ws://localhost:8080/rpc', options: WebSocketTransportOptions( heartbeatInterval: Duration(seconds: 30), reconnectInterval: Duration(seconds: 5), maxReconnectAttempts: 10, connectTimeout: Duration(seconds: 10), ), );
// Create caller endpoint final caller = RpcCallerEndpoint(transport: transport); final calculator = CalculatorCaller(caller);
try { // Wait for connection await transport.connect();
// Make RPC calls final sum = await calculator.add(MathRequest(a: 10, b: 5)); print('10 + 5 = ${sum.result}');
// Use streaming methods await for (final number in calculator.generateRange(RangeRequest(start: 1, end: 5))) { print('Generated: ${number.value}'); }
} finally { await caller.close(); }}
Configure WebSocket transport behavior:
final transport = RpcWebSocketTransport( url: 'wss://secure-api.example.com/rpc', options: WebSocketTransportOptions( // Connection settings connectTimeout: Duration(seconds: 10), protocols: ['rpc-dart-v1'], headers: { 'Authorization': 'Bearer $token', 'User-Agent': 'RpcDartClient/1.0', },
// Heartbeat and reconnection heartbeatInterval: Duration(seconds: 30), reconnectInterval: Duration(seconds: 5), maxReconnectAttempts: 10, backoffMultiplier: 1.5,
// Message handling maxMessageSize: 1024 * 1024, // 1MB compression: CompressionOptions.defaultDeflate,
// Connection lifecycle onConnect: () => print('Connected to RPC server'), onDisconnect: (reason) => print('Disconnected: $reason'), onReconnect: (attempt) => print('Reconnecting (attempt $attempt)'), onError: (error) => print('Connection error: $error'), ),);
Perfect for applications requiring instant updates:
class ChatApplication { late RpcCallerEndpoint _caller; late ChatServiceCaller _chatService;
Future<void> connect(String serverUrl) async { final transport = RpcWebSocketTransport(url: serverUrl); _caller = RpcCallerEndpoint(transport: transport); _chatService = ChatServiceCaller(_caller);
await transport.connect(); }
Future<void> startChat(String roomId) async { // Subscribe to messages await for (final message in _chatService.subscribeToMessages(RoomRequest(roomId: roomId))) { displayMessage(message); } }
Future<void> sendMessage(String roomId, String text) async { await _chatService.sendMessage(SendMessageRequest( roomId: roomId, text: text, timestamp: DateTime.now(), )); }
void displayMessage(ChatMessage message) { print('${message.author}: ${message.text}'); }}
Stream real-time data efficiently:
class StockPriceMonitor { late RpcCallerEndpoint _caller; late MarketDataCaller _marketData;
Future<void> monitorStocks(List<String> symbols) async { final transport = RpcWebSocketTransport(url: 'wss://market-data.example.com/rpc'); _caller = RpcCallerEndpoint(transport: transport); _marketData = MarketDataCaller(_caller);
await transport.connect();
// Subscribe to price updates final subscription = _marketData.subscribeToPrices(SymbolListRequest(symbols: symbols));
await for (final priceUpdate in subscription) { updateUI(priceUpdate); } }
void updateUI(PriceUpdate update) { print('${update.symbol}: \$${update.price} (${update.change > 0 ? '+' : ''}${update.change})'); }}
Enable real-time collaboration:
class CollaborativeEditor { late RpcCallerEndpoint _caller; late EditorServiceCaller _editorService;
Future<void> joinDocument(String documentId) async { final transport = RpcWebSocketTransport(url: 'wss://collab.example.com/rpc'); _caller = RpcCallerEndpoint(transport: transport); _editorService = EditorServiceCaller(_caller);
await transport.connect();
// Subscribe to document changes final changes = _editorService.subscribeToChanges(DocumentRequest(documentId: documentId));
await for (final change in changes) { applyChange(change); } }
Future<void> makeEdit(String documentId, EditOperation operation) async { await _editorService.applyEdit(EditRequest( documentId: documentId, operation: operation, authorId: currentUserId, )); }
void applyChange(DocumentChange change) { // Apply the change to the local document editor.applyOperation(change.operation); showAuthorCursor(change.authorId, change.position); }}
WebSocket transport provides efficient streaming capabilities:
// Server pushes multiple responsesclass LogStreamService extends RpcResponderContract { LogStreamService() : super('LogStream');
@override void setup() { addServerStreamMethod<LogFilter, LogEntry>( methodName: 'streamLogs', handler: _streamLogs, ); }
Stream<LogEntry> _streamLogs(LogFilter filter, {RpcContext? context}) async* { final logFile = File('application.log'); final stream = logFile.openRead();
await for (final line in stream.transform(utf8.decoder).transform(LineSplitter())) { if (matchesFilter(line, filter)) { yield LogEntry( message: line, timestamp: DateTime.now(), level: extractLogLevel(line), ); } } }}
// Client consumptionawait for (final logEntry in logService.streamLogs(LogFilter(level: 'ERROR'))) { print('${logEntry.timestamp}: ${logEntry.message}');}
// Client sends multiple requestsclass BatchUploadService extends RpcResponderContract { BatchUploadService() : super('BatchUpload');
@override void setup() { addClientStreamMethod<FileChunk, UploadResult>( methodName: 'uploadFile', handler: _uploadFile, ); }
Future<UploadResult> _uploadFile(Stream<FileChunk> chunks, {RpcContext? context}) async { final buffer = BytesBuilder(); int totalChunks = 0;
await for (final chunk in chunks) { buffer.add(chunk.data); totalChunks++; }
final fileBytes = buffer.toBytes(); final filePath = await saveFile(fileBytes);
return UploadResult( filePath: filePath, size: fileBytes.length, chunks: totalChunks, ); }}
// Client usageStream<FileChunk> uploadFile(File file) async* { final bytes = await file.readAsBytes(); const chunkSize = 64 * 1024; // 64KB chunks
for (int i = 0; i < bytes.length; i += chunkSize) { final end = math.min(i + chunkSize, bytes.length); yield FileChunk( data: bytes.sublist(i, end), sequenceNumber: i ~/ chunkSize, ); }}
final result = await uploadService.uploadFile(uploadFile(myFile));
// Real-time processing with feedbackclass ProcessingService extends RpcResponderContract { ProcessingService() : super('Processing');
@override void setup() { addBidirectionalStreamMethod<ProcessRequest, ProcessResponse>( methodName: 'processStream', handler: _processStream, ); }
Stream<ProcessResponse> _processStream( Stream<ProcessRequest> requests, {RpcContext? context} ) async* { await for (final request in requests) { try { final result = await processData(request.data); yield ProcessResponse( id: request.id, success: true, result: result, ); } catch (e) { yield ProcessResponse( id: request.id, success: false, error: e.toString(), ); } } }}
WebSocket transport handles connection issues gracefully:
class ResilientClient { late RpcWebSocketTransport _transport; late RpcCallerEndpoint _caller; late ServiceCaller _service;
Future<void> connect(String url) async { _transport = RpcWebSocketTransport( url: url, options: WebSocketTransportOptions( reconnectInterval: Duration(seconds: 5), maxReconnectAttempts: 10, backoffMultiplier: 1.5, onReconnect: (attempt) { print('Reconnection attempt $attempt'); }, onConnect: () { print('Connected to server'); _onConnected(); }, onDisconnect: (reason) { print('Disconnected: $reason'); _onDisconnected(); }, ), );
_caller = RpcCallerEndpoint(transport: _transport); _service = ServiceCaller(_caller);
await _transport.connect(); }
void _onConnected() { // Resume operations _resumeSubscriptions(); }
void _onDisconnected() { // Handle disconnection _pauseOperations(); }
Future<void> _resumeSubscriptions() async { // Resubscribe to streams that were active await _subscribeToNotifications(); }
void _pauseOperations() { // Queue operations for retry _queueOperations = true; }}
class ConnectionMonitor { final RpcWebSocketTransport transport;
ConnectionMonitor(this.transport) { _setupMonitoring(); }
void _setupMonitoring() { transport.connectionState.listen((state) { switch (state) { case ConnectionState.connecting: showStatus('Connecting...'); break; case ConnectionState.connected: showStatus('Connected'); break; case ConnectionState.disconnected: showStatus('Disconnected'); break; case ConnectionState.reconnecting: showStatus('Reconnecting...'); break; case ConnectionState.failed: showStatus('Connection failed'); break; } }); }
void showStatus(String message) { print('Connection: $message'); }}
Use WSS for production:
final transport = RpcWebSocketTransport( url: 'wss://secure-api.example.com/rpc', options: WebSocketTransportOptions( headers: { 'Authorization': 'Bearer $authToken', }, // Additional security headers extraHeaders: { 'X-API-Key': apiKey, 'X-Client-Version': clientVersion, }, ),);
Implement proper authentication:
class AuthenticatedWebSocketClient { String? _authToken; late RpcWebSocketTransport _transport;
Future<void> authenticate(String username, String password) async { // Get auth token from auth service _authToken = await _getAuthToken(username, password);
// Create transport with auth headers _transport = RpcWebSocketTransport( url: 'wss://api.example.com/rpc', options: WebSocketTransportOptions( headers: { 'Authorization': 'Bearer $_authToken', }, onConnect: () => _onAuthenticatedConnect(), onDisconnect: (reason) => _onAuthenticatedDisconnect(reason), ), );
await _transport.connect(); }
void _onAuthenticatedConnect() { print('Authenticated connection established'); }
void _onAuthenticatedDisconnect(String reason) { if (reason.contains('unauthorized')) { // Token expired, re-authenticate _refreshAuthentication(); } }
Future<void> _refreshAuthentication() async { // Refresh token logic _authToken = await _refreshAuthToken(_authToken!);
// Update transport headers _transport.updateHeaders({ 'Authorization': 'Bearer $_authToken', }); }}
Optimize performance with message batching:
class BatchingWebSocketTransport extends RpcWebSocketTransport { final List<Message> _messageQueue = []; Timer? _batchTimer;
@override void sendMessage(Message message) { _messageQueue.add(message);
_batchTimer?.cancel(); _batchTimer = Timer(Duration(milliseconds: 10), _flushBatch); }
void _flushBatch() { if (_messageQueue.isNotEmpty) { final batch = MessageBatch(messages: List.from(_messageQueue)); super.sendMessage(batch); _messageQueue.clear(); } }}
For high-throughput scenarios:
class WebSocketConnectionPool { final List<RpcWebSocketTransport> _connections = []; final String _baseUrl; final int _poolSize; int _currentIndex = 0;
WebSocketConnectionPool(this._baseUrl, this._poolSize);
Future<void> initialize() async { for (int i = 0; i < _poolSize; i++) { final transport = RpcWebSocketTransport(url: '$_baseUrl/rpc'); await transport.connect(); _connections.add(transport); } }
RpcWebSocketTransport getConnection() { final connection = _connections[_currentIndex]; _currentIndex = (_currentIndex + 1) % _poolSize; return connection; }
Future<void> close() async { await Future.wait(_connections.map((c) => c.close())); _connections.clear(); }}
class WebSocketClient { Future<void> connect() async { try { await transport.connect(); await _initializeServices(); } catch (e) { print('Failed to connect: $e'); await _handleConnectionFailure(); } }
Future<void> _initializeServices() async { // Initialize services after connection await _subscribeToRequiredStreams(); await _syncInitialState(); }
Future<void> _handleConnectionFailure() async { // Implement fallback logic await _switchToHttpTransport(); }}
class AdaptiveClient { Future<void> makeCall() async { try { if (transport.isConnected) { return await _makeWebSocketCall(); } else { return await _makeHttpCall(); } } catch (e) { print('WebSocket call failed, falling back to HTTP: $e'); return await _makeHttpCall(); } }}
class PerformanceMonitor { void trackWebSocketMetrics(RpcWebSocketTransport transport) { transport.messagesSent.listen((count) { print('Messages sent: $count'); });
transport.messagesReceived.listen((count) { print('Messages received: $count'); });
transport.latency.listen((duration) { print('Current latency: ${duration.inMilliseconds}ms'); }); }}
WebSocket transport is the ideal choice for real-time applications, collaborative systems, and scenarios requiring persistent connections. Its bidirectional nature and streaming capabilities make it perfect for modern interactive applications while maintaining the robustness and type safety of RPC Dart.