From 942a4c7b265745e3b74b71b947b4aeb2572f32a9 Mon Sep 17 00:00:00 2001 From: Sergio Date: Thu, 14 Nov 2024 17:01:27 +0100 Subject: [PATCH] Actualitzat zx_tosec_game_collector.py --- python/zx_game_collector.py | 101 ------- python/zx_tosec_game_collector.py | 162 ++++++++++++ python/zxdb.py | 424 ------------------------------ 3 files changed, 162 insertions(+), 525 deletions(-) delete mode 100644 python/zx_game_collector.py create mode 100644 python/zx_tosec_game_collector.py delete mode 100644 python/zxdb.py diff --git a/python/zx_game_collector.py b/python/zx_game_collector.py deleted file mode 100644 index d2d3361..0000000 --- a/python/zx_game_collector.py +++ /dev/null @@ -1,101 +0,0 @@ -import os -import re # regexp -import shutil -from pathlib import Path - - -def first_letter(x): - if len(x) == 0: - return "0-9" - if x[0] in ["0", "1", "2", "3", "4", "5", "6", "7", "8", "9"]: - return "0-9" - else: - return x[0].upper() - - -source_path = [Path("/home/sergio/zx/Games"), Path("/home/sergio/zx/Pokes")] -destination_path = Path("/home/sergio/tmp/final") - -opt_print = "no" -opt_create_dirs = "yes" - -# Elimina el directorio de destino -try: - print("Directory: {} -> deleting...".format(destination_path)) - shutil.rmtree(destination_path) - print("Directory: {} -> removed successfully".format(destination_path)) -except OSError as o: - print(f"Error, {o.strerror}: {destination_path}") - -# Crea el directorio de destino -try: - os.mkdir(destination_path) - print("Directory: {} -> created successfully".format(destination_path)) -except OSError as error: - print(error) - -# Variables -paths = [] # Ruta donde se encuentra el fichero -files = [] # Nombre del fichero -names = [] # Nombre del juego -years = [] # Año del juego -companies = [] # Compañía o distribuidora del juego - -# Obtiene la lista de ficheros desde los directorios de origen -for path in source_path: - for file_name in os.listdir(path): - if os.path.isfile(os.path.join(path, file_name)): # Comprueba si es un fichero - paths.append(path) # Añade la ruta - files.append(file_name) # Añade el nombre del fichero - -# Extrae los datos del juego -regex_year = r"\(\d.*?\)" -regex_company = r"^\(.*?\)" - -for i in files: - # Año - match = re.search(regex_year, i) # Busca el año en el nombre del fichero - if match: - years.append(match.group()) # Añade el año con los parentesis a la lista - pos = i.find(years[-1]) # Busca el caracter donde empieza el año - names.append( - i[0:pos].strip() - ) # Añade como nombre el texto que hay desde el principio hasta el año - match_company = re.search( - regex_company, i[pos + len(years[-1]) :] - ) # Busca la compañia en lo que queda despues del año - if match_company: - companies.append(match_company.group()) - else: - companies.append("-") - - # Limpia los parentesis del año y la compañía - years[-1] = years[-1][1:5] - if companies[-1] != "-": - companies[-1] = companies[-1][1:-1] - else: - years.append("0") - names.append(i) - companies.append("-") - -# Lista los resultados -if opt_print == "yes": - for i, item in enumerate(files): - print( - "File: {}\nName: {}\nYear: {}\nCompany: {}\n".format( - item, names[i], years[i], companies[i] - ) - ) - -# Copia los archivos -total_files = len(files) -if opt_create_dirs == "yes": - for i in range(total_files): - print("({} de {}) {}".format(i + 1, total_files, files[i])) - game_dir = names[i] + " (" + years[i] + ")" - dst_path = os.path.join(destination_path, first_letter(names[i]), game_dir) - if not os.path.exists(dst_path): - os.makedirs(dst_path) - src = os.path.join(paths[i], files[i]) - dst = os.path.join(dst_path, files[i]) - shutil.copyfile(src, dst) diff --git a/python/zx_tosec_game_collector.py b/python/zx_tosec_game_collector.py new file mode 100644 index 0000000..2d27804 --- /dev/null +++ b/python/zx_tosec_game_collector.py @@ -0,0 +1,162 @@ +import re +import os +import shutil +import logging +from pathlib import Path + +# Configuración del logger +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') + +# Variables globales +source_path = [Path("/home/sergio/zx/tosec/all"), Path("/home/sergio/zx/tosec/pokes")] +destination_path = Path("/home/sergio/zx/tosec/final") +opt_print = True +opt_create_dirs = True +opt_split_modern_and_classic = True +last_classic_year = 1993 +modern_folder_name = "modern" +classic_folder_name = "classics" + +# Variables para almacenar datos de los archivos +paths = [] # Ruta donde se encuentra el fichero +files = [] # Nombre del fichero +names = [] # Nombre del juego +years = [] # Año del juego +companies = [] # Compañía o distribuidora del juego + +def remove_destination_folder(destination_path): + try: + logging.info(f"Directorio: {destination_path} -> eliminando...") + shutil.rmtree(destination_path) + logging.info(f"Directorio: {destination_path} -> eliminado con éxito") + except OSError as o: + logging.error(f"Error, {o.strerror}: {destination_path}") + +def create_destination_folder(destination_path): + try: + os.mkdir(destination_path) + logging.info(f"Directorio: {destination_path} -> creado con éxito") + except OSError as error: + logging.error(f"Error al crear el directorio: {error}") + +def get_file_list(source_paths): + global paths, files + paths = [] + files = [] + + for path in source_paths: + try: + for file_name in os.listdir(path): + if os.path.isfile(os.path.join(path, file_name)): # Comprueba si es un fichero + paths.append(path) # Añade la ruta + files.append(file_name) # Añade el nombre del fichero + logging.info(f"Fichero encontrado: {file_name} en {path}") + except OSError as error: + logging.error(f"Error al acceder al directorio {path}: {error}") + + return paths, files + +def extract_game_data(): + global files, names, years, companies + regex_year = r"\(\d.*?\)" + regex_company = r"^\(.*?\)" + + for file in files: + # Año + match = re.search(regex_year, file) # Busca el año en el nombre del fichero + if match: + years.append(match.group()) # Añade el año con los parentesis a la lista + pos = file.find(years[-1]) # Busca el carácter donde empieza el año + names.append(file[0:pos].strip()) # Añade como nombre el texto desde el principio hasta el año + match_company = re.search(regex_company, file[pos + len(years[-1]):]) # Busca la compañía en lo que queda después del año + if match_company: + companies.append(match_company.group()) + else: + companies.append("-") + + # Limpia los paréntesis del año y la compañía + years[-1] = years[-1][1:5] + if companies[-1] != "-": + companies[-1] = companies[-1][1:-1] + else: + years.append("0") + names.append(file) + companies.append("-") + + logging.info(f"Datos extraídos -> Nombre: {names[-1]}, Año: {years[-1]}, Compañía: {companies[-1]}") + +def print_results(): + if opt_print: + for i, item in enumerate(files): + print( + "File: {}\nName: {}\nYear: {}\nCompany: {}\n".format( + item, names[i], years[i], companies[i] + ) + ) + +def first_letter(name): + if not name or name[0].isdigit(): + return "0-9" + return name[0].upper() + +def safe_int_conversion(value): + try: + return int(value) + except ValueError: + return "unknown" + +def get_modern_and_classic_folder(year): + year = safe_int_conversion(year) + + if year == "unknown": + return "unknown" + + if opt_split_modern_and_classic: + if year == "none" or year is None or year > last_classic_year: + return modern_folder_name + else: + return classic_folder_name + else: + return "" + +def copy_files(): + global files, names, years, companies, paths, destination_path, opt_create_dirs + + total_files = len(files) + if opt_create_dirs: + for i in range(total_files): + logging.info("({} de {}) {}".format(i + 1, total_files, files[i])) + game_dir = f"{names[i]} ({years[i]})" + modern_and_classic_folder = get_modern_and_classic_folder(years[i]) + dst_path = os.path.join(destination_path, modern_and_classic_folder, first_letter(names[i]), game_dir) + if not os.path.exists(dst_path): + os.makedirs(dst_path) + logging.info(f"Directorio creado: {dst_path}") + src = os.path.join(paths[i], files[i]) + dst = os.path.join(dst_path, files[i]) + shutil.copyfile(src, dst) + # logging.info(f"Archivo copiado de {src} a {dst}") + +def main(): + global destination_path, source_path, opt_print, opt_create_dirs, paths, files + + # Eliminar la carpeta de destino si existe + remove_destination_folder(destination_path) + + # Crear la carpeta de destino + create_destination_folder(destination_path) + + # Obtener la lista de archivos desde los directorios de origen + paths, files = get_file_list(source_path) + + # Extraer los datos del juego + extract_game_data() + + # Imprimir los resultados + # print_results() + + # Copiar los archivos + copy_files() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/python/zxdb.py b/python/zxdb.py deleted file mode 100644 index dfd0cc7..0000000 --- a/python/zxdb.py +++ /dev/null @@ -1,424 +0,0 @@ -## Script para descargar ficheros de spectrum a partir de zxdb - -## Imports utilizados en el script -import os -import mysql.connector -import requests -import time -import random -import zipfile -import shutil -from mysql.connector import errorcode -from urllib.parse import urlparse -from urllib.request import urlretrieve - -## Direcciones de internet de donde descargar los datos -url_prefix = { - "spectrum_computing": r"https://spectrumcomputing.co.uk", - "wos": r"https://php.sustancia.synology.me/wos", - "nvg": r"https://php.sustancia.synology.me/nvg", -} - -## Rutas locales donde depositar los resultados -destination_path = r"/home/sergio/zx/zxdb/games/" -cache_path = r"/home/sergio/zx/zxdb/cache/games/" -temp_file = r"/tmp/zxdb.download.tmp" - -## Parametros de configuración -should_clear_destination_path = True # Establece si se limpia primero la carpeta de destino -wait = True # Establece una pausa aleatoria entre descargas -min_wait = 2 # Segundos mínimos a esperar entre descargas -max_wait = min_wait + 1 # Segundos máximos a esperar entre descargas -elements = [] -filetypes_on_root = [ - "Tape image", - "Disk image", - "Snapshot image", - "POK pokes file", -] # Tipos de fichero que se guardan en la carpeta raíz del juego - - -def select(cursor): - query = [] - selected_query = 0 - - ## Esta consulta devuelve todos los juegos, filtrando aplicaciones, libros, etc y todos los ficheros asociados a esos juegos - ## 0 - select = """ - SELECT DISTINCT - e.title, l.name, r.release_year, d.file_link, f.text - FROM - ((((((publishers p - INNER JOIN entries e ON - p.entry_id = e.id) - INNER JOIN labels l ON - p.label_id = l.id) - INNER JOIN genretypes g ON - e.genretype_id = g.id) - INNER JOIN downloads d ON - e.id = d.entry_id) - INNER JOIN filetypes f ON - d.filetype_id = f.id) - INNER JOIN releases r ON - e.id = r.entry_id AND - p.release_seq = r.release_seq) - WHERE - (e.availabletype_id = 'A' OR e.availabletype_id = 'D') AND - (f.text <> 'Remote link' AND f.text <> '?') AND - r.release_seq = 0 AND - (g.text like '%Game:%' AND g.text not like 'Casual%') - ORDER BY - e.title;""" - query.append(select) - - ## Esta consulta se usa para filtrar mas la consulta anterior - ## 1 - select = """ - SELECT DISTINCT - e.title, l.name, r.release_year, d.file_link, f.text - FROM - ((((((publishers p - INNER JOIN entries e ON - p.entry_id = e.id) - INNER JOIN labels l ON - p.label_id = l.id) - INNER JOIN genretypes g ON - e.genretype_id = g.id) - INNER JOIN downloads d ON - e.id = d.entry_id) - INNER JOIN filetypes f ON - d.filetype_id = f.id) - INNER JOIN releases r ON - e.id = r.entry_id AND - p.release_seq = r.release_seq) - WHERE - (e.availabletype_id = 'A' OR e.availabletype_id = 'D') AND - (f.text <> 'Remote link' AND f.text <> '?') AND - r.release_seq = 0 AND - l.name like 'ZOSYA%' AND - (g.text like '%Game:%' AND g.text not like 'Casual%') - ORDER BY - e.title;""" - #(r.release_year >= '1986' AND r.release_year <= '1991') AND - #l.name in ('Dinamic Software', 'Aventuras AD S.A.', 'Arcadia Soft', 'Creepsoft', 'Dro Soft', 'Erbe Software S.A.', 'Iber Software', 'MCM Software S.A.', 'Made in Spain', 'New Frontier', 'Opera Soft S.A.', 'System 4', 'Topo Soft', 'Zigurat Software') AND - #(l.country_id = 'ES' AND l.labeltype_id = 'Z') AND - #l.name in ('Ocean Software Ltd', 'Imagine Software Ltd', 'Palace Software', 'Gremlin Graphics Software Ltd', 'Elite Systems Ltd', 'Melbourne House', 'Ultimate Play The Game', 'Durell Software Ltd', 'Codemasters Ltd') AND - #e.title = 'Arkanoid - Revenge of Doh' AND - query.append(select) - - ## Esta consulta devuelve todos los juegos, filtrando aplicaciones, libros, etc y SOLO los ficheros de cinta, disco o pokes - ## 2 - select = """ - SELECT DISTINCT - e.title, l.name, r.release_year, d.file_link, f.text - FROM - ((((((publishers p - INNER JOIN entries e ON - p.entry_id = e.id) - INNER JOIN labels l ON - p.label_id = l.id) - INNER JOIN genretypes g ON - e.genretype_id = g.id) - INNER JOIN downloads d ON - e.id = d.entry_id) - INNER JOIN filetypes f ON - d.filetype_id = f.id) - INNER JOIN releases r ON - e.id = r.entry_id AND - p.release_seq = r.release_seq) - WHERE - (e.availabletype_id = 'A' OR e.availabletype_id = 'D') AND - (f.text IN ('Tape image','Disk image','Snapshot image','POK pokes file')) AND - r.release_seq = 0 AND - (g.text like '%Game:%' AND g.text not like 'Casual%') - ORDER BY - e.title;""" - query.append(select) - - cursor.execute(query[selected_query]) - - for row in cursor: - element = dict( - title=row[0], - developer=row[1], - release_year=row[2], - url=row[3], - filetype=row[4], - ) - elements.append(element) - -## Establece la conexión a la BBDD y ejecuta la consulta -def connect(): - config = { - "user": "root", - "password": "unJEPimbJddHP8", - "host": "127.0.0.1", - "database": "zxdb", - "raise_on_warnings": True, - } - - try: - connection = mysql.connector.connect(**config) - cursor = connection.cursor() - select(cursor) - - except mysql.connector.Error as err: - if err.errno == errorcode.ER_ACCESS_DENIED_ERROR: - print("Something is wrong with your user name or password") - elif err.errno == errorcode.ER_BAD_DB_ERROR: - print("Database does not exist") - else: - print(err) - - finally: - if connection.is_connected(): - connection.close() - cursor.close() - -## Procesa todos lo elementos, modificando cada uno de sus parametros -def process_elements(): - global elements - for i in range(len(elements)): - # Construye el nombre de la carpeta raiz - elements[i]["root_folder"] = ( - elements[i]["title"] - + " (" - + str(elements[i]["release_year"]) - + ")(" - + elements[i]["developer"] - + ")" - ) - elements[i]["root_folder"] = normalize_path(elements[i]["root_folder"]) - - # Obtiene el nombre del fichero a partir de la url de descarga - elements[i]["file_name"] = url_filename(elements[i]["url"]) - - # Establece la subcarpeta dentro de la raiz - elements[i]["subfolder"] = "" - if elements[i]["filetype"] not in filetypes_on_root: - elements[i]["subfolder"] = normalize_path(elements[i]["filetype"]) - - # Averigua si el fichero está en formato .zip - elements[i]["is_zip"] = elements[i]["file_name"].endswith(".zip") - - # Calcula el nombre del fichero si es un zip - elements[i]["non_zip_file_name"] = elements[i]["file_name"] - if elements[i]["is_zip"]: - elements[i]["non_zip_file_name"] = elements[i]["file_name"][:-4] - - # Añade el prefijo a la url - if elements[i]["url"].startswith("/zxdb"): - elements[i]["url"] = url_prefix["spectrum_computing"] + str( - elements[i]["url"] - ) - elif elements[i]["url"].startswith("/pub"): - elements[i]["url"] = url_prefix["wos"] + str(elements[i]["url"][4:]) - elif elements[i]["url"].startswith("/nvg"): - elements[i]["url"] = url_prefix["nvg"] + str(elements[i]["url"][4:]) - - -## Devuelve el fichero que forma la parte final de una URL -def url_filename(url): - parsed_url = urlparse(url) - path = parsed_url.path - filename = os.path.basename(path) - return filename - - -## Descarga un fichero a partir de una URL -def download_file(url, dest): - try: - r = requests.get(url) - if r.status_code != 200: - return False - with open(dest, "wb") as f: - f.write(r.content) - return True - - except requests.exceptions.Timeout: - # Maybe set up for a retry, or continue in a retry loop - print("Timeout: {}".format(url)) - - except requests.exceptions.TooManyRedirects: - # Tell the user their URL was bad and try a different one - print("Bad URL: {}".format(url)) - - except requests.exceptions.RequestException as e: - # catastrophic error. bail. - raise SystemExit(e) - - -## Descomprime los ficheros que coinciden con la lista de extensiones -def unzip_file(src, dst): - # with zipfile.ZipFile(src, "r") as zip_ref: - # zip_ref.extractall(dst) - archive = src - directory = dst - extensions = (".z80", ".sna", ".tzx", ".tap", "dsk", ".trd", ".Z80", ".SNA", ".TZX", ".TAP", "DSK", ".TRD") - zip_file = zipfile.ZipFile(archive, "r") - [ - zip_file.extract(file, directory) - for file in zip_file.namelist() - if file.endswith(extensions) - ] - zip_file.close() - - -## Obtiene los ficheros de la consulta desde internet o desde la caché -## y los deposita en la carpeta destino, descomprimiendo los archivos necesarios -def get_files(): - # Variables para la presentación en pantalla de la descarga - current_file = 0 - total_files = len(elements) - total_files_width = len(str(total_files)) - last_game_folder = "" - for element in elements: - # Carpeta del juego en destino y en caché - game_folder = element["root_folder"] - destination_folder = os.path.join(destination_path, element["root_folder"]) - destination_subfolder = os.path.join(destination_folder, element["subfolder"]) - cache_folder = os.path.join(cache_path, element["root_folder"]) - cache_subfolder = os.path.join(cache_folder, element["subfolder"]) - - # Ruta completa hasta el fichero de destino y de caché - destination_file = os.path.join(destination_subfolder, element["file_name"]) - cache_file = os.path.join(cache_subfolder, element["file_name"]) - - # Actualiza las variables de presentación - current_file = current_file + 1 - - if game_folder != last_game_folder: - print("\n{}".format(game_folder)) - last_game_folder = game_folder - - #print( - # "(WORKING : {} ({})".format( - # element["file_name"], - # element["filetype"] - # ) - #) - - # Comprueba si ya existe el fichero a descargar - if not os.path.isfile(destination_file) and ( - not os.path.isfile( - os.path.join(destination_subfolder, element["non_zip_file_name"]) - ) - ): - # Comprueba si ya existe el fichero en la cache - if os.path.isfile(cache_file): - # Si encuentra el fichero en cache, crea las carpetas de destino y lo copia o lo extrae - if not os.path.isdir(destination_folder): - os.mkdir(destination_folder) - if not os.path.isdir(destination_subfolder): - os.mkdir(destination_subfolder) - if cache_file.endswith(".zip") and element["subfolder"] == "": - unzip_file(cache_file, destination_subfolder) - else: - shutil.copyfile(cache_file, destination_file) - print( - "({:{width}} / {}) : cached : {} ({})".format( - current_file, - total_files, - element["file_name"], - element["filetype"], - width=total_files_width, - ) - ) - # El fichero no está en la cache - else: - status = "not found " - if download_file(element["url"], temp_file): - status = "downloaded" - if os.path.isfile(temp_file): - # Copia el fichero temnporal a la cache - if not os.path.isdir(cache_folder): - os.mkdir(cache_folder) - if not os.path.isdir(cache_subfolder): - os.mkdir(cache_subfolder) - shutil.copyfile(temp_file, cache_file) - os.remove(temp_file) - # Copia el fichero de la cache al destino - if os.path.isfile(cache_file): - if not os.path.isdir(destination_folder): - os.mkdir(destination_folder) - if not os.path.isdir(destination_subfolder): - os.mkdir(destination_subfolder) - if ( - cache_file.endswith(".zip") - and element["subfolder"] == "" - ): - unzip_file(cache_file, destination_folder) - else: - shutil.copyfile(cache_file, destination_file) - print( - "({:{width}} / {}) : {} : {} ({})".format( - current_file, - total_files, - status, - element["file_name"], - element["filetype"], - width=total_files_width, - ) - ) - if wait: - time.sleep(random.randint(min_wait, max_wait)) - - # El fichero ya existe en el destino - else: - print( - "({:{width}} / {}) : skipping : {} ({})".format( - current_file, - total_files, - element["file_name"], - element["filetype"], - width=total_files_width, - ) - ) - - -## Elimina los caracteres ilegales de la cadena de texto -def normalize_path(path): - illegal_chars = ["<", ">", ":", '"', "/", "\\", "|", "?", "*"] - replace_with = "_" - for char in illegal_chars: - path = path.replace(char, replace_with) - return path - - -## Limpia la carpeta de destino -def clear_destination_folder(): - if should_clear_destination_path: - print("Clear destination folder ...") - folder = destination_path - for filename in os.listdir(folder): - file_path = os.path.join(folder, filename) - try: - if os.path.isfile(file_path) or os.path.islink(file_path): - os.unlink(file_path) - elif os.path.isdir(file_path): - shutil.rmtree(file_path) - except Exception as e: - print('Failed to delete %s. Reason: %s' % (file_path, e)) - - -## Bucle principal -def main(): - connect() - process_elements() - - #for element in elements: - # print('') - # for key, value in element.items(): - # print(key, ':', value) - - clear_destination_folder() - get_files() - - # for element in elements: - # print(element['title']) - - # print(len(elements)) - - -if __name__ == "__main__": - main()