""" Module: services.compression_service Description: Advanced compression service with metrics and optimization Author: Anderson H. Silva Date: 2025-01-25 License: Proprietary - All rights reserved """ import time from typing import Dict, Any, Optional, Tuple, List from enum import Enum import gzip import zlib from collections import defaultdict from datetime import datetime, timedelta, timezone from src.core import get_logger from src.core.config import settings logger = get_logger(__name__) try: import brotli HAS_BROTLI = True except ImportError: HAS_BROTLI = False brotli = None try: import zstandard as zstd HAS_ZSTD = True except ImportError: HAS_ZSTD = False zstd = None class CompressionAlgorithm(str, Enum): """Available compression algorithms.""" GZIP = "gzip" BROTLI = "br" ZSTD = "zstd" DEFLATE = "deflate" IDENTITY = "identity" # No compression class CompressionProfile: """Compression profile for different content types.""" def __init__( self, algorithm: CompressionAlgorithm, level: int, min_size: int = 1024, max_size: Optional[int] = None ): self.algorithm = algorithm self.level = level self.min_size = min_size self.max_size = max_size class CompressionService: """Service for managing response compression.""" # Default compression profiles by content type DEFAULT_PROFILES = { "application/json": CompressionProfile( CompressionAlgorithm.BROTLI if HAS_BROTLI else CompressionAlgorithm.GZIP, level=4, min_size=1024 ), "text/html": CompressionProfile( CompressionAlgorithm.BROTLI if HAS_BROTLI else CompressionAlgorithm.GZIP, level=6, min_size=512 ), "text/plain": CompressionProfile( CompressionAlgorithm.GZIP, level=6, min_size=1024 ), "application/javascript": CompressionProfile( CompressionAlgorithm.BROTLI if HAS_BROTLI else CompressionAlgorithm.GZIP, level=5, min_size=512 ), "text/css": CompressionProfile( CompressionAlgorithm.BROTLI if HAS_BROTLI else CompressionAlgorithm.GZIP, level=6, min_size=256 ), "application/xml": CompressionProfile( CompressionAlgorithm.GZIP, level=6, min_size=1024 ), "text/csv": CompressionProfile( CompressionAlgorithm.GZIP, level=9, # CSVs compress very well min_size=2048 ) } def __init__(self): """Initialize compression service.""" self._metrics = defaultdict(lambda: { "total_bytes": 0, "compressed_bytes": 0, "compression_time": 0, "count": 0 }) self._algorithm_stats = defaultdict(lambda: { "used": 0, "total_saved": 0, "avg_ratio": 0 }) self._content_type_stats = defaultdict(lambda: { "count": 0, "avg_size": 0, "avg_compressed": 0 }) def compress( self, data: bytes, content_type: str, accept_encoding: str, force_algorithm: Optional[CompressionAlgorithm] = None ) -> Tuple[bytes, str, Dict[str, Any]]: """ Compress data using the best available algorithm. Returns: Tuple of (compressed_data, encoding, metrics) """ start_time = time.time() original_size = len(data) # Get compression profile profile = self._get_profile(content_type) # Check size limits if original_size < profile.min_size: return data, "identity", { "reason": "below_min_size", "original_size": original_size, "min_size": profile.min_size } if profile.max_size and original_size > profile.max_size: return data, "identity", { "reason": "above_max_size", "original_size": original_size, "max_size": profile.max_size } # Choose algorithm if force_algorithm: algorithm = force_algorithm else: algorithm = self._choose_algorithm(accept_encoding, profile) # Compress try: compressed_data, encoding = self._compress_with_algorithm( data, algorithm, profile.level ) compression_time = time.time() - start_time compressed_size = len(compressed_data) ratio = 1 - (compressed_size / original_size) # Update metrics self._update_metrics( content_type, algorithm, original_size, compressed_size, compression_time ) metrics = { "algorithm": algorithm, "original_size": original_size, "compressed_size": compressed_size, "ratio": ratio, "saved_bytes": original_size - compressed_size, "compression_time_ms": compression_time * 1000, "throughput_mbps": (original_size / compression_time / 1024 / 1024) if compression_time > 0 else 0 } logger.debug( "compression_completed", content_type=content_type, algorithm=algorithm, ratio=f"{ratio:.1%}", time_ms=f"{compression_time * 1000:.1f}" ) return compressed_data, encoding, metrics except Exception as e: logger.error( "compression_failed", algorithm=algorithm, error=str(e) ) return data, "identity", {"error": str(e)} def _get_profile(self, content_type: str) -> CompressionProfile: """Get compression profile for content type.""" # Extract base content type base_type = content_type.split(";")[0].strip().lower() # Check exact match if base_type in self.DEFAULT_PROFILES: return self.DEFAULT_PROFILES[base_type] # Check prefix match if base_type.startswith("text/"): return CompressionProfile(CompressionAlgorithm.GZIP, level=6) if base_type.startswith("application/") and "json" in base_type: return CompressionProfile(CompressionAlgorithm.GZIP, level=6) # Default profile return CompressionProfile(CompressionAlgorithm.GZIP, level=5) def _choose_algorithm( self, accept_encoding: str, profile: CompressionProfile ) -> CompressionAlgorithm: """Choose best algorithm based on client support and profile.""" accept_encoding = accept_encoding.lower() # Parse quality values encodings = {} for encoding in accept_encoding.split(","): parts = encoding.strip().split(";") name = parts[0].strip() quality = 1.0 if len(parts) > 1: for param in parts[1:]: if param.strip().startswith("q="): try: quality = float(param.split("=")[1]) except: pass encodings[name] = quality # Prefer profile algorithm if supported if profile.algorithm == CompressionAlgorithm.BROTLI and "br" in encodings: return CompressionAlgorithm.BROTLI if profile.algorithm == CompressionAlgorithm.ZSTD and "zstd" in encodings and HAS_ZSTD: return CompressionAlgorithm.ZSTD # Check alternatives in order of preference if "br" in encodings and HAS_BROTLI and encodings.get("br", 0) > 0: return CompressionAlgorithm.BROTLI if "zstd" in encodings and HAS_ZSTD and encodings.get("zstd", 0) > 0: return CompressionAlgorithm.ZSTD if "gzip" in encodings and encodings.get("gzip", 0) > 0: return CompressionAlgorithm.GZIP if "deflate" in encodings and encodings.get("deflate", 0) > 0: return CompressionAlgorithm.DEFLATE # Default to gzip if nothing else return CompressionAlgorithm.GZIP def _compress_with_algorithm( self, data: bytes, algorithm: CompressionAlgorithm, level: int ) -> Tuple[bytes, str]: """Compress data with specified algorithm.""" if algorithm == CompressionAlgorithm.GZIP: return gzip.compress(data, compresslevel=level), "gzip" elif algorithm == CompressionAlgorithm.BROTLI: if not HAS_BROTLI: raise RuntimeError("Brotli not available") return brotli.compress(data, quality=level), "br" elif algorithm == CompressionAlgorithm.ZSTD: if not HAS_ZSTD: raise RuntimeError("Zstandard not available") cctx = zstd.ZstdCompressor(level=level) return cctx.compress(data), "zstd" elif algorithm == CompressionAlgorithm.DEFLATE: return zlib.compress(data, level=level), "deflate" else: return data, "identity" def _update_metrics( self, content_type: str, algorithm: CompressionAlgorithm, original_size: int, compressed_size: int, compression_time: float ): """Update compression metrics.""" # Overall metrics metrics = self._metrics["overall"] metrics["total_bytes"] += original_size metrics["compressed_bytes"] += compressed_size metrics["compression_time"] += compression_time metrics["count"] += 1 # Per content type metrics ct_metrics = self._metrics[content_type] ct_metrics["total_bytes"] += original_size ct_metrics["compressed_bytes"] += compressed_size ct_metrics["compression_time"] += compression_time ct_metrics["count"] += 1 # Algorithm statistics algo_stats = self._algorithm_stats[algorithm] algo_stats["used"] += 1 algo_stats["total_saved"] += (original_size - compressed_size) # Content type statistics ct_stats = self._content_type_stats[content_type] ct_stats["count"] += 1 ct_stats["avg_size"] = ( (ct_stats["avg_size"] * (ct_stats["count"] - 1) + original_size) / ct_stats["count"] ) ct_stats["avg_compressed"] = ( (ct_stats["avg_compressed"] * (ct_stats["count"] - 1) + compressed_size) / ct_stats["count"] ) def get_metrics(self) -> Dict[str, Any]: """Get compression metrics.""" overall = self._metrics["overall"] if overall["count"] == 0: return { "enabled": True, "algorithms_available": self._get_available_algorithms(), "total_requests": 0 } total_saved = overall["total_bytes"] - overall["compressed_bytes"] avg_ratio = total_saved / overall["total_bytes"] if overall["total_bytes"] > 0 else 0 return { "enabled": True, "algorithms_available": self._get_available_algorithms(), "total_requests": overall["count"], "total_bytes_original": overall["total_bytes"], "total_bytes_compressed": overall["compressed_bytes"], "total_bytes_saved": total_saved, "average_compression_ratio": avg_ratio, "average_compression_time_ms": (overall["compression_time"] / overall["count"] * 1000) if overall["count"] > 0 else 0, "content_types": self._get_content_type_metrics(), "algorithms": self._get_algorithm_metrics() } def _get_available_algorithms(self) -> List[str]: """Get list of available compression algorithms.""" algorithms = ["gzip", "deflate"] if HAS_BROTLI: algorithms.append("br") if HAS_ZSTD: algorithms.append("zstd") return algorithms def _get_content_type_metrics(self) -> Dict[str, Any]: """Get metrics grouped by content type.""" result = {} for content_type, metrics in self._metrics.items(): if content_type == "overall" or metrics["count"] == 0: continue saved = metrics["total_bytes"] - metrics["compressed_bytes"] ratio = saved / metrics["total_bytes"] if metrics["total_bytes"] > 0 else 0 result[content_type] = { "requests": metrics["count"], "total_size": metrics["total_bytes"], "compressed_size": metrics["compressed_bytes"], "saved_bytes": saved, "compression_ratio": ratio, "avg_time_ms": (metrics["compression_time"] / metrics["count"] * 1000) } return result def _get_algorithm_metrics(self) -> Dict[str, Any]: """Get metrics grouped by algorithm.""" result = {} for algorithm, stats in self._algorithm_stats.items(): if stats["used"] == 0: continue result[algorithm] = { "times_used": stats["used"], "total_bytes_saved": stats["total_saved"], "avg_bytes_saved": stats["total_saved"] / stats["used"] } return result def optimize_settings(self) -> Dict[str, Any]: """Analyze metrics and suggest optimizations.""" suggestions = [] # Check if Brotli should be enabled if not HAS_BROTLI: suggestions.append({ "type": "install_brotli", "reason": "Brotli provides better compression ratios", "command": "pip install brotli" }) # Check compression ratios by content type for content_type, stats in self._content_type_stats.items(): if stats["count"] < 10: continue avg_ratio = 1 - (stats["avg_compressed"] / stats["avg_size"]) if stats["avg_size"] > 0 else 0 if avg_ratio < 0.2: suggestions.append({ "type": "adjust_min_size", "content_type": content_type, "reason": f"Low compression ratio ({avg_ratio:.1%})", "current_avg_size": stats["avg_size"], "suggestion": "Consider increasing minimum size threshold" }) # Check algorithm usage gzip_stats = self._algorithm_stats.get(CompressionAlgorithm.GZIP, {"used": 0}) brotli_stats = self._algorithm_stats.get(CompressionAlgorithm.BROTLI, {"used": 0}) if HAS_BROTLI and brotli_stats["used"] < gzip_stats["used"] * 0.1: suggestions.append({ "type": "promote_brotli", "reason": "Brotli underutilized despite being available", "suggestion": "Check client Accept-Encoding headers" }) return { "suggestions": suggestions, "optimal_settings": self._calculate_optimal_settings() } def _calculate_optimal_settings(self) -> Dict[str, Any]: """Calculate optimal compression settings based on metrics.""" settings = {} # Recommend levels based on average compression time overall = self._metrics["overall"] if overall["count"] > 0: avg_time = overall["compression_time"] / overall["count"] if avg_time < 0.001: # < 1ms settings["recommended_gzip_level"] = 9 settings["recommended_brotli_quality"] = 6 elif avg_time < 0.005: # < 5ms settings["recommended_gzip_level"] = 6 settings["recommended_brotli_quality"] = 4 else: settings["recommended_gzip_level"] = 4 settings["recommended_brotli_quality"] = 2 return settings # Global instance compression_service = CompressionService()