## Script that downloads ZX Spectrum files using the information stored in ZXDB

## Imports used by the script
import os
import random
import shutil
import time
import zipfile
from urllib.parse import urlparse
from urllib.request import urlretrieve

import mysql.connector
import requests
from mysql.connector import errorcode

## Internet addresses the data is downloaded from
url_prefix = {
    "spectrum_computing": r"https://spectrumcomputing.co.uk",
    "wos": r"https://php.sustancia.synology.me/wos",
    "nvg": r"https://php.sustancia.synology.me/nvg",
}

## Local paths where the results are stored
destination_path = r"/home/sergio/zx/zxdb/games/"
cache_path = r"/home/sergio/zx/zxdb/cache/games/"
temp_file = r"/tmp/zxdb.download.tmp"

## Configuration parameters
should_clear_destination_path = True  # Whether the destination folder is emptied first
wait = True  # Whether to pause for a random time between downloads
min_wait = 2  # Minimum number of seconds to wait between downloads
max_wait = min_wait + 1  # Maximum number of seconds to wait between downloads
download_timeout = 60  # Seconds without a server response before a download is abandoned
elements = []
filetypes_on_root = [
    "Tape image",
    "Disk image",
    "Snapshot image",
    "POK pokes file",
]  # File types that are stored in the root folder of the game

## Characters that are replaced by "_" when building folder names
_ILLEGAL_PATH_CHARS = '<>:"/\\|?*'
_ILLEGAL_PATH_TABLE = str.maketrans(
    _ILLEGAL_PATH_CHARS, "_" * len(_ILLEGAL_PATH_CHARS)
)

## Skeleton shared by every query: games only (applications, books, etc. are
## filtered out by genre) together with the files associated with those games.
## {file_filter} restricts the file types; {extra_filter} is an optional
## additional "AND ..." condition.
_QUERY_TEMPLATE = """
SELECT DISTINCT e.title, l.name, r.release_year, d.file_link, f.text
FROM ((((((publishers p
    INNER JOIN entries e ON p.entry_id = e.id)
    INNER JOIN labels l ON p.label_id = l.id)
    INNER JOIN genretypes g ON e.genretype_id = g.id)
    INNER JOIN downloads d ON e.id = d.entry_id)
    INNER JOIN filetypes f ON d.filetype_id = f.id)
    INNER JOIN releases r ON e.id = r.entry_id AND p.release_seq = r.release_seq)
WHERE (e.availabletype_id = 'A' OR e.availabletype_id = 'D')
    AND ({file_filter})
    AND r.release_seq = 0
    {extra_filter}
    AND (g.text like '%Game:%' AND g.text not like 'Casual%')
ORDER BY e.title;"""


def select(cursor):
    """Run the selected query and append one dict per returned row to `elements`.

    Each appended dict has the keys ``title``, ``developer``, ``release_year``,
    ``url`` and ``filetype``, taken from the five selected columns in order.

    Args:
        cursor: Database cursor; it is executed and then iterated for rows.
    """
    selected_query = 0

    # Every file type except remote links and unknown ('?') entries
    all_files = "f.text <> 'Remote link' AND f.text <> '?'"
    # ONLY tape, disk, snapshot or pokes files
    root_files = (
        "f.text IN ('Tape image','Disk image','Snapshot image','POK pokes file')"
    )

    # Additional condition used by query 1 to narrow query 0 down.
    # Alternative conditions kept for reference:
    # (r.release_year >= '1986' AND r.release_year <= '1991') AND
    # l.name in ('Dinamic Software', 'Aventuras AD S.A.', 'Arcadia Soft', 'Creepsoft', 'Dro Soft', 'Erbe Software S.A.', 'Iber Software', 'MCM Software S.A.', 'Made in Spain', 'New Frontier', 'Opera Soft S.A.', 'System 4', 'Topo Soft', 'Zigurat Software') AND
    # (l.country_id = 'ES' AND l.labeltype_id = 'Z') AND
    # l.name in ('Ocean Software Ltd', 'Imagine Software Ltd', 'Palace Software', 'Gremlin Graphics Software Ltd', 'Elite Systems Ltd', 'Melbourne House', 'Ultimate Play The Game', 'Durell Software Ltd', 'Codemasters Ltd') AND
    # e.title = 'Arkanoid - Revenge of Doh' AND
    narrowing_filter = "AND l.name like 'ZOSYA%'"

    query = [
        # 0: all games and all the files associated with them
        _QUERY_TEMPLATE.format(file_filter=all_files, extra_filter=""),
        # 1: same as 0, narrowed down further
        _QUERY_TEMPLATE.format(file_filter=all_files, extra_filter=narrowing_filter),
        # 2: all games, but ONLY the tape, disk, snapshot or pokes files
        _QUERY_TEMPLATE.format(file_filter=root_files, extra_filter=""),
    ]

    cursor.execute(query[selected_query])
    for title, developer, release_year, url, filetype in cursor:
        elements.append(
            dict(
                title=title,
                developer=developer,
                release_year=release_year,
                url=url,
                filetype=filetype,
            )
        )


def connect():
    """Open the database connection, run the query and close everything again.

    Connector errors are reported on standard output instead of being raised.
    """
    # NOTE(review): credentials are hard-coded in the source; consider moving
    # them to a configuration file or the environment.
    config = {
        "user": "root",
        "password": "unJEPimbJddHP8",
        "host": "127.0.0.1",
        "database": "zxdb",
        "raise_on_warnings": True,
    }

    # Bound up front so the `finally` clause is safe even when connect() fails.
    connection = None
    cursor = None
    try:
        connection = mysql.connector.connect(**config)
        cursor = connection.cursor()
        select(cursor)
    except mysql.connector.Error as err:
        if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
            print("Something is wrong with your user name or password")
        elif err.errno == errorcode.ER_BAD_DB_ERROR:
            print("Database does not exist")
        else:
            print(err)
    finally:
        # The cursor is closed before the connection it belongs to.
        if cursor is not None:
            cursor.close()
        if connection is not None and connection.is_connected():
            connection.close()


def process_elements():
    """Add the derived fields to every entry of `elements`, in place.

    Fields added: ``root_folder``, ``file_name``, ``subfolder``, ``is_zip`` and
    ``non_zip_file_name``. The ``url`` field is rewritten with the prefix of
    the site that hosts it.
    """
    for element in elements:
        # Name of the root folder: "title (year)(developer)"
        element["root_folder"] = normalize_path(
            "{} ({})({})".format(
                element["title"], element["release_year"], element["developer"]
            )
        )

        # File name, taken from the download url
        file_name = url_filename(element["url"])
        element["file_name"] = file_name

        # Subfolder inside the root; empty for the file types kept on the root
        if element["filetype"] in filetypes_on_root:
            element["subfolder"] = ""
        else:
            element["subfolder"] = normalize_path(element["filetype"])

        # Whether the file is a .zip, and its name without that extension
        element["is_zip"] = file_name.endswith(".zip")
        element["non_zip_file_name"] = (
            file_name[:-4] if element["is_zip"] else file_name
        )

        # Prepend the site prefix; "/pub" and "/nvg" are dropped from the path
        url = element["url"]
        if url.startswith("/zxdb"):
            element["url"] = url_prefix["spectrum_computing"] + url
        elif url.startswith("/pub"):
            element["url"] = url_prefix["wos"] + url[4:]
        elif url.startswith("/nvg"):
            element["url"] = url_prefix["nvg"] + url[4:]


def url_filename(url):
    """Return the file name that forms the last component of the path of `url`."""
    return os.path.basename(urlparse(url).path)


def download_file(url, dest):
    """Download `url` into the local file `dest`.

    Args:
        url: Address to download.
        dest: Path of the file the response body is written to.

    Returns:
        True when the server answered with status 200 and the file was
        written; False on any other status, on timeout or on too many
        redirects.

    Raises:
        SystemExit: On any other `requests` error.
    """
    try:
        # Without a timeout a stalled server would block the run forever.
        response = requests.get(url, timeout=download_timeout)
    except requests.exceptions.Timeout:
        # Maybe set up for a retry, or continue in a retry loop
        print("Timeout: {}".format(url))
        return False
    except requests.exceptions.TooManyRedirects:
        # Tell the user their URL was bad and try a different one
        print("Bad URL: {}".format(url))
        return False
    except requests.exceptions.RequestException as e:
        # catastrophic error. bail.
        raise SystemExit(e)

    if response.status_code != 200:
        return False
    with open(dest, "wb") as f:
        f.write(response.content)
    return True


def unzip_file(src, dst):
    """Extract from the zip `src` into `dst` the members with a known extension.

    Only members ending in .z80, .sna, .tzx, .tap, .dsk or .trd are extracted;
    the comparison ignores case.

    Args:
        src: Path of the zip archive.
        dst: Folder the matching members are extracted into.
    """
    extensions = (".z80", ".sna", ".tzx", ".tap", ".dsk", ".trd")
    with zipfile.ZipFile(src, "r") as zip_file:
        for member in zip_file.namelist():
            if member.lower().endswith(extensions):
                zip_file.extract(member, dst)


def _deliver_from_cache(element, cache_file, destination_subfolder):
    """Copy `cache_file` to the destination, or extract it if it is a root zip."""
    os.makedirs(destination_subfolder, exist_ok=True)
    if element["is_zip"] and element["subfolder"] == "":
        unzip_file(cache_file, destination_subfolder)
    else:
        shutil.copyfile(
            cache_file, os.path.join(destination_subfolder, element["file_name"])
        )


def _report(current_file, total_files, width, status, element):
    """Print the progress line for one file."""
    print(
        "({:{width}} / {}) : {} : {} ({})".format(
            current_file,
            total_files,
            status,
            element["file_name"],
            element["filetype"],
            width=width,
        )
    )


def get_files():
    """Fetch every file of `elements` into the destination folder.

    Files already present in the destination are skipped. Files found in the
    cache are taken from there; the rest are downloaded, stored in the cache
    and then delivered. Zip files that belong on the root folder of the game
    are extracted instead of copied.
    """
    # Values used for the on-screen progress report
    total_files = len(elements)
    total_files_width = len(str(total_files))
    last_game_folder = ""

    for current_file, element in enumerate(elements, start=1):
        # Folder of the game in the destination and in the cache
        game_folder = element["root_folder"]
        destination_subfolder = os.path.join(
            destination_path, game_folder, element["subfolder"]
        )
        cache_subfolder = os.path.join(cache_path, game_folder, element["subfolder"])

        # Full path of the file in the destination and in the cache
        destination_file = os.path.join(destination_subfolder, element["file_name"])
        unzipped_file = os.path.join(
            destination_subfolder, element["non_zip_file_name"]
        )
        cache_file = os.path.join(cache_subfolder, element["file_name"])

        # Print the game header once per game
        if game_folder != last_game_folder:
            print("\n{}".format(game_folder))
            last_game_folder = game_folder

        # The file (or its unzipped counterpart) already exists in the destination
        if os.path.isfile(destination_file) or os.path.isfile(unzipped_file):
            _report(current_file, total_files, total_files_width, "skipping", element)
            continue

        # The file is in the cache: copy or extract it from there
        if os.path.isfile(cache_file):
            _deliver_from_cache(element, cache_file, destination_subfolder)
            _report(current_file, total_files, total_files_width, "cached", element)
            continue

        # The file is not in the cache: download it
        status = "not found "
        if download_file(element["url"], temp_file):
            status = "downloaded"
            # Move the temporary file into the cache. This only happens after
            # a successful download, so a stale temporary file left by an
            # earlier run is never cached under the wrong name.
            os.makedirs(cache_subfolder, exist_ok=True)
            shutil.copyfile(temp_file, cache_file)
            os.remove(temp_file)
            # Deliver the file from the cache to the destination
            _deliver_from_cache(element, cache_file, destination_subfolder)
        _report(current_file, total_files, total_files_width, status, element)
        if wait:
            time.sleep(random.randint(min_wait, max_wait))


def normalize_path(path):
    """Return `path` with each of the characters < > : " / \\ | ? * replaced by "_"."""
    return path.translate(_ILLEGAL_PATH_TABLE)


def clear_destination_folder():
    """Delete everything inside the destination folder, if so configured.

    Does nothing when `should_clear_destination_path` is false. Entries that
    cannot be deleted are reported and skipped.
    """
    if not should_clear_destination_path:
        return

    print("Clear destination folder ...")
    folder = destination_path
    # Nothing to clear when the folder has not been created yet.
    if not os.path.isdir(folder):
        return
    for filename in os.listdir(folder):
        file_path = os.path.join(folder, filename)
        try:
            if os.path.isfile(file_path) or os.path.islink(file_path):
                os.unlink(file_path)
            elif os.path.isdir(file_path):
                shutil.rmtree(file_path)
        except OSError as e:
            print("Failed to delete %s. Reason: %s" % (file_path, e))


def main():
    """Main routine: query the database, prepare the entries and fetch the files."""
    connect()
    process_elements()
    # for element in elements:
    #     print('')
    #     for key, value in element.items():
    #         print(key, ':', value)
    clear_destination_folder()
    get_files()
    # for element in elements:
    #     print(element['title'])
    # print(len(elements))


if __name__ == "__main__":
    main()