Files
zxdb/zxdb.py
T

644 lines
26 KiB
Python

## Script para descargar ficheros de spectrum a partir de zxdb
## Imports utilizados en el script
import config
import logging
import mysql.connector
import os
import random
import requests
import shutil
import sqlite3
import time
import zipfile
from mysql.connector import errorcode
from unidecode import unidecode
from urllib.parse import urlparse
from urllib.request import urlretrieve
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# Configuración del logger
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Configuración de base de datos
CONFIG_DB = {
"user": config.DB_USER,
"password": config.DB_PASSWORD,
"host": config.DB_HOST,
"port": config.DB_PORT,
"database": config.DB_NAME,
"raise_on_warnings": True,
}
# Direcciones de internet de donde descargar los datos
URL_PREFIX = {
"spectrum_computing": r"https://spectrumcomputing.co.uk",
"wos": r"https://php.sustancia.synology.me/wos",
"nvg": r"https://php.sustancia.synology.me/nvg",
}
# Rutas locales donde depositar los resultados
DESTINATION_PATH = config.DESTINATION_PATH
CACHE_PATH = config.CACHE_PATH
TEMP_FILE = config.TEMP_FILE
# Parámetros de configuración
SHOULD_CLEAR_DESTINATION_PATH = config.SHOULD_CLEAR_DESTINATION_PATH # Establece si se limpia primero la carpeta de destino
SHOULD_SPLIT_MODERN_AND_CLASSIC = config.SHOULD_SPLIT_MODERN_AND_CLASSIC # Separa los juegos en dos carpetas a partir de un año especificado
SHOULD_SORT_BY_YEAR = config.SHOULD_SORT_BY_YEAR # Separa los juegos por carpetas en función de su año de lanzamiento
SHOULD_SORT_BY_LETTER = config.SHOULD_SORT_BY_LETTER # Separa los juegos por carpetas en función de su primera letra
SHOULD_SORT_BY_DEVELOPER = config.SHOULD_SORT_BY_DEVELOPER # Separa los juegos por desarrollador
WAIT = config.WAIT # Establece una pausa aleatoria entre descargas
# Leer variables numéricas
MIN_WAIT = config.MIN_WAIT # Cantidad de segundos mínima a esperar entre descargas
MAX_WAIT = config.MAX_WAIT # Cantidad de segundos máxima a esperar entre descargas
LAST_CLASSIC_YEAR = config.LAST_CLASSIC_YEAR # Año usado para la separación entre juegos clásicos y modernos
# Tipos de fichero que se guardan en la carpeta raíz del juego
FILETYPES_ON_ROOT = [
"Tape image",
"Disk image",
"Snapshot image",
"POK pokes file",
]
# Resto de variables globales
ELEMENTS = []
# Carga un fichero con consultas SQL
def load_queries(file_path):
with open(file_path, 'r') as file:
queries = file.read().split(';')
return [query.strip() for query in queries if query.strip()]
# Carga las consultas desde el archivo
QUERIES = load_queries('queries.sql')
def select(cursor, query_index=0):
"""
Ejecuta la consulta seleccionada y procesa los resultados.
Parámetros:
cursor (MySQLCursor): El cursor de la base de datos para ejecutar la consulta.
query_index (int): El índice de la consulta a ejecutar en la lista de consultas QUERIES (predeterminado es 0).
Comportamiento:
- Ejecuta la consulta indicada por `query_index` usando el cursor proporcionado.
- Procesa los resultados de la consulta y los guarda en la lista ELEMENTS.
- Cada fila de resultados se convierte en un diccionario con las claves 'title', 'developer', 'release_year', 'url', y 'filetype'.
- Registra un mensaje informativo indicando que la consulta se ejecutó correctamente y el número de resultados obtenidos.
Ejemplo:
>>> cursor = connection.cursor()
>>> select(cursor, query_index=1)
"""
# Ejecutar la consulta seleccionada
cursor.execute(QUERIES[query_index])
# Procesar los resultados
for row in cursor:
element = {
"title": unidecode(row[0]),
"developer": unidecode(row[1]),
"release_year": row[2],
"url": row[3],
"filetype": row[4],
}
ELEMENTS.append(element)
# Registro de consulta ejecutada
logging.info(f"Consulta {query_index} ejecutada correctamente con {len(ELEMENTS)} resultados.")
def connect(query_index=0):
"""
Establece la conexión a la base de datos y ejecuta la consulta.
Parámetros:
query_index (int): Índice de la consulta a ejecutar (predeterminado es 0).
Excepciones:
mysql.connector.Error: Captura y maneja errores específicos de MySQL.
Exception: Captura y maneja cualquier otro error inesperado.
Comportamiento:
- Conecta a la base de datos usando los parámetros de configuración en CONFIG_DB.
- Ejecuta la consulta definida por la función select utilizando el cursor.
- Maneja errores de acceso denegado y base de datos no encontrada, así como cualquier otro error inesperado.
Ejemplo:
>>> connect(1)
"""
try:
with mysql.connector.connect(**CONFIG_DB) as connection:
with connection.cursor() as cursor:
# Ejecutar la consulta 1
select(cursor, query_index=query_index)
except mysql.connector.Error as err:
if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
logging.error("Algo está mal con tu nombre de usuario o contraseña.")
elif err.errno == errorcode.ER_BAD_DB_ERROR:
logging.error("La base de datos no existe.")
else:
logging.error(err)
except Exception as e:
logging.error(f"Error inesperado: {e}")
def update_url(element, url_prefix):
"""
Añade un prefijo a la URL en el diccionario `element` según el prefijo proporcionado en `url_prefix`.
Parámetros:
element (dict): Diccionario que contiene la clave 'url'.
url_prefix (dict): Diccionario que contiene los prefijos de las URLs para 'spectrum_computing', 'wos' y 'nvg'.
Modifica:
element (dict): Actualiza el valor de 'url' en el diccionario `element` con el prefijo correspondiente.
Ejemplo:
>>> element = {"url": "/zxdb/example"}
>>> url_prefix = {"spectrum_computing": "https://spectrumcomputing.co.uk", "wos": "https://php.sustancia.synology.me/wos", "nvg": "https://php.sustancia.synology.me/nvg"}
>>> update_url(element, url_prefix)
>>> print(element["url"])
https://spectrumcomputing.co.uk/zxdb/example
"""
url = element["url"]
if url.startswith("/zxdb"):
element["url"] = url_prefix["spectrum_computing"] + url
elif url.startswith("/pub"):
element["url"] = url_prefix["wos"] + url[4:]
elif url.startswith("/nvg"):
element["url"] = url_prefix["nvg"] + url[4:]
def process_elements():
"""
Procesa todos los elementos, modificando cada uno de sus parámetros.
Comportamiento:
- Construye el nombre de la carpeta raíz basado en el título y año de lanzamiento, o título, año de lanzamiento y desarrollador.
- Normaliza el nombre de la carpeta raíz.
- Añade el prefijo a la URL y normaliza los enlaces de "wos".
- Obtiene el nombre del fichero a partir de la URL de descarga.
- Establece la subcarpeta dentro de la raíz basada en el tipo de archivo.
- Verifica si el fichero está en formato .zip.
- Calcula el nombre del fichero si es un zip.
Modifica:
- ELEMENTS (list): Actualiza cada diccionario en ELEMENTS con las claves 'game_folder_name', 'url', 'file_name', 'subfolder', 'is_zip', y 'non_zip_file_name'.
Ejemplo:
>>> process_elements()
"""
global ELEMENTS
for i in range(len(ELEMENTS)):
# Construye el nombre de la carpeta raiz
ELEMENTS[i]["game_folder_name"] = f"{ELEMENTS[i]['title']} ({ELEMENTS[i]['release_year']})({ELEMENTS[i]['developer']})"
ELEMENTS[i]["game_folder_name"] = normalize_path(ELEMENTS[i]["game_folder_name"])
# Añade el prefijo a la url y normaliza los enlaces de "wos"
update_url(ELEMENTS[i], URL_PREFIX)
# Obtiene el nombre del fichero a partir de la url de descarga
ELEMENTS[i]["file_name"] = url_filename(ELEMENTS[i]["url"])
# Establece la subcarpeta dentro de la raiz
ELEMENTS[i]["subfolder"] = normalize_path(ELEMENTS[i]["filetype"]) if ELEMENTS[i]["filetype"] not in FILETYPES_ON_ROOT else ""
# Averigua si el fichero está en formato .zip
ELEMENTS[i]["is_zip"] = ELEMENTS[i]["file_name"].lower().endswith(".zip")
# Calcula el nombre del fichero si es un zip
ELEMENTS[i]["non_zip_file_name"] = ELEMENTS[i]["file_name"][:-4] if ELEMENTS[i]["is_zip"] else ELEMENTS[i]["file_name"]
def url_filename(url):
"""
Devuelve el fichero que forma la parte final de una URL.
Parámetros:
url (str): La URL de la cual se quiere extraer el nombre del fichero.
Retorna:
str: El nombre del fichero que forma la parte final de la URL.
Ejemplo:
>>> url = "https://example.com/path/to/file.txt"
>>> url_filename(url)
'file.txt'
"""
parsed_url = urlparse(url)
path = parsed_url.path
filename = os.path.basename(path)
return filename
def download_file(url, destination):
"""
Descarga un fichero a partir de una URL.
Parámetros:
url (str): La URL del fichero que se desea descargar.
destination (str): La ruta de destino donde se guardará el fichero descargado.
Retorna:
bool: True si la descarga fue exitosa, False en caso de error.
Comportamiento:
- Crea una sesión de requests con un adaptador que permite reintentos en caso de fallos.
- Intenta descargar el fichero desde la URL especificada.
- Guarda el contenido del fichero en la ruta de destino especificada.
- Maneja errores de conexión y otros problemas de la red, registrando cualquier error ocurrido.
Ejemplo:
>>> success = download_file("https://example.com/file.zip", "/path/to/destination/file.zip")
>>> if success:
>>> print("Descarga completada con éxito.")
>>> else:
>>> print("Error en la descarga.")
"""
session = requests.Session()
retries = Retry(total=3, backoff_factor=1, status_forcelist=[500, 502, 503, 504])
session.mount('https://', HTTPAdapter(max_retries=retries))
try:
response = session.get(url, timeout=10)
response.raise_for_status()
with open(destination, 'wb') as file:
file.write(response.content)
return True
except requests.exceptions.RequestException as e:
logging.error(f"Error al descargar el archivo: {e}")
return False
def unzip_file(src, dst):
"""
Descomprime los ficheros que coinciden con la lista de extensiones.
Parámetros:
src (str): Ruta del archivo ZIP que se desea descomprimir.
dst (str): Ruta del directorio donde se extraerán los ficheros.
Extensiones soportadas:
.z80, .sna, .tzx, .tap, .dsk, .trd
Comportamiento:
- Abre el archivo ZIP especificado por `src`.
- Extrae los ficheros que coinciden con las extensiones definidas en el directorio especificado por `dst`.
- Maneja errores de archivo ZIP corrupto, archivo no encontrado y otros errores generales, registrándolos adecuadamente.
Excepciones:
zipfile.BadZipFile: El archivo ZIP está corrupto.
FileNotFoundError: El archivo ZIP no se encontró.
Exception: Cualquier otro error inesperado.
Ejemplo:
>>> unzip_file('/path/to/archivo.zip', '/path/to/destination/')
"""
archive = src
directory = dst
extensions = (".z80", ".sna", ".tzx", ".tap", ".dsk", ".trd")
try:
with zipfile.ZipFile(archive, "r") as zip_file:
for file in zip_file.namelist():
if file.lower().endswith(extensions):
zip_file.extract(file, directory)
# logging.info(f"Archivo {file} extraído a {directory}")
except zipfile.BadZipFile:
logging.error("El archivo ZIP está corrupto.")
except FileNotFoundError:
logging.error("El archivo ZIP no se encontró.")
except Exception as e:
logging.error(f"Ocurrió un error: {e}")
def print_status(current_file, total_files, element, total_files_width, status="cached"):
"""
Imprime el estado de un archivo en el proceso de descarga.
Parámetros:
current_file (int): El número del archivo actual en el proceso de descarga.
total_files (int): El número total de archivos en el proceso de descarga.
element (dict): Diccionario que contiene información del archivo actual, incluyendo 'file_name' y 'filetype'.
total_files_width (int): El ancho del campo para el número total de archivos, para un formato de impresión alineado.
status (str): El estado del archivo (por defecto es "cached").
Comportamiento:
- Imprime el estado del archivo actual en el proceso de descarga de una manera formateada y alineada.
Ejemplo:
>>> element = {"file_name": "example.zip", "filetype": "zip"}
>>> print_status(1, 100, element, 3)
( 1 / 100) : cached : example.zip (zip)
"""
print(
"({:{width}} / {}) : {:<10} : {} ({})".format(
current_file,
total_files,
status,
element["file_name"],
element["filetype"],
width=total_files_width,
)
)
def get_final_destination_folder(title, year, developer):
"""
Compone la carpeta de destino en función de varios parámetros.
Parámetros:
developer (str): Nombre del desarrollador del juego.
year (int o str): Año de lanzamiento del juego.
game_folder_name (str): Nombre de la carpeta raíz.
Retorna:
str: La ruta completa de la carpeta de destino final.
Comportamiento:
- Determina la carpeta base (`folder1`) en función de si el juego es clásico o moderno, el desarrollador o el año.
- Determina la subcarpeta (`folder2`) basada en el desarrollador, si está habilitado.
- Determina la subcarpeta (`folder3`) basada en el año de lanzamiento, si está habilitado.
- Determina la subcarpeta (`folder4`) basada en la primera letra del nombre de la carpeta raíz, si está habilitado.
- Combina las carpetas calculadas y la carpeta raíz para obtener la ruta final de la carpeta de destino.
Ejemplo:
>>> get_final_destination_folder("Nintendo", 1985, "Super Mario")
'by_developer/N/Nintendo/1985/Super Mario'
"""
# Carpeta basada en los años de vida comercial del spectrum o el tipo de agrupación
folder1 = ""
if SHOULD_SPLIT_MODERN_AND_CLASSIC:
if year == "none" or year is None or year > LAST_CLASSIC_YEAR:
folder1 = "modern"
else:
folder1 = "classics"
elif SHOULD_SORT_BY_DEVELOPER:
folder1 = "by_developer"
elif SHOULD_SORT_BY_YEAR:
folder1 = "by_year"
# Carpeta basada en el desarrollador
folder2 = ""
if SHOULD_SORT_BY_DEVELOPER:
developer = developer or ""
folder2 = os.path.join(developer[0].upper(), developer)
# Carpeta basada en el año de lanzamiento
folder3 = ""
if SHOULD_SORT_BY_YEAR:
folder3 = str(year) if year not in [None, "none"] else "unknown"
# Carpeta basada en la primera letra del nombre de la carpeta raíz
folder4 = ""
if SHOULD_SORT_BY_LETTER:
if game_folder_name[0].isdigit():
folder4 = "0-9"
else:
folder4 = game_folder_name[0].upper()
# Compone el nombre de la carpeta de destino del juego
game_folder_name = f"{title} ({year})" if SHOULD_SORT_BY_DEVELOPER else f"{title} ({year})({developer})"
# Combina los prefijos y la carpeta raíz para obtener la carpeta de destino final
return os.path.join(folder1, folder2, folder3, folder4, game_folder_name)
def process_cache_file(cache_file, destination_subfolder, destination_file, element):
"""
Crea las carpetas de destino y copia o extrae el archivo de la caché.
Parámetros:
cache_file (str): Ruta del archivo en la caché.
destination_subfolder (str): Ruta del subdirectorio de destino donde se guardarán los archivos extraídos.
destination_file (str): Ruta del archivo de destino si no se extrae.
element (dict): Diccionario que contiene información del archivo, incluyendo si tiene subcarpeta específica.
Comportamiento:
- Crea las carpetas de destino necesarias.
- Si el archivo en caché es un zip y no tiene subcarpeta especificada, descomprime el archivo en el subdirectorio de destino.
- Si no, copia el archivo en caché al archivo de destino especificado.
Ejemplo:
>>> process_cache_file('/path/to/cache.zip', '/path/to/destination/subfolder', '/path/to/destination/file', element)
"""
os.makedirs(destination_subfolder, exist_ok=True)
if cache_file.endswith(".zip") and element["subfolder"] == "":
unzip_file(cache_file, destination_subfolder)
else:
shutil.copyfile(cache_file, destination_file)
def get_files():
"""
Obtiene los ficheros de la consulta desde internet o desde la caché y los deposita en la carpeta destino,
descomprimiendo los archivos necesarios.
Comportamiento:
- Presenta en pantalla el progreso de la descarga.
- Para cada elemento en ELEMENTS:
- Calcula la carpeta de clasificación basada en el desarrollador, año de lanzamiento y carpeta raíz.
- Determina las rutas de la carpeta de destino y de caché.
- Si el fichero no existe en la carpeta de destino:
- Si el fichero existe en la caché, lo copia al destino.
- Si no existe en la caché, lo descarga y lo guarda en la caché y en el destino.
- Si el fichero ya existe en el destino, lo omite.
- Maneja errores durante el procesamiento, registrándolos adecuadamente.
Ejemplo:
>>> get_files()
"""
# Variables para la presentación en pantalla de la descarga
current_file = 0
total_files = len(ELEMENTS)
total_files_width = len(str(total_files))
last_game_folder = ""
for element in ELEMENTS:
classification_folder = get_final_destination_folder(element["title"], element["release_year"], element["developer"])
destination_folder = os.path.join(DESTINATION_PATH, classification_folder)
destination_subfolder = os.path.join(destination_folder, element["subfolder"])
cache_folder = os.path.join(CACHE_PATH, element["game_folder_name"])
cache_subfolder = os.path.join(cache_folder, element["subfolder"])
# Ruta completa hasta el fichero de destino y de caché
destination_file = os.path.join(destination_subfolder, element["file_name"])
cache_file = os.path.join(cache_subfolder, element["file_name"])
# Actualiza las variables de presentación
current_file += 1
if element["game_folder_name"] != last_game_folder:
print("\n{}".format(element["game_folder_name"]))
last_game_folder = element["game_folder_name"]
try:
# Si el fichero no existe en la carpeta de destino
if not os.path.isfile(destination_file) and not os.path.isfile(os.path.join(destination_subfolder, element["non_zip_file_name"])):
# Si existe en la caché, lo copia
if os.path.isfile(cache_file):
process_cache_file(cache_file, destination_subfolder, destination_file, element)
print_status(current_file, total_files, element, total_files_width, status="cached")
# Si no existe en la caché, lo descarga
else:
if download_file(element["url"], TEMP_FILE):
print_status(current_file, total_files, element, total_files_width, status="downloaded")
if os.path.isfile(TEMP_FILE):
# Mueve el fichero temporal descargado a la cache
os.makedirs(cache_subfolder, exist_ok=True)
shutil.move(TEMP_FILE, cache_file)
# Copia el fichero de la cache al destino
if os.path.isfile(cache_file):
process_cache_file(cache_file, destination_subfolder, destination_file, element)
else:
print_status(current_file, total_files, element, total_files_width, status="not found")
if WAIT:
time.sleep(random.randint(MIN_WAIT, MAX_WAIT))
# Si el fichero ya existe en el destino, no hace nada
else:
print_status(current_file, total_files, element, total_files_width, status="skipping")
except Exception as e:
logging.error(f"Error al procesar el fichero {element['file_name']}: {e}")
def normalize_path(path):
"""
Elimina los caracteres ilegales de la cadena de texto.
Parámetros:
path (str): La cadena de texto que se desea normalizar.
Retorna:
str: La cadena de texto normalizada sin los caracteres ilegales.
Comportamiento:
- Reemplaza los caracteres ilegales en la cadena de texto con un carácter vacío.
- Los caracteres ilegales incluyen: <, >, :, ", /, \, |, ?, *.
- Utiliza la función unidecode para manejar caracteres Unicode.
Ejemplo:
>>> normalize_path("file:name/with|illegal*chars?")
'filenamewithillegalchars'
"""
illegal_chars = ["<", ">", ":", '"', "/", "\\", "|", "?", "*"]
replace_with = ""
for char in illegal_chars:
path = unidecode(path.replace(char, replace_with))
return path
def remove_empty_directories(path):
"""
Elimina los subdirectorios vacíos.
Parámetros:
path (str): La ruta del directorio raíz desde donde se iniciará la eliminación de subdirectorios vacíos.
Comportamiento:
- Recorre la estructura de directorios desde el `path` especificado, de forma descendente.
- Intenta eliminar cada subdirectorio encontrado.
- Registra un mensaje informativo si un subdirectorio vacío es eliminado.
- Si no se puede eliminar un directorio porque no está vacío u ocurre otro error, el error es ignorado.
Ejemplo:
>>> remove_empty_directories('/path/to/directory')
"""
for root, dirs, files in os.walk(path, topdown=False):
for dir in dirs:
dir_path = os.path.join(root, dir)
try:
os.rmdir(dir_path)
logging.info(f"Directorio vacío eliminado: {dir_path}")
except OSError as e:
# El directorio no está vacío o ocurrió otro error
pass
def clear_destination_folder():
"""
Limpia la carpeta de destino.
Comportamiento:
- Verifica si la opción de limpiar la carpeta de destino (`SHOULD_CLEAR_DESTINATION_PATH`) está habilitada.
- Si está habilitada, elimina todos los archivos y subdirectorios en la carpeta de destino (`DESTINATION_PATH`).
- Registra un mensaje informativo para cada archivo o directorio eliminado.
- Maneja y registra cualquier error que ocurra durante la eliminación.
Ejemplo:
>>> clear_destination_folder()
"""
if SHOULD_CLEAR_DESTINATION_PATH:
logging.info("Limpiando la carpeta de destino ...")
for filename in os.listdir(DESTINATION_PATH):
file_path = os.path.join(DESTINATION_PATH, filename)
try:
if os.path.isfile(file_path) or os.path.islink(file_path):
os.unlink(file_path)
logging.info(f"Archivo eliminado: {file_path}")
elif os.path.isdir(file_path):
shutil.rmtree(file_path)
logging.info(f"Directorio eliminado: {file_path}")
except Exception as e:
logging.error(f'No se pudo eliminar {file_path}. Razón: {e}')
def print_elements(mode=0):
"""
Imprime la lista de elementos en diferentes modos.
Parámetros:
mode (int): El modo de impresión.
- Si es 0, imprime todos los elementos con sus claves y valores.
- Si es 1, imprime los nombres de las carpetas raíz eliminando duplicados y muestra el número de entradas únicas.
Comportamiento:
- Modo 0: Recorre todos los elementos en ELEMENTS e imprime cada clave y valor en un formato legible.
- Modo 1: Elimina duplicados basándose en la carpeta raíz ('game_folder_name') y los imprime. Luego, muestra el número total de entradas únicas.
Ejemplo:
>>> print_elements(0)
>>> print_elements(1)
"""
if mode == 0:
# Primer bucle for
for element in ELEMENTS:
print('')
for key, value in element.items():
print(key, ':', value)
elif mode == 1:
# Segundo bucle for con eliminación de duplicados
seen = set()
for element in ELEMENTS:
game_folder_name = element['game_folder_name']
if game_folder_name not in seen:
print(game_folder_name)
seen.add(game_folder_name)
# Imprimir el número de elementos únicos
print(f"Número de entradas: {len(seen)}")
def main():
"""
Bucle principal que ejecuta las principales funciones del programa.
Comportamiento:
- Establece la conexión a la base de datos y ejecuta la consulta con `query_index=3`.
- Procesa todos los elementos, modificando sus parámetros.
- Imprime la lista de elementos en el modo 1, que elimina duplicados y muestra el número total de entradas únicas.
- Limpia la carpeta de destino si la opción está habilitada.
- Obtiene los archivos desde internet o desde la caché, y los deposita en la carpeta de destino, descomprimiendo los necesarios.
- Elimina los subdirectorios vacíos en la carpeta de destino.
Ejemplo:
>>> main()
"""
connect(query_index=3)
process_elements()
print_elements(mode=1)
clear_destination_folder()
get_files()
remove_empty_directories(DESTINATION_PATH)
if __name__ == "__main__":
main()