gRPC Compatibility
Full compatibility with gRPC protocol, allowing RPC Dart services to communicate with existing gRPC clients and servers.
The HTTP/2 Transport provides high-performance communication using the HTTP/2 protocol with gRPC compatibility. It’s designed for distributed systems that need efficient multiplexing, server push capabilities, and seamless integration with existing HTTP/2 infrastructure.
gRPC Compatibility
Full compatibility with gRPC protocol, allowing RPC Dart services to communicate with existing gRPC clients and servers.
HTTP/2 Multiplexing
Multiple RPC calls over a single connection with efficient stream multiplexing and flow control.
TLS Security
Built-in TLS support for secure communication with certificate validation and encryption.
Load Balancing
Compatible with HTTP/2 load balancers and service meshes for enterprise-grade deployments.
The HTTP/2 transport implements the gRPC protocol specification:
// Server automatically handles gRPC protocol

/// Calculator contract registered under the service name
/// `calculator.Calculator`.
///
/// Exposes one unary method (`Add`) and one server-streaming method
/// (`GenerateNumbers`).
class CalculatorService extends RpcResponderContract {
  CalculatorService() : super('calculator.Calculator');

  @override
  void setup() {
    addUnaryMethod<AddRequest, AddResponse>(
      methodName: 'Add',
      handler: _add,
    );
    addServerStreamMethod<RangeRequest, NumberResponse>(
      methodName: 'GenerateNumbers',
      handler: _generateNumbers,
    );
  }

  /// Returns `request.a + request.b` wrapped in an [AddResponse].
  Future<AddResponse> _add(AddRequest request, {RpcContext? context}) async =>
      AddResponse(result: request.a + request.b);

  /// Emits every integer from `request.start` to `request.end` (inclusive),
  /// pausing 100 ms after each emitted value.
  Stream<NumberResponse> _generateNumbers(
    RangeRequest request, {
    RpcContext? context,
  }) async* {
    const pause = Duration(milliseconds: 100);
    var current = request.start;
    while (current <= request.end) {
      yield NumberResponse(value: current);
      await Future.delayed(pause);
      current++;
    }
  }
}
Multiple concurrent requests over a single connection:
// Multiple concurrent calls over one connection
final multiplexOptions = Http2TransportOptions(
  maxConcurrentStreams: 100,
  connectionTimeout: Duration(seconds: 30),
  keepAliveInterval: Duration(seconds: 60),
);
final transport = RpcHttp2Transport(
  baseUrl: 'https://api.example.com',
  options: multiplexOptions,
);

final caller = RpcCallerEndpoint(transport: transport);
final calculator = CalculatorCaller(caller);

// All these calls multiplex over the same connection: ten `add` calls are
// started back to back, one per index, before any of them is awaited.
final futures = List.generate(
  10,
  (index) async => calculator.add(AddRequest(a: index, b: index + 1)),
);

// Completes once all 10 calls have completed over one HTTP/2 connection.
final results = await Future.wait(futures);
Setting up an HTTP/2 server with gRPC compatibility:
import 'dart:io';import 'package:rpc_dart/rpc_dart.dart';import 'package:rpc_dart_transports/rpc_dart_transports.dart';
/// Hosts [CalculatorService], `FileService` and `ChatService` behind a
/// TLS-secured [HttpServer].
class Http2RpcServer {
  // Nullable instead of `late`: [stop] can be reached before [start] has run
  // (or after it failed part-way) and must not throw a
  // LateInitializationError in that case.
  HttpServer? _server;
  RpcResponderEndpoint? _endpoint;

  /// Binds a TLS listener on [host]:[port], registers the services and starts
  /// handling requests.
  ///
  /// When [securityContext] is omitted, a context is built from `server.crt`,
  /// `server.key` and `ca.crt` in the working directory.
  Future<void> start({
    String host = 'localhost',
    int port = 8443,
    SecurityContext? securityContext,
  }) async {
    // Create the TLS listener.
    final server = await HttpServer.bindSecure(
      host,
      port,
      securityContext ?? _createSecurityContext(),
    );
    _server = server;

    // autoCompress only enables response compression; it does not select the
    // HTTP version.
    // NOTE(review): dart:io's HttpServer serves HTTP/1.x — confirm that
    // RpcHttp2ServerTransport performs the HTTP/2 negotiation itself.
    server.autoCompress = true;

    print('HTTP/2 gRPC Server listening on https://$host:$port');

    // Create transport and endpoint.
    final endpoint = RpcResponderEndpoint(
      transport: RpcHttp2ServerTransport(server),
    );
    _endpoint = endpoint;

    // Register services.
    endpoint.registerServiceContract(CalculatorService());
    endpoint.registerServiceContract(FileService());
    endpoint.registerServiceContract(ChatService());

    // Start handling requests.
    await endpoint.start();
  }

