Skip to content

HTTP/2 Transport

The HTTP/2 Transport provides high-performance communication using the HTTP/2 protocol with gRPC compatibility. It’s designed for distributed systems that need efficient multiplexing, server push capabilities, and seamless integration with existing HTTP/2 infrastructure.

gRPC Compatibility

Fully compatible with the gRPC protocol, so RPC Dart services can communicate with existing gRPC clients and servers.

HTTP/2 Multiplexing

Runs multiple RPC calls over a single connection, with efficient stream multiplexing and flow control.

TLS Security

Built-in TLS support for secure communication with certificate validation and encryption.

Load Balancing

Compatible with HTTP/2 load balancers and service meshes for enterprise-grade deployments.

The HTTP/2 transport implements the gRPC protocol specification:

// The server side takes care of the gRPC protocol automatically.

/// Calculator contract registered under the service name
/// `calculator.Calculator`.
///
/// Exposes a unary `Add` method and a server-streaming `GenerateNumbers`
/// method.
class CalculatorService extends RpcResponderContract {
  CalculatorService() : super('calculator.Calculator');

  /// Registers both RPC methods together with their handlers.
  @override
  void setup() {
    addUnaryMethod<AddRequest, AddResponse>(
      methodName: 'Add',
      handler: _add,
    );
    addServerStreamMethod<RangeRequest, NumberResponse>(
      methodName: 'GenerateNumbers',
      handler: _generateNumbers,
    );
  }

  /// Returns a response carrying `request.a + request.b`.
  Future<AddResponse> _add(AddRequest request, {RpcContext? context}) async =>
      AddResponse(result: request.a + request.b);

  /// Emits every integer from `request.start` through `request.end`
  /// (inclusive), pausing 100 ms after each emitted value.
  Stream<NumberResponse> _generateNumbers(
    RangeRequest request, {
    RpcContext? context,
  }) async* {
    int current = request.start;
    while (current <= request.end) {
      yield NumberResponse(value: current);
      await Future.delayed(const Duration(milliseconds: 100));
      current++;
    }
  }
}

Multiple concurrent requests over a single connection:

// One transport, one connection, many calls in flight at once.
final transport = RpcHttp2Transport(
  baseUrl: 'https://api.example.com',
  options: Http2TransportOptions(
    maxConcurrentStreams: 100,
    connectionTimeout: const Duration(seconds: 30),
    keepAliveInterval: const Duration(seconds: 60),
  ),
);
final caller = RpcCallerEndpoint(transport: transport);
final calculator = CalculatorCaller(caller);
// Start ten `add` calls back to back; they share the same connection.
final futures = List.generate(
  10,
  (index) async => calculator.add(AddRequest(a: index, b: index + 1)),
);
// Wait until every call has produced its result.
final results = await Future.wait(futures);
// At this point all 10 calls have completed over one HTTP/2 connection.

Setting up an HTTP/2 server with gRPC compatibility:

http2_server.dart
import 'dart:io';
import 'package:rpc_dart/rpc_dart.dart';
import 'package:rpc_dart_transports/rpc_dart_transports.dart';
/// Hosts RPC Dart service contracts behind a TLS-secured [HttpServer].
///
/// Call [start] once to bind and begin serving, and [stop] to release the
/// endpoint and the listening socket. [stop] is safe to call at any time,
/// including before [start] or after a failed [start].
class Http2RpcServer {
  // Nullable instead of `late` so stop() cannot hit a
  // LateInitializationError when the server was never started.
  HttpServer? _server;
  RpcResponderEndpoint? _endpoint;

