Перейти к содержимому

Архитектура

RPC Dart следует слоистой архитектуре, которая разделяет ответственности и обеспечивает гибкость в том, как компоненты взаимодействуют друг с другом. Понимание этой архитектуры поможет вам создавать лучшие приложения и принимать обоснованные решения о выборе транспорта и дизайне сервисов.

Архитектура RPC Dart состоит из нескольких слоёв, каждый из которых имеет специфические обязанности:

Слой приложения

Ваша бизнес-логика и реализации сервисов, которые используют RPC контракты.

RPC слой

Контракты, вызывающие, отвечающие и логика маршрутизации.

Слой транспорта

Сетевая коммуникация, сериализация и доставка сообщений.

Основной слой

Базовые классы, утилиты и основы фреймворка.

┌─────────────────────────────────────────┐
│ Слой приложения │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ Сервис │ │ Клиентское │ │
│ │ A │ │ приложение │ │
│ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ RPC слой │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ Responder │ │ Caller │ │
│ │ Endpoint │ │ Endpoint │ │
│ └─────────────┘ └─────────────┘ │
│ ┌─────────────────────────────────┐ │
│ │ Контракты │ │
│ └─────────────────────────────────┘ │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ Слой транспорта │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │InMemory │ │ HTTP │ │WebSocket│ │
│ └─────────┘ └─────────┘ └─────────┘ │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ Основной слой │
│ ┌─────────────────────────────────┐ │
│ │ Базовые классы и утилиты │ │
│ └─────────────────────────────────┘ │
└─────────────────────────────────────────┘

Контракты определяют интерфейс между сервисами. Они указывают:

  • Имена сервисов и методов
  • Типы запросов и ответов
  • Поддерживаемые паттерны RPC (унарные, потоковые)
abstract interface class IUserServiceContract {
static const name = 'UserService';
static const methodGetUser = 'getUser';
static const methodUpdateUser = 'updateUser';
static const methodStreamNotifications = 'streamNotifications';
}

Обрабатывает входящие RPC запросы:

final responder = RpcResponderEndpoint(transport: serverTransport);
// Регистрируем реализации сервисов
responder.registerServiceContract(UserServiceResponder());
responder.registerServiceContract(PaymentServiceResponder());
// Начинаем обработку запросов
responder.start();

Инициирует RPC вызовы к удалённым сервисам:

final caller = RpcCallerEndpoint(transport: clientTransport);
// Создаём клиенты сервисов
final userService = UserServiceCaller(caller);
final paymentService = PaymentServiceCaller(caller);
// Делаем RPC вызовы
final user = await userService.getUser(userId);

Слой транспорта обрабатывает доставку сообщений между эндпоинтами:

abstract class RpcTransport {
Stream<RpcMessage> get messageStream;
Future<void> sendMessage(RpcMessage message);
Future<void> close();
}

Эта абстракция позволяет переключаться между различными реализациями транспорта без изменения кода приложения.

Сервисы определяются с использованием контрактов, которые задают интерфейс:

// Определение контракта
abstract interface class ICalculatorContract {
static const name = 'Calculator';
static const methodAdd = 'add';
static const methodSubtract = 'subtract';
}
// Реализация responder'а
class CalculatorResponder extends RpcResponderContract {
CalculatorResponder() : super(ICalculatorContract.name);
@override
void setup() {
addUnaryMethod<AddRequest, AddResponse>(
methodName: ICalculatorContract.methodAdd,
handler: _add,
);
}
Future<AddResponse> _add(AddRequest request, {RpcContext? context}) async {
return AddResponse(request.a + request.b);
}
}
// Реализация caller'а
class CalculatorCaller extends RpcCallerContract {
CalculatorCaller(RpcCallerEndpoint endpoint)
: super(ICalculatorContract.name, endpoint);
Future<AddResponse> add(AddRequest request) {
return callUnary<AddRequest, AddResponse>(
methodName: ICalculatorContract.methodAdd,
request: request,
);
}
}

Для сложных приложений вы можете маршрутизировать различные сервисы через разные транспорты:

