Application Layer
Your business logic and service implementations that use RPC contracts.
RPC Dart follows a layered architecture that separates concerns and provides flexibility in how components communicate. Understanding this architecture will help you build better applications and make informed decisions about transport selection and service design.
RPC Dart’s architecture consists of several layers, each with specific responsibilities:
Application Layer
Your business logic and service implementations that use RPC contracts.
RPC Layer
Contracts, callers, responders, and routing logic.
Transport Layer
Network communication, serialization, and message delivery.
Core Layer
Base classes, utilities, and framework fundamentals.
┌─────────────────────────────────────────┐│ Application Layer ││ ┌─────────────┐ ┌─────────────┐ ││ │ Service │ │ Client │ ││ │ A │ │Application │ ││ └─────────────┘ └─────────────┘ │└─────────────────────────────────────────┘┌─────────────────────────────────────────┐│ RPC Layer ││ ┌─────────────┐ ┌─────────────┐ ││ │ Responder │ │ Caller │ ││ │ Endpoint │ │ Endpoint │ ││ └─────────────┘ └─────────────┘ ││ ┌─────────────────────────────────┐ ││ │ Contracts │ ││ └─────────────────────────────────┘ │└─────────────────────────────────────────┘┌─────────────────────────────────────────┐│ Transport Layer ││ ┌─────────┐ ┌─────────┐ ┌─────────┐ ││ │InMemory │ │ HTTP │ │WebSocket│ ││ └─────────┘ └─────────┘ └─────────┘ │└─────────────────────────────────────────┘┌─────────────────────────────────────────┐│ Core Layer ││ ┌─────────────────────────────────┐ ││ │ Base Classes & Utilities │ ││ └─────────────────────────────────┘ │└─────────────────────────────────────────┘
Contracts define the interface between services. They specify:
abstract interface class IUserServiceContract { static const name = 'UserService'; static const methodGetUser = 'getUser'; static const methodUpdateUser = 'updateUser'; static const methodStreamNotifications = 'streamNotifications';}
Handles incoming RPC requests:
final responder = RpcResponderEndpoint(transport: serverTransport);
// Register service implementationsresponder.registerServiceContract(UserServiceResponder());responder.registerServiceContract(PaymentServiceResponder());
// Start handling requestsresponder.start();
Initiates RPC calls to remote services:
final caller = RpcCallerEndpoint(transport: clientTransport);
// Create service clientsfinal userService = UserServiceCaller(caller);final paymentService = PaymentServiceCaller(caller);
// Make RPC callsfinal user = await userService.getUser(userId);
The transport layer handles message delivery between endpoints:
abstract class RpcTransport { Stream<RpcMessage> get messageStream; Future<void> sendMessage(RpcMessage message); Future<void> close();}
This abstraction allows switching between different transport implementations without changing application code.
Services are defined using contracts that specify the interface:
// Contract definitionabstract interface class ICalculatorContract { static const name = 'Calculator'; static const methodAdd = 'add'; static const methodSubtract = 'subtract';}
// Responder implementationclass CalculatorResponder extends RpcResponderContract { CalculatorResponder() : super(ICalculatorContract.name);
@override void setup() { addUnaryMethod<AddRequest, AddResponse>( methodName: ICalculatorContract.methodAdd, handler: _add, ); }
Future<AddResponse> _add(AddRequest request, {RpcContext? context}) async { return AddResponse(request.a + request.b); }}
// Caller implementationclass CalculatorCaller extends RpcCallerContract { CalculatorCaller(RpcCallerEndpoint endpoint) : super(ICalculatorContract.name, endpoint);
Future<AddResponse> add(AddRequest request) { return callUnary<AddRequest, AddResponse>( methodName: ICalculatorContract.methodAdd, request: request, ); }}
For complex applications, you can route different services through different transports:
final router = RpcTransportRouter();
// Route UserService through HTTProuter.addRoute( serviceName: 'UserService', transport: httpTransport,);
// Route NotificationService through WebSocketrouter.addRoute( serviceName: 'NotificationService', transport: webSocketTransport,);
// Route internal services through InMemoryrouter.addRoute( serviceName: 'CacheService', transport: inMemoryTransport,);
final caller = RpcCallerEndpoint(transport: router);
Implement cross-cutting concerns using middleware:
class AuthenticationMiddleware extends RpcMiddleware { @override Future<T> call<T>(RpcCall call, RpcNext next) async { // Check authentication final token = call.context?.metadata['auth-token']; if (token == null || !isValidToken(token)) { throw RpcException(code: 'UNAUTHORIZED'); }
return next(call); }}
class LoggingMiddleware extends RpcMiddleware { @override Future<T> call<T>(RpcCall call, RpcNext next) async { final start = DateTime.now(); try { final result = await next(call); final duration = DateTime.now().difference(start); log('${call.service}.${call.method} completed in ${duration.inMilliseconds}ms'); return result; } catch (e) { log('${call.service}.${call.method} failed: $e'); rethrow; } }}
// Register middlewareresponder.addMiddleware(AuthenticationMiddleware());responder.addMiddleware(LoggingMiddleware());
Client Transport Server │ │ │ │ 1. callUnary() │ │ ├─────────────────────────▶│ │ │ │ 2. RpcMessage │ │ ├─────────────────────────▶│ │ │ │ 3. processRequest() │ │ ├──────────────┐ │ │ │ │ │ │ │◀─────────────┘ │ │ 4. RpcMessage │ │ │◀─────────────────────────┤ │ 5. Response │ │ │◀─────────────────────────┤ │
Client Transport Server │ │ │ │ 1. callServerStream() │ │ ├─────────────────────────▶│ │ │ │ 2. RpcMessage │ │ ├─────────────────────────▶│ │ │ │ 3. processStreamRequest() │ │ ├──────────────┐ │ │ 4. Stream messages │ │ │ │◀─────────────────────────┤ │ │ 5. Stream<Response> │ │ │ │◀─────────────────────────┤ │ │ │ │ │◀─────────────┘
RPC Dart implements a structured error handling system:
abstract class RpcException implements Exception { final String code; final String message; final Map<String, dynamic>? details;}
class RpcTimeoutException extends RpcException { RpcTimeoutException(String message, Duration timeout);}
class RpcCancelledException extends RpcException { RpcCancelledException(String message);}
class RpcTransportException extends RpcException { RpcTransportException(String message, Exception cause);}
Errors are automatically serialized and propagated across transport boundaries:
// Server sideFuture<UserResponse> getUser(UserRequest request) async { if (request.userId.isEmpty) { throw RpcException( code: 'INVALID_USER_ID', message: 'User ID cannot be empty', details: {'field': 'userId'}, ); } // ... implementation}
// Client sidetry { final user = await userService.getUser(UserRequest(userId: ''));} on RpcException catch (e) { // Handle structured RPC errors print('Error: ${e.code} - ${e.message}'); if (e.details != null) { print('Field error: ${e.details!['field']}'); }}
For single-process applications, InMemory transport provides maximum performance:
class LargeDataService extends RpcResponderContract { @override void setup() { addUnaryMethod<LargeData, ProcessedData>( methodName: 'processLargeData', handler: _processLargeData, ); }
Future<ProcessedData> _processLargeData(LargeData data) async { // Direct object access - no serialization overhead return ProcessedData(data.processDirectly()); }}
Network transports automatically handle serialization:
// Uses CBOR encoding for efficient serializationfinal httpTransport = RpcHttpTransport( url: 'https://api.example.com', codec: CborCodec(), // Efficient binary encoding);
HTTP transport includes connection pooling:
final httpTransport = RpcHttpTransport( url: 'https://api.example.com', maxConnections: 10, keepAlive: true,);
Break large applications into focused services:
// User management serviceabstract interface class IUserServiceContract { static const name = 'UserService'; // User CRUD operations}
// Payment processing serviceabstract interface class IPaymentServiceContract { static const name = 'PaymentService'; // Payment operations}
// Notification serviceabstract interface class INotificationServiceContract { static const name = 'NotificationService'; // Real-time notifications}
Use appropriate transports for different service types:
// Real-time services use WebSocketfinal notificationService = NotificationServiceCaller( RpcCallerEndpoint(transport: webSocketTransport));
// CRUD services use HTTPfinal userService = UserServiceCaller( RpcCallerEndpoint(transport: httpTransport));
// Internal cache uses InMemoryfinal cacheService = CacheServiceCaller( RpcCallerEndpoint(transport: inMemoryTransport));
Distribute load across multiple service instances:
final loadBalancer = RpcLoadBalancingTransport([ RpcHttpTransport(url: 'https://api1.example.com'), RpcHttpTransport(url: 'https://api2.example.com'), RpcHttpTransport(url: 'https://api3.example.com'),]);
final userService = UserServiceCaller( RpcCallerEndpoint(transport: loadBalancer));
RPC Dart’s architecture makes testing straightforward:
class MockUserService extends RpcResponderContract { MockUserService() : super(IUserServiceContract.name);
@override void setup() { addUnaryMethod<GetUserRequest, UserResponse>( methodName: IUserServiceContract.methodGetUser, handler: _getMockUser, ); }
Future<UserResponse> _getMockUser(GetUserRequest request) async { return UserResponse( user: User(id: request.userId, name: 'Test User'), ); }}
void main() { group('UserService Integration', () { late RpcResponderEndpoint responder; late RpcCallerEndpoint caller;
setUp(() async { final (clientTransport, serverTransport) = RpcInMemoryTransport.pair();
responder = RpcResponderEndpoint(transport: serverTransport); responder.registerServiceContract(MockUserService()); responder.start();
caller = RpcCallerEndpoint(transport: clientTransport); });
tearDown(() async { await caller.close(); await responder.close(); });
test('should get user successfully', () async { final userService = UserServiceCaller(caller); final response = await userService.getUser(GetUserRequest(userId: '123'));
expect(response.user.id, equals('123')); expect(response.user.name, equals('Test User')); }); });}
The architecture of RPC Dart provides flexibility, performance, and maintainability while keeping the complexity manageable. By understanding these patterns and principles, you can build robust, scalable applications that are easy to test and maintain.