  /// Binds a secure server on [host]:[port], registers the service
  /// contracts, and starts the responder endpoint.
  ///
  /// When [securityContext] is omitted, a context is built from
  /// `server.crt`, `server.key` and `ca.crt` in the working directory.
  ///
  /// Throws a [StateError] if the server is already running. If anything
  /// fails after the socket is bound, the socket is closed before the error
  /// is rethrown.
  Future<void> start({
    String host = 'localhost',
    int port = 8443,
    SecurityContext? securityContext,
  }) async {
    if (_server != null) {
      throw StateError('Http2RpcServer is already running');
    }

    final server = await HttpServer.bindSecure(
      host,
      port,
      securityContext ?? _createSecurityContext(),
    );
    // Publish the server right away so stop() can release the socket even
    // while the endpoint is still starting.
    _server = server;

    try {
      // Enables compression of responses when the client accepts it. This
      // does NOT switch the protocol to HTTP/2.
      // NOTE(review): HTTP/2 handling is presumably provided by
      // RpcHttp2ServerTransport - confirm against the transport package.
      server.autoCompress = true;
      print('HTTP/2 gRPC Server listening on https://$host:$port');

      // Create transport and endpoint.
      final endpoint = RpcResponderEndpoint(
        transport: RpcHttp2ServerTransport(server),
      );
      _endpoint = endpoint;

      // Register services.
      endpoint.registerServiceContract(CalculatorService());
      endpoint.registerServiceContract(FileService());
      endpoint.registerServiceContract(ChatService());

      // Start handling requests.
      await endpoint.start();
    } catch (_) {
      // Do not leave a bound socket behind when startup fails.
      await stop();
      rethrow;
    }
  }

  /// Builds the default TLS context from files in the working directory.
  SecurityContext _createSecurityContext() {
    final context = SecurityContext();
    // Server certificate and private key.
    context.useCertificateChain('server.crt');
    context.usePrivateKey('server.key');
    // Client certificate validation material.
    // NOTE(review): bindSecure is called without requestClientCertificate,
    // so clients are presumably not asked for a certificate - confirm.
    context.setTrustedCertificates('ca.crt');
    context.setClientAuthorities('ca.crt');
    return context;
  }

  /// Closes the endpoint and then the server; does nothing if not started.
  Future<void> stop() async {
    final endpoint = _endpoint;
    final server = _server;
    _endpoint = null;
    _server = null;
    try {
      await endpoint?.close();
    } finally {
      // Always release the socket, even if closing the endpoint failed.
      await server?.close();
    }
  }
}

Connecting to an HTTP/2 gRPC server:

import 'package:rpc_dart/rpc_dart.dart';
import 'package:rpc_dart_transports/rpc_dart_transports.dart';
/// Connects to the local HTTP/2 gRPC server, performs two unary calls and
/// one server-streaming call, and always closes the caller endpoint.
void main() async {
  final transport = RpcHttp2Transport(
    baseUrl: 'https://localhost:8443',
    options: Http2TransportOptions(
      // Timeouts and keep-alive.
      connectionTimeout: const Duration(seconds: 30),
      requestTimeout: const Duration(seconds: 60),
      keepAliveInterval: const Duration(seconds: 60),
      // HTTP/2 stream and frame limits.
      maxConcurrentStreams: 100,
      windowSize: 65535,
      maxFrameSize: 16384,
      // TLS: certificate validation is switched off here, which is only
      // acceptable during development.
      validateCertificate: false,
      clientCertificate: 'client.crt',
      clientPrivateKey: 'client.key',
      // Headers sent with every request.
      defaultHeaders: {
        'User-Agent': 'RpcDartClient/1.0',
        'Accept-Encoding': 'gzip, deflate',
      },
    ),
  );

  final caller = RpcCallerEndpoint(transport: transport);
  final calculator = CalculatorCaller(caller);

  try {
    // Unary calls.
    final sum = await calculator.add(AddRequest(a: 10, b: 5));
    print('10 + 5 = ${sum.result}');

    final product = await calculator.multiply(MultiplyRequest(a: 4, b: 7));
    print('4 * 7 = ${product.result}');

    // Server-streaming call.
    print('Generating numbers 1-5:');
    final numbers = calculator.generateNumbers(
      RangeRequest(start: 1, end: 5, intervalMs: 200),
    );
    await for (final number in numbers) {
      print('Generated: ${number.value} at ${number.timestamp}');
    }
  } finally {
    // Release the endpoint whether or not the calls succeeded.
    await caller.close();
  }
}

Configure HTTP/2 transport for production:

