Skip to content

Custom Transport

The Custom Transport feature allows you to create your own transport implementations to integrate RPC Dart with any communication protocol, messaging system, or framework. This gives you complete flexibility to adapt RPC Dart to your specific infrastructure requirements.

Protocol Flexibility

Implement any communication protocol - from message queues to custom networking protocols to exotic transport mechanisms.

Infrastructure Integration

Seamlessly integrate with existing infrastructure like Redis, RabbitMQ, Apache Kafka, or proprietary systems.

Performance Optimization

Optimize for your specific use case with custom serialization, compression, and connection management strategies.

Full Control

Complete control over message routing, error handling, security, and performance characteristics.

All custom transports must implement the RpcTransport interface:

abstract class RpcTransport {
/// Send a message through the transport
Future<void> sendMessage(RpcMessage message);
/// Receive messages from the transport
Stream<RpcMessage> get messageStream;
/// Initialize and connect the transport
Future<void> connect();
/// Close the transport and clean up resources
Future<void> close();
/// Check if the transport is connected
bool get isConnected;
/// Transport-specific configuration
Map<String, dynamic> get configuration;
}

RPC messages follow a standardized structure:

class RpcMessage {
final String id; // Unique message identifier
final String contractName; // Service contract name
final String methodName; // Method being called
final RpcMessageType type; // Message type (request, response, error, etc.)
final Map<String, String> headers; // Transport headers
final dynamic payload; // Message payload
final RpcContext? context; // RPC context
RpcMessage({
required this.id,
required this.contractName,
required this.methodName,
required this.type,
this.headers = const {},
this.payload,
this.context,
});
}
enum RpcMessageType {
request,
response,
streamStart,
streamData,
streamEnd,
error,
cancel,
}

A transport using Redis pub/sub for communication:

redis_transport.dart
import 'dart:async';
import 'dart:convert';
import 'package:redis/redis.dart';
import 'package:rpc_dart/rpc_dart.dart';
class RpcRedisTransport implements RpcTransport {
final RedisConnection _redis;
final String _requestChannel;
final String _responseChannel;
final StreamController<RpcMessage> _messageController = StreamController.broadcast();
late Command _publisher;
late PubSub _subscriber;
bool _isConnected = false;
RpcRedisTransport({
required RedisConnection redis,
required String requestChannel,
required String responseChannel,
}) : _redis = redis,
_requestChannel = requestChannel,
_responseChannel = responseChannel;
@override
Stream<RpcMessage> get messageStream => _messageController.stream;
@override
bool get isConnected => _isConnected;
@override
Map<String, dynamic> get configuration => {
'transport_type': 'redis',
'request_channel': _requestChannel,
'response_channel': _responseChannel,
};
@override
Future<void> connect() async {
if (_isConnected) return;
try {
// Setup publisher for sending messages
_publisher = await _redis.connect();
// Setup subscriber for receiving messages
_subscriber = await _redis.subscribe();
await _subscriber.subscribe([_responseChannel]);
// Listen for incoming messages
_subscriber.getStream().listen((message) {
if (message.isNotEmpty) {
_handleIncomingMessage(message.last);
}
});
_isConnected = true;
print('Redis transport connected');
} catch (e) {
print('Failed to connect Redis transport: $e');
rethrow;
}
}
@override
Future<void> sendMessage(RpcMessage message) async {
if (!_isConnected) {
throw StateError('Transport not connected');
}
try {
final serialized = _serializeMessage(message);
await _publisher.send_object([
'PUBLISH',
_requestChannel,
serialized,
]);
} catch (e) {
print('Failed to send message: $e');
rethrow;
}
}
@override
Future<void> close() async {
if (!_isConnected) return;
try {
await _subscriber.unsubscribe([_responseChannel]);
await _publisher.get_connection().close();
await _messageController.close();
_isConnected = false;
} catch (e) {
print('Error closing Redis transport: $e');
}
}
void _handleIncomingMessage(String rawMessage) {
try {
final message = _deserializeMessage(rawMessage);
_messageController.add(message);
} catch (e) {
print('Failed to handle incoming message: $e');
}
}
String _serializeMessage(RpcMessage message) {
return jsonEncode({
'id': message.id,
'contract_name': message.contractName,
'method_name': message.methodName,
'type': message.type.toString(),
'headers': message.headers,
'payload': message.payload,
'context': message.context?.toJson(),
'timestamp': DateTime.now().toIso8601String(),
});
}
RpcMessage _deserializeMessage(String rawMessage) {
final data = jsonDecode(rawMessage) as Map<String, dynamic>;
return RpcMessage(
id: data['id'] as String,
contractName: data['contract_name'] as String,
methodName: data['method_name'] as String,
type: RpcMessageType.values.firstWhere(
(t) => t.toString() == data['type'],
),
headers: Map<String, String>.from(data['headers'] ?? {}),
payload: data['payload'],
context: data['context'] != null
? RpcContext.fromJson(data['context'])
: null,
);
}
}

