# Script to download ZX Spectrum loading screens (and other game files) from ZXDB.
|
|
|
|
import os
|
|
import mysql.connector
|
|
import requests
|
|
import time
|
|
import random
|
|
import zipfile
|
|
import shutil
|
|
from mysql.connector import errorcode
|
|
from urllib.parse import urlparse
|
|
from urllib.request import urlretrieve
|
|
|
|
|
|
# Base URL of each download source, keyed by source name.
url_prefix = dict(
    spectrum_computing="https://spectrumcomputing.co.uk",
    wos="https://php.sustancia.synology.me/wos",
    nvg="https://php.sustancia.synology.me/nvg",
)

# Folder that receives one sub-folder per game.
destination_path = "/home/sergio/zx/zxdb/games/"
# Folder that keeps an untouched copy of every downloaded file.
cache_path = "/home/sergio/zx/zxdb/cache/games/"
# Scratch file each download is written to before being moved to the cache.
temp_file = "/tmp/zxdb.download.tmp"

# Pause for a random time between downloads.
wait = True
# Minimum number of seconds to wait between downloads.
min_wait = 2
# Maximum number of seconds to wait between downloads.
max_wait = min_wait + 1

# Rows collected from the database; filled by the select* functions.
elements = list()

# File types stored directly in the game's root folder (others get a sub-folder).
filetypes_on_root = ["Tape image", "Disk image", "Snapshot image", "POK pokes file"]
|
|
|
|
|
|
def select1(cursor, id_start=1950, id_end=1980):
    """Print the title and id of every entry whose id lies in a range.

    Args:
        cursor: Open database cursor; it is executed and then iterated.
        id_start: Lowest entry id to list (SQL ``BETWEEN`` is inclusive).
        id_end: Highest entry id to list (inclusive).
    """
    query = "SELECT id, title FROM entries WHERE id BETWEEN %s AND %s"
    cursor.execute(query, (id_start, id_end))
    # Named entry_id so the builtin id() is not shadowed.
    for entry_id, title in cursor:
        print("{} ({})".format(title, entry_id))
|
|
|
|
|
|
def select2(cursor):
    """Append the absolute URL of every download with ``filetype_id=1``.

    Each result row holds a single column (``file_link``); the link is
    prefixed with a base URL and appended, as a plain string, to the
    module-level ``elements`` list.

    NOTE(review): ``process_elements`` and ``get_files`` index each element
    by dictionary key, so the strings stored here are not usable by them.

    Args:
        cursor: Open database cursor; it is executed and then iterated.
    """
    query = "select file_link from downloads where filetype_id=1"
    cursor.execute(query)
    # url_prefix is keyed by source name, so the former url_prefix[0] lookup
    # raised KeyError. NOTE(review): the first entry is assumed to be the
    # intended source - confirm.
    prefix = url_prefix["spectrum_computing"]
    for row in cursor:
        # Read the column itself instead of slicing the tuple's repr().
        file_link = row[0]
        if isinstance(file_link, (bytes, bytearray)):
            file_link = file_link.decode("utf-8")
        elements.append(prefix + str(file_link))
|
|
|
|
|
|
def select3(cursor):
    """Query the downloadable files of the selected games into ``elements``.

    Runs one SELECT joining publishers, entries, labels, genretypes,
    downloads, filetypes and releases, then appends one dict per result row
    to the module-level ``elements`` list with the keys ``title``,
    ``developer`` (taken from ``labels.name``), ``release_year``, ``url``
    (taken from ``downloads.file_link``) and ``filetype``.

    Args:
        cursor: Open database cursor; it is executed and then iterated.
    """
    query = """
        SELECT DISTINCT
            e.title, l.name, r.release_year, d.file_link, f.text
        FROM
            ((((((publishers p
            INNER JOIN entries e ON
                p.entry_id = e.id)
            INNER JOIN labels l ON
                p.label_id = l.id)
            INNER JOIN genretypes g ON
                e.genretype_id = g.id)
            INNER JOIN downloads d ON
                e.id = d.entry_id)
            INNER JOIN filetypes f ON
                d.filetype_id = f.id)
            INNER JOIN releases r ON
                e.id = r.entry_id AND
                p.release_seq = r.release_seq)
        WHERE
            (e.availabletype_id = 'A' OR e.availabletype_id = 'D') AND
            (f.text <> 'Remote link' AND f.text <> '?') AND
            r.release_seq = 0 AND
            l.name like 'ZOSYA%' AND
            (g.text like '%Game:%' AND g.text not like 'Casual%')
        ORDER BY
            e.title;"""
    # Alternative WHERE filters, kept for reference:
    #   (r.release_year >= '1986' AND r.release_year <= '1991') AND
    #   l.name in ('Dinamic Software', 'Aventuras AD S.A.', 'Arcadia Soft', 'Creepsoft', 'Dro Soft', 'Erbe Software S.A.', 'Iber Software', 'MCM Software S.A.', 'Made in Spain', 'New Frontier', 'Opera Soft S.A.', 'System 4', 'Topo Soft', 'Zigurat Software') AND
    #   (l.country_id = 'ES' AND l.labeltype_id = 'Z') AND
    #   l.name in ('Ocean Software Ltd', 'Imagine Software Ltd', 'Palace Software', 'Gremlin Graphics Software Ltd', 'Elite Systems Ltd', 'Melbourne House', 'Ultimate Play The Game', 'Durell Software Ltd', 'Codemasters Ltd') AND
    #   e.title = 'Arkanoid - Revenge of Doh' AND
    cursor.execute(query)

    # Column order of the SELECT list, mapped to the keys used downstream.
    field_names = ("title", "developer", "release_year", "url", "filetype")
    elements.extend(dict(zip(field_names, record)) for record in cursor)
