update
This commit is contained in:
325
ppt_data/convert1.py
Normal file
325
ppt_data/convert1.py
Normal file
@@ -0,0 +1,325 @@
|
||||
from pptx import Presentation
|
||||
from pptx.enum.shapes import MSO_SHAPE_TYPE
|
||||
import easyocr
|
||||
from PIL import Image
|
||||
import io
|
||||
import os
|
||||
import json
|
||||
import numpy as np
|
||||
from concurrent.futures import ProcessPoolExecutor, as_completed
|
||||
from multiprocessing import cpu_count
|
||||
import time
|
||||
from functools import partial
|
||||
|
||||
|
||||
class PowerPointExtractor:
    """Extract text content from PowerPoint files, including OCR of embedded images.

    Native slide text is read via python-pptx; text inside picture shapes is
    recognized with EasyOCR.
    """

    # Minimum EasyOCR confidence for a recognized fragment to be kept.
    MIN_OCR_CONFIDENCE = 0.5

    def __init__(self, languages=("en", "bg")):
        """Initialize the EasyOCR reader.

        Args:
            languages: Iterable of EasyOCR language codes
                (default: English + Bulgarian). A tuple default avoids the
                shared-mutable-default-argument pitfall.
        """
        print("Initializing EasyOCR reader...")
        self.reader = easyocr.Reader(list(languages))
        print("EasyOCR reader initialized successfully!")

    def extract_text_from_slide(self, slide):
        """Return a list of non-empty, stripped text strings from the slide's shapes."""
        slide_text = []
        for shape in slide.shapes:
            # Not every shape carries text (e.g. pictures); guard with hasattr.
            if hasattr(shape, "text") and shape.text.strip():
                slide_text.append(shape.text.strip())
        return slide_text

    def extract_images_from_slide(self, slide):
        """OCR every picture shape on the slide.

        Returns:
            A list with one dict per image that produced at least one confident
            result: {"text": [str, ...], "confidence_scores": [float, ...]}
            (the two lists are parallel).
        """
        image_texts = []
        for shape in slide.shapes:
            if shape.shape_type != MSO_SHAPE_TYPE.PICTURE:
                continue
            try:
                # Raw image bytes as embedded in the presentation.
                image_bytes = shape.image.blob
                pil_image = Image.open(io.BytesIO(image_bytes))

                # EasyOCR expects a 3-channel RGB array.
                if pil_image.mode != "RGB":
                    pil_image = pil_image.convert("RGB")
                image_array = np.array(pil_image)

                results = self.reader.readtext(image_array)

                extracted_text = []
                confidence_scores = []
                for bbox, text, confidence in results:
                    # Drop low-confidence OCR fragments.
                    if confidence > self.MIN_OCR_CONFIDENCE:
                        extracted_text.append(text.strip())
                        confidence_scores.append(confidence)

                if extracted_text:
                    image_texts.append(
                        {
                            "text": extracted_text,
                            "confidence_scores": confidence_scores,
                        }
                    )
            except Exception as e:
                # Best-effort: one corrupt image must not abort the slide.
                print(f"Error processing image: {e}")
                continue
        return image_texts

    def process_powerpoint(self, file_path, output_dir="./output"):
        """Process one PowerPoint file, writing <stem>.json and <stem>.txt.

        Args:
            file_path: Path of the .pptx/.ppt file to process.
            output_dir: Directory for the per-file JSON/TXT outputs
                (created if missing).

        Returns:
            On success: {"success": True, "filename", "data", "json_file",
            "txt_file"}. On failure: {"success": False, "filename", "error"}.
        """
        try:
            filename = os.path.basename(file_path)
            file_stem = os.path.splitext(filename)[0]

            print(f"Processing: {filename}")

            prs = Presentation(file_path)
            presentation_data = {
                "filename": filename,
                "total_slides": len(prs.slides),
                "slides": [],
            }

            for slide_num, slide in enumerate(prs.slides, 1):
                # Native text plus OCR'd image text for each slide.
                regular_text = self.extract_text_from_slide(slide)
                image_content = self.extract_images_from_slide(slide)

                presentation_data["slides"].append(
                    {
                        "slide_number": slide_num,
                        "text_content": regular_text,
                        "image_content": image_content,
                        "has_text": len(regular_text) > 0,
                        "has_images_with_text": len(image_content) > 0,
                    }
                )

            # Ensure the output directory exists even when called directly
            # (not only via the folder-level processor).
            os.makedirs(output_dir, exist_ok=True)

            # Save individual JSON file.
            json_output = os.path.join(output_dir, f"{file_stem}.json")
            with open(json_output, "w", encoding="utf-8") as f:
                json.dump(presentation_data, f, indent=2, ensure_ascii=False)

            # Save individual readable text file.
            txt_output = os.path.join(output_dir, f"{file_stem}.txt")
            self.save_readable_text([presentation_data], txt_output)

            print(f"✓ Completed: {filename} -> {file_stem}.json + {file_stem}.txt")

            return {
                "success": True,
                "filename": filename,
                "data": presentation_data,
                "json_file": json_output,
                "txt_file": txt_output,
            }

        except Exception as e:
            print(f"✗ Error processing {file_path}: {e}")
            return {
                "success": False,
                "filename": os.path.basename(file_path),
                "error": str(e),
            }

    def save_readable_text(self, data, output_file):
        """Write a human-readable report for a list of presentation dicts.

        Args:
            data: List of presentation dicts as produced by process_powerpoint.
            output_file: Destination text file path (overwritten).
        """
        with open(output_file, "w", encoding="utf-8") as f:
            for presentation in data:
                f.write(f"{'='*80}\n")
                f.write(f"FILE: {presentation['filename']}\n")
                f.write(f"TOTAL SLIDES: {presentation['total_slides']}\n")
                f.write(f"{'='*80}\n")

                for slide in presentation["slides"]:
                    f.write(f"\n--- SLIDE {slide['slide_number']} ---\n")

                    # Regular text content.
                    if slide["text_content"]:
                        f.write("TEXT CONTENT:\n")
                        for text in slide["text_content"]:
                            f.write(f"• {text}\n")

                    # Image (OCR) text content.
                    if slide["image_content"]:
                        f.write("\nTEXT FROM IMAGES:\n")
                        for img_idx, img_data in enumerate(slide["image_content"], 1):
                            f.write(f"Image {img_idx}:\n")
                            for text in img_data["text"]:
                                f.write(f"  • {text}\n")

                    f.write("\n")
