Skip to content

Архитектура

RPC Dart следует слоистой архитектуре, которая разделяет ответственности и обеспечивает гибкость в том, как компоненты взаимодействуют друг с другом. Понимание этой архитектуры поможет вам создавать лучшие приложения и принимать обоснованные решения о выборе транспорта и дизайне сервисов.

Обзор архитектуры

Архитектура RPC Dart состоит из нескольких слоёв, каждый из которых имеет специфические обязанности:

  • Слой приложения – Ваша бизнес-логика и реализации сервисов, которые используют RPC контракты.

  • RPC слой – Контракты, вызывающие, отвечающие и логика маршрутизации.

  • Слой транспорта – Сетевая коммуникация, сериализация и доставка сообщений.

  • Основной слой – Базовые классы, утилиты и основы фреймворка.

Слоистая архитектура

┌─────────────────────────────────────────┐
│           Слой приложения               │
│   ┌─────────────┐   ┌─────────────┐    │
│   │   Сервис    │   │  Клиентское │    │
│   │     A       │   │ приложение  │    │
│   └─────────────┘   └─────────────┘    │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│             RPC слой                    │
│   ┌─────────────┐   ┌─────────────┐    │
│   │ Responder   │   │   Caller    │    │
│   │ Endpoint    │   │ Endpoint    │    │
│   └─────────────┘   └─────────────┘    │
│   ┌─────────────────────────────────┐   │
│   │         Контракты               │   │
│   └─────────────────────────────────┘   │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│           Слой транспорта               │
│   ┌─────────┐ ┌─────────┐ ┌─────────┐  │
│   │InMemory │ │  HTTP   │ │WebSocket│  │
│   └─────────┘ └─────────┘ └─────────┘  │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│           Основной слой                 │
│   ┌─────────────────────────────────┐   │
│   │   Базовые классы и утилиты      │   │
│   └─────────────────────────────────┘   │
└─────────────────────────────────────────┘

Основные компоненты

1. Контракты

Контракты определяют интерфейс между сервисами. Они указывают:

  • Имена сервисов и методов
  • Типы запросов и ответов
  • Поддерживаемые паттерны RPC (унарные, потоковые)
abstract interface class IUserServiceContract {
  static const name = 'UserService';
  static const methodGetUser = 'getUser';
  static const methodUpdateUser = 'updateUser';
  static const methodStreamNotifications = 'streamNotifications';
}

2. Эндпоинты

  • RpcResponderEndpoint – Обрабатывает входящие RPC запросы:
final responder = RpcResponderEndpoint(transport: serverTransport);

// Регистрируем реализации сервисов
responder.registerServiceContract(UserServiceResponder());
responder.registerServiceContract(PaymentServiceResponder());

// Начинаем обработку запросов
responder.start();
  • RpcCallerEndpoint – Инициирует RPC вызовы к удалённым сервисам:
final caller = RpcCallerEndpoint(transport: clientTransport);

// Создаём клиенты сервисов
final userService = UserServiceCaller(caller);
final paymentService = PaymentServiceCaller(caller);

// Делаем RPC вызовы
final user = await userService.getUser(userId);

3. Абстракция транспорта

Слой транспорта обрабатывает доставку сообщений между эндпоинтами:

abstract class IRpcTransport {
  bool get isClient;
  bool get supportsZeroCopy;
  int createStream();
  Future<void> sendMetadata(int streamId, RpcMetadata metadata, {bool endStream = false});
  Future<void> sendMessage(int streamId, Uint8List data, {bool endStream = false});
  Future<void> sendDirectObject(int streamId, Object object, {bool endStream = false});
  Stream<RpcTransportMessage> get incomingMessages;
  Future<void> finishSending(int streamId);
  Future<void> close();
  Future<RpcHealthStatus> health();
  Future<RpcHealthStatus> reconnect();
}

Такой контракт реализован в RPC Dart и позволяет заменять транспорты без изменения бизнес-логики.

Паттерны проектирования

1. Паттерн контракта

Сервисы определяются с использованием контрактов, которые задают интерфейс:

// Определение контракта
abstract interface class ICalculatorContract {
  static const name = 'Calculator';
  static const methodAdd = 'add';
  static const methodSubtract = 'subtract';
}

// Реализация responder'а
class CalculatorResponder extends RpcResponderContract {
  CalculatorResponder() : super(ICalculatorContract.name);

  @override
  void setup() {
addUnaryMethod<AddRequest, AddResponse>(
  methodName: ICalculatorContract.methodAdd,
  handler: _add,
);
  }

  Future<AddResponse> _add(AddRequest request, {RpcContext? context}) async {
return AddResponse(request.a + request.b);
  }
}

// Реализация caller'а
class CalculatorCaller extends RpcCallerContract {
  CalculatorCaller(RpcCallerEndpoint endpoint) 
: super(ICalculatorContract.name, endpoint);

