"""Flask OCR service built on PyTesseract with OpenCV preprocessing.

Exposes single-image and batch OCR endpoints, a health check and a language
listing.  Images are preprocessed with OpenCV, recognised with Tesseract using
several fallback configurations, and the recognised text is cleaned up with
heuristics tuned for exam papers.
"""

import base64
import gc
import io
import logging
import os
import re
import sys

import cv2
import numpy as np
import pytesseract
from flask import Flask, request, jsonify
from flask_cors import CORS
from PIL import Image, ImageEnhance, ImageFilter

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = Flask(__name__)
CORS(app)

# Configure Tesseract path (adjust if needed)
# For Ubuntu/Debian: usually /usr/bin/tesseract
# For Windows: might need to set custom path
# pytesseract.pytesseract.tesseract_cmd = r'/usr/bin/tesseract'


# ---------------------------------------------------------------------------
# Image preprocessing
# ---------------------------------------------------------------------------

def _upscale_to_min_side(gray, min_side):
    """Return *gray* enlarged so that its longer side is at least *min_side*.

    Images that are already large enough are returned unchanged.
    """
    height, width = gray.shape
    longest = max(height, width)
    if longest >= min_side:
        return gray
    scale = min_side / longest
    return cv2.resize(
        gray,
        (int(width * scale), int(height * scale)),
        interpolation=cv2.INTER_CUBIC,
    )


def preprocess_image_advanced(image, enhance_type="default"):
    """Convert *image* to a grayscale array prepared for OCR.

    Args:
        image: A PIL image or an OpenCV/NumPy array (3-channel arrays are
            treated as BGR).
        enhance_type: One of "default", "document", "enhance" or "binary".
            Any other value yields the plain grayscale image.

    Returns:
        A 2-D grayscale NumPy array.  If any processing step raises, the
        error is logged and the unprocessed grayscale image is returned.
    """
    try:
        # Convert PIL to OpenCV format if needed
        if isinstance(image, Image.Image):
            if image.mode != 'RGB':
                image = image.convert('RGB')
            cv_image = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
        else:
            cv_image = image

        if len(cv_image.shape) == 3:
            gray = cv2.cvtColor(cv_image, cv2.COLOR_BGR2GRAY)
        else:
            gray = cv_image

        if enhance_type == "default":
            # Small images hurt OCR accuracy, so enlarge them first.
            gray = _upscale_to_min_side(gray, 600)
            gray = cv2.medianBlur(gray, 3)
            # Slight contrast/brightness boost.
            gray = cv2.convertScaleAbs(gray, alpha=1.1, beta=10)

        elif enhance_type == "document":
            gray = _upscale_to_min_side(gray, 800)
            # Bilateral filtering removes noise while keeping text edges.
            gray = cv2.bilateralFilter(gray, 9, 75, 75)
            clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
            gray = clahe.apply(gray)
            # NOTE: a 1x1 structuring element leaves the image unchanged;
            # the call is kept so a larger kernel can be dropped in later.
            kernel = np.ones((1, 1), np.uint8)
            gray = cv2.morphologyEx(gray, cv2.MORPH_CLOSE, kernel)

        elif enhance_type == "enhance":
            # Maximum enhancement for poor quality images.
            gray = _upscale_to_min_side(gray, 1000)
            gray = cv2.bilateralFilter(gray, 15, 80, 80)
            clahe = cv2.createCLAHE(clipLimit=4.0, tileGridSize=(8, 8))
            gray = clahe.apply(gray)
            sharpen = np.array([[-1, -1, -1], [-1, 9, -1], [-1, -1, -1]])
            gray = cv2.filter2D(gray, -1, sharpen)

        elif enhance_type == "binary":
            gray = cv2.adaptiveThreshold(
                gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                cv2.THRESH_BINARY, 11, 2,
            )
            # NOTE: 1x1 kernel, see the "document" branch.
            kernel = np.ones((1, 1), np.uint8)
            gray = cv2.morphologyEx(gray, cv2.MORPH_CLOSE, kernel)

        return gray

    except Exception as e:
        logger.error(f"Preprocessing error: {e}")
        # Return original grayscale as fallback
        if isinstance(image, Image.Image):
            return np.array(image.convert('L'))
        if len(image.shape) == 3:
            return cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        return image


