#!/usr/bin/env python3
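"""Rename the .doc/.docx files in ./data based on each document's first line of text.

Exact duplicates are moved to a duplicates/ directory, the originals are backed
up, the remaining files are renamed to sequential T###.ext names, and the full
titles are recorded in document_titles.json.

.docx extraction uses only the standard library; .doc extraction shells out to
antiword or catdoc if either is installed.
"""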

import os
import re
import zipfile
import subprocess
import xml.etree.ElementTree as ET
import shutil
import json
from pathlib import Path
import difflib
import hashlib


def extract_first_line_docx(docx_file):
    """Extract the first line of text from a docx file using built-in libraries."""
    try:
        # Open the docx file as a zip archive
        with zipfile.ZipFile(docx_file) as zip_ref:
            # Extract document.xml which contains the main content
            xml_content = zip_ref.read("word/document.xml")

        # Parse the XML
        root = ET.fromstring(xml_content)

        # Define namespace
        namespaces = {
            "w": "http://schemas.openxmlformats.org/wordprocessingml/2006/main"
        }

        # Find all text elements
        text_elements = root.findall(".//w:t", namespaces)

        # Concatenate text elements until we have something resembling a first line
        paragraph_text = ""
        for element in text_elements:
            text = element.text
            if text and text.strip():
                paragraph_text += text
                # A line break or period is a good enough end for a first line
                if "\n" in paragraph_text or "." in paragraph_text:
                    break

        # Clean and return the first line if found
        if paragraph_text.strip():
            return paragraph_text.strip()

        return "NoTextFound"

    except Exception as e:
        print(f"Error reading {docx_file}: {e}")
        return "ErrorReadingFile"


def extract_first_line_doc(doc_file):
    """Extract the first line of text from a doc file using antiword if available, otherwise return a placeholder."""
    try:
        # Check if antiword is installed
        result = subprocess.run(
            ["which", "antiword"], stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )

        if result.returncode == 0:
            # Use antiword to extract text
            result = subprocess.run(
                ["antiword", str(doc_file)],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
            )
            if result.returncode == 0:
                text = result.stdout
                # Get the first non-empty line
                for line in text.split("\n"):
                    if line.strip():
                        return line.strip()
            else:
                print(f"Antiword error: {result.stderr}")
        else:
            # Try catdoc if available
            result = subprocess.run(
                ["which", "catdoc"], stdout=subprocess.PIPE, stderr=subprocess.PIPE
            )
            if result.returncode == 0:
                result = subprocess.run(
                    ["catdoc", str(doc_file)],
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    text=True,
                )
                if result.returncode == 0:
                    text = result.stdout
                    # Get the first non-empty line
                    for line in text.split("\n"):
                        if line.strip():
                            return line.strip()
                else:
                    print(f"Catdoc error: {result.stderr}")
            else:
                print(
                    "Neither antiword nor catdoc is installed. Cannot extract text from .doc files."
                )
                return f"Doc_FileNeedsConversion_{Path(doc_file).stem}"

        return "NoTextFound"

    except Exception as e:
        print(f"Error reading {doc_file}: {e}")
        return "ErrorReadingFile"


def clean_title(title):
    """Clean title by removing various prefixes and formatting."""
    # Remove leading numbers and dots (like "15 ", "3.", "8", etc.)
    title = re.sub(r"^\d+\.?\s*", "", title)

    # More aggressive pattern to match different variations of "Тема: N" or "Тема N"
    # ("Тема" is Russian for "Topic")
    title = re.sub(r"^[Тт]ема[\s:]*\d+\s*", "", title)

    # Remove "copy" at the end
    title = re.sub(r"\s+copy$", "", title)

    # Collapse extra whitespace
    title = re.sub(r"\s+", " ", title).strip()

    return title


def calculate_file_hash(filepath):
    """Calculate MD5 hash of a file to compare content."""
    hasher = hashlib.md5()
    with open(filepath, "rb") as f:
        buf = f.read(65536)  # Read in 64k chunks
        while len(buf) > 0:
            hasher.update(buf)
            buf = f.read(65536)
    return hasher.hexdigest()


def find_similar_files(files):
    """Group similar files to detect originals and copies.

    Returns a tuple of (verified_duplicates, duplicate_groups): groups of files
    whose content is byte-identical, and all files that merely share a base name.
    """
    # First pass: group by filename similarity
    filename_groups = {}

    for file_path in files:
        filename = file_path.name
        # Remove "copy" for comparison (keep the extension so copies group with their originals)
        base_name = re.sub(r"\s+copy(\.[^.]+)$", r"\1", filename)
        base_name = re.sub(r"\s+copy$", "", base_name)

        if base_name not in filename_groups:
            filename_groups[base_name] = []
        filename_groups[base_name].append(file_path)

    # Filter groups to keep only those with multiple files
    duplicate_groups = {k: v for k, v in filename_groups.items() if len(v) > 1}

    # Second pass: verify content similarity using hashes
    verified_duplicates = {}

    for base_name, file_paths in duplicate_groups.items():
        hashes = {}
        for file_path in file_paths:
            file_hash = calculate_file_hash(file_path)
            if file_hash not in hashes:
                hashes[file_hash] = []
            hashes[file_hash].append(file_path)

        # Keep groups with identical content
        identical_files = [
            files for hash_val, files in hashes.items() if len(files) > 1
        ]
        if identical_files:
            verified_duplicates[base_name] = identical_files

        # Also track files with the same name but different content
        if len(hashes) > 1:
            print(
                f"Warning: Files with similar names but different content: {file_paths}"
            )

    return verified_duplicates, duplicate_groups


def main():
    # Directory containing the doc/docx files
    data_dir = "./data"

    # Create backup and duplicates directories
    backup_dir = os.path.join(data_dir, "docs_backup")
    duplicates_dir = os.path.join(data_dir, "duplicates")
    os.makedirs(backup_dir, exist_ok=True)
    os.makedirs(duplicates_dir, exist_ok=True)

    # Get all doc and docx files in the data directory
    docs_path = Path(data_dir)
    doc_files = list(docs_path.glob("*.doc"))
    docx_files = list(docs_path.glob("*.docx"))
    all_files = doc_files + docx_files

    if not all_files:
        print(f"No doc/docx files found in {data_dir}")
        return

    print(f"Found {len(all_files)} documents")

    # First, let's handle duplicates
    print("\nChecking for duplicate files...")
    verified_duplicates, potential_duplicates = find_similar_files(all_files)

    # Move duplicate files to the duplicates directory
    files_to_remove = []
    for base_name, duplicate_groups in verified_duplicates.items():
        for duplicate_group in duplicate_groups:
            # Keep the first file, move others to duplicates directory
            keeper = duplicate_group[0]
            for dupe in duplicate_group[1:]:
                print(
                    f"Moving duplicate: {dupe.name} → duplicates/ (identical to {keeper.name})"
                )
                shutil.move(dupe, os.path.join(duplicates_dir, dupe.name))
                files_to_remove.append(dupe)

    # For potential duplicates with "copy" in the name
    for base_name, file_paths in potential_duplicates.items():
        # Check if any have "copy" in the name
        copy_files = [f for f in file_paths if "copy" in f.name.lower()]
        non_copy_files = [f for f in file_paths if "copy" not in f.name.lower()]

        # If we have both originals and copies
        if copy_files and non_copy_files:
            for copy_file in copy_files:
                # Check if we already moved this as a verified duplicate
                if copy_file in files_to_remove:
                    continue

                print(
                    f"Moving potential duplicate: {copy_file.name} → duplicates/ (has 'copy' in name)"
                )
                shutil.move(copy_file, os.path.join(duplicates_dir, copy_file.name))
                files_to_remove.append(copy_file)

    # Remove duplicates from the processing list
    for file_path in files_to_remove:
        if file_path in all_files:
            all_files.remove(file_path)

    # Now let's process the remaining files
    print(f"\nProcessing {len(all_files)} unique files...")

    # Sort files by name for consistent processing
    all_files.sort(key=lambda x: x.name)

    # Dictionary to store full titles
    title_map = {}

    # Process each file
    for counter, filepath in enumerate(all_files, 1):
        filename = filepath.name

        # Skip files in the backup/duplicates directories
        if any(d in str(filepath) for d in ["docs_backup", "duplicates"]):
            continue

        print(f"Processing {filename}...")

        # Get first line of content based on file type
        if filepath.suffix.lower() == ".docx":
            first_line = extract_first_line_docx(filepath)
        else:  # .doc
            first_line = extract_first_line_doc(filepath)

        # Clean the title
        clean_full_title = clean_title(first_line)

        # Format counter with leading zeros
        counter_str = f"{counter:03d}"

        # Store the full title in the dictionary with counter as key
        title_map[counter_str] = clean_full_title

        # Create new filename with requested format: "TXXX.extension"
        new_filename = f"T{counter_str}{filepath.suffix}"
        new_filepath = filepath.parent / new_filename

        # Create a backup
        backup_path = Path(backup_dir) / filename
        shutil.copy2(filepath, backup_path)

        print(f"Renaming: {filename} → {new_filename}")

        # Rename the file
        filepath.rename(new_filepath)

    # Save the full titles to a JSON file
    json_path = os.path.join(data_dir, "document_titles.json")
    with open(json_path, "w", encoding="utf-8") as json_file:
        json.dump(title_map, json_file, ensure_ascii=False, indent=2)

    print(f"\nAll files renamed. Backups stored in '{backup_dir}' directory.")
    print(f"Duplicate files moved to '{duplicates_dir}' directory.")
    print(f"Full titles saved to '{json_path}'")

    # Print installation instructions if needed
    if doc_files:
        print(
            "\nNOTE: To process .doc files, you may need to install one of these tools:"
        )
        print(" - antiword: sudo apt-get install antiword")
        print(" - catdoc: sudo apt-get install catdoc")


if __name__ == "__main__":
    main()