import psycopg2
from psycopg2.extras import execute_values
import pandas as pd
from sentence_transformers import SentenceTransformer
import os
import datetime
import logging
from collections import deque
from fastapi import FastAPI, BackgroundTasks, HTTPException
from contextlib import asynccontextmanager
from fastapi.responses import HTMLResponse
import threading

# --- Configuration ---
SUPABASE_CONNECTION_STRING = os.getenv("SUPABASE_CONNECTION_STRING")
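# Expected to be a standard Postgres URI (illustrative placeholder, not a real
# credential), e.g.:
#   postgresql://postgres:<password>@db.<project-ref>.supabase.co:5432/postgres
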
# --- Toggles & Tuning ---
PROCESSING_CHUNK_SIZE = 32
EMBEDDING_BATCH_SIZE = 32
DRY_RUN = False

# --- Global State ---
model = None
execution_logs = deque(maxlen=50)
is_processing = False
processing_lock = threading.Lock()

# --- Lifespan Manager ---
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Load the embedding model once at startup; it is shared by all batches.
    global model
    print("⏳ Loading Model...")
    model = SentenceTransformer('Alibaba-NLP/gte-modernbert-base', trust_remote_code=True)
    print("✅ Model Loaded.")
    yield
    print("🛑 Shutting down...")

app = FastAPI(lifespan=lifespan)

# --- Helper Functions ---
def fetch_and_lock_chunk(conn, chunk_size):
    """
    Fetches candidates from the denormalized table where embeddings are missing
    or stale, locking the selected rows so concurrent workers skip them.
    """
    query = """
        SELECT
            id,
            name,
            summary,
            work_experience,
            projects,
            education,
            achievements,
            certifications,
            volunteering,
            skills,
            languages
        FROM public.candidates
        WHERE
            -- Condition 1: Embedding is missing (new candidate)
            embeddings IS NULL
            OR
            -- Condition 2: Row created after the last embedding run (retry/update
            -- logic). Since there is no 'updated_at' column, we compare
            -- created_at against embeddings_created_at.
            (embeddings_created_at IS NOT NULL AND created_at > embeddings_created_at)
        LIMIT %s
        FOR UPDATE SKIP LOCKED
    """
    # Note: If you add an 'updated_at' column later, change the WHERE clause to:
    # WHERE embeddings IS NULL OR updated_at > embeddings_created_at
    return pd.read_sql_query(query, conn, params=(chunk_size,))
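
# A partial index along these lines (an assumption, not part of the source
# schema) keeps the backlog scan cheap as the table grows:
#   CREATE INDEX IF NOT EXISTS candidates_embedding_backlog_idx
#       ON public.candidates (created_at)
#       WHERE embeddings IS NULL;
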
def clean_and_format_text(row):
    """
    Parses the JSONB and array columns from the new schema to create a
    rich text representation for embedding.
    """
    text_parts = []

    # 1. Basic Info
    if row.get('name'):
        text_parts.append(f"Name: {row['name']}")
    if row.get('summary'):
        text_parts.append(f"Summary: {row['summary']}")

    # 2. Skills (Postgres array -> Python list)
    if row.get('skills') and isinstance(row['skills'], list):
        # Filter out empty strings/None
        valid_skills = [s for s in row['skills'] if s]
        if valid_skills:
            text_parts.append(f"Skills: {', '.join(valid_skills)}")

    # 3. Work Experience (JSONB list of dicts)
    # Schema keys: role, company, description, duration
    if row.get('work_experience') and isinstance(row['work_experience'], list):
        exps = []
        for item in row['work_experience']:
            if isinstance(item, dict):
                role = item.get('role') or ''
                company = item.get('company') or ''
                desc = item.get('description') or ''
                # Format: "Role at Company: Description"; the connector is
                # dropped when either side is missing
                entry = " at ".join(p for p in (role, company) if p)
                if desc:
                    entry = f"{entry}: {desc}" if entry else desc
                if entry:
                    exps.append(entry)
        if exps:
            text_parts.append("Work Experience:\n" + "\n".join(exps))

    # 4. Projects (JSONB list of dicts)
    # Schema keys: title, description, link
    if row.get('projects') and isinstance(row['projects'], list):
        projs = []
        for item in row['projects']:
            if isinstance(item, dict):
                title = (item.get('title') or '').strip()
                desc = item.get('description') or ''
                entry = f"{title}: {desc}" if (title and desc) else (title or desc)
                if entry:
                    projs.append(entry)
        if projs:
            text_parts.append("Projects:\n" + "\n".join(projs))

    # 5. Education (JSONB list of dicts)
    # Schema keys: degree, institution, year
    if row.get('education') and isinstance(row['education'], list):
        edus = []
        for item in row['education']:
            if isinstance(item, dict):
                degree = item.get('degree') or ''
                inst = item.get('institution') or ''
                entry = " from ".join(p for p in (degree, inst) if p)
                if entry:
                    edus.append(entry)
        if edus:
            text_parts.append("Education: " + ", ".join(edus))

    # 6. Certifications (JSONB list of dicts)
    # Schema keys: name, issuer
    if row.get('certifications') and isinstance(row['certifications'], list):
        certs = []
        for item in row['certifications']:
            if isinstance(item, dict):
                name = item.get('name') or ''
                issuer = item.get('issuer') or ''
                entry = " by ".join(p for p in (name, issuer) if p)
                if entry:
                    certs.append(entry)
        if certs:
            text_parts.append("Certifications: " + ", ".join(certs))

    # 7. Achievements (JSONB list of dicts)
    # Schema keys: title, description
    if row.get('achievements') and isinstance(row['achievements'], list):
        achievements = []
        for item in row['achievements']:
            if isinstance(item, dict):
                title = item.get('title') or ''
                desc = item.get('description') or ''
                entry = ": ".join(p for p in (title, desc) if p)
                if entry:
                    achievements.append(entry)
        if achievements:
            text_parts.append("Achievements: " + "; ".join(achievements))
| return "\n\n".join(text_parts) | |
def update_db_batch(conn, updates):
    if DRY_RUN:
        return
    # Targets public.candidates; the stringified ID is cast back to UUID
    query = """
        UPDATE public.candidates AS c
        SET embeddings = data.vector::vector,
            embeddings_created_at = NOW()
        FROM (VALUES %s) AS data (id, vector)
        WHERE c.id = data.id::uuid
    """
    cursor = conn.cursor()
    try:
        execute_values(cursor, query, updates)
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        cursor.close()
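
# For reference, the schema this worker assumes (a sketch, not authoritative
# DDL): a pgvector column sized to the model's 768-dim output plus a timestamp.
#   CREATE EXTENSION IF NOT EXISTS vector;
#   ALTER TABLE public.candidates
#       ADD COLUMN IF NOT EXISTS embeddings vector(768),
#       ADD COLUMN IF NOT EXISTS embeddings_created_at timestamptz;
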
def run_worker_logic():
    """
    Core logic for a single batch run: fetch, embed, update.
    """
    log_buffer = []
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    log_buffer.append(f"<b>BATCH RUN: {timestamp}</b>")
    conn = None
    try:
        conn = psycopg2.connect(SUPABASE_CONNECTION_STRING, sslmode='require')

        # 1. Fetch & Lock
        df = fetch_and_lock_chunk(conn, PROCESSING_CHUNK_SIZE)
        if df.empty:
            conn.rollback()
            log_buffer.append("💤 No pending candidates found.")
            execution_logs.appendleft("<br>".join(log_buffer))
            return "No data"
        log_buffer.append(f"🔒 Locked & Processing {len(df)} candidates...")

        # 2. Clean Text
        df['full_text'] = df.apply(clean_and_format_text, axis=1)

        # 3. Log Inputs (for the root API view)
        for _, row in df.iterrows():
            log_buffer.append("<div style='border:1px solid #ccc; margin:5px; padding:5px; background:#f9f9f9'>")
            # row['id'] is the UUID
            log_buffer.append(f"<strong>ID: {row['id']} ({row.get('name', 'Unknown')})</strong>")
            log_buffer.append(f"<pre style='white-space: pre-wrap;'>{row['full_text']}</pre>")
            log_buffer.append("</div>")

        # 4. Generate Embeddings
        embeddings = model.encode(
            df['full_text'].tolist(),
            batch_size=EMBEDDING_BATCH_SIZE,
            show_progress_bar=False,
            convert_to_numpy=True,
            normalize_embeddings=True
        )

        # 5. Update DB
        # Ensure the ID is a string in the tuple list if it isn't already
        updates = list(zip(df['id'].astype(str).tolist(), embeddings.tolist()))
        if not DRY_RUN:
            update_db_batch(conn, updates)
            log_buffer.append(f"✅ Successfully updated {len(df)} profiles.")
        else:
            conn.rollback()
            log_buffer.append("⚠️ Dry Run: No DB updates made.")
    except Exception as e:
        if conn:
            conn.rollback()
        log_buffer.append(f"❌ ERROR: {str(e)}")
        print(f"Error: {e}")
    finally:
        if conn:
            conn.close()
    execution_logs.appendleft("<br>".join(log_buffer))

# --- API Endpoints ---
@app.get("/", response_class=HTMLResponse)
async def read_root():
    html_content = """
    <html>
    <head>
        <title>Embedding Worker Logs</title>
        <style>
            body { font-family: monospace; padding: 20px; }
            h1 { color: #333; }
            .log-entry { margin-bottom: 20px; border-bottom: 2px solid #333; padding-bottom: 20px; }
        </style>
    </head>
    <body>
        <h1>🚀 Candidates Embedding Worker</h1>
        <p><i>Most recent batches shown first.</i></p>
        <hr>
    """
    if not execution_logs:
        html_content += "<p>No logs yet. Hit the <code>/trigger-batch</code> endpoint to start processing.</p>"
    for entry in execution_logs:
        html_content += f"<div class='log-entry'>{entry}</div>"
    html_content += "</body></html>"
    return html_content

# Route path taken from the /trigger-batch reference in the log page; GET is
# assumed so it can be hit from a browser or a simple cron ping.
@app.get("/trigger-batch")
async def trigger_processing(background_tasks: BackgroundTasks):
    if processing_lock.locked():
        return {"status": "busy", "message": "Worker is currently processing a previous batch."}
    background_tasks.add_task(wrapped_worker)
    return {"status": "started", "message": "Batch processing started in background."}

def wrapped_worker():
    # Non-blocking acquire: overlapping triggers are dropped rather than queued.
    if processing_lock.acquire(blocking=False):
        try:
            run_worker_logic()
        finally:
            processing_lock.release()
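
# Local entry point (an assumption: a hosted deployment typically starts
# uvicorn itself). Once running, trigger a batch with e.g.:
#   curl http://localhost:7860/trigger-batch
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)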