Перейти к содержимому

Пользовательский транспорт

Функция Пользовательского транспорта позволяет создавать собственные реализации транспортов для интеграции RPC Dart с любыми протоколами связи, системами обмена сообщениями или фреймворками. Это дает полную гибкость для адаптации RPC Dart под ваши специфические требования инфраструктуры.

Гибкость протоколов

Реализуйте любой протокол связи - от очередей сообщений до пользовательских сетевых протоколов и экзотических транспортных механизмов.

Интеграция с инфраструктурой

Легко интегрируйтесь с существующей инфраструктурой как Redis, RabbitMQ, Apache Kafka или проприетарными системами.

Оптимизация производительности

Оптимизируйте под ваш конкретный случай использования с пользовательской сериализацией, сжатием и стратегиями управления соединениями.

Полный контроль

Полный контроль над маршрутизацией сообщений, обработкой ошибок, безопасностью и характеристиками производительности.

Все пользовательские транспорты должны реализовывать интерфейс RpcTransport:

abstract class RpcTransport {
/// Отправить сообщение через транспорт
Future<void> sendMessage(RpcMessage message);
/// Получить сообщения от транспорта
Stream<RpcMessage> get messageStream;
/// Инициализировать и подключить транспорт
Future<void> connect();
/// Закрыть транспорт и очистить ресурсы
Future<void> close();
/// Проверить, подключен ли транспорт
bool get isConnected;
/// Специфичная для транспорта конфигурация
Map<String, dynamic> get configuration;
}

RPC сообщения следуют стандартизированной структуре:

class RpcMessage {
final String id; // Уникальный идентификатор сообщения
final String contractName; // Название сервисного контракта
final String methodName; // Вызываемый метод
final RpcMessageType type; // Тип сообщения (запрос, ответ, ошибка и т.д.)
final Map<String, String> headers; // Заголовки транспорта
final dynamic payload; // Полезная нагрузка сообщения
final RpcContext? context; // RPC контекст
RpcMessage({
required this.id,
required this.contractName,
required this.methodName,
required this.type,
this.headers = const {},
this.payload,
this.context,
});
}
enum RpcMessageType {
request,
response,
streamStart,
streamData,
streamEnd,
error,
cancel,
}

Транспорт использующий Redis pub/sub для связи:

redis_transport.dart
import 'dart:async';
import 'dart:convert';
import 'package:redis/redis.dart';
import 'package:rpc_dart/rpc_dart.dart';
class RpcRedisTransport implements RpcTransport {
final RedisConnection _redis;
final String _requestChannel;
final String _responseChannel;
final StreamController<RpcMessage> _messageController = StreamController.broadcast();
late Command _publisher;
late PubSub _subscriber;
bool _isConnected = false;
RpcRedisTransport({
required RedisConnection redis,
required String requestChannel,
required String responseChannel,
}) : _redis = redis,
_requestChannel = requestChannel,
_responseChannel = responseChannel;
@override
Stream<RpcMessage> get messageStream => _messageController.stream;
@override
bool get isConnected => _isConnected;
@override
Map<String, dynamic> get configuration => {
'transport_type': 'redis',
'request_channel': _requestChannel,
'response_channel': _responseChannel,
};
@override
Future<void> connect() async {
if (_isConnected) return;
try {
// Настройка издателя для отправки сообщений
_publisher = await _redis.connect();
// Настройка подписчика для получения сообщений
_subscriber = await _redis.subscribe();
await _subscriber.subscribe([_responseChannel]);
// Прослушивание входящих сообщений
_subscriber.getStream().listen((message) {
if (message.isNotEmpty) {
_handleIncomingMessage(message.last);
}
});
_isConnected = true;
print('Redis транспорт подключен');
} catch (e) {
print('Не удалось подключить Redis транспорт: $e');
rethrow;
}
}
@override
Future<void> sendMessage(RpcMessage message) async {
if (!_isConnected) {
throw StateError('Транспорт не подключен');
}
try {
final serialized = _serializeMessage(message);
await _publisher.send_object([
'PUBLISH',
_requestChannel,
serialized,
]);
} catch (e) {
print('Не удалось отправить сообщение: $e');
rethrow;
}
}
@override
Future<void> close() async {
if (!_isConnected) return;
try {
await _subscriber.unsubscribe([_responseChannel]);
await _publisher.get_connection().close();
await _messageController.close();
_isConnected = false;
} catch (e) {
print('Ошибка закрытия Redis транспорта: $e');
}
}
void _handleIncomingMessage(String rawMessage) {
try {
final message = _deserializeMessage(rawMessage);
_messageController.add(message);
} catch (e) {
print('Не удалось обработать входящее сообщение: $e');
}
}
String _serializeMessage(RpcMessage message) {
return jsonEncode({
'id': message.id,
'contract_name': message.contractName,
'method_name': message.methodName,
'type': message.type.toString(),
'headers': message.headers,
'payload': message.payload,
'context': message.context?.toJson(),
'timestamp': DateTime.now().toIso8601String(),
});
}
RpcMessage _deserializeMessage(String rawMessage) {
final data = jsonDecode(rawMessage) as Map<String, dynamic>;
return RpcMessage(
id: data['id'] as String,
contractName: data['contract_name'] as String,
methodName: data['method_name'] as String,
type: RpcMessageType.values.firstWhere(
(t) => t.toString() == data['type'],
),
headers: Map<String, String>.from(data['headers'] ?? {}),
payload: data['payload'],
context: data['context'] != null
? RpcContext.fromJson(data['context'])
: null,
);
}
}

