From 4e867719644aa9526f9fec83856c338ec38e1b50 Mon Sep 17 00:00:00 2001 From: Sergio Date: Thu, 19 Feb 2026 08:36:06 +0100 Subject: [PATCH] refactor --- .gitignore | 1 + core/archive.py | 57 +++++++++++++++++++++++ core/backup.py | 6 ++- core/collision.py | 60 ++++++++++++++++++++++++ core/pipeline.py | 94 ++++++++++++++++++++++++++++++++++++++ core/result.py | 47 +++++++++++++++++++ main.py | 36 +++++---------- processors/cleaner.py | 80 ++++++-------------------------- processors/converter.py | 77 ++++++++++++------------------- processors/standardizer.py | 44 ------------------ processors/validator.py | 45 ++++++++++-------- 11 files changed, 344 insertions(+), 203 deletions(-) create mode 100644 core/archive.py create mode 100644 core/collision.py create mode 100644 core/pipeline.py create mode 100644 core/result.py delete mode 100644 processors/standardizer.py diff --git a/.gitignore b/.gitignore index 8b87ad3..d258ce6 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ backup/ +CLAUDE.md # ---> Python # Byte-compiled / optimized / DLL files diff --git a/core/archive.py b/core/archive.py new file mode 100644 index 0000000..06ef6d5 --- /dev/null +++ b/core/archive.py @@ -0,0 +1,57 @@ +# core/archive.py + +import os +import zipfile +import rarfile +import shutil + + +class ArchiveError(Exception): + pass + + +def detect_real_format(path: str) -> str | None: + """Devuelve 'zip', 'rar' o None según los magic bytes del archivo.""" + try: + zipfile.ZipFile(path).close() + return "zip" + except Exception: + pass + + try: + rarfile.RarFile(path).close() + return "rar" + except Exception: + pass + + return None + + +def open_archive(path: str): + """Devuelve un ZipFile o RarFile abierto en modo lectura.""" + fmt = detect_real_format(path) + if fmt == "zip": + return zipfile.ZipFile(path, "r") + if fmt == "rar": + return rarfile.RarFile(path, "r") + raise ArchiveError(f"Formato desconocido o archivo corrupto: {path}") + + +def extract_archive(path: str, dest_dir: str) -> str: + """Extrae el archivo en dest_dir. Devuelve dest_dir.""" + archive = open_archive(path) + try: + archive.extractall(dest_dir) + finally: + archive.close() + return dest_dir + + +def repack_as_cbz(source_dir: str, target_path: str) -> None: + """Empaqueta todos los archivos de source_dir en un CBZ (ZIP deflated).""" + with zipfile.ZipFile(target_path, "w", zipfile.ZIP_DEFLATED) as zf: + for root, _, files in os.walk(source_dir): + for f in files: + full = os.path.join(root, f) + rel = os.path.relpath(full, source_dir) + zf.write(full, rel) diff --git a/core/backup.py b/core/backup.py index d078788..8f18633 100644 --- a/core/backup.py +++ b/core/backup.py @@ -4,12 +4,14 @@ import os import shutil from core.paths import get_project_root + def move_to_backup(path): - """Mueve un archivo al directorio centralizado /backup.""" + """Mueve un archivo al directorio centralizado /backup sin sobrescribir.""" + from core.collision import safe_backup_name root = get_project_root() backup_dir = os.path.join(root, "backup") os.makedirs(backup_dir, exist_ok=True) - target = os.path.join(backup_dir, os.path.basename(path)) + target = safe_backup_name(path) shutil.move(path, target) return target diff --git a/core/collision.py b/core/collision.py new file mode 100644 index 0000000..2f3d574 --- /dev/null +++ b/core/collision.py @@ -0,0 +1,60 @@ +# core/collision.py + +import os +from core.paths import get_project_root + + +class CollisionPolicy: + ABORT = "abort" # comportamiento estricto actual + BACKUP = "backup" # mueve el existente a backup/ y sigue + RENAME = "rename" # añade sufijo _1, _2... hasta nombre libre + + +def safe_backup_name(path: str) -> str: + """ + Devuelve una ruta sin colisión dentro de backup/. + Añade sufijo _1, _2... si el nombre base ya existe. + """ + root = get_project_root() + backup_dir = os.path.join(root, "backup") + base = os.path.basename(path) + name, ext = os.path.splitext(base) + candidate = os.path.join(backup_dir, base) + counter = 1 + while os.path.exists(candidate): + candidate = os.path.join(backup_dir, f"{name}_{counter}{ext}") + counter += 1 + return candidate + + +def resolve_collision(target_path: str, policy: str = CollisionPolicy.ABORT) -> str: + """ + Comprueba si target_path existe y aplica la política: + ABORT → lanza FileExistsError + BACKUP → mueve el existente a backup/ y devuelve target_path + RENAME → devuelve una ruta alternativa libre (target_1.cbz, ...) + Devuelve la ruta segura donde escribir. + """ + if not os.path.exists(target_path): + return target_path + + if policy == CollisionPolicy.ABORT: + raise FileExistsError( + f"El archivo destino ya existe y no se sobrescribirá: {target_path}" + ) + + if policy == CollisionPolicy.BACKUP: + from core.backup import move_to_backup + move_to_backup(target_path) + return target_path + + if policy == CollisionPolicy.RENAME: + base, ext = os.path.splitext(target_path) + counter = 1 + candidate = f"{base}_{counter}{ext}" + while os.path.exists(candidate): + counter += 1 + candidate = f"{base}_{counter}{ext}" + return candidate + + raise ValueError(f"Política de colisión desconocida: {policy}") diff --git a/core/pipeline.py b/core/pipeline.py new file mode 100644 index 0000000..23e6364 --- /dev/null +++ b/core/pipeline.py @@ -0,0 +1,94 @@ +# core/pipeline.py + +import os +import tempfile +import shutil + +from core.archive import detect_real_format, extract_archive, repack_as_cbz, ArchiveError +from core.collision import CollisionPolicy, resolve_collision +from core.result import ComicResult, StepResult +from processors.validator import validate_archive +from processors.cleaner import clean_directory +from processors.converter import needs_conversion, conversion_step_result + + +class Pipeline: + def __init__( + self, + steps: list, + desired_format: str = "cbz", + collision_policy: str = CollisionPolicy.ABORT, + dry_run: bool = False, + ): + self.steps = steps + self.desired_format = desired_format + self.collision_policy = collision_policy + self.dry_run = dry_run + + def run(self, path: str) -> ComicResult: + step_results = [] + + # 1. Validar siempre, antes de extraer + val = validate_archive(path) + step_results.append(val) + if val.errors: + return ComicResult(original_path=path, final_path=None, steps=step_results) + + real_format = detect_real_format(path) + + # 2. Extraer una sola vez + temp_dir = tempfile.mkdtemp() + try: + extract_archive(path, temp_dir) + + # 3. Aplicar cada step sobre el directorio temporal + any_changed = False + + if "clean" in self.steps: + clean_result = clean_directory(temp_dir) + step_results.append(clean_result) + if clean_result.changed: + any_changed = True + + if "convert" in self.steps: + conv_result = conversion_step_result(real_format, self.desired_format) + step_results.append(conv_result) + if conv_result.errors: + return ComicResult( + original_path=path, final_path=None, steps=step_results + ) + if conv_result.changed: + any_changed = True + + # 4. Reempaquetar si hubo cambios o conversión de formato + needs_repack = any_changed or ( + "convert" in self.steps + and needs_conversion(real_format, self.desired_format) + ) + + if not needs_repack: + return ComicResult( + original_path=path, final_path=path, steps=step_results + ) + + base, _ = os.path.splitext(path) + target_path = f"{base}.{self.desired_format}" + + if not self.dry_run: + safe_target = resolve_collision(target_path, self.collision_policy) + repack_as_cbz(temp_dir, safe_target) + # Eliminar original si el nombre cambió + if safe_target != path and os.path.exists(path): + os.remove(path) + else: + safe_target = target_path + + except (ArchiveError, FileExistsError, OSError) as exc: + step_results.append( + StepResult(step="repack", changed=False, errors=[str(exc)]) + ) + return ComicResult(original_path=path, final_path=None, steps=step_results) + finally: + shutil.rmtree(temp_dir, ignore_errors=True) + + return ComicResult(original_path=path, final_path=safe_target, steps=step_results) diff --git a/core/result.py b/core/result.py new file mode 100644 index 0000000..73b98d8 --- /dev/null +++ b/core/result.py @@ -0,0 +1,47 @@ +# core/result.py + +from dataclasses import dataclass, field + + +@dataclass +class StepResult: + step: str # "validate" | "clean" | "convert" | ... + changed: bool + details: list = field(default_factory=list) # e.g. ["Eliminado: thumbs.db"] + warnings: list = field(default_factory=list) + errors: list = field(default_factory=list) + + +@dataclass +class ComicResult: + original_path: str + final_path: str | None # None si se abortó + steps: list = field(default_factory=list) + + def ok(self) -> bool: + return all(not s.errors for s in self.steps) + + def summary(self) -> str: + if not self.ok(): + errors = [e for s in self.steps for e in s.errors] + return f"ERROR [{self.original_path}]: {'; '.join(errors)}" + changed_steps = [s.step for s in self.steps if s.changed] + if changed_steps: + dest = self.final_path or self.original_path + return f"OK [{self.original_path}] → {dest} ({', '.join(changed_steps)})" + return f"OK [{self.original_path}] (sin cambios)" + + def full_report(self) -> str: + lines = [f"Cómic: {self.original_path}"] + for s in self.steps: + status = "CAMBIADO" if s.changed else "sin cambios" + lines.append(f" [{s.step}] {status}") + for d in s.details: + lines.append(f" - {d}") + for w in s.warnings: + lines.append(f" AVISO: {w}") + for e in s.errors: + lines.append(f" ERROR: {e}") + if self.final_path: + lines.append(f" Resultado final: {self.final_path}") + return "\n".join(lines) diff --git a/main.py b/main.py index ba8cf97..d0b3cde 100644 --- a/main.py +++ b/main.py @@ -2,10 +2,8 @@ import argparse from core.scanner import find_comic_files +from core.pipeline import Pipeline from processors.validator import validate_comic -from processors.cleaner import clean_comic -from processors.converter import convert_comic -from processors.standardizer import standardize_comic def parse_args(): @@ -33,31 +31,21 @@ def main(): if args.validar: for f in comic_files: - print(f"Validando: {f}") - print(validate_comic(f)) + res = validate_comic(f) + print(f"{f} → {res.summary()}") print() - if args.limpiar: - for f in comic_files: - print(f"Limpieza: {f}") - print(clean_comic(f)) - print() + steps = [] + if args.limpiar or args.estandarizar: + steps.append("clean") + if args.convertir or args.estandarizar: + steps.append("convert") - if args.convertir: + if steps: + pipeline = Pipeline(steps=steps, desired_format=args.formato) for f in comic_files: - print(f"Convirtiendo: {f}") - info = convert_comic(f, args.formato) - if info["needs_conversion"]: - print(f" → Convertido a {args.formato.upper()}: {info['target_path']}") - else: - print(f" → Sin cambios (ya era {args.formato.upper()})") - print() - - if args.estandarizar: - for f in comic_files: - print(f"Estandarizando: {f}") - print(standardize_comic(f, args.formato)) - print() + result = pipeline.run(f) + print(result.summary()) if __name__ == "__main__": diff --git a/processors/cleaner.py b/processors/cleaner.py index f8721b5..1c5311b 100644 --- a/processors/cleaner.py +++ b/processors/cleaner.py @@ -1,78 +1,26 @@ # processors/cleaner.py import os -import zipfile -import rarfile -import tempfile -import shutil - from core.constants import TRASH_FILES -from processors.validator import validate_comic +from core.result import StepResult -class CleanResult: - def __init__(self, original_path): - self.original_path = original_path - self.cleaned_path = None - self.removed_files = [] - self.repacked = False - - def __str__(self): - msg = f"Limpieza de: {self.original_path}\n" - msg += f" Archivos eliminados: {len(self.removed_files)}\n" - msg += f" Resultado final: {self.cleaned_path}\n" - return msg - - -def clean_comic(path): - validation = validate_comic(path) - - if validation.errors: - raise Exception(f"Archivo corrupto: {path}") - - real = validation.real_format - - # Abrir según formato real - archive = zipfile.ZipFile(path, "r") if real == "zip" else rarfile.RarFile(path, "r") - - temp_dir = tempfile.mkdtemp() - archive.extractall(temp_dir) - archive.close() - +def clean_directory(work_dir: str) -> StepResult: + """ + Elimina TRASH_FILES del directorio ya extraído. + Sin I/O de archivo de cómic; trabaja sobre el directorio temporal. + """ removed = [] - for root, _, files in os.walk(temp_dir): + for root, _, files in os.walk(work_dir): for f in files: if f.lower() in TRASH_FILES: full = os.path.join(root, f) os.remove(full) - removed.append(os.path.relpath(full, temp_dir)) + removed.append(os.path.relpath(full, work_dir)) - # Si no se eliminó nada → no reconstruir - if not removed: - shutil.rmtree(temp_dir) - result = CleanResult(path) - result.cleaned_path = path - return result - - # Reconstruir como CBZ - base, _ = os.path.splitext(path) - final_path = base + ".cbz" - - with zipfile.ZipFile(final_path, "w", zipfile.ZIP_DEFLATED) as new_zip: - for root, _, files in os.walk(temp_dir): - for f in files: - full = os.path.join(root, f) - rel = os.path.relpath(full, temp_dir) - new_zip.write(full, rel) - - shutil.rmtree(temp_dir) - - # Mover original a backup - from core.backup import move_to_backup - move_to_backup(path) - - result = CleanResult(path) - result.cleaned_path = final_path - result.removed_files = removed - result.repacked = True - return result + details = [f"Eliminado: {r}" for r in removed] + return StepResult( + step="clean", + changed=bool(removed), + details=details, + ) diff --git a/processors/converter.py b/processors/converter.py index 79473f1..8351cc5 100644 --- a/processors/converter.py +++ b/processors/converter.py @@ -1,54 +1,37 @@ # processors/converter.py -import os -import zipfile -import rarfile -import tempfile -import shutil - -from processors.validator import validate_comic +from core.result import StepResult -def convert_comic(path, desired_format="cbz"): - validation = validate_comic(path) +def needs_conversion(real_format: str, desired_format: str) -> bool: + """True si el formato real no coincide con el deseado.""" + if desired_format == "cbz" and real_format == "zip": + return False + if desired_format == "cbr" and real_format == "rar": + return False + return True - if validation.errors: - raise Exception(f"Archivo corrupto: {path}") - real = validation.real_format +def conversion_step_result(real_format: str, desired_format: str) -> StepResult: + """ + Informa si se necesita conversión. + El pipeline decide si reempaquetar; este step solo evalúa. + """ + if desired_format == "cbr": + return StepResult( + step="convert", + changed=False, + errors=["Crear CBR requiere 'rar' instalado (no implementado)"], + ) - # Si ya está en el formato deseado → nada que hacer - if desired_format == "cbz" and real == "zip": - return {"needs_conversion": False, "target_path": path} - - if desired_format == "cbr" and real == "rar": - return {"needs_conversion": False, "target_path": path} - - # Abrir según formato real - archive = zipfile.ZipFile(path, "r") if real == "zip" else rarfile.RarFile(path, "r") - - temp_dir = tempfile.mkdtemp() - archive.extractall(temp_dir) - archive.close() - - base, _ = os.path.splitext(path) - target_path = f"{base}.{desired_format}" - - if desired_format == "cbz": - with zipfile.ZipFile(target_path, "w", zipfile.ZIP_DEFLATED) as new_zip: - for root, _, files in os.walk(temp_dir): - for f in files: - full = os.path.join(root, f) - rel = os.path.relpath(full, temp_dir) - new_zip.write(full, rel) - - elif desired_format == "cbr": - raise NotImplementedError("Crear CBR requiere 'rar' instalado") - - shutil.rmtree(temp_dir) - - # Mover original a backup - from core.backup import move_to_backup - move_to_backup(path) - - return {"needs_conversion": True, "target_path": target_path} + convert = needs_conversion(real_format, desired_format) + details = ( + [f"Conversión necesaria: {real_format} → {desired_format}"] + if convert + else [f"Sin conversión: ya es {desired_format}"] + ) + return StepResult( + step="convert", + changed=convert, + details=details, + ) diff --git a/processors/standardizer.py b/processors/standardizer.py deleted file mode 100644 index e3cb3e7..0000000 --- a/processors/standardizer.py +++ /dev/null @@ -1,44 +0,0 @@ -# processors/standardizer.py - -from processors.validator import validate_comic -from processors.cleaner import clean_comic -from processors.converter import convert_comic - - -class StandardizeResult: - def __init__(self, path): - self.path = path - self.cleaned = None - self.converted = None - self.final_path = None - - def __str__(self): - msg = f"Estandarización de: {self.path}\n" - if self.cleaned: - msg += f" Limpieza: OK ({len(self.cleaned.removed_files)} eliminados)\n" - if self.converted: - if self.converted["needs_conversion"]: - msg += f" Conversión: OK → {self.converted['target_path']}\n" - else: - msg += " Conversión: no necesaria\n" - msg += f" Resultado final: {self.final_path}\n" - return msg - - -def standardize_comic(path, desired_format="cbz"): - validation = validate_comic(path) - - if validation.errors: - raise Exception(f"Archivo corrupto: {path}") - - result = StandardizeResult(path) - - clean_result = clean_comic(path) - result.cleaned = clean_result - - convert_result = convert_comic(clean_result.cleaned_path, desired_format) - result.converted = convert_result - - result.final_path = convert_result["target_path"] if convert_result["needs_conversion"] else clean_result.cleaned_path - - return result diff --git a/processors/validator.py b/processors/validator.py index c27c76c..8fad737 100644 --- a/processors/validator.py +++ b/processors/validator.py @@ -1,8 +1,8 @@ # processors/validator.py import os -import zipfile -import rarfile +from core.archive import detect_real_format +from core.result import StepResult class ValidationResult: @@ -13,7 +13,16 @@ class ValidationResult: self.errors = [] self.warnings = [] + def summary(self): + """Resumen compacto para mostrar en consola.""" + if self.errors: + return f"ERROR: {', '.join(self.errors)}" + if self.warnings: + return f"AVISO: {', '.join(self.warnings)}" + return "OK" + def __str__(self): + """Salida detallada (solo si el usuario la pide).""" msg = f"Validación de: {self.path}\n" msg += f" Formato real: {self.real_format}\n" msg += f" Extensión: {self.extension}\n" @@ -28,24 +37,8 @@ class ValidationResult: return msg -def detect_real_format(path): - """Devuelve 'zip', 'rar' o None.""" - try: - zipfile.ZipFile(path).close() - return "zip" - except: - pass - - try: - rarfile.RarFile(path).close() - return "rar" - except: - pass - - return None - - -def validate_comic(path): +def validate_comic(path) -> ValidationResult: + """Modo standalone: comprueba formato e integridad. Para --validar en CLI.""" result = ValidationResult(path) ext = os.path.splitext(path)[1].lower() result.extension = ext @@ -64,3 +57,15 @@ def validate_comic(path): result.warnings.append("Extensión incorrecta: debería ser .cbz") return result + + +def validate_archive(path: str) -> StepResult: + """Para el pipeline: devuelve StepResult con errores/avisos de validación.""" + vr = validate_comic(path) + return StepResult( + step="validate", + changed=False, + details=[f"Formato real: {vr.real_format}, extensión: {vr.extension}"], + warnings=list(vr.warnings), + errors=list(vr.errors), + )