  /// Builds the default [SecurityContext] from files in the working directory.
  SecurityContext _createSecurityContext() {
    // NOTE(review): the trusted-certificate / client-authority settings only
    // matter if the listener actually requests client certificates; bindSecure
    // above is called without `requestClientCertificate` — confirm intent.
    return SecurityContext()
      // Server certificate and private key.
      ..useCertificateChain('server.crt')
      ..usePrivateKey('server.key')
      // Optional: client certificate validation.
      ..setTrustedCertificates('ca.crt')
      ..setClientAuthorities('ca.crt');
  }

  /// Closes the endpoint and then the listener.
  ///
  /// Safe to call when the server was never started. The listener is closed
  /// even if closing the endpoint throws, so the port is not leaked.
  Future<void> stop() async {
    final endpoint = _endpoint;
    final server = _server;
    _endpoint = null;
    _server = null;

    try {
      await endpoint?.close();
    } finally {
      await server?.close();
    }
  }
}
/// Calculator contract covering every RPC shape: two unary methods, a server
/// stream, a client stream and a bidirectional stream.
class CalculatorService extends RpcResponderContract {
  CalculatorService() : super('calculator.Calculator');

  @override
  void setup() {
    addUnaryMethod<AddRequest, AddResponse>(
      methodName: 'Add',
      handler: _add,
    );
    addUnaryMethod<MultiplyRequest, MultiplyResponse>(
      methodName: 'Multiply',
      handler: _multiply,
    );
    addServerStreamMethod<RangeRequest, NumberResponse>(
      methodName: 'GenerateNumbers',
      handler: _generateNumbers,
    );
    addClientStreamMethod<NumberRequest, SumResponse>(
      methodName: 'SumNumbers',
      handler: _sumNumbers,
    );
    addBidirectionalStreamMethod<ProcessRequest, ProcessResponse>(
      methodName: 'ProcessStream',
      handler: _processStream,
    );
  }

  /// The current local time as an ISO-8601 string, used to stamp responses.
  String _stamp() => DateTime.now().toIso8601String();

  /// Adds the two operands after a 10 ms simulated processing delay.
  Future<AddResponse> _add(AddRequest request, {RpcContext? context}) async {
    // Simulate some processing time
    await Future.delayed(const Duration(milliseconds: 10));
    return AddResponse(result: request.a + request.b, timestamp: _stamp());
  }

  /// Multiplies the two operands.
  Future<MultiplyResponse> _multiply(
    MultiplyRequest request, {
    RpcContext? context,
  }) async =>
      MultiplyResponse(result: request.a * request.b, timestamp: _stamp());

  /// Emits every integer from `request.start` to `request.end` (inclusive).
  ///
  /// Waits `request.intervalMs` milliseconds after each value, or 100 ms when
  /// no interval is supplied.
  Stream<NumberResponse> _generateNumbers(
    RangeRequest request, {
    RpcContext? context,
  }) async* {
    var current = request.start;
    while (current <= request.end) {
      yield NumberResponse(value: current, timestamp: _stamp());

      // Simulate real-time generation
      final pauseMs = request.intervalMs ?? 100;
      await Future.delayed(Duration(milliseconds: pauseMs));
      current++;
    }
  }

  /// Drains [numbers] and reports their total, count and average.
  ///
  /// The average is reported as 0 when the stream was empty.
  Future<SumResponse> _sumNumbers(
    Stream<NumberRequest> numbers, {
    RpcContext? context,
  }) async {
    double total = 0;
    var seen = 0;

    await for (final item in numbers) {
      total += item.value;
      seen += 1;
    }

    return SumResponse(
      total: total,
      count: seen,
      average: seen > 0 ? total / seen : 0,
    );
  }