Транспорт использующий RabbitMQ для надежной доставки сообщений:

rabbitmq_transport.dart
import 'dart:async';
import 'dart:convert';
import 'package:dart_amqp/dart_amqp.dart';
import 'package:rpc_dart/rpc_dart.dart';
class RpcRabbitMQTransport implements RpcTransport {
final String _connectionString;
final String _requestQueue;
final String _responseQueue;
final RabbitMQTransportOptions _options;
late Client _client;
late Channel _channel;
late Queue _requestQueueHandler;
late Queue _responseQueueHandler;
final StreamController<RpcMessage> _messageController = StreamController.broadcast();
bool _isConnected = false;
RpcRabbitMQTransport({
required String connectionString,
required String requestQueue,
required String responseQueue,
RabbitMQTransportOptions? options,
}) : _connectionString = connectionString,
_requestQueue = requestQueue,
_responseQueue = responseQueue,
_options = options ?? RabbitMQTransportOptions();
@override
Stream<RpcMessage> get messageStream => _messageController.stream;
@override
bool get isConnected => _isConnected;
@override
Map<String, dynamic> get configuration => {
'transport_type': 'rabbitmq',
'request_queue': _requestQueue,
'response_queue': _responseQueue,
'durable': _options.durable,
'auto_delete': _options.autoDelete,
};
@override
Future<void> connect() async {
if (_isConnected) return;
try {
// Подключение к RabbitMQ
_client = Client(settings: ConnectionSettings.fromUri(_connectionString));
_channel = await _client.channel();
// Объявление очередей
_requestQueueHandler = await _channel.queue(
_requestQueue,
durable: _options.durable,
autoDelete: _options.autoDelete,
);
_responseQueueHandler = await _channel.queue(
_responseQueue,
durable: _options.durable,
autoDelete: _options.autoDelete,
);
// Настройка потребителя для ответов
final consumer = await _responseQueueHandler.consume();
consumer.listen((AmqpMessage message) {
_handleIncomingMessage(String.fromCharCodes(message.payload!));
message.ack();
});
_isConnected = true;
print('RabbitMQ транспорт подключен');
} catch (e) {
print('Не удалось подключить RabbitMQ транспорт: $e');
rethrow;
}
}
@override
Future<void> sendMessage(RpcMessage message) async {
if (!_isConnected) {
throw StateError('Транспорт не подключен');
}
try {
final serialized = _serializeMessage(message);
final messageProperties = MessageProperties();
// Установка свойств сообщения для надежности
messageProperties.deliveryMode = _options.persistent ? 2 : 1;
messageProperties.timestamp = DateTime.now();
messageProperties.messageId = message.id;
messageProperties.correlationId = message.context?.correlationId;
await _requestQueueHandler.publish(
serialized,
properties: messageProperties,
);
} catch (e) {
print('Не удалось отправить сообщение: $e');
rethrow;
}
}
@override
Future<void> close() async {
if (!_isConnected) return;
try {
await _channel.close();
await _client.close();
await _messageController.close();
_isConnected = false;
} catch (e) {
print('Ошибка закрытия RabbitMQ транспорта: $e');
}
}
void _handleIncomingMessage(String rawMessage) {
try {
final message = _deserializeMessage(rawMessage);
_messageController.add(message);
} catch (e) {
print('Не удалось обработать входящее сообщение: $e');
}
}
String _serializeMessage(RpcMessage message) {
return jsonEncode({
'id': message.id,
'contract_name': message.contractName,
'method_name': message.methodName,
'type': message.type.toString(),
'headers': message.headers,
'payload': message.payload,
'context': message.context?.toJson(),
'timestamp': DateTime.now().toIso8601String(),
});
}
RpcMessage _deserializeMessage(String rawMessage) {
final data = jsonDecode(rawMessage) as Map<String, dynamic>;
return RpcMessage(
id: data['id'] as String,
contractName: data['contract_name'] as String,
methodName: data['method_name'] as String,
type: RpcMessageType.values.firstWhere(
(t) => t.toString() == data['type'],
),
headers: Map<String, String>.from(data['headers'] ?? {}),
payload: data['payload'],
context: data['context'] != null
? RpcContext.fromJson(data['context'])
: null,
);
}
}
class RabbitMQTransportOptions {
final bool durable;
final bool autoDelete;
final bool persistent;
final int prefetchCount;
const RabbitMQTransportOptions({
this.durable = true,
this.autoDelete = false,
this.persistent = true,
this.prefetchCount = 10,
});
}

