Zero-Copy Object Passing
Direct object passing between isolates without serialization overhead, maintaining performance for large data structures.
The Isolate Transport enables high-performance communication between Dart isolates with zero-copy object passing capabilities. It’s designed for CPU-intensive applications that need parallel processing while maintaining the performance benefits of direct object sharing.
Zero-Copy Object Passing
Direct object passing between isolates without serialization overhead, maintaining performance for large data structures.
Worker Process Support
Spawn dedicated worker isolates for CPU-intensive tasks while keeping the main isolate responsive.
Parallel Processing
Distribute workload across multiple isolates to take advantage of multi-core systems efficiently.
Type Safety
Full Dart type information is preserved across isolate boundaries, maintaining compile-time safety.
Unlike traditional isolate communication that requires message passing, Isolate transport can pass objects directly:
class LargeDataProcessor { // Process massive datasets without copying Future<ProcessedData> processLargeDataset(LargeDataset dataset) async { // The entire dataset is passed by reference to the worker isolate // No serialization, no copying - just direct object access return ProcessedData( results: dataset.records.map((record) => processRecord(record)).toList(), metadata: { 'processed_at': DateTime.now(), 'record_count': dataset.records.length, 'processing_time_ms': processingTime.inMilliseconds, }, ); }}
Automatically manage worker isolates for parallel processing:
// Create a pool of worker isolatesfinal transport = RpcIsolateTransport.workerPool( workerCount: Platform.numberOfProcessors, workerScript: 'worker_isolate.dart', options: IsolateTransportOptions( enableZeroCopy: true, maxMemoryUsage: 512 * 1024 * 1024, // 512MB per worker idleTimeout: Duration(minutes: 5), ),);
final caller = RpcCallerEndpoint(transport: transport);final processor = DataProcessorCaller(caller);
// Work is automatically distributed across available workersfinal futures = List.generate(100, (i) async { return await processor.processChunk(DataChunk(id: i, data: generateData(i)));});
final results = await Future.wait(futures);// All 100 chunks processed in parallel across worker isolates
Create a dedicated worker script for CPU-intensive operations:
import 'dart:isolate';import 'package:rpc_dart/rpc_dart.dart';import 'package:rpc_dart_transports/rpc_dart_transports.dart';
void main(List<String> args, SendPort sendPort) async { // Create isolate transport for worker final transport = RpcIsolateWorkerTransport(sendPort);
// Create responder endpoint final responder = RpcResponderEndpoint(transport: transport);
// Register worker services responder.registerServiceContract(DataProcessorResponder()); responder.registerServiceContract(ImageProcessorResponder()); responder.registerServiceContract(ComputationResponder());
// Start the worker await responder.start();
print('Worker isolate started with PID: ${Isolate.current.debugName}');}
// CPU-intensive data processing serviceclass DataProcessorResponder extends RpcResponderContract { DataProcessorResponder() : super('DataProcessor');
@override void setup() { addUnaryMethod<ProcessDataRequest, ProcessDataResponse>( methodName: 'processData', handler: _processData, );
addUnaryMethod<ComputeRequest, ComputeResponse>( methodName: 'compute', handler: _compute, );
addServerStreamMethod<BatchRequest, BatchResult>( methodName: 'processBatch', handler: _processBatch, ); }
Future<ProcessDataResponse> _processData(ProcessDataRequest request, {RpcContext? context}) async { // CPU-intensive processing without blocking main isolate final results = <ProcessedItem>[];
for (final item in request.items) { // Simulate heavy computation final processed = await _performHeavyComputation(item); results.add(processed); }
return ProcessDataResponse( results: results, processingTimeMs: DateTime.now().difference(request.startTime).inMilliseconds, workerId: Isolate.current.debugName ?? 'unknown', ); }
Future<ComputeResponse> _compute(ComputeRequest request, {RpcContext? context}) async { // Mathematical computations final result = await _performMathematicalComputation( request.operation, request.parameters, );
return ComputeResponse( result: result, metadata: { 'computation_type': request.operation, 'parameter_count': request.parameters.length, 'worker_id': Isolate.current.debugName, }, ); }
Stream<BatchResult> _processBatch(BatchRequest request, {RpcContext? context}) async* { final batchSize = request.batchSize ?? 10;
for (int i = 0; i < request.items.length; i += batchSize) { final batch = request.items.skip(i).take(batchSize).toList(); final processed = await _processBatchItems(batch);
yield BatchResult( batchIndex: i ~/ batchSize, results: processed, progress: (i + batch.length) / request.items.length, ); } }
Future<ProcessedItem> _performHeavyComputation(DataItem item) async { // Simulate CPU-intensive work await Future.delayed(Duration(milliseconds: 50));
return ProcessedItem( id: item.id, result: item.value * 2, // Simplified computation metadata: { 'processed_by': Isolate.current.debugName, 'computation_time': 50, }, ); }}
import 'package:rpc_dart/rpc_dart.dart';import 'package:rpc_dart_transports/rpc_dart_transports.dart';
class MainApplication { late RpcCallerEndpoint _caller; late DataProcessorCaller _dataProcessor; late RpcIsolateTransport _transport;
Future<void> initialize() async { // Create isolate transport with worker pool _transport = RpcIsolateTransport.workerPool( workerCount: Platform.numberOfProcessors, workerScript: 'worker_isolate.dart', options: IsolateTransportOptions( enableZeroCopy: true, loadBalancing: LoadBalancingStrategy.roundRobin, maxMemoryUsage: 256 * 1024 * 1024, // 256MB per worker idleTimeout: Duration(minutes: 10),
// Worker lifecycle management onWorkerCreated: (workerId) => print('Worker created: $workerId'), onWorkerTerminated: (workerId) => print('Worker terminated: $workerId'), onWorkerError: (workerId, error) => print('Worker error: $workerId - $error'), ), );
_caller = RpcCallerEndpoint(transport: _transport); _dataProcessor = DataProcessorCaller(_caller);
await _transport.initialize(); print('Initialized ${_transport.workerCount} worker isolates'); }
Future<void> processLargeDataset() async { final dataset = generateLargeDataset(10000); // 10k items
print('Processing ${dataset.items.length} items...'); final stopwatch = Stopwatch()..start();
// Process in parallel across workers final response = await _dataProcessor.processData(ProcessDataRequest( items: dataset.items, startTime: DateTime.now(), ));
stopwatch.stop();
print('Processed ${response.results.length} items in ${stopwatch.elapsedMilliseconds}ms'); print('Worker: ${response.workerId}'); print('Processing time: ${response.processingTimeMs}ms'); }
Future<void> close() async { await _caller.close(); await _transport.close(); }}
void main() async { final app = MainApplication();
try { await app.initialize(); await app.processLargeDataset(); } finally { await app.close(); }}
// Data structures for zero-copy passingclass DataItem { final String id; final double value; final Map<String, dynamic> metadata;
DataItem({ required this.id, required this.value, required this.metadata, });}
class ProcessedItem { final String id; final double result; final Map<String, dynamic> metadata;
ProcessedItem({ required this.id, required this.result, required this.metadata, });}
class ProcessDataRequest { final List<DataItem> items; final DateTime startTime;
ProcessDataRequest({ required this.items, required this.startTime, });}
class ProcessDataResponse { final List<ProcessedItem> results; final int processingTimeMs; final String workerId;
ProcessDataResponse({ required this.results, required this.processingTimeMs, required this.workerId, });}
class ComputeRequest { final String operation; final List<double> parameters;
ComputeRequest({ required this.operation, required this.parameters, });}
class ComputeResponse { final double result; final Map<String, dynamic> metadata;
ComputeResponse({ required this.result, required this.metadata, });}
class BatchRequest { final List<DataItem> items; final int? batchSize;
BatchRequest({ required this.items, this.batchSize, });}
class BatchResult { final int batchIndex; final List<ProcessedItem> results; final double progress;
BatchResult({ required this.batchIndex, required this.results, required this.progress, });}
Process images in parallel without blocking the UI:
class ImageProcessingPipeline { late RpcCallerEndpoint _caller; late ImageProcessorCaller _imageProcessor;
Future<void> initializeWorkers() async { final transport = RpcIsolateTransport.workerPool( workerCount: 4, // 4 worker isolates for image processing workerScript: 'image_worker.dart', options: IsolateTransportOptions( enableZeroCopy: true, maxMemoryUsage: 1024 * 1024 * 1024, // 1GB per worker for images ), );
_caller = RpcCallerEndpoint(transport: transport); _imageProcessor = ImageProcessorCaller(_caller); }
Future<List<ProcessedImage>> processImageBatch(List<RawImage> images) async { print('Processing ${images.length} images in parallel...');
// Process all images concurrently across workers final futures = images.map((image) async { return await _imageProcessor.processImage(ProcessImageRequest( image: image, filters: [ ImageFilter.resize(width: 800, height: 600), ImageFilter.sharpen(intensity: 0.5), ImageFilter.colorCorrection(brightness: 1.1), ], outputFormat: ImageFormat.jpeg, quality: 85, )); }).toList();
final results = await Future.wait(futures); print('Processed ${results.length} images');
return results.map((r) => r.processedImage).toList(); }
Stream<BatchProcessingUpdate> processImageDirectory(String directoryPath) async* { final imageFiles = Directory(directoryPath).listSync() .where((f) => f.path.endsWith('.jpg') || f.path.endsWith('.png')) .toList();
const batchSize = 10; int processed = 0;
for (int i = 0; i < imageFiles.length; i += batchSize) { final batch = imageFiles.skip(i).take(batchSize).toList(); final images = await Future.wait( batch.map((file) => loadImageFromFile(file.path)), );
final processedImages = await processImageBatch(images); processed += processedImages.length;
yield BatchProcessingUpdate( processedCount: processed, totalCount: imageFiles.length, progress: processed / imageFiles.length, currentBatch: processedImages, ); } }}
Perform heavy computational work in background isolates:
class DataAnalytics { late RpcCallerEndpoint _caller; late AnalyticsProcessorCaller _analyticsProcessor;
Future<void> initializeAnalyticsCluster() async { final transport = RpcIsolateTransport.workerPool( workerCount: Platform.numberOfProcessors, workerScript: 'analytics_worker.dart', options: IsolateTransportOptions( enableZeroCopy: true, loadBalancing: LoadBalancingStrategy.leastLoaded,
// Analytics-specific settings memoryManagement: MemoryManagement( maxMemoryUsage: 2 * 1024 * 1024 * 1024, // 2GB per worker gcTriggerThreshold: 0.8, enableMemoryProfiling: true, ),
performanceMonitoring: PerformanceMonitoring( enableCpuProfiling: true, enableLatencyTracking: true, reportInterval: Duration(seconds: 30), ), ), );
_caller = RpcCallerEndpoint(transport: transport); _analyticsProcessor = AnalyticsProcessorCaller(_caller); }
Future<AnalyticsReport> generateReport(AnalyticsDataset dataset) async { print('Generating analytics report for ${dataset.records.length} records...');
// Distribute different types of analysis across workers final futures = [ _analyticsProcessor.calculateStatistics(StatisticsRequest( data: dataset.numericalData, operations: [ StatisticalOperation.mean, StatisticalOperation.median, StatisticalOperation.standardDeviation, StatisticalOperation.percentiles, ], )),
_analyticsProcessor.performTimeSeriesAnalysis(TimeSeriesRequest( timeSeries: dataset.timeSeriesData, analysisTypes: [ TimeSeriesAnalysis.trend, TimeSeriesAnalysis.seasonality, TimeSeriesAnalysis.correlation, ], )),
_analyticsProcessor.runMachineLearning(MLRequest( features: dataset.features, labels: dataset.labels, algorithms: [ MLAlgorithm.linearRegression, MLAlgorithm.randomForest, MLAlgorithm.clustering, ], )), ];
final results = await Future.wait(futures);
return AnalyticsReport( statistics: results[0] as StatisticsResult, timeSeriesAnalysis: results[1] as TimeSeriesResult, machineLearningResults: results[2] as MLResult, generatedAt: DateTime.now(), processingMetrics: await _getProcessingMetrics(), ); }
Stream<RealtimeAnalytics> startRealtimeAnalysis(Stream<DataPoint> dataStream) async* { final controller = StreamController<AnalyzeDataPointRequest>();
// Send real-time data to analytics workers dataStream.listen((dataPoint) { controller.add(AnalyzeDataPointRequest( dataPoint: dataPoint, analysisConfig: RealtimeAnalysisConfig( enableAnomalyDetection: true, enableTrendAnalysis: true, windowSize: Duration(minutes: 5), ), )); });
// Get real-time results await for (final result in _analyticsProcessor.analyzeRealtimeStream(controller.stream)) { yield RealtimeAnalytics( analysis: result, timestamp: DateTime.now(), workerId: result.workerId, ); } }}
High-performance scientific computations:
class ScientificComputing { late RpcCallerEndpoint _caller; late ComputationEngineCaller _computationEngine;
Future<void> initializeComputeCluster() async { final transport = RpcIsolateTransport.workerPool( workerCount: Platform.numberOfProcessors * 2, // Over-provision for CPU-bound work workerScript: 'scientific_worker.dart', options: IsolateTransportOptions( enableZeroCopy: true,
// Optimize for scientific computing computeOptimization: ComputeOptimization( enableVectorization: true, enableParallelMath: true, floatingPointPrecision: FloatingPointPrecision.double, ),
// Memory management for large datasets memoryManagement: MemoryManagement( maxMemoryUsage: 4 * 1024 * 1024 * 1024, // 4GB per worker enableLargeObjectHeap: true, gcStrategy: GCStrategy.throughput, ), ), );
_caller = RpcCallerEndpoint(transport: transport); _computationEngine = ComputationEngineCaller(_caller); }
Future<SimulationResult> runMolecularDynamicsSimulation(MDSimulationRequest request) async { print('Starting molecular dynamics simulation with ${request.particles.length} particles...');
// Distribute simulation steps across workers final timeSteps = request.totalTime ~/ request.timeStep; final results = <MDStepResult>[];
for (int step = 0; step < timeSteps; step += 100) { final stepBatch = math.min(100, timeSteps - step);
final batchFutures = List.generate(stepBatch, (i) async { return await _computationEngine.simulateTimeStep(MDStepRequest( particles: step == 0 ? request.particles : results.last.finalState, stepNumber: step + i, timeStep: request.timeStep, forces: request.forces, boundary: request.boundaryConditions, )); });
final batchResults = await Future.wait(batchFutures); results.addAll(batchResults);
// Report progress final progress = (step + stepBatch) / timeSteps; print('Simulation progress: ${(progress * 100).toStringAsFixed(1)}%'); }
return SimulationResult( steps: results, totalEnergy: results.last.totalEnergy, finalState: results.last.finalState, computationTime: results.fold(Duration.zero, (sum, r) => sum + r.computationTime), ); }
Future<MatrixOperationResult> performLargeMatrixOperations(List<Matrix> matrices) async { // Distribute matrix operations across workers final operations = [ MatrixOperation.multiply, MatrixOperation.invert, MatrixOperation.eigenvalues, MatrixOperation.singularValueDecomposition, ];
final futures = <Future>[];
for (final matrix in matrices) { for (final operation in operations) { futures.add(_computationEngine.performMatrixOperation(MatrixRequest( matrix: matrix, operation: operation, precision: NumericalPrecision.high, ))); } }
final results = await Future.wait(futures);
return MatrixOperationResult( results: results.cast<MatrixResult>(), totalComputationTime: results.fold( Duration.zero, (sum, r) => sum + (r as MatrixResult).computationTime, ), ); }}
Optimize memory usage for large-scale processing:
class OptimizedIsolateTransport { static RpcIsolateTransport createOptimized({ required int workerCount, required String workerScript, }) { return RpcIsolateTransport.workerPool( workerCount: workerCount, workerScript: workerScript, options: IsolateTransportOptions( enableZeroCopy: true,
// Memory optimization memoryManagement: MemoryManagement( maxMemoryUsage: _calculateOptimalMemoryPerWorker(), gcTriggerThreshold: 0.7, enableMemoryProfiling: true, enableLargeObjectHeap: true, gcStrategy: GCStrategy.lowLatency, ),
// Performance monitoring performanceMonitoring: PerformanceMonitoring( enableCpuProfiling: true, enableMemoryTracking: true, enableLatencyTracking: true, reportInterval: Duration(seconds: 30), onPerformanceReport: _handlePerformanceReport, ),
// Load balancing loadBalancing: LoadBalancingStrategy.adaptive, healthCheckInterval: Duration(seconds: 10),
// Worker lifecycle workerLifecycle: WorkerLifecycle( idleTimeout: Duration(minutes: 5), maxWorkersPerCore: 2, enableWorkerRecycling: true, recycleAfterOperations: 10000, ), ), ); }
static int _calculateOptimalMemoryPerWorker() { final totalMemory = Platform.operatingSystemVersion.contains('memory') ? _parseMemoryFromOS() : 8 * 1024 * 1024 * 1024; // Default 8GB
final workerCount = Platform.numberOfProcessors; return (totalMemory * 0.7) ~/ workerCount; // Use 70% of available memory }
static void _handlePerformanceReport(PerformanceReport report) { print('Performance Report:'); print(' CPU Usage: ${report.cpuUsage.toStringAsFixed(1)}%'); print(' Memory Usage: ${report.memoryUsage ~/ (1024 * 1024)}MB'); print(' Average Latency: ${report.averageLatency.inMilliseconds}ms'); print(' Throughput: ${report.operationsPerSecond.toStringAsFixed(0)} ops/sec');
// Trigger optimizations based on metrics if (report.memoryUsage > report.maxMemoryUsage * 0.9) { print('Warning: High memory usage, triggering GC'); // Trigger garbage collection }
if (report.averageLatency > Duration(seconds: 1)) { print('Warning: High latency detected'); // Consider adding more workers or optimizing algorithms } }}
Implement intelligent work distribution:
class SmartLoadBalancer { final Map<String, WorkerMetrics> _workerMetrics = {}; final Queue<PendingWork> _workQueue = Queue();
String selectOptimalWorker(WorkType workType) { switch (workType) { case WorkType.cpuIntensive: return _selectByCpuUsage(); case WorkType.memoryIntensive: return _selectByMemoryUsage(); case WorkType.balanced: return _selectByOverallLoad(); default: return _selectRoundRobin(); } }
String _selectByCpuUsage() { return _workerMetrics.entries .where((entry) => entry.value.isHealthy) .reduce((a, b) => a.value.cpuUsage < b.value.cpuUsage ? a : b) .key; }
String _selectByMemoryUsage() { return _workerMetrics.entries .where((entry) => entry.value.isHealthy) .reduce((a, b) => a.value.memoryUsage < b.value.memoryUsage ? a : b) .key; }
String _selectByOverallLoad() { return _workerMetrics.entries .where((entry) => entry.value.isHealthy) .reduce((a, b) => a.value.overallLoad < b.value.overallLoad ? a : b) .key; }
void updateWorkerMetrics(String workerId, WorkerMetrics metrics) { _workerMetrics[workerId] = metrics; }
Future<void> rebalanceWorkload() async { final overloadedWorkers = _workerMetrics.entries .where((entry) => entry.value.overallLoad > 0.8) .map((entry) => entry.key) .toList();
if (overloadedWorkers.isNotEmpty) { await _redistributeWork(overloadedWorkers); } }}
class ZeroCopyOptimizations { // ✅ Good: Pass large objects directly Future<ProcessedData> processLargeData(LargeDataset dataset) async { // Object is passed by reference, no copying return await processor.process(dataset); }
// ❌ Avoid: Unnecessary data transformation before passing Future<ProcessedData> processLargeDataBad(LargeDataset dataset) async { // This creates unnecessary copies final serialized = dataset.toJson(); final reconstructed = LargeDataset.fromJson(serialized); return await processor.process(reconstructed); }
// ✅ Good: Use streaming for large collections Stream<ProcessedItem> processStreamingData(Stream<DataItem> items) async* { await for (final item in items) { yield await processor.processItem(item); } }}
class WorkerLifecycleManager { final Map<String, WorkerInfo> _workers = {};
Future<void> initializeWorkers() async { for (int i = 0; i < Platform.numberOfProcessors; i++) { await _createWorker('worker_$i'); } }
Future<void> _createWorker(String workerId) async { try { final worker = await IsolateWorker.spawn( 'worker_script.dart', onExit: () => _handleWorkerExit(workerId), onError: (error) => _handleWorkerError(workerId, error), );
_workers[workerId] = WorkerInfo( worker: worker, createdAt: DateTime.now(), taskCount: 0, );
} catch (e) { print('Failed to create worker $workerId: $e'); rethrow; } }
void _handleWorkerExit(String workerId) { print('Worker $workerId exited, creating replacement...'); _workers.remove(workerId); _createWorker(workerId); }
void _handleWorkerError(String workerId, dynamic error) { print('Worker $workerId error: $error'); // Implement error recovery strategy }
Future<void> gracefulShutdown() async { print('Shutting down ${_workers.length} workers...');
await Future.wait(_workers.values.map((info) async { await info.worker.terminate(graceful: true); }));
_workers.clear(); }}
class IsolatePerformanceProfiler { final Map<String, List<Duration>> _latencyHistory = {}; final Map<String, int> _throughputCounters = {}; Timer? _reportTimer;
void startProfiling() { _reportTimer = Timer.periodic(Duration(seconds: 30), (_) { _generatePerformanceReport(); }); }
void recordLatency(String operation, Duration latency) { _latencyHistory.putIfAbsent(operation, () => <Duration>[]); _latencyHistory[operation]!.add(latency);
// Keep only recent measurements if (_latencyHistory[operation]!.length > 1000) { _latencyHistory[operation]!.removeAt(0); } }
void recordThroughput(String operation) { _throughputCounters[operation] = (_throughputCounters[operation] ?? 0) + 1; }
void _generatePerformanceReport() { print('\n=== Isolate Performance Report ===');
for (final operation in _latencyHistory.keys) { final latencies = _latencyHistory[operation]!; final avgLatency = latencies.fold(Duration.zero, (sum, l) => sum + l) ~/ latencies.length; final throughput = _throughputCounters[operation] ?? 0;
print('$operation:'); print(' Average Latency: ${avgLatency.inMilliseconds}ms'); print(' Throughput: ${throughput / 30} ops/sec'); print(' Total Operations: $throughput'); }
// Reset counters _throughputCounters.clear(); }
void stopProfiling() { _reportTimer?.cancel(); _generatePerformanceReport(); }}
Isolate transport is the perfect choice for CPU-intensive applications that need to maintain UI responsiveness while performing heavy computational work. Its zero-copy capabilities and worker management make it ideal for scientific computing, data processing, and any scenario where parallel processing can significantly improve performance.