  /// Answers each incoming request with one [ProcessResponse].
  ///
  /// A failure while handling a request is reported as an unsuccessful
  /// response carrying the error text instead of terminating the stream.
  Stream<ProcessResponse> _processStream(
    Stream<ProcessRequest> requests, {
    RpcContext? context,
  }) async* {
    await for (final incoming in requests) {
      try {
        // Simulate processing
        await Future.delayed(const Duration(milliseconds: 50));

        yield ProcessResponse(
          id: incoming.id,
          success: true,
          result: 'Processed: ${incoming.data}',
          processedAt: _stamp(),
        );
      } catch (failure) {
        yield ProcessResponse(
          id: incoming.id,
          success: false,
          error: failure.toString(),
          processedAt: _stamp(),
        );
      }
    }
  }
}
/// Starts the example server on localhost:8443 and stops it on Ctrl-C.
void main() async {
  final server = Http2RpcServer();

  // Handle graceful shutdown: stop the server, then exit with status 0.
  Future<void> shutDown(ProcessSignal _) async {
    print('Shutting down HTTP/2 server...');
    await server.stop();
    exit(0);
  }

  ProcessSignal.sigint.watch().listen(shutDown);

  try {
    await server.start(host: 'localhost', port: 8443);
  } catch (error) {
    // A failed start is fatal for this example; report it and exit non-zero.
    print('Failed to start server: $error');
    exit(1);
  }
}
Connecting to an HTTP/2 gRPC server:
import 'package:rpc_dart/rpc_dart.dart';import 'package:rpc_dart_transports/rpc_dart_transports.dart';
/// Connects to the example server, runs two unary calls and one
/// server-streaming call, then closes the caller endpoint.
void main() async {
  final options = Http2TransportOptions(
    // Connection settings
    connectionTimeout: Duration(seconds: 30),
    requestTimeout: Duration(seconds: 60),
    keepAliveInterval: Duration(seconds: 60),

    // HTTP/2 specific settings
    maxConcurrentStreams: 100,
    windowSize: 65535,
    maxFrameSize: 16384,

    // Security settings
    validateCertificate: false, // Only for development
    clientCertificate: 'client.crt',
    clientPrivateKey: 'client.key',

    // Headers
    defaultHeaders: {
      'User-Agent': 'RpcDartClient/1.0',
      'Accept-Encoding': 'gzip, deflate',
    },
  );

  // Create HTTP/2 transport
  final transport = RpcHttp2Transport(
    baseUrl: 'https://localhost:8443',
    options: options,
  );

  // Create caller endpoint
  final caller = RpcCallerEndpoint(transport: transport);
  final calculator = CalculatorCaller(caller);

  try {
    // Make RPC calls
    final sum = await calculator.add(AddRequest(a: 10, b: 5));
    print('10 + 5 = ${sum.result}');

    final product = await calculator.multiply(MultiplyRequest(a: 4, b: 7));
    print('4 * 7 = ${product.result}');

    // Use streaming methods
    print('Generating numbers 1-5:');
    final generated = calculator.generateNumbers(
      RangeRequest(start: 1, end: 5, intervalMs: 200),
    );
    await for (final item in generated) {
      print('Generated: ${item.value} at ${item.timestamp}');
    }
  } finally {
    // Always release the endpoint, whether or not the calls succeeded.
    await caller.close();
  }
}
Configure HTTP/2 transport for production:
// Production settings, grouped by concern.
final productionOptions = Http2TransportOptions(
  // Connection pooling
  maxConnectionsPerHost: 5,
  connectionTimeout: Duration(seconds: 30),
  keepAliveInterval: Duration(minutes: 2),

  // Request settings
  requestTimeout: Duration(minutes: 5),
  maxRetries: 3,
  retryDelay: Duration(seconds: 1),

  // HTTP/2 flow control
  maxConcurrentStreams: 200,
  windowSize: 1048576, // 1MB
  maxFrameSize: 32768, // 32KB

  // Compression
  enableCompression: true,
  compressionLevel: 6,

  // Security
  validateCertificate: true,
  pinnedCertificates: ['sha256:...'], // Certificate pinning

  // Authentication
  defaultHeaders: {
    'Authorization': 'Bearer $accessToken',
    'X-API-Key': apiKey,
  },

  // Monitoring: each hook prints a one-line trace.
  onRequestStart: (request) {
    print('Starting request: ${request.method}');
  },
  onRequestComplete: (request, response) {
    print('Request completed: ${response.statusCode}');
  },
  onError: (error) {
    print('Transport error: $error');
  },
);

