Files
scripts/zxdb.py
T

330 lines
11 KiB
Python

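## Script to download ZX Spectrum loading screens using the links stored in ZXDB.
## NOTE: the query currently in use (select3) fetches every non-remote download
## of game entries, not only loading screens.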
import os
import mysql.connector
import requests
import time
import random
import zipfile
import shutil
from mysql.connector import errorcode
from urllib.parse import urlparse
from urllib.request import urlretrieve
# Base URL of each download mirror, keyed by mirror name.
url_prefix = {
    "spectrum_computing": r"https://spectrumcomputing.co.uk",
    "wos": r"https://php.sustancia.synology.me/wos",
    "nvg": r"https://php.sustancia.synology.me/nvg",
}
# Root folder that receives the final per-game files.
destination_path = r"/home/sergio/zx/zxdb/games/"
# Local cache of downloaded files; it is checked before hitting the network.
cache_path = r"/home/sergio/zx/zxdb/cache/games/"
# Scratch file every download is written to before being copied into the cache.
temp_file = r"/tmp/zxdb.download.tmp"
wait = False  # Enables a random pause between downloads
min_wait = 1  # Minimum number of seconds to wait between downloads
max_wait = min_wait + 1  # Maximum number of seconds to wait between downloads
# Download records filled in by the select* functions and consumed by the
# processing/download steps.
elements = []
filetypes_on_root = [
    "Tape image",
    "Disk image",
    "Snapshot image",
    "POK pokes file",
]  # File types that are stored in the game's root folder
def select1(cursor, id_start=1950, id_end=1980):
    """Print ``title (id)`` for every entry whose id lies in a closed range.

    Args:
        cursor: DB-API style cursor; it must accept ``%s`` placeholders and
            yield ``(id, title)`` rows when iterated after ``execute``.
        id_start: First entry id to list (inclusive). Defaults to the range
            that used to be hard-coded.
        id_end: Last entry id to list (inclusive).
    """
    query = "SELECT id, title FROM entries WHERE id BETWEEN %s AND %s"
    cursor.execute(query, (id_start, id_end))
    # ``entry_id`` rather than ``id`` so the builtin is not shadowed.
    for entry_id, title in cursor:
        print("{} ({})".format(title, entry_id))
def select2(cursor):
    """Append the absolute URL of every ``filetype_id = 1`` download to ``elements``.

    Each appended item is a plain URL string. Note that ``process_elements``
    and ``get_files`` expect the dict records produced by ``select3``, so the
    output of this function is not interchangeable with that one.

    Args:
        cursor: DB-API style cursor yielding one-column ``(file_link,)`` rows.
    """
    query = "select file_link from downloads where filetype_id=1"
    cursor.execute(query)
    # ``url_prefix`` is a dict keyed by mirror name, so the old ``url_prefix[0]``
    # lookup raised KeyError.
    # NOTE(review): assumes index 0 used to mean the first mirror
    # ("spectrum_computing") - confirm.
    base_url = url_prefix["spectrum_computing"]
    for (file_link,) in cursor:
        # Unpack the row instead of slicing its tuple repr, then join with
        # exactly one slash between prefix and path.
        elements.append(base_url + "/" + str(file_link).lstrip("/"))
def select3(cursor):
    """Load one record per downloadable file of every available game.

    Runs the query below and appends, for each result row, a dict with the
    keys ``title``, ``developer``, ``release_year``, ``url`` and ``filetype``
    to the module-level ``elements`` list.

    Args:
        cursor: DB-API style cursor connected to the ZXDB schema.
    """
    query = """
SELECT DISTINCT
e.title, l.name, r.release_year, d.file_link, f.text
FROM
((((((publishers p
INNER JOIN entries e ON
p.entry_id = e.id)
INNER JOIN labels l ON
p.label_id = l.id)
INNER JOIN genretypes g ON
e.genretype_id = g.id)
INNER JOIN downloads d ON
e.id = d.entry_id)
INNER JOIN filetypes f ON
d.filetype_id = f.id)
INNER JOIN releases r ON
e.id = r.entry_id AND
p.release_seq = r.release_seq)
WHERE
(e.availabletype_id = 'A' OR e.availabletype_id = 'D') AND
f.text <> 'Remote link' AND
r.release_seq = 0 AND
(g.text like '%Game:%' AND g.text not like 'Casual%')
ORDER BY
e.title;"""
    # Extra WHERE condition, currently disabled:
    # (l.country_id = 'ES' AND l.labeltype_id = 'Z') AND
    cursor.execute(query)
    # Column order of the SELECT list, mapped onto the record keys.
    record_keys = ("title", "developer", "release_year", "url", "filetype")
    for row in cursor:
        record = dict(zip(record_keys, row))
        elements.append(record)
def connect():
    """Connect to the local ZXDB MySQL database and load ``elements`` via select3.

    MySQL errors are reported on stdout instead of being propagated. The
    cursor and the connection are always released, including when the
    connection attempt itself fails.
    """
    config = {
        "user": "root",
        # NOTE(review): the credential is hard-coded in the source. It can be
        # overridden through the ZXDB_PASSWORD environment variable; the
        # literal is kept only as a backward-compatible default.
        "password": os.environ.get("ZXDB_PASSWORD", "unJEPimbJddHP8"),
        "host": "127.0.0.1",
        "database": "zxdb",
        "raise_on_warnings": True,
    }
    # Pre-bind both names so the ``finally`` clause never hits an unbound
    # local when ``mysql.connector.connect`` raises.
    connection = None
    cursor = None
    try:
        connection = mysql.connector.connect(**config)
        cursor = connection.cursor()
        select3(cursor)
    except mysql.connector.Error as err:
        if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
            print("Something is wrong with your user name or password")
        elif err.errno == errorcode.ER_BAD_DB_ERROR:
            print("Database does not exist")
        else:
            print(err)
    finally:
        # Release the cursor before the connection that owns it.
        if cursor is not None:
            cursor.close()
        if connection is not None and connection.is_connected():
            connection.close()
def process_elements():
    """Derive the download metadata of every record in ``elements`` in place.

    Adds these keys to each record:

    * ``root_folder``: ``"<title> (<release_year>)(<developer>)"``.
    * ``file_name``: last path component of the original download link.
    * ``subfolder``: ``""`` for file types listed in ``filetypes_on_root``,
      otherwise the sanitized file type name.
    * ``is_zip``: whether ``file_name`` ends with ``.zip``.
    * ``non_zip_file_name``: ``file_name`` without a trailing ``.zip``.

    It also rewrites ``url`` into an absolute URL on the matching mirror.
    """
    for element in elements:
        # Build the name of the game's root folder.
        root_folder = "{} ({})({})".format(
            element["title"], element["release_year"], element["developer"]
        )
        # A path separator inside the title or label would split the folder
        # name into several path components, which the non-recursive mkdir
        # used later cannot create. Other characters are left untouched so
        # existing folder names stay the same.
        for separator in (os.sep, os.altsep):
            if separator:
                root_folder = root_folder.replace(separator, "_")
        element["root_folder"] = root_folder

        # The file name comes from the download link, before it is prefixed.
        file_name = url_filename(element["url"])
        element["file_name"] = file_name

        # Subfolder inside the root folder.
        if element["filetype"] in filetypes_on_root:
            element["subfolder"] = ""
        else:
            element["subfolder"] = normalize_path(element["filetype"])

        # Zip detection and the name the file has once the ".zip" is dropped.
        is_zip = file_name.endswith(".zip")
        element["is_zip"] = is_zip
        element["non_zip_file_name"] = file_name[:-4] if is_zip else file_name

        # Prepend the mirror prefix; "/pub" and "/nvg" are replaced by it.
        url = str(element["url"])
        if url.startswith("/zxdb"):
            element["url"] = url_prefix["spectrum_computing"] + url
        elif url.startswith("/pub"):
            element["url"] = url_prefix["wos"] + url[4:]
        elif url.startswith("/nvg"):
            element["url"] = url_prefix["nvg"] + url[4:]
def url_filename(url):
    """Return the last path component of *url*, ignoring query and fragment."""
    return os.path.basename(urlparse(url).path)
def download_file(url, dest, timeout=30):
    """Download *url* into the file *dest*.

    Args:
        url: Absolute URL to fetch.
        dest: Path of the file to (over)write with the response body.
        timeout: Seconds to wait for the connection and for each read before
            giving up. Without it a stalled server blocks the run forever.

    Returns:
        True when the server answered 200 and the body was written, False on
        any other status code, on timeout, or on too many redirects.

    Raises:
        SystemExit: On any other ``requests`` error, which is treated as fatal.
    """
    try:
        response = requests.get(url, timeout=timeout)
    except requests.exceptions.Timeout:
        # Maybe set up for a retry, or continue in a retry loop
        print("Timeout: {}".format(url))
        return False
    except requests.exceptions.TooManyRedirects:
        # Tell the user their URL was bad and try a different one
        print("Bad URL: {}".format(url))
        return False
    except requests.exceptions.RequestException as e:
        # catastrophic error. bail.
        raise SystemExit(e)

    if response.status_code != 200:
        return False
    with open(dest, "wb") as f:
        f.write(response.content)
    return True
def unzip_file(src, dst):
    """Extract only the emulator image files contained in a zip archive.

    Members whose name ends, case-insensitively, with one of ``.z80``,
    ``.sna``, ``.tzx``, ``.tap``, ``.dsk`` or ``.trd`` are extracted into
    *dst* keeping their path inside the archive. Everything else is skipped.

    Args:
        src: Path of the zip archive.
        dst: Directory that receives the extracted files.
    """
    # Every suffix carries its dot, so a name such as "notadsk" is not
    # mistaken for a disk image.
    extensions = (".z80", ".sna", ".tzx", ".tap", ".dsk", ".trd")
    # The context manager closes the archive even if extraction fails.
    with zipfile.ZipFile(src, "r") as archive:
        for member in archive.namelist():
            # Lower-casing also catches mixed-case names like "Game.Tap".
            if member.lower().endswith(extensions):
                archive.extract(member, dst)
def _place_cached_file(element, cache_file, destination_subfolder, destination_file):
    """Copy a cached file to its destination, unpacking zips that go in the root folder."""
    # makedirs also creates the game's root folder and any missing parent.
    os.makedirs(destination_subfolder, exist_ok=True)
    if cache_file.endswith(".zip") and element["subfolder"] == "":
        unzip_file(cache_file, destination_subfolder)
    else:
        shutil.copyfile(cache_file, destination_file)


def get_files():
    """Place every file described in ``elements`` into the destination tree.

    For each record, in order:

    * if the file, or its un-zipped name, already exists at the destination,
      it is skipped;
    * otherwise, if it is in the local cache, it is copied or extracted from
      there;
    * otherwise it is downloaded to ``temp_file``, stored in the cache and
      then copied or extracted to the destination.

    One progress line is printed per record. When ``wait`` is set, a random
    pause of ``min_wait``..``max_wait`` seconds follows every download attempt.
    """
    # Values used only to format the progress output.
    total_files = len(elements)
    total_files_width = len(str(total_files))
    last_game_folder = ""

    for current_file, element in enumerate(elements, start=1):
        # Game folder at the destination and in the cache.
        game_folder = element["root_folder"]
        destination_folder = os.path.join(destination_path, game_folder)
        destination_subfolder = os.path.join(destination_folder, element["subfolder"])
        cache_subfolder = os.path.join(
            os.path.join(cache_path, game_folder), element["subfolder"]
        )
        # Full paths of the destination file, its un-zipped name and the cache file.
        destination_file = os.path.join(destination_subfolder, element["file_name"])
        unzipped_file = os.path.join(
            destination_subfolder, element["non_zip_file_name"]
        )
        cache_file = os.path.join(cache_subfolder, element["file_name"])

        # Print a heading whenever a new game starts.
        if game_folder != last_game_folder:
            print("\n{}".format(game_folder))
            last_game_folder = game_folder

        download_attempted = False
        if os.path.isfile(destination_file) or os.path.isfile(unzipped_file):
            # The file already exists at the destination.
            status = "skipping"
        elif os.path.isfile(cache_file):
            # Found in the cache: copy or extract it to the destination.
            _place_cached_file(
                element, cache_file, destination_subfolder, destination_file
            )
            status = "cached"
        else:
            # Not cached: download it.
            download_attempted = True
            status = "not found "
            if download_file(element["url"], temp_file):
                status = "downloaded"
                if os.path.isfile(temp_file):
                    # Move the temporary file into the cache.
                    os.makedirs(cache_subfolder, exist_ok=True)
                    shutil.copyfile(temp_file, cache_file)
                    os.remove(temp_file)
                # Copy the file from the cache to the destination.
                if os.path.isfile(cache_file):
                    _place_cached_file(
                        element, cache_file, destination_subfolder, destination_file
                    )

        print(
            "({:{width}} / {}) : {} : {} ({})".format(
                current_file,
                total_files,
                status,
                element["file_name"],
                element["filetype"],
                width=total_files_width,
            )
        )
        # Pause only after the network was actually hit.
        if download_attempted and wait:
            time.sleep(random.randint(min_wait, max_wait))
def normalize_path(path):
    """Return *path* with each of ``< > : " / \\ | ? *`` replaced by ``_``."""
    # One translation pass instead of a chain of ``replace`` calls.
    substitutions = str.maketrans({char: "_" for char in '<>:"/\\|?*'})
    return path.translate(substitutions)
def main():
    """Run the whole pipeline: query the database, derive metadata, fetch files."""
    # Debug aid: print the contents of ``elements`` between steps to inspect
    # the records that are about to be downloaded.
    pipeline = (connect, process_elements, get_files)
    for step in pipeline:
        step()


if __name__ == "__main__":
    main()