Транспорт использующий файловую систему для асинхронной связи:

filesystem_transport.dart
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:path/path.dart' as path;
import 'package:rpc_dart/rpc_dart.dart';
class RpcFileSystemTransport implements RpcTransport {
final String _basePath;
final String _instanceId;
final FileSystemTransportOptions _options;
late Directory _inboxDir;
late Directory _outboxDir;
late StreamSubscription _watcher;
final StreamController<RpcMessage> _messageController = StreamController.broadcast();
bool _isConnected = false;
RpcFileSystemTransport({
required String basePath,
String? instanceId,
FileSystemTransportOptions? options,
}) : _basePath = basePath,
_instanceId = instanceId ?? _generateInstanceId(),
_options = options ?? FileSystemTransportOptions();
@override
Stream<RpcMessage> get messageStream => _messageController.stream;
@override
bool get isConnected => _isConnected;
@override
Map<String, dynamic> get configuration => {
'transport_type': 'filesystem',
'base_path': _basePath,
'instance_id': _instanceId,
'polling_interval': _options.pollingInterval.inMilliseconds,
};
@override
Future<void> connect() async {
if (_isConnected) return;
try {
// Создание директорий
_inboxDir = Directory(path.join(_basePath, 'inbox', _instanceId));
_outboxDir = Directory(path.join(_basePath, 'outbox', _instanceId));
await _inboxDir.create(recursive: true);
await _outboxDir.create(recursive: true);
// Начало наблюдения за входящими сообщениями
_startWatching();
_isConnected = true;
print('Файловый транспорт подключен в $_basePath');
} catch (e) {
print('Не удалось подключить файловый транспорт: $e');
rethrow;
}
}
@override
Future<void> sendMessage(RpcMessage message) async {
if (!_isConnected) {
throw StateError('Транспорт не подключен');
}
try {
final serialized = _serializeMessage(message);
final fileName = '${message.id}_${DateTime.now().millisecondsSinceEpoch}.msg';
final filePath = path.join(_outboxDir.path, fileName);
final file = File(filePath);
await file.writeAsString(serialized);
// Перемещение в целевой inbox (атомарная операция)
final targetPath = path.join(_basePath, 'inbox', 'target', fileName);
await file.rename(targetPath);
} catch (e) {
print('Не удалось отправить сообщение: $e');
rethrow;
}
}
@override
Future<void> close() async {
if (!_isConnected) return;
try {
await _watcher.cancel();
await _messageController.close();
// Очистка директорий при настройке
if (_options.cleanupOnClose) {
await _inboxDir.delete(recursive: true);
await _outboxDir.delete(recursive: true);
}
_isConnected = false;
} catch (e) {
print('Ошибка закрытия файлового транспорта: $e');
}
}
void _startWatching() {
// Опрос новых файлов (простая реализация)
_watcher = Stream.periodic(_options.pollingInterval).listen((_) async {
await _processIncomingFiles();
});
}
Future<void> _processIncomingFiles() async {
try {
final files = _inboxDir.listSync()
.whereType<File>()
.where((f) => f.path.endsWith('.msg'))
.toList();
for (final file in files) {
try {
final content = await file.readAsString();
final message = _deserializeMessage(content);
_messageController.add(message);
// Архивирование или удаление обработанного файла
if (_options.archiveProcessed) {
final archivePath = path.join(_basePath, 'archive', path.basename(file.path));
await file.rename(archivePath);
} else {
await file.delete();
}
} catch (e) {
print('Не удалось обработать файл ${file.path}: $e');
// Перемещение в директорию ошибок
final errorPath = path.join(_basePath, 'error', path.basename(file.path));
await file.rename(errorPath);
}
}
} catch (e) {
print('Ошибка обработки входящих файлов: $e');
}
}
String _serializeMessage(RpcMessage message) {
return jsonEncode({
'id': message.id,
'contract_name': message.contractName,
'method_name': message.methodName,
'type': message.type.toString(),
'headers': message.headers,
'payload': message.payload,
'context': message.context?.toJson(),
'timestamp': DateTime.now().toIso8601String(),
'sender': _instanceId,
});
}
RpcMessage _deserializeMessage(String rawMessage) {
final data = jsonDecode(rawMessage) as Map<String, dynamic>;
return RpcMessage(
id: data['id'] as String,
contractName: data['contract_name'] as String,
methodName: data['method_name'] as String,
type: RpcMessageType.values.firstWhere(
(t) => t.toString() == data['type'],
),
headers: Map<String, String>.from(data['headers'] ?? {}),
payload: data['payload'],
context: data['context'] != null
? RpcContext.fromJson(data['context'])
: null,
);
}
static String _generateInstanceId() {
return 'fs_${DateTime.now().millisecondsSinceEpoch}_${Platform.operatingSystem}';
}
}
class FileSystemTransportOptions {
final Duration pollingInterval;
final bool cleanupOnClose;
final bool archiveProcessed;
final int maxFileAge;
const FileSystemTransportOptions({
this.pollingInterval = const Duration(seconds: 1),
this.cleanupOnClose = false,
this.archiveProcessed = true,
this.maxFileAge = 3600, // 1 час в секундах
});
}

