"""Download ZX Spectrum files listed in a ZXDB database.

The script queries a ZXDB (MariaDB/MySQL) instance for game entries and their
downloadable files, derives a folder layout for every file and, when the
download step is enabled in ``main``, fetches each file through a local cache
into the destination tree.

Setup notes carried over from the accompanying README / requirements.txt:

* Third-party packages: ``mysql-connector-python``, ``requests``,
  ``python-dotenv`` and ``unidecode``.  ``logging`` is part of the standard
  library, so it does not need to be installed with pip.
* Install with ``pip install -r requirements.txt``, optionally inside a
  virtual environment created with ``python3 -m venv zxdbenv`` and activated
  with ``source zxdbenv/bin/activate``.
* Connection settings are read from a ``.env`` file with the keys ``DB_USER``,
  ``DB_PASSWORD``, ``DB_HOST``, ``DB_PORT`` and ``DB_NAME``.
* ``.env`` and ``zxdbenv`` are listed in ``.gitignore``.

NOTE(review): the README's example ``.env`` contains what looks like a real
password; replace it with a placeholder and rotate the credential.
"""

import logging
import os
import random
import shutil
import time
import zipfile
from urllib.parse import urlparse
from urllib.request import urlretrieve  # noqa: F401 -- kept from the original import list

import mysql.connector
import requests
from dotenv import load_dotenv
from mysql.connector import errorcode
from unidecode import unidecode

# Load the environment variables from the .env file.
load_dotenv()

# Logger configuration.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

config = {
    "user": os.getenv("DB_USER"),
    "password": os.getenv("DB_PASSWORD"),
    "host": os.getenv("DB_HOST"),
    # Fall back to the standard MySQL/MariaDB port instead of passing None.
    "port": os.getenv("DB_PORT", "3306"),
    "database": os.getenv("DB_NAME"),
    "raise_on_warnings": True,
}

# Internet locations the files are downloaded from.
url_prefix = {
    "spectrum_computing": r"https://spectrumcomputing.co.uk",
    "wos": r"https://php.sustancia.synology.me/wos",
    "nvg": r"https://php.sustancia.synology.me/nvg",
}

# Local paths where the results are stored.
destination_path = r"/home/sergio/zx/zxdb/games/"
cache_path = r"/home/sergio/zx/zxdb/cache/games/"
temp_file = r"/tmp/zxdb.download.tmp"

# Configuration parameters.
should_clear_destination_path = True  # Whether the destination folder is wiped first
wait = True  # Whether to pause a random time between downloads
min_wait = 2  # Minimum seconds to wait between downloads
max_wait = min_wait + 1  # Maximum seconds to wait between downloads
elements = []
filetypes_on_root = [
    "Tape image",
    "Disk image",
    "Snapshot image",
    "POK pokes file",
]  # File types stored directly in the game's root folder

# Keys given to the five columns selected by every query, in column order.
_ROW_KEYS = ("title", "developer", "release_year", "url", "filetype")

# Shared skeleton of all queries; only the file/label filter differs.
_QUERY_TEMPLATE = """
    SELECT DISTINCT
        e.title, l.name, r.release_year, d.file_link, f.text
    FROM
        publishers p
        INNER JOIN entries e ON
            p.entry_id = e.id
        INNER JOIN labels l ON
            p.label_id = l.id
        INNER JOIN genretypes g ON
            e.genretype_id = g.id
        INNER JOIN downloads d ON
            e.id = d.entry_id
        INNER JOIN filetypes f ON
            d.filetype_id = f.id
        INNER JOIN releases r ON
            e.id = r.entry_id AND
            p.release_seq = r.release_seq
    WHERE
        (e.availabletype_id = 'A' OR e.availabletype_id = 'D') AND
        {filters} AND
        r.release_seq = 0 AND
        (g.text like '%Game:%' AND g.text not like 'Casual%')
    ORDER BY
        e.title;
"""

_NOT_REMOTE_FILTER = "(f.text <> 'Remote link' AND f.text <> '?')"

# Per-query filters, indexed by ``query_index``:
#   0: every game and all of its associated files
#   1: same as 0, restricted to labels whose name starts with 'ZOSYA'
#   2: games with tape, disk, snapshot or POK files only
_QUERY_FILTERS = (
    _NOT_REMOTE_FILTER,
    _NOT_REMOTE_FILTER + " AND l.name like 'ZOSYA%'",
    "(f.text IN ('Tape image','Disk image','Snapshot image','POK pokes file'))",
)

# (relative-URL prefix, key into url_prefix, leading characters to drop)
_URL_ROUTES = (
    ("/zxdb", "spectrum_computing", 0),
    ("/pub", "wos", 4),
    ("/nvg", "nvg", 4),
)

# Extensions extracted from downloaded archives (compared case-insensitively).
_EXTRACTABLE_EXTENSIONS = (".z80", ".sna", ".tzx", ".tap", ".dsk", ".trd")

