Files
zxdb/zxdb.py
T
2024-11-19 17:17:09 +01:00

381 lines
15 KiB
Python

## Script para descargar ficheros de spectrum a partir de zxdb
## Imports utilizados en el script
import config
import logging
import mysql.connector
import os
import random
import requests
import shutil
import sqlite3
import time
import zipfile
from mysql.connector import errorcode
from unidecode import unidecode
from urllib.parse import urlparse
from urllib.request import urlretrieve
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# Configuración del logger
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Configuración de base de datos
config_db = {
"user": config.DB_USER,
"password": config.DB_PASSWORD,
"host": config.DB_HOST,
"port": config.DB_PORT,
"database": config.DB_NAME,
"raise_on_warnings": True,
}
# Direcciones de internet de donde descargar los datos
url_prefix = {
"spectrum_computing": r"https://spectrumcomputing.co.uk",
"wos": r"https://php.sustancia.synology.me/wos",
"nvg": r"https://php.sustancia.synology.me/nvg",
}
# Rutas locales donde depositar los resultados
destination_path = config.DESTINATION_PATH
cache_path = config.CACHE_PATH
temp_file = config.TEMP_FILE
# Parámetros de configuración
should_clear_destination_path = config.SHOULD_CLEAR_DESTINATION_PATH # Establece si se limpia primero la carpeta de destino
should_split_modern_and_classic = config.SHOULD_SPLIT_MODERN_AND_CLASSIC # Separa los juegos en dos carpetas a partir de un año especificado
should_sort_by_year = config.SHOULD_SORT_BY_YEAR # Separa los juegos por carpetas en función de su año de lanzamiento
should_sort_by_letter = config.SHOULD_SORT_BY_LETTER # Separa los juegos por carpetas en función de su primera letra
should_sort_by_developer = config.SHOULD_SORT_BY_DEVELOPER # Separa los juegos por desarrollador
wait = config.WAIT # Establece una pausa aleatoria entre descargas
# Leer variables numéricas
min_wait = config.MIN_WAIT # Cantidad de segundos mínima a esperar entre descargas
max_wait = config.MAX_WAIT # Cantidad de segundos máxima a esperar entre descargas
last_classic_year = config.LAST_CLASSIC_YEAR # Año usado para la separación entre juegos clásicos y modernos
# Tipos de fichero que se guardan en la carpeta raíz del juego
filetypes_on_root = [
"Tape image",
"Disk image",
"Snapshot image",
"POK pokes file",
]
# Resto de variables globales
elements = []
# Carga un fichero con consultas SQL
def load_queries(file_path):
with open(file_path, 'r') as file:
queries = file.read().split(';')
return [query.strip() for query in queries if query.strip()]
# Carga las consultas desde el archivo
queries = load_queries('queries.sql')
# Listado con las consultas
def select(cursor, query_index=0):
# Ejecutar la consulta seleccionada
cursor.execute(queries[query_index])
# Procesar los resultados
for row in cursor:
element = {
"title": unidecode(row[0]),
"developer": unidecode(row[1]),
"release_year": row[2],
"url": row[3],
"filetype": row[4],
}
elements.append(element)
# Registro de consulta ejecutada
logging.info(f"Consulta {query_index} ejecutada correctamente con {len(elements)} resultados.")
# Establece la conexión a la BBDD y ejecuta la consulta
def connect(query_index=0):
try:
with mysql.connector.connect(**config_db) as connection:
with connection.cursor() as cursor:
# Ejecutar la consulta 1
select(cursor, query_index=query_index)
except mysql.connector.Error as err:
if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
logging.error("Algo está mal con tu nombre de usuario o contraseña.")
elif err.errno == errorcode.ER_BAD_DB_ERROR:
logging.error("La base de datos no existe.")
else:
logging.error(err)
except Exception as e:
logging.error(f"Error inesperado: {e}")
# Añade un prefijo a la url
def update_url(element, url_prefix):
url = element["url"]
if url.startswith("/zxdb"):
element["url"] = url_prefix["spectrum_computing"] + url
elif url.startswith("/pub"):
element["url"] = url_prefix["wos"] + url[4:]
elif url.startswith("/nvg"):
element["url"] = url_prefix["nvg"] + url[4:]
# Procesa todos lo elementos, modificando cada uno de sus parametros
def process_elements():
global elements
for i in range(len(elements)):
# Construye el nombre de la carpeta raiz
if should_sort_by_developer:
elements[i]["root_folder"] = f"{elements[i]['title']} ({elements[i]['release_year']})"
else:
elements[i]["root_folder"] = f"{elements[i]['title']} ({elements[i]['release_year']})({elements[i]['developer']})"
elements[i]["root_folder"] = normalize_path(elements[i]["root_folder"])
# Añade el prefijo a la url y normaliza los enlaces de "wos"
update_url(elements[i], url_prefix)
# Obtiene el nombre del fichero a partir de la url de descarga
elements[i]["file_name"] = url_filename(elements[i]["url"])
# Establece la subcarpeta dentro de la raiz
elements[i]["subfolder"] = normalize_path(elements[i]["filetype"]) if elements[i]["filetype"] not in filetypes_on_root else ""
# Averigua si el fichero está en formato .zip
elements[i]["is_zip"] = elements[i]["file_name"].lower().endswith(".zip")
# Calcula el nombre del fichero si es un zip
elements[i]["non_zip_file_name"] = elements[i]["file_name"][:-4] if elements[i]["is_zip"] else elements[i]["file_name"]
# Devuelve el fichero que forma la parte final de una URL
def url_filename(url):
parsed_url = urlparse(url)
path = parsed_url.path
filename = os.path.basename(path)
return filename
# Descarga un fichero a partir de una URL
def download_file(url, destination):
session = requests.Session()
retries = Retry(total=3, backoff_factor=1, status_forcelist=[500, 502, 503, 504])
session.mount('https://', HTTPAdapter(max_retries=retries))
try:
response = session.get(url, timeout=10)
response.raise_for_status()
with open(destination, 'wb') as file:
file.write(response.content)
return True
except requests.exceptions.RequestException as e:
logging.error(f"Error al descargar el archivo: {e}")
return False
# Descomprime los ficheros que coinciden con la lista de extensiones
def unzip_file(src, dst):
archive = src
directory = dst
extensions = (".z80", ".sna", ".tzx", ".tap", ".dsk", ".trd")
try:
with zipfile.ZipFile(archive, "r") as zip_file:
for file in zip_file.namelist():
if file.lower().endswith(extensions):
zip_file.extract(file, directory)
# logging.info(f"Archivo {file} extraído a {directory}")
except zipfile.BadZipFile:
logging.error("El archivo ZIP está corrupto.")
except FileNotFoundError:
logging.error("El archivo ZIP no se encontró.")
except Exception as e:
logging.error(f"Ocurrió un error: {e}")
# Imprime el estado de un archivo en el proceso de descarga
def print_status(current_file, total_files, element, total_files_width, status="cached"):
print(
"({:{width}} / {}) : {:<10} : {} ({})".format(
current_file,
total_files,
status,
element["file_name"],
element["filetype"],
width=total_files_width,
)
)
# Compone la carpeta de destino en función de varios parámetros
def get_final_destination_folder(developer, year, root_folder):
# Carpeta basada en los años de vida comercial del spectrum
folder1 = ""
if should_split_modern_and_classic:
if year == "none" or year is None or year > last_classic_year:
folder1 = "modern"
else:
folder1 = "classics"
elif should_sort_by_developer:
folder1 = "by_developer"
elif should_sort_by_year:
folder1 = "by_year"
# Carpeta basada en el desarrollador
folder2 = ""
if should_sort_by_developer:
developer = developer or ""
folder2 = os.path.join(developer[0].upper(), developer)
# Carpeta basada en el año de lanzamiento
folder3 = ""
if should_sort_by_year:
folder3 = str(year) if year not in [None, "none"] else "unknown"
# Carpeta basada en la primera letra del nombre de la carpeta raíz
folder4 = ""
if should_sort_by_letter:
if root_folder[0].isdigit():
folder4 = "0-9"
else:
folder4 = root_folder[0].upper()
# Asegurarse de que root_folder es una cadena
root_folder = root_folder or ""
# Combina los prefijos y la carpeta raíz para obtener la carpeta de destino final
return os.path.join(folder1, folder2, folder3, folder4, root_folder)
# Crea las carpetas de destino y copia o extrae el archivo de la caché
def process_cache_file(cache_file, destination_subfolder, destination_file, element):
os.makedirs(destination_subfolder, exist_ok=True)
if cache_file.endswith(".zip") and element["subfolder"] == "":
unzip_file(cache_file, destination_subfolder)
else:
shutil.copyfile(cache_file, destination_file)
# Obtiene los ficheros de la consulta desde internet o desde la caché
# y los deposita en la carpeta destino, descomprimiendo los archivos necesarios
def get_files():
# Variables para la presentación en pantalla de la descarga
current_file = 0
total_files = len(elements)
total_files_width = len(str(total_files))
last_game_folder = ""
for element in elements:
classification_folder = get_final_destination_folder(element["developer"], element["release_year"], element["root_folder"])
destination_folder = os.path.join(destination_path, classification_folder)
destination_subfolder = os.path.join(destination_folder, element["subfolder"])
cache_folder = os.path.join(cache_path, element["root_folder"])
cache_subfolder = os.path.join(cache_folder, element["subfolder"])
# Ruta completa hasta el fichero de destino y de caché
destination_file = os.path.join(destination_subfolder, element["file_name"])
cache_file = os.path.join(cache_subfolder, element["file_name"])
# Actualiza las variables de presentación
current_file += 1
if element["root_folder"] != last_game_folder:
print("\n{}".format(element["root_folder"]))
last_game_folder = element["root_folder"]
try:
# Si el fichero no existe en la carpeta de destino
if not os.path.isfile(destination_file) and not os.path.isfile(os.path.join(destination_subfolder, element["non_zip_file_name"])):
# Si existe en la caché, lo copia
if os.path.isfile(cache_file):
process_cache_file(cache_file, destination_subfolder, destination_file, element)
print_status(current_file, total_files, element, total_files_width, status="cached")
# Si no existe en la caché, lo descarga
else:
if download_file(element["url"], temp_file):
print_status(current_file, total_files, element, total_files_width, status="downloaded")
if os.path.isfile(temp_file):
# Mueve el fichero temporal descargado a la cache
os.makedirs(cache_subfolder, exist_ok=True)
shutil.move(temp_file, cache_file)
# Copia el fichero de la cache al destino
if os.path.isfile(cache_file):
process_cache_file(cache_file, destination_subfolder, destination_file, element)
else:
print_status(current_file, total_files, element, total_files_width, status="not found")
if wait:
time.sleep(random.randint(min_wait, max_wait))
# Si el fichero ya existe en el destino, no hace nada
else:
print_status(current_file, total_files, element, total_files_width, status="skipping")
except Exception as e:
logging.error(f"Error al procesar el fichero {element['file_name']}: {e}")
# Elimina los caracteres ilegales de la cadena de texto
def normalize_path(path):
illegal_chars = ["<", ">", ":", '"', "/", "\\", "|", "?", "*"]
replace_with = ""
for char in illegal_chars:
path = unidecode(path.replace(char, replace_with))
return path
# Elimina los subdirectorios vacios
def remove_empty_directories(path):
for root, dirs, files in os.walk(path, topdown=False):
for dir in dirs:
dir_path = os.path.join(root, dir)
try:
os.rmdir(dir_path)
logging.info(f"Directorio vacío eliminado: {dir_path}")
except OSError as e:
# El directorio no está vacío o ocurrió otro error
pass
# Limpia la carpeta de destino
def clear_destination_folder():
if should_clear_destination_path:
logging.info("Limpiando la carpeta de destino ...")
for filename in os.listdir(destination_path):
file_path = os.path.join(destination_path, filename)
try:
if os.path.isfile(file_path) or os.path.islink(file_path):
os.unlink(file_path)
logging.info(f"Archivo eliminado: {file_path}")
elif os.path.isdir(file_path):
shutil.rmtree(file_path)
logging.info(f"Directorio eliminado: {file_path}")
except Exception as e:
logging.error(f'No se pudo eliminar {file_path}. Razón: {e}')
# Imprime la lista de elementos
def print_elements(mode=0):
if mode == 0:
# Primer bucle for
for element in elements:
print('')
for key, value in element.items():
print(key, ':', value)
elif mode == 1:
# Segundo bucle for con eliminación de duplicados
seen = set()
for element in elements:
root_folder = element['root_folder']
if root_folder not in seen:
print(root_folder)
seen.add(root_folder)
# Imprimir el número de elementos únicos
print(f"Número de entradas: {len(seen)}")
# Bucle principal
def main():
connect(query_index=3)
process_elements()
print_elements(mode=1)
clear_destination_folder()
get_files()
remove_empty_directories(destination_path)
if __name__ == "__main__":
main()