# ---------------------------------------------------------------------------
# Text post-processing
# ---------------------------------------------------------------------------

# Known run-together phrases seen in OCR output of exam papers.
# NOTE: these are plain substring replacements, so a short key such as
# 'inall' also matches inside longer words (e.g. 'finally').
_PHRASE_FIXES = {
    'thebest': 'the best',
    'inall': 'in all',
    'whichts': 'which is',
    'aWhat': 'a) What',
    'bWhat': 'b) What',
    'cWhat': 'c) What',
    'dWhat': 'd) What',
    'eWhat': 'e) What',
    'bMention': 'b) Mention',
    'cState': 'c) State',
    'aState': 'a) State',
    'bExplain': 'b) Explain',
    'aExplain': 'a) Explain',
    'cExplain': 'c) Explain',
    'dExplain': 'd) Explain',
    'eExplain': 'e) Explain',
    'ENDTERM': 'END TERM',
    'MaxtmumMarks': 'Maximum Marks',
    'Attemptfivequestions': 'Attempt five questions',
    'Sx525': '5×5=25',
    'bjDefine': 'b) Define',
    'foracoin': 'for a coin',
    'tossingexperiment': 'tossing experiment',
    'reasonsforhigher': 'reasons for higher',
    'noiseinmixers': 'noise in mixers',
    'typesofanalog': 'types of analog',
    'advantagesofVSBAM': 'advantages of VSBAM',
    'jointprobability': 'joint probability',
    'conditionalprobability': 'conditional probability',
    'twoproperties': 'two properties',
    'GaussianProcess': 'Gaussian Process',
    'fourproperties': 'four properties',
    'powerspectraldensity': 'power spectral density',
    'poissionprocess': 'Poisson process',
    'weinerprocess': 'Wiener process',
    'analogmodulation': 'analog modulation',
    'suitablediagram': 'suitable diagram',
    'needmodulation': 'need modulation',
    'DSBSCmodulation': 'DSBSC modulation',
    'demodulationwith': 'demodulation with',
    'coherentdetection': 'coherent detection',
    'theirdrawbacks': 'their drawbacks',
    'broadcastradio': 'broadcast radio',
    'transmitterradiates': 'transmitter radiates',
    'kWpowerwhen': 'kW power when',
    'modulationpercentage': 'modulation percentage',
    'carrierpower': 'carrier power',
    'carrierfrequency': 'carrier frequency',
    'frequencymodulated': 'frequency modulated',
    'sinusoidalsignal': 'sinusoidal signal',
    'KHzresulting': 'KHz resulting',
    'maximumfrequency': 'maximum frequency',
    'deviationof': 'deviation of',
    'approximatebandwidth': 'approximate bandwidth',
    'modulatedsignal': 'modulated signal',
    'narrowbandFM': 'narrowband FM',
    'widebandFM': 'wideband FM',
    'twomethods': 'two methods',
    'producingFM': 'producing FM',
    'ratiodetector': 'ratio detector',
    'preemphasis': 'pre-emphasis',
    'deemphasis': 'de-emphasis',
    'processprovideoferall': 'process provide overall',
    'SNRimprovement': 'SNR improvement',
    'FMsystems': 'FM systems',
    'shortnoteon': 'short note on',
    'captureeffect': 'capture effect',
    'thresholdeffect': 'threshold effect',
    'externalnoise': 'external noise',
    'externalsources': 'external sources',
    'desemphasis': 'de-emphasis',
}

# Digits that OCR confuses with letters, mapped by the case of the
# surrounding word so the camel-case splitter below is not triggered.
_DIGITS_TO_LOWER = str.maketrans('0158', 'olsb')
_DIGITS_TO_UPPER = str.maketrans('0158', 'OISB')

