b462c9fb1e
duplica els resultats de fitxers brossa mes info en --listar --aplanar
313 lines
14 KiB
Python
313 lines
14 KiB
Python
# core/pipeline.py
|
|
|
|
import os
|
|
import tempfile
|
|
import shutil
|
|
|
|
import rarfile
|
|
|
|
from core.archive import detect_real_format, extract_archive, repack_as_cbz, ArchiveError, list_archive_names
|
|
from core.backup import move_to_backup
|
|
from core.collision import CollisionPolicy, resolve_collision
|
|
from core.result import ComicResult, StepResult
|
|
from processors.validator import validate_archive
|
|
from processors.cleaner import clean_directory, flatten_directory
|
|
from processors.converter import needs_conversion, conversion_step_result
|
|
from processors.checks import (
|
|
check_trash,
|
|
check_page_numbering,
|
|
check_image_extensions,
|
|
check_comicinfo,
|
|
check_foreign,
|
|
check_nested,
|
|
check_extension_case,
|
|
)
|
|
from processors.page_normalizer import normalize_pages, preview_normalize_pages
|
|
from processors.image_normalizer import (
|
|
normalize_images,
|
|
preview_normalize_images,
|
|
uniformize_images,
|
|
preview_uniformize_images,
|
|
)
|
|
from processors.case_normalizer import normalize_case, preview_normalize_case
|
|
|
|
|
|
class Pipeline:
    """Validate a comic archive, inspect it, and apply the requested fix steps.

    The archive is validated and its member names are checked without
    extracting. It is only extracted (once, into a temporary directory) when
    at least one requested step actually has work to do, and it is only
    repacked when a step reported a change or an accepted ``convert`` step
    found a format/extension mismatch.
    """

    # Warning prefixes this class looks for in the results of the content
    # checks. They are runtime text and must stay byte-identical to whatever
    # the checks produce.
    _TRASH_PREFIX = "Basura detectada: "
    _FOREIGN_PREFIX = "Fichero extraño: "
    _NESTED_PREFIX = "Imágenes en subdirectorio: "

    def __init__(
        self,
        steps: list,
        desired_format: str = "cbz",
        desired_image_format: str = ".jpg",
        collision_policy: str = CollisionPolicy.ABORT,
        dry_run: bool = False,
        case_mode: str = "lower",
    ):
        """Store the pipeline configuration.

        Args:
            steps: Names of the fix steps to apply. ``run`` recognises
                ``clean``, ``flatten``, ``normalize_pages``,
                ``normalize_images``, ``convert_images``, ``normalize_case``
                and ``convert``.
            desired_format: Target archive extension, without the dot.
            desired_image_format: Target image extension passed to the image
                normalizers.
            collision_policy: Policy handed to ``resolve_collision``; the
                ``BACKUP`` policy also moves the replaced original to backup.
            dry_run: When true, the outer rename and the repack/removal of
                the original are skipped. Fix steps still run, but only on
                the temporary extraction.
            case_mode: ``"lower"`` lowercases extensions; any other value
                uppercases the outer extension.
        """
        self.steps = steps
        self.desired_format = desired_format
        self.desired_image_format = desired_image_format
        self.collision_policy = collision_policy
        self.dry_run = dry_run
        self.case_mode = case_mode

    @staticmethod
    def _find_result(step_results: list, name: str):
        """Return the first result whose ``step`` equals *name*, or ``None``."""
        return next((r for r in step_results if r.step == name), None)

    def _has_nested_images(self, step_results: list) -> bool:
        """Tell whether the first ``check_nested`` warning reports nested images."""
        nested = self._find_result(step_results, "check_nested")
        return bool(
            nested
            and nested.warnings
            and nested.warnings[0].startswith(self._NESTED_PREFIX)
        )

    def _format_mismatch(self, real_format: str, path: str) -> bool:
        """Tell whether the real format or the file extension differs from the target."""
        if needs_conversion(real_format, self.desired_format):
            return True
        ext = os.path.splitext(path)[1].lower().lstrip(".")
        return ext != self.desired_format

    def _compute_preview(self, step: str, temp_dir: str, step_results: list) -> dict:
        """Describe what *step* would do, for display and confirmation.

        Args:
            step: Name of the fix step.
            temp_dir: Directory holding the extracted archive.
            step_results: Results gathered so far (the content checks are
                looked up here).

        Returns:
            A dict whose keys depend on the step (``items``,
            ``flatten_files``, ``renames``, ``conversions``, ...), or an
            empty dict for an unknown step.
        """
        if step == "clean":
            trash = self._find_result(step_results, "check_trash")
            foreign = self._find_result(step_results, "check_foreign")
            items = [
                w.removeprefix(self._TRASH_PREFIX)
                for w in (trash.warnings if trash else [])
                if w.startswith(self._TRASH_PREFIX)
            ]
            items += [
                w.removeprefix(self._FOREIGN_PREFIX)
                for w in (foreign.warnings if foreign else [])
            ]
            # A file reported by both checks must be listed only once;
            # dict.fromkeys keeps the first occurrence and the original order.
            return {"items": list(dict.fromkeys(items))}

        if step == "flatten":
            flatten_files: list[tuple[str, str]] = []
            if self._has_nested_images(step_results):
                for entry in sorted(os.listdir(temp_dir)):
                    subpath = os.path.join(temp_dir, entry)
                    if not os.path.isdir(subpath):
                        continue
                    for root, _dirs, files in os.walk(subpath):
                        for f in sorted(files):
                            src_rel = os.path.relpath(os.path.join(root, f), temp_dir)
                            # (path relative to the extraction root, bare file name)
                            flatten_files.append((src_rel, f))
            return {"flatten_files": flatten_files}

        if step == "normalize_pages":
            return {"renames": preview_normalize_pages(temp_dir)}

        if step == "normalize_images":
            conversions = preview_uniformize_images(temp_dir, self.desired_image_format)
            return {"conversions": conversions, "target_ext": self.desired_image_format}

        if step == "convert_images":
            conversions = preview_normalize_images(temp_dir, self.desired_image_format)
            return {"conversions": conversions, "target_ext": self.desired_image_format}

        if step == "normalize_case":
            renames = preview_normalize_case(temp_dir, self.case_mode)
            return {"renames": renames, "mode": self.case_mode}

        if step == "convert":
            return {"target_format": self.desired_format.upper()}

        return {}

    def _needs_extraction(self, step_results: list, real_format: str, path: str) -> bool:
        """Decide from the pre-extraction checks whether any step has work to do.

        Args:
            step_results: Results of the validation and content checks.
            real_format: Format reported by ``detect_real_format``.
            path: Current path of the archive.

        Returns:
            True as soon as one requested step requires the archive to be
            extracted; False when the archive can be left untouched.
        """
        for step in self.steps:
            # These steps can only be evaluated against extracted files.
            if step in ("normalize_pages", "normalize_images", "convert_images"):
                return True
            if step == "normalize_case":
                case_r = self._find_result(step_results, "check_extension_case")
                if case_r and case_r.warnings:
                    return True
            if step == "convert" and self._format_mismatch(real_format, path):
                return True
            if step == "clean":
                trash = self._find_result(step_results, "check_trash")
                foreign = self._find_result(step_results, "check_foreign")
                if (trash and trash.warnings) or (foreign and foreign.warnings):
                    return True
            if step == "flatten" and self._has_nested_images(step_results):
                return True
        return False

    def _run_fix_step(self, step, preview_key, apply_fn, temp_dir, step_results, confirm_fn):
        """Preview, confirm and apply one fix step on the extracted directory.

        Returns the step's result (already appended to *step_results*), or
        ``None`` when the step was not requested, had nothing to do, or was
        declined by *confirm_fn*.
        """
        if step not in self.steps:
            return None
        preview = self._compute_preview(step, temp_dir, step_results)
        if not preview.get(preview_key):
            return None
        if confirm_fn is not None and not confirm_fn(step, preview):
            return None
        result = apply_fn()
        step_results.append(result)
        return result

    def run(self, path: str, confirm_fn=None) -> ComicResult:
        """Process one archive through the configured steps.

        Args:
            path: Path of the archive to process.
            confirm_fn: Optional callable ``confirm_fn(step, preview)``. When
                given, a step is applied only if it returns a truthy value;
                when ``None`` every step is applied.

        Returns:
            A ``ComicResult`` carrying every step result. ``final_path`` is
            ``None`` when a step failed, equals the (possibly renamed) input
            path when nothing had to be rewritten, and is the repacked
            archive path otherwise (in a dry run, the path it would get).
        """
        step_results = []

        # 1. Always validate before extracting anything.
        val = validate_archive(path)
        step_results.append(val)
        if val.errors:
            return ComicResult(original_path=path, final_path=None, steps=step_results)

        real_format = detect_real_format(path)

        # 2. Read the member list without extracting.
        try:
            names = list_archive_names(path)
        except Exception as exc:  # any listing failure is reported as a step error
            step_results.append(StepResult(step="list", changed=False, errors=[str(exc)]))
            return ComicResult(original_path=path, final_path=None, steps=step_results)

        # 3. Always run the content checks on the names (no extraction needed).
        step_results += [
            check_trash(names),
            check_page_numbering(names),
            check_image_extensions(names),
            check_comicinfo(names),
            check_foreign(names),
            check_nested(names),
            check_extension_case(names, self.case_mode),
        ]

        # 3b. Fix the case of the outer extension (in-place rename, no repack).
        if "normalize_case" in self.steps:
            outer_ext = os.path.splitext(path)[1]
            if self.case_mode == "lower":
                target_outer_ext = outer_ext.lower()
            else:
                target_outer_ext = outer_ext.upper()
            if outer_ext != target_outer_ext:
                new_outer_path = os.path.splitext(path)[0] + target_outer_ext
                try:
                    safe_outer = resolve_collision(new_outer_path, self.collision_policy)
                    if not self.dry_run:
                        os.rename(path, safe_outer)
                        # Adopt the new path only when the rename really
                        # happened, so a dry run keeps reading the file that
                        # exists on disk.
                        path = safe_outer
                    step_results.append(StepResult(
                        step="normalize_case_outer",
                        changed=True,
                        details=[f"Extensión exterior corregida: {outer_ext} → {target_outer_ext}"],
                    ))
                except (FileExistsError, OSError) as exc:
                    step_results.append(StepResult(
                        step="normalize_case_outer",
                        changed=False,
                        errors=[str(exc)],
                    ))
                    return ComicResult(original_path=path, final_path=None, steps=step_results)

        # 4. Pre-flight: if no step needs extraction, leave the archive untouched.
        if not self._needs_extraction(step_results, real_format, path):
            return ComicResult(original_path=path, final_path=path, steps=step_results)

        # 5. Extract exactly once.
        temp_dir = tempfile.mkdtemp()
        try:
            extract_archive(path, temp_dir)

            # 6. Apply each fix step on the temporary directory, in this order.
            # Each row: (step name, preview key that signals pending work,
            # function applying the fix, whether step errors abort the run).
            fixes = (
                ("clean", "items",
                 lambda: clean_directory(temp_dir), False),
                ("flatten", "flatten_files",
                 lambda: flatten_directory(temp_dir), False),
                ("normalize_pages", "renames",
                 lambda: normalize_pages(temp_dir), False),
                ("normalize_images", "conversions",
                 lambda: uniformize_images(temp_dir, self.desired_image_format), True),
                ("convert_images", "conversions",
                 lambda: normalize_images(temp_dir, self.desired_image_format), True),
                ("normalize_case", "renames",
                 lambda: normalize_case(temp_dir, self.case_mode), False),
            )
            any_changed = False
            for step, preview_key, apply_fn, abort_on_error in fixes:
                result = self._run_fix_step(
                    step, preview_key, apply_fn, temp_dir, step_results, confirm_fn
                )
                if result is None:
                    continue
                if abort_on_error and result.errors:
                    return ComicResult(original_path=path, final_path=None, steps=step_results)
                if result.changed:
                    any_changed = True

            convert_accepted = False
            if "convert" in self.steps:
                preview = self._compute_preview("convert", temp_dir, step_results)
                if confirm_fn is None or confirm_fn("convert", preview):
                    convert_accepted = True
                    conv_result = conversion_step_result(real_format, self.desired_format)
                    # Wrong extension even though the real format is already correct.
                    file_ext = os.path.splitext(path)[1].lower().lstrip(".")
                    if (
                        not conv_result.errors
                        and not conv_result.changed
                        and file_ext != self.desired_format
                    ):
                        conv_result = StepResult(
                            step="convert",
                            changed=True,
                            details=[f"Extensión incorrecta corregida: .{file_ext} → .{self.desired_format}"],
                        )
                    step_results.append(conv_result)
                    if conv_result.errors:
                        return ComicResult(original_path=path, final_path=None, steps=step_results)
                    if conv_result.changed:
                        any_changed = True

            # 7. Repack when something changed or an accepted conversion is
            # pending. A declined "convert" must not force a repack on its own.
            needs_repack = any_changed or (
                convert_accepted and self._format_mismatch(real_format, path)
            )
            if not needs_repack:
                return ComicResult(original_path=path, final_path=path, steps=step_results)

            base, _ = os.path.splitext(path)
            target_path = f"{base}.{self.desired_format}"

            if not self.dry_run:
                safe_target = resolve_collision(target_path, self.collision_policy)
                repack_as_cbz(temp_dir, safe_target)
                # Remove the original, or move it to backup, if the name changed.
                if safe_target != path and os.path.exists(path):
                    if self.collision_policy == CollisionPolicy.BACKUP:
                        move_to_backup(path)
                    else:
                        os.remove(path)
            else:
                safe_target = target_path

        except (ArchiveError, rarfile.BadRarFile, rarfile.Error, FileExistsError, OSError) as exc:
            step_results.append(
                StepResult(step="repack", changed=False, errors=[str(exc)])
            )
            return ComicResult(original_path=path, final_path=None, steps=step_results)
        finally:
            # Runs on every exit path above, including the early returns.
            shutil.rmtree(temp_dir, ignore_errors=True)

        return ComicResult(original_path=path, final_path=safe_target, steps=step_results)
|