Слой приложения
Ваша бизнес-логика и реализации сервисов, которые используют RPC контракты.
RPC Dart следует слоистой архитектуре, которая разделяет ответственности и обеспечивает гибкость в том, как компоненты взаимодействуют друг с другом. Понимание этой архитектуры поможет вам создавать лучшие приложения и принимать обоснованные решения о выборе транспорта и дизайне сервисов.
Архитектура RPC Dart состоит из нескольких слоёв, каждый из которых имеет специфические обязанности:
Слой приложения
Ваша бизнес-логика и реализации сервисов, которые используют RPC контракты.
RPC слой
Контракты, вызывающие, отвечающие и логика маршрутизации.
Слой транспорта
Сетевая коммуникация, сериализация и доставка сообщений.
Основной слой
Базовые классы, утилиты и основы фреймворка.
┌─────────────────────────────────────────┐│ Слой приложения ││ ┌─────────────┐ ┌─────────────┐ ││ │ Сервис │ │ Клиентское │ ││ │ A │ │ приложение │ ││ └─────────────┘ └─────────────┘ │└─────────────────────────────────────────┘┌─────────────────────────────────────────┐│ RPC слой ││ ┌─────────────┐ ┌─────────────┐ ││ │ Responder │ │ Caller │ ││ │ Endpoint │ │ Endpoint │ ││ └─────────────┘ └─────────────┘ ││ ┌─────────────────────────────────┐ ││ │ Контракты │ ││ └─────────────────────────────────┘ │└─────────────────────────────────────────┘┌─────────────────────────────────────────┐│ Слой транспорта ││ ┌─────────┐ ┌─────────┐ ┌─────────┐ ││ │InMemory │ │ HTTP │ │WebSocket│ ││ └─────────┘ └─────────┘ └─────────┘ │└─────────────────────────────────────────┘┌─────────────────────────────────────────┐│ Основной слой ││ ┌─────────────────────────────────┐ ││ │ Базовые классы и утилиты │ ││ └─────────────────────────────────┘ │└─────────────────────────────────────────┘
Контракты определяют интерфейс между сервисами. Они указывают:
abstract interface class IUserServiceContract { static const name = 'UserService'; static const methodGetUser = 'getUser'; static const methodUpdateUser = 'updateUser'; static const methodStreamNotifications = 'streamNotifications';}
Обрабатывает входящие RPC запросы:
final responder = RpcResponderEndpoint(transport: serverTransport);
// Регистрируем реализации сервисовresponder.registerServiceContract(UserServiceResponder());responder.registerServiceContract(PaymentServiceResponder());
// Начинаем обработку запросовresponder.start();
Инициирует RPC вызовы к удалённым сервисам:
final caller = RpcCallerEndpoint(transport: clientTransport);
// Создаём клиенты сервисовfinal userService = UserServiceCaller(caller);final paymentService = PaymentServiceCaller(caller);
// Делаем RPC вызовыfinal user = await userService.getUser(userId);
Слой транспорта обрабатывает доставку сообщений между эндпоинтами:
abstract class RpcTransport { Stream<RpcMessage> get messageStream; Future<void> sendMessage(RpcMessage message); Future<void> close();}
Эта абстракция позволяет переключаться между различными реализациями транспорта без изменения кода приложения.
Сервисы определяются с использованием контрактов, которые задают интерфейс:
// Определение контрактаabstract interface class ICalculatorContract { static const name = 'Calculator'; static const methodAdd = 'add'; static const methodSubtract = 'subtract';}
// Реализация responder'аclass CalculatorResponder extends RpcResponderContract { CalculatorResponder() : super(ICalculatorContract.name);
@override void setup() { addUnaryMethod<AddRequest, AddResponse>( methodName: ICalculatorContract.methodAdd, handler: _add, ); }
Future<AddResponse> _add(AddRequest request, {RpcContext? context}) async { return AddResponse(request.a + request.b); }}
// Реализация caller'аclass CalculatorCaller extends RpcCallerContract { CalculatorCaller(RpcCallerEndpoint endpoint) : super(ICalculatorContract.name, endpoint);
Future<AddResponse> add(AddRequest request) { return callUnary<AddRequest, AddResponse>( methodName: ICalculatorContract.methodAdd, request: request, ); }}
Для сложных приложений вы можете маршрутизировать различные сервисы через разные транспорты:
final router = RpcTransportRouter();
// Маршрутизируем UserService через HTTProuter.addRoute( serviceName: 'UserService', transport: httpTransport,);
// Маршрутизируем NotificationService через WebSocketrouter.addRoute( serviceName: 'NotificationService', transport: webSocketTransport,);
// Маршрутизируем внутренние сервисы через InMemoryrouter.addRoute( serviceName: 'CacheService', transport: inMemoryTransport,);
final caller = RpcCallerEndpoint(transport: router);
Реализуйте сквозные задачи с помощью middleware:
class AuthenticationMiddleware extends RpcMiddleware { @override Future<T> call<T>(RpcCall call, RpcNext next) async { // Проверяем аутентификацию final token = call.context?.metadata['auth-token']; if (token == null || !isValidToken(token)) { throw RpcException(code: 'UNAUTHORIZED'); }
return next(call); }}
class LoggingMiddleware extends RpcMiddleware { @override Future<T> call<T>(RpcCall call, RpcNext next) async { final start = DateTime.now(); try { final result = await next(call); final duration = DateTime.now().difference(start); log('${call.service}.${call.method} завершён за ${duration.inMilliseconds}мс'); return result; } catch (e) { log('${call.service}.${call.method} неудачно: $e'); rethrow; } }}
// Регистрируем middlewareresponder.addMiddleware(AuthenticationMiddleware());responder.addMiddleware(LoggingMiddleware());
Клиент Транспорт Сервер │ │ │ │ 1. callUnary() │ │ ├─────────────────────────▶│ │ │ │ 2. RpcMessage │ │ ├─────────────────────────▶│ │ │ │ 3. processRequest() │ │ ├──────────────┐ │ │ │ │ │ │ │◀─────────────┘ │ │ 4. RpcMessage │ │ │◀─────────────────────────┤ │ 5. Response │ │ │◀─────────────────────────┤ │
Клиент Транспорт Сервер │ │ │ │ 1. callServerStream() │ │ ├─────────────────────────▶│ │ │ │ 2. RpcMessage │ │ ├─────────────────────────▶│ │ │ │ 3. processStreamRequest() │ │ ├──────────────┐ │ │ 4. Stream messages │ │ │ │◀─────────────────────────┤ │ │ 5. Stream<Response> │ │ │ │◀─────────────────────────┤ │ │ │ │ │◀─────────────┘
RPC Dart реализует структурированную систему обработки ошибок:
abstract class RpcException implements Exception { final String code; final String message; final Map<String, dynamic>? details;}
class RpcTimeoutException extends RpcException { RpcTimeoutException(String message, Duration timeout);}
class RpcCancelledException extends RpcException { RpcCancelledException(String message);}
class RpcTransportException extends RpcException { RpcTransportException(String message, Exception cause);}
Ошибки автоматически сериализуются и распространяются через границы транспорта:
// Серверная сторонаFuture<UserResponse> getUser(UserRequest request) async { if (request.userId.isEmpty) { throw RpcException( code: 'INVALID_USER_ID', message: 'ID пользователя не может быть пустым', details: {'field': 'userId'}, ); } // ... реализация}
// Клиентская сторонаtry { final user = await userService.getUser(UserRequest(userId: ''));} on RpcException catch (e) { // Обрабатываем структурированные RPC ошибки print('Ошибка: ${e.code} - ${e.message}'); if (e.details != null) { print('Ошибка поля: ${e.details!['field']}'); }}
Для одно-процессных приложений InMemory транспорт обеспечивает максимальную производительность:
class LargeDataService extends RpcResponderContract { @override void setup() { addUnaryMethod<LargeData, ProcessedData>( methodName: 'processLargeData', handler: _processLargeData, ); }
Future<ProcessedData> _processLargeData(LargeData data) async { // Прямой доступ к объекту - без накладных расходов на сериализацию return ProcessedData(data.processDirectly()); }}
Сетевые транспорты автоматически обрабатывают сериализацию:
// Использует CBOR кодирование для эффективной сериализацииfinal httpTransport = RpcHttpTransport( url: 'https://api.example.com', codec: CborCodec(), // Эффективное бинарное кодирование);
HTTP транспорт включает пулинг соединений:
final httpTransport = RpcHttpTransport( url: 'https://api.example.com', maxConnections: 10, keepAlive: true,);
Разбивайте большие приложения на сфокусированные сервисы:
// Сервис управления пользователямиabstract interface class IUserServiceContract { static const name = 'UserService'; // Операции CRUD пользователей}
// Сервис обработки платежейabstract interface class IPaymentServiceContract { static const name = 'PaymentService'; // Операции платежей}
// Сервис уведомленийabstract interface class INotificationServiceContract { static const name = 'NotificationService'; // Уведомления в реальном времени}
Используйте подходящие транспорты для разных типов сервисов:
// Сервисы реального времени используют WebSocketfinal notificationService = NotificationServiceCaller( RpcCallerEndpoint(transport: webSocketTransport));
// CRUD сервисы используют HTTPfinal userService = UserServiceCaller( RpcCallerEndpoint(transport: httpTransport));
// Внутренний кеш использует InMemoryfinal cacheService = CacheServiceCaller( RpcCallerEndpoint(transport: inMemoryTransport));
Распределяйте нагрузку между несколькими экземплярами сервиса:
final loadBalancer = RpcLoadBalancingTransport([ RpcHttpTransport(url: 'https://api1.example.com'), RpcHttpTransport(url: 'https://api2.example.com'), RpcHttpTransport(url: 'https://api3.example.com'),]);
final userService = UserServiceCaller( RpcCallerEndpoint(transport: loadBalancer));
Архитектура RPC Dart делает тестирование простым:
class MockUserService extends RpcResponderContract { MockUserService() : super(IUserServiceContract.name);
@override void setup() { addUnaryMethod<GetUserRequest, UserResponse>( methodName: IUserServiceContract.methodGetUser, handler: _getMockUser, ); }
Future<UserResponse> _getMockUser(GetUserRequest request) async { return UserResponse( user: User(id: request.userId, name: 'Тестовый пользователь'), ); }}
void main() { group('Интеграция UserService', () { late RpcResponderEndpoint responder; late RpcCallerEndpoint caller;
setUp(() async { final (clientTransport, serverTransport) = RpcInMemoryTransport.pair();
responder = RpcResponderEndpoint(transport: serverTransport); responder.registerServiceContract(MockUserService()); responder.start();
caller = RpcCallerEndpoint(transport: clientTransport); });
tearDown(() async { await caller.close(); await responder.close(); });
test('должен успешно получить пользователя', () async { final userService = UserServiceCaller(caller); final response = await userService.getUser(GetUserRequest(userId: '123'));
expect(response.user.id, equals('123')); expect(response.user.name, equals('Тестовый пользователь')); }); });}
Архитектура RPC Dart обеспечивает гибкость, производительность и поддерживаемость, сохраняя управляемую сложность. Понимая эти паттерны и принципы, вы можете создавать надёжные, масштабируемые приложения, которые легко тестировать и поддерживать.