_ALNUM_TOKEN = re.compile(r'[A-Za-z0-9]+')
_INTERIOR_CONFUSABLE_DIGITS = re.compile(r'(?<=[A-Za-z])[0158]+(?=[A-Za-z])')
_WORD_INITIAL_RN = re.compile(r'\brn(?=[a-z])')
_PIPE_NEXT_TO_LETTER = re.compile(r'(?<=[A-Za-z])\||\|(?=[A-Za-z])')
_BANG_BEFORE_LETTER = re.compile(r'!(?=[A-Za-z])')
_CAMEL_BOUNDARY = re.compile(r'([a-z])([A-Z])')
_LETTER_THEN_DIGIT = re.compile(r'([a-zA-Z])(\d)')
_DIGIT_THEN_LETTER = re.compile(r'(\d)([a-zA-Z])')
_QUESTION_LABEL = re.compile(r'\bQ ?([1-9])\b\.?')
_WHITESPACE_RUN = re.compile(r'\s+')


def _replace_interior_digits(match):
    """Map a run of confusable digits to letters matching the word's case."""
    previous_char = match.string[match.start() - 1]
    table = _DIGITS_TO_LOWER if previous_char.islower() else _DIGITS_TO_UPPER
    return match.group(0).translate(table)


def _correct_token(match):
    """Fix digit/letter confusions inside one alphanumeric token.

    Tokens with fewer than three letters (e.g. ``Q1a``, ``5x5``) are left
    alone because they are more likely labels or numbers than words.
    """
    token = match.group(0)
    if sum(ch.isalpha() for ch in token) < 3:
        return token
    return _INTERIOR_CONFUSABLE_DIGITS.sub(_replace_interior_digits, token)


def _split_camel_case(match):
    """Insert a space at a lower→upper boundary, except in two-letter tokens.

    Stand-alone pairs such as ``kW`` or ``dB`` are kept intact.
    """
    source = match.string
    before = source[match.start() - 1] if match.start() > 0 else ''
    after = source[match.end()] if match.end() < len(source) else ''
    if not before.isalpha() and not after.isalpha():
        return match.group(0)
    return f'{match.group(1)} {match.group(2)}'


def post_process_text(text):
    """Clean up raw OCR text.

    Steps, in order:

    1. Replace known run-together phrases (``_PHRASE_FIXES``).  This happens
       first so that later steps cannot alter the keys before they match.
    2. Correct likely character confusions, only where the context suggests
       a misread: ``0/1/5/8`` inside a word, ``rn`` at the start of a word,
       ``vv``, ``|`` next to a letter, and ``!`` directly before a letter.
       Pure numbers, labels such as ``Q1`` and sentence-final ``!`` are
       left untouched.
    3. Insert spaces at lower→upper case boundaries and between letters and
       digits.
    4. Collapse all whitespace runs (including newlines) to single spaces.
    5. Normalise single-digit question labels (``Q1``, ``Q 1``) to ``Q1.``.

    Args:
        text: Raw text from the OCR engine.

    Returns:
        The cleaned text.  Empty, ``None`` or whitespace-only input is
        returned unchanged.
    """
    if not text or not text.strip():
        return text

    cleaned = text
    for broken, fixed in _PHRASE_FIXES.items():
        cleaned = cleaned.replace(broken, fixed)

    # Character-level corrections, restricted to word contexts.
    cleaned = _ALNUM_TOKEN.sub(_correct_token, cleaned)
    cleaned = _WORD_INITIAL_RN.sub('m', cleaned)
    cleaned = cleaned.replace('vv', 'w')
    cleaned = _PIPE_NEXT_TO_LETTER.sub('I', cleaned)
    cleaned = _BANG_BEFORE_LETTER.sub('I', cleaned)

    # Separate words/numbers that were glued together.
    cleaned = _CAMEL_BOUNDARY.sub(_split_camel_case, cleaned)
    cleaned = _LETTER_THEN_DIGIT.sub(r'\1 \2', cleaned)
    cleaned = _DIGIT_THEN_LETTER.sub(r'\1 \2', cleaned)

    cleaned = _WHITESPACE_RUN.sub(' ', cleaned).strip()

    # Runs last because the letter/digit spacing above turns "Q1" into "Q 1".
    cleaned = _QUESTION_LABEL.sub(r'Q\1.', cleaned)
    return cleaned