# Translation table deleting characters that are illegal in folder names.
_ILLEGAL_PATH_CHARS = str.maketrans("", "", '<>:"/\\|?*')


def select(cursor, query_index=0):
    """Run one of the predefined queries and append its rows to ``elements``.

    Args:
        cursor: Open database cursor; it must support ``execute`` and iteration.
        query_index: Position of the filter in ``_QUERY_FILTERS`` (0, 1 or 2).

    Raises:
        IndexError: If ``query_index`` does not name an existing query.
    """
    cursor.execute(_QUERY_TEMPLATE.format(filters=_QUERY_FILTERS[query_index]))

    rows = [dict(zip(_ROW_KEYS, row)) for row in cursor]
    elements.extend(rows)

    # Report the rows of this query, not the cumulative size of ``elements``.
    logging.info(
        "Consulta %s ejecutada correctamente con %s resultados.",
        query_index,
        len(rows),
    )


def connect(query_index=0):
    """Open the database connection and run the selected query.

    Errors are logged rather than raised.

    Args:
        query_index: Index of the query passed on to ``select``.

    Returns:
        True when the query ran, False when any error was logged.
    """
    try:
        with mysql.connector.connect(**config) as connection:
            with connection.cursor() as cursor:
                select(cursor, query_index=query_index)
    except mysql.connector.Error as err:
        if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
            logging.error("Algo está mal con tu nombre de usuario o contraseña.")
        elif err.errno == errorcode.ER_BAD_DB_ERROR:
            logging.error("La base de datos no existe.")
        else:
            logging.error(err)
        return False
    except Exception as e:  # top-level boundary: log and report failure
        logging.error(f"Error inesperado: {e}")
        return False
    return True


def process_elements():
    """Add the derived fields each entry of ``elements`` needs for downloading.

    Every element gains ``root_folder``, ``file_name``, ``subfolder``,
    ``is_zip`` and ``non_zip_file_name``; relative URLs with a known prefix
    are rewritten to absolute ones.
    """
    for element in elements:
        # Root folder name: "<title> (<year>)(<developer>)" without illegal chars.
        element["root_folder"] = normalize_path(
            "{} ({})({})".format(
                element["title"], element["release_year"], element["developer"]
            )
        )

        # File name taken from the last component of the download URL.
        file_name = url_filename(element["url"])
        element["file_name"] = file_name

        # File types outside ``filetypes_on_root`` go into their own subfolder.
        element["subfolder"] = ""
        if element["filetype"] not in filetypes_on_root:
            element["subfolder"] = normalize_path(element["filetype"])

        # Detect zip archives regardless of the case of the extension.
        is_zip = file_name.lower().endswith(".zip")
        element["is_zip"] = is_zip
        element["non_zip_file_name"] = file_name[:-4] if is_zip else file_name

        # Turn the relative URL into an absolute one.
        url = element["url"]
        for prefix, site, strip in _URL_ROUTES:
            if url.startswith(prefix):
                element["url"] = url_prefix[site] + url[strip:]
                break


def url_filename(url: str) -> str:
    """Return the last path component of ``url`` (empty if the path ends in '/')."""
    return os.path.basename(urlparse(url).path)


def download_file(url, dest, timeout=30):
    """Download ``url`` into the file ``dest``.

    Args:
        url: Absolute URL to fetch.
        dest: Path of the file to write.
        timeout: Seconds to wait for the server before giving up; without it
            a stalled connection would block the whole run.

    Returns:
        True if the server answered 200 and the file was written, False on a
        non-200 answer, a timeout, too many redirects or an unusable URL.

    Raises:
        SystemExit: On any other ``requests`` error.
    """
    try:
        response = requests.get(url, timeout=timeout)
        if response.status_code != 200:
            return False
        with open(dest, "wb") as f:
            f.write(response.content)
        return True

    except requests.exceptions.Timeout:
        print("Timeout: {}".format(url))
        return False

    except (
        requests.exceptions.TooManyRedirects,
        requests.exceptions.MissingSchema,
        requests.exceptions.InvalidSchema,
        requests.exceptions.InvalidURL,
    ):
        # One unusable URL should not abort the whole batch.
        print("Bad URL: {}".format(url))
        return False

    except requests.exceptions.RequestException as e:
        # Catastrophic error: bail out.
        raise SystemExit(e)


def unzip_file(src, dst):
    """Extract from archive ``src`` into ``dst`` the members with a known extension.

    Only members whose name ends in one of ``_EXTRACTABLE_EXTENSIONS`` are
    extracted; the comparison ignores case.

    Raises:
        zipfile.BadZipFile: If ``src`` is not a valid zip archive.
    """
    with zipfile.ZipFile(src, "r") as archive:
        for member in archive.namelist():
            if member.lower().endswith(_EXTRACTABLE_EXTENSIONS):
                archive.extract(member, dst)


