InMemory transport¶
The InMemory transport is RPC Dart's highest performance transport, designed for communication within a single process. It provides zero-copy object transfer, making it ideal for high-throughput applications, testing, and scenarios where maximum performance is required.
Overview¶
- Zero-copy performance – objects are passed by reference without serialization overhead, providing maximum throughput for large data transfers.
- Type safety – full Dart type information is preserved across RPC boundaries, maintaining compile-time guarantees.
- Perfect for testing – ideal for unit and integration tests where you need fast, predictable communication.
- Memory efficient – no data duplication or serialization means minimal memory footprint.
Zero-copy object transfer¶
class LargeDataSet {
final List<ComplexObject> items = List.generate(
100000,
(i) => ComplexObject(id: i, data: List.filled(1000, i)),
);
}
// With InMemory transport, this entire object is passed by reference
// No serialization, no copying, just a direct reference transfer!
final result = await service.processLargeDataSet(LargeDataSet());
Type preservation¶
Unlike network transports that require serialization, InMemory transport preserves full Dart type information:
// Complex types work seamlessly
class User {
final String id;
final DateTime createdAt;
final Set<String> permissions;
final Map<String, dynamic> metadata;
User({
required this.id,
required this.createdAt,
required this.permissions,
required this.metadata,
});
}
// Types are preserved exactly as they are
final user = await userService.getUser('123');
// user is exactly User type, not a Map or reconstructed object
Usage¶
Creating transport pairs¶
InMemory transport works with transport pairs—a connected client and server transport:
// Create a pair of connected transports
final (clientTransport, serverTransport) = RpcInMemoryTransport.pair();
// Setup server endpoint
final responder = RpcResponderEndpoint(transport: serverTransport);
responder.registerServiceContract(CalculatorResponder());
responder.start();
// Setup client endpoint
final caller = RpcCallerEndpoint(transport: clientTransport);
final calculator = CalculatorCaller(caller);
// Make RPC calls
final result = await calculator.add(AddRequest(10, 5));
print('Result: ${result.result}'); // Result: 15
Complete example¶
Contract¶
// calculator_contract.dart
abstract interface class ICalculatorContract {
static const name = 'Calculator';
static const methodAdd = 'add';
static const methodSubtract = 'subtract';
static const methodMultiply = 'multiply';
static const methodDivide = 'divide';
}
class MathRequest {
final double a;
final double b;
MathRequest(this.a, this.b);
}
class MathResponse {
final double result;
MathResponse(this.result);
}
Responder¶
// calculator_responder.dart
import 'package:rpc_dart/rpc_dart.dart';
class CalculatorResponder extends RpcResponderContract {
CalculatorResponder() : super(ICalculatorContract.name);
@override
void setup() {
addUnaryMethod<MathRequest, MathResponse>(
methodName: ICalculatorContract.methodAdd,
handler: _add,
);
addUnaryMethod<MathRequest, MathResponse>(
methodName: ICalculatorContract.methodSubtract,
handler: _subtract,
);
addUnaryMethod<MathRequest, MathResponse>(
methodName: ICalculatorContract.methodMultiply,
handler: _multiply,
);
addUnaryMethod<MathRequest, MathResponse>(
methodName: ICalculatorContract.methodDivide,
handler: _divide,
);
}
Future<MathResponse> _add(MathRequest request, {RpcContext? context}) async {
return MathResponse(request.a + request.b);
}
Future<MathResponse> _subtract(MathRequest request, {RpcContext? context}) async {
return MathResponse(request.a - request.b);
}
Future<MathResponse> _multiply(MathRequest request, {RpcContext? context}) async {
return MathResponse(request.a * request.b);
}
Future<MathResponse> _divide(MathRequest request, {RpcContext? context}) async {
if (request.b == 0) {
throw RpcException(
code: 'DIVISION_BY_ZERO',
message: 'Cannot divide by zero',
details: {'operand': 'b', 'value': request.b},
);
}
return MathResponse(request.a / request.b);
}
}
Caller¶
// calculator_caller.dart
import 'package:rpc_dart/rpc_dart.dart';
class CalculatorCaller extends RpcCallerContract {
CalculatorCaller(RpcCallerEndpoint endpoint)
: super(ICalculatorContract.name, endpoint);
Future<MathResponse> add(MathRequest request) {
return callUnary<MathRequest, MathResponse>(
methodName: ICalculatorContract.methodAdd,
request: request,
);
}
Future<MathResponse> subtract(MathRequest request) {
return callUnary<MathRequest, MathResponse>(
methodName: ICalculatorContract.methodSubtract,
request: request,
);
}
Future<MathResponse> multiply(MathRequest request) {
return callUnary<MathRequest, MathResponse>(
methodName: ICalculatorContract.methodMultiply,
request: request,
);
}
Future<MathResponse> divide(MathRequest request) {
return callUnary<MathRequest, MathResponse>(
methodName: ICalculatorContract.methodDivide,
request: request,
);
}
}
Main¶
// main.dart
import 'package:rpc_dart/rpc_dart.dart';
void main() async {
// Create InMemory transport pair
final (clientTransport, serverTransport) = RpcInMemoryTransport.pair();
// Setup responder endpoint
final responder = RpcResponderEndpoint(transport: serverTransport);
responder.registerServiceContract(CalculatorResponder());
responder.start();
// Setup caller endpoint
final caller = RpcCallerEndpoint(transport: clientTransport);
final calculator = CalculatorCaller(caller);
try {
// Perform calculations
final sum = await calculator.add(MathRequest(10, 5));
print('10 + 5 = ${sum.result}');
final difference = await calculator.subtract(MathRequest(10, 3));
print('10 - 3 = ${difference.result}');
final product = await calculator.multiply(MathRequest(4, 6));
print('4 * 6 = ${product.result}');
final quotient = await calculator.divide(MathRequest(15, 3));
print('15 / 3 = ${quotient.result}');
// Test error handling
try {
await calculator.divide(MathRequest(10, 0));
} on RpcException catch (e) {
print('Error: ${e.code} - ${e.message}');
}
} finally {
await caller.close();
await responder.close();
}
}
Use cases¶
High-performance applications¶
class DataProcessor {
Future<ProcessedData> processLargeDataset(LargeDataset data) async {
// With InMemory transport, the entire dataset is passed by reference
// No serialization overhead for millions of records
return ProcessedData(
results: data.records.map((record) => processRecord(record)).toList(),
);
}
}
Testing and development¶
void main() {
group('Calculator Service Tests', () {
late RpcResponderEndpoint responder;
late RpcCallerEndpoint caller;
late CalculatorCaller calculator;
setUp(() async {
final (clientTransport, serverTransport) = RpcInMemoryTransport.pair();
responder = RpcResponderEndpoint(transport: serverTransport);
responder.registerServiceContract(CalculatorResponder());
responder.start();
caller = RpcCallerEndpoint(transport: clientTransport);
calculator = CalculatorCaller(caller);
});
tearDown(() async {
await caller.close();
await responder.close();
});
test('should add numbers correctly', () async {
final result = await calculator.add(MathRequest(2, 3));
expect(result.result, equals(5));
});
test('should handle division by zero', () async {
expect(
() => calculator.divide(MathRequest(10, 0)),
throwsA(isA<RpcException>()),
);
});
});
}
Monolithic applications¶
void main() async {
// Create transport pairs for different services
final (userClientTransport, userServerTransport) = RpcInMemoryTransport.pair();
final (orderClientTransport, orderServerTransport) = RpcInMemoryTransport.pair();
final (paymentClientTransport, paymentServerTransport) = RpcInMemoryTransport.pair();
// Setup all service responders
final userResponder = RpcResponderEndpoint(transport: userServerTransport);
userResponder.registerServiceContract(UserServiceResponder());
final orderResponder = RpcResponderEndpoint(transport: orderServerTransport);
orderResponder.registerServiceContract(OrderServiceResponder(
userService: UserServiceCaller(RpcCallerEndpoint(transport: userClientTransport)),
paymentService: PaymentServiceCaller(RpcCallerEndpoint(transport: paymentClientTransport)),
));
final paymentResponder = RpcResponderEndpoint(transport: paymentServerTransport);
paymentResponder.registerServiceContract(PaymentServiceResponder());
await Future.wait([
userResponder.start(),
orderResponder.start(),
paymentResponder.start(),
]);
}
Streaming support¶
Server streaming¶
// Responder
Stream<NumberData> getNumbers(NumberRequest request, {RpcContext? context}) async* {
for (int i = 1; i <= request.count; i++) {
yield NumberData(
value: i,
metadata: {'generated_at': DateTime.now()},
complexData: generateComplexData(i),
);
await Future.delayed(const Duration(milliseconds: 100));
}
}
// Caller
Stream<NumberData> getNumbers(NumberRequest request) {
return callServerStream<NumberRequest, NumberData>(
methodName: 'getNumbers',
request: request,
);
}
// Usage
await for (final numberData in calculator.getNumbers(NumberRequest(count: 10))) {
print('Number: ${numberData.value}, Complex: ${numberData.complexData}');
}
Client streaming¶
// Responder
Future<SumResult> sumNumbers(Stream<NumberData> numbers, {RpcContext? context}) async {
double sum = 0;
int count = 0;
await for (final numberData in numbers) {
sum += numberData.value;
count++;
}
return SumResult(total: sum, count: count);
}
// Caller
Future<SumResult> sumNumbers(Stream<NumberData> numbers) {
return callClientStream<NumberData, SumResult>(
methodName: 'sumNumbers',
requests: numbers,
);
}
Bidirectional streaming¶
// Responder
Stream<ProcessedData> processStream(Stream<RawData> input, {RpcContext? context}) async* {
await for (final rawData in input) {
final processed = ProcessedData(
id: rawData.id,
processedAt: DateTime.now(),
result: processComplexData(rawData),
);
yield processed;
}
}
// Caller
Stream<ProcessedData> processStream(Stream<RawData> input) {
return callBidirectionalStream<RawData, ProcessedData>(
methodName: 'processStream',
requests: input,
);
}
Performance characteristics¶
Benchmark results¶
| Operation | InMemory | HTTP | WebSocket |
|---|---|---|---|
| Simple RPC | ~0.01ms | ~5-10ms | ~2-5ms |
| Large object (1MB) | ~0.01ms | ~50-100ms | ~20-40ms |
| Streaming (1000 items) | ~5ms | ~1-2s | ~0.5-1s |
Memory usage¶
class LargeData {
final List<ComplexObject> items = List.generate(10000, (i) => ComplexObject(i));
}
// InMemory transport: ~0 additional memory (reference passing)
// Network transports: ~2x memory usage (original + serialized copy)
Limitations¶
Single process only¶
// ✅ This works - same process
final (client, server) = RpcInMemoryTransport.pair();
// ❌ This doesn't work - different processes
// Cannot share InMemory transport across process boundaries
No network communication¶
// ❌ Cannot use InMemory for microservices
// Use HTTP, WebSocket, or other network transports instead
// ✅ Use for monolithic applications
final (userClient, userServer) = RpcInMemoryTransport.pair();
final (orderClient, orderServer) = RpcInMemoryTransport.pair();
Object lifetime considerations¶
class MutableData {
List<String> items = [];
}
// ⚠️ Be careful with mutable objects
final data = MutableData();
data.items.add('initial');
final result = await service.processData(data);
// The original object might be modified by the service
Best practices¶
Use for development and testing¶
void main() {
group('Service Tests', () {
late ServiceCaller service;
setUp(() async {
final (client, server) = RpcInMemoryTransport.pair();
// Setup endpoints...
service = ServiceCaller(caller);
});
test('fast and reliable tests', () async {
final result = await service.performOperation(data);
expect(result, meets(criteria));
});
});
}
Use for high-performance scenarios¶
class ImageProcessor {
Future<ProcessedImage> processImage(RawImageData data) async {
// Large image data passed by reference - no copying overhead
return ProcessedImage(
processed: applyFilters(data),
metadata: generateMetadata(data),
);
}
}
Combine with other transports¶
class HybridApplication {
Future<void> setup() async {
// Internal high-performance communication
final (cacheClient, cacheServer) = RpcInMemoryTransport.pair();
setupCacheService(cacheServer);
// External API communication over HTTP/2
final httpTransport = await RpcHttp2CallerTransport.secureConnect(
host: 'api.external.com',
);
setupExternalApiClient(httpTransport);
}
}
Handle errors gracefully¶
try {
final result = await service.processData(data);
return result;
} on RpcException catch (e) {
logger.error('RPC Error: ${e.code} - ${e.message}');
return defaultResult;
} catch (e) {
logger.error('Unexpected error: $e');
rethrow;
}
Migration guide¶
From direct method calls¶
// Before: Direct method calls
class Calculator {
double add(double a, double b) => a + b;
}
final calc = Calculator();
final result = calc.add(10, 5);
// After: RPC with InMemory transport
abstract interface class ICalculatorContract {
static const name = 'Calculator';
static const methodAdd = 'add';
}
final (client, server) = RpcInMemoryTransport.pair();
// ... setup endpoints ...
final result = await calculator.add(MathRequest(10, 5));
To network transports¶
// Development with InMemory
final (client, server) = RpcInMemoryTransport.pair();
// Production with HTTP/2
final httpTransport = await RpcHttp2CallerTransport.secureConnect(
host: 'api.production.com',
);
// Same service code works with both!
final calculator = CalculatorCaller(RpcCallerEndpoint(transport: httpTransport));
The InMemory transport is the perfect choice when you need maximum performance within a single process, whether for testing, development, or high-performance monolithic applications. Its zero-copy nature makes it unmatched for scenarios where serialization overhead would be a bottleneck.