## Script para descargar ficheros de spectrum a partir de zxdb ## Imports utilizados en el script import logging import mysql.connector import os import random import requests import shutil import sqlite3 import time import zipfile from dotenv import load_dotenv from mysql.connector import errorcode from unidecode import unidecode from urllib.parse import urlparse from urllib.request import urlretrieve from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry # Cargar las variables de entorno desde el archivo .env load_dotenv() # Configuración del logger logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') config = { "user": os.getenv("DB_USER"), "password": os.getenv("DB_PASSWORD"), "host": os.getenv("DB_HOST"), "port": os.getenv("DB_PORT"), "database": os.getenv("DB_NAME"), "raise_on_warnings": True, } # Direcciones de internet de donde descargar los datos url_prefix = { "spectrum_computing": r"https://spectrumcomputing.co.uk", "wos": r"https://php.sustancia.synology.me/wos", "nvg": r"https://php.sustancia.synology.me/nvg", } # Rutas locales donde depositar los resultados destination_path = os.getenv('DESTINATION_PATH') cache_path = os.getenv('CACHE_PATH') temp_file = os.getenv('TEMP_FILE') # Parametros de configuración should_clear_destination_path = os.getenv('SHOULD_CLEAR_DESTINATION_PATH') == 'True' # Establece si se limpia primero la carpeta de destino should_split_modern_and_classic = os.getenv('SHOULD_SPLIT_MODERN_AND_CLASSIC') == 'True'# Separa los juegos en dos carpetas a partir de un año especificado should_sort_by_letter = os.getenv('SHOULD_SORT_BY_LETTER') == 'True' # Separa los juegos por carpetas en función de su primera letra wait = os.getenv('WAIT') == 'True' # Establece una pausa aleatoria entre descargas min_wait = int(os.getenv('MIN_WAIT')) # Cantidad de segundos mínima a esperar entre descargas max_wait = int(os.getenv('MAX_WAIT')) # Cantidad de segundos máxima a esperar entre descargas last_classic_year = int(os.getenv('LAST_CLASSIC_YEAR'))# Año usado para la separación entre juegos clásicos y modernos # Tipos de fichero que se guardan en la carpeta raíz del juego filetypes_on_root = [ "Tape image", "Disk image", "Snapshot image", "POK pokes file", ] # Resto de variables globales elements = [] # Carga un fichero con consultas SQL def load_queries(file_path): with open(file_path, 'r') as file: queries = file.read().split(';') return [query.strip() for query in queries if query.strip()] # Carga las consultas desde el archivo queries = load_queries('queries.sql') # Listado con las consultas def select(cursor, query_index=0): # Ejecutar la consulta seleccionada cursor.execute(queries[query_index]) # Procesar los resultados for row in cursor: element = { "title": row[0], "developer": row[1], "release_year": row[2], "url": row[3], "filetype": row[4], } elements.append(element) # Registro de consulta ejecutada logging.info(f"Consulta {query_index} ejecutada correctamente con {len(elements)} resultados.") # Establece la conexión a la BBDD y ejecuta la consulta def connect(query_index=0): try: with mysql.connector.connect(**config) as connection: with connection.cursor() as cursor: # Ejecutar la consulta 1 select(cursor, query_index=query_index) except mysql.connector.Error as err: if err.errno == errorcode.ER_ACCESS_DENIED_ERROR: logging.error("Algo está mal con tu nombre de usuario o contraseña.") elif err.errno == errorcode.ER_BAD_DB_ERROR: logging.error("La base de datos no existe.") else: logging.error(err) except Exception as e: logging.error(f"Error inesperado: {e}") # Procesa todos lo elementos, modificando cada uno de sus parametros def process_elements(): global elements for i in range(len(elements)): # Construye el nombre de la carpeta raiz elements[i]["root_folder"] = ( elements[i]["title"] + " (" + str(elements[i]["release_year"]) + ")(" + elements[i]["developer"] + ")" ) elements[i]["root_folder"] = normalize_path(elements[i]["root_folder"]) # Obtiene el nombre del fichero a partir de la url de descarga elements[i]["file_name"] = url_filename(elements[i]["url"]) # Establece la subcarpeta dentro de la raiz elements[i]["subfolder"] = "" if elements[i]["filetype"] not in filetypes_on_root: elements[i]["subfolder"] = normalize_path(elements[i]["filetype"]) # Averigua si el fichero está en formato .zip elements[i]["is_zip"] = elements[i]["file_name"].endswith(".zip") # Calcula el nombre del fichero si es un zip elements[i]["non_zip_file_name"] = elements[i]["file_name"] if elements[i]["is_zip"]: elements[i]["non_zip_file_name"] = elements[i]["file_name"][:-4] # Añade el prefijo a la url if elements[i]["url"].startswith("/zxdb"): elements[i]["url"] = url_prefix["spectrum_computing"] + str(elements[i]["url"]) elif elements[i]["url"].startswith("/pub"): elements[i]["url"] = url_prefix["wos"] + str(elements[i]["url"][4:]) elif elements[i]["url"].startswith("/nvg"): elements[i]["url"] = url_prefix["nvg"] + str(elements[i]["url"][4:]) # Devuelve el fichero que forma la parte final de una URL def url_filename(url): parsed_url = urlparse(url) path = parsed_url.path filename = os.path.basename(path) return filename # Descarga un fichero a partir de una URL def download_file(url, destination): session = requests.Session() retries = Retry(total=3, backoff_factor=1, status_forcelist=[500, 502, 503, 504]) session.mount('https://', HTTPAdapter(max_retries=retries)) try: response = session.get(url, timeout=10) response.raise_for_status() with open(destination, 'wb') as file: file.write(response.content) return True except requests.exceptions.RequestException as e: logging.error(f"Error al descargar el archivo: {e}") return False # Descomprime los ficheros que coinciden con la lista de extensiones def unzip_file(src, dst): archive = src directory = dst extensions = (".z80", ".sna", ".tzx", ".tap", ".dsk", ".trd") try: with zipfile.ZipFile(archive, "r") as zip_file: for file in zip_file.namelist(): if file.lower().endswith(extensions): zip_file.extract(file, directory) # logging.info(f"Archivo {file} extraído a {directory}") except zipfile.BadZipFile: logging.error("El archivo ZIP está corrupto.") except FileNotFoundError: logging.error("El archivo ZIP no se encontró.") except Exception as e: logging.error(f"Ocurrió un error: {e}") # Imprime el estado de un archivo en el proceso de descarga def print_status(current_file, total_files, element, total_files_width, status="cached"): print( "({:{width}} / {}) : {:<10} : {} ({})".format( current_file, total_files, status, element["file_name"], element["filetype"], width=total_files_width, ) ) # Compone la carpeta de destino en función de varios parámetros def get_final_destination_folder(year, root_folder): # Prefijo basado en el año prefix1 = "" if should_split_modern_and_classic: if year == "none" or year is None or year > last_classic_year: prefix1 = "modern" else: prefix1 = "classics" # Prefijo basado en la primera letra del nombre de la carpeta raíz prefix2 = "" if should_sort_by_letter: if root_folder[0].isdigit(): prefix2 = "0-9" else: prefix2 = root_folder[0].upper() # Combina los prefijos y la carpeta raíz para obtener la carpeta de destino final return os.path.join(prefix1, prefix2, root_folder) # Obtiene los ficheros de la consulta desde internet o desde la caché # y los deposita en la carpeta destino, descomprimiendo los archivos necesarios def get_files(): # Variables para la presentación en pantalla de la descarga current_file = 0 total_files = len(elements) total_files_width = len(str(total_files)) last_game_folder = "" for element in elements: classification_folder = get_final_destination_folder(element["release_year"], element["root_folder"]) destination_folder = os.path.join(destination_path, classification_folder) destination_subfolder = os.path.join(destination_folder, element["subfolder"]) cache_folder = os.path.join(cache_path, element["root_folder"]) cache_subfolder = os.path.join(cache_folder, element["subfolder"]) # Ruta completa hasta el fichero de destino y de caché destination_file = os.path.join(destination_subfolder, element["file_name"]) cache_file = os.path.join(cache_subfolder, element["file_name"]) # Actualiza las variables de presentación current_file += 1 if element["root_folder"] != last_game_folder: print("\n{}".format(element["root_folder"])) last_game_folder = element["root_folder"] try: # Comprueba si ya existe el fichero a descargar if not os.path.isfile(destination_file) and not os.path.isfile(os.path.join(destination_subfolder, element["non_zip_file_name"])): # Comprueba si ya existe el fichero en la cache if os.path.isfile(cache_file): # Si encuentra el fichero en cache, crea las carpetas de destino y lo copia o lo extrae os.makedirs(destination_subfolder, exist_ok=True) if cache_file.endswith(".zip") and element["subfolder"] == "": unzip_file(cache_file, destination_subfolder) else: shutil.copyfile(cache_file, destination_file) print_status(current_file, total_files, element, total_files_width, status="cached") # El fichero no está en la cache else: status = "not found " if download_file(element["url"], temp_file): status = "downloaded" if os.path.isfile(temp_file): # Copia el fichero temporal a la cache os.makedirs(cache_subfolder, exist_ok=True) shutil.copyfile(temp_file, cache_file) os.remove(temp_file) # Copia el fichero de la cache al destino if os.path.isfile(cache_file): os.makedirs(destination_subfolder, exist_ok=True) if cache_file.endswith(".zip") and element["subfolder"] == "": unzip_file(cache_file, destination_folder) else: shutil.copyfile(cache_file, destination_file) print_status(current_file, total_files, element, total_files_width, status=status) if wait: time.sleep(random.randint(min_wait, max_wait)) # El fichero ya existe en el destino else: print_status(current_file, total_files, element, total_files_width, status="skipping") except Exception as e: logging.error(f"Error al procesar el fichero {element['file_name']}: {e}") # Elimina los caracteres ilegales de la cadena de texto def normalize_path(path): illegal_chars = ["<", ">", ":", '"', "/", "\\", "|", "?", "*"] replace_with = "" for char in illegal_chars: path = unidecode(path.replace(char, replace_with)) return path # Elimina los subdirectorios vacios def remove_empty_directories(path): for root, dirs, files in os.walk(path, topdown=False): for dir in dirs: dir_path = os.path.join(root, dir) try: os.rmdir(dir_path) logging.info(f"Directorio vacío eliminado: {dir_path}") except OSError as e: # El directorio no está vacío o ocurrió otro error pass # Limpia la carpeta de destino def clear_destination_folder(): if should_clear_destination_path: logging.info("Limpiando la carpeta de destino ...") for filename in os.listdir(destination_path): file_path = os.path.join(destination_path, filename) try: if os.path.isfile(file_path) or os.path.islink(file_path): os.unlink(file_path) logging.info(f"Archivo eliminado: {file_path}") elif os.path.isdir(file_path): shutil.rmtree(file_path) logging.info(f"Directorio eliminado: {file_path}") except Exception as e: logging.error(f'No se pudo eliminar {file_path}. Razón: {e}') # Imprime la lista de elementos def print_elements(mode=0): if mode == 0: # Primer bucle for for element in elements: print('') for key, value in element.items(): print(key, ':', value) elif mode == 1: # Segundo bucle for con eliminación de duplicados seen = set() for element in elements: root_folder = element['root_folder'] if root_folder not in seen: print(root_folder) seen.add(root_folder) # Imprimir el número de elementos únicos print(f"Número de entradas: {len(seen)}") # Bucle principal def main(): connect(query_index=3) process_elements() print_elements(mode=1) clear_destination_folder() get_files() if __name__ == "__main__": main()