# core/pipeline.py
import os
import tempfile
import shutil

import rarfile

from core.archive import (
    detect_real_format,
    extract_archive,
    repack_as_cbz,
    ArchiveError,
    list_archive_names,
)
from core.backup import move_to_backup
from core.collision import CollisionPolicy, resolve_collision
from core.result import ComicResult, StepResult
from processors.validator import validate_archive
from processors.cleaner import clean_directory, flatten_directory
from processors.converter import needs_conversion, conversion_step_result
from processors.checks import (
    check_trash,
    check_page_numbering,
    check_image_extensions,
    check_comicinfo,
    check_foreign,
    check_nested,
    check_extension_case,
)
from processors.page_normalizer import normalize_pages, preview_normalize_pages
from processors.image_normalizer import (
    normalize_images,
    preview_normalize_images,
    uniformize_images,
    preview_uniformize_images,
)
from processors.case_normalizer import normalize_case, preview_normalize_case


class Pipeline:
    """Validate, check and optionally fix a single comic archive.

    ``run`` always validates the archive and runs the name-based content
    checks. The archive is only extracted (once, into a temporary directory)
    when one of the configured fix ``steps`` actually has work to do, and it
    is only repacked when something changed.
    """

    # Warning prefixes this class looks for in the check results
    # (presumably produced by processors.checks -- confirm there before
    # changing either side).
    _TRASH_PREFIX = "Basura detectada: "
    _FOREIGN_PREFIX = "Fichero extraño: "
    _NESTED_FLATTEN_PREFIX = "Imágenes en subdirectorio: "

    def __init__(
        self,
        steps: list,
        desired_format: str = "cbz",
        desired_image_format: str = ".jpg",
        collision_policy: str = CollisionPolicy.ABORT,
        dry_run: bool = False,
        case_mode: str = "lower",
    ):
        """Store the pipeline configuration.

        Args:
            steps: Names of the fix steps to apply. ``run`` recognises
                "clean", "normalize_pages", "normalize_images",
                "convert_images", "normalize_case" and "convert".
            desired_format: Target archive extension, without the dot.
            desired_image_format: Target image extension passed to the
                image processors.
            collision_policy: Policy handed to ``resolve_collision``.
            dry_run: When true, nothing is renamed, repacked or deleted on
                the original location; fixes still run on the temp copy.
            case_mode: "lower" lowers extensions; any other value uppers
                the outer extension.
        """
        self.steps = steps
        self.desired_format = desired_format
        self.desired_image_format = desired_image_format
        self.collision_policy = collision_policy
        self.dry_run = dry_run
        self.case_mode = case_mode

    # ------------------------------------------------------------------
    # Small private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _find_step(step_results: list, name: str):
        """Return the first result whose ``step`` equals *name*, else None."""
        return next((r for r in step_results if r.step == name), None)

    @classmethod
    def _step_warnings(cls, step_results: list, name: str):
        """Return the warnings of step *name*, or ``[]`` if it is absent."""
        found = cls._find_step(step_results, name)
        return found.warnings if found else []

    @staticmethod
    def _confirmed(confirm_fn, step: str, preview: dict) -> bool:
        """Return True when no callback is set or the callback accepts."""
        return confirm_fn is None or bool(confirm_fn(step, preview))

    @staticmethod
    def _list_flatten_files(temp_dir: str) -> list:
        """List ``(relative source path, file name)`` for nested files.

        Walks every first-level subdirectory of *temp_dir*, in sorted order.
        """
        pairs: list[tuple[str, str]] = []
        for entry in sorted(os.listdir(temp_dir)):
            subpath = os.path.join(temp_dir, entry)
            if not os.path.isdir(subpath):
                continue
            for root, _dirs, files in os.walk(subpath):
                for name in sorted(files):
                    src_abs = os.path.join(root, name)
                    pairs.append((os.path.relpath(src_abs, temp_dir), name))
        return pairs

    # ------------------------------------------------------------------
    # Preview / pre-flight
    # ------------------------------------------------------------------
    def _compute_preview(self, step: str, temp_dir: str, step_results: list) -> dict:
        """Build the preview dict shown to ``confirm_fn`` for *step*.

        Args:
            step: Name of the fix step being previewed.
            temp_dir: Directory holding the extracted archive.
            step_results: Results gathered so far (the check results are
                read from here for the "clean" step).

        Returns:
            A step-specific dict; empty for unknown steps.
        """
        if step == "clean":
            trash_warnings = self._step_warnings(step_results, "check_trash")
            foreign_warnings = self._step_warnings(step_results, "check_foreign")
            nested_warnings = self._step_warnings(step_results, "check_nested")

            items = [
                w.removeprefix(self._TRASH_PREFIX)
                for w in trash_warnings
                if w.startswith(self._TRASH_PREFIX)
            ]
            items += [w.removeprefix(self._FOREIGN_PREFIX) for w in foreign_warnings]

            flatten = False
            flatten_files: list[tuple[str, str]] = []
            # Only the first nested warning decides whether flattening applies.
            if nested_warnings and nested_warnings[0].startswith(
                self._NESTED_FLATTEN_PREFIX
            ):
                flatten = True
                flatten_files = self._list_flatten_files(temp_dir)

            return {"items": items, "flatten": flatten, "flatten_files": flatten_files}

        if step == "normalize_pages":
            return {"renames": preview_normalize_pages(temp_dir)}

        if step == "normalize_images":
            conversions = preview_uniformize_images(temp_dir, self.desired_image_format)
            return {"conversions": conversions, "target_ext": self.desired_image_format}

        if step == "convert_images":
            conversions = preview_normalize_images(temp_dir, self.desired_image_format)
            return {"conversions": conversions, "target_ext": self.desired_image_format}

        if step == "normalize_case":
            renames = preview_normalize_case(temp_dir, self.case_mode)
            return {"renames": renames, "mode": self.case_mode}

        if step == "convert":
            return {"target_format": self.desired_format.upper()}

        return {}

    def _needs_extraction(self, step_results: list, real_format: str, path: str) -> bool:
        """Tell whether any configured step requires extracting the archive.

        Args:
            step_results: Results of the name-based checks.
            real_format: Format reported by ``detect_real_format``.
            path: Current path of the archive.

        Returns:
            True as soon as one configured step has (potential) work to do.
        """
        for step in self.steps:
            if step in ("normalize_pages", "normalize_images", "convert_images"):
                return True

            if step == "normalize_case":
                if self._step_warnings(step_results, "check_extension_case"):
                    return True

            if step == "convert":
                if needs_conversion(real_format, self.desired_format):
                    return True
                ext = os.path.splitext(path)[1].lower().lstrip(".")
                if ext != self.desired_format:
                    return True

            if step == "clean":
                if self._step_warnings(step_results, "check_trash"):
                    return True
                if self._step_warnings(step_results, "check_foreign"):
                    return True
                # Nesting only needs extraction in the flattenable case.
                nested = self._step_warnings(step_results, "check_nested")
                if nested and nested[0].startswith(self._NESTED_FLATTEN_PREFIX):
                    return True

        return False

    # ------------------------------------------------------------------
    # Entry point
    # ------------------------------------------------------------------
    def run(self, path: str, confirm_fn=None) -> ComicResult:
        """Process the archive at *path*.

        Args:
            path: Path of the comic archive.
            confirm_fn: Optional ``confirm_fn(step, preview)`` callback. A
                fix step is applied only if it returns a truthy value;
                with ``None`` every step is applied.

        Returns:
            A ``ComicResult`` carrying every step result. ``final_path`` is
            ``None`` when a step reported errors, the unchanged path when
            nothing had to be rewritten, and the repacked path otherwise
            (in dry-run mode, the path that would have been written).
        """
        step_results = []

        # 1. Always validate before touching the contents.
        val = validate_archive(path)
        step_results.append(val)
        if val.errors:
            return ComicResult(original_path=path, final_path=None, steps=step_results)

        real_format = detect_real_format(path)

        # 2. List the members without extracting.
        try:
            names = list_archive_names(path)
        except Exception as exc:
            step_results.append(StepResult(step="list", changed=False, errors=[str(exc)]))
            return ComicResult(original_path=path, final_path=None, steps=step_results)

        # 3. Always run the content checks on the member names.
        step_results += [
            check_trash(names),
            check_page_numbering(names),
            check_image_extensions(names),
            check_comicinfo(names),
            check_foreign(names),
            check_nested(names),
            check_extension_case(names, self.case_mode),
        ]

        # 3b. Fix the case of the outer extension (in-place rename, no repack).
        if "normalize_case" in self.steps:
            outer_ext = os.path.splitext(path)[1]
            if self.case_mode == "lower":
                target_outer_ext = outer_ext.lower()
            else:
                target_outer_ext = outer_ext.upper()

            if outer_ext != target_outer_ext:
                new_outer_path = os.path.splitext(path)[0] + target_outer_ext
                try:
                    safe_outer = resolve_collision(new_outer_path, self.collision_policy)
                    if not self.dry_run:
                        os.rename(path, safe_outer)
                        # Follow the file only when it really moved, so the
                        # later extraction never reads a path that does not
                        # exist during a dry run.
                        path = safe_outer
                    step_results.append(StepResult(
                        step="normalize_case_outer",
                        changed=True,
                        details=[f"Extensión exterior corregida: {outer_ext} → {target_outer_ext}"],
                    ))
                except (FileExistsError, OSError) as exc:
                    step_results.append(StepResult(
                        step="normalize_case_outer",
                        changed=False,
                        errors=[str(exc)],
                    ))
                    return ComicResult(original_path=path, final_path=None, steps=step_results)

        # 4. Pre-flight: leave the file untouched if no step needs extraction.
        if not self._needs_extraction(step_results, real_format, path):
            return ComicResult(original_path=path, final_path=path, steps=step_results)

        # 5. Extract exactly once.
        temp_dir = tempfile.mkdtemp()
        try:
            extract_archive(path, temp_dir)
            file_ext = os.path.splitext(path)[1].lower().lstrip(".")

            # 6. Apply every fix step on the temporary directory.
            any_changed = False

            if "clean" in self.steps:
                preview = self._compute_preview("clean", temp_dir, step_results)
                if preview.get("items") or preview.get("flatten"):
                    if self._confirmed(confirm_fn, "clean", preview):
                        clean_result = clean_directory(temp_dir)
                        step_results.append(clean_result)
                        if clean_result.changed:
                            any_changed = True
                        # Flatten only in the simple (single subdir) case.
                        if preview.get("flatten"):
                            flat_result = flatten_directory(temp_dir)
                            step_results.append(flat_result)
                            if flat_result.changed:
                                any_changed = True

            if "normalize_pages" in self.steps:
                preview = self._compute_preview("normalize_pages", temp_dir, step_results)
                if preview.get("renames") and self._confirmed(
                    confirm_fn, "normalize_pages", preview
                ):
                    norm_result = normalize_pages(temp_dir)
                    step_results.append(norm_result)
                    if norm_result.changed:
                        any_changed = True

            if "normalize_images" in self.steps:
                preview = self._compute_preview("normalize_images", temp_dir, step_results)
                if preview.get("conversions") and self._confirmed(
                    confirm_fn, "normalize_images", preview
                ):
                    img_result = uniformize_images(temp_dir, self.desired_image_format)
                    step_results.append(img_result)
                    if img_result.errors:
                        return ComicResult(
                            original_path=path, final_path=None, steps=step_results
                        )
                    if img_result.changed:
                        any_changed = True

            if "convert_images" in self.steps:
                preview = self._compute_preview("convert_images", temp_dir, step_results)
                if preview.get("conversions") and self._confirmed(
                    confirm_fn, "convert_images", preview
                ):
                    img_result = normalize_images(temp_dir, self.desired_image_format)
                    step_results.append(img_result)
                    if img_result.errors:
                        return ComicResult(
                            original_path=path, final_path=None, steps=step_results
                        )
                    if img_result.changed:
                        any_changed = True

            if "normalize_case" in self.steps:
                preview = self._compute_preview("normalize_case", temp_dir, step_results)
                if preview.get("renames") and self._confirmed(
                    confirm_fn, "normalize_case", preview
                ):
                    case_result = normalize_case(temp_dir, self.case_mode)
                    step_results.append(case_result)
                    if case_result.changed:
                        any_changed = True

            # Remember whether the conversion was accepted: a declined
            # conversion must not trigger a repack on its own.
            convert_confirmed = False
            if "convert" in self.steps:
                preview = self._compute_preview("convert", temp_dir, step_results)
                if self._confirmed(confirm_fn, "convert", preview):
                    convert_confirmed = True
                    conv_result = conversion_step_result(real_format, self.desired_format)
                    # Wrong extension even though the real format is right.
                    if (
                        not conv_result.errors
                        and not conv_result.changed
                        and file_ext != self.desired_format
                    ):
                        conv_result = StepResult(
                            step="convert",
                            changed=True,
                            details=[f"Extensión incorrecta corregida: .{file_ext} → .{self.desired_format}"],
                        )
                    step_results.append(conv_result)
                    if conv_result.errors:
                        return ComicResult(
                            original_path=path, final_path=None, steps=step_results
                        )
                    if conv_result.changed:
                        any_changed = True

            # 7. Repack if something changed or an accepted conversion is due.
            needs_repack = any_changed or (
                convert_confirmed
                and (
                    needs_conversion(real_format, self.desired_format)
                    or file_ext != self.desired_format
                )
            )
            if not needs_repack:
                return ComicResult(
                    original_path=path, final_path=path, steps=step_results
                )

            base, _ = os.path.splitext(path)
            target_path = f"{base}.{self.desired_format}"

            if not self.dry_run:
                safe_target = resolve_collision(target_path, self.collision_policy)
                repack_as_cbz(temp_dir, safe_target)
                # Remove the original, or move it to backup, if the name changed.
                if safe_target != path and os.path.exists(path):
                    if self.collision_policy == CollisionPolicy.BACKUP:
                        move_to_backup(path)
                    else:
                        os.remove(path)
            else:
                safe_target = target_path

        except (ArchiveError, rarfile.BadRarFile, rarfile.Error, FileExistsError, OSError) as exc:
            step_results.append(
                StepResult(step="repack", changed=False, errors=[str(exc)])
            )
            return ComicResult(original_path=path, final_path=None, steps=step_results)
        finally:
            # Runs on every exit path above, including the early returns.
            shutil.rmtree(temp_dir, ignore_errors=True)

        return ComicResult(original_path=path, final_path=safe_target, steps=step_results)