76 lines
2.1 KiB
Python
76 lines
2.1 KiB
Python
# core/archive.py
|
|
|
|
import os
|
|
import zipfile
|
|
import rarfile
|
|
import shutil
|
|
|
|
|
|
class ArchiveError(Exception):
    """Error raised by this module when an archive cannot be opened.

    ``open_archive`` raises it when the file is neither a readable ZIP nor a
    readable RAR.
    """
|
|
|
|
|
|
def detect_real_format(path: str) -> str | None:
|
|
"""Devuelve 'zip', 'rar' o None según los magic bytes del archivo."""
|
|
try:
|
|
zipfile.ZipFile(path).close()
|
|
return "zip"
|
|
except Exception:
|
|
pass
|
|
|
|
try:
|
|
rarfile.RarFile(path).close()
|
|
return "rar"
|
|
except Exception:
|
|
pass
|
|
|
|
return None
|
|
|
|
|
|
def open_archive(path: str):
|
|
"""Devuelve un ZipFile o RarFile abierto en modo lectura."""
|
|
fmt = detect_real_format(path)
|
|
if fmt == "zip":
|
|
return zipfile.ZipFile(path, "r")
|
|
if fmt == "rar":
|
|
return rarfile.RarFile(path, "r")
|
|
raise ArchiveError(f"Formato desconocido o archivo corrupto: {path}")
|
|
|
|
|
|
def list_archive_names(path: str) -> list[str]:
    """Return the member names stored in the archive, without extracting it.

    Args:
        path: Location of the archive.

    Returns:
        The result of ``namelist()`` on the opened archive.

    Raises:
        ArchiveError: Propagated from ``open_archive`` when the file cannot
            be opened as zip or rar.
    """
    with open_archive(path) as archive:
        members = archive.namelist()
    return members
|
|
|
|
|
|
def extract_archive(path: str, dest_dir: str) -> str:
    """Extract every member of the archive at ``path`` into ``dest_dir``.

    Args:
        path: Location of the archive.
        dest_dir: Directory that receives the extracted members.

    Returns:
        ``dest_dir``, unchanged, for the caller's convenience.

    Raises:
        ArchiveError: Propagated from ``open_archive`` when the file cannot
            be opened as zip or rar.
    """
    # The context manager closes the archive even if extraction fails.
    with open_archive(path) as archive:
        archive.extractall(dest_dir)
    return dest_dir
|
|
|
|
|
|
def repack_as_cbz(source_dir: str, target_path: str) -> None:
    """Pack every file under ``source_dir`` into a CBZ (deflated ZIP).

    The archive is first written to ``target_path + ".tmp"`` (same directory
    as the target) and then moved into place with ``os.replace()``, so an
    interruption never leaves a partial CBZ at ``target_path`` and never
    destroys a file that already exists there.

    Entries are stored under their path relative to ``source_dir`` and are
    added in sorted order, so the archive layout is reproducible instead of
    depending on filesystem enumeration order. If ``target_path`` lies inside
    ``source_dir``, the in-progress temporary file is skipped so the archive
    is never packed into itself.

    Args:
        source_dir: Directory tree whose files are packed.
        target_path: Final location of the CBZ.

    Raises:
        BaseException: Anything raised while writing or replacing is
            re-raised after a best-effort removal of the temporary file.
    """
    tmp_path = target_path + ".tmp"
    # Normalised once so the temp file can be recognised during the walk.
    tmp_key = os.path.normcase(os.path.abspath(tmp_path))
    try:
        with zipfile.ZipFile(tmp_path, "w", zipfile.ZIP_DEFLATED) as zf:
            for root, dirs, files in os.walk(source_dir):
                # In-place sort fixes the order in which os.walk descends.
                dirs.sort()
                for name in sorted(files):
                    full = os.path.join(root, name)
                    if os.path.normcase(os.path.abspath(full)) == tmp_key:
                        # Target lives inside source_dir: do not pack the
                        # archive currently being written into itself.
                        continue
                    zf.write(full, os.path.relpath(full, source_dir))
        # Only reached once the ZIP has been fully written and closed.
        os.replace(tmp_path, target_path)
    except BaseException:
        # Best-effort cleanup; a failure here (e.g. the temp file was never
        # created) must not mask the exception that brought us here.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
|