final transport = RpcHttp2Transport(
  baseUrl: 'https://production-api.example.com',
  options: productionOptions,
);
Perfect for service-to-service communication:
/// Creates users and fans out to the payment and notification services over
/// one shared caller endpoint.
class UserService {
  late RpcCallerEndpoint _caller;
  late PaymentServiceCaller _paymentService;
  late NotificationServiceCaller _notificationService;

  /// Builds the transport and the two service callers.
  ///
  /// Must run before [createUser]; the fields above are `late` and are only
  /// assigned here.
  Future<void> initialize() async {
    final options = Http2TransportOptions(
      maxConcurrentStreams: 50,
      defaultHeaders: {
        'X-Service-Name': 'user-service',
        'Authorization': 'Bearer $serviceToken',
      },
    );
    final transport = RpcHttp2Transport(
      baseUrl: 'https://internal-api.company.com',
      options: options,
    );

    _caller = RpcCallerEndpoint(transport: transport);
    _paymentService = PaymentServiceCaller(_caller);
    _notificationService = NotificationServiceCaller(_caller);
  }

  /// Creates the user account, then sets up the payment account and sends the
  /// welcome e-mail concurrently, returning the user once both have finished.
  Future<User> createUser(CreateUserRequest request) async {
    // Create user account
    final created = await _createUserAccount(request);

    // Both follow-up calls are started before either is awaited.
    await Future.wait([
      // Setup payment account (concurrent call)
      _paymentService.createAccount(
        CreatePaymentAccountRequest(userId: created.id),
      ),
      // Send welcome notification (concurrent call)
      _notificationService.sendWelcomeEmail(
        WelcomeEmailRequest(
          userId: created.id,
          email: created.email,
          name: created.name,
        ),
      ),
    ]);

    return created;
  }
}
Integrate with existing HTTP/2 infrastructure:
/// Aggregates the user, order and product services behind one entry point.
class ApiGateway {
  /// Base URL of each backing service, keyed by the name used in
  /// [_serviceClients]. Iteration order is user, order, product.
  static const Map<String, String> _serviceUrls = {
    'user': 'https://user-service.internal',
    'order': 'https://order-service.internal',
    'product': 'https://product-service.internal',
  };

  final Map<String, RpcCallerEndpoint> _serviceClients = {};

  /// Creates one caller endpoint per entry in [_serviceUrls].
  Future<void> initializeServices() async {
    for (final entry in _serviceUrls.entries) {
      _serviceClients[entry.key] = _createClient(entry.value);
    }
  }

  /// Builds a caller endpoint for [baseUrl] with the gateway's shared options.
  RpcCallerEndpoint _createClient(String baseUrl) {
    final transport = RpcHttp2Transport(
      baseUrl: baseUrl,
      options: Http2TransportOptions(
        maxConcurrentStreams: 100,
        defaultHeaders: {'X-Gateway': 'api-gateway-v1'},
      ),
    );
    return RpcCallerEndpoint(transport: transport);
  }

  /// Returns the endpoint registered under [name].
  ///
  /// Throws a [StateError] naming the missing service when
  /// [initializeServices] has not been called, instead of failing on a bare
  /// null assertion.
  RpcCallerEndpoint _clientFor(String name) {
    final client = _serviceClients[name];
    if (client == null) {
      throw StateError(
        'Service "$name" is not initialized; call initializeServices() first.',
      );
    }
    return client;
  }