|
|
|
|
|
|
def connect():
    """Connect to the local ``zxdb`` MySQL database and run ``select3``.

    Connector errors are reported on stdout and not re-raised: access-denied
    and unknown-database errors get a fixed message, any other connector
    error is printed as is. The cursor and the connection are always
    released, even when connecting itself fails.
    """
    # NOTE(review): credentials for the root account are hard-coded here;
    # consider loading them from the environment or a config file.
    config = {
        "user": "root",
        "password": "unJEPimbJddHP8",
        "host": "127.0.0.1",
        "database": "zxdb",
        "raise_on_warnings": True,
    }

    # Pre-bind both names so the finally clause can run when connect() or
    # cursor() raises; they used to be unbound there, and the resulting
    # UnboundLocalError hid the real connection error.
    connection = None
    cursor = None
    try:
        connection = mysql.connector.connect(**config)
        cursor = connection.cursor()
        select3(cursor)

    except mysql.connector.Error as err:
        if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
            print("Something is wrong with your user name or password")
        elif err.errno == errorcode.ER_BAD_DB_ERROR:
            print("Database does not exist")
        else:
            print(err)

    finally:
        # Release the cursor before the connection it belongs to.
        if cursor is not None:
            cursor.close()
        if connection is not None and connection.is_connected():
            connection.close()
|
|
|
|
|
|
def process_elements():
    """Add the derived fields each entry of ``elements`` needs for download.

    Every dict in the module-level ``elements`` list is updated in place with:

    * ``root_folder``: "<title> (<release_year>)(<developer>)", sanitized
      with ``normalize_path``.
    * ``file_name``: last path component of the download URL.
    * ``subfolder``: empty for file types listed in ``filetypes_on_root``,
      otherwise the sanitized file type.
    * ``is_zip``: whether ``file_name`` ends with ".zip".
    * ``non_zip_file_name``: ``file_name`` without the ".zip" suffix.

    Finally ``url`` is turned into an absolute URL using ``url_prefix``.
    """
    # (leading marker, url_prefix key, characters dropped from the link)
    rewrite_rules = (
        ("/zxdb", "spectrum_computing", 0),
        ("/pub", "wos", 4),
        ("/nvg", "nvg", 4),
    )

    for item in elements:
        # Name of the game's root folder.
        folder_name = "".join(
            [
                item["title"],
                " (",
                str(item["release_year"]),
                ")(",
                item["developer"],
                ")",
            ]
        )
        item["root_folder"] = normalize_path(folder_name)

        # File name, taken from the (still relative) download link.
        name = url_filename(item["url"])
        item["file_name"] = name

        # Sub-folder inside the root folder, if this file type needs one.
        on_root = item["filetype"] in filetypes_on_root
        item["subfolder"] = "" if on_root else normalize_path(item["filetype"])

        # Zip detection and the name the file has once the suffix is dropped.
        zipped = name.endswith(".zip")
        item["is_zip"] = zipped
        item["non_zip_file_name"] = name[:-4] if zipped else name

        # Prepend the base URL of the source the link points to; links that
        # match no marker are left untouched.
        link = item["url"]
        for marker, source, skip in rewrite_rules:
            if link.startswith(marker):
                item["url"] = url_prefix[source] + str(link[skip:])
                break
|
|
|
|
|
|
def url_filename(url):
    """Return the last path component of *url*.

    Only the path part of the URL is considered, so query strings and
    fragments are ignored. A path ending in "/" yields an empty string.
    """
    return os.path.basename(urlparse(url).path)
|
|
|
|
|
|
def download_file(url, dest, timeout=30):
    """Download *url* and write the response body to *dest*.

    Args:
        url: Absolute URL to fetch.
        dest: Path of the file to write (opened in binary mode, overwritten).
        timeout: Seconds to wait for the server before giving up. requests
            applies no timeout unless one is passed, so without it a stalled
            server would block the whole run.

    Returns:
        True if the server answered 200 and the file was written. False if
        the status was anything else, or the request timed out or exceeded
        the redirect limit (a message is printed in the last two cases).

    Raises:
        SystemExit: On any other requests error; the run is aborted.
    """
    try:
        response = requests.get(url, timeout=timeout)

    except requests.exceptions.Timeout:
        print("Timeout: {}".format(url))
        return False

    except requests.exceptions.TooManyRedirects:
        print("Bad URL: {}".format(url))
        return False

    except requests.exceptions.RequestException as e:
        # Catastrophic error: bail out.
        raise SystemExit(e)

    if response.status_code != 200:
        return False
    with open(dest, "wb") as f:
        f.write(response.content)
    return True