# ---------------------------------------------------------------------------
# OCR
# ---------------------------------------------------------------------------

# Deliberately contains no whitespace: pytesseract splits the config string
# on whitespace (shlex), so a space inside the value would cut the whitelist
# short and silently drop every character after it.
_OCR_WHITELIST = (
    '0123456789'
    'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
    'abcdefghijklmnopqrstuvwxyz'
    '.,!?-:;()[]{}=+×÷%/'
)


def _empty_ocr_result():
    """Return a fresh "nothing recognised" result dict."""
    return {'text': '', 'raw_text': '', 'confidence': 0.0, 'word_count': 0}


def _ocr_word_data(image, lang, config):
    """Run Tesseract's word-level output and return it as a dict of lists."""
    return pytesseract.image_to_data(
        image, lang=lang, config=config,
        output_type=pytesseract.Output.DICT,
    )


def extract_text_tesseract_adaptive(image, lang='eng', psm=6):
    """Run Tesseract with up to three configurations and return the first hit.

    The configurations are tried in order: character whitelist, no whitelist
    with inversion disabled, then plain.  The first two are accepted only if
    they produce non-blank text; the last one is returned regardless.

    Args:
        image: Image accepted by pytesseract (array or PIL image).
        lang: Tesseract language code(s).
        psm: Tesseract page segmentation mode.

    Returns:
        The dict built by ``process_ocr_result``; or a dict with empty text,
        zero confidence and zero word count (and no ``'strategy'`` key) when
        every configuration raised.
    """
    try:
        base_config = f'--oem 3 --psm {psm}'
        guarded_attempts = (
            ("Strategy 1 (whitelist)", "whitelist",
             f'{base_config} -c tessedit_char_whitelist={_OCR_WHITELIST}'),
            ("Strategy 2 (no whitelist)", "no_whitelist",
             f'{base_config} -c tessedit_do_invert=0'),
        )

        for label, strategy, config in guarded_attempts:
            try:
                text = pytesseract.image_to_string(image, lang=lang, config=config)
                if not text.strip():
                    # Nothing recognised: skip the second Tesseract run.
                    continue
                data = _ocr_word_data(image, lang, config)
                logger.info(f"{label} successful")
                return process_ocr_result(text, data, strategy)
            except Exception as e:
                logger.warning(f"{label} failed: {e}")

        # Strategy 3: basic configuration as fallback
        try:
            text = pytesseract.image_to_string(image, lang=lang, config=base_config)
            data = _ocr_word_data(image, lang, base_config)
            logger.info("Strategy 3 (basic) used as fallback")
            return process_ocr_result(text, data, "basic")
        except Exception as e:
            logger.error(f"All OCR strategies failed: {e}")
            return _empty_ocr_result()

    except Exception as e:
        logger.error(f"Adaptive OCR error: {e}")
        return _empty_ocr_result()


def _positive_confidences(raw_values):
    """Return the confidences in *raw_values* that parse to an int above 0.

    Values may be ints, floats or numeric strings (including ``'95.5'``);
    anything unparsable is skipped instead of raising.
    """
    scores = []
    for raw in raw_values:
        try:
            score = int(float(raw))
        except (TypeError, ValueError, OverflowError):
            continue
        if score > 0:
            scores.append(score)
    return scores


def process_ocr_result(text, data, strategy):
    """Build the uniform OCR result dict.

    Args:
        text: Raw text from ``image_to_string``.
        data: Dict from ``image_to_data`` with ``'conf'`` and ``'text'`` lists.
        strategy: Name of the configuration that produced the result.

    Returns:
        Dict with keys ``text`` (post-processed), ``raw_text``,
        ``confidence`` (mean of positive word confidences, scaled to 0..1),
        ``word_count`` (non-blank entries of ``data['text']``) and
        ``strategy``.
    """
    confidences = _positive_confidences(data['conf'])
    avg_confidence = sum(confidences) / len(confidences) if confidences else 0

    return {
        'text': post_process_text(text),
        'raw_text': text,
        'confidence': avg_confidence / 100.0,
        'word_count': sum(1 for word in data['text'] if str(word).strip()),
        'strategy': strategy,
    }