Добавление middleware для сквозных вопросов:

abstract class TransportMiddleware {
Future<RpcMessage> onMessageSend(RpcMessage message, TransportNext next);
Future<RpcMessage> onMessageReceive(RpcMessage message, TransportNext next);
}
class LoggingMiddleware implements TransportMiddleware {
@override
Future<RpcMessage> onMessageSend(RpcMessage message, TransportNext next) async {
print('Отправляем: ${message.contractName}.${message.methodName}');
final stopwatch = Stopwatch()..start();
try {
final result = await next(message);
stopwatch.stop();
print('Отправлено за ${stopwatch.elapsedMilliseconds}мс');
return result;
} catch (e) {
print('Ошибка отправки: $e');
rethrow;
}
}
@override
Future<RpcMessage> onMessageReceive(RpcMessage message, TransportNext next) async {
print('Получено: ${message.contractName}.${message.methodName}');
return await next(message);
}
}
class CompressionMiddleware implements TransportMiddleware {
@override
Future<RpcMessage> onMessageSend(RpcMessage message, TransportNext next) async {
// Сжатие больших полезных нагрузок
if (_shouldCompress(message.payload)) {
final compressed = await _compressPayload(message.payload);
final compressedMessage = message.copyWith(
payload: compressed,
headers: {...message.headers, 'compressed': 'gzip'},
);
return await next(compressedMessage);
}
return await next(message);
}
@override
Future<RpcMessage> onMessageReceive(RpcMessage message, TransportNext next) async {
// Распаковка при необходимости
if (message.headers['compressed'] == 'gzip') {
final decompressed = await _decompressPayload(message.payload);
final decompressedMessage = message.copyWith(payload: decompressed);
return await next(decompressedMessage);
}
return await next(message);
}
}

