Коммуникация в реальном времени
Полнодуплексная коммуникация с мгновенной доставкой сообщений, идеально для чатов, живых обновлений и инструментов совместной работы.
WebSocket транспорт обеспечивает двунаправленную коммуникацию в реальном времени между клиентами и серверами RPC Dart. Он идеален для веб-приложений, систем реального времени и сценариев, где предпочтительны постоянные соединения вместо паттерна запрос-ответ.
Коммуникация в реальном времени
Полнодуплексная коммуникация с мгновенной доставкой сообщений, идеально для чатов, живых обновлений и инструментов совместной работы.
Кроссплатформенная поддержка
Работает беспрепятственно в браузерах, Flutter приложениях и серверных приложениях с консистентным поведением.
Управление соединениями
Автоматическое переподключение, мониторинг heartbeat и изящная деградация для надёжной работы с сетью.
Поддержка потоков
Нативная поддержка всех паттернов потоковой передачи RPC с эффективным мультиплексированием сообщений через одно соединение.
WebSocket транспорт обеспечивает коммуникацию в реальном времени в обоих направлениях:
// Сервер может отправлять обновления клиентамclass NotificationService extends RpcResponderContract { final StreamController<NotificationEvent> _notifications = StreamController.broadcast();
NotificationService() : super('NotificationService');
@override void setup() { addServerStreamMethod<Empty, NotificationEvent>( methodName: 'subscribe', handler: _subscribe, );
addUnaryMethod<NotificationRequest, Empty>( methodName: 'notify', handler: _notify, ); }
Stream<NotificationEvent> _subscribe(Empty request, {RpcContext? context}) { return _notifications.stream; }
Future<Empty> _notify(NotificationRequest request, {RpcContext? context}) async { _notifications.add(NotificationEvent( type: request.type, message: request.message, timestamp: DateTime.now(), )); return Empty(); }}
В отличие от HTTP запросов, WebSocket соединения остаются открытыми:
// Соединение остаётся активным для множественных операцийfinal transport = RpcWebSocketTransport( url: 'ws://localhost:8080/rpc', options: WebSocketTransportOptions( heartbeatInterval: Duration(seconds: 30), reconnectInterval: Duration(seconds: 5), maxReconnectAttempts: 10, ),);
final caller = RpcCallerEndpoint(transport: transport);
// Множественные вызовы через одно соединениеfor (int i = 0; i < 100; i++) { final result = await service.process(DataRequest(id: i)); print('Обработано: ${result.id}');}
Настройка WebSocket сервера для RPC Dart:
import 'dart:io';import 'package:rpc_dart/rpc_dart.dart';import 'package:rpc_dart_transports/rpc_dart_transports.dart';
class WebSocketRpcServer { late HttpServer _server; final List<RpcResponderEndpoint> _endpoints = [];
Future<void> start({ String host = 'localhost', int port = 8080, String path = '/rpc', }) async { _server = await HttpServer.bind(host, port); print('WebSocket RPC сервер слушает на ws://$host:$port$path');
await for (HttpRequest request in _server) { if (request.uri.path == path) { await _handleWebSocketConnection(request); } else { request.response.statusCode = HttpStatus.notFound; await request.response.close(); } } }
Future<void> _handleWebSocketConnection(HttpRequest request) async { try { final socket = await WebSocketTransformer.upgrade(request); final transport = RpcWebSocketServerTransport(socket);
final endpoint = RpcResponderEndpoint(transport: transport);
// Регистрируем контракты сервисов endpoint.registerServiceContract(CalculatorResponder()); endpoint.registerServiceContract(NotificationService()); endpoint.registerServiceContract(FileService());
_endpoints.add(endpoint);
// Начинаем обработку RPC вызовов await endpoint.start();
// Очищаем ресурсы при закрытии соединения socket.done.then((_) { _endpoints.remove(endpoint); endpoint.close(); });
} catch (e) { print('Ошибка обработки WebSocket соединения: $e'); request.response.statusCode = HttpStatus.badRequest; await request.response.close(); } }
Future<void> stop() async { await _server.close(); await Future.wait(_endpoints.map((e) => e.close())); _endpoints.clear(); }}
class CalculatorResponder extends RpcResponderContract { CalculatorResponder() : super('Calculator');
@override void setup() { addUnaryMethod<MathRequest, MathResponse>( methodName: 'add', handler: _add, );
addServerStreamMethod<RangeRequest, NumberValue>( methodName: 'generateRange', handler: _generateRange, );
addClientStreamMethod<NumberValue, SumResponse>( methodName: 'sum', handler: _sum, );
addBidirectionalStreamMethod<NumberValue, ProcessedNumber>( methodName: 'processNumbers', handler: _processNumbers, ); }
Future<MathResponse> _add(MathRequest request, {RpcContext? context}) async { return MathResponse(result: request.a + request.b); }
Stream<NumberValue> _generateRange(RangeRequest request, {RpcContext? context}) async* { for (int i = request.start; i <= request.end; i++) { yield NumberValue(value: i); await Future.delayed(Duration(milliseconds: 100)); } }
Future<SumResponse> _sum(Stream<NumberValue> numbers, {RpcContext? context}) async { double sum = 0; await for (final number in numbers) { sum += number.value; } return SumResponse(total: sum); }
Stream<ProcessedNumber> _processNumbers( Stream<NumberValue> numbers, {RpcContext? context} ) async* { await for (final number in numbers) { yield ProcessedNumber( original: number.value, squared: number.value * number.value, timestamp: DateTime.now(), ); } }}
void main() async { final server = WebSocketRpcServer();
// Обрабатываем изящное завершение ProcessSignal.sigint.watch().listen((_) async { print('Завершение работы сервера...'); await server.stop(); exit(0); });
// Запускаем сервер await server.start( host: 'localhost', port: 8080, path: '/rpc', );}
Подключение к WebSocket RPC серверу:
import 'package:rpc_dart/rpc_dart.dart';import 'package:rpc_dart_transports/rpc_dart_transports.dart';
void main() async { // Создаём WebSocket транспорт final transport = RpcWebSocketTransport( url: 'ws://localhost:8080/rpc', options: WebSocketTransportOptions( heartbeatInterval: Duration(seconds: 30), reconnectInterval: Duration(seconds: 5), maxReconnectAttempts: 10, connectTimeout: Duration(seconds: 10), ), );
// Создаём caller эндпоинт final caller = RpcCallerEndpoint(transport: transport); final calculator = CalculatorCaller(caller);
try { // Ждём подключения await transport.connect();
// Делаем RPC вызовы final sum = await calculator.add(MathRequest(a: 10, b: 5)); print('10 + 5 = ${sum.result}');
// Используем потоковые методы await for (final number in calculator.generateRange(RangeRequest(start: 1, end: 5))) { print('Сгенерировано: ${number.value}'); }
} finally { await caller.close(); }}
Настройка поведения WebSocket транспорта:
final transport = RpcWebSocketTransport( url: 'wss://secure-api.example.com/rpc', options: WebSocketTransportOptions( // Настройки подключения connectTimeout: Duration(seconds: 10), protocols: ['rpc-dart-v1'], headers: { 'Authorization': 'Bearer $token', 'User-Agent': 'RpcDartClient/1.0', },
// Heartbeat и переподключение heartbeatInterval: Duration(seconds: 30), reconnectInterval: Duration(seconds: 5), maxReconnectAttempts: 10, backoffMultiplier: 1.5,
// Обработка сообщений maxMessageSize: 1024 * 1024, // 1MB compression: CompressionOptions.defaultDeflate,
// Жизненный цикл соединения onConnect: () => print('Подключён к RPC серверу'), onDisconnect: (reason) => print('Отключён: $reason'), onReconnect: (attempt) => print('Переподключение (попытка $attempt)'), onError: (error) => print('Ошибка соединения: $error'), ),);
Идеально для приложений, требующих мгновенных обновлений:
class ChatApplication { late RpcCallerEndpoint _caller; late ChatServiceCaller _chatService;
Future<void> connect(String serverUrl) async { final transport = RpcWebSocketTransport(url: serverUrl); _caller = RpcCallerEndpoint(transport: transport); _chatService = ChatServiceCaller(_caller);
await transport.connect(); }
Future<void> startChat(String roomId) async { // Подписываемся на сообщения await for (final message in _chatService.subscribeToMessages(RoomRequest(roomId: roomId))) { displayMessage(message); } }
Future<void> sendMessage(String roomId, String text) async { await _chatService.sendMessage(SendMessageRequest( roomId: roomId, text: text, timestamp: DateTime.now(), )); }
void displayMessage(ChatMessage message) { print('${message.author}: ${message.text}'); }}
Эффективная потоковая передача данных в реальном времени:
class StockPriceMonitor { late RpcCallerEndpoint _caller; late MarketDataCaller _marketData;
Future<void> monitorStocks(List<String> symbols) async { final transport = RpcWebSocketTransport(url: 'wss://market-data.example.com/rpc'); _caller = RpcCallerEndpoint(transport: transport); _marketData = MarketDataCaller(_caller);
await transport.connect();
// Подписываемся на обновления цен final subscription = _marketData.subscribeToPrices(SymbolListRequest(symbols: symbols));
await for (final priceUpdate in subscription) { updateUI(priceUpdate); } }
void updateUI(PriceUpdate update) { print('${update.symbol}: \$${update.price} (${update.change > 0 ? '+' : ''}${update.change})'); }}
Обеспечение совместной работы в реальном времени:
class CollaborativeEditor { late RpcCallerEndpoint _caller; late EditorServiceCaller _editorService;
Future<void> joinDocument(String documentId) async { final transport = RpcWebSocketTransport(url: 'wss://collab.example.com/rpc'); _caller = RpcCallerEndpoint(transport: transport); _editorService = EditorServiceCaller(_caller);
await transport.connect();
// Подписываемся на изменения документа final changes = _editorService.subscribeToChanges(DocumentRequest(documentId: documentId));
await for (final change in changes) { applyChange(change); } }
Future<void> makeEdit(String documentId, EditOperation operation) async { await _editorService.applyEdit(EditRequest( documentId: documentId, operation: operation, authorId: currentUserId, )); }
void applyChange(DocumentChange change) { // Применяем изменение к локальному документу editor.applyOperation(change.operation); showAuthorCursor(change.authorId, change.position); }}
WebSocket транспорт обеспечивает эффективные возможности потоковой передачи:
// Сервер отправляет множественные ответыclass LogStreamService extends RpcResponderContract { LogStreamService() : super('LogStream');
@override void setup() { addServerStreamMethod<LogFilter, LogEntry>( methodName: 'streamLogs', handler: _streamLogs, ); }
Stream<LogEntry> _streamLogs(LogFilter filter, {RpcContext? context}) async* { final logFile = File('application.log'); final stream = logFile.openRead();
await for (final line in stream.transform(utf8.decoder).transform(LineSplitter())) { if (matchesFilter(line, filter)) { yield LogEntry( message: line, timestamp: DateTime.now(), level: extractLogLevel(line), ); } } }}
// Потребление клиентомawait for (final logEntry in logService.streamLogs(LogFilter(level: 'ERROR'))) { print('${logEntry.timestamp}: ${logEntry.message}');}
// Клиент отправляет множественные запросыclass BatchUploadService extends RpcResponderContract { BatchUploadService() : super('BatchUpload');
@override void setup() { addClientStreamMethod<FileChunk, UploadResult>( methodName: 'uploadFile', handler: _uploadFile, ); }
Future<UploadResult> _uploadFile(Stream<FileChunk> chunks, {RpcContext? context}) async { final buffer = BytesBuilder(); int totalChunks = 0;
await for (final chunk in chunks) { buffer.add(chunk.data); totalChunks++; }
final fileBytes = buffer.toBytes(); final filePath = await saveFile(fileBytes);
return UploadResult( filePath: filePath, size: fileBytes.length, chunks: totalChunks, ); }}
// Использование клиентаStream<FileChunk> uploadFile(File file) async* { final bytes = await file.readAsBytes(); const chunkSize = 64 * 1024; // 64KB чанки
for (int i = 0; i < bytes.length; i += chunkSize) { final end = math.min(i + chunkSize, bytes.length); yield FileChunk( data: bytes.sublist(i, end), sequenceNumber: i ~/ chunkSize, ); }}
final result = await uploadService.uploadFile(uploadFile(myFile));
// Обработка в реальном времени с обратной связьюclass ProcessingService extends RpcResponderContract { ProcessingService() : super('Processing');
@override void setup() { addBidirectionalStreamMethod<ProcessRequest, ProcessResponse>( methodName: 'processStream', handler: _processStream, ); }
Stream<ProcessResponse> _processStream( Stream<ProcessRequest> requests, {RpcContext? context} ) async* { await for (final request in requests) { try { final result = await processData(request.data); yield ProcessResponse( id: request.id, success: true, result: result, ); } catch (e) { yield ProcessResponse( id: request.id, success: false, error: e.toString(), ); } } }}
WebSocket транспорт изящно обрабатывает проблемы с соединением:
class ResilientClient { late RpcWebSocketTransport _transport; late RpcCallerEndpoint _caller; late ServiceCaller _service;
Future<void> connect(String url) async { _transport = RpcWebSocketTransport( url: url, options: WebSocketTransportOptions( reconnectInterval: Duration(seconds: 5), maxReconnectAttempts: 10, backoffMultiplier: 1.5, onReconnect: (attempt) { print('Попытка переподключения $attempt'); }, onConnect: () { print('Подключён к серверу'); _onConnected(); }, onDisconnect: (reason) { print('Отключён: $reason'); _onDisconnected(); }, ), );
_caller = RpcCallerEndpoint(transport: _transport); _service = ServiceCaller(_caller);
await _transport.connect(); }
void _onConnected() { // Возобновляем операции _resumeSubscriptions(); }
void _onDisconnected() { // Обрабатываем отключение _pauseOperations(); }
Future<void> _resumeSubscriptions() async { // Переподписываемся на активные потоки await _subscribeToNotifications(); }
void _pauseOperations() { // Ставим операции в очередь для повтора _queueOperations = true; }}
class ConnectionMonitor { final RpcWebSocketTransport transport;
ConnectionMonitor(this.transport) { _setupMonitoring(); }
void _setupMonitoring() { transport.connectionState.listen((state) { switch (state) { case ConnectionState.connecting: showStatus('Подключение...'); break; case ConnectionState.connected: showStatus('Подключён'); break; case ConnectionState.disconnected: showStatus('Отключён'); break; case ConnectionState.reconnecting: showStatus('Переподключение...'); break; case ConnectionState.failed: showStatus('Ошибка подключения'); break; } }); }
void showStatus(String message) { print('Соединение: $message'); }}
Используйте WSS для продакшена:
final transport = RpcWebSocketTransport( url: 'wss://secure-api.example.com/rpc', options: WebSocketTransportOptions( headers: { 'Authorization': 'Bearer $authToken', }, // Дополнительные заголовки безопасности extraHeaders: { 'X-API-Key': apiKey, 'X-Client-Version': clientVersion, }, ),);
Реализация правильной аутентификации:
class AuthenticatedWebSocketClient { String? _authToken; late RpcWebSocketTransport _transport;
Future<void> authenticate(String username, String password) async { // Получаем токен аутентификации из сервиса авторизации _authToken = await _getAuthToken(username, password);
// Создаём транспорт с заголовками авторизации _transport = RpcWebSocketTransport( url: 'wss://api.example.com/rpc', options: WebSocketTransportOptions( headers: { 'Authorization': 'Bearer $_authToken', }, onConnect: () => _onAuthenticatedConnect(), onDisconnect: (reason) => _onAuthenticatedDisconnect(reason), ), );
await _transport.connect(); }
void _onAuthenticatedConnect() { print('Аутентифицированное соединение установлено'); }
void _onAuthenticatedDisconnect(String reason) { if (reason.contains('unauthorized')) { // Токен истёк, повторная аутентификация _refreshAuthentication(); } }
Future<void> _refreshAuthentication() async { // Логика обновления токена _authToken = await _refreshAuthToken(_authToken!);
// Обновляем заголовки транспорта _transport.updateHeaders({ 'Authorization': 'Bearer $_authToken', }); }}
Оптимизация производительности с пакетированием сообщений:
class BatchingWebSocketTransport extends RpcWebSocketTransport { final List<Message> _messageQueue = []; Timer? _batchTimer;
@override void sendMessage(Message message) { _messageQueue.add(message);
_batchTimer?.cancel(); _batchTimer = Timer(Duration(milliseconds: 10), _flushBatch); }
void _flushBatch() { if (_messageQueue.isNotEmpty) { final batch = MessageBatch(messages: List.from(_messageQueue)); super.sendMessage(batch); _messageQueue.clear(); } }}
Для высокопроизводительных сценариев:
class WebSocketConnectionPool { final List<RpcWebSocketTransport> _connections = []; final String _baseUrl; final int _poolSize; int _currentIndex = 0;
WebSocketConnectionPool(this._baseUrl, this._poolSize);
Future<void> initialize() async { for (int i = 0; i < _poolSize; i++) { final transport = RpcWebSocketTransport(url: '$_baseUrl/rpc'); await transport.connect(); _connections.add(transport); } }
RpcWebSocketTransport getConnection() { final connection = _connections[_currentIndex]; _currentIndex = (_currentIndex + 1) % _poolSize; return connection; }
Future<void> close() async { await Future.wait(_connections.map((c) => c.close())); _connections.clear(); }}
class WebSocketClient { Future<void> connect() async { try { await transport.connect(); await _initializeServices(); } catch (e) { print('Не удалось подключиться: $e'); await _handleConnectionFailure(); } }
Future<void> _initializeServices() async { // Инициализируем сервисы после подключения await _subscribeToRequiredStreams(); await _syncInitialState(); }
Future<void> _handleConnectionFailure() async { // Реализуем резервную логику await _switchToHttpTransport(); }}
class AdaptiveClient { Future<void> makeCall() async { try { if (transport.isConnected) { return await _makeWebSocketCall(); } else { return await _makeHttpCall(); } } catch (e) { print('WebSocket вызов не удался, переключаемся на HTTP: $e'); return await _makeHttpCall(); } }}
class PerformanceMonitor { void trackWebSocketMetrics(RpcWebSocketTransport transport) { transport.messagesSent.listen((count) { print('Сообщений отправлено: $count'); });
transport.messagesReceived.listen((count) { print('Сообщений получено: $count'); });
transport.latency.listen((duration) { print('Текущая задержка: ${duration.inMilliseconds}мс'); }); }}
WebSocket транспорт - идеальный выбор для приложений реального времени, систем совместной работы и сценариев, требующих постоянных соединений. Его двунаправленная природа и возможности потоковой передачи делают его совершенным для современных интерактивных приложений, сохраняя при этом надёжность и типобезопасность RPC Dart.