def process_image_smart_improved(image, enhance_type="default", lang='eng'):
    """Preprocess *image*, try several OCR passes and return the best one.

    PSM 6 is tried first.  If it yields nothing or confidence below 0.6 and
    *enhance_type* is not "document", a pass with "document" preprocessing is
    tried and replaces the first result when more confident.  If the leading
    result is still missing or below 0.5, PSM 4 and PSM 13 are tried too.

    Args:
        image: PIL image or array.
        enhance_type: Preprocessing type for ``preprocess_image_advanced``.
        lang: Tesseract language code(s) forwarded to every OCR pass.

    Returns:
        The most confident result dict, extended with ``method`` and
        ``preprocessing``.  ``method`` is ``'none'`` when no pass produced
        text and ``'error'`` when processing raised.
    """
    try:
        processed_img = preprocess_image_advanced(image, enhance_type)
        candidates = []

        # Mode 6: Block of text (best for documents)
        first_pass = extract_text_tesseract_adaptive(processed_img, lang=lang, psm=6)
        if first_pass['text']:
            candidates.append(('psm_6', first_pass))

        needs_document_pass = not candidates or candidates[0][1]['confidence'] < 0.6
        if needs_document_pass and enhance_type != "document":
            doc_processed = preprocess_image_advanced(image, "document")
            doc_pass = extract_text_tesseract_adaptive(doc_processed, lang=lang, psm=6)
            baseline = candidates[0][1]['confidence'] if candidates else 0
            if doc_pass['text'] and doc_pass['confidence'] > baseline:
                candidates = [('psm_6_document', doc_pass)]

        if not candidates or candidates[0][1]['confidence'] < 0.5:
            # Mode 4: single column of text; mode 13: single text line.
            for mode in (4, 13):
                extra_pass = extract_text_tesseract_adaptive(processed_img, lang=lang, psm=mode)
                if extra_pass['text']:
                    candidates.append((f'psm_{mode}', extra_pass))

        if not candidates:
            outcome = _empty_ocr_result()
            outcome['method'] = 'none'
            outcome['preprocessing'] = enhance_type
            return outcome

        best_method, best_result = max(candidates, key=lambda item: item[1]['confidence'])
        best_result['method'] = best_method
        best_result['preprocessing'] = enhance_type
        return best_result

    except Exception as e:
        logger.error(f"Smart processing error: {e}")
        outcome = _empty_ocr_result()
        outcome['method'] = 'error'
        outcome['preprocessing'] = enhance_type
        return outcome


# Alternative: Image-specific preprocessing detector
def detect_image_type(image):
    """Pick a preprocessing type from simple image statistics.

    Returns:
        ``"enhance"`` when the longer side is under 600 px or the grayscale
        standard deviation is under 50; ``"document"`` when more than 10% of
        pixels are Canny edges; otherwise ``"default"``.  Also ``"default"``
        when the analysis raises.
    """
    try:
        if isinstance(image, Image.Image):
            pixels = np.array(image.convert('RGB'))
        else:
            pixels = image

        if len(pixels.shape) == 3:
            gray = cv2.cvtColor(pixels, cv2.COLOR_RGB2GRAY)
        else:
            gray = pixels
        height, width = gray.shape

        is_small = max(height, width) < 600
        is_low_contrast = gray.std() < 50

        edges = cv2.Canny(gray, 50, 150)
        edge_density = np.sum(edges > 0) / (height * width)
        is_text_heavy = edge_density > 0.1

        if is_small or is_low_contrast:
            return "enhance"
        if is_text_heavy:
            return "document"
        return "default"

    except Exception as e:
        logger.warning(f"Image type detection failed: {e}")
        return "default"


# ---------------------------------------------------------------------------
# Request handling helpers
# ---------------------------------------------------------------------------