  Future<AddResponse> add(AddRequest request) {
return callUnary<AddRequest, AddResponse>(
  methodName: ICalculatorContract.methodAdd,
  request: request,
);
  }
}

2. Паттерн маршрутизатора транспорта

Для сложных приложений вы можете маршрутизировать различные сервисы через разные транспорты:

final router = RpcTransportRouterBuilder.client()
.routeCall(calledServiceName: 'UserService', toTransport: httpTransport)
.routeCall(
  calledServiceName: 'NotificationService',
  toTransport: webSocketTransport,
)
.routeCall(
  calledServiceName: 'CacheService',
  toTransport: inMemoryTransport,
)
.build();

final caller = RpcCallerEndpoint(transport: router);

3. Паттерн middleware

Реализуйте сквозные задачи с помощью middleware:

class LoggingMiddleware implements IRpcMiddleware {
  @override
  Future<dynamic> processRequest(
String serviceName,
String methodName,
dynamic request,
  ) async {
log('Вызов $serviceName.$methodName');
return request;
  }

  @override
  Future<dynamic> processResponse(
String serviceName,
String methodName,
dynamic response,
  ) async {
log('Завершён $serviceName.$methodName');
return response;
  }
}

// Регистрируем middleware (для запуска хуков расширьте эндпоинты)
responder.addMiddleware(LoggingMiddleware());

Примечание. Базовые эндпоинты хранят зарегистрированные middleware и отображают их количество в диагностике. Чтобы выполнять логику до/после обработчиков, расширьте эндпоинт и вызовите processRequest/processResponse вручную.

Поток сообщений

1. Поток унарного вызова

Клиент                    Транспорт                   Сервер
  │                          │                          │
  │ 1. callUnary()           │                          │
  ├─────────────────────────▶│                          │
  │                          │ 2. RpcMessage            │
  │                          ├─────────────────────────▶│
  │                          │                          │ 3. processRequest()
  │                          │                          ├──────────────┐
  │                          │                          │              │
  │                          │                          │◀─────────────┘
  │                          │ 4. RpcMessage            │
  │                          │◀─────────────────────────┤
  │ 5. Response              │                          │
  │◀─────────────────────────┤                          │

2. Поток потоковой передачи

Клиент                    Транспорт                   Сервер
  │                          │                          │
  │ 1. callServerStream()    │                          │
  ├─────────────────────────▶│                          │
  │                          │ 2. RpcMessage            │
  │                          ├─────────────────────────▶│
  │                          │                          │ 3. processStreamRequest()
  │                          │                          ├──────────────┐
  │                          │ 4. Stream messages       │              │
  │                          │◀─────────────────────────┤              │
  │ 5. Stream<Response>      │                          │              │
  │◀─────────────────────────┤                          │              │
  │                          │                          │◀─────────────┘

Архитектура обработки ошибок

RPC Dart реализует структурированную систему обработки ошибок:

1. Иерархия исключений

abstract class RpcException implements Exception {
  final String code;
  final String message;
  final Map<String, dynamic>? details;
}

class RpcTimeoutException extends RpcException {
  RpcTimeoutException(String message, Duration timeout);
}

class RpcCancelledException extends RpcException {
  RpcCancelledException(String message);
}

class RpcTransportException extends RpcException {
  RpcTransportException(String message, Exception cause);
}

2. Распространение ошибок

Ошибки автоматически сериализуются и распространяются через границы транспорта:

// Серверная сторона
Future<UserResponse> getUser(UserRequest request) async {
  if (request.userId.isEmpty) {
throw RpcException(
  code: 'INVALID_USER_ID',
  message: 'ID пользователя не может быть пустым',
  details: {'field': 'userId'},
);
  }
  // ... реализация
}

// Клиентская сторона
try {
  final user = await userService.getUser(UserRequest(userId: ''));
} on RpcException catch (e) {
  // Обрабатываем структурированные RPC ошибки
  print('Ошибка: ${e.code} - ${e.message}');
  if (e.details != null) {
print('Ошибка поля: ${e.details!['field']}');
  }
}

Соображения производительности

1. Нулевое копирование с InMemory транспортом

Для одно-процессных приложений InMemory транспорт обеспечивает максимальную производительность:

class LargeDataService extends RpcResponderContract {
  @override
  void setup() {
addUnaryMethod<LargeData, ProcessedData>(
  methodName: 'processLargeData',
  handler: _processLargeData,
);
  }

  Future<ProcessedData> _processLargeData(LargeData data) async {
// Прямой доступ к объекту - без накладных расходов на сериализацию
return ProcessedData(data.processDirectly());
  }
}

2. Сериализация с сетевыми транспортами

Сетевые транспорты автоматически обрабатывают сериализацию:

