med-notes/ppt_data/convert1.py
2025-06-25 11:30:35 +00:00

326 lines
12 KiB
Python

from pptx import Presentation
from pptx.enum.shapes import MSO_SHAPE_TYPE
import easyocr
from PIL import Image
import io
import os
import json
import numpy as np
from concurrent.futures import ProcessPoolExecutor, as_completed
from multiprocessing import cpu_count
import time
from functools import partial
class PowerPointExtractor:
    """Extract slide text and OCR'd picture text from PowerPoint files.

    For each presentation the extractor collects the text of every shape that
    exposes a ``text`` attribute, runs EasyOCR over every picture shape, and
    writes one JSON file and one readable TXT file per presentation.
    """

    # OCR detections at or below this confidence are discarded.
    MIN_CONFIDENCE = 0.5

    def __init__(self, languages=None):
        """Initialize the EasyOCR reader.

        Args:
            languages: Language codes handed to ``easyocr.Reader``. When
                omitted, ``["en", "bg"]`` is used.
        """
        # A None sentinel replaces the former list default so that no list
        # object is shared between calls.
        if languages is None:
            languages = ["en", "bg"]
        print("Initializing EasyOCR reader...")
        self.reader = easyocr.Reader(languages)
        print("EasyOCR reader initialized successfully!")

    def extract_text_from_slide(self, slide):
        """Return the stripped, non-empty text of every shape on ``slide``.

        Shapes without a ``text`` attribute are skipped.
        """
        slide_text = []
        for shape in slide.shapes:
            text = getattr(shape, "text", None)
            if not text:
                continue
            stripped = text.strip()
            if stripped:
                slide_text.append(stripped)
        return slide_text

    def extract_images_from_slide(self, slide):
        """OCR every picture shape on ``slide``.

        Returns:
            A list with one dict per picture that produced at least one
            detection above ``MIN_CONFIDENCE``. Each dict has the keys
            ``"text"`` (list of strings) and ``"confidence_scores"`` (list of
            floats, parallel to ``"text"``). Pictures that fail to decode or
            OCR are reported on stdout and skipped.
        """
        image_texts = []
        for shape in slide.shapes:
            if shape.shape_type != MSO_SHAPE_TYPE.PICTURE:
                continue
            try:
                # The context manager releases the decoder as soon as the
                # pixel data has been copied into the array.
                with Image.open(io.BytesIO(shape.image.blob)) as pil_image:
                    if pil_image.mode != "RGB":
                        image_array = np.array(pil_image.convert("RGB"))
                    else:
                        image_array = np.array(pil_image)

                results = self.reader.readtext(image_array)

                extracted_text = []
                confidence_scores = []
                for _bbox, text, confidence in results:
                    if confidence > self.MIN_CONFIDENCE:
                        extracted_text.append(text.strip())
                        # Coerce to a builtin float so the score is always
                        # JSON-serializable, whatever numeric type the OCR
                        # library hands back.
                        confidence_scores.append(float(confidence))

                if extracted_text:
                    image_texts.append(
                        {
                            "text": extracted_text,
                            "confidence_scores": confidence_scores,
                        }
                    )
            except Exception as e:
                # Best effort: one unreadable picture must not abort the slide.
                print(f"Error processing image: {e}")
                continue
        return image_texts

    def process_powerpoint(self, file_path, output_dir="./output"):
        """Process one PowerPoint file and write its JSON and TXT outputs.

        Args:
            file_path: Path of the presentation to read.
            output_dir: Directory receiving ``<stem>.json`` and ``<stem>.txt``.
                It is created when it does not exist yet.

        Returns:
            On success, a dict with ``"success": True``, ``"filename"``,
            ``"data"`` (the extracted presentation dict), ``"json_file"`` and
            ``"txt_file"``. On any error, a dict with ``"success": False``,
            ``"filename"`` and ``"error"`` (the exception message).
        """
        try:
            filename = os.path.basename(file_path)
            file_stem = os.path.splitext(filename)[0]
            print(f"Processing: {filename}")

            # Without this, calling the method directly with a directory that
            # does not exist yet fails when the output files are opened.
            os.makedirs(output_dir, exist_ok=True)

            prs = Presentation(file_path)
            presentation_data = {
                "filename": filename,
                "total_slides": len(prs.slides),
                "slides": [],
            }

            for slide_num, slide in enumerate(prs.slides, 1):
                regular_text = self.extract_text_from_slide(slide)
                image_content = self.extract_images_from_slide(slide)
                presentation_data["slides"].append(
                    {
                        "slide_number": slide_num,
                        "text_content": regular_text,
                        "image_content": image_content,
                        "has_text": len(regular_text) > 0,
                        "has_images_with_text": len(image_content) > 0,
                    }
                )

            # Individual JSON file
            json_output = os.path.join(output_dir, f"{file_stem}.json")
            with open(json_output, "w", encoding="utf-8") as f:
                json.dump(presentation_data, f, indent=2, ensure_ascii=False)

            # Individual readable text file
            txt_output = os.path.join(output_dir, f"{file_stem}.txt")
            self.save_readable_text([presentation_data], txt_output)

            print(f"✓ Completed: {filename} -> {file_stem}.json + {file_stem}.txt")
            return {
                "success": True,
                "filename": filename,
                "data": presentation_data,
                "json_file": json_output,
                "txt_file": txt_output,
            }
        except Exception as e:
            # Failures are reported through the result dict so a batch run can
            # continue with the remaining files.
            print(f"✗ Error processing {file_path}: {e}")
            return {
                "success": False,
                "filename": os.path.basename(file_path),
                "error": str(e),
            }

    def save_readable_text(self, data, output_file):
        """Write a human-readable dump of extracted presentations.

        Args:
            data: Iterable of presentation dicts shaped like the ``"data"``
                entry returned by ``process_powerpoint``.
            output_file: Path of the UTF-8 text file to (over)write.
        """
        with open(output_file, "w", encoding="utf-8") as f:
            for presentation in data:
                f.write(f"{'='*80}\n")
                f.write(f"FILE: {presentation['filename']}\n")
                f.write(f"TOTAL SLIDES: {presentation['total_slides']}\n")
                f.write(f"{'='*80}\n")
                for slide in presentation["slides"]:
                    f.write(f"\n--- SLIDE {slide['slide_number']} ---\n")
                    # Regular text content
                    if slide["text_content"]:
                        f.write("TEXT CONTENT:\n")
                        for text in slide["text_content"]:
                            f.write(f"{text}\n")
                    # Image text content
                    if slide["image_content"]:
                        f.write("\nTEXT FROM IMAGES:\n")
                        for img_idx, img_data in enumerate(slide["image_content"], 1):
                            f.write(f"Image {img_idx}:\n")
                            for text in img_data["text"]:
                                f.write(f"{text}\n")
                            # Blank line separating consecutive images.
                            f.write("\n")
