# core/pipeline.py
import os
import tempfile
import shutil

import rarfile

from core.archive import detect_real_format, extract_archive, repack_as_cbz, ArchiveError, list_archive_names
from core.backup import move_to_backup
from core.collision import CollisionPolicy, resolve_collision
from core.result import ComicResult, StepResult
from processors.validator import validate_archive
from processors.cleaner import clean_directory
from processors.converter import needs_conversion, conversion_step_result
from processors.checks import (
    check_trash,
    check_page_numbering,
    check_image_extensions,
    check_comicinfo,
)
from processors.page_normalizer import normalize_pages, preview_normalize_pages
from processors.image_normalizer import (
    normalize_images,
    preview_normalize_images,
    uniformize_images,
    preview_uniformize_images,
)


class Pipeline:
    """Validate, inspect and optionally fix a single comic archive.

    The archive is validated and its member names are checked without
    extracting anything. It is extracted to a temporary directory only when
    one of the requested steps can modify it, and it is repacked at most once
    at the end of ``run``.

    Step names recognised by this class: ``"clean"``, ``"normalize_pages"``,
    ``"normalize_images"``, ``"convert_images"`` and ``"convert"``.
    """

    def __init__(
        self,
        steps: list,
        desired_format: str = "cbz",
        desired_image_format: str = ".jpg",
        collision_policy: str = CollisionPolicy.ABORT,
        dry_run: bool = False,
    ):
        """Store the pipeline configuration.

        Args:
            steps: Names of the fix steps to apply (see the class docstring).
            desired_format: Target archive extension without the leading dot;
                it is compared against the file's extension and used to build
                the output file name.
            desired_image_format: Target image extension handed to the image
                normalizers and echoed in their previews.
            collision_policy: Policy handed to ``resolve_collision``. With
                ``CollisionPolicy.BACKUP`` a replaced original is moved to
                backup instead of being deleted.
            dry_run: When true, ``run`` never writes, removes or moves files
                next to the original; fix steps still execute against the
                temporary extraction so their results can be reported.
        """
        self.steps = steps
        self.desired_format = desired_format
        self.desired_image_format = desired_image_format
        self.collision_policy = collision_policy
        self.dry_run = dry_run

    @staticmethod
    def _file_extension(path: str) -> str:
        """Return the lower-cased extension of *path* without the leading dot."""
        return os.path.splitext(path)[1].lower().lstrip(".")

    @staticmethod
    def _find_step(step_results: list, name: str):
        """Return the first result whose ``step`` equals *name*, or ``None``."""
        return next((r for r in step_results if r.step == name), None)

    @staticmethod
    def _confirmed(confirm_fn, step: str, preview: dict) -> bool:
        """Return whether *step* may run: always when there is no callback."""
        return bool(confirm_fn is None or confirm_fn(step, preview))

    def _compute_preview(self, step: str, temp_dir: str, step_results: list) -> dict:
        """Build the preview payload shown to the user before applying *step*.

        Args:
            step: Name of the fix step being previewed.
            temp_dir: Directory holding the extracted archive.
            step_results: Results collected so far; the ``"clean"`` preview is
                derived from the earlier ``check_trash`` result.

        Returns:
            A dict whose keys depend on the step (``items``, ``renames``,
            ``conversions``/``target_ext`` or ``target_format``); an empty
            dict for unknown steps.
        """
        if step == "clean":
            trash_result = self._find_step(step_results, "check_trash")
            if trash_result:
                # Only warnings carrying this exact prefix describe trash entries.
                prefix = "Basura detectada: "
                items = [
                    w.removeprefix(prefix)
                    for w in trash_result.warnings
                    if w.startswith(prefix)
                ]
            else:
                items = []
            return {"items": items}

        if step == "normalize_pages":
            return {"renames": preview_normalize_pages(temp_dir)}

        if step == "normalize_images":
            conversions = preview_uniformize_images(temp_dir, self.desired_image_format)
            return {"conversions": conversions, "target_ext": self.desired_image_format}

        if step == "convert_images":
            conversions = preview_normalize_images(temp_dir, self.desired_image_format)
            return {"conversions": conversions, "target_ext": self.desired_image_format}

        if step == "convert":
            return {"target_format": self.desired_format.upper()}

        return {}

    def _needs_extraction(self, step_results: list, real_format: str, path: str) -> bool:
        """Tell whether any configured step could require extracting the archive.

        Args:
            step_results: Results collected so far (used to look up the
                ``check_trash`` outcome for the ``"clean"`` step).
            real_format: Format reported by ``detect_real_format``.
            path: Path of the archive, used for its extension.

        Returns:
            True when a page/image step is configured, when ``"convert"`` is
            configured and either the format or the extension differs from
            the desired one, or when ``"clean"`` is configured and
            ``check_trash`` produced warnings. False otherwise.
        """
        for step in self.steps:
            if step in ("normalize_pages", "normalize_images", "convert_images"):
                return True
            if step == "convert":
                if needs_conversion(real_format, self.desired_format):
                    return True
                if self._file_extension(path) != self.desired_format:
                    return True
            if step == "clean":
                trash = self._find_step(step_results, "check_trash")
                if trash and trash.warnings:
                    return True
        return False

    def _run_fix(self, step, preview_key, apply_fn, temp_dir, step_results, confirm_fn):
        """Preview, confirm and apply one fix step on the extracted directory.

        The step is skipped (returning ``None``) when it is not configured,
        when its preview has nothing under *preview_key*, or when
        *confirm_fn* declines it. Otherwise *apply_fn* is called, its result
        is appended to *step_results* and returned.
        """
        if step not in self.steps:
            return None
        preview = self._compute_preview(step, temp_dir, step_results)
        if not preview.get(preview_key):
            return None
        if not self._confirmed(confirm_fn, step, preview):
            return None
        result = apply_fn()
        step_results.append(result)
        return result

    def run(self, path: str, confirm_fn=None) -> ComicResult:
        """Process the archive at *path* and report every step that ran.

        Args:
            path: Path of the comic archive to process.
            confirm_fn: Optional callable ``confirm_fn(step, preview)`` asked
                before each fix step; a falsy return skips that step. When
                omitted every applicable step runs.

        Returns:
            A ``ComicResult`` whose ``final_path`` is ``None`` when a step
            reported errors or an archive/OS error occurred, the original
            *path* when nothing had to be rewritten, and otherwise the path of
            the repacked archive (the would-be path in dry-run mode).
        """
        step_results = []

        # 1. Always validate before anything else.
        val = validate_archive(path)
        step_results.append(val)
        if val.errors:
            return ComicResult(original_path=path, final_path=None, steps=step_results)

        real_format = detect_real_format(path)

        # 2. List the archive members without extracting them.
        try:
            names = list_archive_names(path)
        except Exception as exc:
            step_results.append(StepResult(step="list", changed=False, errors=[str(exc)]))
            return ComicResult(original_path=path, final_path=None, steps=step_results)

        # 3. Always run the four content checks on the member names.
        step_results += [
            check_trash(names),
            check_page_numbering(names),
            check_image_extensions(names),
            check_comicinfo(names),
        ]

        # 4. Pre-flight: leave the file untouched when no step needs extraction.
        if not self._needs_extraction(step_results, real_format, path):
            return ComicResult(original_path=path, final_path=path, steps=step_results)

        # 5. Extract exactly once.
        temp_dir = tempfile.mkdtemp()
        try:
            extract_archive(path, temp_dir)

            # 6. Apply each fix step on the temporary directory, in this
            #    fixed order. The last field says whether errors reported by
            #    the step abort the whole run.
            fixes = (
                ("clean", "items", lambda: clean_directory(temp_dir), False),
                ("normalize_pages", "renames", lambda: normalize_pages(temp_dir), False),
                (
                    "normalize_images",
                    "conversions",
                    lambda: uniformize_images(temp_dir, self.desired_image_format),
                    True,
                ),
                (
                    "convert_images",
                    "conversions",
                    lambda: normalize_images(temp_dir, self.desired_image_format),
                    True,
                ),
            )
            any_changed = False
            for step, preview_key, apply_fn, abort_on_error in fixes:
                result = self._run_fix(
                    step, preview_key, apply_fn, temp_dir, step_results, confirm_fn
                )
                if result is None:
                    continue
                if abort_on_error and result.errors:
                    return ComicResult(
                        original_path=path, final_path=None, steps=step_results
                    )
                if result.changed:
                    any_changed = True

            # Remember whether the format conversion was actually accepted, so
            # a declined confirmation cannot trigger a repack further down.
            convert_confirmed = False
            if "convert" in self.steps:
                preview = self._compute_preview("convert", temp_dir, step_results)
                if self._confirmed(confirm_fn, "convert", preview):
                    convert_confirmed = True
                    conv_result = conversion_step_result(real_format, self.desired_format)
                    # Wrong extension even though the real format is already right.
                    file_ext = self._file_extension(path)
                    if (
                        not conv_result.errors
                        and not conv_result.changed
                        and file_ext != self.desired_format
                    ):
                        conv_result = StepResult(
                            step="convert",
                            changed=True,
                            details=[f"Extensión incorrecta corregida: .{file_ext} → .{self.desired_format}"],
                        )
                    step_results.append(conv_result)
                    if conv_result.errors:
                        return ComicResult(
                            original_path=path, final_path=None, steps=step_results
                        )
                    if conv_result.changed:
                        any_changed = True

            # 7. Repack when something changed or an accepted conversion is pending.
            ext = self._file_extension(path)
            needs_repack = any_changed or (
                convert_confirmed
                and (
                    needs_conversion(real_format, self.desired_format)
                    or ext != self.desired_format
                )
            )
            if not needs_repack:
                return ComicResult(
                    original_path=path, final_path=path, steps=step_results
                )

            base, _ = os.path.splitext(path)
            target_path = f"{base}.{self.desired_format}"

            if not self.dry_run:
                # NOTE(review): when the extension does not change, target_path
                # equals path, which already exists; how resolve_collision
                # treats that case is not visible here - confirm that in-place
                # rewrites work under every collision policy.
                safe_target = resolve_collision(target_path, self.collision_policy)
                repack_as_cbz(temp_dir, safe_target)

                # Remove the original, or move it to backup, if the name changed.
                if safe_target != path and os.path.exists(path):
                    if self.collision_policy == CollisionPolicy.BACKUP:
                        move_to_backup(path)
                    else:
                        os.remove(path)
            else:
                safe_target = target_path

        except (ArchiveError, rarfile.BadRarFile, rarfile.Error, FileExistsError, OSError) as exc:
            step_results.append(
                StepResult(step="repack", changed=False, errors=[str(exc)])
            )
            return ComicResult(original_path=path, final_path=None, steps=step_results)
        finally:
            # Runs on every exit path above, including the early returns.
            shutil.rmtree(temp_dir, ignore_errors=True)

        return ComicResult(original_path=path, final_path=safe_target, steps=step_results)