Neda/Front/lib/services/ptt_service.dart

342 lines
11 KiB
Dart

// PTT Protocol (WebSocket + LiveKit):
//
// 1. connectToGroup(groupId, jwtToken, livekitUrl)
// → WS /ws/groups/{groupId}?token={jwtToken}
// → دریافت {"type":"livekit_token","token":"..."} از سرور
// → اتصال به LiveKit با listener token (can_publish=false)
//
// 2. startSpeaking()
// → ارسال {"type":"request_speak"} روی WS
// → دریافت {"type":"speaker_granted","token":"..."} یا {"type":"speaker_busy","speaker":"..."}
// → اگه granted: اتصال مجدد LiveKit با speaker token (can_publish=true) + روشن کردن میک
//
// 3. stopSpeaking()
// → خاموش کردن میک
// → ارسال {"type":"stop_speak"} روی WS
//
// Broadcasts از سرور:
// {"type":"speaker","user_id":"..."} → کسی شروع به صحبت کرد
// {"type":"speaker_released"} → خط آزاد شد
// {"type":"presence","users":[...]} → لیست آنلاین‌ها
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:livekit_client/livekit_client.dart';
import '../config/app_config.dart';
enum PttState { idle, connected, speaking, receiving }
class PttService {
WebSocket? _ws;
Room? _room;
EventsListener<RoomEvent>? _listener;
String? _livekitUrl;
// completers برای دریافت async پیام‌های WS
Completer<String?>? _listenerTokenCompleter;
Completer<String?>? _speakGrantCompleter;
final _stateCtrl = StreamController<PttState>.broadcast();
final _speakerCtrl = StreamController<String?>.broadcast();
final _errorCtrl = StreamController<String>.broadcast();
PttState _state = PttState.idle;
String? _speakerName;
Stream<PttState> get stateStream => _stateCtrl.stream;
Stream<String?> get speakerStream => _speakerCtrl.stream;
Stream<String> get errorStream => _errorCtrl.stream;
PttState get currentState => _state;
String? get currentSpeaker => _speakerName;
void _setState(PttState s) {
_state = s;
_stateCtrl.add(s);
}
// ── اتصال به گروه ────────────────────────────────────────────────────────
Future<bool> connectToGroup(
String groupId,
String jwtToken,
String livekitUrl,
) async {
if (AppConfig.debug) {
await Future.delayed(const Duration(milliseconds: 400));
_setState(PttState.connected);
return true;
}
_livekitUrl = livekitUrl;
try {
final wsUrl = '${AppConfig.wsBaseUrl}/ws/groups/$groupId?token=$jwtToken';
_ws = await WebSocket.connect(wsUrl);
_listenerTokenCompleter = Completer<String?>();
_ws!.listen(
(data) {
if (data is String) {
final msg = jsonDecode(data) as Map<String, dynamic>;
_handleWsMessage(msg);
}
},
onError: (_) {
_listenerTokenCompleter?.complete(null);
if (_state != PttState.idle) {
_setState(PttState.idle);
_errorCtrl.add('اتصال به سرور قطع شد');
}
},
onDone: () {
_listenerTokenCompleter?.complete(null);
if (_state != PttState.idle) {
_setState(PttState.idle);
}
},
cancelOnError: true,
);
// منتظر دریافت listener token از سرور
final listenerToken = await _listenerTokenCompleter!.future.timeout(
const Duration(seconds: 10),
onTimeout: () => null,
);
_listenerTokenCompleter = null;
if (listenerToken == null) {
_errorCtrl.add('دریافت توکن ناموفق بود');
return false;
}
return await _connectLiveKit(livekitUrl, listenerToken);
} catch (_) {
_errorCtrl.add('اتصال به سرور برقرار نشد');
return false;
}
}
// ── پردازش پیام‌های WebSocket ─────────────────────────────────────────────
void _handleWsMessage(Map<String, dynamic> msg) {
final type = msg['type'] as String?;
switch (type) {
case 'livekit_token':
// اولین پیام بعد از اتصال: listener token
if (_listenerTokenCompleter != null &&
!_listenerTokenCompleter!.isCompleted) {
_listenerTokenCompleter!.complete(msg['token'] as String?);
}
case 'speaker_granted':
// درخواست صحبت تایید شد
if (_speakGrantCompleter != null &&
!_speakGrantCompleter!.isCompleted) {
_speakGrantCompleter!.complete(msg['token'] as String?);
}
case 'speaker_busy':
// خط اشغاله
if (_speakGrantCompleter != null &&
!_speakGrantCompleter!.isCompleted) {
_speakGrantCompleter!.complete(null);
}
_errorCtrl.add('خط اشغال است');
case 'speaker':
// broadcast: کسی مشغول صحبت شد
if (_state != PttState.speaking) {
final userId = msg['user_id'] as String?;
_speakerName = userId;
_speakerCtrl.add(userId);
_setState(PttState.receiving);
}
case 'speaker_released':
// broadcast: خط آزاد شد
if (_state == PttState.receiving) {
_speakerName = null;
_speakerCtrl.add(null);
_setState(PttState.connected);
}
}
}
// ── اتصال به LiveKit ──────────────────────────────────────────────────────
Future<bool> _connectLiveKit(
String url,
String token, {
bool setConnectedState = true,
}) async {
try {
try {
await _listener?.dispose();
} catch (_) {}
try {
await _room?.disconnect().timeout(
const Duration(seconds: 2),
onTimeout: () => null,
);
} catch (_) {}
_room = Room(
roomOptions: const RoomOptions(adaptiveStream: false, dynacast: false),
);
_listener = _room!.createListener();
_listener!
..on<ActiveSpeakersChangedEvent>((e) => _onSpeakersChanged(e.speakers))
..on<RoomDisconnectedEvent>((_) {
if (_state != PttState.speaking) _setState(PttState.idle);
});
await _room!.connect(url, token);
await _room!.localParticipant?.setMicrophoneEnabled(false);
if (setConnectedState) _setState(PttState.connected);
return true;
} catch (_) {
_errorCtrl.add('اتصال به سرور صوتی برقرار نشد');
return false;
}
}
// ── تشخیص متکلم ──────────────────────────────────────────────────────────
void _onSpeakersChanged(List<Participant> speakers) {
if (_state == PttState.speaking) return;
final remoteSpeakers = speakers.whereType<RemoteParticipant>().toList();
if (remoteSpeakers.isNotEmpty) {
final p = remoteSpeakers.first;
_speakerName = p.name.isNotEmpty ? p.name : p.identity;
_speakerCtrl.add(_speakerName);
_setState(PttState.receiving);
} else if (_state == PttState.receiving) {
_speakerName = null;
_speakerCtrl.add(null);
_setState(PttState.connected);
}
}
// ── PTT: شروع صحبت ────────────────────────────────────────────────────────
Future<void> startSpeaking() async {
if (_state != PttState.connected) return;
if (AppConfig.debug) {
_setState(PttState.speaking);
return;
}
_speakGrantCompleter = Completer<String?>();
_ws?.add(jsonEncode({'type': 'request_speak'}));
try {
final speakerToken = await _speakGrantCompleter!.future.timeout(
const Duration(seconds: 5),
onTimeout: () => null,
);
if (speakerToken == null) return; // پیام خطا قبلاً emit شده
// اتصال مجدد LiveKit با speaker token
final ok = await _connectLiveKit(
_livekitUrl!,
speakerToken,
setConnectedState: false,
);
if (!ok) return;
await _room?.localParticipant?.setMicrophoneEnabled(true);
_setState(PttState.speaking);
} catch (_) {
_errorCtrl.add('خطا در فعال‌سازی میکروفون');
} finally {
_speakGrantCompleter = null;
}
}
// ── PTT: پایان صحبت ──────────────────────────────────────────────────────
Future<void> stopSpeaking() async {
if (_state != PttState.speaking) return;
if (AppConfig.debug) {
_setState(PttState.connected);
_debugSimulateIncoming();
return;
}
try {
await _room?.localParticipant?.setMicrophoneEnabled(false);
} catch (_) {}
_ws?.add(jsonEncode({'type': 'stop_speak'}));
_setState(PttState.connected);
}
// ── Debug helper ──────────────────────────────────────────────────────────
Future<void> _debugSimulateIncoming() async {
await Future.delayed(const Duration(milliseconds: 800));
if (_state != PttState.connected) return;
_speakerName = 'کاربر تست';
_speakerCtrl.add(_speakerName);
_setState(PttState.receiving);
await Future.delayed(const Duration(seconds: 2));
if (_state != PttState.receiving) return;
_speakerName = null;
_speakerCtrl.add(null);
_setState(PttState.connected);
}
// ── قطع اتصال ────────────────────────────────────────────────────────────
Future<void> disconnect() async {
if (AppConfig.debug) {
_setState(PttState.idle);
return;
}
try {
await _room?.localParticipant?.setMicrophoneEnabled(false);
} catch (_) {}
try {
await _listener?.dispose();
} catch (_) {}
try {
if (_room != null) {
// LiveKit's disconnect can sometimes timeout or throw,
// we wrap it to ensure cleanup proceeds.
await _room!.disconnect().timeout(
const Duration(seconds: 3),
onTimeout: () => null,
);
}
} catch (e) {
// Ignore disconnect errors as we are shutting down
} finally {
_room = null;
_listener = null;
}
try {
await _ws?.close();
} catch (_) {}
_ws = null;
_setState(PttState.idle);
}
Future<void> dispose() async {
await disconnect();
await _stateCtrl.close();
await _speakerCtrl.close();
await _errorCtrl.close();
}
}