Files
comic-manager/core/pipeline.py
T

129 lines
4.5 KiB
Python

# core/pipeline.py
import os
import tempfile
import shutil
from core.archive import detect_real_format, extract_archive, repack_as_cbz, ArchiveError
from core.collision import CollisionPolicy, resolve_collision
from core.result import ComicResult, StepResult
from processors.validator import validate_archive
from processors.cleaner import clean_directory
from processors.converter import needs_conversion, conversion_step_result
from processors.checks import (
check_trash,
check_page_numbering,
check_image_extensions,
check_comicinfo,
)
from processors.page_normalizer import normalize_pages
from processors.image_normalizer import normalize_images
class Pipeline:
def __init__(
self,
steps: list,
desired_format: str = "cbz",
desired_image_format: str = ".jpg",
collision_policy: str = CollisionPolicy.ABORT,
dry_run: bool = False,
):
self.steps = steps
self.desired_format = desired_format
self.desired_image_format = desired_image_format
self.collision_policy = collision_policy
self.dry_run = dry_run
def run(self, path: str) -> ComicResult:
step_results = []
# 1. Validar siempre, antes de extraer
val = validate_archive(path)
step_results.append(val)
if val.errors:
return ComicResult(original_path=path, final_path=None, steps=step_results)
real_format = detect_real_format(path)
# 2. Extraer una sola vez
temp_dir = tempfile.mkdtemp()
try:
extract_archive(path, temp_dir)
# 3. Ejecutar siempre los 4 content checks
step_results += [
check_trash(temp_dir),
check_page_numbering(temp_dir),
check_image_extensions(temp_dir),
check_comicinfo(temp_dir),
]
# 4. Aplicar cada fix step sobre el directorio temporal
any_changed = False
if "clean" in self.steps:
clean_result = clean_directory(temp_dir)
step_results.append(clean_result)
if clean_result.changed:
any_changed = True
if "normalize_pages" in self.steps:
norm_result = normalize_pages(temp_dir)
step_results.append(norm_result)
if norm_result.changed:
any_changed = True
if "normalize_images" in self.steps:
img_result = normalize_images(temp_dir, self.desired_image_format)
step_results.append(img_result)
if img_result.errors:
return ComicResult(
original_path=path, final_path=None, steps=step_results
)
if img_result.changed:
any_changed = True
if "convert" in self.steps:
conv_result = conversion_step_result(real_format, self.desired_format)
step_results.append(conv_result)
if conv_result.errors:
return ComicResult(
original_path=path, final_path=None, steps=step_results
)
if conv_result.changed:
any_changed = True
# 5. Reempaquetar si hubo cambios o conversión de formato
needs_repack = any_changed or (
"convert" in self.steps
and needs_conversion(real_format, self.desired_format)
)
if not needs_repack:
return ComicResult(
original_path=path, final_path=path, steps=step_results
)
base, _ = os.path.splitext(path)
target_path = f"{base}.{self.desired_format}"
if not self.dry_run:
safe_target = resolve_collision(target_path, self.collision_policy)
repack_as_cbz(temp_dir, safe_target)
# Eliminar original si el nombre cambió
if safe_target != path and os.path.exists(path):
os.remove(path)
else:
safe_target = target_path
except (ArchiveError, FileExistsError, OSError) as exc:
step_results.append(
StepResult(step="repack", changed=False, errors=[str(exc)])
)
return ComicResult(original_path=path, final_path=None, steps=step_results)
finally:
shutil.rmtree(temp_dir, ignore_errors=True)
return ComicResult(original_path=path, final_path=safe_target, steps=step_results)