A transport using RabbitMQ for reliable message delivery:

rabbitmq_transport.dart
import 'dart:async';
import 'dart:convert';
import 'package:dart_amqp/dart_amqp.dart';
import 'package:rpc_dart/rpc_dart.dart';
class RpcRabbitMQTransport implements RpcTransport {
final String _connectionString;
final String _requestQueue;
final String _responseQueue;
final RabbitMQTransportOptions _options;
late Client _client;
late Channel _channel;
late Queue _requestQueueHandler;
late Queue _responseQueueHandler;
final StreamController<RpcMessage> _messageController = StreamController.broadcast();
bool _isConnected = false;
RpcRabbitMQTransport({
required String connectionString,
required String requestQueue,
required String responseQueue,
RabbitMQTransportOptions? options,
}) : _connectionString = connectionString,
_requestQueue = requestQueue,
_responseQueue = responseQueue,
_options = options ?? RabbitMQTransportOptions();
@override
Stream<RpcMessage> get messageStream => _messageController.stream;
@override
bool get isConnected => _isConnected;
@override
Map<String, dynamic> get configuration => {
'transport_type': 'rabbitmq',
'request_queue': _requestQueue,
'response_queue': _responseQueue,
'durable': _options.durable,
'auto_delete': _options.autoDelete,
};
@override
Future<void> connect() async {
if (_isConnected) return;
try {
// Connect to RabbitMQ
_client = Client(settings: ConnectionSettings.fromUri(_connectionString));
_channel = await _client.channel();
// Declare queues
_requestQueueHandler = await _channel.queue(
_requestQueue,
durable: _options.durable,
autoDelete: _options.autoDelete,
);
_responseQueueHandler = await _channel.queue(
_responseQueue,
durable: _options.durable,
autoDelete: _options.autoDelete,
);
// Setup consumer for responses
final consumer = await _responseQueueHandler.consume();
consumer.listen((AmqpMessage message) {
_handleIncomingMessage(String.fromCharCodes(message.payload!));
message.ack();
});
_isConnected = true;
print('RabbitMQ transport connected');
} catch (e) {
print('Failed to connect RabbitMQ transport: $e');
rethrow;
}
}
@override
Future<void> sendMessage(RpcMessage message) async {
if (!_isConnected) {
throw StateError('Transport not connected');
}
try {
final serialized = _serializeMessage(message);
final messageProperties = MessageProperties();
// Set message properties for reliability
messageProperties.deliveryMode = _options.persistent ? 2 : 1;
messageProperties.timestamp = DateTime.now();
messageProperties.messageId = message.id;
messageProperties.correlationId = message.context?.correlationId;
await _requestQueueHandler.publish(
serialized,
properties: messageProperties,
);
} catch (e) {
print('Failed to send message: $e');
rethrow;
}
}
@override
Future<void> close() async {
if (!_isConnected) return;
try {
await _channel.close();
await _client.close();
await _messageController.close();
_isConnected = false;
} catch (e) {
print('Error closing RabbitMQ transport: $e');
}
}
void _handleIncomingMessage(String rawMessage) {
try {
final message = _deserializeMessage(rawMessage);
_messageController.add(message);
} catch (e) {
print('Failed to handle incoming message: $e');
}
}
String _serializeMessage(RpcMessage message) {
return jsonEncode({
'id': message.id,
'contract_name': message.contractName,
'method_name': message.methodName,
'type': message.type.toString(),
'headers': message.headers,
'payload': message.payload,
'context': message.context?.toJson(),
'timestamp': DateTime.now().toIso8601String(),
});
}
RpcMessage _deserializeMessage(String rawMessage) {
final data = jsonDecode(rawMessage) as Map<String, dynamic>;
return RpcMessage(
id: data['id'] as String,
contractName: data['contract_name'] as String,
methodName: data['method_name'] as String,
type: RpcMessageType.values.firstWhere(
(t) => t.toString() == data['type'],
),
headers: Map<String, String>.from(data['headers'] ?? {}),
payload: data['payload'],
context: data['context'] != null
? RpcContext.fromJson(data['context'])
: null,
);
}
}
class RabbitMQTransportOptions {
final bool durable;
final bool autoDelete;
final bool persistent;
final int prefetchCount;
const RabbitMQTransportOptions({
this.durable = true,
this.autoDelete = false,
this.persistent = true,
this.prefetchCount = 10,
});
}

