Перейти к содержимому

HTTP/2 транспорт

HTTP/2 транспорт обеспечивает высокопроизводительную коммуникацию с использованием протокола HTTP/2 и совместимости с gRPC. Он разработан для распределённых систем, которым требуется эффективное мультиплексирование, возможности server push и бесшовная интеграция с существующей HTTP/2 инфраструктурой.

Совместимость с gRPC

Полная совместимость с протоколом gRPC, позволяющая сервисам RPC Dart общаться с существующими gRPC клиентами и серверами.

HTTP/2 мультиплексирование

Множественные RPC вызовы через одно соединение с эффективным мультиплексированием потоков и управлением потоком данных.

Безопасность TLS

Встроенная поддержка TLS для безопасной коммуникации с валидацией сертификатов и шифрованием.

Балансировка нагрузки

Совместимость с HTTP/2 балансировщиками нагрузки и service mesh для развёртываний корпоративного уровня.

HTTP/2 транспорт реализует спецификацию протокола gRPC:

// Сервер автоматически обрабатывает протокол gRPC
class CalculatorService extends RpcResponderContract {
CalculatorService() : super('calculator.Calculator');
@override
void setup() {
addUnaryMethod<AddRequest, AddResponse>(
methodName: 'Add',
handler: _add,
);
addServerStreamMethod<RangeRequest, NumberResponse>(
methodName: 'GenerateNumbers',
handler: _generateNumbers,
);
}
Future<AddResponse> _add(AddRequest request, {RpcContext? context}) async {
return AddResponse(result: request.a + request.b);
}
Stream<NumberResponse> _generateNumbers(RangeRequest request, {RpcContext? context}) async* {
for (int i = request.start; i <= request.end; i++) {
yield NumberResponse(value: i);
await Future.delayed(Duration(milliseconds: 100));
}
}
}

Множественные одновременные запросы через одно соединение:

// Множественные одновременные вызовы через одно соединение
final transport = RpcHttp2Transport(
baseUrl: 'https://api.example.com',
options: Http2TransportOptions(
maxConcurrentStreams: 100,
connectionTimeout: Duration(seconds: 30),
keepAliveInterval: Duration(seconds: 60),
),
);
final caller = RpcCallerEndpoint(transport: transport);
final calculator = CalculatorCaller(caller);
// Все эти вызовы мультиплексируются через одно соединение
final futures = List.generate(10, (i) async {
return await calculator.add(AddRequest(a: i, b: i + 1));
});
final results = await Future.wait(futures);
// Все 10 вызовов эффективно выполнены через одно HTTP/2 соединение

Настройка HTTP/2 сервера с совместимостью gRPC:

http2_server.dart
import 'dart:io';
import 'package:rpc_dart/rpc_dart.dart';
import 'package:rpc_dart_transports/rpc_dart_transports.dart';
class Http2RpcServer {
late HttpServer _server;
late RpcResponderEndpoint _endpoint;
Future<void> start({
String host = 'localhost',
int port = 8443,
SecurityContext? securityContext,
}) async {
// Создаём HTTP/2 сервер с TLS
_server = await HttpServer.bindSecure(
host,
port,
securityContext ?? _createSecurityContext(),
);
// Включаем HTTP/2
_server.autoCompress = true;
print('HTTP/2 gRPC сервер слушает на https://$host:$port');
// Создаём транспорт и эндпоинт
final transport = RpcHttp2ServerTransport(_server);
_endpoint = RpcResponderEndpoint(transport: transport);
// Регистрируем сервисы
_endpoint.registerServiceContract(CalculatorService());
_endpoint.registerServiceContract(FileService());
_endpoint.registerServiceContract(ChatService());
// Начинаем обработку запросов
await _endpoint.start();
}
SecurityContext _createSecurityContext() {
final context = SecurityContext();
// Загружаем сертификат сервера и приватный ключ
context.useCertificateChain('server.crt');
context.usePrivateKey('server.key');
// Опционально: настраиваем валидацию клиентских сертификатов
context.setTrustedCertificates('ca.crt');
context.setClientAuthorities('ca.crt');
return context;
}
Future<void> stop() async {
await _endpoint.close();
await _server.close();
}
}

Подключение к HTTP/2 gRPC серверу:

