Skip to content

WebSocket Transport

The WebSocket Transport provides real-time, bidirectional communication between RPC Dart clients and servers. It’s ideal for web applications, real-time systems, and scenarios where persistent connections are preferred over request-response patterns.

Real-time Communication

Full-duplex communication with instant message delivery, perfect for chat applications, live updates, and collaborative tools.

Cross-platform Support

Works seamlessly in browsers, Flutter apps, and server applications with consistent behavior across platforms.

Connection Management

Automatic reconnection, heartbeat monitoring, and graceful degradation for robust network handling.

Stream Support

Native support for all RPC streaming patterns with efficient message multiplexing over a single connection.

WebSocket transport enables real-time communication in both directions:

/// Responder contract through which the server pushes [NotificationEvent]s
/// to clients.
///
/// Every event published via the `notify` method is added to a broadcast
/// [StreamController]; the `subscribe` method hands out that controller's
/// stream.
class NotificationService extends RpcResponderContract {
  NotificationService() : super('NotificationService');

  /// Broadcast hub shared by all subscribers.
  final StreamController<NotificationEvent> _notifications =
      StreamController.broadcast();

  /// Registers the `subscribe` (server stream) and `notify` (unary) methods.
  @override
  void setup() {
    addServerStreamMethod<Empty, NotificationEvent>(
      methodName: 'subscribe',
      handler: _subscribe,
    );
    addUnaryMethod<NotificationRequest, Empty>(
      methodName: 'notify',
      handler: _notify,
    );
  }

  /// Returns the shared notification stream; [request] carries no data.
  Stream<NotificationEvent> _subscribe(Empty request, {RpcContext? context}) =>
      _notifications.stream;

  /// Wraps [request] in a [NotificationEvent] stamped with the current time
  /// and publishes it to the broadcast controller.
  Future<Empty> _notify(
    NotificationRequest request, {
    RpcContext? context,
  }) async {
    final event = NotificationEvent(
      type: request.type,
      message: request.message,
      timestamp: DateTime.now(),
    );
    _notifications.add(event);
    return Empty();
  }
}

Unlike HTTP requests, WebSocket connections remain open:

// Connection stays alive for multiple operations: the transport is created
// once and handed to a single caller endpoint.
final transport = RpcWebSocketTransport(
url: 'ws://localhost:8080/rpc',
options: WebSocketTransportOptions(
// NOTE(review): presumably the keep-alive ping period and the reconnect
// policy; confirm the exact semantics against the transport's API docs.
heartbeatInterval: Duration(seconds: 30),
reconnectInterval: Duration(seconds: 5),
maxReconnectAttempts: 10,
),
);
final caller = RpcCallerEndpoint(transport: transport);
// Multiple calls over the same connection.
// NOTE(review): `service` is not defined in this snippet; presumably it is a
// generated caller built on top of `caller` — confirm before copying.
for (int i = 0; i < 100; i++) {
// Each call is awaited before the next one is issued.
final result = await service.process(DataRequest(id: i));
print('Processed: ${result.id}');
}

Setting up a WebSocket server for RPC Dart:

websocket_server.dart
import 'dart:io';
import 'package:rpc_dart/rpc_dart.dart';
import 'package:rpc_dart_transports/rpc_dart_transports.dart';
/// Minimal HTTP server that upgrades requests on a configurable path to
/// WebSocket connections and serves RPC contracts over each of them.
///
/// One [RpcResponderEndpoint] is created per accepted socket and tracked in
/// [_endpoints] until the socket closes or [stop] is called.
class WebSocketRpcServer {
  /// The bound server; `null` until [start] has bound it.
  HttpServer? _server;

  /// Endpoints of the currently open connections.
  final List<RpcResponderEndpoint> _endpoints = [];

  /// Binds to [host]:[port] and serves WebSocket RPC connections on [path].
  ///
  /// Requests for any other path are answered with 404. The returned future
  /// completes only when the server's request stream ends (e.g. after
  /// [stop]).
  Future<void> start({
    String host = 'localhost',
    int port = 8080,
    String path = '/rpc',
  }) async {
    final server = await HttpServer.bind(host, port);
    _server = server;
    print('WebSocket RPC Server listening on ws://$host:$port$path');

    await for (final request in server) {
      if (request.uri.path == path) {
        // Deliberately not awaited: awaiting here would stop the loop from
        // accepting further clients while this connection is being set up.
        // The handler catches its own errors, so nothing is lost.
        _handleWebSocketConnection(request);
      } else {
        request.response.statusCode = HttpStatus.notFound;
        await request.response.close();
      }
    }
  }

  /// Upgrades [request] to a WebSocket and starts an RPC endpoint on it.
  ///
  /// Never throws: failures are logged and the connection is torn down.
  Future<void> _handleWebSocketConnection(HttpRequest request) async {
    final WebSocket socket;
    try {
      socket = await WebSocketTransformer.upgrade(request);
    } catch (e) {
      // upgrade() answers invalid upgrade requests itself, so the response
      // must not be touched again here.
      print('Error handling WebSocket connection: $e');
      return;
    }

    final endpoint = RpcResponderEndpoint(
      transport: RpcWebSocketServerTransport(socket),
    );
    _endpoints.add(endpoint);

    // Registered before start() so the endpoint is released even when
    // start() fails, and on error as well as on normal socket completion.
    socket.done.whenComplete(() {
      _endpoints.remove(endpoint);
      endpoint.close();
    });

    try {
      // Register your service contracts
      endpoint.registerServiceContract(CalculatorResponder());
      endpoint.registerServiceContract(NotificationService());
      endpoint.registerServiceContract(FileService());

      // Start handling RPC calls
      await endpoint.start();
    } catch (e) {
      print('Error handling WebSocket connection: $e');
      // The HTTP response is detached after a successful upgrade; closing
      // the socket is the only way left to signal the failure. This also
      // triggers the cleanup registered on socket.done above.
      await socket.close(WebSocketStatus.internalServerError);
    }
  }

  /// Stops accepting connections and closes every tracked endpoint.
  ///
  /// Safe to call before [start]; in that case there is nothing to close.
  Future<void> stop() async {
    await _server?.close();
    _server = null;
    // Iterate over a copy: socket cleanup callbacks may remove entries from
    // [_endpoints] while the close futures are pending.
    await Future.wait(List.of(_endpoints).map((e) => e.close()));
    _endpoints.clear();
  }
}

Connecting to a WebSocket RPC server:

import 'package:rpc_dart/rpc_dart.dart';
import 'package:rpc_dart_transports/rpc_dart_transports.dart';
/// Connects to the RPC server over WebSocket, performs one unary call and
/// one server-streaming call, then closes the caller endpoint.
void main() async {
  // Transport configuration is built first so the wiring below stays short.
  final options = WebSocketTransportOptions(
    heartbeatInterval: Duration(seconds: 30),
    reconnectInterval: Duration(seconds: 5),
    maxReconnectAttempts: 10,
    connectTimeout: Duration(seconds: 10),
  );
  final transport = RpcWebSocketTransport(
    url: 'ws://localhost:8080/rpc',
    options: options,
  );

  // The caller endpoint wraps the transport; the typed caller wraps it.
  final caller = RpcCallerEndpoint(transport: transport);
  final calculator = CalculatorCaller(caller);

  try {
    // Establish the connection before issuing any call.
    await transport.connect();

    // Unary call.
    final sum = await calculator.add(MathRequest(a: 10, b: 5));
    print('10 + 5 = ${sum.result}');

    // Server-streaming call: print every value as it arrives.
    final range = calculator.generateRange(RangeRequest(start: 1, end: 5));
    await for (final number in range) {
      print('Generated: ${number.value}');
    }
  } finally {
    // Runs whether or not the calls above succeeded.
    await caller.close();
  }
}

Configure WebSocket transport behavior:

// Example of a transport configured through WebSocketTransportOptions.
// NOTE(review): `token` is not defined in this snippet; it must be in scope
// wherever this code is pasted.
final transport = RpcWebSocketTransport(
url: 'wss://secure-api.example.com/rpc',
options: WebSocketTransportOptions(
// Connection settings
connectTimeout: Duration(seconds: 10),
protocols: ['rpc-dart-v1'],
headers: {
'Authorization': 'Bearer $token',
'User-Agent': 'RpcDartClient/1.0',
},
// Heartbeat and reconnection
// NOTE(review): presumably `backoffMultiplier` scales `reconnectInterval`
// between attempts — confirm against the options' API documentation.
heartbeatInterval: Duration(seconds: 30),
reconnectInterval: Duration(seconds: 5),
maxReconnectAttempts: 10,
backoffMultiplier: 1.5,
// Message handling
maxMessageSize: 1024 * 1024, // 1 MiB (1,048,576 bytes)
compression: CompressionOptions.defaultDeflate,
// Connection lifecycle callbacks; each one only logs here.
onConnect: () => print('Connected to RPC server'),
onDisconnect: (reason) => print('Disconnected: $reason'),
onReconnect: (attempt) => print('Reconnecting (attempt $attempt)'),
onError: (error) => print('Connection error: $error'),
),
);

Perfect for applications requiring instant updates:

/// Chat client that receives room messages as a stream and sends messages
/// over the same WebSocket-backed caller.
///
/// [connect] must complete before any other method is used, because the
/// caller fields are `late` and assigned there.
class ChatApplication {
  late RpcCallerEndpoint _caller;
  late ChatServiceCaller _chatService;

  /// Creates the transport for [serverUrl], wires up the chat caller and
  /// opens the connection.
  Future<void> connect(String serverUrl) async {
    final transport = RpcWebSocketTransport(url: serverUrl);
    final endpoint = RpcCallerEndpoint(transport: transport);
    _caller = endpoint;
    _chatService = ChatServiceCaller(endpoint);
    await transport.connect();
  }

  /// Subscribes to [roomId] and displays every incoming message.
  ///
  /// Completes only when the subscription stream ends.
  Future<void> startChat(String roomId) async {
    final incoming =
        _chatService.subscribeToMessages(RoomRequest(roomId: roomId));
    await for (final message in incoming) {
      displayMessage(message);
    }
  }

  /// Sends [text] to [roomId], stamped with the current time.
  Future<void> sendMessage(String roomId, String text) async {
    final request = SendMessageRequest(
      roomId: roomId,
      text: text,
      timestamp: DateTime.now(),
    );
    await _chatService.sendMessage(request);
  }

  /// Prints [message] as `author: text`.
  void displayMessage(ChatMessage message) =>
      print('${message.author}: ${message.text}');
}

Stream real-time data efficiently:

/// Subscribes to a price feed over WebSocket and prints each update.
class StockPriceMonitor {
  late RpcCallerEndpoint _caller;
  late MarketDataCaller _marketData;

  /// Connects to the market-data server and prints price updates for
  /// [symbols] until the subscription stream ends.
  Future<void> monitorStocks(List<String> symbols) async {
    final transport =
        RpcWebSocketTransport(url: 'wss://market-data.example.com/rpc');
    final endpoint = RpcCallerEndpoint(transport: transport);
    _caller = endpoint;
    _marketData = MarketDataCaller(endpoint);
    await transport.connect();

    // One subscription carries the updates for all requested symbols.
    final request = SymbolListRequest(symbols: symbols);
    await for (final priceUpdate in _marketData.subscribeToPrices(request)) {
      updateUI(priceUpdate);
    }
  }

  /// Prints `SYMBOL: $price (change)`, prefixing positive changes with `+`.
  void updateUI(PriceUpdate update) {
    final sign = update.change > 0 ? '+' : '';
    print('${update.symbol}: \$${update.price} ($sign${update.change})');
  }
}

Enable real-time collaboration:

/// Client side of a collaborative document: receives remote changes as a
/// stream and submits local edits.
///
/// NOTE(review): `currentUserId`, `editor` and `showAuthorCursor` are not
/// defined in this snippet and must be provided by the surrounding code.
class CollaborativeEditor {
  late RpcCallerEndpoint _caller;
  late EditorServiceCaller _editorService;

  /// Connects to the collaboration server and applies every change received
  /// for [documentId] until the change stream ends.
  Future<void> joinDocument(String documentId) async {
    final transport =
        RpcWebSocketTransport(url: 'wss://collab.example.com/rpc');
    final endpoint = RpcCallerEndpoint(transport: transport);
    _caller = endpoint;
    _editorService = EditorServiceCaller(endpoint);
    await transport.connect();

    // Remote changes arrive on a single subscription.
    final request = DocumentRequest(documentId: documentId);
    await for (final change in _editorService.subscribeToChanges(request)) {
      applyChange(change);
    }
  }

  /// Submits [operation] on [documentId] on behalf of the current user.
  Future<void> makeEdit(String documentId, EditOperation operation) async {
    final request = EditRequest(
      documentId: documentId,
      operation: operation,
      authorId: currentUserId,
    );
    await _editorService.applyEdit(request);
  }

  /// Applies a remote [change] locally, then shows its author's cursor.
  void applyChange(DocumentChange change) {
    editor.applyOperation(change.operation);
    showAuthorCursor(change.authorId, change.position);
  }
}

WebSocket transport provides efficient streaming capabilities:

/// Server-streaming example: one [LogFilter] request produces a stream of
/// matching [LogEntry] responses.
class LogStreamService extends RpcResponderContract {
  LogStreamService() : super('LogStream');

  /// Registers the `streamLogs` server-stream method.
  @override
  void setup() {
    addServerStreamMethod<LogFilter, LogEntry>(
      methodName: 'streamLogs',
      handler: _streamLogs,
    );
  }

  /// Reads `application.log` line by line and yields an entry for each line
  /// accepted by `matchesFilter`.
  ///
  /// The entry's timestamp is the time of reading, not a time parsed from
  /// the line.
  Stream<LogEntry> _streamLogs(LogFilter filter, {RpcContext? context}) async* {
    final lines = File('application.log')
        .openRead()
        .transform(utf8.decoder)
        .transform(LineSplitter());

    await for (final line in lines) {
      if (!matchesFilter(line, filter)) continue;
      yield LogEntry(
        message: line,
        timestamp: DateTime.now(),
        level: extractLogLevel(line),
      );
    }
  }
}
// Client side: request the stream once, then print entries as they arrive.
final errorLogs = logService.streamLogs(LogFilter(level: 'ERROR'));
await for (final entry in errorLogs) {
  print('${entry.timestamp}: ${entry.message}');
}
/// Client-streaming example: the client sends a stream of [FileChunk]s and
/// receives a single [UploadResult].
class BatchUploadService extends RpcResponderContract {
  BatchUploadService() : super('BatchUpload');

  /// Registers the `uploadFile` client-stream method.
  @override
  void setup() {
    addClientStreamMethod<FileChunk, UploadResult>(
      methodName: 'uploadFile',
      handler: _uploadFile,
    );
  }

  /// Concatenates the data of all [chunks] in arrival order, saves the
  /// result via `saveFile`, and reports the path, byte size and chunk count.
  ///
  /// The whole upload is held in memory until the stream ends.
  Future<UploadResult> _uploadFile(
    Stream<FileChunk> chunks, {
    RpcContext? context,
  }) async {
    final assembled = BytesBuilder();
    var chunkCount = 0;
    await for (final chunk in chunks) {
      assembled.add(chunk.data);
      chunkCount += 1;
    }

    final contents = assembled.toBytes();
    final savedPath = await saveFile(contents);
    return UploadResult(
      filePath: savedPath,
      size: contents.length,
      chunks: chunkCount,
    );
  }
}
/// Client side: reads [file] fully into memory and yields it as consecutive
/// [FileChunk]s of at most 64 KiB, numbered from 0.
Stream<FileChunk> uploadFile(File file) async* {
  const chunkSize = 64 * 1024; // 64KB chunks
  final contents = await file.readAsBytes();

  var sequenceNumber = 0;
  for (var offset = 0; offset < contents.length; offset += chunkSize) {
    // The last chunk may be shorter than chunkSize.
    final end = math.min(offset + chunkSize, contents.length);
    yield FileChunk(
      data: contents.sublist(offset, end),
      sequenceNumber: sequenceNumber++,
    );
  }
}
// Passes the chunk stream produced by uploadFile(myFile) to the remote
// uploadFile method and awaits its result.
// NOTE(review): `uploadService` and `myFile` are not defined in this snippet.
final result = await uploadService.uploadFile(uploadFile(myFile));
/// Bidirectional-streaming example: every incoming [ProcessRequest] is
/// answered with exactly one [ProcessResponse] on the outgoing stream.
class ProcessingService extends RpcResponderContract {
  ProcessingService() : super('Processing');

  /// Registers the `processStream` bidirectional method.
  @override
  void setup() {
    addBidirectionalStreamMethod<ProcessRequest, ProcessResponse>(
      methodName: 'processStream',
      handler: _processStream,
    );
  }

  /// Processes [requests] one at a time, in arrival order.
  ///
  /// A failure in `processData` does not end the stream; it is reported as
  /// a response with `success: false` and the error's string form.
  Stream<ProcessResponse> _processStream(
    Stream<ProcessRequest> requests, {
    RpcContext? context,
  }) async* {
    await for (final request in requests) {
      ProcessResponse response;
      try {
        final result = await processData(request.data);
        response = ProcessResponse(
          id: request.id,
          success: true,
          result: result,
        );
      } catch (e) {
        response = ProcessResponse(
          id: request.id,
          success: false,
          error: e.toString(),
        );
      }
      yield response;
    }
  }
}

WebSocket transport handles connection issues gracefully:

/// Client that reacts to connection lifecycle events reported by the
/// transport: it resumes subscriptions on connect and pauses on disconnect.
///
/// NOTE(review): `_subscribeToNotifications` and `_queueOperations` are not
/// defined in this snippet and must be supplied by the surrounding class.
class ResilientClient {
  late RpcWebSocketTransport _transport;
  late RpcCallerEndpoint _caller;
  late ServiceCaller _service;

  /// Builds a transport for [url] with reconnection enabled, wires the
  /// caller objects and opens the connection.
  Future<void> connect(String url) async {
    final options = WebSocketTransportOptions(
      reconnectInterval: Duration(seconds: 5),
      maxReconnectAttempts: 10,
      backoffMultiplier: 1.5,
      onReconnect: (attempt) => print('Reconnection attempt $attempt'),
      onConnect: () {
        print('Connected to server');
        _onConnected();
      },
      onDisconnect: (reason) {
        print('Disconnected: $reason');
        _onDisconnected();
      },
    );

    _transport = RpcWebSocketTransport(url: url, options: options);
    _caller = RpcCallerEndpoint(transport: _transport);
    _service = ServiceCaller(_caller);
    await _transport.connect();
  }

  /// Connect hook: kicks off resubscription without waiting for it.
  void _onConnected() {
    _resumeSubscriptions();
  }

  /// Disconnect hook: switches the client into its paused state.
  void _onDisconnected() {
    _pauseOperations();
  }

  /// Re-establishes the notification subscription.
  Future<void> _resumeSubscriptions() async {
    await _subscribeToNotifications();
  }

  /// Flags that operations should be queued until the connection returns.
  void _pauseOperations() {
    _queueOperations = true;
  }
}
/// Listens to a transport's connection-state stream and reports every
/// state change as a human-readable status line.
class ConnectionMonitor {
  final RpcWebSocketTransport transport;

  /// Starts monitoring [transport] immediately.
  ConnectionMonitor(this.transport) {
    _setupMonitoring();
  }

  /// Subscribes to `connectionState` and maps each state to its label.
  void _setupMonitoring() {
    final statusLabels = <ConnectionState, String>{
      ConnectionState.connecting: 'Connecting...',
      ConnectionState.connected: 'Connected',
      ConnectionState.disconnected: 'Disconnected',
      ConnectionState.reconnecting: 'Reconnecting...',
      ConnectionState.failed: 'Connection failed',
    };

    transport.connectionState.listen((state) {
      final label = statusLabels[state];
      // States without a label are ignored.
      if (label != null) {
        showStatus(label);
      }
    });
  }

  /// Prints [message] prefixed with `Connection: `.
  void showStatus(String message) => print('Connection: $message');
}

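Use WSS (WebSocket over TLS, `wss://`) in production so that traffic, including authentication headers, is encrypted in transit: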

// Transport pointed at a wss:// URL, with credentials sent as headers.
// NOTE(review): `authToken`, `apiKey` and `clientVersion` are not defined in
// this snippet; they must be in scope wherever this code is pasted.
final transport = RpcWebSocketTransport(
url: 'wss://secure-api.example.com/rpc',
options: WebSocketTransportOptions(
headers: {
'Authorization': 'Bearer $authToken',
},
// Additional security headers
// NOTE(review): this snippet passes both `headers` and `extraHeaders`;
// confirm against the options' API how the two are combined.
extraHeaders: {
'X-API-Key': apiKey,
'X-Client-Version': clientVersion,
},
),
);

Implement proper authentication:

/// Client that obtains a bearer token, connects with it, and refreshes the
/// token when the server disconnects it for an authorization reason.
///
/// NOTE(review): `_getAuthToken` and `_refreshAuthToken` are not defined in
/// this snippet and must be supplied by the surrounding class.
class AuthenticatedWebSocketClient {
  String? _authToken;
  late RpcWebSocketTransport _transport;

  /// Fetches a token for [username]/[password], then creates and connects a
  /// transport that sends it in the `Authorization` header.
  Future<void> authenticate(String username, String password) async {
    final token = await _getAuthToken(username, password);
    _authToken = token;

    final authHeaders = {
      'Authorization': 'Bearer $token',
    };
    _transport = RpcWebSocketTransport(
      url: 'wss://api.example.com/rpc',
      options: WebSocketTransportOptions(
        headers: authHeaders,
        onConnect: () => _onAuthenticatedConnect(),
        onDisconnect: (reason) => _onAuthenticatedDisconnect(reason),
      ),
    );
    await _transport.connect();
  }

  /// Connect hook: only logs.
  void _onAuthenticatedConnect() =>
      print('Authenticated connection established');

  /// Disconnect hook: starts a token refresh when [reason] mentions
  /// `unauthorized`; any other reason is ignored.
  void _onAuthenticatedDisconnect(String reason) {
    if (!reason.contains('unauthorized')) return;
    // Token expired, re-authenticate (not awaited by this hook).
    _refreshAuthentication();
  }

  /// Exchanges the current token for a fresh one and pushes the new
  /// `Authorization` header to the transport.
  Future<void> _refreshAuthentication() async {
    final refreshed = await _refreshAuthToken(_authToken!);
    _authToken = refreshed;
    _transport.updateHeaders({
      'Authorization': 'Bearer $refreshed',
    });
  }
}

Optimize performance with message batching:

/// Transport that collects outgoing messages for a short window and sends
/// them as a single [MessageBatch].
///
/// NOTE(review): no constructor is declared here; if the superclass
/// constructor has required parameters (other snippets pass `url:`), a
/// forwarding constructor must be added before this compiles.
class BatchingWebSocketTransport extends RpcWebSocketTransport {
  /// How long messages are collected before a batch is sent.
  static const Duration _batchWindow = Duration(milliseconds: 10);

  /// Messages waiting for the next flush, in send order.
  final List<Message> _messageQueue = [];

  /// Pending flush timer; `null` when no flush is scheduled.
  Timer? _batchTimer;

  /// Queues [message] and makes sure a flush is scheduled.
  ///
  /// The timer is started only when none is pending. Restarting it on every
  /// send would postpone the flush indefinitely under steady traffic, so
  /// the queue would grow without bound and nothing would be sent.
  @override
  void sendMessage(Message message) {
    _messageQueue.add(message);
    _batchTimer ??= Timer(_batchWindow, _flushBatch);
  }

  /// Sends all queued messages as one batch and empties the queue.
  void _flushBatch() {
    // Clear the handle first so the next sendMessage schedules a new flush.
    _batchTimer = null;
    if (_messageQueue.isEmpty) return;

    final batch = MessageBatch(messages: List<Message>.of(_messageQueue));
    super.sendMessage(batch);
    _messageQueue.clear();
  }
}

For high-throughput scenarios, spread calls across a pool of connections handed out in round-robin order:

/// Fixed-size pool of WebSocket transports handed out in round-robin order.
///
/// Call [initialize] before [getConnection] and [close] when done.
class WebSocketConnectionPool {
  final List<RpcWebSocketTransport> _connections = [];
  final String _baseUrl;
  final int _poolSize;

  /// Index of the connection the next [getConnection] call returns.
  int _currentIndex = 0;

  WebSocketConnectionPool(this._baseUrl, this._poolSize);

  /// Opens connections to `$_baseUrl/rpc` until the pool holds `_poolSize`.
  ///
  /// Calling this again only tops the pool up, so it never grows beyond the
  /// configured size. If any connection fails, the transports opened by
  /// this call are closed again and the error is rethrown, so a failed
  /// initialization leaks nothing.
  Future<void> initialize() async {
    final opened = <RpcWebSocketTransport>[];
    try {
      for (var i = _connections.length; i < _poolSize; i++) {
        final transport = RpcWebSocketTransport(url: '$_baseUrl/rpc');
        // Tracked before connecting so a failed connect is cleaned up too.
        opened.add(transport);
        await transport.connect();
      }
    } catch (_) {
      await Future.wait(opened.map((c) => c.close()));
      rethrow;
    }
    _connections.addAll(opened);
  }

  /// Returns the next connection in round-robin order.
  ///
  /// Throws a [StateError] when the pool holds no connections, for example
  /// before [initialize], after [close], or with a pool size of zero.
  RpcWebSocketTransport getConnection() {
    if (_connections.isEmpty) {
      throw StateError(
        'Connection pool is empty; call initialize() before getConnection().',
      );
    }
    // Use the real connection count rather than the configured size, so the
    // index stays in range whatever the pool currently holds.
    final connection = _connections[_currentIndex % _connections.length];
    _currentIndex = (_currentIndex + 1) % _connections.length;
    return connection;
  }

  /// Closes every pooled connection and empties the pool.
  Future<void> close() async {
    await Future.wait(_connections.map((c) => c.close()));
    _connections.clear();
    _currentIndex = 0;
  }
}
/// Connection bootstrap with a fallback path when connecting fails.
///
/// NOTE(review): `transport` and the `_subscribeToRequiredStreams`,
/// `_syncInitialState` and `_switchToHttpTransport` helpers are not defined
/// in this snippet.
class WebSocketClient {
  /// Connects and initializes services; on any failure, logs it and runs
  /// the fallback instead of rethrowing.
  Future<void> connect() async {
    try {
      await transport.connect();
      await _initializeServices();
    } catch (error) {
      print('Failed to connect: $error');
      await _handleConnectionFailure();
    }
  }

  /// Post-connect setup: subscribes to the required streams first, then
  /// syncs the initial state.
  Future<void> _initializeServices() async {
    await _subscribeToRequiredStreams();
    await _syncInitialState();
  }

  /// Fallback used when the WebSocket connection cannot be established.
  Future<void> _handleConnectionFailure() async {
    await _switchToHttpTransport();
  }
}
/// Client that prefers the WebSocket transport and falls back to HTTP.
///
/// NOTE(review): `transport`, `_makeWebSocketCall` and `_makeHttpCall` are
/// not defined in this snippet.
class AdaptiveClient {
  /// Makes the call over WebSocket when connected, otherwise over HTTP.
  ///
  /// Only a failed WebSocket call triggers the HTTP fallback. An error from
  /// the HTTP call itself propagates to the caller: it is neither reported
  /// as a WebSocket failure nor retried by issuing the HTTP call again.
  Future<void> makeCall() async {
    if (transport.isConnected) {
      try {
        return await _makeWebSocketCall();
      } catch (e) {
        print('WebSocket call failed, falling back to HTTP: $e');
      }
    }
    return await _makeHttpCall();
  }
}
/// Logs the metrics a WebSocket transport exposes as streams.
class PerformanceMonitor {
  /// Subscribes to [transport]'s sent/received counters and latency stream
  /// and prints every value they emit.
  ///
  /// The subscriptions are not retained, so they live as long as the
  /// underlying streams do.
  void trackWebSocketMetrics(RpcWebSocketTransport transport) {
    transport.messagesSent.listen(
      (count) => print('Messages sent: $count'),
    );
    transport.messagesReceived.listen(
      (count) => print('Messages received: $count'),
    );
    transport.latency.listen(
      (duration) => print('Current latency: ${duration.inMilliseconds}ms'),
    );
  }
}

WebSocket transport is the ideal choice for real-time applications, collaborative systems, and scenarios requiring persistent connections. Its bidirectional nature and streaming capabilities make it perfect for modern interactive applications while maintaining the robustness and type safety of RPC Dart.