A transport using file system for asynchronous communication:

filesystem_transport.dart
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:path/path.dart' as path;
import 'package:rpc_dart/rpc_dart.dart';
class RpcFileSystemTransport implements RpcTransport {
final String _basePath;
final String _instanceId;
final FileSystemTransportOptions _options;
late Directory _inboxDir;
late Directory _outboxDir;
late StreamSubscription _watcher;
final StreamController<RpcMessage> _messageController = StreamController.broadcast();
bool _isConnected = false;
RpcFileSystemTransport({
required String basePath,
String? instanceId,
FileSystemTransportOptions? options,
}) : _basePath = basePath,
_instanceId = instanceId ?? _generateInstanceId(),
_options = options ?? FileSystemTransportOptions();
@override
Stream<RpcMessage> get messageStream => _messageController.stream;
@override
bool get isConnected => _isConnected;
@override
Map<String, dynamic> get configuration => {
'transport_type': 'filesystem',
'base_path': _basePath,
'instance_id': _instanceId,
'polling_interval': _options.pollingInterval.inMilliseconds,
};
@override
Future<void> connect() async {
if (_isConnected) return;
try {
// Create directories
_inboxDir = Directory(path.join(_basePath, 'inbox', _instanceId));
_outboxDir = Directory(path.join(_basePath, 'outbox', _instanceId));
await _inboxDir.create(recursive: true);
await _outboxDir.create(recursive: true);
// Start watching for incoming messages
_startWatching();
_isConnected = true;
print('FileSystem transport connected at $_basePath');
} catch (e) {
print('Failed to connect FileSystem transport: $e');
rethrow;
}
}
@override
Future<void> sendMessage(RpcMessage message) async {
if (!_isConnected) {
throw StateError('Transport not connected');
}
try {
final serialized = _serializeMessage(message);
final fileName = '${message.id}_${DateTime.now().millisecondsSinceEpoch}.msg';
final filePath = path.join(_outboxDir.path, fileName);
final file = File(filePath);
await file.writeAsString(serialized);
// Move to target inbox (atomic operation)
final targetPath = path.join(_basePath, 'inbox', 'target', fileName);
await file.rename(targetPath);
} catch (e) {
print('Failed to send message: $e');
rethrow;
}
}
@override
Future<void> close() async {
if (!_isConnected) return;
try {
await _watcher.cancel();
await _messageController.close();
// Cleanup directories if configured
if (_options.cleanupOnClose) {
await _inboxDir.delete(recursive: true);
await _outboxDir.delete(recursive: true);
}
_isConnected = false;
} catch (e) {
print('Error closing FileSystem transport: $e');
}
}
void _startWatching() {
// Poll for new files (simple implementation)
_watcher = Stream.periodic(_options.pollingInterval).listen((_) async {
await _processIncomingFiles();
});
}
Future<void> _processIncomingFiles() async {
try {
final files = _inboxDir.listSync()
.whereType<File>()
.where((f) => f.path.endsWith('.msg'))
.toList();
for (final file in files) {
try {
final content = await file.readAsString();
final message = _deserializeMessage(content);
_messageController.add(message);
// Archive or delete processed file
if (_options.archiveProcessed) {
final archivePath = path.join(_basePath, 'archive', path.basename(file.path));
await file.rename(archivePath);
} else {
await file.delete();
}
} catch (e) {
print('Failed to process file ${file.path}: $e');
// Move to error directory
final errorPath = path.join(_basePath, 'error', path.basename(file.path));
await file.rename(errorPath);
}
}
} catch (e) {
print('Error processing incoming files: $e');
}
}
String _serializeMessage(RpcMessage message) {
return jsonEncode({
'id': message.id,
'contract_name': message.contractName,
'method_name': message.methodName,
'type': message.type.toString(),
'headers': message.headers,
'payload': message.payload,
'context': message.context?.toJson(),
'timestamp': DateTime.now().toIso8601String(),
'sender': _instanceId,
});
}
RpcMessage _deserializeMessage(String rawMessage) {
final data = jsonDecode(rawMessage) as Map<String, dynamic>;
return RpcMessage(
id: data['id'] as String,
contractName: data['contract_name'] as String,
methodName: data['method_name'] as String,
type: RpcMessageType.values.firstWhere(
(t) => t.toString() == data['type'],
),
headers: Map<String, String>.from(data['headers'] ?? {}),
payload: data['payload'],
context: data['context'] != null
? RpcContext.fromJson(data['context'])
: null,
);
}
static String _generateInstanceId() {
return 'fs_${DateTime.now().millisecondsSinceEpoch}_${Platform.operatingSystem}';
}
}
class FileSystemTransportOptions {
final Duration pollingInterval;
final bool cleanupOnClose;
final bool archiveProcessed;
final int maxFileAge;
const FileSystemTransportOptions({
this.pollingInterval = const Duration(seconds: 1),
this.cleanupOnClose = false,
this.archiveProcessed = true,
this.maxFileAge = 3600, // 1 hour in seconds
});
}