# Module-level worker entry point: multiprocessing needs a function it can
# look up by name in the worker process.
#
# Extractors are cached per language set in a module-level dict. Module
# globals are private to each process, so a pool worker that receives several
# files builds its OCR reader once instead of once per file.
_EXTRACTOR_CACHE = {}


def process_single_powerpoint(file_info):
    """Process a single PowerPoint file - used for parallel processing.

    Args:
        file_info: ``(file_path, output_dir, languages)`` tuple.

    Returns:
        The result dict produced by ``PowerPointExtractor.process_powerpoint``.
    """
    file_path, output_dir, languages = file_info

    # Lists are unhashable, so the cache key is a tuple of the language codes.
    cache_key = tuple(languages) if languages is not None else None
    extractor = _EXTRACTOR_CACHE.get(cache_key)
    if extractor is None:
        extractor = PowerPointExtractor(languages=languages)
        _EXTRACTOR_CACHE[cache_key] = extractor

    return extractor.process_powerpoint(file_path, output_dir)
class ParallelPowerPointProcessor:
    """Run PowerPoint extraction over a folder using a process pool."""

    def __init__(self, languages=None, max_workers=None):
        """Initialize parallel processor.

        Args:
            languages: OCR language codes forwarded to each worker. Defaults
                to ``["en", "bg"]``.
            max_workers: Size of the process pool. When omitted (or 0), the
                CPU count capped at 4 is used.
        """
        # None sentinel plus a copy: no list is shared between instances or
        # with the caller.
        self.languages = ["en", "bg"] if languages is None else list(languages)
        self.max_workers = max_workers or min(
            cpu_count(), 4
        )  # Limit to 4 to avoid overwhelming the system
        print(f"Parallel processor initialized with {self.max_workers} workers")

    def process_folder(self, folder_path="./data_ppt", output_dir="./output"):
        """Process all PowerPoint files in ``folder_path`` in parallel.

        Args:
            folder_path: Directory scanned (non-recursively) for ``.pptx`` and
                ``.ppt`` files.
            output_dir: Directory receiving per-file and combined outputs.

        Returns:
            A list with one result dict per file (successes and failures).
            The list is empty when the folder is missing or contains no
            PowerPoint files.
        """
        start_time = time.time()

        # Validate the input before touching the filesystem, so a wrong path
        # does not leave an empty output directory behind.
        if not os.path.isdir(folder_path):
            print(f"Error: Directory {folder_path} does not exist!")
            return []

        os.makedirs(output_dir, exist_ok=True)

        # Match extensions case-insensitively and skip the "~$" owner files
        # Office creates next to a document that is currently open. Sorting
        # makes the submission order reproducible.
        # NOTE(review): legacy .ppt files are still submitted and are expected
        # to come back as failures if the reader only supports .pptx - confirm
        # whether they should be filtered out instead.
        ppt_files = sorted(
            name
            for name in os.listdir(folder_path)
            if name.lower().endswith((".pptx", ".ppt")) and not name.startswith("~$")
        )
        if not ppt_files:
            print(f"No PowerPoint files found in {folder_path}")
            return []

        print(f"Found {len(ppt_files)} PowerPoint files to process...")
        print(f"Processing with {self.max_workers} parallel workers...")

        file_infos = [
            (os.path.join(folder_path, filename), output_dir, self.languages)
            for filename in ppt_files
        ]

        results = []
        successful_results = []
        failed_results = []
        with ProcessPoolExecutor(max_workers=self.max_workers) as executor:
            future_to_file = {
                executor.submit(process_single_powerpoint, file_info): file_info[0]
                for file_info in file_infos
            }
            for future in as_completed(future_to_file):
                file_path = future_to_file[future]
                try:
                    result = future.result()
                except Exception as e:
                    print(f"✗ Exception processing {os.path.basename(file_path)}: {e}")
                    result = {
                        "success": False,
                        "filename": os.path.basename(file_path),
                        "error": str(e),
                    }
                # Every file gets an entry in the returned list, including
                # those whose worker raised.
                results.append(result)
                if result["success"]:
                    successful_results.append(result)
                else:
                    failed_results.append(result)

        if successful_results:
            # Completion order varies from run to run; sort so the combined
            # files are reproducible.
            successful_results.sort(key=lambda item: item["filename"])
            self.combine_results(successful_results, output_dir)

        end_time = time.time()
        self.print_summary(successful_results, failed_results, end_time - start_time)
        return results

    def combine_results(self, successful_results, output_dir):
        """Combine all individual results into master JSON and TXT files.

        Args:
            successful_results: Result dicts that carry a ``"data"`` entry.
            output_dir: Directory receiving ``combined_all_presentations.json``
                and ``combined_all_presentations.txt``.
        """
        print("\nCombining individual results into master files...")
        combined_data = [result["data"] for result in successful_results]

        combined_json_path = os.path.join(output_dir, "combined_all_presentations.json")
        with open(combined_json_path, "w", encoding="utf-8") as f:
            json.dump(combined_data, f, indent=2, ensure_ascii=False)

        combined_txt_path = os.path.join(output_dir, "combined_all_presentations.txt")
        # save_readable_text reads no instance state, so a bare instance is
        # enough; running __init__ would initialize an OCR reader that is
        # never used here.
        extractor = PowerPointExtractor.__new__(PowerPointExtractor)
        extractor.save_readable_text(combined_data, combined_txt_path)

        print("✓ Combined results saved:")
        print(f" - JSON: {combined_json_path}")
        print(f" - TXT: {combined_txt_path}")

    def print_summary(self, successful_results, failed_results, total_time):
        """Print processing summary.

        Args:
            successful_results: Result dicts of files that were processed.
            failed_results: Result dicts with ``"filename"`` and ``"error"``.
            total_time: Elapsed wall-clock time in seconds.
        """
        total_files = len(successful_results) + len(failed_results)
        total_slides = sum(
            result["data"]["total_slides"] for result in successful_results
        )
        # Guard the average against an empty run.
        average_time = total_time / total_files if total_files else 0.0

        print("\n" + "=" * 60)
        print("PARALLEL PROCESSING SUMMARY")
        print("=" * 60)
        print(f"Total files processed: {total_files}")
        print(f"Successful: {len(successful_results)}")
        print(f"Failed: {len(failed_results)}")
        print(f"Total slides processed: {total_slides}")
        print(f"Total processing time: {total_time:.2f} seconds")
        print(f"Average time per file: {average_time:.2f} seconds")
        if failed_results:
            print("\nFailed files:")
            for result in failed_results:
                print(f"{result['filename']}: {result['error']}")
        print("=" * 60)
def main():
    """Run parallel processing over the test folder and report the outcome."""
    folder_path = "./data_test"
    output_dir = "./output"

    # Initialize parallel processor
    processor = ParallelPowerPointProcessor(
        languages=["en", "bg"],  # Add Bulgarian for your medical presentations
        max_workers=8,  # Adjust based on your system capabilities
    )

    # Process all PowerPoints in parallel
    results = processor.process_folder(folder_path=folder_path, output_dir=output_dir)

    # Only announce success when at least one presentation was actually
    # converted; a missing folder, an empty folder, or a run where every file
    # failed produces no output files to point at.
    if not any(result.get("success") for result in results or []):
        print("\nNo presentations were processed successfully.")
        return

    print("\n🎉 All processing completed!")
    print(f"Check the '{output_dir}' directory for:")
    print(" - Individual JSON files for each presentation")
    print(" - Individual TXT files for each presentation")
    print(" - Combined master files with all presentations")
if __name__ == "__main__":
main()