Слой приложения
Ваша бизнес-логика и реализации сервисов, которые используют RPC контракты.
RPC Dart следует слоистой архитектуре, которая разделяет ответственности и обеспечивает гибкость в том, как компоненты взаимодействуют друг с другом. Понимание этой архитектуры поможет вам создавать лучшие приложения и принимать обоснованные решения о выборе транспорта и дизайне сервисов.
Архитектура RPC Dart состоит из нескольких слоёв, каждый из которых имеет специфические обязанности:
Слой приложения
Ваша бизнес-логика и реализации сервисов, которые используют RPC контракты.
RPC слой
Контракты, вызывающие, отвечающие и логика маршрутизации.
Слой транспорта
Сетевая коммуникация, сериализация и доставка сообщений.
Основной слой
Базовые классы, утилиты и основы фреймворка.
┌─────────────────────────────────────────┐│ Слой приложения ││ ┌─────────────┐ ┌─────────────┐ ││ │ Сервис │ │ Клиентское │ ││ │ A │ │ приложение │ ││ └─────────────┘ └─────────────┘ │└─────────────────────────────────────────┘┌─────────────────────────────────────────┐│ RPC слой ││ ┌─────────────┐ ┌─────────────┐ ││ │ Responder │ │ Caller │ ││ │ Endpoint │ │ Endpoint │ ││ └─────────────┘ └─────────────┘ ││ ┌─────────────────────────────────┐ ││ │ Контракты │ ││ └─────────────────────────────────┘ │└─────────────────────────────────────────┘┌─────────────────────────────────────────┐│ Слой транспорта ││ ┌─────────┐ ┌─────────┐ ┌─────────┐ ││ │InMemory │ │ HTTP │ │WebSocket│ ││ └─────────┘ └─────────┘ └─────────┘ │└─────────────────────────────────────────┘┌─────────────────────────────────────────┐│ Основной слой ││ ┌─────────────────────────────────┐ ││ │ Базовые классы и утилиты │ ││ └─────────────────────────────────┘ │└─────────────────────────────────────────┘
Контракты определяют интерфейс между сервисами. Они указывают:
abstract interface class IUserServiceContract { static const name = 'UserService'; static const methodGetUser = 'getUser'; static const methodUpdateUser = 'updateUser'; static const methodStreamNotifications = 'streamNotifications';}
Обрабатывает входящие RPC запросы:
final responder = RpcResponderEndpoint(transport: serverTransport);
// Регистрируем реализации сервисовresponder.registerServiceContract(UserServiceResponder());responder.registerServiceContract(PaymentServiceResponder());
// Начинаем обработку запросовresponder.start();
Инициирует RPC вызовы к удалённым сервисам:
final caller = RpcCallerEndpoint(transport: clientTransport);
// Создаём клиенты сервисовfinal userService = UserServiceCaller(caller);final paymentService = PaymentServiceCaller(caller);
// Делаем RPC вызовыfinal user = await userService.getUser(userId);
Слой транспорта обрабатывает доставку сообщений между эндпоинтами:
abstract class IRpcTransport { bool get isClient; bool get supportsZeroCopy; int createStream(); Future<void> sendMetadata(int streamId, RpcMetadata metadata, {bool endStream = false}); Future<void> sendMessage(int streamId, Uint8List data, {bool endStream = false}); Future<void> sendDirectObject(int streamId, Object object, {bool endStream = false}); Stream<RpcTransportMessage> get incomingMessages; Future<void> finishSending(int streamId); Future<void> close(); Future<RpcHealthStatus> health(); Future<RpcHealthStatus> reconnect();}
Такой контракт реализован в RPC Dart и позволяет заменять транспорты без изменения бизнес-логики.
Сервисы определяются с использованием контрактов, которые задают интерфейс:
// Определение контрактаabstract interface class ICalculatorContract { static const name = 'Calculator'; static const methodAdd = 'add'; static const methodSubtract = 'subtract';}
// Реализация responder'аclass CalculatorResponder extends RpcResponderContract { CalculatorResponder() : super(ICalculatorContract.name);
@override void setup() { addUnaryMethod<AddRequest, AddResponse>( methodName: ICalculatorContract.methodAdd, handler: _add, ); }
Future<AddResponse> _add(AddRequest request, {RpcContext? context}) async { return AddResponse(request.a + request.b); }}
// Реализация caller'аclass CalculatorCaller extends RpcCallerContract { CalculatorCaller(RpcCallerEndpoint endpoint) : super(ICalculatorContract.name, endpoint);
Future<AddResponse> add(AddRequest request) { return callUnary<AddRequest, AddResponse>( methodName: ICalculatorContract.methodAdd, request: request, ); }}
Для сложных приложений вы можете маршрутизировать различные сервисы через разные транспорты:
final router = RpcTransportRouterBuilder.client() .routeCall(calledServiceName: 'UserService', toTransport: httpTransport) .routeCall( calledServiceName: 'NotificationService', toTransport: webSocketTransport, ) .routeCall( calledServiceName: 'CacheService', toTransport: inMemoryTransport, ) .build();
final caller = RpcCallerEndpoint(transport: router);
Реализуйте сквозные задачи с помощью middleware:
class LoggingMiddleware implements IRpcMiddleware { @override Future<dynamic> processRequest( String serviceName, String methodName, dynamic request, ) async { log('Вызов $serviceName.$methodName'); return request; }
@override Future<dynamic> processResponse( String serviceName, String methodName, dynamic response, ) async { log('Завершён $serviceName.$methodName'); return response; }}
// Регистрируем middleware (для запуска хуков расширьте эндпоинты)responder.addMiddleware(LoggingMiddleware());
Примечание. Базовые эндпоинты хранят зарегистрированные middleware и отображают их количество в диагностике. Чтобы выполнять логику до/после обработчиков, расширьте эндпоинт и вызовите
processRequest
/processResponse
вручную.
Клиент Транспорт Сервер │ │ │ │ 1. callUnary() │ │ ├─────────────────────────▶│ │ │ │ 2. RpcMessage │ │ ├─────────────────────────▶│ │ │ │ 3. processRequest() │ │ ├──────────────┐ │ │ │ │ │ │ │◀─────────────┘ │ │ 4. RpcMessage │ │ │◀─────────────────────────┤ │ 5. Response │ │ │◀─────────────────────────┤ │
Клиент Транспорт Сервер │ │ │ │ 1. callServerStream() │ │ ├─────────────────────────▶│ │ │ │ 2. RpcMessage │ │ ├─────────────────────────▶│ │ │ │ 3. processStreamRequest() │ │ ├──────────────┐ │ │ 4. Stream messages │ │ │ │◀─────────────────────────┤ │ │ 5. Stream<Response> │ │ │ │◀─────────────────────────┤ │ │ │ │ │◀─────────────┘
RPC Dart реализует структурированную систему обработки ошибок:
abstract class RpcException implements Exception { final String code; final String message; final Map<String, dynamic>? details;}
class RpcTimeoutException extends RpcException { RpcTimeoutException(String message, Duration timeout);}
class RpcCancelledException extends RpcException { RpcCancelledException(String message);}
class RpcTransportException extends RpcException { RpcTransportException(String message, Exception cause);}
Ошибки автоматически сериализуются и распространяются через границы транспорта:
// Серверная сторонаFuture<UserResponse> getUser(UserRequest request) async { if (request.userId.isEmpty) { throw RpcException( code: 'INVALID_USER_ID', message: 'ID пользователя не может быть пустым', details: {'field': 'userId'}, ); } // ... реализация}
// Клиентская сторонаtry { final user = await userService.getUser(UserRequest(userId: ''));} on RpcException catch (e) { // Обрабатываем структурированные RPC ошибки print('Ошибка: ${e.code} - ${e.message}'); if (e.details != null) { print('Ошибка поля: ${e.details!['field']}'); }}
Для одно-процессных приложений InMemory транспорт обеспечивает максимальную производительность:
class LargeDataService extends RpcResponderContract { @override void setup() { addUnaryMethod<LargeData, ProcessedData>( methodName: 'processLargeData', handler: _processLargeData, ); }
Future<ProcessedData> _processLargeData(LargeData data) async { // Прямой доступ к объекту - без накладных расходов на сериализацию return ProcessedData(data.processDirectly()); }}
Сетевые транспорты автоматически обрабатывают сериализацию:
// Для сетевых транспортов передавайте кодеки при регистрации методовaddUnaryMethod<MyRequest, MyResponse>( methodName: 'Process', handler: _process, requestCodec: RpcCodec(MyRequest.fromJson), responseCodec: RpcCodec(MyResponse.fromJson),);
RpcHttp2CallerTransport
поддерживает одно мультиплексированное соединение и
экспонирует health/reconnect-хуки:
final httpTransport = await RpcHttp2CallerTransport.secureConnect( host: 'api.example.com',);
final status = await httpTransport.health();if (!status.isHealthy) { await httpTransport.reconnect();}
Разбивайте большие приложения на сфокусированные сервисы:
// Сервис управления пользователямиabstract interface class IUserServiceContract { static const name = 'UserService'; // Операции CRUD пользователей}
// Сервис обработки платежейabstract interface class IPaymentServiceContract { static const name = 'PaymentService'; // Операции платежей}
// Сервис уведомленийabstract interface class INotificationServiceContract { static const name = 'NotificationService'; // Уведомления в реальном времени}
Используйте подходящие транспорты для разных типов сервисов (например
httpTransport
, созданный через RpcHttp2CallerTransport.secureConnect
, и
webSocketTransport
, созданный через RpcWebSocketCallerTransport.connect
):
// Сервисы реального времени используют WebSocketfinal notificationService = NotificationServiceCaller( RpcCallerEndpoint(transport: webSocketTransport));
// CRUD сервисы используют HTTPfinal userService = UserServiceCaller( RpcCallerEndpoint(transport: httpTransport));
// Внутренний кеш использует InMemoryfinal cacheService = CacheServiceCaller( RpcCallerEndpoint(transport: inMemoryTransport));
Используйте несколько транспортов вместе с RpcTransportRouter
или расширьте
RpcBaseTransport
, чтобы реализовать round-robin/health-check стратегию. Можно
распределять вызовы между несколькими RpcHttp2CallerTransport
или маршрутизировать
по метаданным, таким как тариф или география.
Архитектура RPC Dart делает тестирование простым:
class MockUserService extends RpcResponderContract { MockUserService() : super(IUserServiceContract.name);
@override void setup() { addUnaryMethod<GetUserRequest, UserResponse>( methodName: IUserServiceContract.methodGetUser, handler: _getMockUser, ); }
Future<UserResponse> _getMockUser(GetUserRequest request) async { return UserResponse( user: User(id: request.userId, name: 'Тестовый пользователь'), ); }}
void main() { group('Интеграция UserService', () { late RpcResponderEndpoint responder; late RpcCallerEndpoint caller;
setUp(() async { final (clientTransport, serverTransport) = RpcInMemoryTransport.pair();
responder = RpcResponderEndpoint(transport: serverTransport); responder.registerServiceContract(MockUserService()); responder.start();
caller = RpcCallerEndpoint(transport: clientTransport); });
tearDown(() async { await caller.close(); await responder.close(); });
test('должен успешно получить пользователя', () async { final userService = UserServiceCaller(caller); final response = await userService.getUser(GetUserRequest(userId: '123'));
expect(response.user.id, equals('123')); expect(response.user.name, equals('Тестовый пользователь')); }); });}
Архитектура RPC Dart обеспечивает гибкость, производительность и поддерживаемость, сохраняя управляемую сложность. Понимая эти паттерны и принципы, вы можете создавать надёжные, масштабируемые приложения, которые легко тестировать и поддерживать.