Add middleware for cross-cutting concerns:

abstract class TransportMiddleware {
Future<RpcMessage> onMessageSend(RpcMessage message, TransportNext next);
Future<RpcMessage> onMessageReceive(RpcMessage message, TransportNext next);
}
class LoggingMiddleware implements TransportMiddleware {
@override
Future<RpcMessage> onMessageSend(RpcMessage message, TransportNext next) async {
print('Sending: ${message.contractName}.${message.methodName}');
final stopwatch = Stopwatch()..start();
try {
final result = await next(message);
stopwatch.stop();
print('Sent in ${stopwatch.elapsedMilliseconds}ms');
return result;
} catch (e) {
print('Send failed: $e');
rethrow;
}
}
@override
Future<RpcMessage> onMessageReceive(RpcMessage message, TransportNext next) async {
print('Received: ${message.contractName}.${message.methodName}');
return await next(message);
}
}
class CompressionMiddleware implements TransportMiddleware {
@override
Future<RpcMessage> onMessageSend(RpcMessage message, TransportNext next) async {
// Compress large payloads
if (_shouldCompress(message.payload)) {
final compressed = await _compressPayload(message.payload);
final compressedMessage = message.copyWith(
payload: compressed,
headers: {...message.headers, 'compressed': 'gzip'},
);
return await next(compressedMessage);
}
return await next(message);
}
@override
Future<RpcMessage> onMessageReceive(RpcMessage message, TransportNext next) async {
// Decompress if needed
if (message.headers['compressed'] == 'gzip') {
final decompressed = await _decompressPayload(message.payload);
final decompressedMessage = message.copyWith(payload: decompressed);
return await next(decompressedMessage);
}
return await next(message);
}
}