_VALID_ENHANCEMENTS = ('default', 'document', 'enhance', 'binary')
_NO_IMAGE_MESSAGE = (
    "No image provided. Use 'image' field for file upload or JSON with 'image_base64'"
)
# Tesseract language spec: codes joined by '+', e.g. "eng" or "eng+fra".
# Restricting the alphabet keeps request input from being read as an option.
_LANGUAGE_SPEC = re.compile(r'[A-Za-z0-9_]+(?:\+[A-Za-z0-9_]+)*')


class _BadRequest(Exception):
    """Problem with the client's request; answered with HTTP 400."""


def _check_enhancement(enhancement, allowed):
    """Raise ``_BadRequest`` unless *enhancement* is one of *allowed*."""
    if enhancement not in allowed:
        raise _BadRequest(f"Invalid enhancement type. Use: {', '.join(allowed)}")


def _check_language(language):
    """Raise ``_BadRequest`` unless *language* looks like a Tesseract code."""
    if not isinstance(language, str) or not _LANGUAGE_SPEC.fullmatch(language):
        raise _BadRequest(
            "Invalid language code. Use Tesseract codes such as 'eng' or 'eng+fra'"
        )


def _read_single_ocr_options():
    """Return ``(enhancement, language, include_raw)`` for a single-image request.

    Options come from the JSON body for JSON requests and from form fields
    otherwise.
    """
    if request.is_json:
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict):
            raise _BadRequest("Request body must be a JSON object")
        return (
            payload.get('enhancement', 'default'),
            payload.get('language', 'eng'),
            payload.get('include_raw', False),
        )
    return (
        request.form.get('enhancement', 'default'),
        request.form.get('language', 'eng'),
        request.form.get('include_raw', 'false').lower() == 'true',
    )


def _load_request_image():
    """Open the image from the ``image`` upload or the ``image_base64`` field.

    Raises:
        _BadRequest: when no usable image is present or it cannot be decoded.
    """
    try:
        if 'image' in request.files:
            upload = request.files['image']
            if upload.filename == '':
                raise _BadRequest("No file selected")
            raw_bytes = upload.read()
        else:
            payload = request.get_json(silent=True)
            if not isinstance(payload, dict) or 'image_base64' not in payload:
                raise _BadRequest(_NO_IMAGE_MESSAGE)
            encoded = payload['image_base64']
            # Strip a data-URL prefix such as "data:image/png;base64,".
            if encoded.startswith('data:image'):
                encoded = encoded.split(',')[1]
            raw_bytes = base64.b64decode(encoded)

        image = Image.open(io.BytesIO(raw_bytes))
        if image.size[0] == 0 or image.size[1] == 0:
            raise _BadRequest("Invalid image dimensions")
        return image
    except _BadRequest:
        raise
    except Exception as e:
        raise _BadRequest(f"Invalid image: {str(e)}") from e


def _handle_single_ocr(*, allow_auto, engine, include_strategy, start_message):
    """Shared implementation of the single-image OCR views.

    Args:
        allow_auto: Accept ``enhancement='auto'`` and resolve it with
            ``detect_image_type``.
        engine: Value reported in the response's ``engine`` field.
        include_strategy: Add ``ocr_strategy`` to the response.
        start_message: Log line emitted right before OCR starts.

    Returns:
        A Flask JSON response; 400 for request problems, 500 for unexpected
        errors.
    """
    try:
        logger.info("OCR request received")

        if 'image' not in request.files and not request.is_json:
            raise _BadRequest(_NO_IMAGE_MESSAGE)

        enhancement, language, include_raw = _read_single_ocr_options()
        allowed = _VALID_ENHANCEMENTS + (('auto',) if allow_auto else ())
        _check_enhancement(enhancement, allowed)
        _check_language(language)

        image = _load_request_image()

        if enhancement == 'auto':
            enhancement = detect_image_type(image)
            logger.info(f"Auto-detected enhancement type: {enhancement}")

        logger.info(start_message)
        result = process_image_smart_improved(image, enhancement, lang=language)

        # Clean up
        del image
        gc.collect()

        logger.info(
            f"OCR completed. Text length: {len(result['text'])}, "
            f"Confidence: {result['confidence']:.2f}"
        )

        response = {
            "success": True,
            "text": result['text'],
            "confidence": round(result['confidence'], 3),
            "character_count": len(result['text']),
            "word_count": result.get('word_count', 0),
            "method_used": result.get('method', 'unknown'),
            "preprocessing_used": result.get('preprocessing', 'unknown'),
            "language": language,
            "engine": engine,
        }
        if include_strategy:
            response["ocr_strategy"] = result.get('strategy', 'unknown')
        if include_raw and 'raw_text' in result:
            response["raw_text"] = result['raw_text']

        return jsonify(response)

    except _BadRequest as e:
        return jsonify({"error": str(e)}), 400
    except Exception as e:
        logger.error(f"OCR processing error: {str(e)}")
        gc.collect()
        return jsonify({"error": str(e), "success": False}), 500