// Production-oriented transport configuration.
final transport = RpcHttp2Transport(
  baseUrl: 'https://production-api.example.com',
  options: Http2TransportOptions(
    // Connection pool and keep-alive.
    maxConnectionsPerHost: 5,
    connectionTimeout: const Duration(seconds: 30),
    keepAliveInterval: const Duration(minutes: 2),
    // Per-request timeout and retry policy.
    requestTimeout: const Duration(minutes: 5),
    maxRetries: 3,
    retryDelay: const Duration(seconds: 1),
    // HTTP/2 flow control: 1 MB window, 32 KB frames.
    maxConcurrentStreams: 200,
    windowSize: 1048576,
    maxFrameSize: 32768,
    // Compression.
    enableCompression: true,
    compressionLevel: 6,
    // TLS validation with certificate pinning.
    validateCertificate: true,
    pinnedCertificates: ['sha256:...'],
    // Credentials attached to every request.
    defaultHeaders: {
      'Authorization': 'Bearer $accessToken',
      'X-API-Key': apiKey,
    },
    // Monitoring hooks.
    onRequestStart: (request) {
      print('Starting request: ${request.method}');
    },
    onRequestComplete: (request, response) {
      print('Request completed: ${response.statusCode}');
    },
    onError: (error) {
      print('Transport error: $error');
    },
  ),
);

Perfect for service-to-service communication:

/// Example service that calls the payment and notification services over a
/// single shared HTTP/2 caller endpoint.
class UserService {
  late RpcCallerEndpoint _caller;
  late PaymentServiceCaller _paymentService;
  late NotificationServiceCaller _notificationService;

  /// Creates the shared endpoint and the two service callers built on it.
  Future<void> initialize() async {
    _caller = RpcCallerEndpoint(
      transport: RpcHttp2Transport(
        baseUrl: 'https://internal-api.company.com',
        options: Http2TransportOptions(
          maxConcurrentStreams: 50,
          defaultHeaders: {
            'X-Service-Name': 'user-service',
            'Authorization': 'Bearer $serviceToken',
          },
        ),
      ),
    );
    _paymentService = PaymentServiceCaller(_caller);
    _notificationService = NotificationServiceCaller(_caller);
  }

  /// Creates the user account, then sets up the payment account and sends
  /// the welcome email concurrently, returning the user once both finish.
  Future<User> createUser(CreateUserRequest request) async {
    final account = await _createUserAccount(request);

    // Both calls are started before either is awaited.
    await Future.wait([
      _paymentService.createAccount(
        CreatePaymentAccountRequest(userId: account.id),
      ),
      _notificationService.sendWelcomeEmail(
        WelcomeEmailRequest(
          userId: account.id,
          email: account.email,
          name: account.name,
        ),
      ),
    ]);

    return account;
  }
}

Integrate with existing HTTP/2 infrastructure:

/// Gateway that fans requests out to the user, order and product services,
/// each reached through its own HTTP/2 caller endpoint.
class ApiGateway {
  /// Logical service name to base URL.
  static const _serviceUrls = {
    'user': 'https://user-service.internal',
    'order': 'https://order-service.internal',
    'product': 'https://product-service.internal',
  };

  final Map<String, RpcCallerEndpoint> _serviceClients = {};

  /// Creates one caller endpoint per backing service.
  ///
  /// Endpoints that already exist are kept, so calling this more than once
  /// does not replace (and leak) earlier endpoints.
  Future<void> initializeServices() async {
    for (final service in _serviceUrls.entries) {
      _serviceClients.putIfAbsent(
        service.key,
        () => _createClient(service.value),
      );
    }
  }

  /// Builds a caller endpoint for [baseUrl] with the gateway's shared
  /// transport settings.
  RpcCallerEndpoint _createClient(String baseUrl) {
    final transport = RpcHttp2Transport(
      baseUrl: baseUrl,
      options: Http2TransportOptions(
        maxConcurrentStreams: 100,
        defaultHeaders: {'X-Gateway': 'api-gateway-v1'},
      ),
    );
    return RpcCallerEndpoint(transport: transport);
  }

  /// Returns the endpoint registered under [name].
  ///
  /// Throws a [StateError] with an actionable message when
  /// [initializeServices] has not been called yet.
  RpcCallerEndpoint _client(String name) {
    final client = _serviceClients[name];
    if (client == null) {
      throw StateError(
        'Service "$name" is not initialized; call initializeServices() first',
      );
    }
    return client;
  }