final router = RpcTransportRouter();
// Маршрутизируем UserService через HTTP
router.addRoute(
serviceName: 'UserService',
transport: httpTransport,
);
// Маршрутизируем NotificationService через WebSocket
router.addRoute(
serviceName: 'NotificationService',
transport: webSocketTransport,
);
// Маршрутизируем внутренние сервисы через InMemory
router.addRoute(
serviceName: 'CacheService',
transport: inMemoryTransport,
);
final caller = RpcCallerEndpoint(transport: router);

Реализуйте сквозные задачи с помощью middleware:

class AuthenticationMiddleware extends RpcMiddleware {
@override
Future<T> call<T>(RpcCall call, RpcNext next) async {
// Проверяем аутентификацию
final token = call.context?.metadata['auth-token'];
if (token == null || !isValidToken(token)) {
throw RpcException(code: 'UNAUTHORIZED');
}
return next(call);
}
}
class LoggingMiddleware extends RpcMiddleware {
@override
Future<T> call<T>(RpcCall call, RpcNext next) async {
final start = DateTime.now();
try {
final result = await next(call);
final duration = DateTime.now().difference(start);
log('${call.service}.${call.method} завершён за ${duration.inMilliseconds}мс');
return result;
} catch (e) {
log('${call.service}.${call.method} неудачно: $e');
rethrow;
}
}
}
// Регистрируем middleware
responder.addMiddleware(AuthenticationMiddleware());
responder.addMiddleware(LoggingMiddleware());
Клиент Транспорт Сервер
│ │ │
│ 1. callUnary() │ │
├─────────────────────────▶│ │
│ │ 2. RpcMessage │
│ ├─────────────────────────▶│
│ │ │ 3. processRequest()
│ │ ├──────────────┐
│ │ │ │
│ │ │◀─────────────┘
│ │ 4. RpcMessage │
│ │◀─────────────────────────┤
│ 5. Response │ │
│◀─────────────────────────┤ │
Клиент Транспорт Сервер
│ │ │
│ 1. callServerStream() │ │
├─────────────────────────▶│ │
│ │ 2. RpcMessage │
│ ├─────────────────────────▶│
│ │ │ 3. processStreamRequest()
│ │ ├──────────────┐
│ │ 4. Stream messages │ │
│ │◀─────────────────────────┤ │
│ 5. Stream<Response> │ │ │
│◀─────────────────────────┤ │ │
│ │ │◀─────────────┘

RPC Dart реализует структурированную систему обработки ошибок:

abstract class RpcException implements Exception {
final String code;
final String message;
final Map<String, dynamic>? details;
}
class RpcTimeoutException extends RpcException {
RpcTimeoutException(String message, Duration timeout);
}
class RpcCancelledException extends RpcException {
RpcCancelledException(String message);
}
class RpcTransportException extends RpcException {
RpcTransportException(String message, Exception cause);
}

Ошибки автоматически сериализуются и распространяются через границы транспорта:

// Серверная сторона
Future<UserResponse> getUser(UserRequest request) async {
if (request.userId.isEmpty) {
throw RpcException(
code: 'INVALID_USER_ID',
message: 'ID пользователя не может быть пустым',
details: {'field': 'userId'},
);
}
// ... реализация
}
// Клиентская сторона
try {
final user = await userService.getUser(UserRequest(userId: ''));
} on RpcException catch (e) {
// Обрабатываем структурированные RPC ошибки
print('Ошибка: ${e.code} - ${e.message}');
if (e.details != null) {
print('Ошибка поля: ${e.details!['field']}');
}
}

Для одно-процессных приложений InMemory транспорт обеспечивает максимальную производительность:

class LargeDataService extends RpcResponderContract {
@override
void setup() {
addUnaryMethod<LargeData, ProcessedData>(
methodName: 'processLargeData',
handler: _processLargeData,
);
}
Future<ProcessedData> _processLargeData(LargeData data) async {
// Прямой доступ к объекту - без накладных расходов на сериализацию
return ProcessedData(data.processDirectly());
}
}

Сетевые транспорты автоматически обрабатывают сериализацию:

// Использует CBOR кодирование для эффективной сериализации
final httpTransport = RpcHttpTransport(
url: 'https://api.example.com',
codec: CborCodec(), // Эффективное бинарное кодирование
);

HTTP транспорт включает пулинг соединений:

final httpTransport = RpcHttpTransport(
url: 'https://api.example.com',
maxConnections: 10,
keepAlive: true,
);

Разбивайте большие приложения на сфокусированные сервисы:

// Сервис управления пользователями
abstract interface class IUserServiceContract {
static const name = 'UserService';
// Операции CRUD пользователей
}
// Сервис обработки платежей
abstract interface class IPaymentServiceContract {
static const name = 'PaymentService';
// Операции платежей
}
// Сервис уведомлений
abstract interface class INotificationServiceContract {
static const name = 'NotificationService';
// Уведомления в реальном времени
}

Используйте подходящие транспорты для разных типов сервисов:

// Сервисы реального времени используют WebSocket
final notificationService = NotificationServiceCaller(
RpcCallerEndpoint(transport: webSocketTransport)
);
// CRUD сервисы используют HTTP
final userService = UserServiceCaller(
RpcCallerEndpoint(transport: httpTransport)
);
// Внутренний кеш использует InMemory
final cacheService = CacheServiceCaller(
RpcCallerEndpoint(transport: inMemoryTransport)
);

Распределяйте нагрузку между несколькими экземплярами сервиса:

final loadBalancer = RpcLoadBalancingTransport([
RpcHttpTransport(url: 'https://api1.example.com'),
RpcHttpTransport(url: 'https://api2.example.com'),
RpcHttpTransport(url: 'https://api3.example.com'),
]);
final userService = UserServiceCaller(
RpcCallerEndpoint(transport: loadBalancer)
);

Архитектура RPC Dart делает тестирование простым:

class MockUserService extends RpcResponderContract {
MockUserService() : super(IUserServiceContract.name);
@override
void setup() {
addUnaryMethod<GetUserRequest, UserResponse>(
methodName: IUserServiceContract.methodGetUser,
handler: _getMockUser,
);
}
Future<UserResponse> _getMockUser(GetUserRequest request) async {
return UserResponse(
user: User(id: request.userId, name: 'Тестовый пользователь'),
);
}
}
void main() {
group('Интеграция UserService', () {
late RpcResponderEndpoint responder;
late RpcCallerEndpoint caller;
setUp(() async {
final (clientTransport, serverTransport) = RpcInMemoryTransport.pair();
responder = RpcResponderEndpoint(transport: serverTransport);
responder.registerServiceContract(MockUserService());
responder.start();
caller = RpcCallerEndpoint(transport: clientTransport);
});
tearDown(() async {
await caller.close();
await responder.close();
});
test('должен успешно получить пользователя', () async {
final userService = UserServiceCaller(caller);
final response = await userService.getUser(GetUserRequest(userId: '123'));
expect(response.user.id, equals('123'));
expect(response.user.name, equals('Тестовый пользователь'));
});
});
}
  • Единственная ответственность: Каждый сервис должен иметь одну чёткую цель
  • Разделение интерфейсов: Держите контракты сфокусированными и минимальными
  • Инверсия зависимостей: Зависьте от контрактов, а не от реализаций
  • Структурированные ошибки: Используйте RpcException с кодами и деталями
  • Изящная деградация: Обрабатывайте ошибки на соответствующих уровнях
  • Прерыватель цепи: Реализуйте таймауты и повторы
  • Выбор транспорта: Выбирайте правильный транспорт для каждого случая использования
  • Потоковая передача: Используйте потоки для больших наборов данных и обновлений в реальном времени
  • Кеширование: Реализуйте кеширование там, где это уместно
  • Модульные тесты: Тестируйте контракты и реализации отдельно
  • Интеграционные тесты: Тестируйте с реальными транспортами
  • Мок сервисы: Используйте InMemory транспорт для быстрых, изолированных тестов

Архитектура RPC Dart обеспечивает гибкость, производительность и поддерживаемость, сохраняя управляемую сложность. Понимая эти паттерны и принципы, вы можете создавать надёжные, масштабируемые приложения, которые легко тестировать и поддерживать.