"""Extract text from PowerPoint decks: shape text plus OCR on embedded images.

Processes every .pptx/.ppt file in a folder in parallel (one process per
file), writing a per-file JSON and TXT output, then combined master files.
NOTE(review): python-pptx cannot open legacy binary .ppt files — those will
surface as per-file failures in the summary; confirm whether .ppt inputs
are expected.
"""

import io
import json
import os
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
from functools import partial
from multiprocessing import cpu_count

import easyocr
import numpy as np
from PIL import Image
from pptx import Presentation
from pptx.enum.shapes import MSO_SHAPE_TYPE


class PowerPointExtractor:
    """Extracts slide text and OCRs pictures from a single presentation."""

    def __init__(self, languages=None):
        """Initialize the EasyOCR reader.

        Args:
            languages: list of EasyOCR language codes. Defaults to
                ["en", "bg"]. (``None`` sentinel avoids the shared
                mutable-default-argument pitfall of the original.)
        """
        if languages is None:
            languages = ["en", "bg"]
        print("Initializing EasyOCR reader...")
        self.reader = easyocr.Reader(languages)
        print("EasyOCR reader initialized successfully!")

    def extract_text_from_slide(self, slide):
        """Return the stripped, non-empty text of every shape on *slide*."""
        return [
            shape.text.strip()
            for shape in slide.shapes
            if hasattr(shape, "text") and shape.text.strip()
        ]

    def extract_images_from_slide(self, slide, min_confidence=0.5):
        """OCR every picture shape on *slide*.

        Args:
            slide: a python-pptx slide object.
            min_confidence: OCR results with confidence at or below this
                threshold are discarded (default 0.5, as before).

        Returns:
            A list with one dict per image that yielded confident text:
            ``{"text": [str, ...], "confidence_scores": [float, ...]}``
            (the two lists are parallel).
        """
        image_texts = []
        for shape in slide.shapes:
            if shape.shape_type != MSO_SHAPE_TYPE.PICTURE:
                continue
            try:
                # shape.image.blob is the raw embedded image bytes.
                pil_image = Image.open(io.BytesIO(shape.image.blob))
                # EasyOCR expects an RGB array; normalize palettes/CMYK/etc.
                if pil_image.mode != "RGB":
                    pil_image = pil_image.convert("RGB")
                results = self.reader.readtext(np.array(pil_image))

                extracted_text = []
                confidence_scores = []
                for _bbox, text, confidence in results:
                    if confidence > min_confidence:
                        extracted_text.append(text.strip())
                        confidence_scores.append(confidence)

                if extracted_text:
                    image_texts.append(
                        {
                            "text": extracted_text,
                            "confidence_scores": confidence_scores,
                        }
                    )
            except Exception as e:
                # Best-effort by design: one corrupt/unsupported image must
                # not sink the whole slide.
                print(f"Error processing image: {e}")
                continue
        return image_texts

    def process_powerpoint(self, file_path, output_dir="./output"):
        """Process one PowerPoint file; write ``<stem>.json`` and ``<stem>.txt``.

        Returns:
            On success: ``{"success": True, "filename", "data",
            "json_file", "txt_file"}``; on failure: ``{"success": False,
            "filename", "error"}``. Never raises — failures are reported
            in the result dict so the parallel batch keeps running.
        """
        try:
            filename = os.path.basename(file_path)
            file_stem = os.path.splitext(filename)[0]
            # BUG FIX: these two messages previously printed the literal
            # text "(unknown)" — the {filename} placeholder had been lost.
            print(f"Processing: {filename}")

            prs = Presentation(file_path)
            presentation_data = {
                "filename": filename,
                "total_slides": len(prs.slides),
                "slides": [],
            }

            for slide_num, slide in enumerate(prs.slides, 1):
                regular_text = self.extract_text_from_slide(slide)
                image_content = self.extract_images_from_slide(slide)
                presentation_data["slides"].append(
                    {
                        "slide_number": slide_num,
                        "text_content": regular_text,
                        "image_content": image_content,
                        "has_text": len(regular_text) > 0,
                        "has_images_with_text": len(image_content) > 0,
                    }
                )

            # Per-file machine-readable output.
            json_output = os.path.join(output_dir, f"{file_stem}.json")
            with open(json_output, "w", encoding="utf-8") as f:
                json.dump(presentation_data, f, indent=2, ensure_ascii=False)

            # Per-file human-readable output.
            txt_output = os.path.join(output_dir, f"{file_stem}.txt")
            self.save_readable_text([presentation_data], txt_output)

            print(f"✓ Completed: {filename} -> {file_stem}.json + {file_stem}.txt")
            return {
                "success": True,
                "filename": filename,
                "data": presentation_data,
                "json_file": json_output,
                "txt_file": txt_output,
            }
        except Exception as e:
            # Broad catch is deliberate: one bad deck must not abort the batch.
            print(f"✗ Error processing {file_path}: {e}")
            return {
                "success": False,
                "filename": os.path.basename(file_path),
                "error": str(e),
            }

    def save_readable_text(self, data, output_file):
        """Write a human-readable dump of extracted content.

        Args:
            data: list of presentation-data dicts (as produced by
                :meth:`process_powerpoint`).
            output_file: path of the UTF-8 text file to create.
        """
        with open(output_file, "w", encoding="utf-8") as f:
            for presentation in data:
                f.write(f"{'='*80}\n")
                f.write(f"FILE: {presentation['filename']}\n")
                f.write(f"TOTAL SLIDES: {presentation['total_slides']}\n")
                f.write(f"{'='*80}\n")

                for slide in presentation["slides"]:
                    f.write(f"\n--- SLIDE {slide['slide_number']} ---\n")

                    if slide["text_content"]:
                        f.write("TEXT CONTENT:\n")
                        for text in slide["text_content"]:
                            f.write(f"• {text}\n")

                    if slide["image_content"]:
                        f.write("\nTEXT FROM IMAGES:\n")
                        for img_idx, img_data in enumerate(
                            slide["image_content"], 1
                        ):
                            f.write(f"Image {img_idx}:\n")
                            for text in img_data["text"]:
                                f.write(f"  • {text}\n")
                f.write("\n")