  /// Loads the order identified by [orderId], then fetches its user and all
  /// of its products concurrently and combines them into [OrderDetails].
  Future<OrderDetails> getOrderDetails(String orderId) async {
    final orderCaller = OrderServiceCaller(_client('order'));
    final userCaller = UserServiceCaller(_client('user'));
    final productCaller = ProductServiceCaller(_client('product'));

    // The order must be known before the dependent lookups can start.
    final order = await orderCaller.getOrder(GetOrderRequest(orderId: orderId));

    // User and product lookups run concurrently.
    final userFuture = userCaller.getUser(GetUserRequest(userId: order.userId));
    final productsFuture = Future.wait(
      order.items.map(
        (item) => productCaller.getProduct(
          GetProductRequest(productId: item.productId),
        ),
      ),
    );

    // Wait on both at once so each future has a listener from the start.
    // Awaiting them one after the other would leave the second unobserved,
    // and a failure there could surface as an unhandled asynchronous error.
    await Future.wait([userFuture, productsFuture]);

    return OrderDetails(
      order: order,
      user: await userFuture,
      products: await productsFuture,
    );
  }
}

Process streaming data efficiently:

/// Streams 1000 data points to the processing service over a bidirectional
/// stream and logs every response.
class DataProcessor {
  late RpcCallerEndpoint _caller;
  late ProcessingServiceCaller _processingService;

  /// Sends all requests and completes only after the response stream has
  /// finished, so no response is dropped and stream errors reach the caller.
  ///
  /// NOTE(review): each call creates a new endpoint and overwrites the
  /// previous one without closing it - confirm the intended lifetime.
  Future<void> processDataStream() async {
    final transport = RpcHttp2Transport(
      baseUrl: 'https://processing-cluster.company.com',
      options: Http2TransportOptions(
        maxConcurrentStreams: 200,
        windowSize: 2097152, // 2MB for large data streams
      ),
    );
    _caller = RpcCallerEndpoint(transport: transport);
    _processingService = ProcessingServiceCaller(_caller);

    // Create the bidirectional stream.
    final inputController = StreamController<ProcessRequest>();
    final responseStream =
        _processingService.processStream(inputController.stream);

    // Log responses as they arrive. asFuture() completes when the response
    // stream is done and fails if the stream reports an error.
    final Future<void> responsesDone = responseStream.listen((response) {
      if (response.success) {
        print('Processed ${response.id}: ${response.result}');
      } else {
        print('Error processing ${response.id}: ${response.error}');
      }
    }).asFuture<void>();

    // Wait for sending and receiving together so both futures are observed
    // from the start.
    await Future.wait([_sendRequests(inputController), responsesDone]);
  }

  /// Adds every request to [input] and always closes it afterwards.
  Future<void> _sendRequests(StreamController<ProcessRequest> input) async {
    try {
      for (var i = 0; i < 1000; i++) {
        input.add(ProcessRequest(
          id: i.toString(),
          data: generateDataPoint(i),
        ));
        // Pause after every tenth request to avoid overwhelming the server.
        if (i % 10 == 0) {
          await Future.delayed(const Duration(milliseconds: 100));
        }
      }
    } finally {
      // Signal end of input even when producing a request failed.
      await input.close();
    }
  }
}

The RPC Dart HTTP/2 transport can call existing gRPC services:

/// Calls an existing gRPC service through RPC Dart contracts.
class GrpcInterop {
  /// Performs one `add` call against the existing gRPC service and prints
  /// the result. The caller endpoint is always closed afterwards.
  Future<void> callExistingGrpcService() async {
    final transport = RpcHttp2Transport(
      baseUrl: 'https://existing-grpc-service.com',
      options: Http2TransportOptions(
        // gRPC specific headers
        defaultHeaders: {
          'content-type': 'application/grpc+proto',
          'grpc-encoding': 'gzip',
        },
      ),
    );
    final caller = RpcCallerEndpoint(transport: transport);
    try {
      // Call the existing gRPC service using RPC Dart contracts.
      final grpcCalculator = ExistingGrpcCalculatorCaller(caller);
      final result = await grpcCalculator.add(GrpcAddRequest(a: 10, b: 5));
      print('gRPC service result: ${result.sum}');
    } finally {
      // The endpoint is local to this method, so release it here.
      await caller.close();
    }
  }
}

