Isolate Transport

The Isolate Transport enables high-performance communication between Dart isolates with zero-copy object passing capabilities. It’s designed for CPU-intensive applications that need parallel processing while maintaining the performance benefits of direct object sharing.

Zero-Copy Object Passing

Direct object passing between isolates without serialization overhead, maintaining performance for large data structures.

Worker Process Support

Spawn dedicated worker isolates for CPU-intensive tasks while keeping the main isolate responsive.

Parallel Processing

Distribute workload across multiple isolates to take advantage of multi-core systems efficiently.

Type Safety

Full Dart type information is preserved across isolate boundaries, maintaining compile-time safety.

Unlike traditional isolate communication, which serializes and copies every message, the Isolate transport can pass objects directly:

class LargeDataProcessor {
  // Process massive datasets without copying
  Future<ProcessedData> processLargeDataset(LargeDataset dataset) async {
    final stopwatch = Stopwatch()..start();

    // The entire dataset is passed by reference to the worker isolate:
    // no serialization, no copying - just direct object access.
    return ProcessedData(
      results: dataset.records.map((record) => processRecord(record)).toList(),
      metadata: {
        'processed_at': DateTime.now(),
        'record_count': dataset.records.length,
        'processing_time_ms': stopwatch.elapsedMilliseconds,
      },
    );
  }
}
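
For comparison, the Dart SDK alone offers this kind of handoff only in one direction: Isolate.run (Dart 2.19+) copies its inputs into the spawned isolate, but the result comes back via Isolate.exit, which moves the object without a copy. A minimal SDK-only sketch, independent of rpc_dart:

import 'dart:isolate';

Future<List<int>> transformInBackground(List<int> data) {
  // The closure (and its captured `data`) is copied into the new
  // isolate, but the result list is returned via Isolate.exit,
  // so it comes back without a second copy.
  return Isolate.run(() => data.map((v) => v * 2).toList());
}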

Automatically manage worker isolates for parallel processing:

// Create a pool of worker isolates
final transport = RpcIsolateTransport.workerPool(
  workerCount: Platform.numberOfProcessors,
  workerScript: 'worker_isolate.dart',
  options: IsolateTransportOptions(
    enableZeroCopy: true,
    maxMemoryUsage: 512 * 1024 * 1024, // 512MB per worker
    idleTimeout: Duration(minutes: 5),
  ),
);

final caller = RpcCallerEndpoint(transport: transport);
final processor = DataProcessorCaller(caller);

// Work is automatically distributed across available workers
final futures = List.generate(100, (i) async {
  return await processor.processChunk(DataChunk(id: i, data: generateData(i)));
});

final results = await Future.wait(futures);
// All 100 chunks processed in parallel across worker isolates
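
When fanning out a hundred calls like this, it is often better to surface the first failure immediately rather than wait for every worker to finish. A small variation using standard Future.wait options, reusing the futures list above:

try {
  // eagerError completes with the first error instead of waiting
  // for the remaining chunks to finish.
  final results = await Future.wait(futures, eagerError: true);
  print('Processed ${results.length} chunks');
} catch (e) {
  print('Chunk processing failed: $e');
}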

Create a dedicated worker script for CPU-intensive operations:

worker_isolate.dart

import 'dart:isolate';
import 'package:rpc_dart/rpc_dart.dart';
import 'package:rpc_dart_transports/rpc_dart_transports.dart';

void main(List<String> args, SendPort sendPort) async {
  // Create isolate transport for the worker
  final transport = RpcIsolateWorkerTransport(sendPort);

  // Create responder endpoint
  final responder = RpcResponderEndpoint(transport: transport);

  // Register worker services
  responder.registerServiceContract(DataProcessorResponder());
  responder.registerServiceContract(ImageProcessorResponder());
  responder.registerServiceContract(ComputationResponder());

  // Start the worker
  await responder.start();
  print('Worker isolate started: ${Isolate.current.debugName}');
}

// CPU-intensive data processing service
class DataProcessorResponder extends RpcResponderContract {
  DataProcessorResponder() : super('DataProcessor');

  @override
  void setup() {
    addUnaryMethod<ProcessDataRequest, ProcessDataResponse>(
      methodName: 'processData',
      handler: _processData,
    );
    addUnaryMethod<ComputeRequest, ComputeResponse>(
      methodName: 'compute',
      handler: _compute,
    );
    addServerStreamMethod<BatchRequest, BatchResult>(
      methodName: 'processBatch',
      handler: _processBatch,
    );
  }

  Future<ProcessDataResponse> _processData(
    ProcessDataRequest request, {
    RpcContext? context,
  }) async {
    // CPU-intensive processing without blocking the main isolate
    final results = <ProcessedItem>[];
    for (final item in request.items) {
      // Simulate heavy computation
      final processed = await _performHeavyComputation(item);
      results.add(processed);
    }

    return ProcessDataResponse(
      results: results,
      processingTimeMs: DateTime.now().difference(request.startTime).inMilliseconds,
      workerId: Isolate.current.debugName ?? 'unknown',
    );
  }

  Future<ComputeResponse> _compute(
    ComputeRequest request, {
    RpcContext? context,
  }) async {
    // Mathematical computations (helper elided for brevity)
    final result = await _performMathematicalComputation(
      request.operation,
      request.parameters,
    );

    return ComputeResponse(
      result: result,
      metadata: {
        'computation_type': request.operation,
        'parameter_count': request.parameters.length,
        'worker_id': Isolate.current.debugName,
      },
    );
  }

  Stream<BatchResult> _processBatch(
    BatchRequest request, {
    RpcContext? context,
  }) async* {
    final batchSize = request.batchSize ?? 10;

    for (int i = 0; i < request.items.length; i += batchSize) {
      final batch = request.items.skip(i).take(batchSize).toList();
      // Helper elided for brevity
      final processed = await _processBatchItems(batch);

      yield BatchResult(
        batchIndex: i ~/ batchSize,
        results: processed,
        progress: (i + batch.length) / request.items.length,
      );
    }
  }

  Future<ProcessedItem> _performHeavyComputation(DataItem item) async {
    // Simulate CPU-intensive work
    await Future.delayed(Duration(milliseconds: 50));

    return ProcessedItem(
      id: item.id,
      result: item.value * 2, // Simplified computation
      metadata: {
        'processed_by': Isolate.current.debugName,
        'computation_time': 50,
      },
    );
  }
}
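
RpcIsolateTransport.workerPool spawns scripts like this for you; under the hood the SDK mechanism for script-based workers is typically Isolate.spawnUri, whose message argument matches the main(List<String> args, SendPort sendPort) signature above. A minimal sketch of that handshake (the debugName is an arbitrary label):

import 'dart:isolate';

Future<Isolate> spawnWorker() async {
  final receivePort = ReceivePort();

  // The SendPort arrives as the worker's second main() argument.
  final isolate = await Isolate.spawnUri(
    Uri.file('worker_isolate.dart'),
    const <String>[],
    receivePort.sendPort,
    debugName: 'data-worker-1',
  );

  // Messages from the worker arrive on receivePort here.
  return isolate;
}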

Process images in parallel without blocking the UI:

import 'dart:io';

class ImageProcessingPipeline {
  late RpcCallerEndpoint _caller;
  late ImageProcessorCaller _imageProcessor;

  Future<void> initializeWorkers() async {
    final transport = RpcIsolateTransport.workerPool(
      workerCount: 4, // 4 worker isolates for image processing
      workerScript: 'image_worker.dart',
      options: IsolateTransportOptions(
        enableZeroCopy: true,
        maxMemoryUsage: 1024 * 1024 * 1024, // 1GB per worker for images
      ),
    );

    _caller = RpcCallerEndpoint(transport: transport);
    _imageProcessor = ImageProcessorCaller(_caller);
  }

  Future<List<ProcessedImage>> processImageBatch(List<RawImage> images) async {
    print('Processing ${images.length} images in parallel...');

    // Process all images concurrently across workers
    final futures = images.map((image) async {
      return await _imageProcessor.processImage(ProcessImageRequest(
        image: image,
        filters: [
          ImageFilter.resize(width: 800, height: 600),
          ImageFilter.sharpen(intensity: 0.5),
          ImageFilter.colorCorrection(brightness: 1.1),
        ],
        outputFormat: ImageFormat.jpeg,
        quality: 85,
      ));
    }).toList();

    final results = await Future.wait(futures);
    print('Processed ${results.length} images');
    return results.map((r) => r.processedImage).toList();
  }

  Stream<BatchProcessingUpdate> processImageDirectory(String directoryPath) async* {
    final imageFiles = Directory(directoryPath)
        .listSync()
        .where((f) => f.path.endsWith('.jpg') || f.path.endsWith('.png'))
        .toList();

    const batchSize = 10;
    int processed = 0;

    for (int i = 0; i < imageFiles.length; i += batchSize) {
      final batch = imageFiles.skip(i).take(batchSize).toList();

      // loadImageFromFile is an application-specific helper
      final images = await Future.wait(
        batch.map((file) => loadImageFromFile(file.path)),
      );

      final processedImages = await processImageBatch(images);
      processed += processedImages.length;

      yield BatchProcessingUpdate(
        processedCount: processed,
        totalCount: imageFiles.length,
        progress: processed / imageFiles.length,
        currentBatch: processedImages,
      );
    }
  }
}
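
Mapping every image to a future at once can overwhelm a small pool. If you need to bound the number of in-flight requests, a simple chunked loop works with any transport; a sketch of a method that could sit next to processImageBatch on the pipeline class (the chunk size is an assumption to tune against the pool size):

Future<List<ProcessedImage>> processWithLimit(
  List<RawImage> images, {
  int maxInFlight = 8,
}) async {
  final results = <ProcessedImage>[];

  // Process the list in slices so at most maxInFlight images are
  // being handled by the workers at any one time.
  for (var i = 0; i < images.length; i += maxInFlight) {
    final end = i + maxInFlight < images.length ? i + maxInFlight : images.length;
    results.addAll(await processImageBatch(images.sublist(i, end)));
  }

  return results;
}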

Perform heavy computational work in background isolates:

import 'dart:async';
import 'dart:io';

class DataAnalytics {
  late RpcCallerEndpoint _caller;
  late AnalyticsProcessorCaller _analyticsProcessor;

  Future<void> initializeAnalyticsCluster() async {
    final transport = RpcIsolateTransport.workerPool(
      workerCount: Platform.numberOfProcessors,
      workerScript: 'analytics_worker.dart',
      options: IsolateTransportOptions(
        enableZeroCopy: true,
        loadBalancing: LoadBalancingStrategy.leastLoaded,
        // Analytics-specific settings
        memoryManagement: MemoryManagement(
          maxMemoryUsage: 2 * 1024 * 1024 * 1024, // 2GB per worker
          gcTriggerThreshold: 0.8,
          enableMemoryProfiling: true,
        ),
        performanceMonitoring: PerformanceMonitoring(
          enableCpuProfiling: true,
          enableLatencyTracking: true,
          reportInterval: Duration(seconds: 30),
        ),
      ),
    );

    _caller = RpcCallerEndpoint(transport: transport);
    _analyticsProcessor = AnalyticsProcessorCaller(_caller);
  }

  Future<AnalyticsReport> generateReport(AnalyticsDataset dataset) async {
    print('Generating analytics report for ${dataset.records.length} records...');

    // Distribute different types of analysis across workers
    final futures = [
      _analyticsProcessor.calculateStatistics(StatisticsRequest(
        data: dataset.numericalData,
        operations: [
          StatisticalOperation.mean,
          StatisticalOperation.median,
          StatisticalOperation.standardDeviation,
          StatisticalOperation.percentiles,
        ],
      )),
      _analyticsProcessor.performTimeSeriesAnalysis(TimeSeriesRequest(
        timeSeries: dataset.timeSeriesData,
        analysisTypes: [
          TimeSeriesAnalysis.trend,
          TimeSeriesAnalysis.seasonality,
          TimeSeriesAnalysis.correlation,
        ],
      )),
      _analyticsProcessor.runMachineLearning(MLRequest(
        features: dataset.features,
        labels: dataset.labels,
        algorithms: [
          MLAlgorithm.linearRegression,
          MLAlgorithm.randomForest,
          MLAlgorithm.clustering,
        ],
      )),
    ];

    final results = await Future.wait(futures);

    return AnalyticsReport(
      statistics: results[0] as StatisticsResult,
      timeSeriesAnalysis: results[1] as TimeSeriesResult,
      machineLearningResults: results[2] as MLResult,
      generatedAt: DateTime.now(),
      processingMetrics: await _getProcessingMetrics(),
    );
  }

  Stream<RealtimeAnalytics> startRealtimeAnalysis(Stream<DataPoint> dataStream) async* {
    final controller = StreamController<AnalyzeDataPointRequest>();

    // Send real-time data to analytics workers, closing the request
    // stream when the input stream completes.
    dataStream.listen(
      (dataPoint) {
        controller.add(AnalyzeDataPointRequest(
          dataPoint: dataPoint,
          analysisConfig: RealtimeAnalysisConfig(
            enableAnomalyDetection: true,
            enableTrendAnalysis: true,
            windowSize: Duration(minutes: 5),
          ),
        ));
      },
      onDone: controller.close,
    );

    // Get real-time results
    await for (final result in _analyticsProcessor.analyzeRealtimeStream(controller.stream)) {
      yield RealtimeAnalytics(
        analysis: result,
        timestamp: DateTime.now(),
        workerId: result.workerId,
      );
    }
  }
}
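
If no extra bookkeeping is needed, the intermediate controller can be dropped entirely: Stream.map turns incoming data points into requests lazily, and stream completion propagates on its own. A sketch of that variant using the same names as above:

Stream<RealtimeAnalytics> startRealtimeAnalysis(Stream<DataPoint> dataStream) {
  // Each data point becomes a request; when dataStream closes,
  // the request stream (and the RPC call) closes with it.
  final requests = dataStream.map((dataPoint) => AnalyzeDataPointRequest(
        dataPoint: dataPoint,
        analysisConfig: RealtimeAnalysisConfig(
          enableAnomalyDetection: true,
          enableTrendAnalysis: true,
          windowSize: Duration(minutes: 5),
        ),
      ));

  return _analyticsProcessor.analyzeRealtimeStream(requests).map(
        (result) => RealtimeAnalytics(
          analysis: result,
          timestamp: DateTime.now(),
          workerId: result.workerId,
        ),
      );
}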

High-performance scientific computations:

import 'dart:io';
import 'dart:math' as math;

class ScientificComputing {
  late RpcCallerEndpoint _caller;
  late ComputationEngineCaller _computationEngine;

  Future<void> initializeComputeCluster() async {
    final transport = RpcIsolateTransport.workerPool(
      workerCount: Platform.numberOfProcessors * 2, // Over-provision for CPU-bound work
      workerScript: 'scientific_worker.dart',
      options: IsolateTransportOptions(
        enableZeroCopy: true,
        // Optimize for scientific computing
        computeOptimization: ComputeOptimization(
          enableVectorization: true,
          enableParallelMath: true,
          floatingPointPrecision: FloatingPointPrecision.double,
        ),
        // Memory management for large datasets
        memoryManagement: MemoryManagement(
          maxMemoryUsage: 4 * 1024 * 1024 * 1024, // 4GB per worker
          enableLargeObjectHeap: true,
          gcStrategy: GCStrategy.throughput,
        ),
      ),
    );

    _caller = RpcCallerEndpoint(transport: transport);
    _computationEngine = ComputationEngineCaller(_caller);
  }

  Future<SimulationResult> runMolecularDynamicsSimulation(MDSimulationRequest request) async {
    print('Starting molecular dynamics simulation with ${request.particles.length} particles...');

    // Distribute simulation steps across workers. Note: steps within a
    // batch start from the same state; this is a simplification, since
    // a real MD integration advances strictly step by step.
    final timeSteps = request.totalTime ~/ request.timeStep;
    final results = <MDStepResult>[];

    for (int step = 0; step < timeSteps; step += 100) {
      final stepBatch = math.min(100, timeSteps - step);

      final batchFutures = List.generate(stepBatch, (i) async {
        return await _computationEngine.simulateTimeStep(MDStepRequest(
          particles: step == 0 ? request.particles : results.last.finalState,
          stepNumber: step + i,
          timeStep: request.timeStep,
          forces: request.forces,
          boundary: request.boundaryConditions,
        ));
      });

      final batchResults = await Future.wait(batchFutures);
      results.addAll(batchResults);

      // Report progress
      final progress = (step + stepBatch) / timeSteps;
      print('Simulation progress: ${(progress * 100).toStringAsFixed(1)}%');
    }

    return SimulationResult(
      steps: results,
      totalEnergy: results.last.totalEnergy,
      finalState: results.last.finalState,
      computationTime: results.fold(Duration.zero, (sum, r) => sum + r.computationTime),
    );
  }

  Future<MatrixOperationResult> performLargeMatrixOperations(List<Matrix> matrices) async {
    // Distribute matrix operations across workers
    final operations = [
      MatrixOperation.multiply,
      MatrixOperation.invert,
      MatrixOperation.eigenvalues,
      MatrixOperation.singularValueDecomposition,
    ];

    final futures = <Future>[];
    for (final matrix in matrices) {
      for (final operation in operations) {
        futures.add(_computationEngine.performMatrixOperation(MatrixRequest(
          matrix: matrix,
          operation: operation,
          precision: NumericalPrecision.high,
        )));
      }
    }

    final results = await Future.wait(futures);

    return MatrixOperationResult(
      results: results.cast<MatrixResult>(),
      totalComputationTime: results.fold(
        Duration.zero,
        (sum, r) => sum + (r as MatrixResult).computationTime,
      ),
    );
  }
}
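
The same fan-out pattern works with nothing but the Dart SDK when a long-lived pool is unnecessary: Isolate.run (Dart 2.19+) spawns a one-shot isolate per task. A self-contained sketch that splits a dense matrix multiply into row blocks (the block count is an assumption to tune against core count):

import 'dart:isolate';

Future<List<List<double>>> parallelMultiply(
  List<List<double>> a,
  List<List<double>> b, {
  int blockCount = 4,
}) async {
  final blockSize = (a.length / blockCount).ceil();
  final futures = <Future<List<List<double>>>>[];

  for (var start = 0; start < a.length; start += blockSize) {
    final end = start + blockSize < a.length ? start + blockSize : a.length;
    final rows = a.sublist(start, end);

    // Inputs are copied into each isolate; the block result is handed
    // back via Isolate.exit without an extra copy.
    futures.add(Isolate.run(() => [
          for (final row in rows)
            [
              for (var j = 0; j < b[0].length; j++)
                Iterable<double>.generate(b.length, (k) => row[k] * b[k][j])
                    .reduce((x, y) => x + y),
            ],
        ]));
  }

  final blocks = await Future.wait(futures);
  return [for (final block in blocks) ...block];
}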

Optimize memory usage for large-scale processing:

import 'dart:io';

class OptimizedIsolateTransport {
  static RpcIsolateTransport createOptimized({
    required int workerCount,
    required String workerScript,
  }) {
    return RpcIsolateTransport.workerPool(
      workerCount: workerCount,
      workerScript: workerScript,
      options: IsolateTransportOptions(
        enableZeroCopy: true,
        // Memory optimization
        memoryManagement: MemoryManagement(
          maxMemoryUsage: _calculateOptimalMemoryPerWorker(),
          gcTriggerThreshold: 0.7,
          enableMemoryProfiling: true,
          enableLargeObjectHeap: true,
          gcStrategy: GCStrategy.lowLatency,
        ),
        // Performance monitoring
        performanceMonitoring: PerformanceMonitoring(
          enableCpuProfiling: true,
          enableMemoryTracking: true,
          enableLatencyTracking: true,
          reportInterval: Duration(seconds: 30),
          onPerformanceReport: _handlePerformanceReport,
        ),
        // Load balancing
        loadBalancing: LoadBalancingStrategy.adaptive,
        healthCheckInterval: Duration(seconds: 10),
        // Worker lifecycle
        workerLifecycle: WorkerLifecycle(
          idleTimeout: Duration(minutes: 5),
          maxWorkersPerCore: 2,
          enableWorkerRecycling: true,
          recycleAfterOperations: 10000,
        ),
      ),
    );
  }

  static int _calculateOptimalMemoryPerWorker() {
    // Dart has no portable API for querying total system memory, so
    // assume a fixed budget (8GB here) or read it from a
    // platform-specific source.
    const totalMemory = 8 * 1024 * 1024 * 1024;
    final workerCount = Platform.numberOfProcessors;
    return (totalMemory * 0.7) ~/ workerCount; // Use 70% of the budget
  }

  static void _handlePerformanceReport(PerformanceReport report) {
    print('Performance Report:');
    print('  CPU Usage: ${report.cpuUsage.toStringAsFixed(1)}%');
    print('  Memory Usage: ${report.memoryUsage ~/ (1024 * 1024)}MB');
    print('  Average Latency: ${report.averageLatency.inMilliseconds}ms');
    print('  Throughput: ${report.operationsPerSecond.toStringAsFixed(0)} ops/sec');

    // Trigger optimizations based on metrics
    if (report.memoryUsage > report.maxMemoryUsage * 0.9) {
      print('Warning: High memory usage, triggering GC');
      // Trigger garbage collection
    }

    if (report.averageLatency > Duration(seconds: 1)) {
      print('Warning: High latency detected');
      // Consider adding more workers or optimizing algorithms
    }
  }
}
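
For the memory-pressure check above, dart:io exposes per-process counters that can stand in for a full metrics pipeline. A sketch (the soft limit is an assumption):

import 'dart:io';

// Logs a warning when this process's resident set size crosses a
// soft limit. ProcessInfo.currentRss is part of dart:io.
void checkMemoryPressure({int softLimitBytes = 512 * 1024 * 1024}) {
  final rss = ProcessInfo.currentRss;
  if (rss > softLimitBytes) {
    print('Warning: RSS ${rss ~/ (1024 * 1024)}MB exceeds '
        '${softLimitBytes ~/ (1024 * 1024)}MB soft limit');
  }
}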

Implement intelligent work distribution:

import 'dart:collection';

class SmartLoadBalancer {
  final Map<String, WorkerMetrics> _workerMetrics = {};
  final Queue<PendingWork> _workQueue = Queue();
  int _nextWorkerIndex = 0;

  String selectOptimalWorker(WorkType workType) {
    switch (workType) {
      case WorkType.cpuIntensive:
        return _selectByCpuUsage();
      case WorkType.memoryIntensive:
        return _selectByMemoryUsage();
      case WorkType.balanced:
        return _selectByOverallLoad();
      default:
        return _selectRoundRobin();
    }
  }

  String _selectByCpuUsage() {
    return _workerMetrics.entries
        .where((entry) => entry.value.isHealthy)
        .reduce((a, b) => a.value.cpuUsage < b.value.cpuUsage ? a : b)
        .key;
  }

  String _selectByMemoryUsage() {
    return _workerMetrics.entries
        .where((entry) => entry.value.isHealthy)
        .reduce((a, b) => a.value.memoryUsage < b.value.memoryUsage ? a : b)
        .key;
  }

  String _selectByOverallLoad() {
    return _workerMetrics.entries
        .where((entry) => entry.value.isHealthy)
        .reduce((a, b) => a.value.overallLoad < b.value.overallLoad ? a : b)
        .key;
  }

  String _selectRoundRobin() {
    final ids = _workerMetrics.keys.toList();
    return ids[_nextWorkerIndex++ % ids.length];
  }

  void updateWorkerMetrics(String workerId, WorkerMetrics metrics) {
    _workerMetrics[workerId] = metrics;
  }

  Future<void> rebalanceWorkload() async {
    final overloadedWorkers = _workerMetrics.entries
        .where((entry) => entry.value.overallLoad > 0.8)
        .map((entry) => entry.key)
        .toList();

    if (overloadedWorkers.isNotEmpty) {
      // _redistributeWork is an application-specific strategy
      await _redistributeWork(overloadedWorkers);
    }
  }
}
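
Choosing a worker then becomes a one-liner at dispatch time. A sketch, where sendToWorker is a hypothetical routing helper and WorkType comes from the class above:

final balancer = SmartLoadBalancer();

Future<void> dispatch(ComputeRequest request) async {
  // Route CPU-heavy work to the least busy healthy worker.
  final workerId = balancer.selectOptimalWorker(WorkType.cpuIntensive);
  await sendToWorker(workerId, request); // hypothetical dispatch helper
}

Just as important as where work runs is making sure application code does not silently reintroduce copies: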
class ZeroCopyOptimizations {
  // ✅ Good: Pass large objects directly
  Future<ProcessedData> processLargeData(LargeDataset dataset) async {
    // The object is passed by reference, no copying
    return await processor.process(dataset);
  }

  // ❌ Avoid: Unnecessary data transformation before passing
  Future<ProcessedData> processLargeDataBad(LargeDataset dataset) async {
    // This round-trip creates unnecessary copies
    final serialized = dataset.toJson();
    final reconstructed = LargeDataset.fromJson(serialized);
    return await processor.process(reconstructed);
  }

  // ✅ Good: Use streaming for large collections
  Stream<ProcessedItem> processStreamingData(Stream<DataItem> items) async* {
    await for (final item in items) {
      yield await processor.processItem(item);
    }
  }
}
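
For raw binary payloads, the SDK also offers an explicit zero-copy channel: TransferableTypedData gathers bytes once up front, after which sending and materializing it moves the buffer rather than copying it. A minimal sketch:

import 'dart:isolate';
import 'dart:typed_data';

void sendLargeBuffer(SendPort port, Uint8List bytes) {
  // fromList gathers the bytes into one transferable buffer; the
  // send itself then moves, not copies, that buffer.
  final transferable = TransferableTypedData.fromList([bytes]);
  port.send(transferable);
}

Uint8List receiveLargeBuffer(TransferableTypedData transferable) {
  // materialize() reattaches the bytes in the receiving isolate;
  // each instance can be materialized only once.
  return transferable.materialize().asUint8List();
}

Worker lifecycle deserves the same care: crashed workers should be replaced, and shutdown should let in-flight work drain: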
import 'dart:io';

class WorkerLifecycleManager {
  final Map<String, WorkerInfo> _workers = {};

  Future<void> initializeWorkers() async {
    for (int i = 0; i < Platform.numberOfProcessors; i++) {
      await _createWorker('worker_$i');
    }
  }

  Future<void> _createWorker(String workerId) async {
    try {
      final worker = await IsolateWorker.spawn(
        'worker_script.dart',
        onExit: () => _handleWorkerExit(workerId),
        onError: (error) => _handleWorkerError(workerId, error),
      );

      _workers[workerId] = WorkerInfo(
        worker: worker,
        createdAt: DateTime.now(),
        taskCount: 0,
      );
    } catch (e) {
      print('Failed to create worker $workerId: $e');
      rethrow;
    }
  }

  void _handleWorkerExit(String workerId) {
    print('Worker $workerId exited, creating replacement...');
    _workers.remove(workerId);
    _createWorker(workerId); // fire-and-forget replacement
  }

  void _handleWorkerError(String workerId, dynamic error) {
    print('Worker $workerId error: $error');
    // Implement an error recovery strategy here
  }

  Future<void> gracefulShutdown() async {
    print('Shutting down ${_workers.length} workers...');
    await Future.wait(_workers.values.map((info) async {
      await info.worker.terminate(graceful: true);
    }));
    _workers.clear();
  }
}
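
Wiring the manager into application shutdown is straightforward; a sketch using dart:io signal handling (the exit-code choice is an assumption):

import 'dart:io';

Future<void> runWithGracefulShutdown(WorkerLifecycleManager manager) async {
  await manager.initializeWorkers();

  // Terminate workers cleanly on Ctrl-C instead of killing them mid-task.
  ProcessSignal.sigint.watch().listen((_) async {
    await manager.gracefulShutdown();
    exit(0);
  });
}

Finally, measure whether the pool is actually paying off: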
import 'dart:async';

class IsolatePerformanceProfiler {
  final Map<String, List<Duration>> _latencyHistory = {};
  final Map<String, int> _throughputCounters = {};
  Timer? _reportTimer;

  static const _reportInterval = Duration(seconds: 30);

  void startProfiling() {
    _reportTimer = Timer.periodic(_reportInterval, (_) {
      _generatePerformanceReport();
    });
  }

  void recordLatency(String operation, Duration latency) {
    _latencyHistory.putIfAbsent(operation, () => <Duration>[]);
    _latencyHistory[operation]!.add(latency);

    // Keep only recent measurements
    if (_latencyHistory[operation]!.length > 1000) {
      _latencyHistory[operation]!.removeAt(0);
    }
  }

  void recordThroughput(String operation) {
    _throughputCounters[operation] = (_throughputCounters[operation] ?? 0) + 1;
  }

  void _generatePerformanceReport() {
    print('\n=== Isolate Performance Report ===');

    for (final operation in _latencyHistory.keys) {
      final latencies = _latencyHistory[operation]!;
      final avgLatency =
          latencies.fold(Duration.zero, (sum, l) => sum + l) ~/ latencies.length;
      // Counters reset each interval, so ops/sec is count / interval
      final throughput = _throughputCounters[operation] ?? 0;

      print('$operation:');
      print('  Average Latency: ${avgLatency.inMilliseconds}ms');
      print('  Throughput: ${throughput / _reportInterval.inSeconds} ops/sec');
      print('  Total Operations: $throughput');
    }

    // Reset counters for the next interval
    _throughputCounters.clear();
  }

  void stopProfiling() {
    _reportTimer?.cancel();
    _generatePerformanceReport();
  }
}
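
A typical way to feed the profiler is to wrap each RPC call with a Stopwatch; the processor.processChunk call is from the worker-pool example above:

final profiler = IsolatePerformanceProfiler()..startProfiling();

Future<R> timed<R>(String operation, Future<R> Function() call) async {
  final stopwatch = Stopwatch()..start();
  try {
    return await call();
  } finally {
    profiler
      ..recordLatency(operation, stopwatch.elapsed)
      ..recordThroughput(operation);
  }
}

final result = await timed(
  'processChunk',
  () => processor.processChunk(DataChunk(id: 1, data: generateData(1))),
);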

Isolate transport is the perfect choice for CPU-intensive applications that need to maintain UI responsiveness while performing heavy computational work. Its zero-copy capabilities and worker management make it ideal for scientific computing, data processing, and any scenario where parallel processing can significantly improve performance.