Your Name
feat: UI improvements and error suppression - Enhanced dashboard and market pages with improved header buttons, logo, and currency symbol display - Stopped animated ticker - Removed pie chart legends - Added error suppressor for external service errors (SSE, Permissions-Policy warnings) - Improved header button prominence and icon appearance - Enhanced logo with glow effects and better design - Fixed currency symbol visibility in market tables
8b7b267
| """ | |
| News Data Collectors | |
| Fetches cryptocurrency news from CryptoPanic and NewsAPI | |
| """ | |
| import asyncio | |
| from datetime import datetime, timezone | |
| from typing import Dict, List, Optional, Any | |
| from utils.api_client import get_client | |
| from utils.logger import setup_logger, log_api_request, log_error | |
| from config import config | |
| logger = setup_logger("news_collector") | |
def calculate_staleness_minutes(data_timestamp: Optional[datetime]) -> Optional[float]:
    """
    Report how old a piece of data is, in minutes, relative to the current UTC time

    Args:
        data_timestamp: When the data was produced; a naive value is taken to be UTC

    Returns:
        Age of the data in minutes, or None when no timestamp was supplied
    """
    if data_timestamp:
        current_time = datetime.now(timezone.utc)
        # Aware and naive datetimes cannot be subtracted, so pin naive values to UTC
        reference = (
            data_timestamp
            if data_timestamp.tzinfo is not None
            else data_timestamp.replace(tzinfo=timezone.utc)
        )
        age = current_time - reference
        return age.total_seconds() / 60.0
    return None
def parse_iso_timestamp(timestamp_str: str) -> Optional[datetime]:
    """
    Parse ISO timestamp string to datetime

    Args:
        timestamp_str: ISO format timestamp string; a trailing 'Z' is read as UTC

    Returns:
        datetime object or None if the value is not a string or cannot be parsed
    """
    # Provider payloads may carry null or non-string values in timestamp fields
    if not isinstance(timestamp_str, str):
        return None

    # Rewrite only the trailing 'Z' suffix; older fromisoformat versions reject it
    if timestamp_str.endswith("Z"):
        timestamp_str = timestamp_str[:-1] + "+00:00"

    try:
        return datetime.fromisoformat(timestamp_str)
    except ValueError:
        # Malformed timestamps are reported as "unknown" rather than raised
        return None
async def get_cryptopanic_posts() -> Dict[str, Any]:
    """
    Fetch latest cryptocurrency news posts from CryptoPanic

    Returns:
        Dict with provider, category, data, timestamp, staleness_minutes,
        success and error. Failures other than a missing provider
        configuration also carry error_type; successful results also carry
        data_timestamp, response_time_ms and post_count.
    """
    provider = "CryptoPanic"
    category = "news"
    endpoint = "/posts/"

    def failure(error_msg: str, **extra: Any) -> Dict[str, Any]:
        # Common shape of every unsuccessful result; extra keys are appended last
        result = {
            "provider": provider,
            "category": category,
            "data": None,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "staleness_minutes": None,
            "success": False,
            "error": error_msg
        }
        result.update(extra)
        return result

    logger.info(f"Fetching posts from {provider}")

    try:
        client = get_client()
        provider_config = config.get_provider(provider)

        if not provider_config:
            error_msg = f"Provider {provider} not configured"
            log_error(logger, provider, "config_error", error_msg, endpoint)
            return failure(error_msg)

        # Build request URL
        url = f"{provider_config.endpoint_url}{endpoint}"
        params = {
            "auth_token": "free",  # CryptoPanic offers free tier
            "public": "true",
            "kind": "news",  # Get news posts
            "filter": "rising"  # Get rising news
        }

        # Make request
        response = await client.get(url, params=params, timeout=provider_config.timeout_ms // 1000)

        # Log request
        log_api_request(
            logger,
            provider,
            endpoint,
            response.get("response_time_ms", 0),
            "success" if response["success"] else "error",
            response.get("status_code")
        )

        if not response["success"]:
            error_msg = response.get("error_message", "Unknown error")
            log_error(logger, provider, response.get("error_type", "unknown"), error_msg, endpoint)
            return failure(error_msg, error_type=response.get("error_type"))

        # Extract data
        data = response["data"]
        has_results = isinstance(data, dict) and "results" in data
        results = data["results"] if has_results else None

        # Parse timestamp from the first post, which is treated as the most recent
        data_timestamp = None
        if isinstance(results, list) and len(results) > 0:
            first_post = results[0]
            if isinstance(first_post, dict) and "created_at" in first_post:
                data_timestamp = parse_iso_timestamp(first_post["created_at"])

        staleness = calculate_staleness_minutes(data_timestamp)

        # Count posts
        post_count = len(results) if has_results else 0

        # Format staleness separately so the provider/count prefix is always
        # logged, and so a staleness of exactly 0.0 is not reported as N/A
        staleness_text = f"{staleness:.2f}m" if staleness is not None else "N/A"
        logger.info(
            f"{provider} - {endpoint} - Retrieved {post_count} posts, "
            f"staleness: {staleness_text}"
        )

        return {
            "provider": provider,
            "category": category,
            "data": data,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "data_timestamp": data_timestamp.isoformat() if data_timestamp else None,
            "staleness_minutes": staleness,
            "success": True,
            "error": None,
            "response_time_ms": response.get("response_time_ms", 0),
            "post_count": post_count
        }

    except Exception as e:
        # Collector boundary: report any unexpected failure as a result dict
        error_msg = f"Unexpected error: {str(e)}"
        log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True)
        return failure(error_msg, error_type="exception")
