""" FastAPI Backend for Text-to-3D Model Converter Deployed on Hugging Face Spaces with direct model loading """ import os import logging import time import uuid import asyncio from typing import Optional from contextlib import asynccontextmanager import uvicorn from fastapi import FastAPI, File, UploadFile, HTTPException, BackgroundTasks from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse from pydantic import BaseModel from models.depth_processor import DepthProcessor from models.image_generator import ImageGenerator from utils.job_manager import JobManager from utils.cloudinary_client import CloudinaryClient # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Global variables for models depth_processor = None image_generator = None job_manager = None cloudinary_client = None @asynccontextmanager async def lifespan(app: FastAPI): """Initialize models on startup""" global depth_processor, image_generator, job_manager, cloudinary_client logger.info("🚀 Starting Text-to-3D Backend...") # Initialize utilities job_manager = JobManager() cloudinary_client = CloudinaryClient() # Initialize models logger.info("📦 Loading AI models...") try: # Initialize depth processor depth_processor = DepthProcessor() await asyncio.to_thread(depth_processor.load_model) logger.info("✅ Depth estimation model loaded") # Initialize image generator image_generator = ImageGenerator() await asyncio.to_thread(image_generator.load_model) logger.info("✅ Image generation model loaded") logger.info("🎉 All models loaded successfully!") except Exception as e: logger.error(f"❌ Failed to load models: {str(e)}") raise e yield # Cleanup on shutdown logger.info("🔄 Shutting down...") # Initialize FastAPI app app = FastAPI( title="Text-to-3D Backend", description="Convert text prompts and images to 3D models", version="1.0.0", lifespan=lifespan ) # Configure CORS app.add_middleware( CORSMiddleware, allow_origins=[ "http://localhost:3000", # Local development "https://*.render.com", # Render deployment "*" # Allow all for now, restrict in production ], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Request/Response models class GenerateRequest(BaseModel): prompt: str user_id: Optional[str] = None class GenerateResponse(BaseModel): success: bool job_id: str image_url: Optional[str] = None model_url: Optional[str] = None depth_map_url: Optional[str] = None error: Optional[str] = None class ProgressResponse(BaseModel): stage: str progress: int message: str timestamp: Optional[float] = None @app.get("/") async def root(): """Health check endpoint""" return { "status": "Text-to-3D Backend is running! 🚀", "version": "1.0.0", "models_loaded": { "depth_processor": depth_processor is not None, "image_generator": image_generator is not None }, "gpu_available": depth_processor.device.type == "cuda" if depth_processor else False } @app.get("/health") async def health_check(): """Detailed health check""" return { "status": "healthy", "models": { "depth_estimation": "loaded" if depth_processor else "not_loaded", "image_generation": "loaded" if image_generator else "not_loaded" }, "device": str(depth_processor.device) if depth_processor else "unknown", "active_jobs": job_manager.get_active_job_count() if job_manager else 0 } @app.post("/generate", response_model=GenerateResponse) async def generate_from_text( request: GenerateRequest, background_tasks: BackgroundTasks ): """Generate 3D model from text prompt""" try: if not request.prompt.strip(): raise HTTPException(status_code=400, detail="Prompt cannot be empty") # Create job ID job_id = str(uuid.uuid4()) job_manager.register_job(job_id) logger.info(f"🎨 Starting text-to-3D generation: '{request.prompt}' (Job: {job_id})") # Start background processing background_tasks.add_task( process_text_to_3d, job_id, request.prompt, request.user_id ) return GenerateResponse( success=True, job_id=job_id, message="Generation started" ) except Exception as e: logger.error(f"❌ Error in generate endpoint: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) @app.post("/upload") async def upload_image( file: UploadFile = File(...), background_tasks: BackgroundTasks = None, user_id: Optional[str] = None ): """Convert uploaded image to 3D model""" try: # Validate file type if not file.content_type.startswith('image/'): raise HTTPException(status_code=400, detail="File must be an image") # Create job ID job_id = str(uuid.uuid4()) job_manager.register_job(job_id) logger.info(f"📤 Processing uploaded image: {file.filename} (Job: {job_id})") # Read file content file_content = await file.read() # Start background processing background_tasks.add_task( process_upload_to_3d, job_id, file_content, file.filename, user_id ) return { "success": True, "job_id": job_id, "message": "Upload processing started" } except Exception as e: logger.error(f"❌ Error in upload endpoint: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) @app.get("/progress/{job_id}", response_model=ProgressResponse) async def get_progress(job_id: str): """Get job progress""" try: progress = job_manager.get_job_progress(job_id) if not progress: raise HTTPException(status_code=404, detail="Job not found") return ProgressResponse(**progress) except Exception as e: logger.error(f"❌ Error getting progress: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) @app.post("/cancel") async def cancel_job(job_id: str): """Cancel a running job""" try: success = job_manager.cancel_job(job_id) if success: return {"success": True, "message": f"Job {job_id} cancelled"} else: raise HTTPException(status_code=404, detail="Job not found") except Exception as e: logger.error(f"❌ Error cancelling job: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) async def process_text_to_3d(job_id: str, prompt: str, user_id: Optional[str]): """Background task to process text to 3D""" try: # Update progress job_manager.update_job_progress(job_id, "generating_image", 10, "Generating image from text...") # Generate image from text image_result = await asyncio.to_thread( image_generator.generate_image, prompt ) if job_manager.is_job_cancelled(job_id): return job_manager.update_job_progress(job_id, "uploading_image", 40, "Uploading generated image...") # Upload image to Cloudinary image_url = await asyncio.to_thread( cloudinary_client.upload_image_from_bytes, image_result['image_bytes'], f"generated_{job_id}" ) if job_manager.is_job_cancelled(job_id): return job_manager.update_job_progress(job_id, "creating_depth", 60, "Creating depth map...") # Generate depth map and 3D model depth_result = await asyncio.to_thread( depth_processor.process_image_to_3d, image_result['image_pil'], job_id ) if job_manager.is_job_cancelled(job_id): return job_manager.update_job_progress(job_id, "uploading_results", 90, "Uploading 3D model...") # Upload results model_url = await asyncio.to_thread( cloudinary_client.upload_file, depth_result['obj_path'], f"model_{job_id}.obj" ) depth_map_url = await asyncio.to_thread( cloudinary_client.upload_image_from_path, depth_result['depth_map_path'], f"depth_{job_id}" ) # Complete job job_manager.complete_job(job_id, { "image_url": image_url, "model_url": model_url, "depth_map_url": depth_map_url }) logger.info(f"✅ Text-to-3D generation completed: {job_id}") except Exception as e: logger.error(f"❌ Error in text-to-3D processing: {str(e)}") job_manager.fail_job(job_id, str(e)) async def process_upload_to_3d(job_id: str, file_content: bytes, filename: str, user_id: Optional[str]): """Background task to process uploaded image to 3D""" try: job_manager.update_job_progress(job_id, "uploading", 20, "Uploading image to cloud...") # Upload original image image_url = await asyncio.to_thread( cloudinary_client.upload_image_from_bytes, file_content, f"upload_{job_id}_{filename}" ) if job_manager.is_job_cancelled(job_id): return job_manager.update_job_progress(job_id, "processing", 50, "Processing image to 3D...") # Convert to PIL Image from PIL import Image image_pil = Image.open(io.BytesIO(file_content)) # Generate depth map and 3D model depth_result = await asyncio.to_thread( depth_processor.process_image_to_3d, image_pil, job_id ) if job_manager.is_job_cancelled(job_id): return job_manager.update_job_progress(job_id, "uploading_results", 90, "Uploading 3D model...") # Upload results model_url = await asyncio.to_thread( cloudinary_client.upload_file, depth_result['obj_path'], f"model_{job_id}.obj" ) depth_map_url = await asyncio.to_thread( cloudinary_client.upload_image_from_path, depth_result['depth_map_path'], f"depth_{job_id}" ) # Complete job job_manager.complete_job(job_id, { "image_url": image_url, "model_url": model_url, "depth_map_url": depth_map_url }) logger.info(f"✅ Upload-to-3D processing completed: {job_id}") except Exception as e: logger.error(f"❌ Error in upload-to-3D processing: {str(e)}") job_manager.fail_job(job_id, str(e)) if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=7860)