MacBook pro commited on
Commit
662ffc5
·
1 Parent(s): 495e6b0

Add ICE sampling & watchdog + /webrtc/ice_stats endpoint for prolonged checking diagnostics

Browse files
Files changed (1) hide show
  1. webrtc_server.py +90 -0
webrtc_server.py CHANGED
@@ -461,6 +461,9 @@ class PeerState:
461
  last_ice_state: Optional[str] = None
462
  cleanup_task: Optional[asyncio.Task] = None
463
  outbound_video: Optional['OutboundVideoTrack'] = None
 
 
 
464
 
465
 
466
  # In-memory single peer (extend to dict for multi-user)
@@ -1116,6 +1119,75 @@ async def webrtc_offer(offer: Dict[str, Any], x_api_key: Optional[str] = Header(
1116
 
1117
  _peer_state = PeerState(pc=pc, created=time.time(), outbound_video=outbound_video)
1118
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1119
  logger.info("WebRTC answer created")
1120
  return {"sdp": pc.localDescription.sdp, "type": pc.localDescription.type}
1121
 
@@ -1211,6 +1283,24 @@ async def pipeline_stats():
1211
  except Exception as e:
1212
  return {"error": str(e)}
1213
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1214
  # Optional: connection monitoring endpoint for diagnostics
1215
  if add_connection_monitoring is not None:
1216
  try:
 
461
  last_ice_state: Optional[str] = None
462
  cleanup_task: Optional[asyncio.Task] = None
463
  outbound_video: Optional['OutboundVideoTrack'] = None
464
+ ice_samples: list[dict[str, Any]] = None # rolling ICE stats snapshots
465
+ ice_sampler_task: Optional[asyncio.Task] = None
466
+ ice_watchdog_task: Optional[asyncio.Task] = None
467
 
468
 
469
  # In-memory single peer (extend to dict for multi-user)
 
1119
 
1120
  _peer_state = PeerState(pc=pc, created=time.time(), outbound_video=outbound_video)
1121
 
1122
+ # Initialize ice_samples list
1123
+ _peer_state.ice_samples = []
1124
+
1125
+ async def _sample_ice_loop(state_ref: PeerState): # pragma: no cover diagnostic
1126
+ try:
1127
+ while True:
1128
+ await asyncio.sleep(3)
1129
+ pc_local = state_ref.pc
1130
+ if pc_local.connectionState in ("closed", "failed"):
1131
+ break
1132
+ try:
1133
+ stats = await pc_local.getStats()
1134
+ summary = {
1135
+ 'ts': time.time(),
1136
+ 'connectionState': pc_local.connectionState,
1137
+ 'iceState': pc_local.iceConnectionState,
1138
+ 'pairs': 0,
1139
+ 'succeeded_pairs': 0,
1140
+ 'nominated_pairs': 0,
1141
+ 'local_candidates': 0,
1142
+ 'remote_candidates': 0,
1143
+ }
1144
+ for sid, rep in stats.items():
1145
+ tp = getattr(rep, 'type', None)
1146
+ if tp == 'candidate-pair':
1147
+ summary['pairs'] += 1
1148
+ st = getattr(rep, 'state', None)
1149
+ if st == 'succeeded':
1150
+ summary['succeeded_pairs'] += 1
1151
+ if getattr(rep, 'nominated', False):
1152
+ summary['nominated_pairs'] += 1
1153
+ elif tp == 'local-candidate':
1154
+ summary['local_candidates'] += 1
1155
+ elif tp == 'remote-candidate':
1156
+ summary['remote_candidates'] += 1
1157
+ samples = state_ref.ice_samples
1158
+ if samples is not None:
1159
+ samples.append(summary)
1160
+ # Keep last 20 samples
1161
+ if len(samples) > 20:
1162
+ samples.pop(0)
1163
+ except Exception as e:
1164
+ logger.debug(f"ICE sample failed: {e}")
1165
+ # Stop sampling if connected (we keep last snapshots)
1166
+ if pc_local.connectionState == 'connected':
1167
+ break
1168
+ except Exception:
1169
+ pass
1170
+
1171
+ async def _ice_watchdog(state_ref: PeerState): # pragma: no cover diagnostic
1172
+ try:
1173
+ # Wait 18s; if still 'checking' without any succeeded pair, close to force client retry logic
1174
+ await asyncio.sleep(18)
1175
+ pc_local = state_ref.pc
1176
+ if pc_local.iceConnectionState == 'checking' and pc_local.connectionState == 'connecting':
1177
+ # Inspect latest sample to confirm lack of progress
1178
+ last = state_ref.ice_samples[-1] if state_ref.ice_samples else {}
1179
+ if last.get('succeeded_pairs', 0) == 0:
1180
+ logger.warning('ICE watchdog: still checking after 18s with 0 succeeded pairs - closing PC to unblock client')
1181
+ try:
1182
+ await pc_local.close()
1183
+ except Exception:
1184
+ pass
1185
+ except Exception:
1186
+ pass
1187
+
1188
+ _peer_state.ice_sampler_task = asyncio.create_task(_sample_ice_loop(_peer_state))
1189
+ _peer_state.ice_watchdog_task = asyncio.create_task(_ice_watchdog(_peer_state))
1190
+
1191
  logger.info("WebRTC answer created")
1192
  return {"sdp": pc.localDescription.sdp, "type": pc.localDescription.type}
1193
 
 
1283
  except Exception as e:
1284
  return {"error": str(e)}
1285
 
1286
+ @router.get("/ice_stats")
1287
+ async def ice_stats(): # pragma: no cover diagnostic endpoint
1288
+ try:
1289
+ st = _peer_state
1290
+ if st is None:
1291
+ return {"active": False}
1292
+ samples = st.ice_samples or []
1293
+ latest = samples[-1] if samples else None
1294
+ return {
1295
+ 'active': True,
1296
+ 'connectionState': getattr(st.pc, 'connectionState', None),
1297
+ 'iceState': getattr(st.pc, 'iceConnectionState', None),
1298
+ 'samples': samples,
1299
+ 'latest': latest,
1300
+ }
1301
+ except Exception as e:
1302
+ return {'active': False, 'error': str(e)}
1303
+
1304
  # Optional: connection monitoring endpoint for diagnostics
1305
  if add_connection_monitoring is not None:
1306
  try: