# core/summary.py
import datetime
import os

from core.result import ComicResult


# Substrings that mark an error message as coming from an unreadable archive.
_CORRUPT_PATTERNS = ("BadRarFile", "BadZipFile", "corrupto", "Failed to read")
# Horizontal rule drawn around the summary header and above the per-file detail.
_BORDER = "═" * 42

# Warning categories, in display order. Each row holds:
#   label, producing step, predicate over that step's warning strings,
#   steps that resolve the warning when they report a change, and the
#   annotation shown on resolved entries (None when nothing resolves it).
_WARNING_CATEGORIES = (
    (
        "Extensión incorrecta",
        "validate",
        lambda w: "Extensión incorrecta" in w,
        ("convert",),
        "convertido",
    ),
    ("Basura detectada", "check_trash", lambda w: True, ("clean",), "limpiado"),
    ("Ficheros extraños", "check_foreign", lambda w: True, ("clean",), "limpiado"),
    (
        "Estructura anidada",
        "check_nested",
        lambda w: w.startswith("Imágenes en subdirectorio: "),
        ("clean",),
        "aplanado",
    ),
    (
        "Estructura compleja",
        "check_nested",
        lambda w: w.startswith("Múltiples subdirectorios"),
        (),
        None,
    ),
    (
        "Numeración de páginas",
        "check_page_numbering",
        lambda w: True,
        ("normalize_pages",),
        "renumerado",
    ),
    (
        "Imágenes mezcladas",
        "check_image_extensions",
        lambda w: True,
        ("normalize_images", "convert_images"),
        "normalizado",
    ),
    ("Sin ComicInfo.xml", "check_comicinfo", lambda w: True, (), None),
)

# Steps whose warnings are listed item by item: the constant prefix is
# stripped from every warning and the remainders are joined with ", ".
_ITEMIZED_WARNING_PREFIXES = {
    "check_trash": "Basura detectada: ",
    "check_foreign": "Fichero extraño: ",
}


class SummaryCollector:
    """Accumulate per-comic results and render them as a text summary.

    Results are used by duck typing. This class reads ``steps`` (objects
    exposing ``step``, ``changed``, ``errors`` and ``warnings``),
    ``original_path`` and ``final_path``, and calls ``ok()``,
    ``has_issues()`` and ``full_report()`` on each result.
    """

    def __init__(self):
        """Create an empty collector."""
        self._results: "list[ComicResult]" = []

    def add(self, result: "ComicResult") -> None:
        """Record one processed result; insertion order is kept."""
        self._results.append(result)

    @staticmethod
    def _error_messages(result: "ComicResult") -> list:
        """Return every error message of *result*, flattened across steps."""
        return [err for step in result.steps for err in step.errors]

    @staticmethod
    def _has_changes(result: "ComicResult") -> bool:
        """Return True when at least one step of *result* reports a change."""
        return any(step.changed for step in result.steps)

    def _is_corrupt(self, result: "ComicResult") -> bool:
        """Return True when any error message contains a corruption marker."""
        return any(
            pattern in err
            for err in self._error_messages(result)
            for pattern in _CORRUPT_PATTERNS
        )

    def _categorize(self, results):
        """Split *results* into the summary buckets in a single pass.

        Returns:
            ``(modified, warnings_only, no_changes, corrupt, other_errors)``.
            A result is a failure when ``ok()`` is false or it has no
            ``final_path``; failures go to ``corrupt`` or ``other_errors``.
            Successful results go to exactly one of the first three lists.
        """
        modified, warnings_only, no_changes = [], [], []
        corrupt, other_errors = [], []
        for r in results:
            if not r.ok() or r.final_path is None:
                bucket = corrupt if self._is_corrupt(r) else other_errors
            elif self._has_changes(r):
                bucket = modified
            elif r.has_issues():
                bucket = warnings_only
            else:
                bucket = no_changes
            bucket.append(r)
        return modified, warnings_only, no_changes, corrupt, other_errors

    def render(self, _max=10) -> str:
        """Render the summary: counters, failed files and grouped warnings.

        Args:
            _max: Maximum number of entries listed per warning category;
                the remainder is reported as a "... y N más" line. ``None``
                lists everything. Negative values are treated as 0.

        Returns:
            The summary text, or ``""`` when no result has been added.
        """
        results = self._results
        if not results:
            return ""

        # A negative limit would slice from the tail and misreport the
        # number of hidden entries, so clamp it.
        limit = None if _max is None else max(_max, 0)

        (
            modified,
            warnings_only,
            no_changes,
            corrupt,
            other_errors,
        ) = self._categorize(results)
        failed = len(corrupt) + len(other_errors)

        def count_step(*step_names):
            # Modified results in which one of the named steps made a change.
            return sum(
                1
                for r in modified
                if any(s.step in step_names and s.changed for s in r.steps)
            )

        cleaned = count_step("clean")
        pages_normalized = count_step("normalize_pages")
        images_converted = count_step("normalize_images", "convert_images")
        format_converted = count_step("convert")

        lines = [
            _BORDER,
            " RESUMEN DEL PROCESAMIENTO",
            _BORDER,
            f" Total procesados : {len(results):>3}",
            f" Sin cambios : {len(no_changes):>3}",
            f" Modificados : {len(modified):>3}",
        ]
        # The counts are taken over `modified`, so they are all zero (and
        # nothing is printed) when no file was modified.
        if cleaned:
            lines.append(f" · Limpiados : {cleaned:>3}")
        if pages_normalized:
            lines.append(f" · Páginas normalizadas : {pages_normalized:>3}")
        if images_converted:
            lines.append(f" · Imágenes convertidas : {images_converted:>3}")
        if format_converted:
            lines.append(f" · Formato convertido : {format_converted:>3}")
        lines.append(f" Advertencias : {len(warnings_only):>3}")
        lines.append(f" Errores : {failed:>3}")
        if failed:
            lines.append(f" · Corruptos : {len(corrupt):>3}")
            lines.append(f" · Otros errores : {len(other_errors):>3}")
        lines.append(_BORDER)

        if corrupt:
            lines.append("")
            lines.append("Archivos corruptos:")
            lines.extend(f" {r.original_path}" for r in corrupt)

        if other_errors:
            lines.append("")
            lines.append("Otros errores:")
            for r in other_errors:
                detail = "; ".join(self._error_messages(r))
                # A result can fail without any recorded message (for
                # example, no final path); do not leave a dangling dash.
                if detail:
                    lines.append(f" {r.original_path} — {detail}")
                else:
                    lines.append(f" {r.original_path}")

        warn_categories = self._warning_categories(results)
        if warn_categories:
            lines.append("")
            lines.append("Advertencias por categoría:")
            for label, entries in warn_categories:
                lines.append(f"\n {label} ({len(entries)}):")
                # entries[:None] is the whole list.
                for path, msg, annotation in entries[:limit]:
                    suffix = f" ({annotation})" if annotation else ""
                    if msg:
                        lines.append(f" {path} — {msg}{suffix}")
                    else:
                        lines.append(f" {path}{suffix}")
                if limit is not None and len(entries) > limit:
                    lines.append(f" ... y {len(entries) - limit} más")

        return "\n".join(lines)

    def full_log(self) -> str:
        """Return the untruncated summary followed by per-file reports.

        The detail section contains ``full_report()`` of every result whose
        ``has_issues()`` is true, each followed by a blank line. It is
        omitted when there is no such result.
        """
        parts = [self.render(_max=None)]
        issues = [r for r in self._results if r.has_issues()]
        if issues:
            parts.extend(["", "", "Detalle por fichero:", _BORDER])
            for r in issues:
                parts.append(r.full_report())
                parts.append("")
        return "\n".join(parts)

    def _warning_categories(self, results):
        """Group the warnings of *results* by category.

        Returns:
            A list of ``(label, entries)`` in category order, skipping empty
            categories. Each entry is ``(original_path, message,
            annotation)``; ``annotation`` is the category's fix label when
            one of its resolver steps reports a change, otherwise ``""``.
        """
        output = []
        for (
            label,
            step_name,
            predicate,
            resolver_steps,
            fix_label,
        ) in _WARNING_CATEGORIES:
            entries = []
            for r in results:
                msgs = [
                    w
                    for s in r.steps
                    if s.step == step_name
                    for w in s.warnings
                    if predicate(w)
                ]
                if not msgs:
                    continue

                # With no resolver steps this is always False.
                resolved = any(
                    s.step in resolver_steps and s.changed for s in r.steps
                )
                annotation = fix_label if resolved else ""

                if step_name == "check_comicinfo":
                    # The category label already says everything.
                    text = ""
                elif step_name in _ITEMIZED_WARNING_PREFIXES:
                    prefix = _ITEMIZED_WARNING_PREFIXES[step_name]
                    text = ", ".join(w.removeprefix(prefix) for w in msgs)
                else:
                    # Only the first matching warning is shown.
                    text = msgs[0]
                entries.append((r.original_path, text, annotation))
            if entries:
                output.append((label, entries))
        return output