Создание фабрики для управления различными типами транспортов:

class TransportFactory {
static final Map<String, TransportBuilder> _builders = {};
static void registerTransport<T extends RpcTransport>(
String name,
TransportBuilder<T> builder,
) {
_builders[name] = builder;
}
static RpcTransport create(String name, Map<String, dynamic> config) {
final builder = _builders[name];
if (builder == null) {
throw ArgumentError('Неизвестный тип транспорта: $name');
}
return builder(config);
}
static void registerDefaultTransports() {
registerTransport('redis', (config) => RpcRedisTransport(
redis: config['redis'],
requestChannel: config['request_channel'],
responseChannel: config['response_channel'],
));
registerTransport('rabbitmq', (config) => RpcRabbitMQTransport(
connectionString: config['connection_string'],
requestQueue: config['request_queue'],
responseQueue: config['response_queue'],
));
registerTransport('filesystem', (config) => RpcFileSystemTransport(
basePath: config['base_path'],
instanceId: config['instance_id'],
));
}
}
typedef TransportBuilder<T extends RpcTransport> = T Function(Map<String, dynamic> config);
// Использование
void main() {
TransportFactory.registerDefaultTransports();
final transport = TransportFactory.create('redis', {
'redis': redisConnection,
'request_channel': 'rpc_requests',
'response_channel': 'rpc_responses',
});
}

Тестирование пользовательских транспортов

Заголовок раздела «Тестирование пользовательских транспортов»

Создание комплексных тестов для вашего пользовательского транспорта:

transport_test_suite.dart
import 'package:test/test.dart';
import 'package:rpc_dart/rpc_dart.dart';
abstract class TransportTestSuite {
RpcTransport createClientTransport();
RpcTransport createServerTransport();
void runTests() {
group('Тесты транспорта', () {
late RpcTransport clientTransport;
late RpcTransport serverTransport;
setUp(() async {
clientTransport = createClientTransport();
serverTransport = createServerTransport();
await clientTransport.connect();
await serverTransport.connect();
});
tearDown(() async {
await clientTransport.close();
await serverTransport.close();
});
test('должен успешно подключиться', () {
expect(clientTransport.isConnected, isTrue);
expect(serverTransport.isConnected, isTrue);
});
test('должен отправлять и получать сообщения', () async {
final completer = Completer<RpcMessage>();
serverTransport.messageStream.listen((message) {
completer.complete(message);
});
final testMessage = RpcMessage(
id: 'test-123',
contractName: 'TestContract',
methodName: 'testMethod',
type: RpcMessageType.request,
payload: {'test': 'data'},
);
await clientTransport.sendMessage(testMessage);
final receivedMessage = await completer.future.timeout(Duration(seconds: 5));
expect(receivedMessage.id, equals(testMessage.id));
expect(receivedMessage.contractName, equals(testMessage.contractName));
expect(receivedMessage.methodName, equals(testMessage.methodName));
});
test('должен обрабатывать большие сообщения', () async {
final largePayload = List.generate(10000, (i) => 'data_$i');
final completer = Completer<RpcMessage>();
serverTransport.messageStream.listen((message) {
completer.complete(message);
});
final largeMessage = RpcMessage(
id: 'large-test',
contractName: 'TestContract',
methodName: 'testLarge',
type: RpcMessageType.request,
payload: largePayload,
);
await clientTransport.sendMessage(largeMessage);
final received = await completer.future.timeout(Duration(seconds: 10));
expect(received.payload, equals(largePayload));
});
test('должен корректно обрабатывать сбои соединения', () async {
await clientTransport.close();
expect(() => clientTransport.sendMessage(RpcMessage(
id: 'fail-test',
contractName: 'Test',
methodName: 'test',
type: RpcMessageType.request,
)), throwsStateError);
});
});
}
}
// Пример использования для Redis транспорта
class RedisTransportTest extends TransportTestSuite {
@override
RpcTransport createClientTransport() {
return RpcRedisTransport(
redis: RedisConnection(),
requestChannel: 'test_requests',
responseChannel: 'test_responses_client',
);
}
@override
RpcTransport createServerTransport() {
return RpcRedisTransport(
redis: RedisConnection(),
requestChannel: 'test_requests',
responseChannel: 'test_responses_server',
);
}
}
void main() {
RedisTransportTest().runTests();
}

