InMemory транспорт¶
InMemory транспорт - это самый производительный транспорт RPC Dart, предназначенный для коммуникации внутри одного процесса. Он обеспечивает передачу объектов без копирования, что делает его идеальным для высокопроизводительных приложений, тестирования и сценариев, где требуется максимальная производительность.
Обзор¶
-
Производительность без копирования – Объекты передаются по ссылке без накладных расходов на сериализацию, обеспечивая максимальную производительность для передачи больших данных.
-
Типобезопасность – Полная информация о типах Dart сохраняется через границы RPC, поддерживая безопасность на этапе компиляции.
-
Идеально для тестирования – Идеально подходит для модульных и интеграционных тестов, где нужна быстрая, предсказуемая коммуникация.
-
Эффективность памяти – Отсутствие дублирования данных или сериализации означает минимальный отпечаток в памяти.
Ключевые возможности¶
Передача объектов без копирования¶
Самое значительное преимущество InMemory транспорта - передача объектов без копирования:
class LargeDataSet {
final List<ComplexObject> items = List.generate(
100000,
(i) => ComplexObject(id: i, data: List.filled(1000, i)),
);
}
// С InMemory транспортом весь этот объект передаётся по ссылке
// Никакой сериализации, никакого копирования, только прямая передача ссылки!
final result = await service.processLargeDataSet(LargeDataSet());
Сохранение типов¶
В отличие от сетевых транспортов, которые требуют сериализации, InMemory транспорт сохраняет полную информацию о типах Dart:
// Сложные типы работают беспрепятственно
class User {
final String id;
final DateTime createdAt;
final Set<String> permissions;
final Map<String, dynamic> metadata;
User({
required this.id,
required this.createdAt,
required this.permissions,
required this.metadata,
});
}
// Типы сохраняются в точности такими, какие они есть
final user = await userService.getUser('123');
// user имеет точно тип User, а не Map или восстановленный объект
Использование¶
Создание пар транспортов¶
InMemory транспорт работает с парами транспортов - соединённым клиентским и серверным транспортом:
// Создаём пару соединённых транспортов
final (clientTransport, serverTransport) = RpcInMemoryTransport.pair();
// Настраиваем серверный эндпоинт
final responder = RpcResponderEndpoint(transport: serverTransport);
responder.registerServiceContract(CalculatorResponder());
responder.start();
// Настраиваем клиентский эндпоинт
final caller = RpcCallerEndpoint(transport: clientTransport);
final calculator = CalculatorCaller(caller);
// Делаем RPC вызовы
final result = await calculator.add(AddRequest(10, 5));
print('Результат: ${result.result}'); // Результат: 15
Полный пример¶
Вот полный пример, показывающий InMemory транспорт в действии:
Контракт¶
// calculator_contract.dart
abstract interface class ICalculatorContract {
static const name = 'Calculator';
static const methodAdd = 'add';
static const methodSubtract = 'subtract';
static const methodMultiply = 'multiply';
static const methodDivide = 'divide';
}
class MathRequest {
final double a;
final double b;
MathRequest(this.a, this.b);
}
class MathResponse {
final double result;
MathResponse(this.result);
}
Responder¶
// calculator_responder.dart
class CalculatorResponder extends RpcResponderContract {
CalculatorResponder() : super(ICalculatorContract.name);
@override
void setup() {
addUnaryMethod<MathRequest, MathResponse>(
methodName: ICalculatorContract.methodAdd,
handler: _add,
);
addUnaryMethod<MathRequest, MathResponse>(
methodName: ICalculatorContract.methodSubtract,
handler: _subtract,
);
addUnaryMethod<MathRequest, MathResponse>(
methodName: ICalculatorContract.methodMultiply,
handler: _multiply,
);
addUnaryMethod<MathRequest, MathResponse>(
methodName: ICalculatorContract.methodDivide,
handler: _divide,
);
}
Future<MathResponse> _add(MathRequest request, {RpcContext? context}) async {
return MathResponse(request.a + request.b);
}
Future<MathResponse> _subtract(MathRequest request, {RpcContext? context}) async {
return MathResponse(request.a - request.b);
}
Future<MathResponse> _multiply(MathRequest request, {RpcContext? context}) async {
return MathResponse(request.a * request.b);
}
Future<MathResponse> _divide(MathRequest request, {RpcContext? context}) async {
if (request.b == 0) {
throw RpcException(
code: 'DIVISION_BY_ZERO',
message: 'Нельзя делить на ноль',
details: {'operand': 'b', 'value': request.b},
);
}
return MathResponse(request.a / request.b);
}
}
Caller¶
// calculator_caller.dart
class CalculatorCaller extends RpcCallerContract {
CalculatorCaller(RpcCallerEndpoint endpoint)
: super(ICalculatorContract.name, endpoint);
Future<MathResponse> add(MathRequest request) {
return callUnary<MathRequest, MathResponse>(
methodName: ICalculatorContract.methodAdd,
request: request,
);
}
Future<MathResponse> subtract(MathRequest request) {
return callUnary<MathRequest, MathResponse>(
methodName: ICalculatorContract.methodSubtract,
request: request,
);
}
Future<MathResponse> multiply(MathRequest request) {
return callUnary<MathRequest, MathResponse>(
methodName: ICalculatorContract.methodMultiply,
request: request,
);
}
Future<MathResponse> divide(MathRequest request) {
return callUnary<MathRequest, MathResponse>(
methodName: ICalculatorContract.methodDivide,
request: request,
);
}
}
Main¶
// main.dart
void main() async {
// Создаём пару InMemory транспортов
final (clientTransport, serverTransport) = RpcInMemoryTransport.pair();
// Настраиваем responder эндпоинт
final responder = RpcResponderEndpoint(transport: serverTransport);
responder.registerServiceContract(CalculatorResponder());
responder.start();
// Настраиваем caller эндпоинт
final caller = RpcCallerEndpoint(transport: clientTransport);
final calculator = CalculatorCaller(caller);
try {
// Выполняем вычисления
final sum = await calculator.add(MathRequest(10, 5));
print('10 + 5 = ${sum.result}'); // 10 + 5 = 15
final difference = await calculator.subtract(MathRequest(10, 3));
print('10 - 3 = ${difference.result}'); // 10 - 3 = 7
final product = await calculator.multiply(MathRequest(4, 6));
print('4 * 6 = ${product.result}'); // 4 * 6 = 24
final quotient = await calculator.divide(MathRequest(15, 3));
print('15 / 3 = ${quotient.result}'); // 15 / 3 = 5
// Тестируем обработку ошибок
try {
await calculator.divide(MathRequest(10, 0));
} on RpcException catch (e) {
print('Ошибка: ${e.code} - ${e.message}');
// Ошибка: DIVISION_BY_ZERO - Нельзя делить на ноль
}
} finally {
// Очистка ресурсов
await caller.close();
await responder.close();
}
}
Случаи использования¶
1. Высокопроизводительные приложения¶
Для приложений, которым нужна максимальная производительность:
class DataProcessor {
Future<ProcessedData> processLargeDataset(LargeDataset data) async {
// С InMemory транспортом весь набор данных передаётся по ссылке
// Никаких накладных расходов на сериализацию для миллионов записей
return ProcessedData(
results: data.records.map((record) => processRecord(record)).toList(),
);
}
}
2. Тестирование и разработка¶
InMemory транспорт идеален для тестирования:
void main() {
group('Тесты сервиса калькулятора', () {
late RpcResponderEndpoint responder;
late RpcCallerEndpoint caller;
late CalculatorCaller calculator;
setUp(() async {
final (clientTransport, serverTransport) = RpcInMemoryTransport.pair();
responder = RpcResponderEndpoint(transport: serverTransport);
responder.registerServiceContract(CalculatorResponder());
responder.start();
caller = RpcCallerEndpoint(transport: clientTransport);
calculator = CalculatorCaller(caller);
});
tearDown(() async {
await caller.close();
await responder.close();
});
test('должен правильно складывать числа', () async {
final result = await calculator.add(MathRequest(2, 3));
expect(result.result, equals(5));
});
test('должен обрабатывать деление на ноль', () async {
expect(
() => calculator.divide(MathRequest(10, 0)),
throwsA(isA<RpcException>()),
);
});
});
}
3. Монолитные приложения¶
Для приложений, где все сервисы выполняются в одном процессе:
void main() async {
// Создаём пары транспортов для различных сервисов
final (userClientTransport, userServerTransport) = RpcInMemoryTransport.pair();
final (orderClientTransport, orderServerTransport) = RpcInMemoryTransport.pair();
final (paymentClientTransport, paymentServerTransport) = RpcInMemoryTransport.pair();
// Настраиваем все responder'ы сервисов
final userResponder = RpcResponderEndpoint(transport: userServerTransport);
userResponder.registerServiceContract(UserServiceResponder());
final orderResponder = RpcResponderEndpoint(transport: orderServerTransport);
orderResponder.registerServiceContract(OrderServiceResponder(
userService: UserServiceCaller(RpcCallerEndpoint(transport: userClientTransport)),
paymentService: PaymentServiceCaller(RpcCallerEndpoint(transport: paymentClientTransport)),
));
final paymentResponder = RpcResponderEndpoint(transport: paymentServerTransport);
paymentResponder.registerServiceContract(PaymentServiceResponder());
// Запускаем все сервисы
await Future.wait([
userResponder.start(),
orderResponder.start(),
paymentResponder.start(),
]);
// Сервисы теперь могут общаться без накладных расходов на сериализацию
}
Поддержка потоков¶
InMemory транспорт поддерживает все паттерны потоковой передачи RPC с производительностью без копирования:
Серверная потоковая передача¶
// Responder
Stream<NumberData> getNumbers(NumberRequest request, {RpcContext? context}) async* {
for (int i = 1; i <= request.count; i++) {
// Каждый объект NumberData передаётся по ссылке
yield NumberData(
value: i,
metadata: {'generated_at': DateTime.now()},
complexData: generateComplexData(i),
);
await Future.delayed(Duration(milliseconds: 100));
}
}
// Caller
Stream<NumberData> getNumbers(NumberRequest request) {
return callServerStream<NumberRequest, NumberData>(
methodName: 'getNumbers',
request: request,
);
}
// Использование
await for (final numberData in calculator.getNumbers(NumberRequest(count: 10))) {
// numberData сохраняет полную информацию о типах и сложные вложенные объекты
print('Число: ${numberData.value}, Сложные данные: ${numberData.complexData}');
}
Клиентская потоковая передача¶
// Responder
Future<SumResult> sumNumbers(Stream<NumberData> numbers, {RpcContext? context}) async {
double sum = 0;
int count = 0;
await for (final numberData in numbers) {
// Каждый объект numberData приходит с полной информацией о типах
sum += numberData.value;
count++;
}
return SumResult(total: sum, count: count);
}
// Caller
Future<SumResult> sumNumbers(Stream<NumberData> numbers) {
return callClientStream<NumberData, SumResult>(
methodName: 'sumNumbers',
requests: numbers,
);
}
Двунаправленная потоковая передача¶
// Responder
Stream<ProcessedData> processStream(Stream<RawData> input, {RpcContext? context}) async* {
await for (final rawData in input) {
// Обрабатываем сырые данные (объекты передаются по ссылке)
final processed = ProcessedData(
id: rawData.id,
processedAt: DateTime.now(),
result: processComplexData(rawData),
);
yield processed;
}
}
// Caller
Stream<ProcessedData> processStream(Stream<RawData> input) {
return callBidirectionalStream<RawData, ProcessedData>(
methodName: 'processStream',
requests: input,
);
}
Характеристики производительности¶
Результаты бенчмарков¶
InMemory транспорт обеспечивает исключительную производительность:
| Операция | InMemory | HTTP | WebSocket |
|---|---|---|---|
| Простой RPC | ~0.01мс | ~5-10мс | ~2-5мс |
| Большой объект (1МБ) | ~0.01мс | ~50-100мс | ~20-40мс |
| Потоки (1000 элементов) | ~5мс | ~1-2с | ~0.5-1с |
Использование памяти¶
// Сравнение передачи больших объектов
class LargeData {
final List<ComplexObject> items = List.generate(10000, (i) => ComplexObject(i));
}
// InMemory транспорт: ~0 дополнительной памяти (передача ссылок)
// Сетевые транспорты: ~2x использование памяти (оригинал + сериализованная копия)
Ограничения¶
Только один процесс¶
InMemory транспорт работает только внутри одного процесса Dart:
// ✅ Это работает - тот же процесс
final (client, server) = RpcInMemoryTransport.pair();
// ❌ Это не работает - разные процессы
// Нельзя использовать InMemory транспорт через границы процессов
Нет сетевой коммуникации¶
Нельзя использовать для распределённых систем:
// ❌ Нельзя использовать InMemory для микросервисов
// Используйте HTTP, WebSocket или другие сетевые транспорты
// ✅ Используйте для монолитных приложений
final (userClient, userServer) = RpcInMemoryTransport.pair();
final (orderClient, orderServer) = RpcInMemoryTransport.pair();
Соображения времени жизни объектов¶
Объекты, передаваемые через InMemory транспорт, разделяют ссылки:
class MutableData {
List<String> items = [];
}
// ⚠️ Будьте осторожны с изменяемыми объектами
final data = MutableData();
data.items.add('initial');
final result = await service.processData(data);
// Исходный объект может быть изменён сервисом
// Это одновременно преимущество (производительность) и соображение (побочные эффекты)
Лучшие практики¶
1. Используйте для разработки и тестирования¶
// Идеально для модульных тестов
void main() {
group('Тесты сервиса', () {
late ServiceCaller service;
setUp(() async {
final (client, server) = RpcInMemoryTransport.pair();
// Настройка эндпоинтов...
service = ServiceCaller(caller);
});
test('быстрые и надёжные тесты', () async {
final result = await service.performOperation(data);
expect(result, meets(criteria));
});
});
}
2. Используйте для высокопроизводительных сценариев¶
// Идеально для CPU-интенсивной обработки
class ImageProcessor {
Future<ProcessedImage> processImage(RawImageData data) async {
// Большие данные изображения передаются по ссылке - без накладных расходов на копирование
return ProcessedImage(
processed: applyFilters(data),
metadata: generateMetadata(data),
);
}
}
3. Комбинируйте с другими транспортами¶
// Используйте InMemory для внутренних сервисов, сетевые транспорты для внешних
class HybridApplication {
void setup() {
// Внутренняя высокопроизводительная коммуникация
final (cacheClient, cacheServer) = RpcInMemoryTransport.pair();
setupCacheService(cacheServer);
// Внешняя API коммуникация через HTTP/2
final httpTransport = await RpcHttp2CallerTransport.secureConnect(
host: 'api.external.com',
);
setupExternalApiClient(httpTransport);
}
}
4. Изящно обрабатывайте ошибки¶
try {
final result = await service.processData(data);
return result;
} on RpcException catch (e) {
// Обрабатываем RPC-специфичные ошибки
logger.error('RPC ошибка: ${e.code} - ${e.message}');
return defaultResult;
} catch (e) {
// Обрабатываем другие ошибки
logger.error('Неожиданная ошибка: $e');
rethrow;
}
Руководство по миграции¶
От прямых вызовов методов¶
// До: Прямые вызовы методов
class Calculator {
double add(double a, double b) => a + b;
}
final calc = Calculator();
final result = calc.add(10, 5);
// После: RPC с InMemory транспортом
abstract interface class ICalculatorContract {
static const name = 'Calculator';
static const methodAdd = 'add';
}
// Настройка один раз
final (client, server) = RpcInMemoryTransport.pair();
// ... настройка эндпоинтов ...
// Использование везде
final result = await calculator.add(MathRequest(10, 5));
К сетевым транспортам¶
Когда нужно масштабироваться за пределы одного процесса:
// Разработка с InMemory
final (client, server) = RpcInMemoryTransport.pair();
// Продакшн с HTTP/2
final httpTransport = await RpcHttp2CallerTransport.secureConnect(
host: 'api.production.com',
);
// Тот же код сервиса работает с обоими!
final calculator = CalculatorCaller(RpcCallerEndpoint(transport: httpTransport));
InMemory транспорт - это идеальный выбор, когда вам нужна максимальная производительность внутри одного процесса, будь то тестирование, разработка или высокопроизводительные монолитные приложения. Его природа без копирования делает его непревзойдённым для сценариев, где накладные расходы на сериализацию были бы узким местом.