// Для сетевых транспортов передавайте кодеки при регистрации методов
addUnaryMethod<MyRequest, MyResponse>(
  methodName: 'Process',
  handler: _process,
  requestCodec: RpcCodec(MyRequest.fromJson),
  responseCodec: RpcCodec(MyResponse.fromJson),
);

3. Управление соединением

RpcHttp2CallerTransport поддерживает одно мультиплексированное соединение и экспонирует health/reconnect-хуки:

final httpTransport = await RpcHttp2CallerTransport.secureConnect(
  host: 'api.example.com',
);

final status = await httpTransport.health();
if (!status.isHealthy) {
  await httpTransport.reconnect();
}

Паттерны масштабируемости

1. Декомпозиция сервисов

Разбивайте большие приложения на сфокусированные сервисы:

// Сервис управления пользователями
abstract interface class IUserServiceContract {
  static const name = 'UserService';
  // Операции CRUD пользователей
}

// Сервис обработки платежей
abstract interface class IPaymentServiceContract {
  static const name = 'PaymentService';
  // Операции платежей
}

// Сервис уведомлений
abstract interface class INotificationServiceContract {
  static const name = 'NotificationService';
  // Уведомления в реальном времени
}

2. Выбор транспорта по сервису

Используйте подходящие транспорты для разных типов сервисов (например httpTransport, созданный через RpcHttp2CallerTransport.secureConnect, и webSocketTransport, созданный через RpcWebSocketCallerTransport.connect):

// Сервисы реального времени используют WebSocket
final notificationService = NotificationServiceCaller(
  RpcCallerEndpoint(transport: webSocketTransport)
);

// CRUD сервисы используют HTTP
final userService = UserServiceCaller(
  RpcCallerEndpoint(transport: httpTransport)
);

// Внутренний кеш использует InMemory
final cacheService = CacheServiceCaller(
  RpcCallerEndpoint(transport: inMemoryTransport)
);

3. Балансировка нагрузки

Используйте несколько транспортов вместе с RpcTransportRouter или расширьте RpcBaseTransport, чтобы реализовать round-robin/health-check стратегию. Можно распределять вызовы между несколькими RpcHttp2CallerTransport или маршрутизировать по метаданным, таким как тариф или география.

Архитектура тестирования

Архитектура RPC Dart делает тестирование простым:

1. Мок сервисы

class MockUserService extends RpcResponderContract {
  MockUserService() : super(IUserServiceContract.name);

  @override
  void setup() {
addUnaryMethod<GetUserRequest, UserResponse>(
  methodName: IUserServiceContract.methodGetUser,
  handler: _getMockUser,
);
  }

  Future<UserResponse> _getMockUser(GetUserRequest request) async {
return UserResponse(
  user: User(id: request.userId, name: 'Тестовый пользователь'),
);
  }
}

2. Интеграционное тестирование

void main() {
  group('Интеграция UserService', () {
late RpcResponderEndpoint responder;
late RpcCallerEndpoint caller;

setUp(() async {
  final (clientTransport, serverTransport) = RpcInMemoryTransport.pair();

  responder = RpcResponderEndpoint(transport: serverTransport);
  responder.registerServiceContract(MockUserService());
  responder.start();

  caller = RpcCallerEndpoint(transport: clientTransport);
});

tearDown(() async {
  await caller.close();
  await responder.close();
});

test('должен успешно получить пользователя', () async {
  final userService = UserServiceCaller(caller);
  final response = await userService.getUser(GetUserRequest(userId: '123'));

  expect(response.user.id, equals('123'));
  expect(response.user.name, equals('Тестовый пользователь'));
});
  });
}

Лучшие практики

1. Дизайн сервисов

  • Единственная ответственность: Каждый сервис должен иметь одну чёткую цель
  • Разделение интерфейсов: Держите контракты сфокусированными и минимальными
  • Инверсия зависимостей: Зависьте от контрактов, а не от реализаций

2. Обработка ошибок

  • Структурированные ошибки: Используйте RpcException с кодами и деталями
  • Изящная деградация: Обрабатывайте ошибки на соответствующих уровнях
  • Прерыватель цепи: Реализуйте таймауты и повторы

3. Производительность

  • Выбор транспорта: Выбирайте правильный транспорт для каждого случая использования
  • Потоковая передача: Используйте потоки для больших наборов данных и обновлений в реальном времени
  • Кеширование: Реализуйте кеширование там, где это уместно

4. Тестирование

  • Модульные тесты: Тестируйте контракты и реализации отдельно
  • Интеграционные тесты: Тестируйте с реальными транспортами
  • Мок сервисы: Используйте InMemory транспорт для быстрых, изолированных тестов

Архитектура RPC Dart обеспечивает гибкость, производительность и поддерживаемость, сохраняя управляемую сложность. Понимая эти паттерны и принципы, вы можете создавать надёжные, масштабируемые приложения, которые легко тестировать и поддерживать.