"""Extract slide text and OCR'd image text from PowerPoint decks in parallel."""
from pptx import Presentation
|
|
from pptx.enum.shapes import MSO_SHAPE_TYPE
|
|
import easyocr
|
|
from PIL import Image
|
|
import io
|
|
import os
|
|
import json
|
|
import numpy as np
|
|
from concurrent.futures import ProcessPoolExecutor, as_completed
|
|
from multiprocessing import cpu_count
|
|
import time
|
|
from functools import partial
|
|
|
|
|
|
class PowerPointExtractor:
    """Extract slide text and OCR'd image text from PowerPoint files.

    Regular shape text is read via python-pptx; embedded pictures are run
    through EasyOCR, keeping only detections above a confidence threshold.
    """

    # OCR detections at or below this EasyOCR confidence are discarded as noise.
    MIN_OCR_CONFIDENCE = 0.5

    def __init__(self, languages=("en", "bg")):
        """Initialize the EasyOCR reader.

        Args:
            languages: Iterable of EasyOCR language codes; defaults to
                English + Bulgarian. A tuple default avoids the shared
                mutable-default-argument pitfall.
        """
        print("Initializing EasyOCR reader...")
        self.reader = easyocr.Reader(list(languages))
        print("EasyOCR reader initialized successfully!")

    def extract_text_from_slide(self, slide):
        """Return the non-empty, stripped text of every text-bearing shape."""
        return [
            shape.text.strip()
            for shape in slide.shapes
            if hasattr(shape, "text") and shape.text.strip()
        ]

    def extract_images_from_slide(self, slide):
        """OCR every picture shape on the slide.

        Returns:
            List of dicts with keys "text" (recognized strings) and
            "confidence_scores" (parallel list of EasyOCR confidences).
            Images that fail to decode or yield no confident text are skipped.
        """
        image_texts = []

        for shape in slide.shapes:
            if shape.shape_type != MSO_SHAPE_TYPE.PICTURE:
                continue
            try:
                # Decode the embedded image blob into an RGB numpy array,
                # since EasyOCR expects a 3-channel array.
                pil_image = Image.open(io.BytesIO(shape.image.blob))
                if pil_image.mode != "RGB":
                    pil_image = pil_image.convert("RGB")
                image_array = np.array(pil_image)

                # Run OCR and keep only confident detections.
                results = self.reader.readtext(image_array)
                extracted_text = []
                confidence_scores = []
                for _bbox, text, confidence in results:
                    if confidence > self.MIN_OCR_CONFIDENCE:
                        extracted_text.append(text.strip())
                        confidence_scores.append(confidence)

                if extracted_text:
                    image_texts.append(
                        {
                            "text": extracted_text,
                            "confidence_scores": confidence_scores,
                        }
                    )

            except Exception as e:
                # Best-effort: a single undecodable image must not abort the slide.
                print(f"Error processing image: {e}")
                continue

        return image_texts

    def process_powerpoint(self, file_path, output_dir="./output"):
        """Process one PowerPoint file and write per-file JSON + TXT outputs.

        Args:
            file_path: Path to a .pptx file readable by python-pptx.
            output_dir: Directory for the <stem>.json and <stem>.txt outputs;
                created if missing.

        Returns:
            On success: {"success": True, "filename", "data", "json_file",
            "txt_file"}. On failure: {"success": False, "filename", "error"}.
        """
        try:
            filename = os.path.basename(file_path)
            file_stem = os.path.splitext(filename)[0]

            print(f"Processing: {filename}")

            # Ensure the output directory exists even when called directly
            # (not only via ParallelPowerPointProcessor.process_folder).
            os.makedirs(output_dir, exist_ok=True)

            prs = Presentation(file_path)
            presentation_data = {
                "filename": filename,
                "total_slides": len(prs.slides),
                "slides": [],
            }

            for slide_num, slide in enumerate(prs.slides, 1):
                regular_text = self.extract_text_from_slide(slide)
                image_content = self.extract_images_from_slide(slide)

                presentation_data["slides"].append(
                    {
                        "slide_number": slide_num,
                        "text_content": regular_text,
                        "image_content": image_content,
                        "has_text": len(regular_text) > 0,
                        "has_images_with_text": len(image_content) > 0,
                    }
                )

            # Save individual JSON file.
            json_output = os.path.join(output_dir, f"{file_stem}.json")
            with open(json_output, "w", encoding="utf-8") as f:
                json.dump(presentation_data, f, indent=2, ensure_ascii=False)

            # Save individual readable text file.
            txt_output = os.path.join(output_dir, f"{file_stem}.txt")
            self.save_readable_text([presentation_data], txt_output)

            print(f"✓ Completed: {filename} -> {file_stem}.json + {file_stem}.txt")

            return {
                "success": True,
                "filename": filename,
                "data": presentation_data,
                "json_file": json_output,
                "txt_file": txt_output,
            }

        except Exception as e:
            # Report the failure but keep batch processing alive.
            print(f"✗ Error processing {file_path}: {e}")
            return {
                "success": False,
                "filename": os.path.basename(file_path),
                "error": str(e),
            }

    def save_readable_text(self, data, output_file):
        """Write a human-readable TXT rendering of extracted presentations.

        Args:
            data: List of presentation dicts as produced by process_powerpoint.
            output_file: Destination path (UTF-8 text).
        """
        with open(output_file, "w", encoding="utf-8") as f:
            for presentation in data:
                f.write(f"{'='*80}\n")
                f.write(f"FILE: {presentation['filename']}\n")
                f.write(f"TOTAL SLIDES: {presentation['total_slides']}\n")
                f.write(f"{'='*80}\n")

                for slide in presentation["slides"]:
                    f.write(f"\n--- SLIDE {slide['slide_number']} ---\n")

                    # Regular text content.
                    if slide["text_content"]:
                        f.write("TEXT CONTENT:\n")
                        for text in slide["text_content"]:
                            f.write(f"• {text}\n")

                    # OCR'd image text content.
                    if slide["image_content"]:
                        f.write("\nTEXT FROM IMAGES:\n")
                        for img_idx, img_data in enumerate(slide["image_content"], 1):
                            f.write(f"Image {img_idx}:\n")
                            for text in img_data["text"]:
                                f.write(f"  • {text}\n")

                    f.write("\n")