Реализация надежной обработки ошибок:

class RobustCustomTransport implements RpcTransport {
@override
Future<void> sendMessage(RpcMessage message) async {
int retryCount = 0;
const maxRetries = 3;
while (retryCount < maxRetries) {
try {
await _doSendMessage(message);
return;
} on TransportException catch (e) {
retryCount++;
if (retryCount >= maxRetries) rethrow;
await Future.delayed(Duration(seconds: retryCount));
print('Повторная попытка отправки (попытка $retryCount): ${e.message}');
}
}
}
void _handleError(dynamic error, StackTrace stackTrace) {
if (error is ConnectionException) {
_handleConnectionError(error);
} else if (error is SerializationException) {
_handleSerializationError(error);
} else {
_handleUnknownError(error, stackTrace);
}
}
}

Добавление мониторинга производительности:

class MonitoredTransport implements RpcTransport {
final RpcTransport _underlying;
final TransportMetrics _metrics = TransportMetrics();
MonitoredTransport(this._underlying);
@override
Future<void> sendMessage(RpcMessage message) async {
final stopwatch = Stopwatch()..start();
try {
await _underlying.sendMessage(message);
stopwatch.stop();
_metrics.recordSendSuccess(stopwatch.elapsed);
} catch (e) {
stopwatch.stop();
_metrics.recordSendFailure(stopwatch.elapsed, e);
rethrow;
}
}
TransportMetrics get metrics => _metrics;
}
class TransportMetrics {
int _messagesSent = 0;
int _messagesReceived = 0;
int _sendFailures = 0;
int _receiveFailures = 0;
final List<Duration> _sendLatencies = [];
final List<Duration> _receiveLatencies = [];
void recordSendSuccess(Duration latency) {
_messagesSent++;
_sendLatencies.add(latency);
_trimLatencies();
}
void recordSendFailure(Duration latency, dynamic error) {
_sendFailures++;
_sendLatencies.add(latency);
_trimLatencies();
}
double get averageSendLatency {
if (_sendLatencies.isEmpty) return 0.0;
final total = _sendLatencies.fold(Duration.zero, (a, b) => a + b);
return total.inMicroseconds / _sendLatencies.length / 1000.0; // мс
}
double get successRate {
final total = _messagesSent + _sendFailures;
return total > 0 ? _messagesSent / total : 0.0;
}
void _trimLatencies() {
// Сохранение только недавних измерений
const maxSamples = 1000;
if (_sendLatencies.length > maxSamples) {
_sendLatencies.removeAt(0);
}
}
}

Использование структурированной конфигурации:

class TransportConfig {
final String type;
final Map<String, dynamic> settings;
final Duration connectionTimeout;
final Duration messageTimeout;
final int maxRetries;
final bool enableMetrics;
const TransportConfig({
required this.type,
this.settings = const {},
this.connectionTimeout = const Duration(seconds: 30),
this.messageTimeout = const Duration(seconds: 60),
this.maxRetries = 3,
this.enableMetrics = true,
});
factory TransportConfig.fromJson(Map<String, dynamic> json) {
return TransportConfig(
type: json['type'] as String,
settings: json['settings'] as Map<String, dynamic>? ?? {},
connectionTimeout: Duration(seconds: json['connection_timeout'] ?? 30),
messageTimeout: Duration(seconds: json['message_timeout'] ?? 60),
maxRetries: json['max_retries'] ?? 3,
enableMetrics: json['enable_metrics'] ?? true,
);
}
Map<String, dynamic> toJson() => {
'type': type,
'settings': settings,
'connection_timeout': connectionTimeout.inSeconds,
'message_timeout': messageTimeout.inSeconds,
'max_retries': maxRetries,
'enable_metrics': enableMetrics,
};
}

Пользовательские транспорты дают вам максимальную гибкость для интеграции RPC Dart с любой системой связи или протоколом. Независимо от того, нужно ли вам работать с очередями сообщений, базами данных, файлами или экзотическими сетевыми протоколами, интерфейс транспорта предоставляет все инструменты, необходимые для создания надежной, высокопроизводительной RPC связи.