Primer commit amb fitxers del projecte

This commit is contained in:
2024-11-12 19:21:17 +01:00
parent f7fa1dbe11
commit abb026ece0
4 changed files with 460 additions and 4 deletions
+2
View File
@@ -0,0 +1,2 @@
zxdbenv
.env
+22 -4
View File
@@ -1,8 +1,7 @@
## Anotacions
Per a connectar-se a la base de dades local (que està al container de MariaDB)
mysql -h 172.20.0.2 -P 3306 -u root -p
mysql -h 172.18.0.2 -P 3306 -u root -p
Password
@@ -14,7 +13,26 @@ Si la base de dades ja existeix, per seleccionar-la
Per descarregar una nova versió, baixar-la desde
[GitHub - zxdb/ZXDB: Open database with historical information about Sinclair machines](https://github.com/zxdb/ZXDB)
Para ejecutar el archivo .sql, una vez dentro de la base de datos con la base de datos zxdb activa, ejecutar
Per executar el fitxer .sql, amb la base de dades zxdb activa:
source /ruta/fichero.sql
Per instalar els requisits del script de python
pip install -r requirements.txt
Per crear un entorn virtual "zxdbenv"
python3 -m venv zxdbenv
Per activar el entorn "zxdbenv"
source zxdbenv/bin/activate
Exemple de fitxer .env
DB_USER=root
DB_PASSWORD=unJEPimbJddHP8
DB_HOST=172.18.0.2
DB_PORT=3306
DB_NAME=zxdb
+5
View File
@@ -0,0 +1,5 @@
mysql-connector-python
requests
python-dotenv
logging
unidecode
+431
View File
@@ -0,0 +1,431 @@
## Script para descargar ficheros de spectrum a partir de zxdb
## Imports utilizados en el script
import logging
import mysql.connector
import os
import random
import requests
import shutil
import time
import zipfile
from dotenv import load_dotenv
from mysql.connector import errorcode
from unidecode import unidecode
from urllib.parse import urlparse
from urllib.request import urlretrieve
# Cargar las variables de entorno desde el archivo .env
load_dotenv()
# Configuración del logger
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
config = {
"user": os.getenv("DB_USER"),
"password": os.getenv("DB_PASSWORD"),
"host": os.getenv("DB_HOST"),
"port": os.getenv("DB_PORT"),
"database": os.getenv("DB_NAME"),
"raise_on_warnings": True,
}
# Direcciones de internet de donde descargar los datos
url_prefix = {
"spectrum_computing": r"https://spectrumcomputing.co.uk",
"wos": r"https://php.sustancia.synology.me/wos",
"nvg": r"https://php.sustancia.synology.me/nvg",
}
# Rutas locales donde depositar los resultados
destination_path = r"/home/sergio/zx/zxdb/games/"
cache_path = r"/home/sergio/zx/zxdb/cache/games/"
temp_file = r"/tmp/zxdb.download.tmp"
# Parametros de configuración
should_clear_destination_path = True # Establece si se limpia primero la carpeta de destino
wait = True # Establece una pausa aleatoria entre descargas
min_wait = 2 # Segundos mínimos a esperar entre descargas
max_wait = min_wait + 1 # Segundos máximos a esperar entre descargas
elements = []
filetypes_on_root = [
"Tape image",
"Disk image",
"Snapshot image",
"POK pokes file",
] # Tipos de fichero que se guardan en la carpeta raíz del juego
# Listado con las consultas
def select(cursor, query_index=0):
# Lista de consultas
queries = []
# Consulta 0: Devuelve todos los juegos y sus archivos asociados
queries.append("""
SELECT DISTINCT
e.title, l.name, r.release_year, d.file_link, f.text
FROM
((((((publishers p
INNER JOIN entries e ON
p.entry_id = e.id)
INNER JOIN labels l ON
p.label_id = l.id)
INNER JOIN genretypes g ON
e.genretype_id = g.id)
INNER JOIN downloads d ON
e.id = d.entry_id)
INNER JOIN filetypes f ON
d.filetype_id = f.id)
INNER JOIN releases r ON
e.id = r.entry_id AND
p.release_seq = r.release_seq)
WHERE
(e.availabletype_id = 'A' OR e.availabletype_id = 'D') AND
(f.text <> 'Remote link' AND f.text <> '?') AND
r.release_seq = 0 AND
(g.text like '%Game:%' AND g.text not like 'Casual%')
ORDER BY
e.title;
""")
# Consulta 1: Filtra más la consulta anterior
queries.append("""
SELECT DISTINCT
e.title, l.name, r.release_year, d.file_link, f.text
FROM
((((((publishers p
INNER JOIN entries e ON
p.entry_id = e.id)
INNER JOIN labels l ON
p.label_id = l.id)
INNER JOIN genretypes g ON
e.genretype_id = g.id)
INNER JOIN downloads d ON
e.id = d.entry_id)
INNER JOIN filetypes f ON
d.filetype_id = f.id)
INNER JOIN releases r ON
e.id = r.entry_id AND
p.release_seq = r.release_seq)
WHERE
(e.availabletype_id = 'A' OR e.availabletype_id = 'D') AND
(f.text <> 'Remote link' AND f.text <> '?') AND
r.release_seq = 0 AND
l.name like 'ZOSYA%' AND
(g.text like '%Game:%' AND g.text not like 'Casual%')
ORDER BY
e.title;
""")
# Consulta 2: Devuelve juegos y solo archivos de cinta, disco o pokes
queries.append("""
SELECT DISTINCT
e.title, l.name, r.release_year, d.file_link, f.text
FROM
((((((publishers p
INNER JOIN entries e ON
p.entry_id = e.id)
INNERJOIN labels l ON
p.label_id = l.id)
INNERJOIN genretypes g ON
e.genretype_id = g.id)
INNERJOIN downloads d ON
e.id = d.entry_id)
INNERJOIN filetypes f ON
d.filetype_id = f.id)
INNERJOIN releases r ON
e.id = r.entry_id AND
p.release_seq = r.release_seq)
WHERE
(e.availabletype_id = 'A' OR e.availabletype_id = 'D') AND
(f.text IN ('Tape image','Disk image','Snapshot image','POK pokes file')) AND
r.release_seq = 0 AND
(g.text like '%Game:%' AND g.text not like 'Casual%')
ORDER BY
e.title;
""")
# Ejecutar la consulta seleccionada
cursor.execute(queries[query_index])
# Procesar los resultados
for row in cursor:
element = {
"title": row[0],
"developer": row[1],
"release_year": row[2],
"url": row[3],
"filetype": row[4],
}
elements.append(element)
# Registro de consulta ejecutada
logging.info(f"Consulta {query_index} ejecutada correctamente con {len(elements)} resultados.")
# Establece la conexión a la BBDD y ejecuta la consulta
def connect(query_index=0):
try:
with mysql.connector.connect(**config) as connection:
with connection.cursor() as cursor:
# Ejecutar la consulta 1
select(cursor, query_index=query_index)
except mysql.connector.Error as err:
if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
logging.error("Algo está mal con tu nombre de usuario o contraseña.")
elif err.errno == errorcode.ER_BAD_DB_ERROR:
logging.error("La base de datos no existe.")
else:
logging.error(err)
except Exception as e:
logging.error(f"Error inesperado: {e}")
# Procesa todos lo elementos, modificando cada uno de sus parametros
def process_elements():
global elements
for i in range(len(elements)):
# Construye el nombre de la carpeta raiz
elements[i]["root_folder"] = (
elements[i]["title"]
+ " ("
+ str(elements[i]["release_year"])
+ ")("
+ elements[i]["developer"]
+ ")"
)
elements[i]["root_folder"] = normalize_path(elements[i]["root_folder"])
# Obtiene el nombre del fichero a partir de la url de descarga
elements[i]["file_name"] = url_filename(elements[i]["url"])
# Establece la subcarpeta dentro de la raiz
elements[i]["subfolder"] = ""
if elements[i]["filetype"] not in filetypes_on_root:
elements[i]["subfolder"] = normalize_path(elements[i]["filetype"])
# Averigua si el fichero está en formato .zip
elements[i]["is_zip"] = elements[i]["file_name"].endswith(".zip")
# Calcula el nombre del fichero si es un zip
elements[i]["non_zip_file_name"] = elements[i]["file_name"]
if elements[i]["is_zip"]:
elements[i]["non_zip_file_name"] = elements[i]["file_name"][:-4]
# Añade el prefijo a la url
if elements[i]["url"].startswith("/zxdb"):
elements[i]["url"] = url_prefix["spectrum_computing"] + str(
elements[i]["url"]
)
elif elements[i]["url"].startswith("/pub"):
elements[i]["url"] = url_prefix["wos"] + str(elements[i]["url"][4:])
elif elements[i]["url"].startswith("/nvg"):
elements[i]["url"] = url_prefix["nvg"] + str(elements[i]["url"][4:])
# Devuelve el fichero que forma la parte final de una URL
def url_filename(url):
parsed_url = urlparse(url)
path = parsed_url.path
filename = os.path.basename(path)
return filename
# Descarga un fichero a partir de una URL
def download_file(url, dest):
try:
r = requests.get(url)
if r.status_code != 200:
return False
with open(dest, "wb") as f:
f.write(r.content)
return True
except requests.exceptions.Timeout:
# Maybe set up for a retry, or continue in a retry loop
print("Timeout: {}".format(url))
except requests.exceptions.TooManyRedirects:
# Tell the user their URL was bad and try a different one
print("Bad URL: {}".format(url))
except requests.exceptions.RequestException as e:
# catastrophic error. bail.
raise SystemExit(e)
# Descomprime los ficheros que coinciden con la lista de extensiones
def unzip_file(src, dst):
# with zipfile.ZipFile(src, "r") as zip_ref:
# zip_ref.extractall(dst)
archive = src
directory = dst
extensions = (".z80", ".sna", ".tzx", ".tap", "dsk", ".trd", ".Z80", ".SNA", ".TZX", ".TAP", "DSK", ".TRD")
zip_file = zipfile.ZipFile(archive, "r")
[
zip_file.extract(file, directory)
for file in zip_file.namelist()
if file.endswith(extensions)
]
zip_file.close()
# Obtiene los ficheros de la consulta desde internet o desde la caché
# y los deposita en la carpeta destino, descomprimiendo los archivos necesarios
def get_files():
# Variables para la presentación en pantalla de la descarga
current_file = 0
total_files = len(elements)
total_files_width = len(str(total_files))
last_game_folder = ""
for element in elements:
# Carpeta del juego en destino y en caché
game_folder = element["root_folder"]
destination_folder = os.path.join(destination_path, element["root_folder"])
destination_subfolder = os.path.join(destination_folder, element["subfolder"])
cache_folder = os.path.join(cache_path, element["root_folder"])
cache_subfolder = os.path.join(cache_folder, element["subfolder"])
# Ruta completa hasta el fichero de destino y de caché
destination_file = os.path.join(destination_subfolder, element["file_name"])
cache_file = os.path.join(cache_subfolder, element["file_name"])
# Actualiza las variables de presentación
current_file = current_file + 1
if game_folder != last_game_folder:
print("\n{}".format(game_folder))
last_game_folder = game_folder
#print(
# "(WORKING : {} ({})".format(
# element["file_name"],
# element["filetype"]
# )
#)
# Comprueba si ya existe el fichero a descargar
if not os.path.isfile(destination_file) and (
not os.path.isfile(
os.path.join(destination_subfolder, element["non_zip_file_name"])
)
):
# Comprueba si ya existe el fichero en la cache
if os.path.isfile(cache_file):
# Si encuentra el fichero en cache, crea las carpetas de destino y lo copia o lo extrae
if not os.path.isdir(destination_folder):
os.mkdir(destination_folder)
if not os.path.isdir(destination_subfolder):
os.mkdir(destination_subfolder)
if cache_file.endswith(".zip") and element["subfolder"] == "":
unzip_file(cache_file, destination_subfolder)
else:
shutil.copyfile(cache_file, destination_file)
print(
"({:{width}} / {}) : cached : {} ({})".format(
current_file,
total_files,
element["file_name"],
element["filetype"],
width=total_files_width,
)
)
# El fichero no está en la cache
else:
status = "not found "
if download_file(element["url"], temp_file):
status = "downloaded"
if os.path.isfile(temp_file):
# Copia el fichero temnporal a la cache
if not os.path.isdir(cache_folder):
os.mkdir(cache_folder)
if not os.path.isdir(cache_subfolder):
os.mkdir(cache_subfolder)
shutil.copyfile(temp_file, cache_file)
os.remove(temp_file)
# Copia el fichero de la cache al destino
if os.path.isfile(cache_file):
if not os.path.isdir(destination_folder):
os.mkdir(destination_folder)
if not os.path.isdir(destination_subfolder):
os.mkdir(destination_subfolder)
if (
cache_file.endswith(".zip")
and element["subfolder"] == ""
):
unzip_file(cache_file, destination_folder)
else:
shutil.copyfile(cache_file, destination_file)
print(
"({:{width}} / {}) : {} : {} ({})".format(
current_file,
total_files,
status,
element["file_name"],
element["filetype"],
width=total_files_width,
)
)
if wait:
time.sleep(random.randint(min_wait, max_wait))
# El fichero ya existe en el destino
else:
print(
"({:{width}} / {}) : skipping : {} ({})".format(
current_file,
total_files,
element["file_name"],
element["filetype"],
width=total_files_width,
)
)
# Elimina los caracteres ilegales de la cadena de texto
def normalize_path(path):
illegal_chars = ["<", ">", ":", '"', "/", "\\", "|", "?", "*"]
replace_with = ""
for char in illegal_chars:
path = unidecode(path.replace(char, replace_with))
return path
# Limpia la carpeta de destino
def clear_destination_folder():
if should_clear_destination_path:
print("Clear destination folder ...")
folder = destination_path
for filename in os.listdir(folder):
file_path = os.path.join(folder, filename)
try:
if os.path.isfile(file_path) or os.path.islink(file_path):
os.unlink(file_path)
elif os.path.isdir(file_path):
shutil.rmtree(file_path)
except Exception as e:
print('Failed to delete %s. Reason: %s' % (file_path, e))
# Imprime la lista de elementos
def print_elements(mode=0):
if mode == 0:
# Primer bucle for
for element in elements:
print('')
for key, value in element.items():
print(key, ':', value)
elif mode == 1:
# Segundo bucle for con eliminación de duplicados
seen = set()
for element in elements:
root_folder = element['root_folder']
if root_folder not in seen:
print(root_folder)
seen.add(root_folder)
# Imprimir el número de elementos únicos
print(f"Número de entradas: {len(seen)}")
# Bucle principal
def main():
connect(query_index=0)
process_elements()
print_elements(mode=1)
#clear_destination_folder()
#get_files()
if __name__ == "__main__":
main()