RPC Dart services are automatically compatible with gRPC clients:

Terminal window
# Any gRPC client can call RPC Dart services.
# Sends a unary request with a JSON body to calculator.Calculator/Add on
# localhost:8443.
# NOTE(review): -plaintext makes grpcurl connect without TLS, while the server
# example in this guide binds port 8443 with HttpServer.bindSecure - confirm
# whether -insecure or -cacert should be used instead.
grpcurl -plaintext -d '{"a": 10, "b": 5}' \
localhost:8443 calculator.Calculator/Add
/// Client-side round-robin load balancer over a fixed list of endpoints.
class LoadBalancedClient {
  final List<String> _endpoints = [
    'https://service1.company.com',
    'https://service2.company.com',
    'https://service3.company.com',
  ];
  final List<RpcCallerEndpoint> _clients = [];
  int _currentIndex = 0;

  /// Creates one caller endpoint per configured URL.
  ///
  /// Does nothing when the clients already exist, so repeated calls cannot
  /// add duplicate endpoints and skew the rotation.
  Future<void> initialize() async {
    if (_clients.isNotEmpty) return;
    for (final endpoint in _endpoints) {
      final transport = RpcHttp2Transport(
        baseUrl: endpoint,
        options: Http2TransportOptions(
          maxConcurrentStreams: 50,
          healthCheckInterval: Duration(seconds: 30),
        ),
      );
      _clients.add(RpcCallerEndpoint(transport: transport));
    }
  }

  /// Returns the next client in round-robin order.
  ///
  /// This is plain rotation; no health information is consulted.
  /// Throws a [StateError] when [initialize] has not populated the pool.
  RpcCallerEndpoint getHealthyClient() {
    if (_clients.isEmpty) {
      throw StateError('No clients available; call initialize() first');
    }
    final client = _clients[_currentIndex];
    _currentIndex = (_currentIndex + 1) % _clients.length;
    return client;
  }

  /// Runs [call] against successive endpoints until one succeeds.
  ///
  /// Each endpoint is tried at most once. Only [RpcException]s trigger a
  /// failover; the last one is thrown when every endpoint failed. With an
  /// empty pool an `ALL_ENDPOINTS_FAILED` [RpcException] is thrown.
  Future<T> callWithLoadBalancing<T>(
    Future<T> Function(RpcCallerEndpoint) call,
  ) async {
    RpcException? lastError;
    // Try each endpoint until one succeeds.
    for (var attempt = 0; attempt < _clients.length; attempt++) {
      try {
        return await call(getHealthyClient());
      } on RpcException catch (e) {
        lastError = e;
        print('Endpoint failed, trying next: ${e.message}');
      }
    }
    throw lastError ??
        RpcException(
          code: 'ALL_ENDPOINTS_FAILED',
          message: 'All endpoints failed',
        );
  }
}
/// Caches one [RpcHttp2Transport] per base URL so callers share connections.
class Http2ConnectionPool {
  final Map<String, RpcHttp2Transport> _connections = {};
  final int _maxConnectionsPerHost;

  /// Creates a pool whose transports allow up to [maxConnectionsPerHost]
  /// connections per host.
  Http2ConnectionPool({int maxConnectionsPerHost = 5})
      : _maxConnectionsPerHost = maxConnectionsPerHost;

  /// Returns the cached transport for [baseUrl], creating it on first use.
  RpcHttp2Transport getConnection(String baseUrl) {
    return _connections.putIfAbsent(
      baseUrl,
      () => RpcHttp2Transport(
        baseUrl: baseUrl,
        options: Http2TransportOptions(
          // Apply the configured limit; it was previously stored but unused.
          maxConnectionsPerHost: _maxConnectionsPerHost,
          maxConcurrentStreams: 100,
          keepAliveInterval: Duration(minutes: 2),
          connectionTimeout: Duration(seconds: 30),
          // Optimize for high throughput
          windowSize: 2097152, // 2MB
          maxFrameSize: 32768, // 32KB
          enableCompression: true,
        ),
      ),
    );
  }