async def get_newsapi_headlines() -> Dict[str, Any]:
    """
    Fetch cryptocurrency headlines from NewsAPI (newsdata.io)

    Returns:
        Dict with provider, category, data, timestamp, staleness_minutes,
        success and error. Failures other than a missing provider
        configuration also carry error_type; successful results also carry
        data_timestamp, response_time_ms and article_count.
    """
    provider = "NewsAPI"
    category = "news"
    endpoint = "/news"

    def failure(error_msg: str, **extra: Any) -> Dict[str, Any]:
        # Common shape of every unsuccessful result; extra keys are appended last
        result = {
            "provider": provider,
            "category": category,
            "data": None,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "staleness_minutes": None,
            "success": False,
            "error": error_msg
        }
        result.update(extra)
        return result

    logger.info(f"Fetching headlines from {provider}")

    try:
        client = get_client()
        provider_config = config.get_provider(provider)

        if not provider_config:
            error_msg = f"Provider {provider} not configured"
            log_error(logger, provider, "config_error", error_msg, endpoint)
            return failure(error_msg)

        # Check if API key is available
        if provider_config.requires_key and not provider_config.api_key:
            error_msg = f"API key required but not configured for {provider}"
            log_error(logger, provider, "auth_error", error_msg, endpoint)
            return failure(error_msg, error_type="missing_api_key")

        # Build request URL
        url = f"{provider_config.endpoint_url}{endpoint}"
        params = {
            "apikey": provider_config.api_key,
            "q": "cryptocurrency OR bitcoin OR ethereum",
            "language": "en",
            "category": "business,technology"
        }

        # Make request
        response = await client.get(url, params=params, timeout=provider_config.timeout_ms // 1000)

        # Log request
        log_api_request(
            logger,
            provider,
            endpoint,
            response.get("response_time_ms", 0),
            "success" if response["success"] else "error",
            response.get("status_code")
        )

        if not response["success"]:
            error_msg = response.get("error_message", "Unknown error")
            log_error(logger, provider, response.get("error_type", "unknown"), error_msg, endpoint)
            return failure(error_msg, error_type=response.get("error_type"))

        # Extract data
        data = response["data"]
        has_results = isinstance(data, dict) and "results" in data
        results = data["results"] if has_results else None

        # Parse timestamp from the first article, which is treated as the most recent
        data_timestamp = None
        if isinstance(results, list) and len(results) > 0:
            first_article = results[0]
            if isinstance(first_article, dict):
                # Try different timestamp fields
                timestamp_field = first_article.get("pubDate") or first_article.get("publishedAt")
                if timestamp_field:
                    data_timestamp = parse_iso_timestamp(timestamp_field)

        staleness = calculate_staleness_minutes(data_timestamp)

        # Count articles
        article_count = len(results) if has_results else 0

        # Format staleness separately so the provider/count prefix is always
        # logged, and so a staleness of exactly 0.0 is not reported as N/A
        staleness_text = f"{staleness:.2f}m" if staleness is not None else "N/A"
        logger.info(
            f"{provider} - {endpoint} - Retrieved {article_count} articles, "
            f"staleness: {staleness_text}"
        )

        return {
            "provider": provider,
            "category": category,
            "data": data,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "data_timestamp": data_timestamp.isoformat() if data_timestamp else None,
            "staleness_minutes": staleness,
            "success": True,
            "error": None,
            "response_time_ms": response.get("response_time_ms", 0),
            "article_count": article_count
        }

    except Exception as e:
        # Collector boundary: report any unexpected failure as a result dict
        error_msg = f"Unexpected error: {str(e)}"
        log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True)
        return failure(error_msg, error_type="exception")