  /// Loads the order [orderId] together with its user and its products.
  ///
  /// The order is fetched first; the user lookup and all product lookups then
  /// run concurrently.
  Future<OrderDetails> getOrderDetails(String orderId) async {
    final orderCaller = OrderServiceCaller(_clientFor('order'));
    final userCaller = UserServiceCaller(_clientFor('user'));
    final productCaller = ProductServiceCaller(_clientFor('product'));

    // Get order info
    final order = await orderCaller.getOrder(GetOrderRequest(orderId: orderId));

    // Get user and product info concurrently
    final userFuture = userCaller.getUser(GetUserRequest(userId: order.userId));
    final productsFuture = Future.wait(
      order.items.map(
        (item) =>
            productCaller.getProduct(GetProductRequest(productId: item.productId)),
      ),
    );

    // Listen to both futures before suspending. Awaiting them one after the
    // other would leave the second without a listener while the first is
    // pending, so a failure there would surface as an unhandled async error.
    await Future.wait<Object?>([userFuture, productsFuture]);

    // Both have completed successfully here; these awaits only read the
    // typed results.
    final user = await userFuture;
    final products = await productsFuture;

    return OrderDetails(
      order: order,
      user: user,
      products: products,
    );
  }
}
Process streaming data efficiently:
/// Streams 1000 data points to the processing service and logs each result.
class DataProcessor {
  late RpcCallerEndpoint _caller;
  late ProcessingServiceCaller _processingService;

  /// Opens a bidirectional stream, sends the data points and prints every
  /// response.
  ///
  /// The returned future completes only after all requests have been sent and
  /// the response stream has ended. A failure on either side is propagated to
  /// the caller rather than being dropped.
  ///
  /// NOTE(review): the caller endpoint is kept in a field and is not closed
  /// here — confirm who owns its lifetime.
  Future<void> processDataStream() async {
    final transport = RpcHttp2Transport(
      baseUrl: 'https://processing-cluster.company.com',
      options: Http2TransportOptions(
        maxConcurrentStreams: 200,
        windowSize: 2097152, // 2MB for large data streams
      ),
    );

    _caller = RpcCallerEndpoint(transport: transport);
    _processingService = ProcessingServiceCaller(_caller);

    // Create bidirectional stream
    final inputController = StreamController<ProcessRequest>();
    final responseStream =
        _processingService.processStream(inputController.stream);

    // Consumes responses until the stream ends; stream errors are thrown from
    // the returned future instead of going unhandled.
    Future<void> logResponses() async {
      await for (final response in responseStream) {
        if (response.success) {
          print('Processed ${response.id}: ${response.result}');
        } else {
          print('Error processing ${response.id}: ${response.error}');
        }
      }
    }

    // Feeds the requests and always closes the input side, even on failure,
    // so the remote end is told that no more data will arrive.
    Future<void> sendRequests() async {
      try {
        for (var i = 0; i < 1000; i++) {
          inputController.add(ProcessRequest(
            id: i.toString(),
            data: generateDataPoint(i),
          ));

          // Control flow to avoid overwhelming the server
          if (i % 10 == 0) {
            await Future.delayed(const Duration(milliseconds: 100));
          }
        }
      } finally {
        await inputController.close();
      }
    }

    // The response listener is attached first, matching the original order,
    // and both halves are awaited together.
    await Future.wait([logResponses(), sendRequests()]);
  }
}
The RPC Dart HTTP/2 transport can also call existing gRPC services:
// Call existing gRPC service

/// Demonstrates calling an already-deployed gRPC service through an RPC Dart
/// caller contract.
class GrpcInterop {
  /// Sends one `add` request to the existing service and prints the result.
  ///
  /// The caller endpoint is created for this call only and is closed before
  /// returning, whether the call succeeded or failed.
  Future<void> callExistingGrpcService() async {
    final transport = RpcHttp2Transport(
      baseUrl: 'https://existing-grpc-service.com',
      options: Http2TransportOptions(
        // gRPC specific headers
        defaultHeaders: {
          'content-type': 'application/grpc+proto',
          'grpc-encoding': 'gzip',
        },
      ),
    );

    final caller = RpcCallerEndpoint(transport: transport);

    try {
      // Call the existing gRPC service using RPC Dart contracts
      final grpcCalculator = ExistingGrpcCalculatorCaller(caller);
      final result = await grpcCalculator.add(GrpcAddRequest(a: 10, b: 5));

      print('gRPC service result: ${result.sum}');
    } finally {
      // The endpoint is local to this method; release it so it does not leak.
      await caller.close();
    }
  }
}
RPC Dart services are automatically compatible with gRPC clients:
# Any gRPC client can call RPC Dart services.
#
# The example server listens with TLS (HttpServer.bindSecure on port 8443), so
# -plaintext cannot connect to it. -insecure skips certificate verification
# and is meant for local development only; pass -cacert <file> instead when
# the server certificate should be verified.
grpcurl -insecure -d '{"a": 10, "b": 5}' \
  localhost:8443 calculator.Calculator/Add
