import 'dart:async'; import 'dart:convert'; import 'package:stomp_dart_client/stomp_dart_client.dart'; import '../../core/constants/api_endpoints.dart'; import '../models/kline_candle.dart'; /// K线 WebSocket 服务(STOMP 协议) class KlineWebSocketService { StompClient? _stompClient; final Map _subscriptions = {}; final Map> _controllers = {}; bool _isConnected = false; bool _isConnecting = false; int _reconnectDelay = 2000; // 初始重连延迟 static const int _maxReconnectDelay = 30000; /// 订阅某个币种的K线数据 Stream subscribe(String coinCode) { final key = coinCode.toUpperCase(); if (!_controllers.containsKey(key)) { _controllers[key] = StreamController.broadcast(); } _doSubscribe(key); return _controllers[key]!.stream; } /// 取消订阅 void unsubscribe(String coinCode) { final key = coinCode.toUpperCase(); _subscriptions[key]?.call(); _subscriptions.remove(key); } /// 连接状态 bool get isConnected => _isConnected; /// 连接 WebSocket void connect() { if (_isConnecting || _isConnected) return; _isConnecting = true; final wsUrl = ApiEndpoints.klineWs; _stompClient = StompClient( config: StompConfig( url: wsUrl, onConnect: _onConnect, onDisconnect: _onDisconnect, onStompError: _onError, onWebSocketError: _onError, reconnectDelay: const Duration(milliseconds: 5000), heartbeatIncoming: const Duration(seconds: 20), heartbeatOutgoing: const Duration(seconds: 20), ), ); _stompClient!.activate(); } /// 断开连接 void disconnect() { _isConnecting = false; _stompClient?.deactivate(); _stompClient = null; _isConnected = false; _subscriptions.clear(); } void _onConnect(StompFrame frame) { _isConnected = true; _isConnecting = false; _reconnectDelay = 2000; // 重置重连延迟 // 重新订阅所有已注册的币种 for (final key in _controllers.keys) { _doSubscribe(key); } } void _onDisconnect(StompFrame? frame) { _isConnected = false; _isConnecting = false; _scheduleReconnect(); } void _onError(dynamic error) { _isConnected = false; _isConnecting = false; _scheduleReconnect(); } void _scheduleReconnect() { Future.delayed(Duration(milliseconds: _reconnectDelay), () { if (!_isConnected && !_isConnecting) { _reconnectDelay = (_reconnectDelay * 2).clamp(2000, _maxReconnectDelay); connect(); } }); } void _doSubscribe(String coinCode) { if (_stompClient == null || !_isConnected) { connect(); // 触发连接,连接成功后会自动重新订阅 return; } // 避免重复订阅 if (_subscriptions.containsKey(coinCode)) return; final dest = '/topic/kline/$coinCode'; final sub = _stompClient!.subscribe( destination: dest, callback: (StompFrame frame) { if (frame.body != null) { try { final json = jsonDecode(frame.body!) as Map; final candle = KlineCandle.fromJson(json); _controllers[coinCode]?.add(candle); } catch (_) {} } }, ); _subscriptions[coinCode] = sub; } /// 释放资源 void dispose() { disconnect(); for (final controller in _controllers.values) { controller.close(); } _controllers.clear(); } }