Перейти к содержимому

HTTP/2 транспорт

HTTP/2 транспорт использует тот же gRPC wire-формат, который ожидают классические gRPC клиенты. Соединение поддерживается постоянно, а все RPC-потоки мультиплексируются поверх одного TCP-сокета. Поэтому после подключения RpcCallerEndpoint и RpcResponderEndpoint можно использовать привычные паттерны — unary, серверные и клиентские стримы, двунаправленный обмен — без дополнительных адаптеров.

  • Совместимость с gRPC — подключайте сервисы RPC Dart к существующей gRPC инфраструктуре, инструментам и контрактам без прослойки трансляции.
  • Большое количество параллельных вызовов — выполняйте десятки запросов через одно соединение вместо управления пулом HTTP/1.1 клиентов.
  • Интеграция с сервис-мешем — HTTP/2 поддерживается Envoy, Istio и другими mesh-решениями, поэтому маршрутизация, трассировка и TLS-политики наследуются автоматически.

Создайте транспорт с помощью RpcHttp2CallerTransport.connect и передайте его в RpcCallerEndpoint:

import 'package:rpc_dart/rpc_dart.dart';
import 'package:rpc_dart_transports/rpc_dart_transports.dart';
Future<void> main() async {
final transport = await RpcHttp2CallerTransport.connect(
host: 'localhost',
port: 8765,
logger: RpcLogger('Http2Client'),
);
final endpoint = RpcCallerEndpoint(
transport: transport,
debugLabel: 'demo-http2-client',
);
final response = await endpoint.unaryRequest<RpcString, RpcString>(
serviceName: 'DemoService',
methodName: 'Echo',
requestCodec: RpcString.codec,
responseCodec: RpcString.codec,
request: RpcString('Hello over HTTP/2!'),
);
print('Ответ сервера: ${response.value}');
await transport.close();
}

Для подключения по TLS используйте RpcHttp2CallerTransport.secureConnect — он настраивает HTTPS и ALPN h2 за один вызов:

final transport = await RpcHttp2CallerTransport.secureConnect(
host: 'api.example.com',
port: 443,
logger: RpcLogger('Http2Client'),
);

Клиентские транспорты реализуют интерфейс IRpcTransport: метод transport.health() возвращает снимок состояния RpcHealthStatus, а transport.reconnect() переподключает HTTP/2 соединение через сохранённую фабрику соединений.

Самый быстрый вариант — RpcHttp2Server.createWithContracts. Он открывает TCP порт, поднимает HTTP/2 соединение и создаёт отдельный RpcResponderEndpoint для каждого клиента:

final server = RpcHttp2Server.createWithContracts(
port: 8765,
host: '0.0.0.0',
logger: RpcLogger('Http2Server'),
contracts: [
DemoServiceContract(),
],
);
await server.start();
// ... при завершении работы
await server.stop();

Конструктор RpcHttp2Server используйте, когда нужны хуки жизненного цикла. Колбэки onEndpointCreated, onConnectionOpened и onConnectionClosed позволяют подключить метрики, динамически регистрировать контракты или применять middleware во время работы.

Так выглядит контракт, который используется в примерах ниже: он регистрирует унарный метод, серверный стрим и двунаправленный стрим с явными кодеками:

class DemoServiceContract extends RpcResponderContract {
DemoServiceContract() : super('DemoService');
@override
void setup() {
addUnaryMethod<RpcString, RpcString>(
methodName: 'Echo',
handler: _echo,
requestCodec: RpcString.codec,
responseCodec: RpcString.codec,
);
addServerStreamMethod<RpcString, RpcString>(
methodName: 'GetStream',
handler: _getStream,
requestCodec: RpcString.codec,
responseCodec: RpcString.codec,
);
addBidirectionalMethod<RpcString, RpcString>(
methodName: 'Chat',
handler: _chat,
requestCodec: RpcString.codec,
responseCodec: RpcString.codec,
);
}
Future<RpcString> _echo(RpcString request, {RpcContext? context}) async {
return RpcString('echo: ${request.value}');
}
Stream<RpcString> _getStream(
RpcString request, {
RpcContext? context,
}) async* {
for (var i = 0; i < 3; i++) {
await Future.delayed(const Duration(milliseconds: 200));
yield RpcString('${request.value} #$i');
}
}
Stream<RpcString> _chat(
Stream<RpcString> requests, {
RpcContext? context,
}) async* {
await for (final message in requests) {
yield RpcString('received: ${message.value}');
}
}
}

Если слушатель уже существует (например, за reverse-proxy, в sidecar сервис-меша или через SecureServerSocket.bind), создайте RpcHttp2ResponderTransport вручную:

import 'dart:io';
import 'package:http2/http2.dart' as http2;
import 'package:rpc_dart/rpc_dart.dart';
import 'package:rpc_dart_transports/rpc_dart_transports.dart';
Future<void> serveOverTls(SecurityContext context) async {
final secureSocket = await SecureServerSocket.bind(
'0.0.0.0',
8443,
context,
supportedProtocols: const ['h2'],
);
secureSocket.listen((SecureSocket socket) {
final connection = http2.ServerTransportConnection.viaSocket(socket);
final transport = RpcHttp2ResponderTransport(
connection: connection,
logger: RpcLogger('Http2Responder'),
);
final endpoint = RpcResponderEndpoint(transport: transport);
endpoint.registerServiceContract(DemoServiceContract());
endpoint.start();
});
}

Такой endpoint также реализует IRpcTransport, поэтому middleware, перехватчики и health-check работают одинаково на всех транспортах.

HTTP/2 транспорт поддерживает все четыре варианта взаимодействия, а клиентский API совпадает с другими транспортами:

final endpoint = RpcCallerEndpoint(transport: transport);
// Серверный стрим
final updates = endpoint.serverStream<RpcString, RpcString>(
serviceName: 'DemoService',
methodName: 'GetStream',
requestCodec: RpcString.codec,
responseCodec: RpcString.codec,
request: RpcString('stream please'),
);
await for (final update in updates) {
print('update: ${update.value}');
}
// Двунаправленный стрим
final responses = endpoint.bidirectionalStream<RpcString, RpcString>(
serviceName: 'DemoService',
methodName: 'Chat',
requestCodec: RpcString.codec,
responseCodec: RpcString.codec,
requests: Stream.periodic(
const Duration(milliseconds: 250),
(i) => RpcString('message #$i'),
).take(3),
);
await for (final reply in responses) {
print('reply: ${reply.value}');
}

И клиентский, и серверный транспорт реализуют IRpcTransport, поэтому:

  • transport.health() выдаёт актуальное состояние для вашей системы мониторинга.
  • transport.reconnect() на клиенте восстанавливает соединение после обрыва. Серверный транспорт сообщает, что ручное переподключение не поддерживается, и endpoint корректно завершается.
  • transport.close() освобождает HTTP/2 потоки при остановке endpoint.

Дополните эти возможности RpcEndpoint.health() и RpcEndpointPingProtocol из rpc_dart, чтобы построить liveness/readiness проверки.

RPC Dart выставляет gRPC-заголовки и фрейминг, поэтому любой gRPC-клиент может общаться с сервисом, запущенным через RpcHttp2Server:

Окно терминала
grpcurl -plaintext -d '{"a": 10, "b": 5}' \
localhost:8765 DemoService/Add

А клиентский транспорт может ходить к существующим gRPC сервисам — достаточно, чтобы имена сервисов и методов совпадали с контрактами, сгенерированными из их protobuf-описаний.