File size: 4,385 Bytes
ba71a1e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
69bb7ad
 
ba71a1e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9fd5c13
 
 
 
 
 
 
 
ba71a1e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9fd5c13
ba71a1e
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
from __future__ import annotations

import threading
import time
from collections import deque
from typing import Deque, Dict, Any


class Metrics:
    """Simple in-process metrics accumulator with lightweight locking.

    Tracks audio/video counts, byte totals, average sizes, exponential
    moving averages (EMAs) of intervals, and a rolling video FPS. The
    recording methods and ``snapshot`` all run under one ``threading.Lock``.

    Intervals are measured with ``time.monotonic()`` so that wall-clock
    adjustments cannot produce negative or inflated intervals.
    """

    def __init__(self, fps_window: int = 30, ema_alpha: float = 0.2) -> None:
        """Create an empty accumulator.

        Args:
            fps_window: Maximum number of recent frame timestamps kept for
                the rolling FPS estimate (used as the deque ``maxlen``).
            ema_alpha: Weight given to the newest sample in every EMA.
                Not validated here; presumably expected in (0, 1].
        """
        self._lock = threading.Lock()
        # Audio counters
        self.audio_chunks = 0
        self.audio_bytes = 0
        self.audio_avg_chunk_size = 0.0
        self.audio_loop_interval_ema = 0.0  # ms between loop ticks (if fed)
        self.audio_infer_time_ema = 0.0  # ms processing (placeholder for future inference)
        self._last_audio_ts: float | None = None  # monotonic ms of the last chunk

        # Video counters
        self.video_frames = 0
        self.video_bytes = 0
        self.video_avg_frame_size = 0.0
        self._fps_window = fps_window
        self._frame_times: Deque[float] = deque(maxlen=fps_window)  # monotonic seconds
        self.video_frame_interval_ema = 0.0
        self._last_video_ts: float | None = None  # monotonic seconds of the last frame

        self.ema_alpha = ema_alpha

        # Explicit "has received a sample" flags. Comparing the EMA itself
        # against 0.0 would treat a legitimate 0.0 average as unseeded and
        # let the next sample overwrite it instead of blending into it.
        self._audio_loop_ema_seeded = False
        self._audio_infer_ema_seeded = False
        self._video_interval_ema_seeded = False

    def _ema(self, current: float, sample: float, seeded: bool) -> float:
        """Return the EMA after folding in *sample*; the first sample seeds it."""
        if not seeded:
            return sample
        return (self.ema_alpha * sample) + (1 - self.ema_alpha) * current

    # ---------------- Audio -----------------
    def record_audio_chunk(self, size_bytes: int, loop_interval_ms: float | None = None, infer_time_ms: float | None = None) -> None:
        """Record one audio chunk and update the audio statistics.

        Args:
            size_bytes: Size of the chunk, added to the byte total.
            loop_interval_ms: Milliseconds since the previous loop tick.
                When omitted, the time since the previous call is used
                (nothing is recorded for the very first call).
            infer_time_ms: Optional processing time in milliseconds to fold
                into the inference-time EMA.
        """
        with self._lock:
            self.audio_chunks += 1
            self.audio_bytes += size_bytes
            # Direct total/count avoids accumulating float rounding error.
            self.audio_avg_chunk_size = self.audio_bytes / self.audio_chunks

            now_ms = time.monotonic() * 1000.0
            if loop_interval_ms is None and self._last_audio_ts is not None:
                loop_interval_ms = now_ms - self._last_audio_ts
            if loop_interval_ms is not None:
                self.audio_loop_interval_ema = self._ema(
                    self.audio_loop_interval_ema,
                    loop_interval_ms,
                    self._audio_loop_ema_seeded,
                )
                self._audio_loop_ema_seeded = True
            self._last_audio_ts = now_ms

            if infer_time_ms is not None:
                self.audio_infer_time_ema = self._ema(
                    self.audio_infer_time_ema,
                    infer_time_ms,
                    self._audio_infer_ema_seeded,
                )
                self._audio_infer_ema_seeded = True

    # ---------------- Video -----------------
    def record_video_frame(self, size_bytes: int) -> None:
        """Record one video frame and update the video statistics.

        Args:
            size_bytes: Size of the frame, added to the byte total.
        """
        with self._lock:
            self.video_frames += 1
            self.video_bytes += size_bytes
            # Direct total/count avoids accumulating float rounding error.
            self.video_avg_frame_size = self.video_bytes / self.video_frames

            now = time.monotonic()
            self._frame_times.append(now)
            # Frame interval EMA (ms); needs a previous frame to measure from.
            if self._last_video_ts is not None:
                interval_ms = (now - self._last_video_ts) * 1000.0
                self.video_frame_interval_ema = self._ema(
                    self.video_frame_interval_ema,
                    interval_ms,
                    self._video_interval_ema_seeded,
                )
                self._video_interval_ema_seeded = True
            self._last_video_ts = now

    # --------------- Report ------------------
    def snapshot(self) -> Dict[str, Any]:
        """Return the current metrics as a plain dict.

        ``video_fps_rolling`` is (frames in window - 1) divided by the time
        spanned by the retained frame timestamps; it is 0.0 when fewer than
        two frames are retained or the span is not positive.
        """
        with self._lock:
            fps = 0.0
            if len(self._frame_times) > 1:
                span = self._frame_times[-1] - self._frame_times[0]
                if span > 0:
                    fps = (len(self._frame_times) - 1) / span
            return {
                "audio_chunks": self.audio_chunks,
                "audio_bytes": self.audio_bytes,
                "audio_avg_chunk_size": self.audio_avg_chunk_size,
                "audio_loop_interval_ema_ms": self.audio_loop_interval_ema,
                "audio_infer_time_ema_ms": self.audio_infer_time_ema,
                "video_frames": self.video_frames,
                "video_bytes": self.video_bytes,
                "video_avg_frame_size": self.video_avg_frame_size,
                "video_fps_rolling": fps,
                "video_frame_interval_ema_ms": self.video_frame_interval_ema,
                "fps_window": self._fps_window,
            }

# Module-level shared instance for simple use, built with the default
# fps_window and ema_alpha. Nothing enforces it as the only instance.
metrics = Metrics()