import 'package:rpc_dart/rpc_dart.dart';
import 'package:rpc_dart_transports/rpc_dart_transports.dart';
void main() async {
// Создаём HTTP/2 транспорт
final transport = RpcHttp2Transport(
baseUrl: 'https://localhost:8443',
options: Http2TransportOptions(
// Настройки соединения
connectionTimeout: Duration(seconds: 30),
requestTimeout: Duration(seconds: 60),
keepAliveInterval: Duration(seconds: 60),
// HTTP/2 специфичные настройки
maxConcurrentStreams: 100,
windowSize: 65535,
maxFrameSize: 16384,
// Настройки безопасности
validateCertificate: false, // Только для разработки
clientCertificate: 'client.crt',
clientPrivateKey: 'client.key',
// Заголовки
defaultHeaders: {
'User-Agent': 'RpcDartClient/1.0',
'Accept-Encoding': 'gzip, deflate',
},
),
);
// Создаём caller эндпоинт
final caller = RpcCallerEndpoint(transport: transport);
final calculator = CalculatorCaller(caller);
try {
// Делаем RPC вызовы
final addResult = await calculator.add(AddRequest(a: 10, b: 5));
print('10 + 5 = ${addResult.result}');
final multiplyResult = await calculator.multiply(MultiplyRequest(a: 4, b: 7));
print('4 * 7 = ${multiplyResult.result}');
// Используем потоковые методы
print('Генерируем числа 1-5:');
await for (final number in calculator.generateNumbers(RangeRequest(start: 1, end: 5, intervalMs: 200))) {
print('Сгенерировано: ${number.value} в ${number.timestamp}');
}
} finally {
await caller.close();
}
}

Настройка HTTP/2 транспорта для продакшена:

final transport = RpcHttp2Transport(
baseUrl: 'https://production-api.example.com',
options: Http2TransportOptions(
// Пулинг соединений
maxConnectionsPerHost: 5,
connectionTimeout: Duration(seconds: 30),
keepAliveInterval: Duration(minutes: 2),
// Настройки запросов
requestTimeout: Duration(minutes: 5),
maxRetries: 3,
retryDelay: Duration(seconds: 1),
// HTTP/2 управление потоком
maxConcurrentStreams: 200,
windowSize: 1048576, // 1МБ
maxFrameSize: 32768, // 32КБ
// Сжатие
enableCompression: true,
compressionLevel: 6,
// Безопасность
validateCertificate: true,
pinnedCertificates: ['sha256:...'], // Закрепление сертификатов
// Аутентификация
defaultHeaders: {
'Authorization': 'Bearer $accessToken',
'X-API-Key': apiKey,
},
// Мониторинг
onRequestStart: (request) => print('Начало запроса: ${request.method}'),
onRequestComplete: (request, response) => print('Запрос завершён: ${response.statusCode}'),
onError: (error) => print('Ошибка транспорта: $error'),
),
);

Идеально для связи сервис-сервис:

class UserService {
late RpcCallerEndpoint _caller;
late PaymentServiceCaller _paymentService;
late NotificationServiceCaller _notificationService;
Future<void> initialize() async {
final transport = RpcHttp2Transport(
baseUrl: 'https://internal-api.company.com',
options: Http2TransportOptions(
maxConcurrentStreams: 50,
defaultHeaders: {
'X-Service-Name': 'user-service',
'Authorization': 'Bearer $serviceToken',
},
),
);
_caller = RpcCallerEndpoint(transport: transport);
_paymentService = PaymentServiceCaller(_caller);
_notificationService = NotificationServiceCaller(_caller);
}
Future<User> createUser(CreateUserRequest request) async {
// Создаём аккаунт пользователя
final user = await _createUserAccount(request);
// Настраиваем платёжный аккаунт (одновременный вызов)
final paymentFuture = _paymentService.createAccount(
CreatePaymentAccountRequest(userId: user.id),
);
// Отправляем приветственное уведомление (одновременный вызов)
final notificationFuture = _notificationService.sendWelcomeEmail(
WelcomeEmailRequest(
userId: user.id,
email: user.email,
name: user.name,
),
);
// Ждём завершения обеих операций
await Future.wait([paymentFuture, notificationFuture]);
return user;
}
}

Интеграция с существующей HTTP/2 инфраструктурой:

class ApiGateway {
final Map<String, RpcCallerEndpoint> _serviceClients = {};
Future<void> initializeServices() async {
// Сервис пользователей
final userTransport = RpcHttp2Transport(
baseUrl: 'https://user-service.internal',
options: Http2TransportOptions(
maxConcurrentStreams: 100,
defaultHeaders: {'X-Gateway': 'api-gateway-v1'},
),
);
_serviceClients['user'] = RpcCallerEndpoint(transport: userTransport);
// Сервис заказов
final orderTransport = RpcHttp2Transport(
baseUrl: 'https://order-service.internal',
options: Http2TransportOptions(
maxConcurrentStreams: 100,
defaultHeaders: {'X-Gateway': 'api-gateway-v1'},
),
);
_serviceClients['order'] = RpcCallerEndpoint(transport: orderTransport);
// Сервис товаров
final productTransport = RpcHttp2Transport(
baseUrl: 'https://product-service.internal',
options: Http2TransportOptions(
maxConcurrentStreams: 100,
defaultHeaders: {'X-Gateway': 'api-gateway-v1'},
),
);
_serviceClients['product'] = RpcCallerEndpoint(transport: productTransport);
}
Future<OrderDetails> getOrderDetails(String orderId) async {
final orderCaller = OrderServiceCaller(_serviceClients['order']!);
final userCaller = UserServiceCaller(_serviceClients['user']!);
final productCaller = ProductServiceCaller(_serviceClients['product']!);
// Получаем информацию о заказе
final order = await orderCaller.getOrder(GetOrderRequest(orderId: orderId));
// Получаем информацию о пользователе и товарах одновременно
final userFuture = userCaller.getUser(GetUserRequest(userId: order.userId));
final productsFuture = Future.wait(
order.items.map((item) =>
productCaller.getProduct(GetProductRequest(productId: item.productId))
),
);
final user = await userFuture;
final products = await productsFuture;
return OrderDetails(
order: order,
user: user,
products: products,
);
}
}

Эффективная обработка потоковых данных:

class DataProcessor {
late RpcCallerEndpoint _caller;
late ProcessingServiceCaller _processingService;
Future<void> processDataStream() async {
final transport = RpcHttp2Transport(
baseUrl: 'https://processing-cluster.company.com',
options: Http2TransportOptions(
maxConcurrentStreams: 200,
windowSize: 2097152, // 2МБ для больших потоков данных
),
);
_caller = RpcCallerEndpoint(transport: transport);
_processingService = ProcessingServiceCaller(_caller);
// Создаём двунаправленный поток
final inputController = StreamController<ProcessRequest>();
final responseStream = _processingService.processStream(inputController.stream);
// Начинаем обработку ответов
responseStream.listen((response) {
if (response.success) {
print('Обработано ${response.id}: ${response.result}');
} else {
print('Ошибка обработки ${response.id}: ${response.error}');
}
});
// Отправляем данные для обработки
for (int i = 0; i < 1000; i++) {
inputController.add(ProcessRequest(
id: i.toString(),
data: generateDataPoint(i),
));
// Контролируем поток, чтобы не перегружать сервер
if (i % 10 == 0) {
await Future.delayed(Duration(milliseconds: 100));
}
}
await inputController.close();
}
}

HTTP/2 транспорт RPC Dart может вызывать существующие gRPC сервисы:

// Вызов существующего gRPC сервиса
class GrpcInterop {
Future<void> callExistingGrpcService() async {
final transport = RpcHttp2Transport(
baseUrl: 'https://existing-grpc-service.com',
options: Http2TransportOptions(
// gRPC специфичные заголовки
defaultHeaders: {
'content-type': 'application/grpc+proto',
'grpc-encoding': 'gzip',
},
),
);
final caller = RpcCallerEndpoint(transport: transport);
// Вызываем существующий gRPC сервис используя контракты RPC Dart
final grpcCalculator = ExistingGrpcCalculatorCaller(caller);
final result = await grpcCalculator.add(GrpcAddRequest(a: 10, b: 5));
print('Результат gRPC сервиса: ${result.sum}');
}
}

Сервисы RPC Dart автоматически совместимы с gRPC клиентами:

Окно терминала
# Любой gRPC клиент может вызывать сервисы RPC Dart
grpcurl -plaintext -d '{"a": 10, "b": 5}' \
localhost:8443 calculator.Calculator/Add
class LoadBalancedClient {
final List<String> _endpoints = [
'https://service1.company.com',
'https://service2.company.com',
'https://service3.company.com',
];
final List<RpcCallerEndpoint> _clients = [];
int _currentIndex = 0;
Future<void> initialize() async {
for (final endpoint in _endpoints) {
final transport = RpcHttp2Transport(
baseUrl: endpoint,
options: Http2TransportOptions(
maxConcurrentStreams: 50,
healthCheckInterval: Duration(seconds: 30),
),
);
_clients.add(RpcCallerEndpoint(transport: transport));
}
}
RpcCallerEndpoint getHealthyClient() {
// Простой round-robin (в продакшене используйте проверки здоровья)
final client = _clients[_currentIndex];
_currentIndex = (_currentIndex + 1) % _clients.length;
return client;
}
Future<T> callWithLoadBalancing<T>(
Future<T> Function(RpcCallerEndpoint) call,
) async {
RpcException? lastError;
// Пробуем каждый эндпоинт пока один не сработает
for (int i = 0; i < _clients.length; i++) {
try {
final client = getHealthyClient();
return await call(client);
} on RpcException catch (e) {
lastError = e;
print('Эндпоинт не работает, пробуем следующий: ${e.message}');
}
}
throw lastError ?? RpcException(
code: 'ALL_ENDPOINTS_FAILED',
message: 'Все эндпоинты недоступны',
);
}
}
class Http2ConnectionPool {
final Map<String, RpcHttp2Transport> _connections = {};
final int _maxConnectionsPerHost;
Http2ConnectionPool({int maxConnectionsPerHost = 5})
: _maxConnectionsPerHost = maxConnectionsPerHost;
RpcHttp2Transport getConnection(String baseUrl) {
return _connections.putIfAbsent(baseUrl, () {
return RpcHttp2Transport(
baseUrl: baseUrl,
options: Http2TransportOptions(
maxConcurrentStreams: 100,
keepAliveInterval: Duration(minutes: 2),
connectionTimeout: Duration(seconds: 30),
// Оптимизируем для высокой пропускной способности
windowSize: 2097152, // 2МБ
maxFrameSize: 32768, // 32КБ
enableCompression: true,
),
);
});
}
Future<void> closeAll() async {
await Future.wait(_connections.values.map((t) => t.close()));
_connections.clear();
}
}
class OptimizedHttp2Client {
late RpcCallerEndpoint _caller;
Future<void> optimizedBatchCalls() async {
final transport = RpcHttp2Transport(
baseUrl: 'https://api.example.com',
options: Http2TransportOptions(
// Оптимизируем для пакетных операций
maxConcurrentStreams: 200,
windowSize: 4194304, // 4МБ
enableCompression: true,
// Конвейеризация запросов
enablePipelining: true,
maxPipelinedRequests: 10,
),
);
_caller = RpcCallerEndpoint(transport: transport);
final service = ServiceCaller(_caller);
// Эффективная обработка пакетных вызовов
final batchSize = 50;
final futures = <Future>[];
for (int i = 0; i < 1000; i += batchSize) {
final batchFutures = List.generate(
math.min(batchSize, 1000 - i),
(index) => service.process(ProcessRequest(id: i + index)),
);
futures.addAll(batchFutures);
// Обрабатываем пакетами, чтобы не перегружать сервер
if (futures.length >= 200) {
await Future.wait(futures);
futures.clear();
}
}
// Обрабатываем оставшиеся futures
if (futures.isNotEmpty) {
await Future.wait(futures);
}
}
}
final transport = RpcHttp2Transport(
baseUrl: 'https://api.example.com',
options: Http2TransportOptions(
// Таймаут соединения
connectionTimeout: Duration(seconds: 30),
// Таймаут запроса (варьируется по типу операции)
requestTimeout: Duration(minutes: 5),
// Keep-alive для поддержания соединений
keepAliveInterval: Duration(minutes: 2),
// Таймауты для потоков
streamTimeout: Duration(minutes: 30),
),
);
class ResilientHttp2Client {
Future<T> callWithRetry<T>(Future<T> Function() call) async {
int attempts = 0;
const maxAttempts = 3;
while (attempts < maxAttempts) {
try {
return await call();
} on RpcException catch (e) {
attempts++;
if (e.code == 'UNAVAILABLE' && attempts < maxAttempts) {
// Экспоненциальная задержка
await Future.delayed(Duration(seconds: math.pow(2, attempts).toInt()));
continue;
}
rethrow;
}
}
throw RpcException(
code: 'MAX_RETRIES_EXCEEDED',
message: 'Неудача после $maxAttempts попыток',
);
}
}
class Http2PerformanceMonitor {
void setupMonitoring(RpcHttp2Transport transport) {
transport.metrics.listen((metrics) {
print('Активные потоки: ${metrics.activeStreams}');
print('Всего запросов: ${metrics.totalRequests}');
print('Средняя задержка: ${metrics.averageLatency.inMilliseconds}мс');
print('Процент ошибок: ${metrics.errorRate}%');
});
}
}

HTTP/2 транспорт - идеальный выбор для распределённых систем, архитектур микросервисов и приложений, требующих высокопроизводительной коммуникации с существующей gRPC инфраструктурой. Его возможности мультиплексирования и функции HTTP/2 делают его совершенным для масштабируемых корпоративных приложений.