def process_single_powerpoint(file_info):
    """Process one PowerPoint file — top-level so it is picklable for
    ``ProcessPoolExecutor``.

    Args:
        file_info: ``(file_path, output_dir, languages)`` tuple.

    Returns:
        The result dict from :meth:`PowerPointExtractor.process_powerpoint`.
    """
    file_path, output_dir, languages = file_info
    # Each worker process needs its own EasyOCR reader (not picklable).
    extractor = PowerPointExtractor(languages=languages)
    return extractor.process_powerpoint(file_path, output_dir)


class ParallelPowerPointProcessor:
    """Fans out PowerPoint extraction across a process pool."""

    def __init__(self, languages=None, max_workers=None):
        """Initialize the parallel processor.

        Args:
            languages: EasyOCR language codes; defaults to ["en", "bg"]
                (``None`` sentinel avoids a shared mutable default).
            max_workers: worker-process count; defaults to
                ``min(cpu_count(), 4)`` to avoid overwhelming the system
                (each worker loads its own OCR model).
        """
        self.languages = ["en", "bg"] if languages is None else languages
        self.max_workers = max_workers or min(cpu_count(), 4)
        print(f"Parallel processor initialized with {self.max_workers} workers")

    def process_folder(self, folder_path="./data_ppt", output_dir="./output"):
        """Process all PowerPoint files in *folder_path* in parallel.

        Returns:
            A list of per-file result dicts (``None`` on early exit when
            the folder is missing or contains no PowerPoint files, as
            before).
        """
        start_time = time.time()
        os.makedirs(output_dir, exist_ok=True)

        if not os.path.exists(folder_path):
            print(f"Error: Directory {folder_path} does not exist!")
            return

        ppt_files = [
            f for f in os.listdir(folder_path) if f.endswith((".pptx", ".ppt"))
        ]
        if not ppt_files:
            print(f"No PowerPoint files found in {folder_path}")
            return

        print(f"Found {len(ppt_files)} PowerPoint files to process...")
        print(f"Processing with {self.max_workers} parallel workers...")

        file_infos = [
            (os.path.join(folder_path, filename), output_dir, self.languages)
            for filename in ppt_files
        ]

        results = []
        successful_results = []
        failed_results = []

        with ProcessPoolExecutor(max_workers=self.max_workers) as executor:
            future_to_file = {
                executor.submit(process_single_powerpoint, file_info): file_info[0]
                for file_info in file_infos
            }
            # Collect results as they complete rather than submission order.
            for future in as_completed(future_to_file):
                file_path = future_to_file[future]
                try:
                    result = future.result()
                    results.append(result)
                    if result["success"]:
                        successful_results.append(result)
                    else:
                        failed_results.append(result)
                except Exception as e:
                    # Worker crashed outside its own try/except (e.g. the
                    # process was killed) — record it like any other failure.
                    print(
                        f"✗ Exception processing {os.path.basename(file_path)}: {e}"
                    )
                    failed_results.append(
                        {
                            "success": False,
                            "filename": os.path.basename(file_path),
                            "error": str(e),
                        }
                    )

        if successful_results:
            self.combine_results(successful_results, output_dir)

        end_time = time.time()
        self.print_summary(successful_results, failed_results, end_time - start_time)
        return results

    def combine_results(self, successful_results, output_dir):
        """Merge all per-file results into combined JSON and TXT master files."""
        print("\nCombining individual results into master files...")

        combined_data = [result["data"] for result in successful_results]

        combined_json_path = os.path.join(output_dir, "combined_all_presentations.json")
        with open(combined_json_path, "w", encoding="utf-8") as f:
            json.dump(combined_data, f, indent=2, ensure_ascii=False)

        combined_txt_path = os.path.join(output_dir, "combined_all_presentations.txt")
        # Instance created only for save_readable_text; note this loads an
        # OCR model it never uses — candidate for a module-level function.
        extractor = PowerPointExtractor(self.languages)
        extractor.save_readable_text(combined_data, combined_txt_path)

        print(f"✓ Combined results saved:")
        print(f"  - JSON: {combined_json_path}")
        print(f"  - TXT: {combined_txt_path}")

    def print_summary(self, successful_results, failed_results, total_time):
        """Print a processing summary (counts, timing, failures)."""
        total_files = len(successful_results) + len(failed_results)
        total_slides = sum(
            result["data"]["total_slides"] for result in successful_results
        )

        print("\n" + "=" * 60)
        print("PARALLEL PROCESSING SUMMARY")
        print("=" * 60)
        print(f"Total files processed: {total_files}")
        print(f"Successful: {len(successful_results)}")
        print(f"Failed: {len(failed_results)}")
        print(f"Total slides processed: {total_slides}")
        print(f"Total processing time: {total_time:.2f} seconds")
        # Guard against ZeroDivisionError if called with no results.
        if total_files:
            print(f"Average time per file: {total_time/total_files:.2f} seconds")

        if failed_results:
            print("\nFailed files:")
            for result in failed_results:
                print(f"  ✗ {result['filename']}: {result['error']}")
        print("=" * 60)


def main():
    """Run the parallel extraction over ./data_test and report results."""
    processor = ParallelPowerPointProcessor(
        languages=["en", "bg"],  # Bulgarian included for medical presentations
        max_workers=8,  # Adjust based on your system capabilities
    )

    processor.process_folder(folder_path="./data_test", output_dir="./output")

    print("\n🎉 All processing completed!")
    print("Check the './output' directory for:")
    print("  - Individual JSON files for each presentation")
    print("  - Individual TXT files for each presentation")
    print("  - Combined master files with all presentations")


if __name__ == "__main__":
    main()