Перейти к содержимому

WebSocket транспорт

WebSocket транспорт обеспечивает двунаправленную коммуникацию в реальном времени между клиентами и серверами RPC Dart. Он идеален для веб-приложений, систем реального времени и сценариев, где предпочтительны постоянные соединения вместо паттерна запрос-ответ.

Коммуникация в реальном времени

Полнодуплексная коммуникация с мгновенной доставкой сообщений, идеально для чатов, живых обновлений и инструментов совместной работы.

Кроссплатформенная поддержка

Работает беспрепятственно в браузерах, Flutter приложениях и серверных приложениях с консистентным поведением.

Управление соединениями

Автоматическое переподключение, мониторинг heartbeat и изящная деградация для надёжной работы с сетью.

Поддержка потоков

Нативная поддержка всех паттернов потоковой передачи RPC с эффективным мультиплексированием сообщений через одно соединение.

WebSocket транспорт обеспечивает коммуникацию в реальном времени в обоих направлениях:

// Сервер может отправлять обновления клиентам
class NotificationService extends RpcResponderContract {
final StreamController<NotificationEvent> _notifications = StreamController.broadcast();
NotificationService() : super('NotificationService');
@override
void setup() {
addServerStreamMethod<Empty, NotificationEvent>(
methodName: 'subscribe',
handler: _subscribe,
);
addUnaryMethod<NotificationRequest, Empty>(
methodName: 'notify',
handler: _notify,
);
}
Stream<NotificationEvent> _subscribe(Empty request, {RpcContext? context}) {
return _notifications.stream;
}
Future<Empty> _notify(NotificationRequest request, {RpcContext? context}) async {
_notifications.add(NotificationEvent(
type: request.type,
message: request.message,
timestamp: DateTime.now(),
));
return Empty();
}
}

В отличие от HTTP запросов, WebSocket соединения остаются открытыми:

// Соединение остаётся активным для множественных операций
final transport = RpcWebSocketTransport(
url: 'ws://localhost:8080/rpc',
options: WebSocketTransportOptions(
heartbeatInterval: Duration(seconds: 30),
reconnectInterval: Duration(seconds: 5),
maxReconnectAttempts: 10,
),
);
final caller = RpcCallerEndpoint(transport: transport);
// Множественные вызовы через одно соединение
for (int i = 0; i < 100; i++) {
final result = await service.process(DataRequest(id: i));
print('Обработано: ${result.id}');
}

Настройка WebSocket сервера для RPC Dart:

websocket_server.dart
import 'dart:io';
import 'package:rpc_dart/rpc_dart.dart';
import 'package:rpc_dart_transports/rpc_dart_transports.dart';
class WebSocketRpcServer {
late HttpServer _server;
final List<RpcResponderEndpoint> _endpoints = [];
Future<void> start({
String host = 'localhost',
int port = 8080,
String path = '/rpc',
}) async {
_server = await HttpServer.bind(host, port);
print('WebSocket RPC сервер слушает на ws://$host:$port$path');
await for (HttpRequest request in _server) {
if (request.uri.path == path) {
await _handleWebSocketConnection(request);
} else {
request.response.statusCode = HttpStatus.notFound;
await request.response.close();
}
}
}
Future<void> _handleWebSocketConnection(HttpRequest request) async {
try {
final socket = await WebSocketTransformer.upgrade(request);
final transport = RpcWebSocketServerTransport(socket);
final endpoint = RpcResponderEndpoint(transport: transport);
// Регистрируем контракты сервисов
endpoint.registerServiceContract(CalculatorResponder());
endpoint.registerServiceContract(NotificationService());
endpoint.registerServiceContract(FileService());
_endpoints.add(endpoint);
// Начинаем обработку RPC вызовов
await endpoint.start();
// Очищаем ресурсы при закрытии соединения
socket.done.then((_) {
_endpoints.remove(endpoint);
endpoint.close();
});
} catch (e) {
print('Ошибка обработки WebSocket соединения: $e');
request.response.statusCode = HttpStatus.badRequest;
await request.response.close();
}
}
Future<void> stop() async {
await _server.close();
await Future.wait(_endpoints.map((e) => e.close()));
_endpoints.clear();
}
}

Подключение к WebSocket RPC серверу:

import 'package:rpc_dart/rpc_dart.dart';
import 'package:rpc_dart_transports/rpc_dart_transports.dart';
void main() async {
// Создаём WebSocket транспорт
final transport = RpcWebSocketTransport(
url: 'ws://localhost:8080/rpc',
options: WebSocketTransportOptions(
heartbeatInterval: Duration(seconds: 30),
reconnectInterval: Duration(seconds: 5),
maxReconnectAttempts: 10,
connectTimeout: Duration(seconds: 10),
),
);
// Создаём caller эндпоинт
final caller = RpcCallerEndpoint(transport: transport);
final calculator = CalculatorCaller(caller);
try {
// Ждём подключения
await transport.connect();
// Делаем RPC вызовы
final sum = await calculator.add(MathRequest(a: 10, b: 5));
print('10 + 5 = ${sum.result}');
// Используем потоковые методы
await for (final number in calculator.generateRange(RangeRequest(start: 1, end: 5))) {
print('Сгенерировано: ${number.value}');
}
} finally {
await caller.close();
}
}

Настройка поведения WebSocket транспорта:

final transport = RpcWebSocketTransport(
url: 'wss://secure-api.example.com/rpc',
options: WebSocketTransportOptions(
// Настройки подключения
connectTimeout: Duration(seconds: 10),
protocols: ['rpc-dart-v1'],
headers: {
'Authorization': 'Bearer $token',
'User-Agent': 'RpcDartClient/1.0',
},
// Heartbeat и переподключение
heartbeatInterval: Duration(seconds: 30),
reconnectInterval: Duration(seconds: 5),
maxReconnectAttempts: 10,
backoffMultiplier: 1.5,
// Обработка сообщений
maxMessageSize: 1024 * 1024, // 1MB
compression: CompressionOptions.defaultDeflate,
// Жизненный цикл соединения
onConnect: () => print('Подключён к RPC серверу'),
onDisconnect: (reason) => print('Отключён: $reason'),
onReconnect: (attempt) => print('Переподключение (попытка $attempt)'),
onError: (error) => print('Ошибка соединения: $error'),
),
);

Идеально для приложений, требующих мгновенных обновлений:

class ChatApplication {
late RpcCallerEndpoint _caller;
late ChatServiceCaller _chatService;
Future<void> connect(String serverUrl) async {
final transport = RpcWebSocketTransport(url: serverUrl);
_caller = RpcCallerEndpoint(transport: transport);
_chatService = ChatServiceCaller(_caller);
await transport.connect();
}
Future<void> startChat(String roomId) async {
// Подписываемся на сообщения
await for (final message in _chatService.subscribeToMessages(RoomRequest(roomId: roomId))) {
displayMessage(message);
}
}
Future<void> sendMessage(String roomId, String text) async {
await _chatService.sendMessage(SendMessageRequest(
roomId: roomId,
text: text,
timestamp: DateTime.now(),
));
}
void displayMessage(ChatMessage message) {
print('${message.author}: ${message.text}');
}
}

Эффективная потоковая передача данных в реальном времени:

class StockPriceMonitor {
late RpcCallerEndpoint _caller;
late MarketDataCaller _marketData;
Future<void> monitorStocks(List<String> symbols) async {
final transport = RpcWebSocketTransport(url: 'wss://market-data.example.com/rpc');
_caller = RpcCallerEndpoint(transport: transport);
_marketData = MarketDataCaller(_caller);
await transport.connect();
// Подписываемся на обновления цен
final subscription = _marketData.subscribeToPrices(SymbolListRequest(symbols: symbols));
await for (final priceUpdate in subscription) {
updateUI(priceUpdate);
}
}
void updateUI(PriceUpdate update) {
print('${update.symbol}: \$${update.price} (${update.change > 0 ? '+' : ''}${update.change})');
}
}

Обеспечение совместной работы в реальном времени:

class CollaborativeEditor {
late RpcCallerEndpoint _caller;
late EditorServiceCaller _editorService;
Future<void> joinDocument(String documentId) async {
final transport = RpcWebSocketTransport(url: 'wss://collab.example.com/rpc');
_caller = RpcCallerEndpoint(transport: transport);
_editorService = EditorServiceCaller(_caller);
await transport.connect();
// Подписываемся на изменения документа
final changes = _editorService.subscribeToChanges(DocumentRequest(documentId: documentId));
await for (final change in changes) {
applyChange(change);
}
}
Future<void> makeEdit(String documentId, EditOperation operation) async {
await _editorService.applyEdit(EditRequest(
documentId: documentId,
operation: operation,
authorId: currentUserId,
));
}
void applyChange(DocumentChange change) {
// Применяем изменение к локальному документу
editor.applyOperation(change.operation);
showAuthorCursor(change.authorId, change.position);
}
}

WebSocket транспорт обеспечивает эффективные возможности потоковой передачи:

