Гибкость протоколов
Реализуйте любой протокол связи - от очередей сообщений до пользовательских сетевых протоколов и экзотических транспортных механизмов.
Функция Пользовательского транспорта позволяет создавать собственные реализации транспортов для интеграции RPC Dart с любыми протоколами связи, системами обмена сообщениями или фреймворками. Это дает полную гибкость для адаптации RPC Dart под ваши специфические требования инфраструктуры.
Гибкость протоколов
Реализуйте любой протокол связи - от очередей сообщений до пользовательских сетевых протоколов и экзотических транспортных механизмов.
Интеграция с инфраструктурой
Легко интегрируйтесь с существующей инфраструктурой как Redis, RabbitMQ, Apache Kafka или проприетарными системами.
Оптимизация производительности
Оптимизируйте под ваш конкретный случай использования с пользовательской сериализацией, сжатием и стратегиями управления соединениями.
Полный контроль
Полный контроль над маршрутизацией сообщений, обработкой ошибок, безопасностью и характеристиками производительности.
Все пользовательские транспорты должны реализовывать интерфейс RpcTransport
:
abstract class RpcTransport { /// Отправить сообщение через транспорт Future<void> sendMessage(RpcMessage message);
/// Получить сообщения от транспорта Stream<RpcMessage> get messageStream;
/// Инициализировать и подключить транспорт Future<void> connect();
/// Закрыть транспорт и очистить ресурсы Future<void> close();
/// Проверить, подключен ли транспорт bool get isConnected;
/// Специфичная для транспорта конфигурация Map<String, dynamic> get configuration;}
RPC сообщения следуют стандартизированной структуре:
class RpcMessage { final String id; // Уникальный идентификатор сообщения final String contractName; // Название сервисного контракта final String methodName; // Вызываемый метод final RpcMessageType type; // Тип сообщения (запрос, ответ, ошибка и т.д.) final Map<String, String> headers; // Заголовки транспорта final dynamic payload; // Полезная нагрузка сообщения final RpcContext? context; // RPC контекст
RpcMessage({ required this.id, required this.contractName, required this.methodName, required this.type, this.headers = const {}, this.payload, this.context, });}
enum RpcMessageType { request, response, streamStart, streamData, streamEnd, error, cancel,}
Транспорт использующий Redis pub/sub для связи:
import 'dart:async';import 'dart:convert';import 'package:redis/redis.dart';import 'package:rpc_dart/rpc_dart.dart';
class RpcRedisTransport implements RpcTransport { final RedisConnection _redis; final String _requestChannel; final String _responseChannel; final StreamController<RpcMessage> _messageController = StreamController.broadcast();
late Command _publisher; late PubSub _subscriber; bool _isConnected = false;
RpcRedisTransport({ required RedisConnection redis, required String requestChannel, required String responseChannel, }) : _redis = redis, _requestChannel = requestChannel, _responseChannel = responseChannel;
@override Stream<RpcMessage> get messageStream => _messageController.stream;
@override bool get isConnected => _isConnected;
@override Map<String, dynamic> get configuration => { 'transport_type': 'redis', 'request_channel': _requestChannel, 'response_channel': _responseChannel, };
@override Future<void> connect() async { if (_isConnected) return;
try { // Настройка издателя для отправки сообщений _publisher = await _redis.connect();
// Настройка подписчика для получения сообщений _subscriber = await _redis.subscribe(); await _subscriber.subscribe([_responseChannel]);
// Прослушивание входящих сообщений _subscriber.getStream().listen((message) { if (message.isNotEmpty) { _handleIncomingMessage(message.last); } });
_isConnected = true; print('Redis транспорт подключен');
} catch (e) { print('Не удалось подключить Redis транспорт: $e'); rethrow; } }
@override Future<void> sendMessage(RpcMessage message) async { if (!_isConnected) { throw StateError('Транспорт не подключен'); }
try { final serialized = _serializeMessage(message); await _publisher.send_object([ 'PUBLISH', _requestChannel, serialized, ]);
} catch (e) { print('Не удалось отправить сообщение: $e'); rethrow; } }
@override Future<void> close() async { if (!_isConnected) return;
try { await _subscriber.unsubscribe([_responseChannel]); await _publisher.get_connection().close(); await _messageController.close(); _isConnected = false;
} catch (e) { print('Ошибка закрытия Redis транспорта: $e'); } }
void _handleIncomingMessage(String rawMessage) { try { final message = _deserializeMessage(rawMessage); _messageController.add(message);
} catch (e) { print('Не удалось обработать входящее сообщение: $e'); } }
String _serializeMessage(RpcMessage message) { return jsonEncode({ 'id': message.id, 'contract_name': message.contractName, 'method_name': message.methodName, 'type': message.type.toString(), 'headers': message.headers, 'payload': message.payload, 'context': message.context?.toJson(), 'timestamp': DateTime.now().toIso8601String(), }); }
RpcMessage _deserializeMessage(String rawMessage) { final data = jsonDecode(rawMessage) as Map<String, dynamic>;
return RpcMessage( id: data['id'] as String, contractName: data['contract_name'] as String, methodName: data['method_name'] as String, type: RpcMessageType.values.firstWhere( (t) => t.toString() == data['type'], ), headers: Map<String, String>.from(data['headers'] ?? {}), payload: data['payload'], context: data['context'] != null ? RpcContext.fromJson(data['context']) : null, ); }}
void main() async { // Настройка Redis соединения final redis = RedisConnection();
// Создание транспорта для клиента final clientTransport = RpcRedisTransport( redis: redis, requestChannel: 'rpc_requests', responseChannel: 'rpc_responses_client', );
// Создание транспорта для сервера final serverTransport = RpcRedisTransport( redis: redis, requestChannel: 'rpc_requests', responseChannel: 'rpc_responses_server', );
// Настройка сервера final responder = RpcResponderEndpoint(transport: serverTransport); responder.registerServiceContract(CalculatorResponder()); await responder.start();
// Настройка клиента final caller = RpcCallerEndpoint(transport: clientTransport); final calculator = CalculatorCaller(caller);
try { // Выполнение RPC вызова через Redis final result = await calculator.add(AddRequest(a: 10, b: 5)); print('Результат: ${result.sum}');
} finally { await caller.close(); await responder.close(); }}
Транспорт использующий RabbitMQ для надежной доставки сообщений:
import 'dart:async';import 'dart:convert';import 'package:dart_amqp/dart_amqp.dart';import 'package:rpc_dart/rpc_dart.dart';
class RpcRabbitMQTransport implements RpcTransport { final String _connectionString; final String _requestQueue; final String _responseQueue; final RabbitMQTransportOptions _options;
late Client _client; late Channel _channel; late Queue _requestQueueHandler; late Queue _responseQueueHandler;
final StreamController<RpcMessage> _messageController = StreamController.broadcast(); bool _isConnected = false;
RpcRabbitMQTransport({ required String connectionString, required String requestQueue, required String responseQueue, RabbitMQTransportOptions? options, }) : _connectionString = connectionString, _requestQueue = requestQueue, _responseQueue = responseQueue, _options = options ?? RabbitMQTransportOptions();
@override Stream<RpcMessage> get messageStream => _messageController.stream;
@override bool get isConnected => _isConnected;
@override Map<String, dynamic> get configuration => { 'transport_type': 'rabbitmq', 'request_queue': _requestQueue, 'response_queue': _responseQueue, 'durable': _options.durable, 'auto_delete': _options.autoDelete, };
@override Future<void> connect() async { if (_isConnected) return;
try { // Подключение к RabbitMQ _client = Client(settings: ConnectionSettings.fromUri(_connectionString)); _channel = await _client.channel();
// Объявление очередей _requestQueueHandler = await _channel.queue( _requestQueue, durable: _options.durable, autoDelete: _options.autoDelete, );
_responseQueueHandler = await _channel.queue( _responseQueue, durable: _options.durable, autoDelete: _options.autoDelete, );
// Настройка потребителя для ответов final consumer = await _responseQueueHandler.consume(); consumer.listen((AmqpMessage message) { _handleIncomingMessage(String.fromCharCodes(message.payload!)); message.ack(); });
_isConnected = true; print('RabbitMQ транспорт подключен');
} catch (e) { print('Не удалось подключить RabbitMQ транспорт: $e'); rethrow; } }
@override Future<void> sendMessage(RpcMessage message) async { if (!_isConnected) { throw StateError('Транспорт не подключен'); }
try { final serialized = _serializeMessage(message); final messageProperties = MessageProperties();
// Установка свойств сообщения для надежности messageProperties.deliveryMode = _options.persistent ? 2 : 1; messageProperties.timestamp = DateTime.now(); messageProperties.messageId = message.id; messageProperties.correlationId = message.context?.correlationId;
await _requestQueueHandler.publish( serialized, properties: messageProperties, );
} catch (e) { print('Не удалось отправить сообщение: $e'); rethrow; } }
@override Future<void> close() async { if (!_isConnected) return;
try { await _channel.close(); await _client.close(); await _messageController.close(); _isConnected = false;
} catch (e) { print('Ошибка закрытия RabbitMQ транспорта: $e'); } }
void _handleIncomingMessage(String rawMessage) { try { final message = _deserializeMessage(rawMessage); _messageController.add(message);
} catch (e) { print('Не удалось обработать входящее сообщение: $e'); } }
String _serializeMessage(RpcMessage message) { return jsonEncode({ 'id': message.id, 'contract_name': message.contractName, 'method_name': message.methodName, 'type': message.type.toString(), 'headers': message.headers, 'payload': message.payload, 'context': message.context?.toJson(), 'timestamp': DateTime.now().toIso8601String(), }); }
RpcMessage _deserializeMessage(String rawMessage) { final data = jsonDecode(rawMessage) as Map<String, dynamic>;
return RpcMessage( id: data['id'] as String, contractName: data['contract_name'] as String, methodName: data['method_name'] as String, type: RpcMessageType.values.firstWhere( (t) => t.toString() == data['type'], ), headers: Map<String, String>.from(data['headers'] ?? {}), payload: data['payload'], context: data['context'] != null ? RpcContext.fromJson(data['context']) : null, ); }}
class RabbitMQTransportOptions { final bool durable; final bool autoDelete; final bool persistent; final int prefetchCount;
const RabbitMQTransportOptions({ this.durable = true, this.autoDelete = false, this.persistent = true, this.prefetchCount = 10, });}
class RabbitMQConfig { static const String defaultConnectionString = 'amqp://guest:guest@localhost:5672/';
static RpcRabbitMQTransport createReliableTransport({ required String requestQueue, required String responseQueue, String? connectionString, }) { return RpcRabbitMQTransport( connectionString: connectionString ?? defaultConnectionString, requestQueue: requestQueue, responseQueue: responseQueue, options: RabbitMQTransportOptions( durable: true, // Переживание перезапусков сервера autoDelete: false, // Не удалять очереди автоматически persistent: true, // Сохранение сообщений на диск prefetchCount: 50, // Обрабатывать до 50 сообщений одновременно ), ); }
static RpcRabbitMQTransport createFastTransport({ required String requestQueue, required String responseQueue, String? connectionString, }) { return RpcRabbitMQTransport( connectionString: connectionString ?? defaultConnectionString, requestQueue: requestQueue, responseQueue: responseQueue, options: RabbitMQTransportOptions( durable: false, // Быстрее, но менее надежно autoDelete: true, // Автоматическая очистка persistent: false, // Только в памяти prefetchCount: 100, // Высокая пропускная способность ), ); }}
Транспорт использующий файловую систему для асинхронной связи:
import 'dart:async';import 'dart:convert';import 'dart:io';import 'package:path/path.dart' as path;import 'package:rpc_dart/rpc_dart.dart';
class RpcFileSystemTransport implements RpcTransport { final String _basePath; final String _instanceId; final FileSystemTransportOptions _options;
late Directory _inboxDir; late Directory _outboxDir; late StreamSubscription _watcher;
final StreamController<RpcMessage> _messageController = StreamController.broadcast(); bool _isConnected = false;
RpcFileSystemTransport({ required String basePath, String? instanceId, FileSystemTransportOptions? options, }) : _basePath = basePath, _instanceId = instanceId ?? _generateInstanceId(), _options = options ?? FileSystemTransportOptions();
@override Stream<RpcMessage> get messageStream => _messageController.stream;
@override bool get isConnected => _isConnected;
@override Map<String, dynamic> get configuration => { 'transport_type': 'filesystem', 'base_path': _basePath, 'instance_id': _instanceId, 'polling_interval': _options.pollingInterval.inMilliseconds, };
@override Future<void> connect() async { if (_isConnected) return;
try { // Создание директорий _inboxDir = Directory(path.join(_basePath, 'inbox', _instanceId)); _outboxDir = Directory(path.join(_basePath, 'outbox', _instanceId));
await _inboxDir.create(recursive: true); await _outboxDir.create(recursive: true);
// Начало наблюдения за входящими сообщениями _startWatching();
_isConnected = true; print('Файловый транспорт подключен в $_basePath');
} catch (e) { print('Не удалось подключить файловый транспорт: $e'); rethrow; } }
@override Future<void> sendMessage(RpcMessage message) async { if (!_isConnected) { throw StateError('Транспорт не подключен'); }
try { final serialized = _serializeMessage(message); final fileName = '${message.id}_${DateTime.now().millisecondsSinceEpoch}.msg'; final filePath = path.join(_outboxDir.path, fileName);
final file = File(filePath); await file.writeAsString(serialized);
// Перемещение в целевой inbox (атомарная операция) final targetPath = path.join(_basePath, 'inbox', 'target', fileName); await file.rename(targetPath);
} catch (e) { print('Не удалось отправить сообщение: $e'); rethrow; } }
@override Future<void> close() async { if (!_isConnected) return;
try { await _watcher.cancel(); await _messageController.close();
// Очистка директорий при настройке if (_options.cleanupOnClose) { await _inboxDir.delete(recursive: true); await _outboxDir.delete(recursive: true); }
_isConnected = false;
} catch (e) { print('Ошибка закрытия файлового транспорта: $e'); } }
void _startWatching() { // Опрос новых файлов (простая реализация) _watcher = Stream.periodic(_options.pollingInterval).listen((_) async { await _processIncomingFiles(); }); }
Future<void> _processIncomingFiles() async { try { final files = _inboxDir.listSync() .whereType<File>() .where((f) => f.path.endsWith('.msg')) .toList();
for (final file in files) { try { final content = await file.readAsString(); final message = _deserializeMessage(content); _messageController.add(message);
// Архивирование или удаление обработанного файла if (_options.archiveProcessed) { final archivePath = path.join(_basePath, 'archive', path.basename(file.path)); await file.rename(archivePath); } else { await file.delete(); }
} catch (e) { print('Не удалось обработать файл ${file.path}: $e'); // Перемещение в директорию ошибок final errorPath = path.join(_basePath, 'error', path.basename(file.path)); await file.rename(errorPath); } }
} catch (e) { print('Ошибка обработки входящих файлов: $e'); } }
String _serializeMessage(RpcMessage message) { return jsonEncode({ 'id': message.id, 'contract_name': message.contractName, 'method_name': message.methodName, 'type': message.type.toString(), 'headers': message.headers, 'payload': message.payload, 'context': message.context?.toJson(), 'timestamp': DateTime.now().toIso8601String(), 'sender': _instanceId, }); }
RpcMessage _deserializeMessage(String rawMessage) { final data = jsonDecode(rawMessage) as Map<String, dynamic>;
return RpcMessage( id: data['id'] as String, contractName: data['contract_name'] as String, methodName: data['method_name'] as String, type: RpcMessageType.values.firstWhere( (t) => t.toString() == data['type'], ), headers: Map<String, String>.from(data['headers'] ?? {}), payload: data['payload'], context: data['context'] != null ? RpcContext.fromJson(data['context']) : null, ); }
static String _generateInstanceId() { return 'fs_${DateTime.now().millisecondsSinceEpoch}_${Platform.operatingSystem}'; }}
class FileSystemTransportOptions { final Duration pollingInterval; final bool cleanupOnClose; final bool archiveProcessed; final int maxFileAge;
const FileSystemTransportOptions({ this.pollingInterval = const Duration(seconds: 1), this.cleanupOnClose = false, this.archiveProcessed = true, this.maxFileAge = 3600, // 1 час в секундах });}
Добавление middleware для сквозных вопросов:
abstract class TransportMiddleware { Future<RpcMessage> onMessageSend(RpcMessage message, TransportNext next); Future<RpcMessage> onMessageReceive(RpcMessage message, TransportNext next);}
class LoggingMiddleware implements TransportMiddleware { @override Future<RpcMessage> onMessageSend(RpcMessage message, TransportNext next) async { print('Отправляем: ${message.contractName}.${message.methodName}'); final stopwatch = Stopwatch()..start();
try { final result = await next(message); stopwatch.stop(); print('Отправлено за ${stopwatch.elapsedMilliseconds}мс'); return result; } catch (e) { print('Ошибка отправки: $e'); rethrow; } }
@override Future<RpcMessage> onMessageReceive(RpcMessage message, TransportNext next) async { print('Получено: ${message.contractName}.${message.methodName}'); return await next(message); }}
class CompressionMiddleware implements TransportMiddleware { @override Future<RpcMessage> onMessageSend(RpcMessage message, TransportNext next) async { // Сжатие больших полезных нагрузок if (_shouldCompress(message.payload)) { final compressed = await _compressPayload(message.payload); final compressedMessage = message.copyWith( payload: compressed, headers: {...message.headers, 'compressed': 'gzip'}, ); return await next(compressedMessage); } return await next(message); }
@override Future<RpcMessage> onMessageReceive(RpcMessage message, TransportNext next) async { // Распаковка при необходимости if (message.headers['compressed'] == 'gzip') { final decompressed = await _decompressPayload(message.payload); final decompressedMessage = message.copyWith(payload: decompressed); return await next(decompressedMessage); } return await next(message); }}
Создание фабрики для управления различными типами транспортов:
class TransportFactory { static final Map<String, TransportBuilder> _builders = {};
static void registerTransport<T extends RpcTransport>( String name, TransportBuilder<T> builder, ) { _builders[name] = builder; }
static RpcTransport create(String name, Map<String, dynamic> config) { final builder = _builders[name]; if (builder == null) { throw ArgumentError('Неизвестный тип транспорта: $name'); } return builder(config); }
static void registerDefaultTransports() { registerTransport('redis', (config) => RpcRedisTransport( redis: config['redis'], requestChannel: config['request_channel'], responseChannel: config['response_channel'], ));
registerTransport('rabbitmq', (config) => RpcRabbitMQTransport( connectionString: config['connection_string'], requestQueue: config['request_queue'], responseQueue: config['response_queue'], ));
registerTransport('filesystem', (config) => RpcFileSystemTransport( basePath: config['base_path'], instanceId: config['instance_id'], )); }}
typedef TransportBuilder<T extends RpcTransport> = T Function(Map<String, dynamic> config);
// Использованиеvoid main() { TransportFactory.registerDefaultTransports();
final transport = TransportFactory.create('redis', { 'redis': redisConnection, 'request_channel': 'rpc_requests', 'response_channel': 'rpc_responses', });}
Создание комплексных тестов для вашего пользовательского транспорта:
import 'package:test/test.dart';import 'package:rpc_dart/rpc_dart.dart';
abstract class TransportTestSuite { RpcTransport createClientTransport(); RpcTransport createServerTransport();
void runTests() { group('Тесты транспорта', () { late RpcTransport clientTransport; late RpcTransport serverTransport;
setUp(() async { clientTransport = createClientTransport(); serverTransport = createServerTransport(); await clientTransport.connect(); await serverTransport.connect(); });
tearDown(() async { await clientTransport.close(); await serverTransport.close(); });
test('должен успешно подключиться', () { expect(clientTransport.isConnected, isTrue); expect(serverTransport.isConnected, isTrue); });
test('должен отправлять и получать сообщения', () async { final completer = Completer<RpcMessage>();
serverTransport.messageStream.listen((message) { completer.complete(message); });
final testMessage = RpcMessage( id: 'test-123', contractName: 'TestContract', methodName: 'testMethod', type: RpcMessageType.request, payload: {'test': 'data'}, );
await clientTransport.sendMessage(testMessage);
final receivedMessage = await completer.future.timeout(Duration(seconds: 5));
expect(receivedMessage.id, equals(testMessage.id)); expect(receivedMessage.contractName, equals(testMessage.contractName)); expect(receivedMessage.methodName, equals(testMessage.methodName)); });
test('должен обрабатывать большие сообщения', () async { final largePayload = List.generate(10000, (i) => 'data_$i');
final completer = Completer<RpcMessage>(); serverTransport.messageStream.listen((message) { completer.complete(message); });
final largeMessage = RpcMessage( id: 'large-test', contractName: 'TestContract', methodName: 'testLarge', type: RpcMessageType.request, payload: largePayload, );
await clientTransport.sendMessage(largeMessage); final received = await completer.future.timeout(Duration(seconds: 10));
expect(received.payload, equals(largePayload)); });
test('должен корректно обрабатывать сбои соединения', () async { await clientTransport.close();
expect(() => clientTransport.sendMessage(RpcMessage( id: 'fail-test', contractName: 'Test', methodName: 'test', type: RpcMessageType.request, )), throwsStateError); }); }); }}
// Пример использования для Redis транспортаclass RedisTransportTest extends TransportTestSuite { @override RpcTransport createClientTransport() { return RpcRedisTransport( redis: RedisConnection(), requestChannel: 'test_requests', responseChannel: 'test_responses_client', ); }
@override RpcTransport createServerTransport() { return RpcRedisTransport( redis: RedisConnection(), requestChannel: 'test_requests', responseChannel: 'test_responses_server', ); }}
void main() { RedisTransportTest().runTests();}
Реализация надежной обработки ошибок:
class RobustCustomTransport implements RpcTransport { @override Future<void> sendMessage(RpcMessage message) async { int retryCount = 0; const maxRetries = 3;
while (retryCount < maxRetries) { try { await _doSendMessage(message); return; } on TransportException catch (e) { retryCount++; if (retryCount >= maxRetries) rethrow;
await Future.delayed(Duration(seconds: retryCount)); print('Повторная попытка отправки (попытка $retryCount): ${e.message}'); } } }
void _handleError(dynamic error, StackTrace stackTrace) { if (error is ConnectionException) { _handleConnectionError(error); } else if (error is SerializationException) { _handleSerializationError(error); } else { _handleUnknownError(error, stackTrace); } }}
Добавление мониторинга производительности:
class MonitoredTransport implements RpcTransport { final RpcTransport _underlying; final TransportMetrics _metrics = TransportMetrics();
MonitoredTransport(this._underlying);
@override Future<void> sendMessage(RpcMessage message) async { final stopwatch = Stopwatch()..start();
try { await _underlying.sendMessage(message); stopwatch.stop();
_metrics.recordSendSuccess(stopwatch.elapsed); } catch (e) { stopwatch.stop(); _metrics.recordSendFailure(stopwatch.elapsed, e); rethrow; } }
TransportMetrics get metrics => _metrics;}
class TransportMetrics { int _messagesSent = 0; int _messagesReceived = 0; int _sendFailures = 0; int _receiveFailures = 0;
final List<Duration> _sendLatencies = []; final List<Duration> _receiveLatencies = [];
void recordSendSuccess(Duration latency) { _messagesSent++; _sendLatencies.add(latency); _trimLatencies(); }
void recordSendFailure(Duration latency, dynamic error) { _sendFailures++; _sendLatencies.add(latency); _trimLatencies(); }
double get averageSendLatency { if (_sendLatencies.isEmpty) return 0.0; final total = _sendLatencies.fold(Duration.zero, (a, b) => a + b); return total.inMicroseconds / _sendLatencies.length / 1000.0; // мс }
double get successRate { final total = _messagesSent + _sendFailures; return total > 0 ? _messagesSent / total : 0.0; }
void _trimLatencies() { // Сохранение только недавних измерений const maxSamples = 1000; if (_sendLatencies.length > maxSamples) { _sendLatencies.removeAt(0); } }}
Использование структурированной конфигурации:
class TransportConfig { final String type; final Map<String, dynamic> settings; final Duration connectionTimeout; final Duration messageTimeout; final int maxRetries; final bool enableMetrics;
const TransportConfig({ required this.type, this.settings = const {}, this.connectionTimeout = const Duration(seconds: 30), this.messageTimeout = const Duration(seconds: 60), this.maxRetries = 3, this.enableMetrics = true, });
factory TransportConfig.fromJson(Map<String, dynamic> json) { return TransportConfig( type: json['type'] as String, settings: json['settings'] as Map<String, dynamic>? ?? {}, connectionTimeout: Duration(seconds: json['connection_timeout'] ?? 30), messageTimeout: Duration(seconds: json['message_timeout'] ?? 60), maxRetries: json['max_retries'] ?? 3, enableMetrics: json['enable_metrics'] ?? true, ); }
Map<String, dynamic> toJson() => { 'type': type, 'settings': settings, 'connection_timeout': connectionTimeout.inSeconds, 'message_timeout': messageTimeout.inSeconds, 'max_retries': maxRetries, 'enable_metrics': enableMetrics, };}
Пользовательские транспорты дают вам максимальную гибкость для интеграции RPC Dart с любой системой связи или протоколом. Независимо от того, нужно ли вам работать с очередями сообщений, базами данных, файлами или экзотическими сетевыми протоколами, интерфейс транспорта предоставляет все инструменты, необходимые для создания надежной, высокопроизводительной RPC связи.