Refactorización completa: modularización, setup automático y mejoras de configuración

- Reemplaza zxdb.py por main.py + paquete zxdb/ (database, organizer, downloader, filesystem)
- Añade zxdb/setup/: orquestador Docker, descarga e import de ZXDB automáticos
- main.py integra el setup al arrancar y detiene el contenedor al salir (try/finally)
- Elimina DB_HOST de config.py: la conexión usa siempre 127.0.0.1 (port mapping Docker)
- Actualiza requirements.txt a versiones más recientes y elimina logging (stdlib)
- Actualiza README con el nuevo flujo de uso

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-06 12:54:03 +01:00
parent a54d741c92
commit f967af541c
17 changed files with 1019 additions and 690 deletions
+2 -2
View File
@@ -1,2 +1,2 @@
zxdbenv
__pycache__
__pycache__
.venv
+80
View File
@@ -0,0 +1,80 @@
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Overview
This is a Python script that downloads ZX Spectrum game files from the [ZXDB](https://github.com/zxdb/ZXDB) open database. It queries a local MySQL instance containing ZXDB data, then downloads and organizes game files (tapes, disks, snapshots, pokes) into a configurable folder structure with caching support.
## Setup
**Virtual environment:**
```bash
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
```
**MySQL database:** Runs in a Docker container. Connect with:
```bash
mysql -h 172.18.0.2 -P 3306 -u root -p
# Password: unJEPimbJddHP8
# Then: use zxdb
# To import: source /path/to/ZXDB.sql
```
**Configuration:** Edit `config.py` directly (tracked in git; credentials are intentionally non-sensitive). Key settings:
- `DB_*` — MySQL connection params
- `DESTINATION_PATH` / `CACHE_PATH` / `TEMP_FILE` — local paths
- `SHOULD_CLEAR_DESTINATION_PATH` — wipes destination before each run
- `SHOULD_SPLIT_MODERN_AND_CLASSIC` / `SHOULD_SORT_BY_YEAR` / `SHOULD_SORT_BY_LETTER` / `SHOULD_SORT_BY_DEVELOPER` — folder organization modes
- `WAIT` / `MIN_WAIT` / `MAX_WAIT` — rate limiting between downloads
- `LAST_CLASSIC_YEAR` — year cutoff for classic/modern split
- `ZXDB_SQL_PATH` — where to place the extracted `ZXDB_mysql.sql` (used by `zxdb/setup/database_import.py`)
- `ZXDB_STATE_FILE` — JSON file tracking the last successful import (github_commit_date + last_import)
## Running
```bash
python main.py
```
## Architecture
The code is organized as a Python package (`zxdb/`) orchestrated by `main.py`. There is no global state: data flows as a list of dictionaries between functions.
**Entry point (`main.py`):**
```python
elements = database.connect(query_index=3)
elements = organizer.process_elements(elements)
filesystem.print_elements(elements, mode=1)
filesystem.clear_destination_folder()
downloader.get_files(elements)
filesystem.remove_empty_directories(config.DESTINATION_PATH)
```
**Module structure:**
- `zxdb/database.py``load_queries()`, `select()`, `connect()`: connects to MySQL and returns elements list
- `zxdb/organizer.py``normalize_path()`, `update_url()`, `url_filename()`, `process_elements()`, `get_final_destination_folder()`
- `zxdb/downloader.py``download_file()`, `unzip_file()`, `process_cache_file()`, `get_files(elements)`
- `zxdb/filesystem.py``clear_destination_folder()`, `remove_empty_directories()`, `print_elements(elements, mode)`, `print_status()`
- `zxdb/queries.sql` — Four SQL queries (index 03), loaded via `Path(__file__).parent / "queries.sql"`
- `zxdb/setup/database_import.py``download_zxdb(destination)`: streaming download of `ZXDB_mysql.sql.zip` from GitHub + ZIP extraction; runnable via `python -m zxdb.setup.database_import`
- `zxdb/setup/docker_manager.py``ensure_container()`, `wait_for_mysql(container, timeout)`, `db_exists(container)`, `import_sql(container, sql_path)`: Docker container management and SQL import
- `zxdb/setup/__main__.py` — Full setup orchestrator: Docker → MySQL ready → GitHub change detection → download → import → save state. Run via `python -m zxdb.setup`
**SQL queries (`zxdb/queries.sql`):** Four queries (index 03). Query 3 (current default) filters for ZX-Spectrum machines only (`m.text like 'ZX-%'`) and returns only tape/disk/snapshot/poke file types.
**Folder organization (`get_final_destination_folder()`):** Builds a path from up to four nested levels based on the sort flags in `config.py`:
- `folder1`: `classics`/`modern`, `by_developer`, `by_year`, or empty
- `folder2`: developer letter + developer name (e.g., `D/Dinamic`) or empty
- `folder3`: release year or `unknown`
- `folder4`: first letter of game title (e.g., `A`, `0-9`) or empty
**URL resolution (`update_url()`):** URLs from the DB are relative paths. They get prefixed based on their start: `/zxdb` → spectrumcomputing.co.uk, `/pub` → WOS mirror, `/nvg` → NVG mirror.
**Cache:** Files are stored in `CACHE_PATH/<game_folder_name>/<subfolder>/`. The cache key is the original `game_folder_name` (title + year + developer), independent of the current sort configuration, so changing sort options reuses cached files.
**File types stored at game root** (no subfolder): Tape image, Disk image, Snapshot image, POK pokes file. All other file types go into a named subfolder.
**ZIP extraction (`unzip_file()`):** Only extracts files matching known emulator formats: `.tap`, `.tzx`, `.stl`, `.sna`, `.z80`, `.dsk`, `.fdi`, `.trd`, `.img`, `.mgt`, `.szx`, `.dck`.
+94 -37
View File
@@ -1,52 +1,109 @@
## Anotacions
Per a connectar-se a la base de dades local (que està al container de MySQL)
# ZXDB Downloader
mysql -h 172.18.0.2 -P 3306 -u root -p
Script que descarga juegos de ZX Spectrum a partir de la base de datos [ZXDB](https://github.com/zxdb/ZXDB) (MySQL en Docker).
Password
## Requisitos
unJEPimbJddHP8
- Python 3.x
- Docker
Si la base de dades ja existeix, per seleccionar-la
## Setup
use zxdb
### Entorno virtual
Per descarregar una nova versió, baixar-la desde
[GitHub - zxdb/ZXDB: Open database with historical information about Sinclair machines](https://github.com/zxdb/ZXDB)
Per executar el fitxer .sql, amb la base de dades zxdb activa:
```bash
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
```
source /ruta/fichero.sql
### Configuración
Per instalar els requisits del script de python
Edita `config.py` en la raíz del proyecto. Variables principales:
pip install -r requirements.txt
| Variable | Descripción |
|----------|-------------|
| `DB_*` | Parámetros de conexión a MySQL |
| `DESTINATION_PATH` | Carpeta donde se depositan los juegos organizados |
| `CACHE_PATH` | Carpeta de caché (evita re-descargar ficheros) |
| `TEMP_FILE` | Fichero temporal durante la descarga |
| `SHOULD_CLEAR_DESTINATION_PATH` | Si es `True`, limpia el destino antes de cada ejecución |
| `SHOULD_SPLIT_MODERN_AND_CLASSIC` | Divide los juegos en carpetas `classics`/`modern` |
| `SHOULD_SORT_BY_YEAR` | Organiza por año de lanzamiento |
| `SHOULD_SORT_BY_LETTER` | Organiza por primera letra del título |
| `SHOULD_SORT_BY_DEVELOPER` | Organiza por desarrollador |
| `WAIT` / `MIN_WAIT` / `MAX_WAIT` | Control de velocidad de descarga |
| `LAST_CLASSIC_YEAR` | Año de corte entre clásicos y modernos |
| `ZXDB_SQL_PATH` | Ruta temporal donde se extrae `ZXDB_mysql.sql` |
| `ZXDB_STATE_FILE` | Fichero JSON con el estado del último import |
Per crear un entorn virtual "zxdbenv"
## Ejecución
python3 -m venv zxdbenv
```bash
source .venv/bin/activate
python main.py
```
Per activar el entorn "zxdbenv"
`main.py` hace el setup automáticamente al arrancar:
1. Asegura que el contenedor MySQL Docker existe y está corriendo (lo crea si no existe).
2. Comprueba en GitHub si hay una versión más nueva de ZXDB.
3. Si es necesario: descarga `ZXDB_mysql.sql.zip` e importa en MySQL.
4. Descarga y organiza los juegos.
5. Al terminar (o con Ctrl+C), detiene el contenedor Docker.
source zxdbenv/bin/activate
El estado del último import se guarda en `/tmp/zxdb_state.json`. Una segunda ejecución sin cambios en GitHub omite el import.
Exemple de fitxer .env
Para conectar manualmente a MySQL mientras el contenedor está en marcha:
DB_USER=root
DB_PASSWORD=unJEPimbJddHP8
DB_HOST=172.18.0.2
DB_PORT=3306
DB_NAME=zxdb
DESTINATION_PATH=/home/sergio/zx/zxdb/games/
CACHE_PATH=/home/sergio/zx/zxdb/cache/games/
TEMP_FILE=/tmp/zxdb.download.tmp
SHOULD_CLEAR_DESTINATION_PATH=True
SHOULD_SPLIT_MODERN_AND_CLASSIC=True
SHOULD_SORT_BY_YEAR=True
SHOULD_SORT_BY_LETTER=True
WAIT=True
MIN_WAIT=2
MAX_WAIT=4
LAST_CLASSIC_YEAR=1993
```bash
mysql -h 127.0.0.1 -P 3306 -u root -p
# Contraseña: unJEPimbJddHP8
# Luego: use zxdb
```
## Estructura del proyecto
```
zxdb/
├── main.py # Punto de entrada: orquesta el flujo principal
├── config.py # Configuración (credenciales y rutas locales, no en git)
├── requirements.txt # Dependencias Python
└── zxdb/ # Paquete con la lógica del programa
├── __init__.py
├── database.py # Conexión a MySQL y ejecución de consultas
├── queries.sql # Consultas SQL (índices 0-3)
├── organizer.py # Construcción de rutas y enriquecimiento de elementos
├── downloader.py # Descargas, caché y descompresión de ficheros
├── filesystem.py # Utilidades de sistema de ficheros y salida por pantalla
└── setup/
├── __init__.py
├── __main__.py # Orquestador: Docker + detección de cambios + import
├── database_import.py # Descarga y extracción de ZXDB_mysql.sql.zip
└── docker_manager.py # Gestión del contenedor MySQL e import SQL
```
## Módulos
### `zxdb/database.py`
Gestiona la conexión a MySQL. `connect(query_index)` devuelve la lista de elementos obtenidos de la base de datos.
### `zxdb/organizer.py`
Construye rutas y enriquece cada elemento con metadatos: `game_folder_name`, URL completa, nombre de fichero, subcarpeta y flag de ZIP. `get_final_destination_folder()` aplica la lógica de organización según la configuración.
### `zxdb/downloader.py`
Gestiona las descargas desde internet y el uso de la caché. `get_files(elements)` itera sobre todos los elementos, sirve desde caché si es posible, o descarga y almacena en caché.
### `zxdb/filesystem.py`
Utilidades de sistema de ficheros: limpiar carpeta de destino, eliminar directorios vacíos, e imprimir el progreso por pantalla.
### `zxdb/queries.sql`
Cuatro consultas SQL (índices 0-3). La consulta 3 (usada por defecto) filtra juegos de ZX Spectrum (`m.text like 'ZX-%'`) y devuelve solo ficheros de cinta, disco, snapshot y pokes.
### `zxdb/setup/` — Setup de la base de datos
Invocado automáticamente por `main.py`. También puede ejecutarse de forma independiente con `python -m zxdb.setup`.
- **`__main__.py`**: Orquestador del flujo completo (Docker + detección de cambios + import).
- **`docker_manager.py`**: `ensure_container()`, `wait_for_mysql()`, `db_exists()`, `import_sql()`, `stop_container()` — gestión del contenedor Docker y del import SQL.
- **`database_import.py`**: `download_zxdb(destination)` — descarga `ZXDB_mysql.sql.zip` desde GitHub (streaming, ~150 MB) y extrae `ZXDB_mysql.sql`. Ejecutable directamente con `python -m zxdb.setup.database_import`.
+6 -1
View File
@@ -1,7 +1,6 @@
# Configuración de base de datos
DB_USER = 'root'
DB_PASSWORD = 'unJEPimbJddHP8'
DB_HOST = '172.18.0.2'
DB_PORT = 3306
DB_NAME = 'zxdb'
@@ -24,3 +23,9 @@ MAX_WAIT = 4 # Cantidad de segundos máxima a esperar entre descargas
# Año usado para la separación entre juegos clásicos y modernos
LAST_CLASSIC_YEAR = 1993
# Ruta donde se deposita ZXDB_mysql.sql tras la descarga desde GitHub
ZXDB_SQL_PATH = '/tmp/ZXDB_mysql.sql'
# Fichero de estado del setup de ZXDB (registro del último import)
ZXDB_STATE_FILE = '/tmp/zxdb_state.json'
+28
View File
@@ -0,0 +1,28 @@
import logging
import sys
import config
from zxdb import database, organizer, filesystem, downloader
from zxdb.setup.__main__ import main as run_setup
from zxdb.setup.docker_manager import stop_container, CONTAINER_NAME
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def main():
try:
if run_setup() != 0:
sys.exit(1)
elements = database.connect(query_index=3)
elements = organizer.process_elements(elements)
filesystem.print_elements(elements, mode=1)
filesystem.clear_destination_folder()
downloader.get_files(elements)
filesystem.remove_empty_directories(config.DESTINATION_PATH)
finally:
stop_container(CONTAINER_NAME)
if __name__ == "__main__":
main()
+3 -4
View File
@@ -1,4 +1,3 @@
mysql-connector-python==9.1.0
requests==2.32.3
logging==0.4.9.6
unidecode==1.3.8
mysql-connector-python==9.6.0
requests==2.32.5
unidecode==1.4.0
-646
View File
@@ -1,646 +0,0 @@
## Script para descargar ficheros de spectrum a partir de zxdb
## Imports utilizados en el script
import config
import logging
import mysql.connector
import os
import random
import requests
import shutil
import sqlite3
import time
import zipfile
from mysql.connector import errorcode
from unidecode import unidecode
from urllib.parse import urlparse
from urllib.request import urlretrieve
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# Configuración del logger
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Configuración de base de datos
CONFIG_DB = {
"user": config.DB_USER,
"password": config.DB_PASSWORD,
"host": config.DB_HOST,
"port": config.DB_PORT,
"database": config.DB_NAME,
"raise_on_warnings": True,
}
# Direcciones de internet de donde descargar los datos
URL_PREFIX = {
"spectrum_computing": r"https://spectrumcomputing.co.uk",
"wos": r"https://php.sustancia.synology.me/wos",
"nvg": r"https://php.sustancia.synology.me/nvg",
}
# Rutas locales donde depositar los resultados
DESTINATION_PATH = config.DESTINATION_PATH
CACHE_PATH = config.CACHE_PATH
TEMP_FILE = config.TEMP_FILE
# Parámetros de configuración
SHOULD_CLEAR_DESTINATION_PATH = config.SHOULD_CLEAR_DESTINATION_PATH # Establece si se limpia primero la carpeta de destino
SHOULD_SPLIT_MODERN_AND_CLASSIC = config.SHOULD_SPLIT_MODERN_AND_CLASSIC # Separa los juegos en dos carpetas a partir de un año especificado
SHOULD_SORT_BY_YEAR = config.SHOULD_SORT_BY_YEAR # Separa los juegos por carpetas en función de su año de lanzamiento
SHOULD_SORT_BY_LETTER = config.SHOULD_SORT_BY_LETTER # Separa los juegos por carpetas en función de su primera letra
SHOULD_SORT_BY_DEVELOPER = config.SHOULD_SORT_BY_DEVELOPER # Separa los juegos por desarrollador
WAIT = config.WAIT # Establece una pausa aleatoria entre descargas
# Leer variables numéricas
MIN_WAIT = config.MIN_WAIT # Cantidad de segundos mínima a esperar entre descargas
MAX_WAIT = config.MAX_WAIT # Cantidad de segundos máxima a esperar entre descargas
LAST_CLASSIC_YEAR = config.LAST_CLASSIC_YEAR # Año usado para la separación entre juegos clásicos y modernos
# Tipos de fichero que se guardan en la carpeta raíz del juego
FILETYPES_ON_ROOT = [
"Tape image",
"Disk image",
"Snapshot image",
"POK pokes file",
]
# Resto de variables globales
ELEMENTS = []
# Carga un fichero con consultas SQL
def load_queries(file_path):
with open(file_path, 'r') as file:
queries = file.read().split(';')
return [query.strip() for query in queries if query.strip()]
# Carga las consultas desde el archivo
QUERIES = load_queries('queries.sql')
def select(cursor, query_index=0):
"""
Ejecuta la consulta seleccionada y procesa los resultados.
Parámetros:
cursor (MySQLCursor): El cursor de la base de datos para ejecutar la consulta.
query_index (int): El índice de la consulta a ejecutar en la lista de consultas QUERIES (predeterminado es 0).
Comportamiento:
- Ejecuta la consulta indicada por `query_index` usando el cursor proporcionado.
- Procesa los resultados de la consulta y los guarda en la lista ELEMENTS.
- Cada fila de resultados se convierte en un diccionario con las claves 'title', 'developer', 'release_year', 'url', y 'filetype'.
- Registra un mensaje informativo indicando que la consulta se ejecutó correctamente y el número de resultados obtenidos.
Ejemplo:
>>> cursor = connection.cursor()
>>> select(cursor, query_index=1)
"""
# Ejecutar la consulta seleccionada
cursor.execute(QUERIES[query_index])
# Procesar los resultados
for row in cursor:
element = {
"title": unidecode(row[0]),
"developer": unidecode(row[1]),
"release_year": row[2],
"url": row[3],
"filetype": row[4],
}
ELEMENTS.append(element)
# Registro de consulta ejecutada
logging.info(f"Consulta {query_index} ejecutada correctamente con {len(ELEMENTS)} resultados.")
def connect(query_index=0):
"""
Establece la conexión a la base de datos y ejecuta la consulta.
Parámetros:
query_index (int): Índice de la consulta a ejecutar (predeterminado es 0).
Excepciones:
mysql.connector.Error: Captura y maneja errores específicos de MySQL.
Exception: Captura y maneja cualquier otro error inesperado.
Comportamiento:
- Conecta a la base de datos usando los parámetros de configuración en CONFIG_DB.
- Ejecuta la consulta definida por la función select utilizando el cursor.
- Maneja errores de acceso denegado y base de datos no encontrada, así como cualquier otro error inesperado.
Ejemplo:
>>> connect(1)
"""
try:
with mysql.connector.connect(**CONFIG_DB) as connection:
with connection.cursor() as cursor:
# Ejecutar la consulta 1
select(cursor, query_index=query_index)
except mysql.connector.Error as err:
if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
logging.error("Algo está mal con tu nombre de usuario o contraseña.")
elif err.errno == errorcode.ER_BAD_DB_ERROR:
logging.error("La base de datos no existe.")
else:
logging.error(err)
except Exception as e:
logging.error(f"Error inesperado: {e}")
def update_url(element, url_prefix):
"""
Añade un prefijo a la URL en el diccionario `element` según el prefijo proporcionado en `url_prefix`.
Parámetros:
element (dict): Diccionario que contiene la clave 'url'.
url_prefix (dict): Diccionario que contiene los prefijos de las URLs para 'spectrum_computing', 'wos' y 'nvg'.
Modifica:
element (dict): Actualiza el valor de 'url' en el diccionario `element` con el prefijo correspondiente.
Ejemplo:
>>> element = {"url": "/zxdb/example"}
>>> url_prefix = {"spectrum_computing": "https://spectrumcomputing.co.uk", "wos": "https://php.sustancia.synology.me/wos", "nvg": "https://php.sustancia.synology.me/nvg"}
>>> update_url(element, url_prefix)
>>> print(element["url"])
https://spectrumcomputing.co.uk/zxdb/example
"""
url = element["url"]
if url.startswith("/zxdb"):
element["url"] = url_prefix["spectrum_computing"] + url
elif url.startswith("/pub"):
element["url"] = url_prefix["wos"] + url[4:]
elif url.startswith("/nvg"):
element["url"] = url_prefix["nvg"] + url[4:]
def process_elements():
"""
Procesa todos los elementos, modificando cada uno de sus parámetros.
Comportamiento:
- Construye el nombre de la carpeta raíz basado en el título y año de lanzamiento, o título, año de lanzamiento y desarrollador.
- Normaliza el nombre de la carpeta raíz.
- Añade el prefijo a la URL y normaliza los enlaces de "wos".
- Obtiene el nombre del fichero a partir de la URL de descarga.
- Establece la subcarpeta dentro de la raíz basada en el tipo de archivo.
- Verifica si el fichero está en formato .zip.
- Calcula el nombre del fichero si es un zip.
Modifica:
- ELEMENTS (list): Actualiza cada diccionario en ELEMENTS con las claves 'game_folder_name', 'url', 'file_name', 'subfolder', 'is_zip', y 'non_zip_file_name'.
Ejemplo:
>>> process_elements()
"""
global ELEMENTS
for i in range(len(ELEMENTS)):
# Construye el nombre de la carpeta raiz
ELEMENTS[i]["game_folder_name"] = f"{ELEMENTS[i]['title']} ({ELEMENTS[i]['release_year']})({ELEMENTS[i]['developer']})"
ELEMENTS[i]["game_folder_name"] = normalize_path(ELEMENTS[i]["game_folder_name"])
# Añade el prefijo a la url y normaliza los enlaces de "wos"
update_url(ELEMENTS[i], URL_PREFIX)
# Obtiene el nombre del fichero a partir de la url de descarga
ELEMENTS[i]["file_name"] = url_filename(ELEMENTS[i]["url"])
# Establece la subcarpeta dentro de la raiz
ELEMENTS[i]["subfolder"] = normalize_path(ELEMENTS[i]["filetype"]) if ELEMENTS[i]["filetype"] not in FILETYPES_ON_ROOT else ""
# Averigua si el fichero está en formato .zip
ELEMENTS[i]["is_zip"] = ELEMENTS[i]["file_name"].lower().endswith(".zip")
# Calcula el nombre del fichero si es un zip
ELEMENTS[i]["non_zip_file_name"] = ELEMENTS[i]["file_name"][:-4] if ELEMENTS[i]["is_zip"] else ELEMENTS[i]["file_name"]
def url_filename(url):
"""
Devuelve el fichero que forma la parte final de una URL.
Parámetros:
url (str): La URL de la cual se quiere extraer el nombre del fichero.
Retorna:
str: El nombre del fichero que forma la parte final de la URL.
Ejemplo:
>>> url = "https://example.com/path/to/file.txt"
>>> url_filename(url)
'file.txt'
"""
parsed_url = urlparse(url)
path = parsed_url.path
filename = os.path.basename(path)
return filename
def download_file(url, destination):
"""
Descarga un fichero a partir de una URL.
Parámetros:
url (str): La URL del fichero que se desea descargar.
destination (str): La ruta de destino donde se guardará el fichero descargado.
Retorna:
bool: True si la descarga fue exitosa, False en caso de error.
Comportamiento:
- Crea una sesión de requests con un adaptador que permite reintentos en caso de fallos.
- Intenta descargar el fichero desde la URL especificada.
- Guarda el contenido del fichero en la ruta de destino especificada.
- Maneja errores de conexión y otros problemas de la red, registrando cualquier error ocurrido.
Ejemplo:
>>> success = download_file("https://example.com/file.zip", "/path/to/destination/file.zip")
>>> if success:
>>> print("Descarga completada con éxito.")
>>> else:
>>> print("Error en la descarga.")
"""
session = requests.Session()
retries = Retry(total=3, backoff_factor=1, status_forcelist=[500, 502, 503, 504])
session.mount('https://', HTTPAdapter(max_retries=retries))
try:
response = session.get(url, timeout=10)
response.raise_for_status()
with open(destination, 'wb') as file:
file.write(response.content)
return True
except requests.exceptions.RequestException as e:
logging.error(f"Error al descargar el archivo: {e}")
return False
def unzip_file(src, dst):
"""
Descomprime los ficheros que coinciden con la lista de extensiones.
Parámetros:
src (str): Ruta del archivo ZIP que se desea descomprimir.
dst (str): Ruta del directorio donde se extraerán los ficheros.
Extensiones soportadas:
.tap, .tzx, .stl, .sna, .z80, .dsk, .fdi, .trd, .img, .mgt, .szx, .dck
Comportamiento:
- Abre el archivo ZIP especificado por `src`.
- Extrae los ficheros que coinciden con las extensiones definidas en el directorio especificado por `dst`.
- Maneja errores de archivo ZIP corrupto, archivo no encontrado y otros errores generales, registrándolos adecuadamente.
Excepciones:
zipfile.BadZipFile: El archivo ZIP está corrupto.
FileNotFoundError: El archivo ZIP no se encontró.
Exception: Cualquier otro error inesperado.
Ejemplo:
>>> unzip_file('/path/to/archivo.zip', '/path/to/destination/')
"""
archive = src
directory = dst
tape_file_formats = (".tap", ".tzx")
snapshot_file_formats = (".stl", ".sna", ".z80")
disk_file_formats = (".dsk", ".fdi", ".trd", ".img", ".mgt")
emulator_specific_formats = (".szx", ".dck")
# Sumar todas las listas anteriores
extensions = tape_file_formats + snapshot_file_formats + disk_file_formats + emulator_specific_formats
try:
with zipfile.ZipFile(archive, "r") as zip_file:
for file in zip_file.namelist():
if file.lower().endswith(extensions):
zip_file.extract(file, directory)
# logging.info(f"Archivo {file} extraído a {directory}")
except zipfile.BadZipFile:
logging.error("El archivo ZIP está corrupto.")
except FileNotFoundError:
logging.error("El archivo ZIP no se encontró.")
except Exception as e:
logging.error(f"Ocurrió un error: {e}")
def print_status(current_file, total_files, element, total_files_width, status="cached"):
"""
Imprime el estado de un archivo en el proceso de descarga.
Parámetros:
current_file (int): El número del archivo actual en el proceso de descarga.
total_files (int): El número total de archivos en el proceso de descarga.
element (dict): Diccionario que contiene información del archivo actual, incluyendo 'file_name' y 'filetype'.
total_files_width (int): El ancho del campo para el número total de archivos, para un formato de impresión alineado.
status (str): El estado del archivo (por defecto es "cached").
Comportamiento:
- Imprime el estado del archivo actual en el proceso de descarga de una manera formateada y alineada.
Ejemplo:
>>> element = {"file_name": "example.zip", "filetype": "zip"}
>>> print_status(1, 100, element, 3)
( 1 / 100) : cached : example.zip (zip)
"""
print(
"({:{width}} / {}) : {:<10} : {} ({})".format(
current_file,
total_files,
status,
element["file_name"],
element["filetype"],
width=total_files_width,
)
)
def get_final_destination_folder(title, year, developer):
"""
Compone la carpeta de destino en función de varios parámetros.
Parámetros:
developer (str): Nombre del desarrollador del juego.
year (int o str): Año de lanzamiento del juego.
game_folder_name (str): Nombre de la carpeta raíz.
Retorna:
str: La ruta completa de la carpeta de destino final.
Comportamiento:
- Determina la carpeta base (`folder1`) en función de si el juego es clásico o moderno, el desarrollador o el año.
- Determina la subcarpeta (`folder2`) basada en el desarrollador, si está habilitado.
- Determina la subcarpeta (`folder3`) basada en el año de lanzamiento, si está habilitado.
- Determina la subcarpeta (`folder4`) basada en la primera letra del nombre de la carpeta raíz, si está habilitado.
- Combina las carpetas calculadas y la carpeta raíz para obtener la ruta final de la carpeta de destino.
Ejemplo:
>>> get_final_destination_folder("Nintendo", 1985, "Super Mario")
'by_developer/N/Nintendo/1985/Super Mario'
"""
# Compone el nombre de la carpeta de destino del juego
game_folder_name = f"{title} ({year})" if SHOULD_SORT_BY_DEVELOPER else f"{title} ({year})({developer})"
game_folder_name = normalize_path(game_folder_name)
# Carpeta basada en los años de vida comercial del spectrum o el tipo de agrupación
folder1 = ""
if SHOULD_SPLIT_MODERN_AND_CLASSIC:
folder1 = "modern" if year == "none" or year is None or year > LAST_CLASSIC_YEAR else "classics"
elif SHOULD_SORT_BY_DEVELOPER:
folder1 = "by_developer"
elif SHOULD_SORT_BY_YEAR:
folder1 = "by_year"
# Carpeta basada en el desarrollador
folder2 = ""
if SHOULD_SORT_BY_DEVELOPER:
developer = developer or ""
folder2 = os.path.join("0-9", developer) if developer[0].isdigit() else os.path.join(developer[0].upper(), developer)
# Carpeta basada en el año de lanzamiento
folder3 = ""
if SHOULD_SORT_BY_YEAR:
folder3 = str(year) if year not in [None, "none"] else "unknown"
# Carpeta basada en la primera letra del nombre de la carpeta de destino del juego
folder4 = ""
if SHOULD_SORT_BY_LETTER:
folder4 = "0-9" if game_folder_name[0].isdigit() else game_folder_name[0].upper()
# Combina los prefijos y la carpeta raíz para obtener la carpeta de destino final
return os.path.join(folder1, folder2, folder3, folder4, game_folder_name)
def process_cache_file(cache_file, destination_subfolder, destination_file, element):
"""
Crea las carpetas de destino y copia o extrae el archivo de la caché.
Parámetros:
cache_file (str): Ruta del archivo en la caché.
destination_subfolder (str): Ruta del subdirectorio de destino donde se guardarán los archivos extraídos.
destination_file (str): Ruta del archivo de destino si no se extrae.
element (dict): Diccionario que contiene información del archivo, incluyendo si tiene subcarpeta específica.
Comportamiento:
- Crea las carpetas de destino necesarias.
- Si el archivo en caché es un zip y no tiene subcarpeta especificada, descomprime el archivo en el subdirectorio de destino.
- Si no, copia el archivo en caché al archivo de destino especificado.
Ejemplo:
>>> process_cache_file('/path/to/cache.zip', '/path/to/destination/subfolder', '/path/to/destination/file', element)
"""
os.makedirs(destination_subfolder, exist_ok=True)
if cache_file.endswith(".zip") and element["subfolder"] == "":
unzip_file(cache_file, destination_subfolder)
else:
shutil.copyfile(cache_file, destination_file)
def get_files():
"""
Obtiene los ficheros de la consulta desde internet o desde la caché y los deposita en la carpeta destino,
descomprimiendo los archivos necesarios.
Comportamiento:
- Presenta en pantalla el progreso de la descarga.
- Para cada elemento en ELEMENTS:
- Calcula la carpeta de clasificación basada en el desarrollador, año de lanzamiento y carpeta raíz.
- Determina las rutas de la carpeta de destino y de caché.
- Si el fichero no existe en la carpeta de destino:
- Si el fichero existe en la caché, lo copia al destino.
- Si no existe en la caché, lo descarga y lo guarda en la caché y en el destino.
- Si el fichero ya existe en el destino, lo omite.
- Maneja errores durante el procesamiento, registrándolos adecuadamente.
Ejemplo:
>>> get_files()
"""
# Variables para la presentación en pantalla de la descarga
current_file = 0
total_files = len(ELEMENTS)
total_files_width = len(str(total_files))
last_game_folder = ""
for element in ELEMENTS:
classification_folder = get_final_destination_folder(element["title"], element["release_year"], element["developer"])
destination_folder = os.path.join(DESTINATION_PATH, classification_folder)
destination_subfolder = os.path.join(destination_folder, element["subfolder"])
cache_folder = os.path.join(CACHE_PATH, element["game_folder_name"])
cache_subfolder = os.path.join(cache_folder, element["subfolder"])
# Ruta completa hasta el fichero de destino y de caché
destination_file = os.path.join(destination_subfolder, element["file_name"])
cache_file = os.path.join(cache_subfolder, element["file_name"])
# Actualiza las variables de presentación
current_file += 1
if element["game_folder_name"] != last_game_folder:
print("\n{}".format(element["game_folder_name"]))
last_game_folder = element["game_folder_name"]
try:
# Si el fichero no existe en la carpeta de destino
if not os.path.isfile(destination_file) and not os.path.isfile(os.path.join(destination_subfolder, element["non_zip_file_name"])):
# Si existe en la caché, lo copia
if os.path.isfile(cache_file):
process_cache_file(cache_file, destination_subfolder, destination_file, element)
print_status(current_file, total_files, element, total_files_width, status="cached")
# Si no existe en la caché, lo descarga
else:
if download_file(element["url"], TEMP_FILE):
print_status(current_file, total_files, element, total_files_width, status="downloaded")
if os.path.isfile(TEMP_FILE):
# Mueve el fichero temporal descargado a la cache
os.makedirs(cache_subfolder, exist_ok=True)
shutil.move(TEMP_FILE, cache_file)
# Copia el fichero de la cache al destino
if os.path.isfile(cache_file):
process_cache_file(cache_file, destination_subfolder, destination_file, element)
else:
print_status(current_file, total_files, element, total_files_width, status="not found")
if WAIT:
time.sleep(random.randint(MIN_WAIT, MAX_WAIT))
# Si el fichero ya existe en el destino, no hace nada
else:
print_status(current_file, total_files, element, total_files_width, status="skipping")
except Exception as e:
logging.error(f"Error al procesar el fichero {element['file_name']}: {e}")
def normalize_path(path):
"""
Elimina los caracteres ilegales de la cadena de texto.
Parámetros:
path (str): La cadena de texto que se desea normalizar.
Retorna:
str: La cadena de texto normalizada sin los caracteres ilegales.
Comportamiento:
- Reemplaza los caracteres ilegales en la cadena de texto con un carácter vacío.
- Los caracteres ilegales incluyen: <, >, :, ", /, \\, |, ?, *.
- Utiliza la función unidecode para manejar caracteres Unicode.
Ejemplo:
>>> normalize_path("file:name/with|illegal*chars?")
'filenamewithillegalchars'
"""
illegal_chars = ["<", ">", ":", '"', "/", "\\", "|", "?", "*"]
replace_with = ""
for char in illegal_chars:
path = unidecode(path.replace(char, replace_with))
return path
def remove_empty_directories(path):
"""
Elimina los subdirectorios vacíos.
Parámetros:
path (str): La ruta del directorio raíz desde donde se iniciará la eliminación de subdirectorios vacíos.
Comportamiento:
- Recorre la estructura de directorios desde el `path` especificado, de forma descendente.
- Intenta eliminar cada subdirectorio encontrado.
- Registra un mensaje informativo si un subdirectorio vacío es eliminado.
- Si no se puede eliminar un directorio porque no está vacío u ocurre otro error, el error es ignorado.
Ejemplo:
>>> remove_empty_directories('/path/to/directory')
"""
for root, dirs, files in os.walk(path, topdown=False):
for dir in dirs:
dir_path = os.path.join(root, dir)
try:
os.rmdir(dir_path)
logging.info(f"Directorio vacío eliminado: {dir_path}")
except OSError as e:
# El directorio no está vacío o ocurrió otro error
pass
def clear_destination_folder():
"""
Limpia la carpeta de destino.
Comportamiento:
- Verifica si la opción de limpiar la carpeta de destino (`SHOULD_CLEAR_DESTINATION_PATH`) está habilitada.
- Si está habilitada, elimina todos los archivos y subdirectorios en la carpeta de destino (`DESTINATION_PATH`).
- Registra un mensaje informativo para cada archivo o directorio eliminado.
- Maneja y registra cualquier error que ocurra durante la eliminación.
Ejemplo:
>>> clear_destination_folder()
"""
if SHOULD_CLEAR_DESTINATION_PATH:
logging.info("Limpiando la carpeta de destino ...")
for filename in os.listdir(DESTINATION_PATH):
file_path = os.path.join(DESTINATION_PATH, filename)
try:
if os.path.isfile(file_path) or os.path.islink(file_path):
os.unlink(file_path)
logging.info(f"Archivo eliminado: {file_path}")
elif os.path.isdir(file_path):
shutil.rmtree(file_path)
logging.info(f"Directorio eliminado: {file_path}")
except Exception as e:
logging.error(f'No se pudo eliminar {file_path}. Razón: {e}')
def print_elements(mode=0):
"""
Imprime la lista de elementos en diferentes modos.
Parámetros:
mode (int): El modo de impresión.
- Si es 0, imprime todos los elementos con sus claves y valores.
- Si es 1, imprime los nombres de las carpetas raíz eliminando duplicados y muestra el número de entradas únicas.
Comportamiento:
- Modo 0: Recorre todos los elementos en ELEMENTS e imprime cada clave y valor en un formato legible.
- Modo 1: Elimina duplicados basándose en la carpeta raíz ('game_folder_name') y los imprime. Luego, muestra el número total de entradas únicas.
Ejemplo:
>>> print_elements(0)
>>> print_elements(1)
"""
if mode == 0:
# Primer bucle for
for element in ELEMENTS:
print('')
for key, value in element.items():
print(key, ':', value)
elif mode == 1:
# Segundo bucle for con eliminación de duplicados
seen = set()
for element in ELEMENTS:
game_folder_name = element['game_folder_name']
if game_folder_name not in seen:
print(game_folder_name)
seen.add(game_folder_name)
# Imprimir el número de elementos únicos
print(f"Número de entradas: {len(seen)}")
def main():
"""
Bucle principal que ejecuta las principales funciones del programa.
Comportamiento:
- Establece la conexión a la base de datos y ejecuta la consulta con `query_index=3`.
- Procesa todos los elementos, modificando sus parámetros.
- Imprime la lista de elementos en el modo 1, que elimina duplicados y muestra el número total de entradas únicas.
- Limpia la carpeta de destino si la opción está habilitada.
- Obtiene los archivos desde internet o desde la caché, y los deposita en la carpeta de destino, descomprimiendo los necesarios.
- Elimina los subdirectorios vacíos en la carpeta de destino.
Ejemplo:
>>> main()
"""
connect(query_index=3)
process_elements()
print_elements(mode=1)
clear_destination_folder()
get_files()
remove_empty_directories(DESTINATION_PATH)
if __name__ == "__main__":
main()
View File
+79
View File
@@ -0,0 +1,79 @@
import logging
import mysql.connector
from mysql.connector import errorcode
from pathlib import Path
from unidecode import unidecode
import config
CONFIG_DB = {
"user": config.DB_USER,
"password": config.DB_PASSWORD,
"host": "127.0.0.1",
"port": config.DB_PORT,
"database": config.DB_NAME,
"raise_on_warnings": True,
}
def load_queries():
"""Carga las consultas SQL desde el fichero queries.sql junto a este módulo."""
file_path = Path(__file__).parent / "queries.sql"
with open(file_path, 'r') as file:
queries = file.read().split(';')
return [query.strip() for query in queries if query.strip()]
def select(cursor, query_index=0):
"""
Ejecuta la consulta seleccionada y devuelve los resultados como lista de diccionarios.
Parámetros:
cursor (MySQLCursor): El cursor de la base de datos para ejecutar la consulta.
query_index (int): El índice de la consulta a ejecutar (predeterminado es 0).
Retorna:
list: Lista de diccionarios con las claves 'title', 'developer', 'release_year', 'url', 'filetype'.
"""
queries = load_queries()
cursor.execute(queries[query_index])
elements = []
for row in cursor:
element = {
"title": unidecode(row[0]),
"developer": unidecode(row[1]),
"release_year": row[2],
"url": row[3],
"filetype": row[4],
}
elements.append(element)
logging.info(f"Consulta {query_index} ejecutada correctamente con {len(elements)} resultados.")
return elements
def connect(query_index=0):
"""
Establece la conexión a la base de datos, ejecuta la consulta y devuelve los resultados.
Parámetros:
query_index (int): Índice de la consulta a ejecutar (predeterminado es 0).
Retorna:
list: Lista de elementos obtenidos de la consulta, o lista vacía en caso de error.
"""
try:
with mysql.connector.connect(**CONFIG_DB) as connection:
with connection.cursor() as cursor:
return select(cursor, query_index=query_index)
except mysql.connector.Error as err:
if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
logging.error("Algo está mal con tu nombre de usuario o contraseña.")
elif err.errno == errorcode.ER_BAD_DB_ERROR:
logging.error("La base de datos no existe.")
else:
logging.error(err)
except Exception as e:
logging.error(f"Error inesperado: {e}")
return []
+144
View File
@@ -0,0 +1,144 @@
import logging
import os
import random
import shutil
import time
import zipfile
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import config
from zxdb.organizer import get_final_destination_folder
from zxdb.filesystem import print_status
def download_file(url, destination):
"""
Descarga un fichero a partir de una URL.
Parámetros:
url (str): La URL del fichero que se desea descargar.
destination (str): La ruta de destino donde se guardará el fichero descargado.
Retorna:
bool: True si la descarga fue exitosa, False en caso de error.
"""
session = requests.Session()
retries = Retry(total=3, backoff_factor=1, status_forcelist=[500, 502, 503, 504])
session.mount('https://', HTTPAdapter(max_retries=retries))
try:
response = session.get(url, timeout=10)
response.raise_for_status()
with open(destination, 'wb') as file:
file.write(response.content)
return True
except requests.exceptions.RequestException as e:
logging.error(f"Error al descargar el archivo: {e}")
return False
def unzip_file(src, dst):
"""
Descomprime los ficheros que coinciden con la lista de extensiones.
Parámetros:
src (str): Ruta del archivo ZIP que se desea descomprimir.
dst (str): Ruta del directorio donde se extraerán los ficheros.
Extensiones soportadas:
.tap, .tzx, .stl, .sna, .z80, .dsk, .fdi, .trd, .img, .mgt, .szx, .dck
"""
tape_file_formats = (".tap", ".tzx")
snapshot_file_formats = (".stl", ".sna", ".z80")
disk_file_formats = (".dsk", ".fdi", ".trd", ".img", ".mgt")
emulator_specific_formats = (".szx", ".dck")
extensions = tape_file_formats + snapshot_file_formats + disk_file_formats + emulator_specific_formats
try:
with zipfile.ZipFile(src, "r") as zip_file:
for file in zip_file.namelist():
if file.lower().endswith(extensions):
zip_file.extract(file, dst)
except zipfile.BadZipFile:
logging.error("El archivo ZIP está corrupto.")
except FileNotFoundError:
logging.error("El archivo ZIP no se encontró.")
except Exception as e:
logging.error(f"Ocurrió un error: {e}")
def process_cache_file(cache_file, destination_subfolder, destination_file, element):
"""
Crea las carpetas de destino y copia o extrae el archivo de la caché.
Parámetros:
cache_file (str): Ruta del archivo en la caché.
destination_subfolder (str): Ruta del subdirectorio de destino.
destination_file (str): Ruta del archivo de destino si no se extrae.
element (dict): Diccionario que contiene información del archivo.
"""
os.makedirs(destination_subfolder, exist_ok=True)
if cache_file.endswith(".zip") and element["subfolder"] == "":
unzip_file(cache_file, destination_subfolder)
else:
shutil.copyfile(cache_file, destination_file)
def get_files(elements):
"""
Obtiene los ficheros desde internet o desde la caché y los deposita en la carpeta destino.
Parámetros:
elements (list): Lista de diccionarios con la información de cada fichero a procesar.
"""
current_file = 0
total_files = len(elements)
total_files_width = len(str(total_files))
last_game_folder = ""
for element in elements:
classification_folder = get_final_destination_folder(element["title"], element["release_year"], element["developer"])
destination_folder = os.path.join(config.DESTINATION_PATH, classification_folder)
destination_subfolder = os.path.join(destination_folder, element["subfolder"])
cache_folder = os.path.join(config.CACHE_PATH, element["game_folder_name"])
cache_subfolder = os.path.join(cache_folder, element["subfolder"])
destination_file = os.path.join(destination_subfolder, element["file_name"])
cache_file = os.path.join(cache_subfolder, element["file_name"])
current_file += 1
if element["game_folder_name"] != last_game_folder:
print("\n{}".format(element["game_folder_name"]))
last_game_folder = element["game_folder_name"]
try:
if not os.path.isfile(destination_file) and not os.path.isfile(os.path.join(destination_subfolder, element["non_zip_file_name"])):
if os.path.isfile(cache_file):
process_cache_file(cache_file, destination_subfolder, destination_file, element)
print_status(current_file, total_files, element, total_files_width, status="cached")
else:
if download_file(element["url"], config.TEMP_FILE):
print_status(current_file, total_files, element, total_files_width, status="downloaded")
if os.path.isfile(config.TEMP_FILE):
os.makedirs(cache_subfolder, exist_ok=True)
shutil.move(config.TEMP_FILE, cache_file)
if os.path.isfile(cache_file):
process_cache_file(cache_file, destination_subfolder, destination_file, element)
else:
print_status(current_file, total_files, element, total_files_width, status="not found")
if config.WAIT:
time.sleep(random.randint(config.MIN_WAIT, config.MAX_WAIT))
else:
print_status(current_file, total_files, element, total_files_width, status="skipping")
except Exception as e:
logging.error(f"Error al procesar el fichero {element['file_name']}: {e}")
+89
View File
@@ -0,0 +1,89 @@
import logging
import os
import shutil
import config
def print_status(current_file, total_files, element, total_files_width, status="cached"):
"""
Imprime el estado de un archivo en el proceso de descarga.
Parámetros:
current_file (int): El número del archivo actual en el proceso de descarga.
total_files (int): El número total de archivos en el proceso de descarga.
element (dict): Diccionario con información del archivo actual ('file_name' y 'filetype').
total_files_width (int): Ancho del campo para el número total de archivos.
status (str): El estado del archivo (predeterminado es "cached").
"""
print(
"({:{width}} / {}) : {:<10} : {} ({})".format(
current_file,
total_files,
status,
element["file_name"],
element["filetype"],
width=total_files_width,
)
)
def print_elements(elements, mode=0):
"""
Imprime la lista de elementos en diferentes modos.
Parámetros:
elements (list): Lista de diccionarios con los datos de los elementos.
mode (int): El modo de impresión.
- Si es 0, imprime todos los elementos con sus claves y valores.
- Si es 1, imprime los nombres de carpeta únicos y el número total.
"""
if mode == 0:
for element in elements:
print('')
for key, value in element.items():
print(key, ':', value)
elif mode == 1:
seen = set()
for element in elements:
game_folder_name = element['game_folder_name']
if game_folder_name not in seen:
print(game_folder_name)
seen.add(game_folder_name)
print(f"Número de entradas: {len(seen)}")
def clear_destination_folder():
"""
Limpia la carpeta de destino si la opción está habilitada en la configuración.
"""
if config.SHOULD_CLEAR_DESTINATION_PATH:
logging.info("Limpiando la carpeta de destino ...")
for filename in os.listdir(config.DESTINATION_PATH):
file_path = os.path.join(config.DESTINATION_PATH, filename)
try:
if os.path.isfile(file_path) or os.path.islink(file_path):
os.unlink(file_path)
logging.info(f"Archivo eliminado: {file_path}")
elif os.path.isdir(file_path):
shutil.rmtree(file_path)
logging.info(f"Directorio eliminado: {file_path}")
except Exception as e:
logging.error(f'No se pudo eliminar {file_path}. Razón: {e}')
def remove_empty_directories(path):
"""
Elimina los subdirectorios vacíos de forma recursiva.
Parámetros:
path (str): La ruta del directorio raíz desde donde se inicia la eliminación.
"""
for root, dirs, files in os.walk(path, topdown=False):
for dir in dirs:
dir_path = os.path.join(root, dir)
try:
os.rmdir(dir_path)
logging.info(f"Directorio vacío eliminado: {dir_path}")
except OSError:
pass
+135
View File
@@ -0,0 +1,135 @@
import os
from unidecode import unidecode
import config
URL_PREFIX = {
"spectrum_computing": r"https://spectrumcomputing.co.uk",
"wos": r"https://php.sustancia.synology.me/wos",
"nvg": r"https://php.sustancia.synology.me/nvg",
}
FILETYPES_ON_ROOT = [
"Tape image",
"Disk image",
"Snapshot image",
"POK pokes file",
]
def normalize_path(path):
"""
Elimina los caracteres ilegales de la cadena de texto.
Parámetros:
path (str): La cadena de texto que se desea normalizar.
Retorna:
str: La cadena de texto normalizada sin los caracteres ilegales.
"""
illegal_chars = ["<", ">", ":", '"', "/", "\\", "|", "?", "*"]
replace_with = ""
for char in illegal_chars:
path = unidecode(path.replace(char, replace_with))
return path
def url_filename(url):
"""
Devuelve el fichero que forma la parte final de una URL.
Parámetros:
url (str): La URL de la cual se quiere extraer el nombre del fichero.
Retorna:
str: El nombre del fichero que forma la parte final de la URL.
"""
from urllib.parse import urlparse
parsed_url = urlparse(url)
path = parsed_url.path
filename = os.path.basename(path)
return filename
def update_url(element, url_prefix):
"""
Añade un prefijo a la URL en el diccionario `element` según el prefijo proporcionado en `url_prefix`.
Parámetros:
element (dict): Diccionario que contiene la clave 'url'.
url_prefix (dict): Diccionario con los prefijos para 'spectrum_computing', 'wos' y 'nvg'.
"""
url = element["url"]
if url.startswith("/zxdb"):
element["url"] = url_prefix["spectrum_computing"] + url
elif url.startswith("/pub"):
element["url"] = url_prefix["wos"] + url[4:]
elif url.startswith("/nvg"):
element["url"] = url_prefix["nvg"] + url[4:]
def process_elements(elements):
"""
Procesa todos los elementos, enriqueciendo cada diccionario con metadatos adicionales.
Parámetros:
elements (list): Lista de diccionarios con los datos crudos de la base de datos.
Retorna:
list: La misma lista con cada elemento enriquecido con 'game_folder_name', 'url',
'file_name', 'subfolder', 'is_zip' y 'non_zip_file_name'.
"""
for i in range(len(elements)):
elements[i]["game_folder_name"] = f"{elements[i]['title']} ({elements[i]['release_year']})({elements[i]['developer']})"
elements[i]["game_folder_name"] = normalize_path(elements[i]["game_folder_name"])
update_url(elements[i], URL_PREFIX)
elements[i]["file_name"] = url_filename(elements[i]["url"])
elements[i]["subfolder"] = normalize_path(elements[i]["filetype"]) if elements[i]["filetype"] not in FILETYPES_ON_ROOT else ""
elements[i]["is_zip"] = elements[i]["file_name"].lower().endswith(".zip")
elements[i]["non_zip_file_name"] = elements[i]["file_name"][:-4] if elements[i]["is_zip"] else elements[i]["file_name"]
return elements
def get_final_destination_folder(title, year, developer):
"""
Compone la carpeta de destino en función de varios parámetros de configuración.
Parámetros:
title (str): Título del juego.
year (int o str): Año de lanzamiento del juego.
developer (str): Nombre del desarrollador del juego.
Retorna:
str: La ruta relativa de la carpeta de destino final.
"""
game_folder_name = f"{title} ({year})" if config.SHOULD_SORT_BY_DEVELOPER else f"{title} ({year})({developer})"
game_folder_name = normalize_path(game_folder_name)
folder1 = ""
if config.SHOULD_SPLIT_MODERN_AND_CLASSIC:
folder1 = "modern" if year == "none" or year is None or year > config.LAST_CLASSIC_YEAR else "classics"
elif config.SHOULD_SORT_BY_DEVELOPER:
folder1 = "by_developer"
elif config.SHOULD_SORT_BY_YEAR:
folder1 = "by_year"
folder2 = ""
if config.SHOULD_SORT_BY_DEVELOPER:
developer = developer or ""
folder2 = os.path.join("0-9", developer) if developer[0].isdigit() else os.path.join(developer[0].upper(), developer)
folder3 = ""
if config.SHOULD_SORT_BY_YEAR:
folder3 = str(year) if year not in [None, "none"] else "unknown"
folder4 = ""
if config.SHOULD_SORT_BY_LETTER:
folder4 = "0-9" if game_folder_name[0].isdigit() else game_folder_name[0].upper()
return os.path.join(folder1, folder2, folder3, folder4, game_folder_name)
View File
View File
+154
View File
@@ -0,0 +1,154 @@
"""Orchestrator for the full ZXDB setup flow.
Usage:
python -m zxdb.setup
"""
import json
import logging
import sys
from datetime import datetime, timezone
from pathlib import Path
import requests
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
import config
from zxdb.setup.database_import import download_zxdb
from zxdb.setup.docker_manager import (
db_exists,
ensure_container,
get_db_creation_date,
import_sql,
wait_for_mysql,
)
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
logger = logging.getLogger(__name__)
GITHUB_COMMITS_URL = (
"https://api.github.com/repos/zxdb/ZXDB/commits"
"?path=ZXDB_mysql.sql.zip&per_page=1"
)
def get_github_commit_date() -> str | None:
"""Return the ISO 8601 date of the latest commit touching ZXDB_mysql.sql.zip, or None."""
try:
response = requests.get(
GITHUB_COMMITS_URL,
headers={"Accept": "application/vnd.github+json"},
timeout=15,
)
response.raise_for_status()
commits = response.json()
if commits:
return commits[0]["commit"]["committer"]["date"]
logger.warning("GitHub returned an empty commit list.")
except Exception as e:
logger.warning("Could not fetch GitHub commit date: %s", e)
return None
def load_state() -> dict:
"""Load state from ZXDB_STATE_FILE. Returns empty dict if file is missing or invalid."""
path = Path(config.ZXDB_STATE_FILE)
if path.exists():
try:
return json.loads(path.read_text())
except Exception as e:
logger.warning("Could not read state file: %s", e)
return {}
def save_state(github_commit_date: str | None) -> None:
"""Save current import state to ZXDB_STATE_FILE."""
state = {
"github_commit_date": github_commit_date,
"last_import": datetime.now(timezone.utc).isoformat(),
}
path = Path(config.ZXDB_STATE_FILE)
path.write_text(json.dumps(state, indent=2))
logger.info("State saved to %s", path)
def main() -> int:
# 1. Ensure Docker container is running
container = ensure_container()
# 2. Wait for MySQL to be ready
if not wait_for_mysql(container):
logger.error("MySQL did not become ready in time.")
return 1
# 3. Fetch latest GitHub commit date for the SQL file
github_commit_date = get_github_commit_date()
# 4. Load persisted state
state = load_state()
# 5. Decide whether to import
needs_import = False
saved_commit_date = state.get("github_commit_date")
if not db_exists(container):
logger.info("Database 'zxdb' not found — import required.")
needs_import = True
elif saved_commit_date is None:
# No state file: compare GitHub date against actual DB creation time.
db_date = get_db_creation_date(container)
if db_date is None:
logger.warning("Cannot determine DB creation date; assuming up to date.")
save_state(github_commit_date)
elif github_commit_date is None:
logger.warning("Cannot reach GitHub; assuming DB is up to date.")
save_state(None)
else:
gh_date = datetime.fromisoformat(github_commit_date.replace("Z", "+00:00"))
if gh_date > db_date:
logger.info(
"GitHub version (%s) is newer than DB creation date (%s) — import required.",
github_commit_date,
db_date.strftime("%Y-%m-%d %H:%M:%S UTC"),
)
needs_import = True
else:
logger.info(
"DB creation date (%s) is current — no import needed.",
db_date.strftime("%Y-%m-%d"),
)
save_state(github_commit_date)
elif github_commit_date is None:
logger.warning("Cannot determine GitHub version; skipping import (DB already exists).")
elif github_commit_date > saved_commit_date:
logger.info(
"New ZXDB version detected (%s > %s) — import required.",
github_commit_date,
saved_commit_date,
)
needs_import = True
else:
logger.info("ZXDB is up to date, skipping import.")
if not needs_import:
return 0
# 6. Download and extract the SQL file
sql_dir = str(Path(config.ZXDB_SQL_PATH).parent)
sql_path = download_zxdb(sql_dir)
if sql_path is None:
logger.error("Download failed.")
return 1
# 7. Import into MySQL
if not import_sql(container, sql_path):
logger.error("SQL import failed.")
return 1
# 8. Persist state
save_state(github_commit_date)
return 0
if __name__ == "__main__":
sys.exit(main())
+67
View File
@@ -0,0 +1,67 @@
import logging
import zipfile
from pathlib import Path
import requests
ZXDB_ZIP_URL = "https://github.com/zxdb/ZXDB/raw/master/ZXDB_mysql.sql.zip"
ZXDB_SQL_FILENAME = "ZXDB_mysql.sql"
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
logger = logging.getLogger(__name__)
def download_zxdb(destination: str) -> Path | None:
"""Download ZXDB_mysql.sql.zip from GitHub and extract ZXDB_mysql.sql to destination.
Returns the path to the extracted .sql file, or None on error.
"""
dest_dir = Path(destination)
dest_dir.mkdir(parents=True, exist_ok=True)
zip_path = dest_dir / "ZXDB_mysql.sql.zip"
sql_path = dest_dir / ZXDB_SQL_FILENAME
if sql_path.exists():
logger.info("Using cached %s (already downloaded).", sql_path)
return sql_path
logger.info("Downloading %s ...", ZXDB_ZIP_URL)
try:
with requests.get(ZXDB_ZIP_URL, stream=True, timeout=60) as response:
response.raise_for_status()
total = int(response.headers.get("content-length", 0))
downloaded = 0
with open(zip_path, "wb") as f:
for chunk in response.iter_content(chunk_size=1024 * 1024):
f.write(chunk)
downloaded += len(chunk)
if total:
pct = downloaded * 100 // total
logger.info(" %d%% (%d / %d bytes)", pct, downloaded, total)
except requests.RequestException as e:
logger.error("Download failed: %s", e)
zip_path.unlink(missing_ok=True)
return None
logger.info("Extracting %s ...", ZXDB_SQL_FILENAME)
try:
with zipfile.ZipFile(zip_path) as zf:
zf.extract(ZXDB_SQL_FILENAME, dest_dir)
except (zipfile.BadZipFile, KeyError) as e:
logger.error("Extraction failed: %s", e)
zip_path.unlink(missing_ok=True)
return None
zip_path.unlink(missing_ok=True)
logger.info("Extracted to %s", sql_path)
return sql_path
if __name__ == "__main__":
import sys
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
import config
result = download_zxdb(str(Path(config.ZXDB_SQL_PATH).parent))
if result is None:
sys.exit(1)
+138
View File
@@ -0,0 +1,138 @@
import logging
import subprocess
import time
from datetime import datetime, timezone
from pathlib import Path
logger = logging.getLogger(__name__)
CONTAINER_NAME = "mysql"
def _run(args: list[str], **kwargs) -> subprocess.CompletedProcess:
return subprocess.run(args, capture_output=True, text=True, **kwargs)
def ensure_container() -> str:
"""Ensure the MySQL Docker container exists and is running. Returns container name."""
import sys
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
import config
result = _run(["docker", "ps", "-a", "--filter", f"name=^{CONTAINER_NAME}$", "--format", "{{.Names}}"])
existing = result.stdout.strip()
if existing == CONTAINER_NAME:
# Check if it's running
running = _run(["docker", "ps", "--filter", f"name=^{CONTAINER_NAME}$", "--format", "{{.Names}}"])
if running.stdout.strip() != CONTAINER_NAME:
logger.info("Starting existing container '%s'...", CONTAINER_NAME)
_run(["docker", "start", CONTAINER_NAME], check=True)
else:
logger.info("Container '%s' is already running.", CONTAINER_NAME)
else:
logger.info("Creating new MySQL container '%s'...", CONTAINER_NAME)
_run([
"docker", "run", "-d",
"--name", CONTAINER_NAME,
"-p", "3306:3306",
"-e", f"MYSQL_ROOT_PASSWORD={config.DB_PASSWORD}",
"mysql:latest",
], check=True)
return CONTAINER_NAME
def wait_for_mysql(container: str, timeout: int = 60) -> bool:
"""Poll until MySQL is ready or timeout (seconds) is reached. Returns True if ready."""
import sys
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
import config
logger.info("Waiting for MySQL to be ready (timeout=%ds)...", timeout)
deadline = time.time() + timeout
while time.time() < deadline:
result = _run([
"docker", "exec", container,
"mysqladmin", "ping", "-h", "127.0.0.1",
f"-p{config.DB_PASSWORD}", "--silent",
])
if result.returncode == 0:
logger.info("MySQL is ready.")
return True
time.sleep(2)
logger.error("Timed out waiting for MySQL.")
return False
def db_exists(container: str) -> bool:
"""Return True if the 'zxdb' database exists in the container."""
import sys
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
import config
result = _run([
"docker", "exec", container,
"mysql", "-u", "root", f"-p{config.DB_PASSWORD}",
"-e", "SHOW DATABASES LIKE 'zxdb';",
])
return "zxdb" in result.stdout
def get_db_creation_date(container: str) -> datetime | None:
"""Return the approximate import datetime of the zxdb database, or None."""
import sys
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
import config
result = _run([
"docker", "exec", container,
"mysql", "-u", "root", f"-p{config.DB_PASSWORD}", "--skip-column-names", "-e",
"SELECT MIN(CREATE_TIME) FROM information_schema.TABLES WHERE TABLE_SCHEMA = 'zxdb' AND CREATE_TIME IS NOT NULL;",
])
value = result.stdout.strip()
if not value or value == "NULL":
return None
# MySQL format: "2024-11-12 17:07:23"
return datetime.strptime(value, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
def stop_container(container: str) -> None:
"""Stop the MySQL Docker container."""
logger.info("Stopping container '%s'...", container)
_run(["docker", "stop", container])
def import_sql(container: str, sql_path: Path) -> bool:
"""Create the zxdb database if needed and import the SQL file. Returns True on success."""
import sys
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
import config
# Drop and recreate database to ensure a clean import
create_result = _run([
"docker", "exec", container,
"mysql", "-u", "root", f"-p{config.DB_PASSWORD}",
"-e", "DROP DATABASE IF EXISTS zxdb; CREATE DATABASE zxdb CHARACTER SET utf8mb4;",
])
if create_result.returncode != 0:
logger.error("Failed to recreate database: %s", create_result.stderr)
return False
logger.info("Importing %s into zxdb (this may take several minutes)...", sql_path)
start = time.time()
try:
with open(sql_path, "rb") as f:
result = subprocess.run(
["docker", "exec", "-i", container,
"mysql", "-u", "root", f"-p{config.DB_PASSWORD}", "zxdb"],
stdin=f,
check=True,
)
elapsed = time.time() - start
logger.info("Import completed in %.1f seconds.", elapsed)
return True
except subprocess.CalledProcessError as e:
logger.error("Import failed: %s", e)
return False