# Must live at module level so ProcessPoolExecutor can pickle it for workers.
def process_single_powerpoint(file_info):
    """Worker entry point: extract one PowerPoint file in a child process.

    Args:
        file_info: A (file_path, output_dir, languages) tuple.

    Returns:
        The result dict from PowerPointExtractor.process_powerpoint.
    """
    path, out_dir, langs = file_info

    # Each worker process builds its own extractor (and EasyOCR reader),
    # since the reader cannot be shared across process boundaries.
    worker = PowerPointExtractor(languages=langs)
    return worker.process_powerpoint(path, out_dir)
|
class ParallelPowerPointProcessor:
    """Fans PowerPoint extraction out across a pool of worker processes."""

    def __init__(self, languages=("en", "bg"), max_workers=None):
        """Initialize the parallel processor.

        Args:
            languages: Iterable of EasyOCR language codes for the workers.
                A tuple default avoids the mutable-default-argument pitfall.
            max_workers: Worker process count; defaults to min(cpu_count(), 4)
                because each worker loads its own (heavy) OCR model.
        """
        self.languages = list(languages)
        self.max_workers = max_workers or min(cpu_count(), 4)
        print(f"Parallel processor initialized with {self.max_workers} workers")

    def process_folder(self, folder_path="./data_ppt", output_dir="./output"):
        """Process every PowerPoint file in folder_path in parallel.

        Returns:
            List of per-file result dicts, or None when the folder is missing
            or holds no PowerPoint files.
        """
        start_time = time.time()

        os.makedirs(output_dir, exist_ok=True)

        if not os.path.exists(folder_path):
            print(f"Error: Directory {folder_path} does not exist!")
            return

        # Case-insensitive extension match; skip Office "~$" lock files left
        # behind while a deck is open in PowerPoint.
        ppt_files = [
            f
            for f in os.listdir(folder_path)
            if f.lower().endswith((".pptx", ".ppt")) and not f.startswith("~$")
        ]

        if not ppt_files:
            print(f"No PowerPoint files found in {folder_path}")
            return

        print(f"Found {len(ppt_files)} PowerPoint files to process...")
        print(f"Processing with {self.max_workers} parallel workers...")

        # One (path, output_dir, languages) tuple per worker task.
        file_infos = [
            (os.path.join(folder_path, filename), output_dir, self.languages)
            for filename in ppt_files
        ]

        results = []
        successful_results = []
        failed_results = []

        with ProcessPoolExecutor(max_workers=self.max_workers) as executor:
            future_to_file = {
                executor.submit(process_single_powerpoint, file_info): file_info[0]
                for file_info in file_infos
            }

            # Collect results as they complete, keeping failures from
            # aborting the rest of the batch.
            for future in as_completed(future_to_file):
                file_path = future_to_file[future]
                try:
                    result = future.result()
                    results.append(result)

                    if result["success"]:
                        successful_results.append(result)
                    else:
                        failed_results.append(result)

                except Exception as e:
                    print(f"✗ Exception processing {os.path.basename(file_path)}: {e}")
                    failed_results.append(
                        {
                            "success": False,
                            "filename": os.path.basename(file_path),
                            "error": str(e),
                        }
                    )

        if successful_results:
            self.combine_results(successful_results, output_dir)

        end_time = time.time()
        self.print_summary(successful_results, failed_results, end_time - start_time)

        return results

    def combine_results(self, successful_results, output_dir):
        """Merge per-file results into combined master JSON and TXT files."""
        print("\nCombining individual results into master files...")

        combined_data = [result["data"] for result in successful_results]

        combined_json_path = os.path.join(output_dir, "combined_all_presentations.json")
        with open(combined_json_path, "w", encoding="utf-8") as f:
            json.dump(combined_data, f, indent=2, ensure_ascii=False)

        combined_txt_path = os.path.join(output_dir, "combined_all_presentations.txt")
        # save_readable_text never touches self, so call it unbound instead of
        # constructing a PowerPointExtractor (which would load EasyOCR models
        # just to write a text file).
        PowerPointExtractor.save_readable_text(None, combined_data, combined_txt_path)

        print(f"✓ Combined results saved:")
        print(f"  - JSON: {combined_json_path}")
        print(f"  - TXT: {combined_txt_path}")

    def print_summary(self, successful_results, failed_results, total_time):
        """Print counts, slide totals, timing, and any per-file failures."""
        total_files = len(successful_results) + len(failed_results)
        total_slides = sum(
            result["data"]["total_slides"] for result in successful_results
        )

        print("\n" + "=" * 60)
        print("PARALLEL PROCESSING SUMMARY")
        print("=" * 60)
        print(f"Total files processed: {total_files}")
        print(f"Successful: {len(successful_results)}")
        print(f"Failed: {len(failed_results)}")
        print(f"Total slides processed: {total_slides}")
        print(f"Total processing time: {total_time:.2f} seconds")
        # Guard against ZeroDivisionError when called with no files at all.
        if total_files:
            print(f"Average time per file: {total_time/total_files:.2f} seconds")

        if failed_results:
            print("\nFailed files:")
            for result in failed_results:
                print(f"  ✗ {result['filename']}: {result['error']}")

        print("=" * 60)
def main():
    """Run the parallel extraction pipeline over the input folder."""
    # Bulgarian is included alongside English for the medical presentations;
    # worker count can be tuned to the host's capabilities.
    processor = ParallelPowerPointProcessor(languages=["en", "bg"], max_workers=8)

    # Fan out over every PowerPoint file found in the input directory.
    processor.process_folder(folder_path="./data_test", output_dir="./output")

    print("\n🎉 All processing completed!")
    print("Check the './output' directory for:")
    print(" - Individual JSON files for each presentation")
    print(" - Individual TXT files for each presentation")
    print(" - Combined master files with all presentations")


if __name__ == "__main__":
    main()