def _print_progress(current, total, status, element):
    """Print one progress line for ``element``."""
    print(
        "({:{width}} / {}) : {} : {} ({})".format(
            current,
            total,
            status,
            element["file_name"],
            element["filetype"],
            width=len(str(total)),
        )
    )


def _deliver_from_cache(element, cache_file, destination_subfolder, destination_file):
    """Copy or extract a cached file into the destination tree."""
    os.makedirs(destination_subfolder, exist_ok=True)
    # Archives that belong in the game's root folder are extracted there;
    # everything else is copied as downloaded.
    if element["is_zip"] and element["subfolder"] == "":
        try:
            unzip_file(cache_file, destination_subfolder)
        except zipfile.BadZipFile:
            print("Bad zip file: {}".format(cache_file))
    else:
        shutil.copyfile(cache_file, destination_file)


def get_files():
    """Fetch every file in ``elements`` into the destination tree.

    Files already present in the destination are skipped, files present in
    the cache are copied or extracted from it, and everything else is
    downloaded into the cache first.  Requires ``process_elements`` to have
    run, because it reads the fields that function adds.
    """
    total_files = len(elements)
    last_game_folder = ""
    for current_file, element in enumerate(elements, start=1):
        # Game folder in the destination and in the cache.
        game_folder = element["root_folder"]
        destination_subfolder = os.path.join(
            destination_path, game_folder, element["subfolder"]
        )
        cache_subfolder = os.path.join(cache_path, game_folder, element["subfolder"])

        # Full paths of the destination file and of the cached file.
        destination_file = os.path.join(destination_subfolder, element["file_name"])
        unzipped_file = os.path.join(
            destination_subfolder, element["non_zip_file_name"]
        )
        cache_file = os.path.join(cache_subfolder, element["file_name"])

        if game_folder != last_game_folder:
            print("\n{}".format(game_folder))
            last_game_folder = game_folder

        # The file (or its extracted counterpart) is already in the destination.
        if os.path.isfile(destination_file) or os.path.isfile(unzipped_file):
            _print_progress(current_file, total_files, "skipping", element)
            continue

        # The file is in the cache.
        if os.path.isfile(cache_file):
            _deliver_from_cache(
                element, cache_file, destination_subfolder, destination_file
            )
            _print_progress(current_file, total_files, "cached", element)
            continue

        # The file has to be downloaded.
        status = "not found "
        if download_file(element["url"], temp_file):
            status = "downloaded"
            # Only a successful download is promoted to the cache, so a stale
            # temporary file from an earlier run can never be picked up.
            os.makedirs(cache_subfolder, exist_ok=True)
            shutil.copyfile(temp_file, cache_file)
            os.remove(temp_file)
            _deliver_from_cache(
                element, cache_file, destination_subfolder, destination_file
            )
        _print_progress(current_file, total_files, status, element)
        if wait:
            time.sleep(random.randint(min_wait, max_wait))


def normalize_path(path: str) -> str:
    """Return ``path`` transliterated to ASCII and without illegal characters.

    Transliteration runs first so that characters it may introduce are
    stripped as well.
    """
    return unidecode(path).translate(_ILLEGAL_PATH_CHARS)


def clear_destination_folder():
    """Delete everything inside ``destination_path`` when clearing is enabled."""
    if not should_clear_destination_path:
        return
    print("Clear destination folder ...")
    if not os.path.isdir(destination_path):
        return  # nothing to clear yet
    for filename in os.listdir(destination_path):
        file_path = os.path.join(destination_path, filename)
        try:
            if os.path.isfile(file_path) or os.path.islink(file_path):
                os.unlink(file_path)
            elif os.path.isdir(file_path):
                shutil.rmtree(file_path)
        except OSError as e:
            print('Failed to delete %s. Reason: %s' % (file_path, e))


def print_elements(mode=0):
    """Print the contents of ``elements``.

    Args:
        mode: 0 prints every key/value pair of every element; 1 prints each
            distinct ``root_folder`` once, followed by how many there are.
            Any other value prints nothing.
    """
    if mode == 0:
        for element in elements:
            print('')
            for key, value in element.items():
                print(key, ':', value)
    elif mode == 1:
        seen = set()
        for element in elements:
            root_folder = element['root_folder']
            if root_folder not in seen:
                print(root_folder)
                seen.add(root_folder)
        print(f"Número de entradas: {len(seen)}")


def main():
    """Query the database, prepare the elements and list the resulting games."""
    if not connect(query_index=0):
        # The error has already been logged; report failure to the shell.
        raise SystemExit(1)
    process_elements()
    print_elements(mode=1)
    # The clearing and download steps are left disabled, as in the original script.
    # clear_destination_folder()
    # get_files()


if __name__ == "__main__":
    main()