This commit is contained in:
2026-02-19 08:36:06 +01:00
parent e5bf95e4fa
commit 4e86771964
11 changed files with 344 additions and 203 deletions
+1
View File
@@ -1,4 +1,5 @@
backup/
CLAUDE.md
# ---> Python
# Byte-compiled / optimized / DLL files
+57
View File
@@ -0,0 +1,57 @@
# core/archive.py
import os
import zipfile
import rarfile
import shutil
class ArchiveError(Exception):
pass
def detect_real_format(path: str) -> str | None:
"""Devuelve 'zip', 'rar' o None según los magic bytes del archivo."""
try:
zipfile.ZipFile(path).close()
return "zip"
except Exception:
pass
try:
rarfile.RarFile(path).close()
return "rar"
except Exception:
pass
return None
def open_archive(path: str):
"""Devuelve un ZipFile o RarFile abierto en modo lectura."""
fmt = detect_real_format(path)
if fmt == "zip":
return zipfile.ZipFile(path, "r")
if fmt == "rar":
return rarfile.RarFile(path, "r")
raise ArchiveError(f"Formato desconocido o archivo corrupto: {path}")
def extract_archive(path: str, dest_dir: str) -> str:
"""Extrae el archivo en dest_dir. Devuelve dest_dir."""
archive = open_archive(path)
try:
archive.extractall(dest_dir)
finally:
archive.close()
return dest_dir
def repack_as_cbz(source_dir: str, target_path: str) -> None:
"""Empaqueta todos los archivos de source_dir en un CBZ (ZIP deflated)."""
with zipfile.ZipFile(target_path, "w", zipfile.ZIP_DEFLATED) as zf:
for root, _, files in os.walk(source_dir):
for f in files:
full = os.path.join(root, f)
rel = os.path.relpath(full, source_dir)
zf.write(full, rel)
+4 -2
View File
@@ -4,12 +4,14 @@ import os
import shutil
from core.paths import get_project_root
def move_to_backup(path):
"""Mueve un archivo al directorio centralizado /backup."""
"""Mueve un archivo al directorio centralizado /backup sin sobrescribir."""
from core.collision import safe_backup_name
root = get_project_root()
backup_dir = os.path.join(root, "backup")
os.makedirs(backup_dir, exist_ok=True)
target = os.path.join(backup_dir, os.path.basename(path))
target = safe_backup_name(path)
shutil.move(path, target)
return target
+60
View File
@@ -0,0 +1,60 @@
# core/collision.py
import os
from core.paths import get_project_root
class CollisionPolicy:
ABORT = "abort" # comportamiento estricto actual
BACKUP = "backup" # mueve el existente a backup/ y sigue
RENAME = "rename" # añade sufijo _1, _2... hasta nombre libre
def safe_backup_name(path: str) -> str:
"""
Devuelve una ruta sin colisión dentro de backup/.
Añade sufijo _1, _2... si el nombre base ya existe.
"""
root = get_project_root()
backup_dir = os.path.join(root, "backup")
base = os.path.basename(path)
name, ext = os.path.splitext(base)
candidate = os.path.join(backup_dir, base)
counter = 1
while os.path.exists(candidate):
candidate = os.path.join(backup_dir, f"{name}_{counter}{ext}")
counter += 1
return candidate
def resolve_collision(target_path: str, policy: str = CollisionPolicy.ABORT) -> str:
"""
Comprueba si target_path existe y aplica la política:
ABORT → lanza FileExistsError
BACKUP → mueve el existente a backup/ y devuelve target_path
RENAME → devuelve una ruta alternativa libre (target_1.cbz, ...)
Devuelve la ruta segura donde escribir.
"""
if not os.path.exists(target_path):
return target_path
if policy == CollisionPolicy.ABORT:
raise FileExistsError(
f"El archivo destino ya existe y no se sobrescribirá: {target_path}"
)
if policy == CollisionPolicy.BACKUP:
from core.backup import move_to_backup
move_to_backup(target_path)
return target_path
if policy == CollisionPolicy.RENAME:
base, ext = os.path.splitext(target_path)
counter = 1
candidate = f"{base}_{counter}{ext}"
while os.path.exists(candidate):
counter += 1
candidate = f"{base}_{counter}{ext}"
return candidate
raise ValueError(f"Política de colisión desconocida: {policy}")
+94
View File
@@ -0,0 +1,94 @@
# core/pipeline.py
import os
import tempfile
import shutil
from core.archive import detect_real_format, extract_archive, repack_as_cbz, ArchiveError
from core.collision import CollisionPolicy, resolve_collision
from core.result import ComicResult, StepResult
from processors.validator import validate_archive
from processors.cleaner import clean_directory
from processors.converter import needs_conversion, conversion_step_result
class Pipeline:
def __init__(
self,
steps: list,
desired_format: str = "cbz",
collision_policy: str = CollisionPolicy.ABORT,
dry_run: bool = False,
):
self.steps = steps
self.desired_format = desired_format
self.collision_policy = collision_policy
self.dry_run = dry_run
def run(self, path: str) -> ComicResult:
step_results = []
# 1. Validar siempre, antes de extraer
val = validate_archive(path)
step_results.append(val)
if val.errors:
return ComicResult(original_path=path, final_path=None, steps=step_results)
real_format = detect_real_format(path)
# 2. Extraer una sola vez
temp_dir = tempfile.mkdtemp()
try:
extract_archive(path, temp_dir)
# 3. Aplicar cada step sobre el directorio temporal
any_changed = False
if "clean" in self.steps:
clean_result = clean_directory(temp_dir)
step_results.append(clean_result)
if clean_result.changed:
any_changed = True
if "convert" in self.steps:
conv_result = conversion_step_result(real_format, self.desired_format)
step_results.append(conv_result)
if conv_result.errors:
return ComicResult(
original_path=path, final_path=None, steps=step_results
)
if conv_result.changed:
any_changed = True
# 4. Reempaquetar si hubo cambios o conversión de formato
needs_repack = any_changed or (
"convert" in self.steps
and needs_conversion(real_format, self.desired_format)
)
if not needs_repack:
return ComicResult(
original_path=path, final_path=path, steps=step_results
)
base, _ = os.path.splitext(path)
target_path = f"{base}.{self.desired_format}"
if not self.dry_run:
safe_target = resolve_collision(target_path, self.collision_policy)
repack_as_cbz(temp_dir, safe_target)
# Eliminar original si el nombre cambió
if safe_target != path and os.path.exists(path):
os.remove(path)
else:
safe_target = target_path
except (ArchiveError, FileExistsError, OSError) as exc:
step_results.append(
StepResult(step="repack", changed=False, errors=[str(exc)])
)
return ComicResult(original_path=path, final_path=None, steps=step_results)
finally:
shutil.rmtree(temp_dir, ignore_errors=True)
return ComicResult(original_path=path, final_path=safe_target, steps=step_results)
+47
View File
@@ -0,0 +1,47 @@
# core/result.py
from dataclasses import dataclass, field
@dataclass
class StepResult:
step: str # "validate" | "clean" | "convert" | ...
changed: bool
details: list = field(default_factory=list) # e.g. ["Eliminado: thumbs.db"]
warnings: list = field(default_factory=list)
errors: list = field(default_factory=list)
@dataclass
class ComicResult:
original_path: str
final_path: str | None # None si se abortó
steps: list = field(default_factory=list)
def ok(self) -> bool:
return all(not s.errors for s in self.steps)
def summary(self) -> str:
if not self.ok():
errors = [e for s in self.steps for e in s.errors]
return f"ERROR [{self.original_path}]: {'; '.join(errors)}"
changed_steps = [s.step for s in self.steps if s.changed]
if changed_steps:
dest = self.final_path or self.original_path
return f"OK [{self.original_path}] → {dest} ({', '.join(changed_steps)})"
return f"OK [{self.original_path}] (sin cambios)"
def full_report(self) -> str:
lines = [f"Cómic: {self.original_path}"]
for s in self.steps:
status = "CAMBIADO" if s.changed else "sin cambios"
lines.append(f" [{s.step}] {status}")
for d in s.details:
lines.append(f" - {d}")
for w in s.warnings:
lines.append(f" AVISO: {w}")
for e in s.errors:
lines.append(f" ERROR: {e}")
if self.final_path:
lines.append(f" Resultado final: {self.final_path}")
return "\n".join(lines)
+12 -24
View File
@@ -2,10 +2,8 @@
import argparse
from core.scanner import find_comic_files
from core.pipeline import Pipeline
from processors.validator import validate_comic
from processors.cleaner import clean_comic
from processors.converter import convert_comic
from processors.standardizer import standardize_comic
def parse_args():
@@ -33,31 +31,21 @@ def main():
if args.validar:
for f in comic_files:
print(f"Validando: {f}")
print(validate_comic(f))
res = validate_comic(f)
print(f"{f}{res.summary()}")
print()
if args.limpiar:
for f in comic_files:
print(f"Limpieza: {f}")
print(clean_comic(f))
print()
steps = []
if args.limpiar or args.estandarizar:
steps.append("clean")
if args.convertir or args.estandarizar:
steps.append("convert")
if args.convertir:
if steps:
pipeline = Pipeline(steps=steps, desired_format=args.formato)
for f in comic_files:
print(f"Convirtiendo: {f}")
info = convert_comic(f, args.formato)
if info["needs_conversion"]:
print(f" → Convertido a {args.formato.upper()}: {info['target_path']}")
else:
print(f" → Sin cambios (ya era {args.formato.upper()})")
print()
if args.estandarizar:
for f in comic_files:
print(f"Estandarizando: {f}")
print(standardize_comic(f, args.formato))
print()
result = pipeline.run(f)
print(result.summary())
if __name__ == "__main__":
+14 -66
View File
@@ -1,78 +1,26 @@
# processors/cleaner.py
import os
import zipfile
import rarfile
import tempfile
import shutil
from core.constants import TRASH_FILES
from processors.validator import validate_comic
from core.result import StepResult
class CleanResult:
def __init__(self, original_path):
self.original_path = original_path
self.cleaned_path = None
self.removed_files = []
self.repacked = False
def __str__(self):
msg = f"Limpieza de: {self.original_path}\n"
msg += f" Archivos eliminados: {len(self.removed_files)}\n"
msg += f" Resultado final: {self.cleaned_path}\n"
return msg
def clean_comic(path):
validation = validate_comic(path)
if validation.errors:
raise Exception(f"Archivo corrupto: {path}")
real = validation.real_format
# Abrir según formato real
archive = zipfile.ZipFile(path, "r") if real == "zip" else rarfile.RarFile(path, "r")
temp_dir = tempfile.mkdtemp()
archive.extractall(temp_dir)
archive.close()
def clean_directory(work_dir: str) -> StepResult:
"""
Elimina TRASH_FILES del directorio ya extraído.
Sin I/O de archivo de cómic; trabaja sobre el directorio temporal.
"""
removed = []
for root, _, files in os.walk(temp_dir):
for root, _, files in os.walk(work_dir):
for f in files:
if f.lower() in TRASH_FILES:
full = os.path.join(root, f)
os.remove(full)
removed.append(os.path.relpath(full, temp_dir))
removed.append(os.path.relpath(full, work_dir))
# Si no se eliminó nada → no reconstruir
if not removed:
shutil.rmtree(temp_dir)
result = CleanResult(path)
result.cleaned_path = path
return result
# Reconstruir como CBZ
base, _ = os.path.splitext(path)
final_path = base + ".cbz"
with zipfile.ZipFile(final_path, "w", zipfile.ZIP_DEFLATED) as new_zip:
for root, _, files in os.walk(temp_dir):
for f in files:
full = os.path.join(root, f)
rel = os.path.relpath(full, temp_dir)
new_zip.write(full, rel)
shutil.rmtree(temp_dir)
# Mover original a backup
from core.backup import move_to_backup
move_to_backup(path)
result = CleanResult(path)
result.cleaned_path = final_path
result.removed_files = removed
result.repacked = True
return result
details = [f"Eliminado: {r}" for r in removed]
return StepResult(
step="clean",
changed=bool(removed),
details=details,
)
+30 -47
View File
@@ -1,54 +1,37 @@
# processors/converter.py
import os
import zipfile
import rarfile
import tempfile
import shutil
from processors.validator import validate_comic
from core.result import StepResult
def convert_comic(path, desired_format="cbz"):
validation = validate_comic(path)
def needs_conversion(real_format: str, desired_format: str) -> bool:
"""True si el formato real no coincide con el deseado."""
if desired_format == "cbz" and real_format == "zip":
return False
if desired_format == "cbr" and real_format == "rar":
return False
return True
if validation.errors:
raise Exception(f"Archivo corrupto: {path}")
real = validation.real_format
def conversion_step_result(real_format: str, desired_format: str) -> StepResult:
"""
Informa si se necesita conversión.
El pipeline decide si reempaquetar; este step solo evalúa.
"""
if desired_format == "cbr":
return StepResult(
step="convert",
changed=False,
errors=["Crear CBR requiere 'rar' instalado (no implementado)"],
)
# Si ya está en el formato deseado → nada que hacer
if desired_format == "cbz" and real == "zip":
return {"needs_conversion": False, "target_path": path}
if desired_format == "cbr" and real == "rar":
return {"needs_conversion": False, "target_path": path}
# Abrir según formato real
archive = zipfile.ZipFile(path, "r") if real == "zip" else rarfile.RarFile(path, "r")
temp_dir = tempfile.mkdtemp()
archive.extractall(temp_dir)
archive.close()
base, _ = os.path.splitext(path)
target_path = f"{base}.{desired_format}"
if desired_format == "cbz":
with zipfile.ZipFile(target_path, "w", zipfile.ZIP_DEFLATED) as new_zip:
for root, _, files in os.walk(temp_dir):
for f in files:
full = os.path.join(root, f)
rel = os.path.relpath(full, temp_dir)
new_zip.write(full, rel)
elif desired_format == "cbr":
raise NotImplementedError("Crear CBR requiere 'rar' instalado")
shutil.rmtree(temp_dir)
# Mover original a backup
from core.backup import move_to_backup
move_to_backup(path)
return {"needs_conversion": True, "target_path": target_path}
convert = needs_conversion(real_format, desired_format)
details = (
[f"Conversión necesaria: {real_format}{desired_format}"]
if convert
else [f"Sin conversión: ya es {desired_format}"]
)
return StepResult(
step="convert",
changed=convert,
details=details,
)
-44
View File
@@ -1,44 +0,0 @@
# processors/standardizer.py
from processors.validator import validate_comic
from processors.cleaner import clean_comic
from processors.converter import convert_comic
class StandardizeResult:
def __init__(self, path):
self.path = path
self.cleaned = None
self.converted = None
self.final_path = None
def __str__(self):
msg = f"Estandarización de: {self.path}\n"
if self.cleaned:
msg += f" Limpieza: OK ({len(self.cleaned.removed_files)} eliminados)\n"
if self.converted:
if self.converted["needs_conversion"]:
msg += f" Conversión: OK → {self.converted['target_path']}\n"
else:
msg += " Conversión: no necesaria\n"
msg += f" Resultado final: {self.final_path}\n"
return msg
def standardize_comic(path, desired_format="cbz"):
validation = validate_comic(path)
if validation.errors:
raise Exception(f"Archivo corrupto: {path}")
result = StandardizeResult(path)
clean_result = clean_comic(path)
result.cleaned = clean_result
convert_result = convert_comic(clean_result.cleaned_path, desired_format)
result.converted = convert_result
result.final_path = convert_result["target_path"] if convert_result["needs_conversion"] else clean_result.cleaned_path
return result
+25 -20
View File
@@ -1,8 +1,8 @@
# processors/validator.py
import os
import zipfile
import rarfile
from core.archive import detect_real_format
from core.result import StepResult
class ValidationResult:
@@ -13,7 +13,16 @@ class ValidationResult:
self.errors = []
self.warnings = []
def summary(self):
"""Resumen compacto para mostrar en consola."""
if self.errors:
return f"ERROR: {', '.join(self.errors)}"
if self.warnings:
return f"AVISO: {', '.join(self.warnings)}"
return "OK"
def __str__(self):
"""Salida detallada (solo si el usuario la pide)."""
msg = f"Validación de: {self.path}\n"
msg += f" Formato real: {self.real_format}\n"
msg += f" Extensión: {self.extension}\n"
@@ -28,24 +37,8 @@ class ValidationResult:
return msg
def detect_real_format(path):
"""Devuelve 'zip', 'rar' o None."""
try:
zipfile.ZipFile(path).close()
return "zip"
except:
pass
try:
rarfile.RarFile(path).close()
return "rar"
except:
pass
return None
def validate_comic(path):
def validate_comic(path) -> ValidationResult:
"""Modo standalone: comprueba formato e integridad. Para --validar en CLI."""
result = ValidationResult(path)
ext = os.path.splitext(path)[1].lower()
result.extension = ext
@@ -64,3 +57,15 @@ def validate_comic(path):
result.warnings.append("Extensión incorrecta: debería ser .cbz")
return result
def validate_archive(path: str) -> StepResult:
"""Para el pipeline: devuelve StepResult con errores/avisos de validación."""
vr = validate_comic(path)
return StepResult(
step="validate",
changed=False,
details=[f"Formato real: {vr.real_format}, extensión: {vr.extension}"],
warnings=list(vr.warnings),
errors=list(vr.errors),
)