from __future__ import annotations
import os
import queue
import random
import threading
import time
from functools import cached_property
from typing import Any
import httpx
from ..logger import logger
from .processor_interface import TracingExporter, TracingProcessor
from .spans import Span
from .traces import Trace
class ConsoleSpanExporter(TracingExporter):
"""Prints the traces and spans to the console."""
def export(self, items: list[Trace | Span[Any]]) -> None:
for item in items:
if isinstance(item, Trace):
print(f"[Exporter] Export trace_id={item.trace_id}, name={item.name}")
else:
print(f"[Exporter] Export span: {item.export()}")
class BackendSpanExporter(TracingExporter):
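    """Exports traces and spans to the OpenAI backend over HTTP, retrying
    transient failures with exponential backoff and jitter."""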
def __init__(
self,
api_key: str | None = None,
organization: str | None = None,
project: str | None = None,
endpoint: str = "https://api.openai.com/v1/traces/ingest",
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 30.0,
):
"""
Args:
api_key: The API key for the "Authorization" header. Defaults to
`os.environ["OPENAI_API_KEY"]` if not provided.
organization: The OpenAI organization to use. Defaults to
`os.environ["OPENAI_ORG_ID"]` if not provided.
project: The OpenAI project to use. Defaults to
`os.environ["OPENAI_PROJECT_ID"]` if not provided.
endpoint: The HTTP endpoint to which traces/spans are posted.
max_retries: Maximum number of retries upon failures.
base_delay: Base delay (in seconds) for the first backoff.
max_delay: Maximum delay (in seconds) for backoff growth.
"""
self._api_key = api_key
self._organization = organization
self._project = project
self.endpoint = endpoint
self.max_retries = max_retries
self.base_delay = base_delay
self.max_delay = max_delay
# Keep a client open for connection pooling across multiple export calls
self._client = httpx.Client(timeout=httpx.Timeout(timeout=60, connect=5.0))
def set_api_key(self, api_key: str):
"""Set the OpenAI API key for the exporter.
Args:
api_key: The OpenAI API key to use. This is the same key used by the OpenAI Python
client.
"""
# Clear the cached property if it exists
if "api_key" in self.__dict__:
del self.__dict__["api_key"]
# Update the private attribute
self._api_key = api_key
@cached_property
def api_key(self):
return self._api_key or os.environ.get("OPENAI_API_KEY")
@cached_property
def organization(self):
return self._organization or os.environ.get("OPENAI_ORG_ID")
@cached_property
def project(self):
return self._project or os.environ.get("OPENAI_PROJECT_ID")
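    # Note: the three properties above are cached on first read. `set_api_key`
    # clears the cached `api_key` so a new key takes effect on the next export.
    # Sketch (the key value is hypothetical):
    #
    #   exporter = BackendSpanExporter()
    #   exporter.set_api_key("sk-...")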
def export(self, items: list[Trace | Span[Any]]) -> None:
if not items:
return
if not self.api_key:
logger.warning("OPENAI_API_KEY is not set, skipping trace export")
return
        # Serialize each item once, dropping anything that exports to a falsy value.
        data = [exported for item in items if (exported := item.export())]
payload = {"data": data}
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
"OpenAI-Beta": "traces=v1",
}
if self.organization:
headers["OpenAI-Organization"] = self.organization
if self.project:
headers["OpenAI-Project"] = self.project
# Exponential backoff loop
attempt = 0
delay = self.base_delay
while True:
attempt += 1
try:
response = self._client.post(url=self.endpoint, headers=headers, json=payload)
                # If the response is successful, we're done with this batch
if response.status_code < 300:
logger.debug(f"Exported {len(items)} items")
return
# If the response is a client error (4xx), we won't retry
if 400 <= response.status_code < 500:
logger.error(
f"[non-fatal] Tracing client error {response.status_code}: {response.text}"
)
return
# For 5xx or other unexpected codes, treat it as transient and retry
logger.warning(
f"[non-fatal] Tracing: server error {response.status_code}, retrying."
)
except httpx.RequestError as exc:
# Network or other I/O error, we'll retry
logger.warning(f"[non-fatal] Tracing: request failed: {exc}")
# If we reach here, we need to retry or give up
if attempt >= self.max_retries:
logger.error("[non-fatal] Tracing: max retries reached, giving up on this batch.")
return
# Exponential backoff + jitter
sleep_time = delay + random.uniform(0, 0.1 * delay) # 10% jitter
time.sleep(sleep_time)
delay = min(delay * 2, self.max_delay)
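    # With the defaults (base_delay=1.0, max_delay=30.0, max_retries=3), a
    # failing batch sleeps roughly 1.0-1.1s and then 2.0-2.2s before giving up:
    # each delay doubles, gains up to 10% jitter, and is capped at max_delay.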
def close(self):
"""Close the underlying HTTP client."""
self._client.close()
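# Illustrative sketch (not part of the module): standalone use of the backend
# exporter. Assumes OPENAI_API_KEY is set in the environment; `finished_items`
# is a hypothetical list[Trace | Span].
#
#   exporter = BackendSpanExporter(max_retries=5, base_delay=0.5)
#   exporter.export(finished_items)
#   exporter.close()  # release the pooled HTTP connections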
class BatchTraceProcessor(TracingProcessor):
"""Some implementation notes:
1. Using Queue, which is thread-safe.
2. Using a background thread to export spans, to minimize any performance issues.
3. Spans are stored in memory until they are exported.
"""
def __init__(
self,
exporter: TracingExporter,
max_queue_size: int = 8192,
max_batch_size: int = 128,
schedule_delay: float = 5.0,
export_trigger_ratio: float = 0.7,
):
"""
Args:
exporter: The exporter to use.
max_queue_size: The maximum number of spans to store in the queue. After this, we will
start dropping spans.
max_batch_size: The maximum number of spans to export in a single batch.
schedule_delay: The delay between checks for new spans to export.
export_trigger_ratio: The ratio of the queue size at which we will trigger an export.
"""
self._exporter = exporter
self._queue: queue.Queue[Trace | Span[Any]] = queue.Queue(maxsize=max_queue_size)
self._max_queue_size = max_queue_size
self._max_batch_size = max_batch_size
self._schedule_delay = schedule_delay
self._shutdown_event = threading.Event()
# The queue size threshold at which we export immediately.
self._export_trigger_size = max(1, int(max_queue_size * export_trigger_ratio))
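        # For example, with the defaults (max_queue_size=8192,
        # export_trigger_ratio=0.7) the worker exports once the queue holds
        # int(8192 * 0.7) == 5734 items, even before schedule_delay elapses.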
# Track when we next *must* perform a scheduled export
self._next_export_time = time.time() + self._schedule_delay
# We lazily start the background worker thread the first time a span/trace is queued.
self._worker_thread: threading.Thread | None = None
self._thread_start_lock = threading.Lock()
def _ensure_thread_started(self) -> None:
# Fast path without holding the lock
if self._worker_thread and self._worker_thread.is_alive():
return
# Double-checked locking to avoid starting multiple threads
with self._thread_start_lock:
if self._worker_thread and self._worker_thread.is_alive():
return
self._worker_thread = threading.Thread(target=self._run, daemon=True)
self._worker_thread.start()
def on_trace_start(self, trace: Trace) -> None:
# Ensure the background worker is running before we enqueue anything.
self._ensure_thread_started()
try:
self._queue.put_nowait(trace)
except queue.Full:
logger.warning("Queue is full, dropping trace.")
def on_trace_end(self, trace: Trace) -> None:
# We send traces via on_trace_start, so we don't need to do anything here.
pass
def on_span_start(self, span: Span[Any]) -> None:
# We send spans via on_span_end, so we don't need to do anything here.
pass
def on_span_end(self, span: Span[Any]) -> None:
# Ensure the background worker is running before we enqueue anything.
self._ensure_thread_started()
try:
self._queue.put_nowait(span)
except queue.Full:
logger.warning("Queue is full, dropping span.")
def shutdown(self, timeout: float | None = None):
"""
Called when the application stops. We signal our thread to stop, then join it.
"""
self._shutdown_event.set()
# Only join if we ever started the background thread; otherwise flush synchronously.
if self._worker_thread and self._worker_thread.is_alive():
self._worker_thread.join(timeout=timeout)
else:
# No background thread: process any remaining items synchronously.
self._export_batches(force=True)
def force_flush(self):
"""
Forces an immediate flush of all queued spans.
"""
self._export_batches(force=True)
def _run(self):
while not self._shutdown_event.is_set():
current_time = time.time()
queue_size = self._queue.qsize()
# If it's time for a scheduled flush or queue is above the trigger threshold
if current_time >= self._next_export_time or queue_size >= self._export_trigger_size:
self._export_batches(force=False)
# Reset the next scheduled flush time
self._next_export_time = time.time() + self._schedule_delay
else:
# Sleep a short interval so we don't busy-wait.
time.sleep(0.2)
# Final drain after shutdown
self._export_batches(force=True)
def _export_batches(self, force: bool = False):
"""Drains the queue and exports in batches. If force=True, export everything.
Otherwise, export up to `max_batch_size` repeatedly until the queue is completely empty.
"""
while True:
items_to_export: list[Span[Any] | Trace] = []
# Gather a batch of spans up to max_batch_size
while not self._queue.empty() and (
force or len(items_to_export) < self._max_batch_size
):
try:
items_to_export.append(self._queue.get_nowait())
except queue.Empty:
# Another thread might have emptied the queue between checks
break
# If we collected nothing, we're done
if not items_to_export:
break
# Export the batch
self._exporter.export(items_to_export)
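# Illustrative sketch (not part of the module): pairing the processor with the
# console exporter for local runs. `some_trace` / `some_span` stand in for
# objects created by the surrounding tracing package.
#
#   processor = BatchTraceProcessor(ConsoleSpanExporter(), schedule_delay=1.0)
#   processor.on_trace_start(some_trace)
#   processor.on_span_end(some_span)
#   processor.shutdown(timeout=5.0)  # drains and exports whatever is queued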
# Create shared global instances:
_global_exporter = BackendSpanExporter()
_global_processor = BatchTraceProcessor(_global_exporter)
def default_exporter() -> BackendSpanExporter:
"""The default exporter, which exports traces and spans to the backend in batches."""
return _global_exporter
def default_processor() -> BatchTraceProcessor:
"""The default processor, which exports traces and spans to the backend in batches."""
return _global_processor
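if __name__ == "__main__":
    # Minimal smoke test (a sketch, not part of the public API): exercise the
    # flush and shutdown paths using the console exporter and an empty queue.
    _demo = BatchTraceProcessor(ConsoleSpanExporter(), schedule_delay=0.5)
    _demo.force_flush()  # no-op here: nothing has been queued yet
    _demo.shutdown(timeout=2.0)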