Protocol Flexibility
Implement any communication protocol - from message queues to custom networking protocols to exotic transport mechanisms.
The Custom Transport feature allows you to create your own transport implementations to integrate RPC Dart with any communication protocol, messaging system, or framework. This gives you complete flexibility to adapt RPC Dart to your specific infrastructure requirements.
Protocol Flexibility
Implement any communication protocol - from message queues to custom networking protocols to exotic transport mechanisms.
Infrastructure Integration
Seamlessly integrate with existing infrastructure like Redis, RabbitMQ, Apache Kafka, or proprietary systems.
Performance Optimization
Optimize for your specific use case with custom serialization, compression, and connection management strategies.
Full Control
Complete control over message routing, error handling, security, and performance characteristics.
All custom transports must implement the RpcTransport
interface:
abstract class RpcTransport { /// Send a message through the transport Future<void> sendMessage(RpcMessage message);
/// Receive messages from the transport Stream<RpcMessage> get messageStream;
/// Initialize and connect the transport Future<void> connect();
/// Close the transport and clean up resources Future<void> close();
/// Check if the transport is connected bool get isConnected;
/// Transport-specific configuration Map<String, dynamic> get configuration;}
RPC messages follow a standardized structure:
class RpcMessage { final String id; // Unique message identifier final String contractName; // Service contract name final String methodName; // Method being called final RpcMessageType type; // Message type (request, response, error, etc.) final Map<String, String> headers; // Transport headers final dynamic payload; // Message payload final RpcContext? context; // RPC context
RpcMessage({ required this.id, required this.contractName, required this.methodName, required this.type, this.headers = const {}, this.payload, this.context, });}
enum RpcMessageType { request, response, streamStart, streamData, streamEnd, error, cancel,}
A transport using Redis pub/sub for communication:
import 'dart:async';import 'dart:convert';import 'package:redis/redis.dart';import 'package:rpc_dart/rpc_dart.dart';
class RpcRedisTransport implements RpcTransport { final RedisConnection _redis; final String _requestChannel; final String _responseChannel; final StreamController<RpcMessage> _messageController = StreamController.broadcast();
late Command _publisher; late PubSub _subscriber; bool _isConnected = false;
RpcRedisTransport({ required RedisConnection redis, required String requestChannel, required String responseChannel, }) : _redis = redis, _requestChannel = requestChannel, _responseChannel = responseChannel;
@override Stream<RpcMessage> get messageStream => _messageController.stream;
@override bool get isConnected => _isConnected;
@override Map<String, dynamic> get configuration => { 'transport_type': 'redis', 'request_channel': _requestChannel, 'response_channel': _responseChannel, };
@override Future<void> connect() async { if (_isConnected) return;
try { // Setup publisher for sending messages _publisher = await _redis.connect();
// Setup subscriber for receiving messages _subscriber = await _redis.subscribe(); await _subscriber.subscribe([_responseChannel]);
// Listen for incoming messages _subscriber.getStream().listen((message) { if (message.isNotEmpty) { _handleIncomingMessage(message.last); } });
_isConnected = true; print('Redis transport connected');
} catch (e) { print('Failed to connect Redis transport: $e'); rethrow; } }
@override Future<void> sendMessage(RpcMessage message) async { if (!_isConnected) { throw StateError('Transport not connected'); }
try { final serialized = _serializeMessage(message); await _publisher.send_object([ 'PUBLISH', _requestChannel, serialized, ]);
} catch (e) { print('Failed to send message: $e'); rethrow; } }
@override Future<void> close() async { if (!_isConnected) return;
try { await _subscriber.unsubscribe([_responseChannel]); await _publisher.get_connection().close(); await _messageController.close(); _isConnected = false;
} catch (e) { print('Error closing Redis transport: $e'); } }
void _handleIncomingMessage(String rawMessage) { try { final message = _deserializeMessage(rawMessage); _messageController.add(message);
} catch (e) { print('Failed to handle incoming message: $e'); } }
String _serializeMessage(RpcMessage message) { return jsonEncode({ 'id': message.id, 'contract_name': message.contractName, 'method_name': message.methodName, 'type': message.type.toString(), 'headers': message.headers, 'payload': message.payload, 'context': message.context?.toJson(), 'timestamp': DateTime.now().toIso8601String(), }); }
RpcMessage _deserializeMessage(String rawMessage) { final data = jsonDecode(rawMessage) as Map<String, dynamic>;
return RpcMessage( id: data['id'] as String, contractName: data['contract_name'] as String, methodName: data['method_name'] as String, type: RpcMessageType.values.firstWhere( (t) => t.toString() == data['type'], ), headers: Map<String, String>.from(data['headers'] ?? {}), payload: data['payload'], context: data['context'] != null ? RpcContext.fromJson(data['context']) : null, ); }}
void main() async { // Setup Redis connection final redis = RedisConnection();
// Create transport for client final clientTransport = RpcRedisTransport( redis: redis, requestChannel: 'rpc_requests', responseChannel: 'rpc_responses_client', );
// Create transport for server final serverTransport = RpcRedisTransport( redis: redis, requestChannel: 'rpc_requests', responseChannel: 'rpc_responses_server', );
// Setup server final responder = RpcResponderEndpoint(transport: serverTransport); responder.registerServiceContract(CalculatorResponder()); await responder.start();
// Setup client final caller = RpcCallerEndpoint(transport: clientTransport); final calculator = CalculatorCaller(caller);
try { // Make RPC call through Redis final result = await calculator.add(AddRequest(a: 10, b: 5)); print('Result: ${result.sum}');
} finally { await caller.close(); await responder.close(); }}
A transport using RabbitMQ for reliable message delivery:
import 'dart:async';import 'dart:convert';import 'package:dart_amqp/dart_amqp.dart';import 'package:rpc_dart/rpc_dart.dart';
class RpcRabbitMQTransport implements RpcTransport { final String _connectionString; final String _requestQueue; final String _responseQueue; final RabbitMQTransportOptions _options;
late Client _client; late Channel _channel; late Queue _requestQueueHandler; late Queue _responseQueueHandler;
final StreamController<RpcMessage> _messageController = StreamController.broadcast(); bool _isConnected = false;
RpcRabbitMQTransport({ required String connectionString, required String requestQueue, required String responseQueue, RabbitMQTransportOptions? options, }) : _connectionString = connectionString, _requestQueue = requestQueue, _responseQueue = responseQueue, _options = options ?? RabbitMQTransportOptions();
@override Stream<RpcMessage> get messageStream => _messageController.stream;
@override bool get isConnected => _isConnected;
@override Map<String, dynamic> get configuration => { 'transport_type': 'rabbitmq', 'request_queue': _requestQueue, 'response_queue': _responseQueue, 'durable': _options.durable, 'auto_delete': _options.autoDelete, };
@override Future<void> connect() async { if (_isConnected) return;
try { // Connect to RabbitMQ _client = Client(settings: ConnectionSettings.fromUri(_connectionString)); _channel = await _client.channel();
// Declare queues _requestQueueHandler = await _channel.queue( _requestQueue, durable: _options.durable, autoDelete: _options.autoDelete, );
_responseQueueHandler = await _channel.queue( _responseQueue, durable: _options.durable, autoDelete: _options.autoDelete, );
// Setup consumer for responses final consumer = await _responseQueueHandler.consume(); consumer.listen((AmqpMessage message) { _handleIncomingMessage(String.fromCharCodes(message.payload!)); message.ack(); });
_isConnected = true; print('RabbitMQ transport connected');
} catch (e) { print('Failed to connect RabbitMQ transport: $e'); rethrow; } }
@override Future<void> sendMessage(RpcMessage message) async { if (!_isConnected) { throw StateError('Transport not connected'); }
try { final serialized = _serializeMessage(message); final messageProperties = MessageProperties();
// Set message properties for reliability messageProperties.deliveryMode = _options.persistent ? 2 : 1; messageProperties.timestamp = DateTime.now(); messageProperties.messageId = message.id; messageProperties.correlationId = message.context?.correlationId;
await _requestQueueHandler.publish( serialized, properties: messageProperties, );
} catch (e) { print('Failed to send message: $e'); rethrow; } }
@override Future<void> close() async { if (!_isConnected) return;
try { await _channel.close(); await _client.close(); await _messageController.close(); _isConnected = false;
} catch (e) { print('Error closing RabbitMQ transport: $e'); } }
void _handleIncomingMessage(String rawMessage) { try { final message = _deserializeMessage(rawMessage); _messageController.add(message);
} catch (e) { print('Failed to handle incoming message: $e'); } }
String _serializeMessage(RpcMessage message) { return jsonEncode({ 'id': message.id, 'contract_name': message.contractName, 'method_name': message.methodName, 'type': message.type.toString(), 'headers': message.headers, 'payload': message.payload, 'context': message.context?.toJson(), 'timestamp': DateTime.now().toIso8601String(), }); }
RpcMessage _deserializeMessage(String rawMessage) { final data = jsonDecode(rawMessage) as Map<String, dynamic>;
return RpcMessage( id: data['id'] as String, contractName: data['contract_name'] as String, methodName: data['method_name'] as String, type: RpcMessageType.values.firstWhere( (t) => t.toString() == data['type'], ), headers: Map<String, String>.from(data['headers'] ?? {}), payload: data['payload'], context: data['context'] != null ? RpcContext.fromJson(data['context']) : null, ); }}
class RabbitMQTransportOptions { final bool durable; final bool autoDelete; final bool persistent; final int prefetchCount;
const RabbitMQTransportOptions({ this.durable = true, this.autoDelete = false, this.persistent = true, this.prefetchCount = 10, });}
class RabbitMQConfig { static const String defaultConnectionString = 'amqp://guest:guest@localhost:5672/';
static RpcRabbitMQTransport createReliableTransport({ required String requestQueue, required String responseQueue, String? connectionString, }) { return RpcRabbitMQTransport( connectionString: connectionString ?? defaultConnectionString, requestQueue: requestQueue, responseQueue: responseQueue, options: RabbitMQTransportOptions( durable: true, // Survive server restarts autoDelete: false, // Don't auto-delete queues persistent: true, // Persist messages to disk prefetchCount: 50, // Process up to 50 messages at once ), ); }
static RpcRabbitMQTransport createFastTransport({ required String requestQueue, required String responseQueue, String? connectionString, }) { return RpcRabbitMQTransport( connectionString: connectionString ?? defaultConnectionString, requestQueue: requestQueue, responseQueue: responseQueue, options: RabbitMQTransportOptions( durable: false, // Faster, but less reliable autoDelete: true, // Clean up automatically persistent: false, // In-memory only prefetchCount: 100, // Higher throughput ), ); }}
A transport using file system for asynchronous communication:
import 'dart:async';import 'dart:convert';import 'dart:io';import 'package:path/path.dart' as path;import 'package:rpc_dart/rpc_dart.dart';
class RpcFileSystemTransport implements RpcTransport { final String _basePath; final String _instanceId; final FileSystemTransportOptions _options;
late Directory _inboxDir; late Directory _outboxDir; late StreamSubscription _watcher;
final StreamController<RpcMessage> _messageController = StreamController.broadcast(); bool _isConnected = false;
RpcFileSystemTransport({ required String basePath, String? instanceId, FileSystemTransportOptions? options, }) : _basePath = basePath, _instanceId = instanceId ?? _generateInstanceId(), _options = options ?? FileSystemTransportOptions();
@override Stream<RpcMessage> get messageStream => _messageController.stream;
@override bool get isConnected => _isConnected;
@override Map<String, dynamic> get configuration => { 'transport_type': 'filesystem', 'base_path': _basePath, 'instance_id': _instanceId, 'polling_interval': _options.pollingInterval.inMilliseconds, };
@override Future<void> connect() async { if (_isConnected) return;
try { // Create directories _inboxDir = Directory(path.join(_basePath, 'inbox', _instanceId)); _outboxDir = Directory(path.join(_basePath, 'outbox', _instanceId));
await _inboxDir.create(recursive: true); await _outboxDir.create(recursive: true);
// Start watching for incoming messages _startWatching();
_isConnected = true; print('FileSystem transport connected at $_basePath');
} catch (e) { print('Failed to connect FileSystem transport: $e'); rethrow; } }
@override Future<void> sendMessage(RpcMessage message) async { if (!_isConnected) { throw StateError('Transport not connected'); }
try { final serialized = _serializeMessage(message); final fileName = '${message.id}_${DateTime.now().millisecondsSinceEpoch}.msg'; final filePath = path.join(_outboxDir.path, fileName);
final file = File(filePath); await file.writeAsString(serialized);
// Move to target inbox (atomic operation) final targetPath = path.join(_basePath, 'inbox', 'target', fileName); await file.rename(targetPath);
} catch (e) { print('Failed to send message: $e'); rethrow; } }
@override Future<void> close() async { if (!_isConnected) return;
try { await _watcher.cancel(); await _messageController.close();
// Cleanup directories if configured if (_options.cleanupOnClose) { await _inboxDir.delete(recursive: true); await _outboxDir.delete(recursive: true); }
_isConnected = false;
} catch (e) { print('Error closing FileSystem transport: $e'); } }
void _startWatching() { // Poll for new files (simple implementation) _watcher = Stream.periodic(_options.pollingInterval).listen((_) async { await _processIncomingFiles(); }); }
Future<void> _processIncomingFiles() async { try { final files = _inboxDir.listSync() .whereType<File>() .where((f) => f.path.endsWith('.msg')) .toList();
for (final file in files) { try { final content = await file.readAsString(); final message = _deserializeMessage(content); _messageController.add(message);
// Archive or delete processed file if (_options.archiveProcessed) { final archivePath = path.join(_basePath, 'archive', path.basename(file.path)); await file.rename(archivePath); } else { await file.delete(); }
} catch (e) { print('Failed to process file ${file.path}: $e'); // Move to error directory final errorPath = path.join(_basePath, 'error', path.basename(file.path)); await file.rename(errorPath); } }
} catch (e) { print('Error processing incoming files: $e'); } }
String _serializeMessage(RpcMessage message) { return jsonEncode({ 'id': message.id, 'contract_name': message.contractName, 'method_name': message.methodName, 'type': message.type.toString(), 'headers': message.headers, 'payload': message.payload, 'context': message.context?.toJson(), 'timestamp': DateTime.now().toIso8601String(), 'sender': _instanceId, }); }
RpcMessage _deserializeMessage(String rawMessage) { final data = jsonDecode(rawMessage) as Map<String, dynamic>;
return RpcMessage( id: data['id'] as String, contractName: data['contract_name'] as String, methodName: data['method_name'] as String, type: RpcMessageType.values.firstWhere( (t) => t.toString() == data['type'], ), headers: Map<String, String>.from(data['headers'] ?? {}), payload: data['payload'], context: data['context'] != null ? RpcContext.fromJson(data['context']) : null, ); }
static String _generateInstanceId() { return 'fs_${DateTime.now().millisecondsSinceEpoch}_${Platform.operatingSystem}'; }}
class FileSystemTransportOptions { final Duration pollingInterval; final bool cleanupOnClose; final bool archiveProcessed; final int maxFileAge;
const FileSystemTransportOptions({ this.pollingInterval = const Duration(seconds: 1), this.cleanupOnClose = false, this.archiveProcessed = true, this.maxFileAge = 3600, // 1 hour in seconds });}
Add middleware for cross-cutting concerns:
abstract class TransportMiddleware { Future<RpcMessage> onMessageSend(RpcMessage message, TransportNext next); Future<RpcMessage> onMessageReceive(RpcMessage message, TransportNext next);}
class LoggingMiddleware implements TransportMiddleware { @override Future<RpcMessage> onMessageSend(RpcMessage message, TransportNext next) async { print('Sending: ${message.contractName}.${message.methodName}'); final stopwatch = Stopwatch()..start();
try { final result = await next(message); stopwatch.stop(); print('Sent in ${stopwatch.elapsedMilliseconds}ms'); return result; } catch (e) { print('Send failed: $e'); rethrow; } }
@override Future<RpcMessage> onMessageReceive(RpcMessage message, TransportNext next) async { print('Received: ${message.contractName}.${message.methodName}'); return await next(message); }}
class CompressionMiddleware implements TransportMiddleware { @override Future<RpcMessage> onMessageSend(RpcMessage message, TransportNext next) async { // Compress large payloads if (_shouldCompress(message.payload)) { final compressed = await _compressPayload(message.payload); final compressedMessage = message.copyWith( payload: compressed, headers: {...message.headers, 'compressed': 'gzip'}, ); return await next(compressedMessage); } return await next(message); }
@override Future<RpcMessage> onMessageReceive(RpcMessage message, TransportNext next) async { // Decompress if needed if (message.headers['compressed'] == 'gzip') { final decompressed = await _decompressPayload(message.payload); final decompressedMessage = message.copyWith(payload: decompressed); return await next(decompressedMessage); } return await next(message); }}
Create a factory for managing different transport types:
class TransportFactory { static final Map<String, TransportBuilder> _builders = {};
static void registerTransport<T extends RpcTransport>( String name, TransportBuilder<T> builder, ) { _builders[name] = builder; }
static RpcTransport create(String name, Map<String, dynamic> config) { final builder = _builders[name]; if (builder == null) { throw ArgumentError('Unknown transport type: $name'); } return builder(config); }
static void registerDefaultTransports() { registerTransport('redis', (config) => RpcRedisTransport( redis: config['redis'], requestChannel: config['request_channel'], responseChannel: config['response_channel'], ));
registerTransport('rabbitmq', (config) => RpcRabbitMQTransport( connectionString: config['connection_string'], requestQueue: config['request_queue'], responseQueue: config['response_queue'], ));
registerTransport('filesystem', (config) => RpcFileSystemTransport( basePath: config['base_path'], instanceId: config['instance_id'], )); }}
typedef TransportBuilder<T extends RpcTransport> = T Function(Map<String, dynamic> config);
// Usagevoid main() { TransportFactory.registerDefaultTransports();
final transport = TransportFactory.create('redis', { 'redis': redisConnection, 'request_channel': 'rpc_requests', 'response_channel': 'rpc_responses', });}
Create comprehensive tests for your custom transport:
import 'package:test/test.dart';import 'package:rpc_dart/rpc_dart.dart';
abstract class TransportTestSuite { RpcTransport createClientTransport(); RpcTransport createServerTransport();
void runTests() { group('Transport Tests', () { late RpcTransport clientTransport; late RpcTransport serverTransport;
setUp(() async { clientTransport = createClientTransport(); serverTransport = createServerTransport(); await clientTransport.connect(); await serverTransport.connect(); });
tearDown(() async { await clientTransport.close(); await serverTransport.close(); });
test('should connect successfully', () { expect(clientTransport.isConnected, isTrue); expect(serverTransport.isConnected, isTrue); });
test('should send and receive messages', () async { final completer = Completer<RpcMessage>();
serverTransport.messageStream.listen((message) { completer.complete(message); });
final testMessage = RpcMessage( id: 'test-123', contractName: 'TestContract', methodName: 'testMethod', type: RpcMessageType.request, payload: {'test': 'data'}, );
await clientTransport.sendMessage(testMessage);
final receivedMessage = await completer.future.timeout(Duration(seconds: 5));
expect(receivedMessage.id, equals(testMessage.id)); expect(receivedMessage.contractName, equals(testMessage.contractName)); expect(receivedMessage.methodName, equals(testMessage.methodName)); });
test('should handle large messages', () async { final largePayload = List.generate(10000, (i) => 'data_$i');
final completer = Completer<RpcMessage>(); serverTransport.messageStream.listen((message) { completer.complete(message); });
final largeMessage = RpcMessage( id: 'large-test', contractName: 'TestContract', methodName: 'testLarge', type: RpcMessageType.request, payload: largePayload, );
await clientTransport.sendMessage(largeMessage); final received = await completer.future.timeout(Duration(seconds: 10));
expect(received.payload, equals(largePayload)); });
test('should handle connection failures gracefully', () async { await clientTransport.close();
expect(() => clientTransport.sendMessage(RpcMessage( id: 'fail-test', contractName: 'Test', methodName: 'test', type: RpcMessageType.request, )), throwsStateError); }); }); }}
// Example usage for Redis transportclass RedisTransportTest extends TransportTestSuite { @override RpcTransport createClientTransport() { return RpcRedisTransport( redis: RedisConnection(), requestChannel: 'test_requests', responseChannel: 'test_responses_client', ); }
@override RpcTransport createServerTransport() { return RpcRedisTransport( redis: RedisConnection(), requestChannel: 'test_requests', responseChannel: 'test_responses_server', ); }}
void main() { RedisTransportTest().runTests();}
Implement robust error handling:
class RobustCustomTransport implements RpcTransport { @override Future<void> sendMessage(RpcMessage message) async { int retryCount = 0; const maxRetries = 3;
while (retryCount < maxRetries) { try { await _doSendMessage(message); return; } on TransportException catch (e) { retryCount++; if (retryCount >= maxRetries) rethrow;
await Future.delayed(Duration(seconds: retryCount)); print('Retrying send (attempt $retryCount): ${e.message}'); } } }
void _handleError(dynamic error, StackTrace stackTrace) { if (error is ConnectionException) { _handleConnectionError(error); } else if (error is SerializationException) { _handleSerializationError(error); } else { _handleUnknownError(error, stackTrace); } }}
Add performance monitoring:
class MonitoredTransport implements RpcTransport { final RpcTransport _underlying; final TransportMetrics _metrics = TransportMetrics();
MonitoredTransport(this._underlying);
@override Future<void> sendMessage(RpcMessage message) async { final stopwatch = Stopwatch()..start();
try { await _underlying.sendMessage(message); stopwatch.stop();
_metrics.recordSendSuccess(stopwatch.elapsed); } catch (e) { stopwatch.stop(); _metrics.recordSendFailure(stopwatch.elapsed, e); rethrow; } }
TransportMetrics get metrics => _metrics;}
class TransportMetrics { int _messagesSent = 0; int _messagesReceived = 0; int _sendFailures = 0; int _receiveFailures = 0;
final List<Duration> _sendLatencies = []; final List<Duration> _receiveLatencies = [];
void recordSendSuccess(Duration latency) { _messagesSent++; _sendLatencies.add(latency); _trimLatencies(); }
void recordSendFailure(Duration latency, dynamic error) { _sendFailures++; _sendLatencies.add(latency); _trimLatencies(); }
double get averageSendLatency { if (_sendLatencies.isEmpty) return 0.0; final total = _sendLatencies.fold(Duration.zero, (a, b) => a + b); return total.inMicroseconds / _sendLatencies.length / 1000.0; // ms }
double get successRate { final total = _messagesSent + _sendFailures; return total > 0 ? _messagesSent / total : 0.0; }
void _trimLatencies() { // Keep only recent measurements const maxSamples = 1000; if (_sendLatencies.length > maxSamples) { _sendLatencies.removeAt(0); } }}
Use structured configuration:
class TransportConfig { final String type; final Map<String, dynamic> settings; final Duration connectionTimeout; final Duration messageTimeout; final int maxRetries; final bool enableMetrics;
const TransportConfig({ required this.type, this.settings = const {}, this.connectionTimeout = const Duration(seconds: 30), this.messageTimeout = const Duration(seconds: 60), this.maxRetries = 3, this.enableMetrics = true, });
factory TransportConfig.fromJson(Map<String, dynamic> json) { return TransportConfig( type: json['type'] as String, settings: json['settings'] as Map<String, dynamic>? ?? {}, connectionTimeout: Duration(seconds: json['connection_timeout'] ?? 30), messageTimeout: Duration(seconds: json['message_timeout'] ?? 60), maxRetries: json['max_retries'] ?? 3, enableMetrics: json['enable_metrics'] ?? true, ); }
Map<String, dynamic> toJson() => { 'type': type, 'settings': settings, 'connection_timeout': connectionTimeout.inSeconds, 'message_timeout': messageTimeout.inSeconds, 'max_retries': maxRetries, 'enable_metrics': enableMetrics, };}
Custom transports give you the ultimate flexibility to integrate RPC Dart with any communication system or protocol. Whether you need to work with message queues, databases, files, or exotic networking protocols, the transport interface provides all the tools you need to build robust, performant RPC communication.