Create a factory for managing different transport types:

class TransportFactory {
static final Map<String, TransportBuilder> _builders = {};
static void registerTransport<T extends RpcTransport>(
String name,
TransportBuilder<T> builder,
) {
_builders[name] = builder;
}
static RpcTransport create(String name, Map<String, dynamic> config) {
final builder = _builders[name];
if (builder == null) {
throw ArgumentError('Unknown transport type: $name');
}
return builder(config);
}
static void registerDefaultTransports() {
registerTransport('redis', (config) => RpcRedisTransport(
redis: config['redis'],
requestChannel: config['request_channel'],
responseChannel: config['response_channel'],
));
registerTransport('rabbitmq', (config) => RpcRabbitMQTransport(
connectionString: config['connection_string'],
requestQueue: config['request_queue'],
responseQueue: config['response_queue'],
));
registerTransport('filesystem', (config) => RpcFileSystemTransport(
basePath: config['base_path'],
instanceId: config['instance_id'],
));
}
}
typedef TransportBuilder<T extends RpcTransport> = T Function(Map<String, dynamic> config);
// Usage
void main() {
TransportFactory.registerDefaultTransports();
final transport = TransportFactory.create('redis', {
'redis': redisConnection,
'request_channel': 'rpc_requests',
'response_channel': 'rpc_responses',
});
}

Create comprehensive tests for your custom transport:

transport_test_suite.dart
import 'package:test/test.dart';
import 'package:rpc_dart/rpc_dart.dart';
abstract class TransportTestSuite {
RpcTransport createClientTransport();
RpcTransport createServerTransport();
void runTests() {
group('Transport Tests', () {
late RpcTransport clientTransport;
late RpcTransport serverTransport;
setUp(() async {
clientTransport = createClientTransport();
serverTransport = createServerTransport();
await clientTransport.connect();
await serverTransport.connect();
});
tearDown(() async {
await clientTransport.close();
await serverTransport.close();
});
test('should connect successfully', () {
expect(clientTransport.isConnected, isTrue);
expect(serverTransport.isConnected, isTrue);
});
test('should send and receive messages', () async {
final completer = Completer<RpcMessage>();
serverTransport.messageStream.listen((message) {
completer.complete(message);
});
final testMessage = RpcMessage(
id: 'test-123',
contractName: 'TestContract',
methodName: 'testMethod',
type: RpcMessageType.request,
payload: {'test': 'data'},
);
await clientTransport.sendMessage(testMessage);
final receivedMessage = await completer.future.timeout(Duration(seconds: 5));
expect(receivedMessage.id, equals(testMessage.id));
expect(receivedMessage.contractName, equals(testMessage.contractName));
expect(receivedMessage.methodName, equals(testMessage.methodName));
});
test('should handle large messages', () async {
final largePayload = List.generate(10000, (i) => 'data_$i');
final completer = Completer<RpcMessage>();
serverTransport.messageStream.listen((message) {
completer.complete(message);
});
final largeMessage = RpcMessage(
id: 'large-test',
contractName: 'TestContract',
methodName: 'testLarge',
type: RpcMessageType.request,
payload: largePayload,
);
await clientTransport.sendMessage(largeMessage);
final received = await completer.future.timeout(Duration(seconds: 10));
expect(received.payload, equals(largePayload));
});
test('should handle connection failures gracefully', () async {
await clientTransport.close();
expect(() => clientTransport.sendMessage(RpcMessage(
id: 'fail-test',
contractName: 'Test',
methodName: 'test',
type: RpcMessageType.request,
)), throwsStateError);
});
});
}
}
// Example usage for Redis transport
class RedisTransportTest extends TransportTestSuite {
@override
RpcTransport createClientTransport() {
return RpcRedisTransport(
redis: RedisConnection(),
requestChannel: 'test_requests',
responseChannel: 'test_responses_client',
);
}
@override
RpcTransport createServerTransport() {
return RpcRedisTransport(
redis: RedisConnection(),
requestChannel: 'test_requests',
responseChannel: 'test_responses_server',
);
}
}
void main() {
RedisTransportTest().runTests();
}

