Совместимость с gRPC
Полная совместимость с протоколом gRPC, позволяющая сервисам RPC Dart общаться с существующими gRPC клиентами и серверами.
HTTP/2 транспорт обеспечивает высокопроизводительную коммуникацию с использованием протокола HTTP/2 и совместимости с gRPC. Он разработан для распределённых систем, которым требуется эффективное мультиплексирование, возможности server push и бесшовная интеграция с существующей HTTP/2 инфраструктурой.
Совместимость с gRPC
Полная совместимость с протоколом gRPC, позволяющая сервисам RPC Dart общаться с существующими gRPC клиентами и серверами.
HTTP/2 мультиплексирование
Множественные RPC вызовы через одно соединение с эффективным мультиплексированием потоков и управлением потоком данных.
Безопасность TLS
Встроенная поддержка TLS для безопасной коммуникации с валидацией сертификатов и шифрованием.
Балансировка нагрузки
Совместимость с HTTP/2 балансировщиками нагрузки и service mesh для развёртываний корпоративного уровня.
HTTP/2 транспорт реализует спецификацию протокола gRPC:
// Сервер автоматически обрабатывает протокол gRPCclass CalculatorService extends RpcResponderContract { CalculatorService() : super('calculator.Calculator');
@override void setup() { addUnaryMethod<AddRequest, AddResponse>( methodName: 'Add', handler: _add, );
addServerStreamMethod<RangeRequest, NumberResponse>( methodName: 'GenerateNumbers', handler: _generateNumbers, ); }
Future<AddResponse> _add(AddRequest request, {RpcContext? context}) async { return AddResponse(result: request.a + request.b); }
Stream<NumberResponse> _generateNumbers(RangeRequest request, {RpcContext? context}) async* { for (int i = request.start; i <= request.end; i++) { yield NumberResponse(value: i); await Future.delayed(Duration(milliseconds: 100)); } }}
Множественные одновременные запросы через одно соединение:
// Множественные одновременные вызовы через одно соединениеfinal transport = RpcHttp2Transport( baseUrl: 'https://api.example.com', options: Http2TransportOptions( maxConcurrentStreams: 100, connectionTimeout: Duration(seconds: 30), keepAliveInterval: Duration(seconds: 60), ),);
final caller = RpcCallerEndpoint(transport: transport);final calculator = CalculatorCaller(caller);
// Все эти вызовы мультиплексируются через одно соединениеfinal futures = List.generate(10, (i) async { return await calculator.add(AddRequest(a: i, b: i + 1));});
final results = await Future.wait(futures);// Все 10 вызовов эффективно выполнены через одно HTTP/2 соединение
Настройка HTTP/2 сервера с совместимостью gRPC:
import 'dart:io';import 'package:rpc_dart/rpc_dart.dart';import 'package:rpc_dart_transports/rpc_dart_transports.dart';
class Http2RpcServer { late HttpServer _server; late RpcResponderEndpoint _endpoint;
Future<void> start({ String host = 'localhost', int port = 8443, SecurityContext? securityContext, }) async { // Создаём HTTP/2 сервер с TLS _server = await HttpServer.bindSecure( host, port, securityContext ?? _createSecurityContext(), );
// Включаем HTTP/2 _server.autoCompress = true;
print('HTTP/2 gRPC сервер слушает на https://$host:$port');
// Создаём транспорт и эндпоинт final transport = RpcHttp2ServerTransport(_server); _endpoint = RpcResponderEndpoint(transport: transport);
// Регистрируем сервисы _endpoint.registerServiceContract(CalculatorService()); _endpoint.registerServiceContract(FileService()); _endpoint.registerServiceContract(ChatService());
// Начинаем обработку запросов await _endpoint.start(); }
SecurityContext _createSecurityContext() { final context = SecurityContext();
// Загружаем сертификат сервера и приватный ключ context.useCertificateChain('server.crt'); context.usePrivateKey('server.key');
// Опционально: настраиваем валидацию клиентских сертификатов context.setTrustedCertificates('ca.crt'); context.setClientAuthorities('ca.crt');
return context; }
Future<void> stop() async { await _endpoint.close(); await _server.close(); }}
class CalculatorService extends RpcResponderContract { CalculatorService() : super('calculator.Calculator');
@override void setup() { addUnaryMethod<AddRequest, AddResponse>( methodName: 'Add', handler: _add, );
addUnaryMethod<MultiplyRequest, MultiplyResponse>( methodName: 'Multiply', handler: _multiply, );
addServerStreamMethod<RangeRequest, NumberResponse>( methodName: 'GenerateNumbers', handler: _generateNumbers, );
addClientStreamMethod<NumberRequest, SumResponse>( methodName: 'SumNumbers', handler: _sumNumbers, );
addBidirectionalStreamMethod<ProcessRequest, ProcessResponse>( methodName: 'ProcessStream', handler: _processStream, ); }
Future<AddResponse> _add(AddRequest request, {RpcContext? context}) async { // Имитируем время обработки await Future.delayed(Duration(milliseconds: 10));
return AddResponse( result: request.a + request.b, timestamp: DateTime.now().toIso8601String(), ); }
Future<MultiplyResponse> _multiply(MultiplyRequest request, {RpcContext? context}) async { return MultiplyResponse( result: request.a * request.b, timestamp: DateTime.now().toIso8601String(), ); }
Stream<NumberResponse> _generateNumbers(RangeRequest request, {RpcContext? context}) async* { for (int i = request.start; i <= request.end; i++) { yield NumberResponse( value: i, timestamp: DateTime.now().toIso8601String(), );
// Имитируем генерацию в реальном времени await Future.delayed(Duration(milliseconds: request.intervalMs ?? 100)); } }
Future<SumResponse> _sumNumbers(Stream<NumberRequest> numbers, {RpcContext? context}) async { double sum = 0; int count = 0;
await for (final number in numbers) { sum += number.value; count++; }
return SumResponse( total: sum, count: count, average: count > 0 ? sum / count : 0, ); }
Stream<ProcessResponse> _processStream( Stream<ProcessRequest> requests, {RpcContext? context} ) async* { await for (final request in requests) { try { // Имитируем обработку await Future.delayed(Duration(milliseconds: 50));
yield ProcessResponse( id: request.id, success: true, result: 'Обработано: ${request.data}', processedAt: DateTime.now().toIso8601String(), ); } catch (e) { yield ProcessResponse( id: request.id, success: false, error: e.toString(), processedAt: DateTime.now().toIso8601String(), ); } } }}
void main() async { final server = Http2RpcServer();
// Обрабатываем изящное завершение ProcessSignal.sigint.watch().listen((_) async { print('Завершение работы HTTP/2 сервера...'); await server.stop(); exit(0); });
try { await server.start( host: 'localhost', port: 8443, ); } catch (e) { print('Не удалось запустить сервер: $e'); exit(1); }}
Подключение к HTTP/2 gRPC серверу:
import 'package:rpc_dart/rpc_dart.dart';import 'package:rpc_dart_transports/rpc_dart_transports.dart';
void main() async { // Создаём HTTP/2 транспорт final transport = RpcHttp2Transport( baseUrl: 'https://localhost:8443', options: Http2TransportOptions( // Настройки соединения connectionTimeout: Duration(seconds: 30), requestTimeout: Duration(seconds: 60), keepAliveInterval: Duration(seconds: 60),
// HTTP/2 специфичные настройки maxConcurrentStreams: 100, windowSize: 65535, maxFrameSize: 16384,
// Настройки безопасности validateCertificate: false, // Только для разработки clientCertificate: 'client.crt', clientPrivateKey: 'client.key',
// Заголовки defaultHeaders: { 'User-Agent': 'RpcDartClient/1.0', 'Accept-Encoding': 'gzip, deflate', }, ), );
// Создаём caller эндпоинт final caller = RpcCallerEndpoint(transport: transport); final calculator = CalculatorCaller(caller);
try { // Делаем RPC вызовы final addResult = await calculator.add(AddRequest(a: 10, b: 5)); print('10 + 5 = ${addResult.result}');
final multiplyResult = await calculator.multiply(MultiplyRequest(a: 4, b: 7)); print('4 * 7 = ${multiplyResult.result}');
// Используем потоковые методы print('Генерируем числа 1-5:'); await for (final number in calculator.generateNumbers(RangeRequest(start: 1, end: 5, intervalMs: 200))) { print('Сгенерировано: ${number.value} в ${number.timestamp}'); }
} finally { await caller.close(); }}
Настройка HTTP/2 транспорта для продакшена:
final transport = RpcHttp2Transport( baseUrl: 'https://production-api.example.com', options: Http2TransportOptions( // Пулинг соединений maxConnectionsPerHost: 5, connectionTimeout: Duration(seconds: 30), keepAliveInterval: Duration(minutes: 2),
// Настройки запросов requestTimeout: Duration(minutes: 5), maxRetries: 3, retryDelay: Duration(seconds: 1),
// HTTP/2 управление потоком maxConcurrentStreams: 200, windowSize: 1048576, // 1МБ maxFrameSize: 32768, // 32КБ
// Сжатие enableCompression: true, compressionLevel: 6,
// Безопасность validateCertificate: true, pinnedCertificates: ['sha256:...'], // Закрепление сертификатов
// Аутентификация defaultHeaders: { 'Authorization': 'Bearer $accessToken', 'X-API-Key': apiKey, },
// Мониторинг onRequestStart: (request) => print('Начало запроса: ${request.method}'), onRequestComplete: (request, response) => print('Запрос завершён: ${response.statusCode}'), onError: (error) => print('Ошибка транспорта: $error'), ),);
Идеально для связи сервис-сервис:
class UserService { late RpcCallerEndpoint _caller; late PaymentServiceCaller _paymentService; late NotificationServiceCaller _notificationService;
Future<void> initialize() async { final transport = RpcHttp2Transport( baseUrl: 'https://internal-api.company.com', options: Http2TransportOptions( maxConcurrentStreams: 50, defaultHeaders: { 'X-Service-Name': 'user-service', 'Authorization': 'Bearer $serviceToken', }, ), );
_caller = RpcCallerEndpoint(transport: transport); _paymentService = PaymentServiceCaller(_caller); _notificationService = NotificationServiceCaller(_caller); }
Future<User> createUser(CreateUserRequest request) async { // Создаём аккаунт пользователя final user = await _createUserAccount(request);
// Настраиваем платёжный аккаунт (одновременный вызов) final paymentFuture = _paymentService.createAccount( CreatePaymentAccountRequest(userId: user.id), );
// Отправляем приветственное уведомление (одновременный вызов) final notificationFuture = _notificationService.sendWelcomeEmail( WelcomeEmailRequest( userId: user.id, email: user.email, name: user.name, ), );
// Ждём завершения обеих операций await Future.wait([paymentFuture, notificationFuture]);
return user; }}
Интеграция с существующей HTTP/2 инфраструктурой:
class ApiGateway { final Map<String, RpcCallerEndpoint> _serviceClients = {};
Future<void> initializeServices() async { // Сервис пользователей final userTransport = RpcHttp2Transport( baseUrl: 'https://user-service.internal', options: Http2TransportOptions( maxConcurrentStreams: 100, defaultHeaders: {'X-Gateway': 'api-gateway-v1'}, ), ); _serviceClients['user'] = RpcCallerEndpoint(transport: userTransport);
// Сервис заказов final orderTransport = RpcHttp2Transport( baseUrl: 'https://order-service.internal', options: Http2TransportOptions( maxConcurrentStreams: 100, defaultHeaders: {'X-Gateway': 'api-gateway-v1'}, ), ); _serviceClients['order'] = RpcCallerEndpoint(transport: orderTransport);
// Сервис товаров final productTransport = RpcHttp2Transport( baseUrl: 'https://product-service.internal', options: Http2TransportOptions( maxConcurrentStreams: 100, defaultHeaders: {'X-Gateway': 'api-gateway-v1'}, ), ); _serviceClients['product'] = RpcCallerEndpoint(transport: productTransport); }
Future<OrderDetails> getOrderDetails(String orderId) async { final orderCaller = OrderServiceCaller(_serviceClients['order']!); final userCaller = UserServiceCaller(_serviceClients['user']!); final productCaller = ProductServiceCaller(_serviceClients['product']!);
// Получаем информацию о заказе final order = await orderCaller.getOrder(GetOrderRequest(orderId: orderId));
// Получаем информацию о пользователе и товарах одновременно final userFuture = userCaller.getUser(GetUserRequest(userId: order.userId)); final productsFuture = Future.wait( order.items.map((item) => productCaller.getProduct(GetProductRequest(productId: item.productId)) ), );
final user = await userFuture; final products = await productsFuture;
return OrderDetails( order: order, user: user, products: products, ); }}
Эффективная обработка потоковых данных:
class DataProcessor { late RpcCallerEndpoint _caller; late ProcessingServiceCaller _processingService;
Future<void> processDataStream() async { final transport = RpcHttp2Transport( baseUrl: 'https://processing-cluster.company.com', options: Http2TransportOptions( maxConcurrentStreams: 200, windowSize: 2097152, // 2МБ для больших потоков данных ), );
_caller = RpcCallerEndpoint(transport: transport); _processingService = ProcessingServiceCaller(_caller);
// Создаём двунаправленный поток final inputController = StreamController<ProcessRequest>(); final responseStream = _processingService.processStream(inputController.stream);
// Начинаем обработку ответов responseStream.listen((response) { if (response.success) { print('Обработано ${response.id}: ${response.result}'); } else { print('Ошибка обработки ${response.id}: ${response.error}'); } });
// Отправляем данные для обработки for (int i = 0; i < 1000; i++) { inputController.add(ProcessRequest( id: i.toString(), data: generateDataPoint(i), ));
// Контролируем поток, чтобы не перегружать сервер if (i % 10 == 0) { await Future.delayed(Duration(milliseconds: 100)); } }
await inputController.close(); }}
HTTP/2 транспорт RPC Dart может вызывать существующие gRPC сервисы:
// Вызов существующего gRPC сервисаclass GrpcInterop { Future<void> callExistingGrpcService() async { final transport = RpcHttp2Transport( baseUrl: 'https://existing-grpc-service.com', options: Http2TransportOptions( // gRPC специфичные заголовки defaultHeaders: { 'content-type': 'application/grpc+proto', 'grpc-encoding': 'gzip', }, ), );
final caller = RpcCallerEndpoint(transport: transport);
// Вызываем существующий gRPC сервис используя контракты RPC Dart final grpcCalculator = ExistingGrpcCalculatorCaller(caller); final result = await grpcCalculator.add(GrpcAddRequest(a: 10, b: 5));
print('Результат gRPC сервиса: ${result.sum}'); }}
Сервисы RPC Dart автоматически совместимы с gRPC клиентами:
# Любой gRPC клиент может вызывать сервисы RPC Dartgrpcurl -plaintext -d '{"a": 10, "b": 5}' \ localhost:8443 calculator.Calculator/Add
class LoadBalancedClient { final List<String> _endpoints = [ 'https://service1.company.com', 'https://service2.company.com', 'https://service3.company.com', ];
final List<RpcCallerEndpoint> _clients = []; int _currentIndex = 0;
Future<void> initialize() async { for (final endpoint in _endpoints) { final transport = RpcHttp2Transport( baseUrl: endpoint, options: Http2TransportOptions( maxConcurrentStreams: 50, healthCheckInterval: Duration(seconds: 30), ), );
_clients.add(RpcCallerEndpoint(transport: transport)); } }
RpcCallerEndpoint getHealthyClient() { // Простой round-robin (в продакшене используйте проверки здоровья) final client = _clients[_currentIndex]; _currentIndex = (_currentIndex + 1) % _clients.length; return client; }
Future<T> callWithLoadBalancing<T>( Future<T> Function(RpcCallerEndpoint) call, ) async { RpcException? lastError;
// Пробуем каждый эндпоинт пока один не сработает for (int i = 0; i < _clients.length; i++) { try { final client = getHealthyClient(); return await call(client); } on RpcException catch (e) { lastError = e; print('Эндпоинт не работает, пробуем следующий: ${e.message}'); } }
throw lastError ?? RpcException( code: 'ALL_ENDPOINTS_FAILED', message: 'Все эндпоинты недоступны', ); }}
class Http2ConnectionPool { final Map<String, RpcHttp2Transport> _connections = {}; final int _maxConnectionsPerHost;
Http2ConnectionPool({int maxConnectionsPerHost = 5}) : _maxConnectionsPerHost = maxConnectionsPerHost;
RpcHttp2Transport getConnection(String baseUrl) { return _connections.putIfAbsent(baseUrl, () { return RpcHttp2Transport( baseUrl: baseUrl, options: Http2TransportOptions( maxConcurrentStreams: 100, keepAliveInterval: Duration(minutes: 2), connectionTimeout: Duration(seconds: 30),
// Оптимизируем для высокой пропускной способности windowSize: 2097152, // 2МБ maxFrameSize: 32768, // 32КБ enableCompression: true, ), ); }); }
Future<void> closeAll() async { await Future.wait(_connections.values.map((t) => t.close())); _connections.clear(); }}
class OptimizedHttp2Client { late RpcCallerEndpoint _caller;
Future<void> optimizedBatchCalls() async { final transport = RpcHttp2Transport( baseUrl: 'https://api.example.com', options: Http2TransportOptions( // Оптимизируем для пакетных операций maxConcurrentStreams: 200, windowSize: 4194304, // 4МБ enableCompression: true,
// Конвейеризация запросов enablePipelining: true, maxPipelinedRequests: 10, ), );
_caller = RpcCallerEndpoint(transport: transport); final service = ServiceCaller(_caller);
// Эффективная обработка пакетных вызовов final batchSize = 50; final futures = <Future>[];
for (int i = 0; i < 1000; i += batchSize) { final batchFutures = List.generate( math.min(batchSize, 1000 - i), (index) => service.process(ProcessRequest(id: i + index)), );
futures.addAll(batchFutures);
// Обрабатываем пакетами, чтобы не перегружать сервер if (futures.length >= 200) { await Future.wait(futures); futures.clear(); } }
// Обрабатываем оставшиеся futures if (futures.isNotEmpty) { await Future.wait(futures); } }}
final transport = RpcHttp2Transport( baseUrl: 'https://api.example.com', options: Http2TransportOptions( // Таймаут соединения connectionTimeout: Duration(seconds: 30),
// Таймаут запроса (варьируется по типу операции) requestTimeout: Duration(minutes: 5),
// Keep-alive для поддержания соединений keepAliveInterval: Duration(minutes: 2),
// Таймауты для потоков streamTimeout: Duration(minutes: 30), ),);
class ResilientHttp2Client { Future<T> callWithRetry<T>(Future<T> Function() call) async { int attempts = 0; const maxAttempts = 3;
while (attempts < maxAttempts) { try { return await call(); } on RpcException catch (e) { attempts++;
if (e.code == 'UNAVAILABLE' && attempts < maxAttempts) { // Экспоненциальная задержка await Future.delayed(Duration(seconds: math.pow(2, attempts).toInt())); continue; }
rethrow; } }
throw RpcException( code: 'MAX_RETRIES_EXCEEDED', message: 'Неудача после $maxAttempts попыток', ); }}
class Http2PerformanceMonitor { void setupMonitoring(RpcHttp2Transport transport) { transport.metrics.listen((metrics) { print('Активные потоки: ${metrics.activeStreams}'); print('Всего запросов: ${metrics.totalRequests}'); print('Средняя задержка: ${metrics.averageLatency.inMilliseconds}мс'); print('Процент ошибок: ${metrics.errorRate}%'); }); }}
HTTP/2 транспорт - идеальный выбор для распределённых систем, архитектур микросервисов и приложений, требующих высокопроизводительной коммуникации с существующей gRPC инфраструктурой. Его возможности мультиплексирования и функции HTTP/2 делают его совершенным для масштабируемых корпоративных приложений.