  /// Closes every pooled transport and empties the pool.
  ///
  /// The pool is emptied before the closes are awaited, so a failing close
  /// cannot leave closed transports behind for later [getConnection] calls.
  Future<void> closeAll() async {
    final transports = _connections.values.toList();
    _connections.clear();
    await Future.wait(transports.map((t) => t.close()));
  }
}
/// Issues a large number of calls in bounded waves over one transport.
class OptimizedHttp2Client {
  late RpcCallerEndpoint _caller;

  /// Sends 1000 `process` calls, keeping at most 200 in flight at a time.
  ///
  /// Each wave is started in full and awaited before the next wave begins.
  /// The wave size equals `maxConcurrentStreams` configured below.
  Future<void> optimizedBatchCalls() async {
    const totalRequests = 1000;
    const maxInFlight = 200;

    final transport = RpcHttp2Transport(
      baseUrl: 'https://api.example.com',
      options: Http2TransportOptions(
        // Optimize for batch operations
        maxConcurrentStreams: maxInFlight,
        windowSize: 4194304, // 4MB
        enableCompression: true,
        // Request pipelining
        enablePipelining: true,
        maxPipelinedRequests: 10,
      ),
    );
    _caller = RpcCallerEndpoint(transport: transport);
    final service = ServiceCaller(_caller);

    for (var start = 0; start < totalRequests; start += maxInFlight) {
      // The last wave may be shorter when the total is not a multiple of
      // the wave size. Computed inline so no `dart:math` import is needed.
      final remaining = totalRequests - start;
      final count = remaining < maxInFlight ? remaining : maxInFlight;

      // Start the whole wave, then wait for it to finish so the server is
      // not overwhelmed.
      await Future.wait([
        for (var offset = 0; offset < count; offset++)
          service.process(ProcessRequest(id: start + offset)),
      ]);
    }
  }
}
// Timeout configuration for the transport.
final transport = RpcHttp2Transport(
  baseUrl: 'https://api.example.com',
  options: Http2TransportOptions(
    // Limit for establishing a connection.
    connectionTimeout: const Duration(seconds: 30),
    // Limit for a single request; tune per operation type.
    requestTimeout: const Duration(minutes: 5),
    // Keep-alive interval used to keep connections open.
    keepAliveInterval: const Duration(minutes: 2),
    // Limit applied to streams.
    streamTimeout: const Duration(minutes: 30),
  ),
);
/// Retries calls that fail because the service is temporarily unavailable.
class ResilientHttp2Client {
  /// Runs [call], retrying up to three attempts in total.
  ///
  /// Only an [RpcException] whose code is `UNAVAILABLE` is retried. The
  /// wait before each retry doubles: 2 s after the first failure, 4 s after
  /// the second. Any other failure, or an `UNAVAILABLE` failure on the
  /// final attempt, is rethrown unchanged.
  Future<T> callWithRetry<T>(Future<T> Function() call) async {
    const maxAttempts = 3;
    // The loop exits only through `return` or `rethrow`, so no fallback
    // throw is needed after it.
    for (var attempt = 1;; attempt++) {
      try {
        return await call();
      } on RpcException catch (e) {
        final canRetry = e.code == 'UNAVAILABLE' && attempt < maxAttempts;
        if (!canRetry) rethrow;
        // Exponential backoff: 2^attempt seconds, computed with a shift so
        // no `dart:math` import is required.
        await Future.delayed(Duration(seconds: 1 << attempt));
      }
    }
  }
}
/// Prints transport metrics as they are published.
class Http2PerformanceMonitor {
  /// Subscribes to `transport.metrics` and prints four lines per event:
  /// active streams, total requests, average latency and error rate.
  void setupMonitoring(RpcHttp2Transport transport) {
    final metricsStream = transport.metrics;
    metricsStream.listen((snapshot) {
      print('Active streams: ${snapshot.activeStreams}');
      print('Total requests: ${snapshot.totalRequests}');
      final latencyMs = snapshot.averageLatency.inMilliseconds;
      print('Average latency: ${latencyMs}ms');
      print('Error rate: ${snapshot.errorRate}%');
    });
  }
}

The HTTP/2 transport is an ideal choice for distributed systems, microservice architectures, and applications that need high-performance communication with existing gRPC infrastructure. Its multiplexing and other HTTP/2 features make it well suited to scalable enterprise applications.