// Сервер отправляет множественные ответы
class LogStreamService extends RpcResponderContract {
LogStreamService() : super('LogStream');
@override
void setup() {
addServerStreamMethod<LogFilter, LogEntry>(
methodName: 'streamLogs',
handler: _streamLogs,
);
}
Stream<LogEntry> _streamLogs(LogFilter filter, {RpcContext? context}) async* {
final logFile = File('application.log');
final stream = logFile.openRead();
await for (final line in stream.transform(utf8.decoder).transform(LineSplitter())) {
if (matchesFilter(line, filter)) {
yield LogEntry(
message: line,
timestamp: DateTime.now(),
level: extractLogLevel(line),
);
}
}
}
}
// Потребление клиентом
await for (final logEntry in logService.streamLogs(LogFilter(level: 'ERROR'))) {
print('${logEntry.timestamp}: ${logEntry.message}');
}
// Клиент отправляет множественные запросы
class BatchUploadService extends RpcResponderContract {
BatchUploadService() : super('BatchUpload');
@override
void setup() {
addClientStreamMethod<FileChunk, UploadResult>(
methodName: 'uploadFile',
handler: _uploadFile,
);
}
Future<UploadResult> _uploadFile(Stream<FileChunk> chunks, {RpcContext? context}) async {
final buffer = BytesBuilder();
int totalChunks = 0;
await for (final chunk in chunks) {
buffer.add(chunk.data);
totalChunks++;
}
final fileBytes = buffer.toBytes();
final filePath = await saveFile(fileBytes);
return UploadResult(
filePath: filePath,
size: fileBytes.length,
chunks: totalChunks,
);
}
}
// Использование клиента
Stream<FileChunk> uploadFile(File file) async* {
final bytes = await file.readAsBytes();
const chunkSize = 64 * 1024; // 64KB чанки
for (int i = 0; i < bytes.length; i += chunkSize) {
final end = math.min(i + chunkSize, bytes.length);
yield FileChunk(
data: bytes.sublist(i, end),
sequenceNumber: i ~/ chunkSize,
);
}
}
final result = await uploadService.uploadFile(uploadFile(myFile));
// Обработка в реальном времени с обратной связью
class ProcessingService extends RpcResponderContract {
ProcessingService() : super('Processing');
@override
void setup() {
addBidirectionalStreamMethod<ProcessRequest, ProcessResponse>(
methodName: 'processStream',
handler: _processStream,
);
}
Stream<ProcessResponse> _processStream(
Stream<ProcessRequest> requests,
{RpcContext? context}
) async* {
await for (final request in requests) {
try {
final result = await processData(request.data);
yield ProcessResponse(
id: request.id,
success: true,
result: result,
);
} catch (e) {
yield ProcessResponse(
id: request.id,
success: false,
error: e.toString(),
);
}
}
}
}

WebSocket транспорт изящно обрабатывает проблемы с соединением:

class ResilientClient {
late RpcWebSocketTransport _transport;
late RpcCallerEndpoint _caller;
late ServiceCaller _service;
Future<void> connect(String url) async {
_transport = RpcWebSocketTransport(
url: url,
options: WebSocketTransportOptions(
reconnectInterval: Duration(seconds: 5),
maxReconnectAttempts: 10,
backoffMultiplier: 1.5,
onReconnect: (attempt) {
print('Попытка переподключения $attempt');
},
onConnect: () {
print('Подключён к серверу');
_onConnected();
},
onDisconnect: (reason) {
print('Отключён: $reason');
_onDisconnected();
},
),
);
_caller = RpcCallerEndpoint(transport: _transport);
_service = ServiceCaller(_caller);
await _transport.connect();
}
void _onConnected() {
// Возобновляем операции
_resumeSubscriptions();
}
void _onDisconnected() {
// Обрабатываем отключение
_pauseOperations();
}
Future<void> _resumeSubscriptions() async {
// Переподписываемся на активные потоки
await _subscribeToNotifications();
}
void _pauseOperations() {
// Ставим операции в очередь для повтора
_queueOperations = true;
}
}
class ConnectionMonitor {
final RpcWebSocketTransport transport;
ConnectionMonitor(this.transport) {
_setupMonitoring();
}
void _setupMonitoring() {
transport.connectionState.listen((state) {
switch (state) {
case ConnectionState.connecting:
showStatus('Подключение...');
break;
case ConnectionState.connected:
showStatus('Подключён');
break;
case ConnectionState.disconnected:
showStatus('Отключён');
break;
case ConnectionState.reconnecting:
showStatus('Переподключение...');
break;
case ConnectionState.failed:
showStatus('Ошибка подключения');
break;
}
});
}
void showStatus(String message) {
print('Соединение: $message');
}
}