Implement robust error handling:

class RobustCustomTransport implements RpcTransport {
@override
Future<void> sendMessage(RpcMessage message) async {
int retryCount = 0;
const maxRetries = 3;
while (retryCount < maxRetries) {
try {
await _doSendMessage(message);
return;
} on TransportException catch (e) {
retryCount++;
if (retryCount >= maxRetries) rethrow;
await Future.delayed(Duration(seconds: retryCount));
print('Retrying send (attempt $retryCount): ${e.message}');
}
}
}
void _handleError(dynamic error, StackTrace stackTrace) {
if (error is ConnectionException) {
_handleConnectionError(error);
} else if (error is SerializationException) {
_handleSerializationError(error);
} else {
_handleUnknownError(error, stackTrace);
}
}
}

Add performance monitoring:

class MonitoredTransport implements RpcTransport {
final RpcTransport _underlying;
final TransportMetrics _metrics = TransportMetrics();
MonitoredTransport(this._underlying);
@override
Future<void> sendMessage(RpcMessage message) async {
final stopwatch = Stopwatch()..start();
try {
await _underlying.sendMessage(message);
stopwatch.stop();
_metrics.recordSendSuccess(stopwatch.elapsed);
} catch (e) {
stopwatch.stop();
_metrics.recordSendFailure(stopwatch.elapsed, e);
rethrow;
}
}
TransportMetrics get metrics => _metrics;
}
class TransportMetrics {
int _messagesSent = 0;
int _messagesReceived = 0;
int _sendFailures = 0;
int _receiveFailures = 0;
final List<Duration> _sendLatencies = [];
final List<Duration> _receiveLatencies = [];
void recordSendSuccess(Duration latency) {
_messagesSent++;
_sendLatencies.add(latency);
_trimLatencies();
}
void recordSendFailure(Duration latency, dynamic error) {
_sendFailures++;
_sendLatencies.add(latency);
_trimLatencies();
}
double get averageSendLatency {
if (_sendLatencies.isEmpty) return 0.0;
final total = _sendLatencies.fold(Duration.zero, (a, b) => a + b);
return total.inMicroseconds / _sendLatencies.length / 1000.0; // ms
}
double get successRate {
final total = _messagesSent + _sendFailures;
return total > 0 ? _messagesSent / total : 0.0;
}
void _trimLatencies() {
// Keep only recent measurements
const maxSamples = 1000;
if (_sendLatencies.length > maxSamples) {
_sendLatencies.removeAt(0);
}
}
}

Use structured configuration:

class TransportConfig {
final String type;
final Map<String, dynamic> settings;
final Duration connectionTimeout;
final Duration messageTimeout;
final int maxRetries;
final bool enableMetrics;
const TransportConfig({
required this.type,
this.settings = const {},
this.connectionTimeout = const Duration(seconds: 30),
this.messageTimeout = const Duration(seconds: 60),
this.maxRetries = 3,
this.enableMetrics = true,
});
factory TransportConfig.fromJson(Map<String, dynamic> json) {
return TransportConfig(
type: json['type'] as String,
settings: json['settings'] as Map<String, dynamic>? ?? {},
connectionTimeout: Duration(seconds: json['connection_timeout'] ?? 30),
messageTimeout: Duration(seconds: json['message_timeout'] ?? 60),
maxRetries: json['max_retries'] ?? 3,
enableMetrics: json['enable_metrics'] ?? true,
);
}
Map<String, dynamic> toJson() => {
'type': type,
'settings': settings,
'connection_timeout': connectionTimeout.inSeconds,
'message_timeout': messageTimeout.inSeconds,
'max_retries': maxRetries,
'enable_metrics': enableMetrics,
};
}

Custom transports give you the ultimate flexibility to integrate RPC Dart with any communication system or protocol. Whether you need to work with message queues, databases, files, or exotic networking protocols, the transport interface provides all the tools you need to build robust, performant RPC communication.