/// Round-robin client over a fixed list of service endpoints.
class LoadBalancedClient {
  final List<String> _endpoints = [
    'https://service1.company.com',
    'https://service2.company.com',
    'https://service3.company.com',
  ];

  final List<RpcCallerEndpoint> _clients = [];
  int _currentIndex = 0;

  /// Creates one caller endpoint per configured endpoint URL.
  ///
  /// Calling this again after the pool has been built is a no-op, so repeated
  /// initialization neither duplicates transports nor skews the rotation.
  Future<void> initialize() async {
    if (_clients.isNotEmpty) return;

    for (final endpoint in _endpoints) {
      final transport = RpcHttp2Transport(
        baseUrl: endpoint,
        options: Http2TransportOptions(
          maxConcurrentStreams: 50,
          healthCheckInterval: Duration(seconds: 30),
        ),
      );

      _clients.add(RpcCallerEndpoint(transport: transport));
    }
  }

  /// Returns the next client in rotation.
  ///
  /// Throws a [StateError] when the pool is empty (that is, [initialize] has
  /// not been called) instead of an index-out-of-range error.
  RpcCallerEndpoint getHealthyClient() {
    if (_clients.isEmpty) {
      throw StateError(
        'No clients available; call initialize() before requesting a client.',
      );
    }

    // Simple round-robin (in production, use health checks)
    final client = _clients[_currentIndex];
    _currentIndex = (_currentIndex + 1) % _clients.length;
    return client;
  }

  /// Runs [call] against successive clients until one succeeds.
  ///
  /// Only [RpcException]s trigger a fail-over; any other error propagates
  /// immediately. After every client has failed, the last [RpcException] is
  /// thrown. With an empty pool no call is attempted and an
  /// `ALL_ENDPOINTS_FAILED` [RpcException] is thrown.
  Future<T> callWithLoadBalancing<T>(
    Future<T> Function(RpcCallerEndpoint) call,
  ) async {
    RpcException? lastError;

    // Try each endpoint until one succeeds
    for (var attempt = 0; attempt < _clients.length; attempt++) {
      final client = getHealthyClient();
      try {
        return await call(client);
      } on RpcException catch (e) {
        lastError = e;
        print('Endpoint failed, trying next: ${e.message}');
      }
    }

    throw lastError ??
        RpcException(
          code: 'ALL_ENDPOINTS_FAILED',
          message: 'All endpoints failed',
        );
  }
}
/// Caches one [RpcHttp2Transport] per base URL.
class Http2ConnectionPool {
  final Map<String, RpcHttp2Transport> _connections = {};
  final int _maxConnectionsPerHost;

  /// Creates a pool whose transports are limited to [maxConnectionsPerHost]
  /// connections per host.
  Http2ConnectionPool({int maxConnectionsPerHost = 5})
      : _maxConnectionsPerHost = maxConnectionsPerHost;

  /// Returns the transport for [baseUrl], creating it on first use.
  RpcHttp2Transport getConnection(String baseUrl) {
    return _connections.putIfAbsent(
      baseUrl,
      () => RpcHttp2Transport(
        baseUrl: baseUrl,
        options: Http2TransportOptions(
          // Forward the configured limit to the transport; previously the
          // value was stored on the pool but never applied.
          maxConnectionsPerHost: _maxConnectionsPerHost,
          maxConcurrentStreams: 100,
          keepAliveInterval: Duration(minutes: 2),
          connectionTimeout: Duration(seconds: 30),

          // Optimize for high throughput
          windowSize: 2097152, // 2MB
          maxFrameSize: 32768, // 32KB
          enableCompression: true,
        ),
      ),
    );
  }