Используйте WSS для продакшена:

final transport = RpcWebSocketTransport(
url: 'wss://secure-api.example.com/rpc',
options: WebSocketTransportOptions(
headers: {
'Authorization': 'Bearer $authToken',
},
// Дополнительные заголовки безопасности
extraHeaders: {
'X-API-Key': apiKey,
'X-Client-Version': clientVersion,
},
),
);

Реализация правильной аутентификации:

class AuthenticatedWebSocketClient {
String? _authToken;
late RpcWebSocketTransport _transport;
Future<void> authenticate(String username, String password) async {
// Получаем токен аутентификации из сервиса авторизации
_authToken = await _getAuthToken(username, password);
// Создаём транспорт с заголовками авторизации
_transport = RpcWebSocketTransport(
url: 'wss://api.example.com/rpc',
options: WebSocketTransportOptions(
headers: {
'Authorization': 'Bearer $_authToken',
},
onConnect: () => _onAuthenticatedConnect(),
onDisconnect: (reason) => _onAuthenticatedDisconnect(reason),
),
);
await _transport.connect();
}
void _onAuthenticatedConnect() {
print('Аутентифицированное соединение установлено');
}
void _onAuthenticatedDisconnect(String reason) {
if (reason.contains('unauthorized')) {
// Токен истёк, повторная аутентификация
_refreshAuthentication();
}
}
Future<void> _refreshAuthentication() async {
// Логика обновления токена
_authToken = await _refreshAuthToken(_authToken!);
// Обновляем заголовки транспорта
_transport.updateHeaders({
'Authorization': 'Bearer $_authToken',
});
}
}

Оптимизация производительности с пакетированием сообщений:

class BatchingWebSocketTransport extends RpcWebSocketTransport {
final List<Message> _messageQueue = [];
Timer? _batchTimer;
@override
void sendMessage(Message message) {
_messageQueue.add(message);
_batchTimer?.cancel();
_batchTimer = Timer(Duration(milliseconds: 10), _flushBatch);
}
void _flushBatch() {
if (_messageQueue.isNotEmpty) {
final batch = MessageBatch(messages: List.from(_messageQueue));
super.sendMessage(batch);
_messageQueue.clear();
}
}
}

Для высокопроизводительных сценариев:

class WebSocketConnectionPool {
final List<RpcWebSocketTransport> _connections = [];
final String _baseUrl;
final int _poolSize;
int _currentIndex = 0;
WebSocketConnectionPool(this._baseUrl, this._poolSize);
Future<void> initialize() async {
for (int i = 0; i < _poolSize; i++) {
final transport = RpcWebSocketTransport(url: '$_baseUrl/rpc');
await transport.connect();
_connections.add(transport);
}
}
RpcWebSocketTransport getConnection() {
final connection = _connections[_currentIndex];
_currentIndex = (_currentIndex + 1) % _poolSize;
return connection;
}
Future<void> close() async {
await Future.wait(_connections.map((c) => c.close()));
_connections.clear();
}
}

1. Обрабатывайте жизненный цикл соединения

Заголовок раздела «1. Обрабатывайте жизненный цикл соединения»
class WebSocketClient {
Future<void> connect() async {
try {
await transport.connect();
await _initializeServices();
} catch (e) {
print('Не удалось подключиться: $e');
await _handleConnectionFailure();
}
}
Future<void> _initializeServices() async {
// Инициализируем сервисы после подключения
await _subscribeToRequiredStreams();
await _syncInitialState();
}
Future<void> _handleConnectionFailure() async {
// Реализуем резервную логику
await _switchToHttpTransport();
}
}
class AdaptiveClient {
Future<void> makeCall() async {
try {
if (transport.isConnected) {
return await _makeWebSocketCall();
} else {
return await _makeHttpCall();
}
} catch (e) {
print('WebSocket вызов не удался, переключаемся на HTTP: $e');
return await _makeHttpCall();
}
}
}
class PerformanceMonitor {
void trackWebSocketMetrics(RpcWebSocketTransport transport) {
transport.messagesSent.listen((count) {
print('Сообщений отправлено: $count');
});
transport.messagesReceived.listen((count) {
print('Сообщений получено: $count');
});
transport.latency.listen((duration) {
print('Текущая задержка: ${duration.inMilliseconds}мс');
});
}
}

WebSocket транспорт - идеальный выбор для приложений реального времени, систем совместной работы и сценариев, требующих постоянных соединений. Его двунаправленная природа и возможности потоковой передачи делают его совершенным для современных интерактивных приложений, сохраняя при этом надёжность и типобезопасность RPC Dart.