|
|
|
|
|
|
def unzip_file(src, dst):
    """Extract the emulator image files contained in a zip archive.

    Only members whose name ends in .z80, .sna, .tzx, .tap, .dsk or .trd
    (compared case-insensitively) are extracted; everything else in the
    archive is ignored.

    Args:
        src: Path of the zip archive to read.
        dst: Directory the matching members are extracted into.
    """
    # Every suffix carries its dot so that names merely ending in the
    # letters "dsk" are not treated as disk images.
    extensions = (".z80", ".sna", ".tzx", ".tap", ".dsk", ".trd")
    # The context manager closes the archive even if an extraction fails.
    with zipfile.ZipFile(src, "r") as archive:
        for member in archive.namelist():
            if member.lower().endswith(extensions):
                archive.extract(member, dst)
|
|
|
|
|
|
def _report_file(current, total, width, status, element):
    """Print the one-line progress report for a single file."""
    print(
        "({:{width}} / {}) : {} : {} ({})".format(
            current,
            total,
            status,
            element["file_name"],
            element["filetype"],
            width=width,
        )
    )


def _install_from_cache(element, cache_file, destination_subfolder, destination_file):
    """Create the destination folders and copy or unzip the cached file."""
    os.makedirs(destination_subfolder, exist_ok=True)
    # Zip archives that belong in the game's root folder are unpacked there;
    # every other file is copied unchanged.
    if cache_file.endswith(".zip") and element["subfolder"] == "":
        unzip_file(cache_file, destination_subfolder)
    else:
        shutil.copyfile(cache_file, destination_file)


def get_files():
    """Place every file listed in ``elements`` in its destination folder.

    For each element, in order:

    * if the file (or its name without ".zip") already exists in the
      destination, it is skipped;
    * otherwise, if a copy exists in the cache, it is installed from there;
    * otherwise it is downloaded to ``temp_file``, stored in the cache and
      installed from there, followed by a random pause when ``wait`` is set.

    A progress line is printed for every element, and the game folder name
    is printed whenever it changes.
    """
    total_files = len(elements)
    total_files_width = len(str(total_files))
    last_game_folder = ""

    for current_file, element in enumerate(elements, start=1):
        # Game folder in the destination and in the cache.
        game_folder = element["root_folder"]
        destination_subfolder = os.path.join(
            destination_path, game_folder, element["subfolder"]
        )
        cache_subfolder = os.path.join(cache_path, game_folder, element["subfolder"])

        # Full paths of the destination and cache files.
        destination_file = os.path.join(destination_subfolder, element["file_name"])
        unzipped_file = os.path.join(
            destination_subfolder, element["non_zip_file_name"]
        )
        cache_file = os.path.join(cache_subfolder, element["file_name"])

        if game_folder != last_game_folder:
            print("\n{}".format(game_folder))
            last_game_folder = game_folder

        # The file already exists in the destination.
        if os.path.isfile(destination_file) or os.path.isfile(unzipped_file):
            _report_file(
                current_file, total_files, total_files_width, "skipping", element
            )
            continue

        # The file is available in the cache.
        if os.path.isfile(cache_file):
            _install_from_cache(
                element, cache_file, destination_subfolder, destination_file
            )
            _report_file(
                current_file, total_files, total_files_width, "cached", element
            )
            continue

        # The file has to be downloaded.
        status = "not found "
        if download_file(element["url"], temp_file):
            status = "downloaded"
            # The temp file is only consumed after a successful download, so
            # a leftover from an interrupted run is never cached under this
            # file's name.
            os.makedirs(cache_subfolder, exist_ok=True)
            shutil.copyfile(temp_file, cache_file)
            os.remove(temp_file)
            _install_from_cache(
                element, cache_file, destination_subfolder, destination_file
            )
        _report_file(current_file, total_files, total_files_width, status, element)
        if wait:
            time.sleep(random.randint(min_wait, max_wait))
|
|
|
|
|
|
def normalize_path(path):
    """Return *path* with each of the characters < > : " / \\ | ? * replaced by "_"."""
    forbidden = '<>:"/\\|?*'
    substitutions = str.maketrans(forbidden, "_" * len(forbidden))
    return path.translate(substitutions)
|
|
|
|
|
|
def main():
    """Run the three stages of the script in order."""
    connect()  # query the database into `elements`
    process_elements()  # derive folders, file names and absolute URLs
    # Debugging aid: dump the key/value pairs of each dict in `elements`
    # at this point to inspect the derived fields before downloading.
    get_files()  # fetch every file that is still missing


if __name__ == "__main__":
    main()
|