async def collect_news_data() -> List[Dict[str, Any]]:
    """
    Main function to collect news data from all sources

    Runs the CryptoPanic and NewsAPI collectors concurrently and converts
    any collector that ends in an exception into a failure result dict.

    Returns:
        List of results from all news collectors, one dict per collector
    """
    logger.info("Starting news data collection from all sources")

    # Run all collectors concurrently
    results = await asyncio.gather(
        get_cryptopanic_posts(),
        get_newsapi_headlines(),
        return_exceptions=True
    )

    # Process results
    processed_results = []
    for result in results:
        # With return_exceptions=True a cancelled collector is returned as
        # CancelledError, which derives from BaseException rather than
        # Exception; it must not reach the dict-based summary below
        if isinstance(result, BaseException):
            logger.error(f"Collector failed with exception: {str(result)}")
            processed_results.append({
                "provider": "Unknown",
                "category": "news",
                "data": None,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "staleness_minutes": None,
                "success": False,
                "error": str(result),
                "error_type": "exception"
            })
        else:
            processed_results.append(result)

    # Log summary
    succeeded = [r for r in processed_results if r.get("success", False)]
    total_items = sum(
        r.get("post_count", 0) + r.get("article_count", 0)
        for r in succeeded
    )

    logger.info(
        f"News data collection complete: {len(succeeded)}/{len(processed_results)} successful, "
        f"{total_items} total items"
    )

    return processed_results


# Alias for backward compatibility
collect_news = collect_news_data
class NewsCollector:
    """
    News Collector class for WebSocket streaming interface
    Wraps the standalone news collection functions
    """

    # Number of items taken from each provider per collection run
    MAX_ITEMS_PER_SOURCE = 10

    def __init__(self, config: Any = None):
        """
        Initialize the news collector

        Args:
            config: Configuration object (optional, for compatibility)
        """
        self.config = config
        self.logger = logger

    async def collect(self) -> Dict[str, Any]:
        """
        Collect news data from all sources

        Returns:
            Dict with aggregated news data: "articles" (normalized items from
            every successful provider), "sources" (names of the providers
            that returned data), "categories" and "breaking" (always empty
            here) and "timestamp" (ISO time of aggregation, UTC)
        """
        results = await collect_news_data()

        # Aggregate data for WebSocket streaming
        aggregated = {
            "articles": [],
            "sources": [],
            "categories": [],
            "breaking": [],
            "timestamp": datetime.now(timezone.utc).isoformat()
        }

        for result in results:
            if not (result.get("success") and result.get("data")):
                continue
            provider = result.get("provider", "unknown")
            aggregated["sources"].append(provider)
            aggregated["articles"].extend(
                self._extract_articles(provider, result["data"])
            )

        return aggregated

    @staticmethod
    def _extract_articles(provider: str, data: Any) -> List[Dict[str, Any]]:
        """
        Normalize one provider payload into a list of article dicts

        Args:
            provider: Provider name as reported by the collector result
            data: Raw payload returned by that provider

        Returns:
            At most MAX_ITEMS_PER_SOURCE normalized articles; empty for
            unknown providers or payloads without a usable item list
        """
        if not isinstance(data, dict):
            return []

        limit = NewsCollector.MAX_ITEMS_PER_SOURCE
        articles: List[Dict[str, Any]] = []

        # Parse CryptoPanic posts
        if provider == "CryptoPanic":
            posts = data.get("results")
            if not isinstance(posts, list):
                return []
            for post in posts[:limit]:
                if not isinstance(post, dict):
                    continue
                # "source" may be present but null, so do not chain .get on it
                source = post.get("source")
                articles.append({
                    "title": post.get("title"),
                    "url": post.get("url"),
                    "source": source.get("title") if isinstance(source, dict) else None,
                    "published_at": post.get("published_at"),
                    "kind": post.get("kind"),
                    "votes": post.get("votes", {})
                })

        # Parse NewsAPI articles
        elif provider == "NewsAPI":
            # Prefer the "articles" list; fall back to "results", which is the
            # key get_newsapi_headlines reads from the same payload
            items = data.get("articles")
            if not isinstance(items, list):
                items = data.get("results")
            if not isinstance(items, list):
                return []
            for article in items[:limit]:
                if not isinstance(article, dict):
                    continue
                source = article.get("source")
                if isinstance(source, dict):
                    source_name = source.get("name")
                else:
                    # NOTE(review): "link" and "source_id" are assumed to be
                    # the newsdata.io field names - confirm against the API
                    source_name = source or article.get("source_id")
                articles.append({
                    "title": article.get("title"),
                    "url": article.get("url") or article.get("link"),
                    "source": source_name,
                    "published_at": article.get("publishedAt") or article.get("pubDate"),
                    "description": article.get("description")
                })

        return articles
# Example usage
if __name__ == "__main__":

    async def main():
        """Run every news collector once and print a short report per provider."""
        collected = await collect_news_data()

        print("\n=== News Data Collection Results ===")
        for entry in collected:
            print(f"\nProvider: {entry['provider']}")
            print(f"Success: {entry['success']}")
            print(f"Staleness: {entry.get('staleness_minutes', 'N/A')} minutes")

            # Failed collectors only have an error message to show
            if not entry['success']:
                print(f"Error: {entry.get('error', 'Unknown')}")
                continue

            elapsed_ms = entry.get('response_time_ms', 0)
            print(f"Response Time: {elapsed_ms:.2f}ms")
            # Each provider reports its item count under its own key
            item_total = entry.get('post_count', 0) + entry.get('article_count', 0)
            print(f"Items: {item_total}")

    asyncio.run(main())