diff --git a/.gitignore b/.gitignore index 5a9dd01..b0f2192 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,2 @@ -zxdbenv -__pycache__ \ No newline at end of file +__pycache__ +.venv \ No newline at end of file diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..da24f23 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,80 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Overview + +This is a Python script that downloads ZX Spectrum game files from the [ZXDB](https://github.com/zxdb/ZXDB) open database. It queries a local MySQL instance containing ZXDB data, then downloads and organizes game files (tapes, disks, snapshots, pokes) into a configurable folder structure with caching support. + +## Setup + +**Virtual environment:** +```bash +python3 -m venv .venv +source .venv/bin/activate +pip install -r requirements.txt +``` + +**MySQL database:** Runs in a Docker container. Connect with: +```bash +mysql -h 172.18.0.2 -P 3306 -u root -p +# Password: unJEPimbJddHP8 +# Then: use zxdb +# To import: source /path/to/ZXDB.sql +``` + +**Configuration:** Edit `config.py` directly (tracked in git; credentials are intentionally non-sensitive). Key settings: +- `DB_*` — MySQL connection params +- `DESTINATION_PATH` / `CACHE_PATH` / `TEMP_FILE` — local paths +- `SHOULD_CLEAR_DESTINATION_PATH` — wipes destination before each run +- `SHOULD_SPLIT_MODERN_AND_CLASSIC` / `SHOULD_SORT_BY_YEAR` / `SHOULD_SORT_BY_LETTER` / `SHOULD_SORT_BY_DEVELOPER` — folder organization modes +- `WAIT` / `MIN_WAIT` / `MAX_WAIT` — rate limiting between downloads +- `LAST_CLASSIC_YEAR` — year cutoff for classic/modern split +- `ZXDB_SQL_PATH` — where to place the extracted `ZXDB_mysql.sql` (used by `zxdb/setup/database_import.py`) +- `ZXDB_STATE_FILE` — JSON file tracking the last successful import (github_commit_date + last_import) + +## Running + +```bash +python main.py +``` + +## Architecture + +The code is organized as a Python package (`zxdb/`) orchestrated by `main.py`. There is no global state: data flows as a list of dictionaries between functions. + +**Entry point (`main.py`):** +```python +elements = database.connect(query_index=3) +elements = organizer.process_elements(elements) +filesystem.print_elements(elements, mode=1) +filesystem.clear_destination_folder() +downloader.get_files(elements) +filesystem.remove_empty_directories(config.DESTINATION_PATH) +``` + +**Module structure:** +- `zxdb/database.py` — `load_queries()`, `select()`, `connect()`: connects to MySQL and returns elements list +- `zxdb/organizer.py` — `normalize_path()`, `update_url()`, `url_filename()`, `process_elements()`, `get_final_destination_folder()` +- `zxdb/downloader.py` — `download_file()`, `unzip_file()`, `process_cache_file()`, `get_files(elements)` +- `zxdb/filesystem.py` — `clear_destination_folder()`, `remove_empty_directories()`, `print_elements(elements, mode)`, `print_status()` +- `zxdb/queries.sql` — Four SQL queries (index 0–3), loaded via `Path(__file__).parent / "queries.sql"` +- `zxdb/setup/database_import.py` — `download_zxdb(destination)`: streaming download of `ZXDB_mysql.sql.zip` from GitHub + ZIP extraction; runnable via `python -m zxdb.setup.database_import` +- `zxdb/setup/docker_manager.py` — `ensure_container()`, `wait_for_mysql(container, timeout)`, `db_exists(container)`, `import_sql(container, sql_path)`: Docker container management and SQL import +- `zxdb/setup/__main__.py` — Full setup orchestrator: Docker → MySQL ready → GitHub change detection → download → import → save state. Run via `python -m zxdb.setup` + +**SQL queries (`zxdb/queries.sql`):** Four queries (index 0–3). Query 3 (current default) filters for ZX-Spectrum machines only (`m.text like 'ZX-%'`) and returns only tape/disk/snapshot/poke file types. + +**Folder organization (`get_final_destination_folder()`):** Builds a path from up to four nested levels based on the sort flags in `config.py`: +- `folder1`: `classics`/`modern`, `by_developer`, `by_year`, or empty +- `folder2`: developer letter + developer name (e.g., `D/Dinamic`) or empty +- `folder3`: release year or `unknown` +- `folder4`: first letter of game title (e.g., `A`, `0-9`) or empty + +**URL resolution (`update_url()`):** URLs from the DB are relative paths. They get prefixed based on their start: `/zxdb` → spectrumcomputing.co.uk, `/pub` → WOS mirror, `/nvg` → NVG mirror. + +**Cache:** Files are stored in `CACHE_PATH///`. The cache key is the original `game_folder_name` (title + year + developer), independent of the current sort configuration, so changing sort options reuses cached files. + +**File types stored at game root** (no subfolder): Tape image, Disk image, Snapshot image, POK pokes file. All other file types go into a named subfolder. + +**ZIP extraction (`unzip_file()`):** Only extracts files matching known emulator formats: `.tap`, `.tzx`, `.stl`, `.sna`, `.z80`, `.dsk`, `.fdi`, `.trd`, `.img`, `.mgt`, `.szx`, `.dck`. diff --git a/README.MD b/README.MD index 8d9ba61..b203c67 100644 --- a/README.MD +++ b/README.MD @@ -1,52 +1,109 @@ -## Anotacions -Per a connectar-se a la base de dades local (que està al container de MySQL) +# ZXDB Downloader - mysql -h 172.18.0.2 -P 3306 -u root -p +Script que descarga juegos de ZX Spectrum a partir de la base de datos [ZXDB](https://github.com/zxdb/ZXDB) (MySQL en Docker). -Password +## Requisitos - unJEPimbJddHP8 +- Python 3.x +- Docker -Si la base de dades ja existeix, per seleccionar-la +## Setup - use zxdb +### Entorno virtual -Per descarregar una nova versió, baixar-la desde -[GitHub - zxdb/ZXDB: Open database with historical information about Sinclair machines](https://github.com/zxdb/ZXDB) -Per executar el fitxer .sql, amb la base de dades zxdb activa: +```bash +python3 -m venv .venv +source .venv/bin/activate +pip install -r requirements.txt +``` - source /ruta/fichero.sql +### Configuración -Per instalar els requisits del script de python +Edita `config.py` en la raíz del proyecto. Variables principales: - pip install -r requirements.txt +| Variable | Descripción | +|----------|-------------| +| `DB_*` | Parámetros de conexión a MySQL | +| `DESTINATION_PATH` | Carpeta donde se depositan los juegos organizados | +| `CACHE_PATH` | Carpeta de caché (evita re-descargar ficheros) | +| `TEMP_FILE` | Fichero temporal durante la descarga | +| `SHOULD_CLEAR_DESTINATION_PATH` | Si es `True`, limpia el destino antes de cada ejecución | +| `SHOULD_SPLIT_MODERN_AND_CLASSIC` | Divide los juegos en carpetas `classics`/`modern` | +| `SHOULD_SORT_BY_YEAR` | Organiza por año de lanzamiento | +| `SHOULD_SORT_BY_LETTER` | Organiza por primera letra del título | +| `SHOULD_SORT_BY_DEVELOPER` | Organiza por desarrollador | +| `WAIT` / `MIN_WAIT` / `MAX_WAIT` | Control de velocidad de descarga | +| `LAST_CLASSIC_YEAR` | Año de corte entre clásicos y modernos | +| `ZXDB_SQL_PATH` | Ruta temporal donde se extrae `ZXDB_mysql.sql` | +| `ZXDB_STATE_FILE` | Fichero JSON con el estado del último import | -Per crear un entorn virtual "zxdbenv" +## Ejecución - python3 -m venv zxdbenv +```bash +source .venv/bin/activate +python main.py +``` -Per activar el entorn "zxdbenv" +`main.py` hace el setup automáticamente al arrancar: +1. Asegura que el contenedor MySQL Docker existe y está corriendo (lo crea si no existe). +2. Comprueba en GitHub si hay una versión más nueva de ZXDB. +3. Si es necesario: descarga `ZXDB_mysql.sql.zip` e importa en MySQL. +4. Descarga y organiza los juegos. +5. Al terminar (o con Ctrl+C), detiene el contenedor Docker. - source zxdbenv/bin/activate +El estado del último import se guarda en `/tmp/zxdb_state.json`. Una segunda ejecución sin cambios en GitHub omite el import. -Exemple de fitxer .env +Para conectar manualmente a MySQL mientras el contenedor está en marcha: - DB_USER=root - DB_PASSWORD=unJEPimbJddHP8 - DB_HOST=172.18.0.2 - DB_PORT=3306 - DB_NAME=zxdb - - DESTINATION_PATH=/home/sergio/zx/zxdb/games/ - CACHE_PATH=/home/sergio/zx/zxdb/cache/games/ - TEMP_FILE=/tmp/zxdb.download.tmp - - SHOULD_CLEAR_DESTINATION_PATH=True - SHOULD_SPLIT_MODERN_AND_CLASSIC=True - SHOULD_SORT_BY_YEAR=True - SHOULD_SORT_BY_LETTER=True - - WAIT=True - MIN_WAIT=2 - MAX_WAIT=4 - LAST_CLASSIC_YEAR=1993 +```bash +mysql -h 127.0.0.1 -P 3306 -u root -p +# Contraseña: unJEPimbJddHP8 +# Luego: use zxdb +``` + +## Estructura del proyecto + +``` +zxdb/ +├── main.py # Punto de entrada: orquesta el flujo principal +├── config.py # Configuración (credenciales y rutas locales, no en git) +├── requirements.txt # Dependencias Python +│ +└── zxdb/ # Paquete con la lógica del programa + ├── __init__.py + ├── database.py # Conexión a MySQL y ejecución de consultas + ├── queries.sql # Consultas SQL (índices 0-3) + ├── organizer.py # Construcción de rutas y enriquecimiento de elementos + ├── downloader.py # Descargas, caché y descompresión de ficheros + ├── filesystem.py # Utilidades de sistema de ficheros y salida por pantalla + └── setup/ + ├── __init__.py + ├── __main__.py # Orquestador: Docker + detección de cambios + import + ├── database_import.py # Descarga y extracción de ZXDB_mysql.sql.zip + └── docker_manager.py # Gestión del contenedor MySQL e import SQL +``` + +## Módulos + +### `zxdb/database.py` +Gestiona la conexión a MySQL. `connect(query_index)` devuelve la lista de elementos obtenidos de la base de datos. + +### `zxdb/organizer.py` +Construye rutas y enriquece cada elemento con metadatos: `game_folder_name`, URL completa, nombre de fichero, subcarpeta y flag de ZIP. `get_final_destination_folder()` aplica la lógica de organización según la configuración. + +### `zxdb/downloader.py` +Gestiona las descargas desde internet y el uso de la caché. `get_files(elements)` itera sobre todos los elementos, sirve desde caché si es posible, o descarga y almacena en caché. + +### `zxdb/filesystem.py` +Utilidades de sistema de ficheros: limpiar carpeta de destino, eliminar directorios vacíos, e imprimir el progreso por pantalla. + +### `zxdb/queries.sql` +Cuatro consultas SQL (índices 0-3). La consulta 3 (usada por defecto) filtra juegos de ZX Spectrum (`m.text like 'ZX-%'`) y devuelve solo ficheros de cinta, disco, snapshot y pokes. + +### `zxdb/setup/` — Setup de la base de datos + +Invocado automáticamente por `main.py`. También puede ejecutarse de forma independiente con `python -m zxdb.setup`. + +- **`__main__.py`**: Orquestador del flujo completo (Docker + detección de cambios + import). +- **`docker_manager.py`**: `ensure_container()`, `wait_for_mysql()`, `db_exists()`, `import_sql()`, `stop_container()` — gestión del contenedor Docker y del import SQL. +- **`database_import.py`**: `download_zxdb(destination)` — descarga `ZXDB_mysql.sql.zip` desde GitHub (streaming, ~150 MB) y extrae `ZXDB_mysql.sql`. Ejecutable directamente con `python -m zxdb.setup.database_import`. diff --git a/config.py b/config.py index 42d87a6..dba3d09 100644 --- a/config.py +++ b/config.py @@ -1,7 +1,6 @@ # Configuración de base de datos DB_USER = 'root' DB_PASSWORD = 'unJEPimbJddHP8' -DB_HOST = '172.18.0.2' DB_PORT = 3306 DB_NAME = 'zxdb' @@ -24,3 +23,9 @@ MAX_WAIT = 4 # Cantidad de segundos máxima a esperar entre descargas # Año usado para la separación entre juegos clásicos y modernos LAST_CLASSIC_YEAR = 1993 + +# Ruta donde se deposita ZXDB_mysql.sql tras la descarga desde GitHub +ZXDB_SQL_PATH = '/tmp/ZXDB_mysql.sql' + +# Fichero de estado del setup de ZXDB (registro del último import) +ZXDB_STATE_FILE = '/tmp/zxdb_state.json' diff --git a/main.py b/main.py new file mode 100644 index 0000000..ff5ef58 --- /dev/null +++ b/main.py @@ -0,0 +1,28 @@ +import logging +import sys + +import config +from zxdb import database, organizer, filesystem, downloader +from zxdb.setup.__main__ import main as run_setup +from zxdb.setup.docker_manager import stop_container, CONTAINER_NAME + +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') + + +def main(): + try: + if run_setup() != 0: + sys.exit(1) + + elements = database.connect(query_index=3) + elements = organizer.process_elements(elements) + filesystem.print_elements(elements, mode=1) + filesystem.clear_destination_folder() + downloader.get_files(elements) + filesystem.remove_empty_directories(config.DESTINATION_PATH) + finally: + stop_container(CONTAINER_NAME) + + +if __name__ == "__main__": + main() diff --git a/requirements.txt b/requirements.txt index 4763400..6eb86cb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,3 @@ -mysql-connector-python==9.1.0 -requests==2.32.3 -logging==0.4.9.6 -unidecode==1.3.8 \ No newline at end of file +mysql-connector-python==9.6.0 +requests==2.32.5 +unidecode==1.4.0 diff --git a/zxdb.py b/zxdb.py deleted file mode 100644 index 39bfe86..0000000 --- a/zxdb.py +++ /dev/null @@ -1,646 +0,0 @@ -## Script para descargar ficheros de spectrum a partir de zxdb - -## Imports utilizados en el script -import config -import logging -import mysql.connector -import os -import random -import requests -import shutil -import sqlite3 -import time -import zipfile -from mysql.connector import errorcode -from unidecode import unidecode -from urllib.parse import urlparse -from urllib.request import urlretrieve -from requests.adapters import HTTPAdapter -from urllib3.util.retry import Retry - -# Configuración del logger -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') - -# Configuración de base de datos -CONFIG_DB = { - "user": config.DB_USER, - "password": config.DB_PASSWORD, - "host": config.DB_HOST, - "port": config.DB_PORT, - "database": config.DB_NAME, - "raise_on_warnings": True, -} - -# Direcciones de internet de donde descargar los datos -URL_PREFIX = { - "spectrum_computing": r"https://spectrumcomputing.co.uk", - "wos": r"https://php.sustancia.synology.me/wos", - "nvg": r"https://php.sustancia.synology.me/nvg", -} - -# Rutas locales donde depositar los resultados -DESTINATION_PATH = config.DESTINATION_PATH -CACHE_PATH = config.CACHE_PATH -TEMP_FILE = config.TEMP_FILE - -# Parámetros de configuración -SHOULD_CLEAR_DESTINATION_PATH = config.SHOULD_CLEAR_DESTINATION_PATH # Establece si se limpia primero la carpeta de destino -SHOULD_SPLIT_MODERN_AND_CLASSIC = config.SHOULD_SPLIT_MODERN_AND_CLASSIC # Separa los juegos en dos carpetas a partir de un año especificado -SHOULD_SORT_BY_YEAR = config.SHOULD_SORT_BY_YEAR # Separa los juegos por carpetas en función de su año de lanzamiento -SHOULD_SORT_BY_LETTER = config.SHOULD_SORT_BY_LETTER # Separa los juegos por carpetas en función de su primera letra -SHOULD_SORT_BY_DEVELOPER = config.SHOULD_SORT_BY_DEVELOPER # Separa los juegos por desarrollador -WAIT = config.WAIT # Establece una pausa aleatoria entre descargas - -# Leer variables numéricas -MIN_WAIT = config.MIN_WAIT # Cantidad de segundos mínima a esperar entre descargas -MAX_WAIT = config.MAX_WAIT # Cantidad de segundos máxima a esperar entre descargas -LAST_CLASSIC_YEAR = config.LAST_CLASSIC_YEAR # Año usado para la separación entre juegos clásicos y modernos - -# Tipos de fichero que se guardan en la carpeta raíz del juego -FILETYPES_ON_ROOT = [ - "Tape image", - "Disk image", - "Snapshot image", - "POK pokes file", -] - -# Resto de variables globales -ELEMENTS = [] - -# Carga un fichero con consultas SQL -def load_queries(file_path): - with open(file_path, 'r') as file: - queries = file.read().split(';') - return [query.strip() for query in queries if query.strip()] - -# Carga las consultas desde el archivo -QUERIES = load_queries('queries.sql') - -def select(cursor, query_index=0): - """ - Ejecuta la consulta seleccionada y procesa los resultados. - - Parámetros: - cursor (MySQLCursor): El cursor de la base de datos para ejecutar la consulta. - query_index (int): El índice de la consulta a ejecutar en la lista de consultas QUERIES (predeterminado es 0). - - Comportamiento: - - Ejecuta la consulta indicada por `query_index` usando el cursor proporcionado. - - Procesa los resultados de la consulta y los guarda en la lista ELEMENTS. - - Cada fila de resultados se convierte en un diccionario con las claves 'title', 'developer', 'release_year', 'url', y 'filetype'. - - Registra un mensaje informativo indicando que la consulta se ejecutó correctamente y el número de resultados obtenidos. - - Ejemplo: - >>> cursor = connection.cursor() - >>> select(cursor, query_index=1) - """ - - # Ejecutar la consulta seleccionada - cursor.execute(QUERIES[query_index]) - - # Procesar los resultados - for row in cursor: - element = { - "title": unidecode(row[0]), - "developer": unidecode(row[1]), - "release_year": row[2], - "url": row[3], - "filetype": row[4], - } - ELEMENTS.append(element) - - # Registro de consulta ejecutada - logging.info(f"Consulta {query_index} ejecutada correctamente con {len(ELEMENTS)} resultados.") - -def connect(query_index=0): - """ - Establece la conexión a la base de datos y ejecuta la consulta. - - Parámetros: - query_index (int): Índice de la consulta a ejecutar (predeterminado es 0). - - Excepciones: - mysql.connector.Error: Captura y maneja errores específicos de MySQL. - Exception: Captura y maneja cualquier otro error inesperado. - - Comportamiento: - - Conecta a la base de datos usando los parámetros de configuración en CONFIG_DB. - - Ejecuta la consulta definida por la función select utilizando el cursor. - - Maneja errores de acceso denegado y base de datos no encontrada, así como cualquier otro error inesperado. - - Ejemplo: - >>> connect(1) - """ - - try: - with mysql.connector.connect(**CONFIG_DB) as connection: - with connection.cursor() as cursor: - # Ejecutar la consulta 1 - select(cursor, query_index=query_index) - except mysql.connector.Error as err: - if err.errno == errorcode.ER_ACCESS_DENIED_ERROR: - logging.error("Algo está mal con tu nombre de usuario o contraseña.") - elif err.errno == errorcode.ER_BAD_DB_ERROR: - logging.error("La base de datos no existe.") - else: - logging.error(err) - except Exception as e: - logging.error(f"Error inesperado: {e}") - -def update_url(element, url_prefix): - """ - Añade un prefijo a la URL en el diccionario `element` según el prefijo proporcionado en `url_prefix`. - - Parámetros: - element (dict): Diccionario que contiene la clave 'url'. - url_prefix (dict): Diccionario que contiene los prefijos de las URLs para 'spectrum_computing', 'wos' y 'nvg'. - - Modifica: - element (dict): Actualiza el valor de 'url' en el diccionario `element` con el prefijo correspondiente. - - Ejemplo: - >>> element = {"url": "/zxdb/example"} - >>> url_prefix = {"spectrum_computing": "https://spectrumcomputing.co.uk", "wos": "https://php.sustancia.synology.me/wos", "nvg": "https://php.sustancia.synology.me/nvg"} - >>> update_url(element, url_prefix) - >>> print(element["url"]) - https://spectrumcomputing.co.uk/zxdb/example - """ - url = element["url"] - if url.startswith("/zxdb"): - element["url"] = url_prefix["spectrum_computing"] + url - elif url.startswith("/pub"): - element["url"] = url_prefix["wos"] + url[4:] - elif url.startswith("/nvg"): - element["url"] = url_prefix["nvg"] + url[4:] - -def process_elements(): - """ - Procesa todos los elementos, modificando cada uno de sus parámetros. - - Comportamiento: - - Construye el nombre de la carpeta raíz basado en el título y año de lanzamiento, o título, año de lanzamiento y desarrollador. - - Normaliza el nombre de la carpeta raíz. - - Añade el prefijo a la URL y normaliza los enlaces de "wos". - - Obtiene el nombre del fichero a partir de la URL de descarga. - - Establece la subcarpeta dentro de la raíz basada en el tipo de archivo. - - Verifica si el fichero está en formato .zip. - - Calcula el nombre del fichero si es un zip. - - Modifica: - - ELEMENTS (list): Actualiza cada diccionario en ELEMENTS con las claves 'game_folder_name', 'url', 'file_name', 'subfolder', 'is_zip', y 'non_zip_file_name'. - - Ejemplo: - >>> process_elements() - """ - global ELEMENTS - for i in range(len(ELEMENTS)): - # Construye el nombre de la carpeta raiz - ELEMENTS[i]["game_folder_name"] = f"{ELEMENTS[i]['title']} ({ELEMENTS[i]['release_year']})({ELEMENTS[i]['developer']})" - ELEMENTS[i]["game_folder_name"] = normalize_path(ELEMENTS[i]["game_folder_name"]) - - # Añade el prefijo a la url y normaliza los enlaces de "wos" - update_url(ELEMENTS[i], URL_PREFIX) - - # Obtiene el nombre del fichero a partir de la url de descarga - ELEMENTS[i]["file_name"] = url_filename(ELEMENTS[i]["url"]) - - # Establece la subcarpeta dentro de la raiz - ELEMENTS[i]["subfolder"] = normalize_path(ELEMENTS[i]["filetype"]) if ELEMENTS[i]["filetype"] not in FILETYPES_ON_ROOT else "" - - # Averigua si el fichero está en formato .zip - ELEMENTS[i]["is_zip"] = ELEMENTS[i]["file_name"].lower().endswith(".zip") - - # Calcula el nombre del fichero si es un zip - ELEMENTS[i]["non_zip_file_name"] = ELEMENTS[i]["file_name"][:-4] if ELEMENTS[i]["is_zip"] else ELEMENTS[i]["file_name"] - -def url_filename(url): - """ - Devuelve el fichero que forma la parte final de una URL. - - Parámetros: - url (str): La URL de la cual se quiere extraer el nombre del fichero. - - Retorna: - str: El nombre del fichero que forma la parte final de la URL. - - Ejemplo: - >>> url = "https://example.com/path/to/file.txt" - >>> url_filename(url) - 'file.txt' - """ - parsed_url = urlparse(url) - path = parsed_url.path - filename = os.path.basename(path) - return filename - -def download_file(url, destination): - """ - Descarga un fichero a partir de una URL. - - Parámetros: - url (str): La URL del fichero que se desea descargar. - destination (str): La ruta de destino donde se guardará el fichero descargado. - - Retorna: - bool: True si la descarga fue exitosa, False en caso de error. - - Comportamiento: - - Crea una sesión de requests con un adaptador que permite reintentos en caso de fallos. - - Intenta descargar el fichero desde la URL especificada. - - Guarda el contenido del fichero en la ruta de destino especificada. - - Maneja errores de conexión y otros problemas de la red, registrando cualquier error ocurrido. - - Ejemplo: - >>> success = download_file("https://example.com/file.zip", "/path/to/destination/file.zip") - >>> if success: - >>> print("Descarga completada con éxito.") - >>> else: - >>> print("Error en la descarga.") - """ - session = requests.Session() - retries = Retry(total=3, backoff_factor=1, status_forcelist=[500, 502, 503, 504]) - session.mount('https://', HTTPAdapter(max_retries=retries)) - - try: - response = session.get(url, timeout=10) - response.raise_for_status() - with open(destination, 'wb') as file: - file.write(response.content) - return True - except requests.exceptions.RequestException as e: - logging.error(f"Error al descargar el archivo: {e}") - return False - -def unzip_file(src, dst): - """ - Descomprime los ficheros que coinciden con la lista de extensiones. - - Parámetros: - src (str): Ruta del archivo ZIP que se desea descomprimir. - dst (str): Ruta del directorio donde se extraerán los ficheros. - - Extensiones soportadas: - .tap, .tzx, .stl, .sna, .z80, .dsk, .fdi, .trd, .img, .mgt, .szx, .dck - - Comportamiento: - - Abre el archivo ZIP especificado por `src`. - - Extrae los ficheros que coinciden con las extensiones definidas en el directorio especificado por `dst`. - - Maneja errores de archivo ZIP corrupto, archivo no encontrado y otros errores generales, registrándolos adecuadamente. - - Excepciones: - zipfile.BadZipFile: El archivo ZIP está corrupto. - FileNotFoundError: El archivo ZIP no se encontró. - Exception: Cualquier otro error inesperado. - - Ejemplo: - >>> unzip_file('/path/to/archivo.zip', '/path/to/destination/') - """ - archive = src - directory = dst - tape_file_formats = (".tap", ".tzx") - snapshot_file_formats = (".stl", ".sna", ".z80") - disk_file_formats = (".dsk", ".fdi", ".trd", ".img", ".mgt") - emulator_specific_formats = (".szx", ".dck") - - # Sumar todas las listas anteriores - extensions = tape_file_formats + snapshot_file_formats + disk_file_formats + emulator_specific_formats - - try: - with zipfile.ZipFile(archive, "r") as zip_file: - for file in zip_file.namelist(): - if file.lower().endswith(extensions): - zip_file.extract(file, directory) - # logging.info(f"Archivo {file} extraído a {directory}") - except zipfile.BadZipFile: - logging.error("El archivo ZIP está corrupto.") - except FileNotFoundError: - logging.error("El archivo ZIP no se encontró.") - except Exception as e: - logging.error(f"Ocurrió un error: {e}") - -def print_status(current_file, total_files, element, total_files_width, status="cached"): - """ - Imprime el estado de un archivo en el proceso de descarga. - - Parámetros: - current_file (int): El número del archivo actual en el proceso de descarga. - total_files (int): El número total de archivos en el proceso de descarga. - element (dict): Diccionario que contiene información del archivo actual, incluyendo 'file_name' y 'filetype'. - total_files_width (int): El ancho del campo para el número total de archivos, para un formato de impresión alineado. - status (str): El estado del archivo (por defecto es "cached"). - - Comportamiento: - - Imprime el estado del archivo actual en el proceso de descarga de una manera formateada y alineada. - - Ejemplo: - >>> element = {"file_name": "example.zip", "filetype": "zip"} - >>> print_status(1, 100, element, 3) - ( 1 / 100) : cached : example.zip (zip) - """ - print( - "({:{width}} / {}) : {:<10} : {} ({})".format( - current_file, - total_files, - status, - element["file_name"], - element["filetype"], - width=total_files_width, - ) - ) - -def get_final_destination_folder(title, year, developer): - """ - Compone la carpeta de destino en función de varios parámetros. - - Parámetros: - developer (str): Nombre del desarrollador del juego. - year (int o str): Año de lanzamiento del juego. - game_folder_name (str): Nombre de la carpeta raíz. - - Retorna: - str: La ruta completa de la carpeta de destino final. - - Comportamiento: - - Determina la carpeta base (`folder1`) en función de si el juego es clásico o moderno, el desarrollador o el año. - - Determina la subcarpeta (`folder2`) basada en el desarrollador, si está habilitado. - - Determina la subcarpeta (`folder3`) basada en el año de lanzamiento, si está habilitado. - - Determina la subcarpeta (`folder4`) basada en la primera letra del nombre de la carpeta raíz, si está habilitado. - - Combina las carpetas calculadas y la carpeta raíz para obtener la ruta final de la carpeta de destino. - - Ejemplo: - >>> get_final_destination_folder("Nintendo", 1985, "Super Mario") - 'by_developer/N/Nintendo/1985/Super Mario' - """ - - # Compone el nombre de la carpeta de destino del juego - game_folder_name = f"{title} ({year})" if SHOULD_SORT_BY_DEVELOPER else f"{title} ({year})({developer})" - game_folder_name = normalize_path(game_folder_name) - - # Carpeta basada en los años de vida comercial del spectrum o el tipo de agrupación - folder1 = "" - if SHOULD_SPLIT_MODERN_AND_CLASSIC: - folder1 = "modern" if year == "none" or year is None or year > LAST_CLASSIC_YEAR else "classics" - elif SHOULD_SORT_BY_DEVELOPER: - folder1 = "by_developer" - elif SHOULD_SORT_BY_YEAR: - folder1 = "by_year" - - # Carpeta basada en el desarrollador - folder2 = "" - if SHOULD_SORT_BY_DEVELOPER: - developer = developer or "" - folder2 = os.path.join("0-9", developer) if developer[0].isdigit() else os.path.join(developer[0].upper(), developer) - - # Carpeta basada en el año de lanzamiento - folder3 = "" - if SHOULD_SORT_BY_YEAR: - folder3 = str(year) if year not in [None, "none"] else "unknown" - - # Carpeta basada en la primera letra del nombre de la carpeta de destino del juego - folder4 = "" - if SHOULD_SORT_BY_LETTER: - folder4 = "0-9" if game_folder_name[0].isdigit() else game_folder_name[0].upper() - - # Combina los prefijos y la carpeta raíz para obtener la carpeta de destino final - return os.path.join(folder1, folder2, folder3, folder4, game_folder_name) - -def process_cache_file(cache_file, destination_subfolder, destination_file, element): - """ - Crea las carpetas de destino y copia o extrae el archivo de la caché. - - Parámetros: - cache_file (str): Ruta del archivo en la caché. - destination_subfolder (str): Ruta del subdirectorio de destino donde se guardarán los archivos extraídos. - destination_file (str): Ruta del archivo de destino si no se extrae. - element (dict): Diccionario que contiene información del archivo, incluyendo si tiene subcarpeta específica. - - Comportamiento: - - Crea las carpetas de destino necesarias. - - Si el archivo en caché es un zip y no tiene subcarpeta especificada, descomprime el archivo en el subdirectorio de destino. - - Si no, copia el archivo en caché al archivo de destino especificado. - - Ejemplo: - >>> process_cache_file('/path/to/cache.zip', '/path/to/destination/subfolder', '/path/to/destination/file', element) - """ - os.makedirs(destination_subfolder, exist_ok=True) - if cache_file.endswith(".zip") and element["subfolder"] == "": - unzip_file(cache_file, destination_subfolder) - else: - shutil.copyfile(cache_file, destination_file) - -def get_files(): - """ - Obtiene los ficheros de la consulta desde internet o desde la caché y los deposita en la carpeta destino, - descomprimiendo los archivos necesarios. - - Comportamiento: - - Presenta en pantalla el progreso de la descarga. - - Para cada elemento en ELEMENTS: - - Calcula la carpeta de clasificación basada en el desarrollador, año de lanzamiento y carpeta raíz. - - Determina las rutas de la carpeta de destino y de caché. - - Si el fichero no existe en la carpeta de destino: - - Si el fichero existe en la caché, lo copia al destino. - - Si no existe en la caché, lo descarga y lo guarda en la caché y en el destino. - - Si el fichero ya existe en el destino, lo omite. - - Maneja errores durante el procesamiento, registrándolos adecuadamente. - - Ejemplo: - >>> get_files() - """ - # Variables para la presentación en pantalla de la descarga - current_file = 0 - total_files = len(ELEMENTS) - total_files_width = len(str(total_files)) - last_game_folder = "" - - for element in ELEMENTS: - classification_folder = get_final_destination_folder(element["title"], element["release_year"], element["developer"]) - - destination_folder = os.path.join(DESTINATION_PATH, classification_folder) - destination_subfolder = os.path.join(destination_folder, element["subfolder"]) - cache_folder = os.path.join(CACHE_PATH, element["game_folder_name"]) - cache_subfolder = os.path.join(cache_folder, element["subfolder"]) - - # Ruta completa hasta el fichero de destino y de caché - destination_file = os.path.join(destination_subfolder, element["file_name"]) - cache_file = os.path.join(cache_subfolder, element["file_name"]) - - # Actualiza las variables de presentación - current_file += 1 - - if element["game_folder_name"] != last_game_folder: - print("\n{}".format(element["game_folder_name"])) - last_game_folder = element["game_folder_name"] - - try: - # Si el fichero no existe en la carpeta de destino - if not os.path.isfile(destination_file) and not os.path.isfile(os.path.join(destination_subfolder, element["non_zip_file_name"])): - - # Si existe en la caché, lo copia - if os.path.isfile(cache_file): - process_cache_file(cache_file, destination_subfolder, destination_file, element) - print_status(current_file, total_files, element, total_files_width, status="cached") - - # Si no existe en la caché, lo descarga - else: - if download_file(element["url"], TEMP_FILE): - print_status(current_file, total_files, element, total_files_width, status="downloaded") - if os.path.isfile(TEMP_FILE): - # Mueve el fichero temporal descargado a la cache - os.makedirs(cache_subfolder, exist_ok=True) - shutil.move(TEMP_FILE, cache_file) - # Copia el fichero de la cache al destino - if os.path.isfile(cache_file): - process_cache_file(cache_file, destination_subfolder, destination_file, element) - else: - print_status(current_file, total_files, element, total_files_width, status="not found") - - if WAIT: - time.sleep(random.randint(MIN_WAIT, MAX_WAIT)) - - # Si el fichero ya existe en el destino, no hace nada - else: - print_status(current_file, total_files, element, total_files_width, status="skipping") - - except Exception as e: - logging.error(f"Error al procesar el fichero {element['file_name']}: {e}") - -def normalize_path(path): - """ - Elimina los caracteres ilegales de la cadena de texto. - - Parámetros: - path (str): La cadena de texto que se desea normalizar. - - Retorna: - str: La cadena de texto normalizada sin los caracteres ilegales. - - Comportamiento: - - Reemplaza los caracteres ilegales en la cadena de texto con un carácter vacío. - - Los caracteres ilegales incluyen: <, >, :, ", /, \\, |, ?, *. - - Utiliza la función unidecode para manejar caracteres Unicode. - - Ejemplo: - >>> normalize_path("file:name/with|illegal*chars?") - 'filenamewithillegalchars' - """ - illegal_chars = ["<", ">", ":", '"', "/", "\\", "|", "?", "*"] - replace_with = "" - for char in illegal_chars: - path = unidecode(path.replace(char, replace_with)) - return path - - -def remove_empty_directories(path): - """ - Elimina los subdirectorios vacíos. - - Parámetros: - path (str): La ruta del directorio raíz desde donde se iniciará la eliminación de subdirectorios vacíos. - - Comportamiento: - - Recorre la estructura de directorios desde el `path` especificado, de forma descendente. - - Intenta eliminar cada subdirectorio encontrado. - - Registra un mensaje informativo si un subdirectorio vacío es eliminado. - - Si no se puede eliminar un directorio porque no está vacío u ocurre otro error, el error es ignorado. - - Ejemplo: - >>> remove_empty_directories('/path/to/directory') - """ - for root, dirs, files in os.walk(path, topdown=False): - for dir in dirs: - dir_path = os.path.join(root, dir) - try: - os.rmdir(dir_path) - logging.info(f"Directorio vacío eliminado: {dir_path}") - except OSError as e: - # El directorio no está vacío o ocurrió otro error - pass - -def clear_destination_folder(): - """ - Limpia la carpeta de destino. - - Comportamiento: - - Verifica si la opción de limpiar la carpeta de destino (`SHOULD_CLEAR_DESTINATION_PATH`) está habilitada. - - Si está habilitada, elimina todos los archivos y subdirectorios en la carpeta de destino (`DESTINATION_PATH`). - - Registra un mensaje informativo para cada archivo o directorio eliminado. - - Maneja y registra cualquier error que ocurra durante la eliminación. - - Ejemplo: - >>> clear_destination_folder() - """ - if SHOULD_CLEAR_DESTINATION_PATH: - logging.info("Limpiando la carpeta de destino ...") - for filename in os.listdir(DESTINATION_PATH): - file_path = os.path.join(DESTINATION_PATH, filename) - try: - if os.path.isfile(file_path) or os.path.islink(file_path): - os.unlink(file_path) - logging.info(f"Archivo eliminado: {file_path}") - elif os.path.isdir(file_path): - shutil.rmtree(file_path) - logging.info(f"Directorio eliminado: {file_path}") - except Exception as e: - logging.error(f'No se pudo eliminar {file_path}. Razón: {e}') - -def print_elements(mode=0): - """ - Imprime la lista de elementos en diferentes modos. - - Parámetros: - mode (int): El modo de impresión. - - Si es 0, imprime todos los elementos con sus claves y valores. - - Si es 1, imprime los nombres de las carpetas raíz eliminando duplicados y muestra el número de entradas únicas. - - Comportamiento: - - Modo 0: Recorre todos los elementos en ELEMENTS e imprime cada clave y valor en un formato legible. - - Modo 1: Elimina duplicados basándose en la carpeta raíz ('game_folder_name') y los imprime. Luego, muestra el número total de entradas únicas. - - Ejemplo: - >>> print_elements(0) - >>> print_elements(1) - """ - if mode == 0: - # Primer bucle for - for element in ELEMENTS: - print('') - for key, value in element.items(): - print(key, ':', value) - elif mode == 1: - # Segundo bucle for con eliminación de duplicados - seen = set() - for element in ELEMENTS: - game_folder_name = element['game_folder_name'] - if game_folder_name not in seen: - print(game_folder_name) - seen.add(game_folder_name) - # Imprimir el número de elementos únicos - print(f"Número de entradas: {len(seen)}") - -def main(): - """ - Bucle principal que ejecuta las principales funciones del programa. - - Comportamiento: - - Establece la conexión a la base de datos y ejecuta la consulta con `query_index=3`. - - Procesa todos los elementos, modificando sus parámetros. - - Imprime la lista de elementos en el modo 1, que elimina duplicados y muestra el número total de entradas únicas. - - Limpia la carpeta de destino si la opción está habilitada. - - Obtiene los archivos desde internet o desde la caché, y los deposita en la carpeta de destino, descomprimiendo los necesarios. - - Elimina los subdirectorios vacíos en la carpeta de destino. - - Ejemplo: - >>> main() - """ - - connect(query_index=3) - process_elements() - print_elements(mode=1) - clear_destination_folder() - get_files() - remove_empty_directories(DESTINATION_PATH) - -if __name__ == "__main__": - main() - diff --git a/zxdb/__init__.py b/zxdb/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/zxdb/database.py b/zxdb/database.py new file mode 100644 index 0000000..f3f5eac --- /dev/null +++ b/zxdb/database.py @@ -0,0 +1,79 @@ +import logging +import mysql.connector +from mysql.connector import errorcode +from pathlib import Path +from unidecode import unidecode + +import config + +CONFIG_DB = { + "user": config.DB_USER, + "password": config.DB_PASSWORD, + "host": "127.0.0.1", + "port": config.DB_PORT, + "database": config.DB_NAME, + "raise_on_warnings": True, +} + + +def load_queries(): + """Carga las consultas SQL desde el fichero queries.sql junto a este módulo.""" + file_path = Path(__file__).parent / "queries.sql" + with open(file_path, 'r') as file: + queries = file.read().split(';') + return [query.strip() for query in queries if query.strip()] + + +def select(cursor, query_index=0): + """ + Ejecuta la consulta seleccionada y devuelve los resultados como lista de diccionarios. + + Parámetros: + cursor (MySQLCursor): El cursor de la base de datos para ejecutar la consulta. + query_index (int): El índice de la consulta a ejecutar (predeterminado es 0). + + Retorna: + list: Lista de diccionarios con las claves 'title', 'developer', 'release_year', 'url', 'filetype'. + """ + queries = load_queries() + cursor.execute(queries[query_index]) + + elements = [] + for row in cursor: + element = { + "title": unidecode(row[0]), + "developer": unidecode(row[1]), + "release_year": row[2], + "url": row[3], + "filetype": row[4], + } + elements.append(element) + + logging.info(f"Consulta {query_index} ejecutada correctamente con {len(elements)} resultados.") + return elements + + +def connect(query_index=0): + """ + Establece la conexión a la base de datos, ejecuta la consulta y devuelve los resultados. + + Parámetros: + query_index (int): Índice de la consulta a ejecutar (predeterminado es 0). + + Retorna: + list: Lista de elementos obtenidos de la consulta, o lista vacía en caso de error. + """ + try: + with mysql.connector.connect(**CONFIG_DB) as connection: + with connection.cursor() as cursor: + return select(cursor, query_index=query_index) + except mysql.connector.Error as err: + if err.errno == errorcode.ER_ACCESS_DENIED_ERROR: + logging.error("Algo está mal con tu nombre de usuario o contraseña.") + elif err.errno == errorcode.ER_BAD_DB_ERROR: + logging.error("La base de datos no existe.") + else: + logging.error(err) + except Exception as e: + logging.error(f"Error inesperado: {e}") + return [] diff --git a/zxdb/downloader.py b/zxdb/downloader.py new file mode 100644 index 0000000..42a5d71 --- /dev/null +++ b/zxdb/downloader.py @@ -0,0 +1,144 @@ +import logging +import os +import random +import shutil +import time +import zipfile + +import requests +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry + +import config +from zxdb.organizer import get_final_destination_folder +from zxdb.filesystem import print_status + + +def download_file(url, destination): + """ + Descarga un fichero a partir de una URL. + + Parámetros: + url (str): La URL del fichero que se desea descargar. + destination (str): La ruta de destino donde se guardará el fichero descargado. + + Retorna: + bool: True si la descarga fue exitosa, False en caso de error. + """ + session = requests.Session() + retries = Retry(total=3, backoff_factor=1, status_forcelist=[500, 502, 503, 504]) + session.mount('https://', HTTPAdapter(max_retries=retries)) + + try: + response = session.get(url, timeout=10) + response.raise_for_status() + with open(destination, 'wb') as file: + file.write(response.content) + return True + except requests.exceptions.RequestException as e: + logging.error(f"Error al descargar el archivo: {e}") + return False + + +def unzip_file(src, dst): + """ + Descomprime los ficheros que coinciden con la lista de extensiones. + + Parámetros: + src (str): Ruta del archivo ZIP que se desea descomprimir. + dst (str): Ruta del directorio donde se extraerán los ficheros. + + Extensiones soportadas: + .tap, .tzx, .stl, .sna, .z80, .dsk, .fdi, .trd, .img, .mgt, .szx, .dck + """ + tape_file_formats = (".tap", ".tzx") + snapshot_file_formats = (".stl", ".sna", ".z80") + disk_file_formats = (".dsk", ".fdi", ".trd", ".img", ".mgt") + emulator_specific_formats = (".szx", ".dck") + extensions = tape_file_formats + snapshot_file_formats + disk_file_formats + emulator_specific_formats + + try: + with zipfile.ZipFile(src, "r") as zip_file: + for file in zip_file.namelist(): + if file.lower().endswith(extensions): + zip_file.extract(file, dst) + except zipfile.BadZipFile: + logging.error("El archivo ZIP está corrupto.") + except FileNotFoundError: + logging.error("El archivo ZIP no se encontró.") + except Exception as e: + logging.error(f"Ocurrió un error: {e}") + + +def process_cache_file(cache_file, destination_subfolder, destination_file, element): + """ + Crea las carpetas de destino y copia o extrae el archivo de la caché. + + Parámetros: + cache_file (str): Ruta del archivo en la caché. + destination_subfolder (str): Ruta del subdirectorio de destino. + destination_file (str): Ruta del archivo de destino si no se extrae. + element (dict): Diccionario que contiene información del archivo. + """ + os.makedirs(destination_subfolder, exist_ok=True) + if cache_file.endswith(".zip") and element["subfolder"] == "": + unzip_file(cache_file, destination_subfolder) + else: + shutil.copyfile(cache_file, destination_file) + + +def get_files(elements): + """ + Obtiene los ficheros desde internet o desde la caché y los deposita en la carpeta destino. + + Parámetros: + elements (list): Lista de diccionarios con la información de cada fichero a procesar. + """ + current_file = 0 + total_files = len(elements) + total_files_width = len(str(total_files)) + last_game_folder = "" + + for element in elements: + classification_folder = get_final_destination_folder(element["title"], element["release_year"], element["developer"]) + + destination_folder = os.path.join(config.DESTINATION_PATH, classification_folder) + destination_subfolder = os.path.join(destination_folder, element["subfolder"]) + cache_folder = os.path.join(config.CACHE_PATH, element["game_folder_name"]) + cache_subfolder = os.path.join(cache_folder, element["subfolder"]) + + destination_file = os.path.join(destination_subfolder, element["file_name"]) + cache_file = os.path.join(cache_subfolder, element["file_name"]) + + current_file += 1 + + if element["game_folder_name"] != last_game_folder: + print("\n{}".format(element["game_folder_name"])) + last_game_folder = element["game_folder_name"] + + try: + if not os.path.isfile(destination_file) and not os.path.isfile(os.path.join(destination_subfolder, element["non_zip_file_name"])): + + if os.path.isfile(cache_file): + process_cache_file(cache_file, destination_subfolder, destination_file, element) + print_status(current_file, total_files, element, total_files_width, status="cached") + + else: + if download_file(element["url"], config.TEMP_FILE): + print_status(current_file, total_files, element, total_files_width, status="downloaded") + if os.path.isfile(config.TEMP_FILE): + os.makedirs(cache_subfolder, exist_ok=True) + shutil.move(config.TEMP_FILE, cache_file) + if os.path.isfile(cache_file): + process_cache_file(cache_file, destination_subfolder, destination_file, element) + else: + print_status(current_file, total_files, element, total_files_width, status="not found") + + if config.WAIT: + time.sleep(random.randint(config.MIN_WAIT, config.MAX_WAIT)) + + else: + print_status(current_file, total_files, element, total_files_width, status="skipping") + + except Exception as e: + logging.error(f"Error al procesar el fichero {element['file_name']}: {e}") diff --git a/zxdb/filesystem.py b/zxdb/filesystem.py new file mode 100644 index 0000000..a4ef544 --- /dev/null +++ b/zxdb/filesystem.py @@ -0,0 +1,89 @@ +import logging +import os +import shutil + +import config + + +def print_status(current_file, total_files, element, total_files_width, status="cached"): + """ + Imprime el estado de un archivo en el proceso de descarga. + + Parámetros: + current_file (int): El número del archivo actual en el proceso de descarga. + total_files (int): El número total de archivos en el proceso de descarga. + element (dict): Diccionario con información del archivo actual ('file_name' y 'filetype'). + total_files_width (int): Ancho del campo para el número total de archivos. + status (str): El estado del archivo (predeterminado es "cached"). + """ + print( + "({:{width}} / {}) : {:<10} : {} ({})".format( + current_file, + total_files, + status, + element["file_name"], + element["filetype"], + width=total_files_width, + ) + ) + + +def print_elements(elements, mode=0): + """ + Imprime la lista de elementos en diferentes modos. + + Parámetros: + elements (list): Lista de diccionarios con los datos de los elementos. + mode (int): El modo de impresión. + - Si es 0, imprime todos los elementos con sus claves y valores. + - Si es 1, imprime los nombres de carpeta únicos y el número total. + """ + if mode == 0: + for element in elements: + print('') + for key, value in element.items(): + print(key, ':', value) + elif mode == 1: + seen = set() + for element in elements: + game_folder_name = element['game_folder_name'] + if game_folder_name not in seen: + print(game_folder_name) + seen.add(game_folder_name) + print(f"Número de entradas: {len(seen)}") + + +def clear_destination_folder(): + """ + Limpia la carpeta de destino si la opción está habilitada en la configuración. + """ + if config.SHOULD_CLEAR_DESTINATION_PATH: + logging.info("Limpiando la carpeta de destino ...") + for filename in os.listdir(config.DESTINATION_PATH): + file_path = os.path.join(config.DESTINATION_PATH, filename) + try: + if os.path.isfile(file_path) or os.path.islink(file_path): + os.unlink(file_path) + logging.info(f"Archivo eliminado: {file_path}") + elif os.path.isdir(file_path): + shutil.rmtree(file_path) + logging.info(f"Directorio eliminado: {file_path}") + except Exception as e: + logging.error(f'No se pudo eliminar {file_path}. Razón: {e}') + + +def remove_empty_directories(path): + """ + Elimina los subdirectorios vacíos de forma recursiva. + + Parámetros: + path (str): La ruta del directorio raíz desde donde se inicia la eliminación. + """ + for root, dirs, files in os.walk(path, topdown=False): + for dir in dirs: + dir_path = os.path.join(root, dir) + try: + os.rmdir(dir_path) + logging.info(f"Directorio vacío eliminado: {dir_path}") + except OSError: + pass diff --git a/zxdb/organizer.py b/zxdb/organizer.py new file mode 100644 index 0000000..74a7cbe --- /dev/null +++ b/zxdb/organizer.py @@ -0,0 +1,135 @@ +import os +from unidecode import unidecode + +import config + +URL_PREFIX = { + "spectrum_computing": r"https://spectrumcomputing.co.uk", + "wos": r"https://php.sustancia.synology.me/wos", + "nvg": r"https://php.sustancia.synology.me/nvg", +} + +FILETYPES_ON_ROOT = [ + "Tape image", + "Disk image", + "Snapshot image", + "POK pokes file", +] + + +def normalize_path(path): + """ + Elimina los caracteres ilegales de la cadena de texto. + + Parámetros: + path (str): La cadena de texto que se desea normalizar. + + Retorna: + str: La cadena de texto normalizada sin los caracteres ilegales. + """ + illegal_chars = ["<", ">", ":", '"', "/", "\\", "|", "?", "*"] + replace_with = "" + for char in illegal_chars: + path = unidecode(path.replace(char, replace_with)) + return path + + +def url_filename(url): + """ + Devuelve el fichero que forma la parte final de una URL. + + Parámetros: + url (str): La URL de la cual se quiere extraer el nombre del fichero. + + Retorna: + str: El nombre del fichero que forma la parte final de la URL. + """ + from urllib.parse import urlparse + parsed_url = urlparse(url) + path = parsed_url.path + filename = os.path.basename(path) + return filename + + +def update_url(element, url_prefix): + """ + Añade un prefijo a la URL en el diccionario `element` según el prefijo proporcionado en `url_prefix`. + + Parámetros: + element (dict): Diccionario que contiene la clave 'url'. + url_prefix (dict): Diccionario con los prefijos para 'spectrum_computing', 'wos' y 'nvg'. + """ + url = element["url"] + if url.startswith("/zxdb"): + element["url"] = url_prefix["spectrum_computing"] + url + elif url.startswith("/pub"): + element["url"] = url_prefix["wos"] + url[4:] + elif url.startswith("/nvg"): + element["url"] = url_prefix["nvg"] + url[4:] + + +def process_elements(elements): + """ + Procesa todos los elementos, enriqueciendo cada diccionario con metadatos adicionales. + + Parámetros: + elements (list): Lista de diccionarios con los datos crudos de la base de datos. + + Retorna: + list: La misma lista con cada elemento enriquecido con 'game_folder_name', 'url', + 'file_name', 'subfolder', 'is_zip' y 'non_zip_file_name'. + """ + for i in range(len(elements)): + elements[i]["game_folder_name"] = f"{elements[i]['title']} ({elements[i]['release_year']})({elements[i]['developer']})" + elements[i]["game_folder_name"] = normalize_path(elements[i]["game_folder_name"]) + + update_url(elements[i], URL_PREFIX) + + elements[i]["file_name"] = url_filename(elements[i]["url"]) + + elements[i]["subfolder"] = normalize_path(elements[i]["filetype"]) if elements[i]["filetype"] not in FILETYPES_ON_ROOT else "" + + elements[i]["is_zip"] = elements[i]["file_name"].lower().endswith(".zip") + + elements[i]["non_zip_file_name"] = elements[i]["file_name"][:-4] if elements[i]["is_zip"] else elements[i]["file_name"] + + return elements + + +def get_final_destination_folder(title, year, developer): + """ + Compone la carpeta de destino en función de varios parámetros de configuración. + + Parámetros: + title (str): Título del juego. + year (int o str): Año de lanzamiento del juego. + developer (str): Nombre del desarrollador del juego. + + Retorna: + str: La ruta relativa de la carpeta de destino final. + """ + game_folder_name = f"{title} ({year})" if config.SHOULD_SORT_BY_DEVELOPER else f"{title} ({year})({developer})" + game_folder_name = normalize_path(game_folder_name) + + folder1 = "" + if config.SHOULD_SPLIT_MODERN_AND_CLASSIC: + folder1 = "modern" if year == "none" or year is None or year > config.LAST_CLASSIC_YEAR else "classics" + elif config.SHOULD_SORT_BY_DEVELOPER: + folder1 = "by_developer" + elif config.SHOULD_SORT_BY_YEAR: + folder1 = "by_year" + + folder2 = "" + if config.SHOULD_SORT_BY_DEVELOPER: + developer = developer or "" + folder2 = os.path.join("0-9", developer) if developer[0].isdigit() else os.path.join(developer[0].upper(), developer) + + folder3 = "" + if config.SHOULD_SORT_BY_YEAR: + folder3 = str(year) if year not in [None, "none"] else "unknown" + + folder4 = "" + if config.SHOULD_SORT_BY_LETTER: + folder4 = "0-9" if game_folder_name[0].isdigit() else game_folder_name[0].upper() + + return os.path.join(folder1, folder2, folder3, folder4, game_folder_name) diff --git a/queries.sql b/zxdb/queries.sql similarity index 100% rename from queries.sql rename to zxdb/queries.sql diff --git a/zxdb/setup/__init__.py b/zxdb/setup/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/zxdb/setup/__main__.py b/zxdb/setup/__main__.py new file mode 100644 index 0000000..275679f --- /dev/null +++ b/zxdb/setup/__main__.py @@ -0,0 +1,154 @@ +"""Orchestrator for the full ZXDB setup flow. + +Usage: + python -m zxdb.setup +""" +import json +import logging +import sys +from datetime import datetime, timezone +from pathlib import Path + +import requests + +sys.path.insert(0, str(Path(__file__).parent.parent.parent)) +import config +from zxdb.setup.database_import import download_zxdb +from zxdb.setup.docker_manager import ( + db_exists, + ensure_container, + get_db_creation_date, + import_sql, + wait_for_mysql, +) + +logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") +logger = logging.getLogger(__name__) + +GITHUB_COMMITS_URL = ( + "https://api.github.com/repos/zxdb/ZXDB/commits" + "?path=ZXDB_mysql.sql.zip&per_page=1" +) + + +def get_github_commit_date() -> str | None: + """Return the ISO 8601 date of the latest commit touching ZXDB_mysql.sql.zip, or None.""" + try: + response = requests.get( + GITHUB_COMMITS_URL, + headers={"Accept": "application/vnd.github+json"}, + timeout=15, + ) + response.raise_for_status() + commits = response.json() + if commits: + return commits[0]["commit"]["committer"]["date"] + logger.warning("GitHub returned an empty commit list.") + except Exception as e: + logger.warning("Could not fetch GitHub commit date: %s", e) + return None + + +def load_state() -> dict: + """Load state from ZXDB_STATE_FILE. Returns empty dict if file is missing or invalid.""" + path = Path(config.ZXDB_STATE_FILE) + if path.exists(): + try: + return json.loads(path.read_text()) + except Exception as e: + logger.warning("Could not read state file: %s", e) + return {} + + +def save_state(github_commit_date: str | None) -> None: + """Save current import state to ZXDB_STATE_FILE.""" + state = { + "github_commit_date": github_commit_date, + "last_import": datetime.now(timezone.utc).isoformat(), + } + path = Path(config.ZXDB_STATE_FILE) + path.write_text(json.dumps(state, indent=2)) + logger.info("State saved to %s", path) + + +def main() -> int: + # 1. Ensure Docker container is running + container = ensure_container() + + # 2. Wait for MySQL to be ready + if not wait_for_mysql(container): + logger.error("MySQL did not become ready in time.") + return 1 + + # 3. Fetch latest GitHub commit date for the SQL file + github_commit_date = get_github_commit_date() + + # 4. Load persisted state + state = load_state() + + # 5. Decide whether to import + needs_import = False + saved_commit_date = state.get("github_commit_date") + + if not db_exists(container): + logger.info("Database 'zxdb' not found — import required.") + needs_import = True + elif saved_commit_date is None: + # No state file: compare GitHub date against actual DB creation time. + db_date = get_db_creation_date(container) + if db_date is None: + logger.warning("Cannot determine DB creation date; assuming up to date.") + save_state(github_commit_date) + elif github_commit_date is None: + logger.warning("Cannot reach GitHub; assuming DB is up to date.") + save_state(None) + else: + gh_date = datetime.fromisoformat(github_commit_date.replace("Z", "+00:00")) + if gh_date > db_date: + logger.info( + "GitHub version (%s) is newer than DB creation date (%s) — import required.", + github_commit_date, + db_date.strftime("%Y-%m-%d %H:%M:%S UTC"), + ) + needs_import = True + else: + logger.info( + "DB creation date (%s) is current — no import needed.", + db_date.strftime("%Y-%m-%d"), + ) + save_state(github_commit_date) + elif github_commit_date is None: + logger.warning("Cannot determine GitHub version; skipping import (DB already exists).") + elif github_commit_date > saved_commit_date: + logger.info( + "New ZXDB version detected (%s > %s) — import required.", + github_commit_date, + saved_commit_date, + ) + needs_import = True + else: + logger.info("ZXDB is up to date, skipping import.") + + if not needs_import: + return 0 + + # 6. Download and extract the SQL file + sql_dir = str(Path(config.ZXDB_SQL_PATH).parent) + sql_path = download_zxdb(sql_dir) + if sql_path is None: + logger.error("Download failed.") + return 1 + + # 7. Import into MySQL + if not import_sql(container, sql_path): + logger.error("SQL import failed.") + return 1 + + # 8. Persist state + save_state(github_commit_date) + + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/zxdb/setup/database_import.py b/zxdb/setup/database_import.py new file mode 100644 index 0000000..bbbc0b2 --- /dev/null +++ b/zxdb/setup/database_import.py @@ -0,0 +1,67 @@ +import logging +import zipfile +from pathlib import Path + +import requests + +ZXDB_ZIP_URL = "https://github.com/zxdb/ZXDB/raw/master/ZXDB_mysql.sql.zip" +ZXDB_SQL_FILENAME = "ZXDB_mysql.sql" + +logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") +logger = logging.getLogger(__name__) + + +def download_zxdb(destination: str) -> Path | None: + """Download ZXDB_mysql.sql.zip from GitHub and extract ZXDB_mysql.sql to destination. + + Returns the path to the extracted .sql file, or None on error. + """ + dest_dir = Path(destination) + dest_dir.mkdir(parents=True, exist_ok=True) + zip_path = dest_dir / "ZXDB_mysql.sql.zip" + sql_path = dest_dir / ZXDB_SQL_FILENAME + + if sql_path.exists(): + logger.info("Using cached %s (already downloaded).", sql_path) + return sql_path + + logger.info("Downloading %s ...", ZXDB_ZIP_URL) + try: + with requests.get(ZXDB_ZIP_URL, stream=True, timeout=60) as response: + response.raise_for_status() + total = int(response.headers.get("content-length", 0)) + downloaded = 0 + with open(zip_path, "wb") as f: + for chunk in response.iter_content(chunk_size=1024 * 1024): + f.write(chunk) + downloaded += len(chunk) + if total: + pct = downloaded * 100 // total + logger.info(" %d%% (%d / %d bytes)", pct, downloaded, total) + except requests.RequestException as e: + logger.error("Download failed: %s", e) + zip_path.unlink(missing_ok=True) + return None + + logger.info("Extracting %s ...", ZXDB_SQL_FILENAME) + try: + with zipfile.ZipFile(zip_path) as zf: + zf.extract(ZXDB_SQL_FILENAME, dest_dir) + except (zipfile.BadZipFile, KeyError) as e: + logger.error("Extraction failed: %s", e) + zip_path.unlink(missing_ok=True) + return None + + zip_path.unlink(missing_ok=True) + logger.info("Extracted to %s", sql_path) + return sql_path + + +if __name__ == "__main__": + import sys + sys.path.insert(0, str(Path(__file__).parent.parent.parent)) + import config + + result = download_zxdb(str(Path(config.ZXDB_SQL_PATH).parent)) + if result is None: + sys.exit(1) diff --git a/zxdb/setup/docker_manager.py b/zxdb/setup/docker_manager.py new file mode 100644 index 0000000..1c06b46 --- /dev/null +++ b/zxdb/setup/docker_manager.py @@ -0,0 +1,138 @@ +import logging +import subprocess +import time +from datetime import datetime, timezone +from pathlib import Path + +logger = logging.getLogger(__name__) + +CONTAINER_NAME = "mysql" + + +def _run(args: list[str], **kwargs) -> subprocess.CompletedProcess: + return subprocess.run(args, capture_output=True, text=True, **kwargs) + + +def ensure_container() -> str: + """Ensure the MySQL Docker container exists and is running. Returns container name.""" + import sys + sys.path.insert(0, str(Path(__file__).parent.parent.parent)) + import config + + result = _run(["docker", "ps", "-a", "--filter", f"name=^{CONTAINER_NAME}$", "--format", "{{.Names}}"]) + existing = result.stdout.strip() + + if existing == CONTAINER_NAME: + # Check if it's running + running = _run(["docker", "ps", "--filter", f"name=^{CONTAINER_NAME}$", "--format", "{{.Names}}"]) + if running.stdout.strip() != CONTAINER_NAME: + logger.info("Starting existing container '%s'...", CONTAINER_NAME) + _run(["docker", "start", CONTAINER_NAME], check=True) + else: + logger.info("Container '%s' is already running.", CONTAINER_NAME) + else: + logger.info("Creating new MySQL container '%s'...", CONTAINER_NAME) + _run([ + "docker", "run", "-d", + "--name", CONTAINER_NAME, + "-p", "3306:3306", + "-e", f"MYSQL_ROOT_PASSWORD={config.DB_PASSWORD}", + "mysql:latest", + ], check=True) + + return CONTAINER_NAME + + +def wait_for_mysql(container: str, timeout: int = 60) -> bool: + """Poll until MySQL is ready or timeout (seconds) is reached. Returns True if ready.""" + import sys + sys.path.insert(0, str(Path(__file__).parent.parent.parent)) + import config + + logger.info("Waiting for MySQL to be ready (timeout=%ds)...", timeout) + deadline = time.time() + timeout + while time.time() < deadline: + result = _run([ + "docker", "exec", container, + "mysqladmin", "ping", "-h", "127.0.0.1", + f"-p{config.DB_PASSWORD}", "--silent", + ]) + if result.returncode == 0: + logger.info("MySQL is ready.") + return True + time.sleep(2) + + logger.error("Timed out waiting for MySQL.") + return False + + +def db_exists(container: str) -> bool: + """Return True if the 'zxdb' database exists in the container.""" + import sys + sys.path.insert(0, str(Path(__file__).parent.parent.parent)) + import config + + result = _run([ + "docker", "exec", container, + "mysql", "-u", "root", f"-p{config.DB_PASSWORD}", + "-e", "SHOW DATABASES LIKE 'zxdb';", + ]) + return "zxdb" in result.stdout + + +def get_db_creation_date(container: str) -> datetime | None: + """Return the approximate import datetime of the zxdb database, or None.""" + import sys + sys.path.insert(0, str(Path(__file__).parent.parent.parent)) + import config + + result = _run([ + "docker", "exec", container, + "mysql", "-u", "root", f"-p{config.DB_PASSWORD}", "--skip-column-names", "-e", + "SELECT MIN(CREATE_TIME) FROM information_schema.TABLES WHERE TABLE_SCHEMA = 'zxdb' AND CREATE_TIME IS NOT NULL;", + ]) + value = result.stdout.strip() + if not value or value == "NULL": + return None + # MySQL format: "2024-11-12 17:07:23" + return datetime.strptime(value, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc) + + +def stop_container(container: str) -> None: + """Stop the MySQL Docker container.""" + logger.info("Stopping container '%s'...", container) + _run(["docker", "stop", container]) + + +def import_sql(container: str, sql_path: Path) -> bool: + """Create the zxdb database if needed and import the SQL file. Returns True on success.""" + import sys + sys.path.insert(0, str(Path(__file__).parent.parent.parent)) + import config + + # Drop and recreate database to ensure a clean import + create_result = _run([ + "docker", "exec", container, + "mysql", "-u", "root", f"-p{config.DB_PASSWORD}", + "-e", "DROP DATABASE IF EXISTS zxdb; CREATE DATABASE zxdb CHARACTER SET utf8mb4;", + ]) + if create_result.returncode != 0: + logger.error("Failed to recreate database: %s", create_result.stderr) + return False + + logger.info("Importing %s into zxdb (this may take several minutes)...", sql_path) + start = time.time() + try: + with open(sql_path, "rb") as f: + result = subprocess.run( + ["docker", "exec", "-i", container, + "mysql", "-u", "root", f"-p{config.DB_PASSWORD}", "zxdb"], + stdin=f, + check=True, + ) + elapsed = time.time() - start + logger.info("Import completed in %.1f seconds.", elapsed) + return True + except subprocess.CalledProcessError as e: + logger.error("Import failed: %s", e) + return False