Files
zxdb/zxdb.py
T
2024-11-17 16:40:40 +01:00

368 lines
14 KiB
Python

## Script para descargar ficheros de spectrum a partir de zxdb
## Imports utilizados en el script
import logging
import mysql.connector
import os
import random
import requests
import shutil
import sqlite3
import time
import zipfile
from dotenv import load_dotenv
from mysql.connector import errorcode
from unidecode import unidecode
from urllib.parse import urlparse
from urllib.request import urlretrieve
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# Cargar las variables de entorno desde el archivo .env
load_dotenv()
# Configuración del logger
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
config = {
"user": os.getenv("DB_USER"),
"password": os.getenv("DB_PASSWORD"),
"host": os.getenv("DB_HOST"),
"port": os.getenv("DB_PORT"),
"database": os.getenv("DB_NAME"),
"raise_on_warnings": True,
}
# Direcciones de internet de donde descargar los datos
url_prefix = {
"spectrum_computing": r"https://spectrumcomputing.co.uk",
"wos": r"https://php.sustancia.synology.me/wos",
"nvg": r"https://php.sustancia.synology.me/nvg",
}
# Rutas locales donde depositar los resultados
destination_path = os.getenv('DESTINATION_PATH')
cache_path = os.getenv('CACHE_PATH')
temp_file = os.getenv('TEMP_FILE')
# Parametros de configuración
should_clear_destination_path = os.getenv('SHOULD_CLEAR_DESTINATION_PATH') == 'True' # Establece si se limpia primero la carpeta de destino
should_split_modern_and_classic = os.getenv('SHOULD_SPLIT_MODERN_AND_CLASSIC') == 'True'# Separa los juegos en dos carpetas a partir de un año especificado
should_sort_by_year = os.getenv('SHOULD_SORT_BY_YEAR') == 'True' # Separa los juegos por carpetas en función de su año de lanzamiento
should_sort_by_letter = os.getenv('SHOULD_SORT_BY_LETTER') == 'True' # Separa los juegos por carpetas en función de su primera letra
wait = os.getenv('WAIT') == 'True' # Establece una pausa aleatoria entre descargas
min_wait = int(os.getenv('MIN_WAIT')) # Cantidad de segundos mínima a esperar entre descargas
max_wait = int(os.getenv('MAX_WAIT')) # Cantidad de segundos máxima a esperar entre descargas
last_classic_year = int(os.getenv('LAST_CLASSIC_YEAR'))# Año usado para la separación entre juegos clásicos y modernos
# Tipos de fichero que se guardan en la carpeta raíz del juego
filetypes_on_root = [
"Tape image",
"Disk image",
"Snapshot image",
"POK pokes file",
]
# Resto de variables globales
elements = []
# Carga un fichero con consultas SQL
def load_queries(file_path):
with open(file_path, 'r') as file:
queries = file.read().split(';')
return [query.strip() for query in queries if query.strip()]
# Carga las consultas desde el archivo
queries = load_queries('queries.sql')
# Listado con las consultas
def select(cursor, query_index=0):
# Ejecutar la consulta seleccionada
cursor.execute(queries[query_index])
# Procesar los resultados
for row in cursor:
element = {
"title": row[0],
"developer": row[1],
"release_year": row[2],
"url": row[3],
"filetype": row[4],
}
elements.append(element)
# Registro de consulta ejecutada
logging.info(f"Consulta {query_index} ejecutada correctamente con {len(elements)} resultados.")
# Establece la conexión a la BBDD y ejecuta la consulta
def connect(query_index=0):
try:
with mysql.connector.connect(**config) as connection:
with connection.cursor() as cursor:
# Ejecutar la consulta 1
select(cursor, query_index=query_index)
except mysql.connector.Error as err:
if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
logging.error("Algo está mal con tu nombre de usuario o contraseña.")
elif err.errno == errorcode.ER_BAD_DB_ERROR:
logging.error("La base de datos no existe.")
else:
logging.error(err)
except Exception as e:
logging.error(f"Error inesperado: {e}")
# Añade un prefijo a la url
def update_url(element, url_prefix):
url = element["url"]
if url.startswith("/zxdb"):
element["url"] = url_prefix["spectrum_computing"] + url
elif url.startswith("/pub"):
element["url"] = url_prefix["wos"] + url[4:]
elif url.startswith("/nvg"):
element["url"] = url_prefix["nvg"] + url[4:]
# Procesa todos lo elementos, modificando cada uno de sus parametros
def process_elements():
global elements
for i in range(len(elements)):
# Construye el nombre de la carpeta raiz
elements[i]["root_folder"] = f"{elements[i]['title']} ({elements[i]['release_year']})({elements[i]['developer']})"
elements[i]["root_folder"] = normalize_path(elements[i]["root_folder"])
# Añade el prefijo a la url y normaliza los enlaces de "wos"
update_url(elements[i], url_prefix)
# Obtiene el nombre del fichero a partir de la url de descarga
elements[i]["file_name"] = url_filename(elements[i]["url"])
# Establece la subcarpeta dentro de la raiz
elements[i]["subfolder"] = normalize_path(elements[i]["filetype"]) if elements[i]["filetype"] not in filetypes_on_root else ""
# Averigua si el fichero está en formato .zip
elements[i]["is_zip"] = elements[i]["file_name"].lower().endswith(".zip")
# Calcula el nombre del fichero si es un zip
elements[i]["non_zip_file_name"] = elements[i]["file_name"][:-4] if elements[i]["is_zip"] else elements[i]["file_name"]
# Devuelve el fichero que forma la parte final de una URL
def url_filename(url):
parsed_url = urlparse(url)
path = parsed_url.path
filename = os.path.basename(path)
return filename
# Descarga un fichero a partir de una URL
def download_file(url, destination):
session = requests.Session()
retries = Retry(total=3, backoff_factor=1, status_forcelist=[500, 502, 503, 504])
session.mount('https://', HTTPAdapter(max_retries=retries))
try:
response = session.get(url, timeout=10)
response.raise_for_status()
with open(destination, 'wb') as file:
file.write(response.content)
return True
except requests.exceptions.RequestException as e:
logging.error(f"Error al descargar el archivo: {e}")
return False
# Descomprime los ficheros que coinciden con la lista de extensiones
def unzip_file(src, dst):
archive = src
directory = dst
extensions = (".z80", ".sna", ".tzx", ".tap", ".dsk", ".trd")
try:
with zipfile.ZipFile(archive, "r") as zip_file:
for file in zip_file.namelist():
if file.lower().endswith(extensions):
zip_file.extract(file, directory)
# logging.info(f"Archivo {file} extraído a {directory}")
except zipfile.BadZipFile:
logging.error("El archivo ZIP está corrupto.")
except FileNotFoundError:
logging.error("El archivo ZIP no se encontró.")
except Exception as e:
logging.error(f"Ocurrió un error: {e}")
# Imprime el estado de un archivo en el proceso de descarga
def print_status(current_file, total_files, element, total_files_width, status="cached"):
print(
"({:{width}} / {}) : {:<10} : {} ({})".format(
current_file,
total_files,
status,
element["file_name"],
element["filetype"],
width=total_files_width,
)
)
# Compone la carpeta de destino en función de varios parámetros
def get_final_destination_folder(year, root_folder):
# Carpeta basada en los años de vida comercial del spectrum
folder1 = ""
if should_split_modern_and_classic:
if year == "none" or year is None or year > last_classic_year:
folder1 = "modern"
else:
folder1 = "classics"
# Carpeta basada en el año de lanzamiento
folder2 = ""
if should_sort_by_year:
folder2 = str(year) if year not in [None, "none"] else "unknown"
# Carpeta basada en la primera letra del nombre de la carpeta raíz
folder3 = ""
if should_sort_by_letter:
if root_folder[0].isdigit():
folder3 = "0-9"
else:
folder3 = root_folder[0].upper()
# Asegurarse de que root_folder es una cadena
root_folder = root_folder or ""
# Combina los prefijos y la carpeta raíz para obtener la carpeta de destino final
return os.path.join(folder1, folder2, folder3, root_folder)
# Crea las carpetas de destino y copia o extrae el archivo de la caché
def process_cache_file(cache_file, destination_subfolder, destination_file, element):
os.makedirs(destination_subfolder, exist_ok=True)
if cache_file.endswith(".zip") and element["subfolder"] == "":
unzip_file(cache_file, destination_subfolder)
else:
shutil.copyfile(cache_file, destination_file)
# Obtiene los ficheros de la consulta desde internet o desde la caché
# y los deposita en la carpeta destino, descomprimiendo los archivos necesarios
def get_files():
# Variables para la presentación en pantalla de la descarga
current_file = 0
total_files = len(elements)
total_files_width = len(str(total_files))
last_game_folder = ""
for element in elements:
classification_folder = get_final_destination_folder(element["release_year"], element["root_folder"])
destination_folder = os.path.join(destination_path, classification_folder)
destination_subfolder = os.path.join(destination_folder, element["subfolder"])
cache_folder = os.path.join(cache_path, element["root_folder"])
cache_subfolder = os.path.join(cache_folder, element["subfolder"])
# Ruta completa hasta el fichero de destino y de caché
destination_file = os.path.join(destination_subfolder, element["file_name"])
cache_file = os.path.join(cache_subfolder, element["file_name"])
# Actualiza las variables de presentación
current_file += 1
if element["root_folder"] != last_game_folder:
print("\n{}".format(element["root_folder"]))
last_game_folder = element["root_folder"]
try:
# Si el fichero no existe en la carpeta de destino
if not os.path.isfile(destination_file) and not os.path.isfile(os.path.join(destination_subfolder, element["non_zip_file_name"])):
# Si existe en la caché, lo copia
if os.path.isfile(cache_file):
process_cache_file(cache_file, destination_subfolder, destination_file, element)
print_status(current_file, total_files, element, total_files_width, status="cached")
# Si no existe en la caché, lo descarga
else:
if download_file(element["url"], temp_file):
print_status(current_file, total_files, element, total_files_width, status="downloaded")
if os.path.isfile(temp_file):
# Mueve el fichero temporal descargado a la cache
os.makedirs(cache_subfolder, exist_ok=True)
shutil.move(temp_file, cache_file)
# Copia el fichero de la cache al destino
if os.path.isfile(cache_file):
process_cache_file(cache_file, destination_subfolder, destination_file, element)
else:
print_status(current_file, total_files, element, total_files_width, status="not found")
if wait:
time.sleep(random.randint(min_wait, max_wait))
# Si el fichero ya existe en el destino, no hace nada
else:
print_status(current_file, total_files, element, total_files_width, status="skipping")
except Exception as e:
logging.error(f"Error al procesar el fichero {element['file_name']}: {e}")
# Elimina los caracteres ilegales de la cadena de texto
def normalize_path(path):
illegal_chars = ["<", ">", ":", '"', "/", "\\", "|", "?", "*"]
replace_with = ""
for char in illegal_chars:
path = unidecode(path.replace(char, replace_with))
return path
# Elimina los subdirectorios vacios
def remove_empty_directories(path):
for root, dirs, files in os.walk(path, topdown=False):
for dir in dirs:
dir_path = os.path.join(root, dir)
try:
os.rmdir(dir_path)
logging.info(f"Directorio vacío eliminado: {dir_path}")
except OSError as e:
# El directorio no está vacío o ocurrió otro error
pass
# Limpia la carpeta de destino
def clear_destination_folder():
if should_clear_destination_path:
logging.info("Limpiando la carpeta de destino ...")
for filename in os.listdir(destination_path):
file_path = os.path.join(destination_path, filename)
try:
if os.path.isfile(file_path) or os.path.islink(file_path):
os.unlink(file_path)
logging.info(f"Archivo eliminado: {file_path}")
elif os.path.isdir(file_path):
shutil.rmtree(file_path)
logging.info(f"Directorio eliminado: {file_path}")
except Exception as e:
logging.error(f'No se pudo eliminar {file_path}. Razón: {e}')
# Imprime la lista de elementos
def print_elements(mode=0):
if mode == 0:
# Primer bucle for
for element in elements:
print('')
for key, value in element.items():
print(key, ':', value)
elif mode == 1:
# Segundo bucle for con eliminación de duplicados
seen = set()
for element in elements:
root_folder = element['root_folder']
if root_folder not in seen:
print(root_folder)
seen.add(root_folder)
# Imprimir el número de elementos únicos
print(f"Número de entradas: {len(seen)}")
# Bucle principal
def main():
connect(query_index=3)
process_elements()
#print_elements(mode=1)
clear_destination_folder()
get_files()
remove_empty_directories(destination_path)
if __name__ == "__main__":
main()