# Enhanced OCR endpoint with auto-detection
def ocr_endpoint_enhanced():
    """Single-image OCR view that also accepts ``enhancement='auto'``.

    Behaves like ``ocr_endpoint`` but resolves ``'auto'`` via
    ``detect_image_type`` and reports the winning Tesseract configuration as
    ``ocr_strategy``.  This function is not registered on a route in this
    file.
    """
    return _handle_single_ocr(
        allow_auto=True,
        engine="PyTesseract Adaptive",
        include_strategy=True,
        start_message="Starting adaptive OCR processing",
    )


# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------

@app.route('/')
def home():
    """Root endpoint: describe the service."""
    return jsonify({
        "service": "Enhanced PyTesseract OCR",
        "status": "running",
        "version": "2.0.0",
        "engine": "PyTesseract",
        "description": "Advanced OCR service with improved text processing for documents and exam papers",
        "endpoints": {
            "health": "/health",
            "ocr": "/ocr (POST)",
            "batch_ocr": "/ocr/batch (POST)"
        },
        "supported_formats": ["PNG", "JPEG", "JPG", "BMP", "TIFF", "GIF"],
        "preprocessing_types": ["default", "document", "enhance", "binary"],
        "languages": ["eng", "fra", "deu", "spa", "ita", "por"],
        "features": [
            "Advanced text post-processing",
            "Document-optimized preprocessing",
            "Smart character correction",
            "Word separation for concatenated text",
            "Exam paper and form optimization",
            "Multiple OCR modes with fallback",
            "Improved spacing and formatting"
        ]
    })


@app.route('/health', methods=['GET'])
def health_check():
    """Health check endpoint: report the Tesseract version, or 500 on failure."""
    try:
        version = pytesseract.get_tesseract_version()
        # Older pytesseract releases return a version object without
        # ``.public``; fall back to its string form instead of failing.
        version_text = getattr(version, 'public', None) or str(version)
        return jsonify({
            "status": "healthy",
            "tesseract_version": version_text,
            "service": "Enhanced PyTesseract OCR"
        })
    except Exception as e:
        return jsonify({
            "status": "error",
            "error": str(e),
            "service": "Enhanced PyTesseract OCR"
        }), 500


@app.route('/ocr', methods=['POST'])
def ocr_endpoint():
    """Single-image OCR endpoint.

    Accepts a multipart upload in the ``image`` field, or a JSON body with
    ``image_base64``.  Optional ``enhancement``, ``language`` and
    ``include_raw`` are read from the form or the JSON body.
    """
    return _handle_single_ocr(
        allow_auto=False,
        engine="PyTesseract Enhanced",
        include_strategy=False,
        start_message="Starting enhanced OCR processing",
    )