|
||||
|
||||
|
||||
# Module-level (not a method) so ProcessPoolExecutor can pickle it for workers.
def process_single_powerpoint(file_info):
    """Worker entry point: process one PowerPoint file in a child process.

    Args:
        file_info: Tuple of (file_path, output_dir, languages).

    Returns:
        The result dict from PowerPointExtractor.process_powerpoint.
    """
    path, out_dir, langs = file_info
    # Build a fresh extractor per worker process rather than sharing one.
    worker = PowerPointExtractor(languages=langs)
    return worker.process_powerpoint(path, out_dir)
|
||||
|
||||
|
||||
class ParallelPowerPointProcessor:
    """Fan PowerPoint extraction out across multiple worker processes."""

    def __init__(self, languages=("en", "bg"), max_workers=None):
        """Initialize the parallel processor.

        Args:
            languages: Iterable of EasyOCR language codes handed to each worker.
                A tuple default avoids the shared-mutable-default pitfall.
            max_workers: Number of worker processes; defaults to
                min(cpu_count(), 4) to avoid overwhelming the system
                (each worker initializes its own EasyOCR reader).
        """
        self.languages = list(languages)
        self.max_workers = max_workers or min(
            cpu_count(), 4
        )  # Limit to 4 to avoid overwhelming the system
        print(f"Parallel processor initialized with {self.max_workers} workers")

    def process_folder(self, folder_path="./data_ppt", output_dir="./output"):
        """Process all PowerPoint files in folder_path in parallel.

        Args:
            folder_path: Directory containing .pptx/.ppt files.
            output_dir: Directory for all outputs (created if missing).

        Returns:
            List of per-file result dicts, or None if the folder is missing
            or holds no PowerPoint files.
        """
        start_time = time.time()

        os.makedirs(output_dir, exist_ok=True)

        if not os.path.isdir(folder_path):
            print(f"Error: Directory {folder_path} does not exist!")
            return

        # Skip Office lock/temp files ("~$..."): they match the extension but
        # are not valid presentations and would fail in Presentation().
        ppt_files = [
            f
            for f in os.listdir(folder_path)
            if f.endswith((".pptx", ".ppt")) and not f.startswith("~$")
        ]

        if not ppt_files:
            print(f"No PowerPoint files found in {folder_path}")
            return

        print(f"Found {len(ppt_files)} PowerPoint files to process...")
        print(f"Processing with {self.max_workers} parallel workers...")

        # (path, output_dir, languages) tuples — the picklable worker payload.
        file_infos = [
            (os.path.join(folder_path, filename), output_dir, self.languages)
            for filename in ppt_files
        ]

        results = []
        successful_results = []
        failed_results = []

        with ProcessPoolExecutor(max_workers=self.max_workers) as executor:
            future_to_file = {
                executor.submit(process_single_powerpoint, file_info): file_info[0]
                for file_info in file_infos
            }

            # Collect results as they complete.
            for future in as_completed(future_to_file):
                file_path = future_to_file[future]
                try:
                    result = future.result()
                    results.append(result)

                    if result["success"]:
                        successful_results.append(result)
                    else:
                        failed_results.append(result)

                except Exception as e:
                    # A worker-level crash (e.g. pickling failure) surfaces here.
                    print(f"✗ Exception processing {os.path.basename(file_path)}: {e}")
                    failed_results.append(
                        {
                            "success": False,
                            "filename": os.path.basename(file_path),
                            "error": str(e),
                        }
                    )

        if successful_results:
            self.combine_results(successful_results, output_dir)

        end_time = time.time()
        self.print_summary(successful_results, failed_results, end_time - start_time)

        return results

    def combine_results(self, successful_results, output_dir):
        """Merge all per-file results into combined JSON and TXT master files."""
        print("\nCombining individual results into master files...")

        combined_data = [result["data"] for result in successful_results]

        # Save combined JSON.
        combined_json_path = os.path.join(output_dir, "combined_all_presentations.json")
        with open(combined_json_path, "w", encoding="utf-8") as f:
            json.dump(combined_data, f, indent=2, ensure_ascii=False)

        # Save combined readable text. Bypass __init__ so we can reuse
        # save_readable_text (which only formats dicts) without paying the
        # cost of loading EasyOCR models.
        combined_txt_path = os.path.join(output_dir, "combined_all_presentations.txt")
        writer = PowerPointExtractor.__new__(PowerPointExtractor)
        writer.save_readable_text(combined_data, combined_txt_path)

        print(f"✓ Combined results saved:")
        print(f"  - JSON: {combined_json_path}")
        print(f"  - TXT: {combined_txt_path}")

    def print_summary(self, successful_results, failed_results, total_time):
        """Print counts, slide totals, and timing for the finished run."""
        total_files = len(successful_results) + len(failed_results)
        total_slides = sum(
            result["data"]["total_slides"] for result in successful_results
        )

        print("\n" + "=" * 60)
        print("PARALLEL PROCESSING SUMMARY")
        print("=" * 60)
        print(f"Total files processed: {total_files}")
        print(f"Successful: {len(successful_results)}")
        print(f"Failed: {len(failed_results)}")
        print(f"Total slides processed: {total_slides}")
        print(f"Total processing time: {total_time:.2f} seconds")
        # Guard the average against a zero-file invocation.
        if total_files:
            print(f"Average time per file: {total_time/total_files:.2f} seconds")

        if failed_results:
            print("\nFailed files:")
            for result in failed_results:
                print(f"  ✗ {result['filename']}: {result['error']}")

        print("=" * 60)
|
||||
|
||||
|
||||
def main():
    """Entry point: extract every PowerPoint in ./data_test with parallel workers."""
    processor = ParallelPowerPointProcessor(
        languages=["en", "bg"],  # English plus Bulgarian for the medical decks
        max_workers=8,  # tune for the machine running the extraction
    )

    # Kick off the parallel extraction over the test data folder.
    processor.process_folder(folder_path="./data_test", output_dir="./output")

    print("\n🎉 All processing completed!")
    print("Check the './output' directory for:")
    print(" - Individual JSON files for each presentation")
    print(" - Individual TXT files for each presentation")
    print(" - Combined master files with all presentations")
|
||||
|
||||
|
||||
# Run the parallel extraction pipeline only when executed as a script.
if __name__ == "__main__":
    main()
|
||||
Reference in New Issue
Block a user