#!/usr/bin/env python3
"""
HuggingFace Dataset Uploader - Upload Real Data to HuggingFace Datasets
Ensures all data from external APIs is stored in HuggingFace Datasets first,
then served to clients from there.
Data Flow:
External APIs → SQLite Cache → HuggingFace Datasets → Clients
"""
import os
import json
import logging
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any, Optional
import pandas as pd
try:
from huggingface_hub import HfApi, create_repo, upload_file
from datasets import Dataset, DatasetDict
HF_HUB_AVAILABLE = True
except ImportError:
HF_HUB_AVAILABLE = False
print("⚠️ WARNING: huggingface_hub and datasets libraries not available")
print(" Install with: pip install huggingface_hub datasets")
from utils.logger import setup_logger
logger = setup_logger("hf_dataset_uploader")
class HuggingFaceDatasetUploader:
"""
Upload cryptocurrency data to HuggingFace Datasets
Features:
1. Upload market data (prices, volumes, etc.)
2. Upload OHLC/candlestick data
3. Automatic dataset creation if not exists
4. Incremental updates (append new data)
5. Dataset versioning and metadata
"""
def __init__(
self,
hf_token: Optional[str] = None,
dataset_namespace: Optional[str] = None,
auto_create: bool = True
):
"""
Initialize HuggingFace Dataset Uploader
Args:
hf_token: HuggingFace API token (or from HF_TOKEN env var)
dataset_namespace: Dataset namespace (username or org name)
auto_create: Automatically create datasets if they don't exist
"""
if not HF_HUB_AVAILABLE:
raise ImportError(
"huggingface_hub and datasets libraries required. "
"Install with: pip install huggingface_hub datasets"
)
self.token = hf_token or os.getenv("HF_TOKEN") or os.getenv("HF_API_TOKEN")
if not self.token:
raise ValueError(
"HuggingFace token required. Set HF_TOKEN environment variable "
"or pass hf_token parameter"
)
self.namespace = dataset_namespace or os.getenv("HF_USERNAME")
if not self.namespace:
# Try to get username from HF API
try:
api = HfApi(token=self.token)
user_info = api.whoami()
self.namespace = user_info.get("name")
logger.info(f"Detected HuggingFace username: {self.namespace}")
except Exception as e:
logger.warning(f"Could not detect HuggingFace username: {e}")
self.namespace = "crypto-data-hub" # Default namespace
self.auto_create = auto_create
self.api = HfApi(token=self.token)
# Dataset names - ALL data types
self.market_data_dataset = f"{self.namespace}/crypto-market-data"
self.ohlc_dataset = f"{self.namespace}/crypto-ohlc-data"
self.news_dataset = f"{self.namespace}/crypto-news-data"
self.sentiment_dataset = f"{self.namespace}/crypto-sentiment-data"
self.onchain_dataset = f"{self.namespace}/crypto-onchain-data"
self.whale_dataset = f"{self.namespace}/crypto-whale-data"
self.explorer_dataset = f"{self.namespace}/crypto-explorer-data"
logger.info(f"HuggingFace Dataset Uploader initialized")
logger.info(f" Namespace: {self.namespace}")
logger.info(f" Datasets:")
logger.info(f" - Market: {self.market_data_dataset}")
logger.info(f" - OHLC: {self.ohlc_dataset}")
logger.info(f" - News: {self.news_dataset}")
logger.info(f" - Sentiment: {self.sentiment_dataset}")
logger.info(f" - On-chain: {self.onchain_dataset}")
logger.info(f" - Whale: {self.whale_dataset}")
logger.info(f" - Explorer: {self.explorer_dataset}")
def _ensure_dataset_exists(self, dataset_name: str, description: str) -> bool:
"""
Ensure dataset exists on HuggingFace Hub
Args:
dataset_name: Full dataset name (namespace/dataset)
description: Dataset description
Returns:
bool: True if dataset exists or was created
"""
try:
# Check if dataset exists
try:
self.api.dataset_info(dataset_name, token=self.token)
logger.info(f"Dataset exists: {dataset_name}")
return True
except Exception as check_error:
# Check if it's an authentication error
if "401" in str(check_error) or "Unauthorized" in str(check_error) or "expired" in str(check_error).lower():
logger.error(
f"❌ HuggingFace token authentication failed for {dataset_name}. "
f"Token may be expired or invalid. Please update HF_TOKEN environment variable."
)
return False
# Dataset doesn't exist
if self.auto_create:
logger.info(f"Creating dataset: {dataset_name}")
create_repo(
dataset_name,
token=self.token,
repo_type="dataset",
private=False # Public dataset
)
# Upload README
readme_content = f"""---
tags:
- cryptocurrency
- crypto
- market-data
- real-time
- data-hub
license: mit
---
# {dataset_name}
{description}
## Data Source
This dataset is automatically updated from real cryptocurrency APIs:
- CoinGecko API (market data)
- Binance API (OHLC data)
## Update Frequency
Data is updated every 60 seconds with real-time information.
## Usage
```python
from datasets import load_dataset
# Load the dataset
dataset = load_dataset("{dataset_name}")
# Access data
df = dataset['train'].to_pandas()
print(df.head())
```
## Data Hub Architecture
```
External APIs → Data Hub → HuggingFace Datasets → Clients
```
All data is real - no mock or fake data.
## Last Updated
{datetime.utcnow().isoformat()}Z
"""
readme_path = Path("/tmp") / "README.md"
readme_path.write_text(readme_content)
self.api.upload_file(
path_or_fileobj=str(readme_path),
path_in_repo="README.md",
repo_id=dataset_name,
repo_type="dataset",
token=self.token
)
logger.info(f"✅ Created dataset: {dataset_name}")
return True
else:
logger.error(f"Dataset does not exist and auto_create=False: {dataset_name}")
return False
except Exception as e:
# Check for authentication errors
error_msg = str(e)
if "401" in error_msg or "Unauthorized" in error_msg or "expired" in error_msg.lower():
logger.error(
f"❌ HuggingFace authentication error: {error_msg}\n"
f" Please update your HF_TOKEN with a valid token from https://huggingface.co/settings/tokens"
)
else:
logger.error(f"Error ensuring dataset exists: {e}", exc_info=True)
return False
async def upload_market_data(
self,
market_data: List[Dict[str, Any]],
append: bool = True
) -> bool:
"""
Upload market data to HuggingFace Dataset
Args:
market_data: List of market data dictionaries
append: If True, append to existing data; if False, replace
Returns:
bool: True if upload successful
"""
try:
if not market_data:
logger.warning("No market data to upload")
return False
# Ensure dataset exists
if not self._ensure_dataset_exists(
self.market_data_dataset,
"Real-time cryptocurrency market data from multiple sources"
):
return False
# Add timestamp if not present
current_time = datetime.utcnow().isoformat() + "Z"
for data in market_data:
if "timestamp" not in data:
data["timestamp"] = current_time
if "fetched_at" not in data:
data["fetched_at"] = current_time
# Convert to pandas DataFrame
df = pd.DataFrame(market_data)
# Create HuggingFace Dataset
dataset = Dataset.from_pandas(df)
# If append mode, we need to download existing data first
if append:
try:
from datasets import load_dataset
existing_dataset = load_dataset(
self.market_data_dataset,
split="train",
token=self.token
)
# Combine with new data
existing_df = existing_dataset.to_pandas()
combined_df = pd.concat([existing_df, df], ignore_index=True)
# Remove duplicates based on symbol and timestamp
# Keep only the latest record for each symbol
combined_df = combined_df.sort_values(
by=["symbol", "timestamp"],
ascending=[True, False]
)
combined_df = combined_df.drop_duplicates(
subset=["symbol"],
keep="first"
)
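# Net effect: the hosted dataset keeps only the latest row per symbol,
# i.e. a rolling snapshot rather than a growing price history.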
dataset = Dataset.from_pandas(combined_df)
logger.info(f"Appended {len(df)} new records to {len(existing_df)} existing records")
except Exception as e:
logger.warning(f"Could not load existing dataset (might be first upload): {e}")
# First upload, use new data only
pass
# Push to hub
logger.info(f"Uploading {len(dataset)} records to {self.market_data_dataset}...")
dataset.push_to_hub(
self.market_data_dataset,
token=self.token,
private=False
)
logger.info(f"✅ Successfully uploaded market data to {self.market_data_dataset}")
logger.info(f" Records: {len(dataset)}")
logger.info(f" Columns: {dataset.column_names}")
return True
except Exception as e:
logger.error(f"Error uploading market data: {e}", exc_info=True)
return False
async def upload_ohlc_data(
self,
ohlc_data: List[Dict[str, Any]],
append: bool = True
) -> bool:
"""
Upload OHLC/candlestick data to HuggingFace Dataset
Args:
ohlc_data: List of OHLC data dictionaries
append: If True, append to existing data; if False, replace
Returns:
bool: True if upload successful
"""
try:
if not ohlc_data:
logger.warning("No OHLC data to upload")
return False
# Ensure dataset exists
if not self._ensure_dataset_exists(
self.ohlc_dataset,
"Real-time cryptocurrency OHLC/candlestick data from multiple exchanges"
):
return False
# Add fetched_at timestamp if not present
current_time = datetime.utcnow().isoformat() + "Z"
for data in ohlc_data:
if "fetched_at" not in data:
data["fetched_at"] = current_time
# Convert to pandas DataFrame
df = pd.DataFrame(ohlc_data)
# Create HuggingFace Dataset
dataset = Dataset.from_pandas(df)
# If append mode, download and combine with existing data
if append:
try:
from datasets import load_dataset
existing_dataset = load_dataset(
self.ohlc_dataset,
split="train",
token=self.token
)
existing_df = existing_dataset.to_pandas()
combined_df = pd.concat([existing_df, df], ignore_index=True)
# Remove duplicates based on symbol, interval, and timestamp
combined_df = combined_df.drop_duplicates(
subset=["symbol", "interval", "timestamp"],
keep="last"
)
dataset = Dataset.from_pandas(combined_df)
logger.info(f"Appended {len(df)} new OHLC records to {len(existing_df)} existing records")
except Exception as e:
logger.warning(f"Could not load existing OHLC dataset: {e}")
pass
# Push to hub
logger.info(f"Uploading {len(dataset)} OHLC records to {self.ohlc_dataset}...")
dataset.push_to_hub(
self.ohlc_dataset,
token=self.token,
private=False
)
logger.info(f"✅ Successfully uploaded OHLC data to {self.ohlc_dataset}")
logger.info(f" Records: {len(dataset)}")
logger.info(f" Columns: {dataset.column_names}")
return True
except Exception as e:
logger.error(f"Error uploading OHLC data: {e}", exc_info=True)
return False
async def upload_news_data(
self,
news_data: List[Dict[str, Any]],
append: bool = True
) -> bool:
"""Upload news data to HuggingFace Dataset"""
try:
if not news_data:
return False
if not self._ensure_dataset_exists(
self.news_dataset,
"Real-time cryptocurrency news from multiple sources"
):
return False
df = pd.DataFrame(news_data)
dataset = Dataset.from_pandas(df)
if append:
try:
from datasets import load_dataset
existing = load_dataset(self.news_dataset, split="train", token=self.token)
existing_df = existing.to_pandas()
combined_df = pd.concat([existing_df, df], ignore_index=True)
combined_df = combined_df.drop_duplicates(subset=["url"], keep="last")
dataset = Dataset.from_pandas(combined_df)
except Exception as e:
logger.warning(f"Could not load existing news dataset (might be first upload): {e}")
dataset.push_to_hub(self.news_dataset, token=self.token, private=False)
logger.info(f"✅ Uploaded {len(dataset)} news records to {self.news_dataset}")
return True
except Exception as e:
logger.error(f"Error uploading news data: {e}", exc_info=True)
return False
async def upload_sentiment_data(
self,
sentiment_data: List[Dict[str, Any]],
append: bool = True
) -> bool:
"""Upload sentiment data to HuggingFace Dataset"""
try:
if not sentiment_data:
return False
if not self._ensure_dataset_exists(
self.sentiment_dataset,
"Cryptocurrency market sentiment indicators from multiple sources"
):
return False
df = pd.DataFrame(sentiment_data)
dataset = Dataset.from_pandas(df)
if append:
try:
from datasets import load_dataset
existing = load_dataset(self.sentiment_dataset, split="train", token=self.token)
existing_df = existing.to_pandas()
combined_df = pd.concat([existing_df, df], ignore_index=True)
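# Note: unlike the market/OHLC/news uploads above, no de-duplication is applied
# here (nor in the on-chain/whale/explorer uploads below), so re-uploading the
# same records will add duplicate rows.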
dataset = Dataset.from_pandas(combined_df)
except Exception as e:
logger.warning(f"Could not load existing sentiment dataset (might be first upload): {e}")
dataset.push_to_hub(self.sentiment_dataset, token=self.token, private=False)
logger.info(f"✅ Uploaded {len(dataset)} sentiment records to {self.sentiment_dataset}")
return True
except Exception as e:
logger.error(f"Error uploading sentiment data: {e}", exc_info=True)
return False
async def upload_onchain_data(
self,
onchain_data: List[Dict[str, Any]],
append: bool = True
) -> bool:
"""Upload on-chain analytics to HuggingFace Dataset"""
try:
if not onchain_data:
return False
if not self._ensure_dataset_exists(
self.onchain_dataset,
"On-chain cryptocurrency analytics and metrics"
):
return False
df = pd.DataFrame(onchain_data)
dataset = Dataset.from_pandas(df)
if append:
try:
from datasets import load_dataset
existing = load_dataset(self.onchain_dataset, split="train", token=self.token)
existing_df = existing.to_pandas()
combined_df = pd.concat([existing_df, df], ignore_index=True)
dataset = Dataset.from_pandas(combined_df)
except Exception as e:
logger.warning(f"Could not load existing on-chain dataset (might be first upload): {e}")
dataset.push_to_hub(self.onchain_dataset, token=self.token, private=False)
logger.info(f"✅ Uploaded {len(dataset)} on-chain records to {self.onchain_dataset}")
return True
except Exception as e:
logger.error(f"Error uploading on-chain data: {e}", exc_info=True)
return False
async def upload_whale_data(
self,
whale_data: List[Dict[str, Any]],
append: bool = True
) -> bool:
"""Upload whale transaction data to HuggingFace Dataset"""
try:
if not whale_data:
return False
if not self._ensure_dataset_exists(
self.whale_dataset,
"Large cryptocurrency transactions and whale movements"
):
return False
df = pd.DataFrame(whale_data)
dataset = Dataset.from_pandas(df)
if append:
try:
from datasets import load_dataset
existing = load_dataset(self.whale_dataset, split="train", token=self.token)
existing_df = existing.to_pandas()
combined_df = pd.concat([existing_df, df], ignore_index=True)
dataset = Dataset.from_pandas(combined_df)
except Exception as e:
logger.warning(f"Could not load existing whale dataset (might be first upload): {e}")
dataset.push_to_hub(self.whale_dataset, token=self.token, private=False)
logger.info(f"✅ Uploaded {len(dataset)} whale transaction records to {self.whale_dataset}")
return True
except Exception as e:
logger.error(f"Error uploading whale data: {e}", exc_info=True)
return False
async def upload_explorer_data(
self,
explorer_data: List[Dict[str, Any]],
append: bool = True
) -> bool:
"""Upload block explorer data to HuggingFace Dataset"""
try:
if not explorer_data:
return False
if not self._ensure_dataset_exists(
self.explorer_dataset,
"Blockchain data from multiple block explorers"
):
return False
df = pd.DataFrame(explorer_data)
dataset = Dataset.from_pandas(df)
if append:
try:
from datasets import load_dataset
existing = load_dataset(self.explorer_dataset, split="train", token=self.token)
existing_df = existing.to_pandas()
combined_df = pd.concat([existing_df, df], ignore_index=True)
dataset = Dataset.from_pandas(combined_df)
except Exception as e:
logger.warning(f"Could not load existing explorer dataset (might be first upload): {e}")
dataset.push_to_hub(self.explorer_dataset, token=self.token, private=False)
logger.info(f"✅ Uploaded {len(dataset)} explorer records to {self.explorer_dataset}")
return True
except Exception as e:
logger.error(f"Error uploading explorer data: {e}", exc_info=True)
return False
def get_dataset_info(self, dataset_type: str = "market") -> Optional[Dict[str, Any]]:
"""
Get information about a dataset
Args:
dataset_type: "market", "ohlc", "news", "sentiment", "onchain", "whale", or "explorer"
Returns:
Dataset information dictionary
"""
try:
dataset_map = {
"market": self.market_data_dataset,
"ohlc": self.ohlc_dataset,
"news": self.news_dataset,
"sentiment": self.sentiment_dataset,
"onchain": self.onchain_dataset,
"whale": self.whale_dataset,
"explorer": self.explorer_dataset
}
dataset_name = dataset_map.get(dataset_type, self.market_data_dataset)
info = self.api.dataset_info(dataset_name, token=self.token)
return {
"id": info.id,
"author": info.author,
"created_at": str(info.created_at),
"last_modified": str(info.last_modified),
"downloads": info.downloads,
"likes": info.likes,
"tags": info.tags,
"private": info.private,
"url": f"https://huggingface.co/datasets/{dataset_name}"
}
except Exception as e:
logger.error(f"Error getting dataset info: {e}")
return None
# Singleton instance
_uploader_instance: Optional[HuggingFaceDatasetUploader] = None
def get_dataset_uploader(
hf_token: Optional[str] = None,
dataset_namespace: Optional[str] = None
) -> HuggingFaceDatasetUploader:
"""
Get or create HuggingFace Dataset Uploader singleton instance
Args:
hf_token: HuggingFace API token
dataset_namespace: Dataset namespace
Returns:
HuggingFaceDatasetUploader instance
"""
global _uploader_instance
if _uploader_instance is None:
_uploader_instance = HuggingFaceDatasetUploader(
hf_token=hf_token,
dataset_namespace=dataset_namespace
)
return _uploader_instance
# Testing
if __name__ == "__main__":
import asyncio
async def test_uploader():
"""Test the uploader"""
print("=" * 80)
print("Testing HuggingFace Dataset Uploader")
print("=" * 80)
# Sample market data
sample_market_data = [
{
"symbol": "BTC",
"price": 45000.50,
"market_cap": 850000000000.0,
"volume_24h": 25000000000.0,
"change_24h": 2.5,
"high_24h": 45500.0,
"low_24h": 44000.0,
"provider": "coingecko",
"timestamp": datetime.utcnow().isoformat() + "Z"
},
{
"symbol": "ETH",
"price": 3200.75,
"market_cap": 380000000000.0,
"volume_24h": 15000000000.0,
"change_24h": 3.2,
"high_24h": 3250.0,
"low_24h": 3100.0,
"provider": "coingecko",
"timestamp": datetime.utcnow().isoformat() + "Z"
}
]
# Sample OHLC data
sample_ohlc_data = [
{
"symbol": "BTCUSDT",
"interval": "1h",
"timestamp": datetime.utcnow().isoformat() + "Z",
"open": 44500.0,
"high": 45000.0,
"low": 44300.0,
"close": 44800.0,
"volume": 1250000.0,
"provider": "binance"
}
]
try:
# Create uploader
uploader = get_dataset_uploader()
# Upload market data
print("\n📤 Uploading market data...")
success = await uploader.upload_market_data(sample_market_data)
print(f" Result: {'✅ Success' if success else '❌ Failed'}")
# Upload OHLC data
print("\n📤 Uploading OHLC data...")
success = await uploader.upload_ohlc_data(sample_ohlc_data)
print(f" Result: {'✅ Success' if success else '❌ Failed'}")
# Get dataset info
print("\n📊 Dataset Information:")
market_info = uploader.get_dataset_info("market")
if market_info:
print(f" Market Data Dataset:")
print(f" URL: {market_info['url']}")
print(f" Downloads: {market_info['downloads']}")
print(f" Likes: {market_info['likes']}")
except Exception as e:
print(f"❌ Error: {e}")
import traceback
traceback.print_exc()
asyncio.run(test_uploader())