@app.route('/ocr/batch', methods=['POST'])
def batch_ocr_endpoint():
    """Batch OCR endpoint for up to five files uploaded in the ``images`` field.

    Each file is processed independently; a failure of one file is recorded
    in its result entry and does not abort the batch.
    """
    try:
        logger.info("Batch OCR request received")

        if 'images' not in request.files:
            return jsonify({"error": "No images provided. Use 'images' field for multiple file upload"}), 400

        images = request.files.getlist('images')
        if not images:
            return jsonify({"error": "No images found"}), 400

        # Limit batch size
        max_batch_size = 5
        if len(images) > max_batch_size:
            return jsonify({"error": f"Maximum {max_batch_size} images allowed per batch"}), 400

        enhancement = request.form.get('enhancement', 'default')
        language = request.form.get('language', 'eng')
        include_raw = request.form.get('include_raw', 'false').lower() == 'true'

        try:
            _check_enhancement(enhancement, _VALID_ENHANCEMENTS)
            _check_language(language)
        except _BadRequest as e:
            return jsonify({"error": str(e)}), 400

        results = []
        for i, image_file in enumerate(images):
            try:
                logger.info(f"Processing image {i+1}/{len(images)}")

                if image_file.filename == '':
                    results.append({
                        "index": i,
                        "filename": "empty_file",
                        "error": "Empty filename",
                        "success": False
                    })
                    continue

                image = Image.open(io.BytesIO(image_file.read()))
                result = process_image_smart_improved(image, enhancement, lang=language)

                batch_result = {
                    "index": i,
                    "filename": image_file.filename,
                    "text": result['text'],
                    "confidence": round(result['confidence'], 3),
                    "character_count": len(result['text']),
                    "word_count": result.get('word_count', 0),
                    "method_used": result.get('method', 'unknown'),
                    "success": True
                }
                if include_raw and 'raw_text' in result:
                    batch_result["raw_text"] = result['raw_text']
                results.append(batch_result)

                # Clean up
                del image
                gc.collect()

            except Exception as e:
                logger.error(f"Error processing image {i}: {str(e)}")
                results.append({
                    "index": i,
                    "filename": image_file.filename if hasattr(image_file, 'filename') else f"image_{i}",
                    "error": str(e),
                    "success": False
                })
                gc.collect()

        successful_count = sum(1 for entry in results if entry["success"])

        return jsonify({
            "success": True,
            "results": results,
            "total_processed": len(results),
            "successful": successful_count,
            "failed": len(results) - successful_count,
            "enhancement_used": enhancement,
            "language": language,
            "engine": "PyTesseract Enhanced"
        })

    except Exception as e:
        logger.error(f"Batch OCR error: {str(e)}")
        gc.collect()
        return jsonify({"error": str(e), "success": False}), 500


_LANGUAGE_NAMES = {
    'eng': 'English',
    'fra': 'French',
    'deu': 'German',
    'spa': 'Spanish',
    'ita': 'Italian',
    'por': 'Portuguese',
    'rus': 'Russian',
    'chi_sim': 'Chinese Simplified',
    'chi_tra': 'Chinese Traditional',
    'jpn': 'Japanese',
    'kor': 'Korean',
    'ara': 'Arabic',
    'hin': 'Hindi',
}


@app.route('/languages', methods=['GET'])
def get_languages():
    """Return the static table of language codes this service advertises."""
    return jsonify({
        "available_languages": _LANGUAGE_NAMES,
        "default": "eng",
        "note": "Language support depends on your Tesseract installation"
    })


@app.errorhandler(404)
def not_found(error):
    """Answer unknown URLs with the list of available endpoints."""
    return jsonify({
        "error": "Endpoint not found",
        "available_endpoints": {
            "GET /": "Service information",
            "GET /health": "Health check",
            "POST /ocr": "Single image OCR",
            "POST /ocr/batch": "Batch image OCR",
            "GET /languages": "Available languages"
        }
    }), 404


@app.errorhandler(500)
def internal_error(error):
    """Answer unhandled server errors with a generic JSON body."""
    gc.collect()
    return jsonify({
        "error": "Internal server error",
        "message": "Please check server logs"
    }), 500


if __name__ == '__main__':
    logger.info("Starting Enhanced PyTesseract OCR service...")
    port = int(os.environ.get('PORT', 7860))
    app.run(host='0.0.0.0', port=port, debug=False, threaded=True)