  /// Closes every pooled transport and empties the pool.
  ///
  /// The pool is emptied before the transports are closed, so a failing
  /// `close()` cannot leave closed transports behind for [getConnection] to
  /// hand out again.
  Future<void> closeAll() async {
    final transports = _connections.values.toList();
    _connections.clear();
    await Future.wait(transports.map((transport) => transport.close()));
  }
}
/// Issues 1000 `process` calls in groups of 50, draining the in-flight set
/// whenever it holds 200 or more calls.
class OptimizedHttp2Client {
  late RpcCallerEndpoint _caller;

  /// Runs the batched calls and completes once every call has finished.
  Future<void> optimizedBatchCalls() async {
    final options = Http2TransportOptions(
      // Optimize for batch operations
      maxConcurrentStreams: 200,
      windowSize: 4194304, // 4MB
      enableCompression: true,

      // Request pipelining
      enablePipelining: true,
      maxPipelinedRequests: 10,
    );
    final transport = RpcHttp2Transport(
      baseUrl: 'https://api.example.com',
      options: options,
    );

    _caller = RpcCallerEndpoint(transport: transport);
    final service = ServiceCaller(_caller);

    // Batch multiple calls efficiently
    final batchSize = 50;
    final pending = <Future>[];

    var offset = 0;
    while (offset < 1000) {
      // The final group may be smaller than batchSize.
      final groupSize = math.min(batchSize, 1000 - offset);
      for (var k = 0; k < groupSize; k++) {
        pending.add(service.process(ProcessRequest(id: offset + k)));
      }

      // Process in batches to avoid overwhelming the server
      if (pending.length >= 200) {
        await Future.wait(pending);
        pending.clear();
      }

      offset += batchSize;
    }

    // Process remaining futures
    if (pending.isNotEmpty) {
      await Future.wait(pending);
    }
  }
}
// Timeout settings, one per phase of a call.
final timeoutOptions = Http2TransportOptions(
  // Connection timeout
  connectionTimeout: Duration(seconds: 30),

  // Request timeout (varies by operation type)
  requestTimeout: Duration(minutes: 5),

  // Keep-alive to maintain connections
  keepAliveInterval: Duration(minutes: 2),

  // Stream-specific timeouts
  streamTimeout: Duration(minutes: 30),
);

final transport = RpcHttp2Transport(
  baseUrl: 'https://api.example.com',
  options: timeoutOptions,
);
/// Retries calls that fail because the service is temporarily unavailable.
class ResilientHttp2Client {
  /// Invokes [call], retrying up to three attempts in total.
  ///
  /// Only an [RpcException] whose code is `UNAVAILABLE` is retried, after an
  /// exponential backoff of 2 s and then 4 s. Any other [RpcException], or an
  /// `UNAVAILABLE` failure on the last attempt, is rethrown unchanged.
  /// Errors that are not [RpcException]s propagate immediately.
  Future<T> callWithRetry<T>(Future<T> Function() call) async {
    const maxAttempts = 3;

    // The loop has no exit condition: every iteration either returns the
    // result or rethrows, so no fallback throw is needed after it.
    for (var attempt = 1;; attempt++) {
      try {
        return await call();
      } on RpcException catch (e) {
        final canRetry = e.code == 'UNAVAILABLE' && attempt < maxAttempts;
        if (!canRetry) rethrow;

        // Exponential backoff: 2^attempt seconds, computed with an integer
        // shift rather than a floating-point pow().
        await Future.delayed(Duration(seconds: 1 << attempt));
      }
    }
  }
}
/// Prints transport metrics as they are published.
class Http2PerformanceMonitor {
  /// Subscribes to `transport.metrics` and prints four lines for every
  /// metrics event: active streams, total requests, average latency in
  /// milliseconds and error rate.
  void setupMonitoring(RpcHttp2Transport transport) {
    transport.metrics.listen((snapshot) {
      final report = [
        'Active streams: ${snapshot.activeStreams}',
        'Total requests: ${snapshot.totalRequests}',
        'Average latency: ${snapshot.averageLatency.inMilliseconds}ms',
        'Error rate: ${snapshot.errorRate}%',
      ];
      for (final line in report) {
        print(line);
      }
    });
  }
}
HTTP/2 transport is the ideal choice for distributed systems, microservices architectures, and applications requiring high-performance communication with existing gRPC infrastructure. Its multiplexing capabilities and HTTP/2 features make it perfect for scalable enterprise applications.