Actualitzat zx_tosec_game_collector.py

This commit is contained in:
2024-11-14 17:01:27 +01:00
parent 84003868b9
commit 942a4c7b26
3 changed files with 162 additions and 525 deletions
-101
View File
@@ -1,101 +0,0 @@
import os
import re # regexp
import shutil
from pathlib import Path
def first_letter(x):
    """Return the index bucket used to group a game name.

    Empty names and names starting with one of the characters 0-9 map to
    "0-9"; every other name maps to its first character, upper-cased.
    """
    digits = ("0", "1", "2", "3", "4", "5", "6", "7", "8", "9")
    if len(x) > 0 and x[0] not in digits:
        return x[0].upper()
    return "0-9"
# Directories scanned for game files, and the directory that receives the sorted copy
source_path = [Path("/home/sergio/zx/Games"), Path("/home/sergio/zx/Pokes")]
destination_path = Path("/home/sergio/tmp/final")
# Feature switches; they are strings and are compared against the literal "yes" further down
opt_print = "no"
opt_create_dirs = "yes"
# Remove the destination directory (a failure is reported and the script carries on)
deleting_msg = "Directory: {} -> deleting...".format(destination_path)
removed_msg = "Directory: {} -> removed successfully".format(destination_path)
try:
    print(deleting_msg)
    shutil.rmtree(destination_path)
    print(removed_msg)
except OSError as o:
    print(f"Error, {o.strerror}: {destination_path}")

# Create the destination directory again
created_msg = "Directory: {} -> created successfully".format(destination_path)
try:
    os.mkdir(destination_path)
    print(created_msg)
except OSError as error:
    print(error)
# Parallel lists: position k of each list describes the same file
paths = []  # Directory the file was found in
files = []  # File name
names = []  # Game name
years = []  # Game year
companies = []  # Game company or distributor

# Collect every regular file located directly inside the source directories
for src_dir in source_path:
    for entry in os.listdir(src_dir):
        if not os.path.isfile(os.path.join(src_dir, entry)):
            continue  # Only regular files are collected
        paths.append(src_dir)
        files.append(entry)
# Extract the game data from file names shaped like "<name> (<year>)(<company>)..."
regex_year = r"\(\d.*?\)"
regex_company = r"^\(.*?\)"
for i in files:
    # Year: first parenthesised group that starts with a digit
    match = re.search(regex_year, i)
    if match:
        # Name: everything before the year group
        names.append(i[: match.start()].strip())
        # Drop both parentheses first and only then cap the length to four
        # characters; slicing the raw match with [1:5] kept the ")" of groups
        # shorter than four characters (e.g. "(48K)" became "48K)")
        years.append(match.group()[1:-1][:4])
        # Company: a parenthesised group located immediately after the year
        match_company = re.search(regex_company, i[match.end():])
        if match_company:
            companies.append(match_company.group()[1:-1])
        else:
            companies.append("-")
    else:
        # No year group: keep the whole file name and use placeholder values
        years.append("0")
        names.append(i)
        companies.append("-")
# Optionally list the data extracted for every file
if opt_print == "yes":
    report = "File: {}\nName: {}\nYear: {}\nCompany: {}\n"
    for i, item in enumerate(files):
        print(report.format(item, names[i], years[i], companies[i]))
# Copy every file to <destination>/<initial>/<name> (<year>)/<file>
total_files = len(files)
if opt_create_dirs == "yes":
    for i, file_name in enumerate(files):
        print("({} de {}) {}".format(i + 1, total_files, file_name))
        game_dir = names[i] + " (" + years[i] + ")"
        dst_path = os.path.join(destination_path, first_letter(names[i]), game_dir)
        if not os.path.exists(dst_path):
            os.makedirs(dst_path)
        shutil.copyfile(
            os.path.join(paths[i], file_name),
            os.path.join(dst_path, file_name),
        )
+162
View File
@@ -0,0 +1,162 @@
import re
import os
import shutil
import logging
from pathlib import Path
# Logger configuration
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Global settings
# Directories scanned for files, and the directory that receives the sorted copy
source_path = [Path("/home/sergio/zx/tosec/all"), Path("/home/sergio/zx/tosec/pokes")]
destination_path = Path("/home/sergio/zx/tosec/final")
# Switches read by print_results() and copy_files()
opt_print = True
opt_create_dirs = True
# When enabled, years greater than last_classic_year go to modern_folder_name
# and the remaining numeric years go to classic_folder_name
opt_split_modern_and_classic = True
last_classic_year = 1993
modern_folder_name = "modern"
classic_folder_name = "classics"
# Parallel lists that store the data of every file
paths = [] # Directory where the file is located
files = [] # File name
names = [] # Game name
years = [] # Game year
companies = [] # Game company or distributor
def remove_destination_folder(destination_path):
    """Delete the destination directory tree if it exists.

    A missing directory is the normal situation on a first run, so it is
    reported at INFO level instead of being logged as an error. Any other
    OS failure is logged as an error and not re-raised.

    Args:
        destination_path: Directory to remove recursively.
    """
    logging.info(f"Directorio: {destination_path} -> eliminando...")
    try:
        shutil.rmtree(destination_path)
    except FileNotFoundError:
        # Nothing to delete: not a failure
        logging.info(f"Directorio: {destination_path} -> no existe, nada que eliminar")
    except OSError as o:
        logging.error(f"Error, {o.strerror}: {destination_path}")
    else:
        logging.info(f"Directorio: {destination_path} -> eliminado con éxito")
def create_destination_folder(destination_path):
    """Create the destination directory, including any missing parent.

    ``os.makedirs`` is used so that a destination whose parent directories do
    not exist yet can still be created. As before, an OS failure (for example
    the directory already existing) is logged as an error and not re-raised.

    Args:
        destination_path: Directory to create.
    """
    try:
        os.makedirs(destination_path)
        logging.info(f"Directorio: {destination_path} -> creado con éxito")
    except OSError as error:
        logging.error(f"Error al crear el directorio: {error}")
def get_file_list(source_paths):
    """Collect the regular files located directly inside each source directory.

    The global ``paths`` and ``files`` lists are rebuilt from scratch: entry k
    of ``paths`` is the directory that contains entry k of ``files``. A
    directory that cannot be listed is logged as an error and skipped.

    Args:
        source_paths: Iterable of directories to scan (not recursive).

    Returns:
        The ``(paths, files)`` pair.
    """
    global paths, files
    paths, files = [], []
    for directory in source_paths:
        try:
            for entry in os.listdir(directory):
                if not os.path.isfile(os.path.join(directory, entry)):
                    continue  # Only regular files are collected
                paths.append(directory)
                files.append(entry)
                logging.info(f"Fichero encontrado: {entry} en {directory}")
        except OSError as error:
            logging.error(f"Error al acceder al directorio {directory}: {error}")
    return paths, files
def extract_game_data():
    """Parse name, year and company out of every entry of the global ``files``.

    File names are expected to look like ``<name> (<year>)(<company>)...``.
    The year is the first parenthesised group starting with a digit (at most
    its first four characters are kept); the company is a parenthesised group
    located immediately after it. Files without a year group get the whole
    file name as name, "0" as year and "-" as company.

    The global ``names``, ``years`` and ``companies`` lists are rebuilt on
    every call so they always stay aligned with ``files``.
    """
    global files, names, years, companies
    # Start from empty lists: appending to the results of a previous call
    # would misalign them with ``files``
    names, years, companies = [], [], []
    regex_year = re.compile(r"\(\d.*?\)")
    regex_company = re.compile(r"^\(.*?\)")
    for file in files:
        match = regex_year.search(file)
        if match:
            # Name: everything before the year group
            names.append(file[: match.start()].strip())
            # Drop both parentheses first and only then cap the length;
            # slicing the raw match with [1:5] kept the ")" of groups shorter
            # than four characters (e.g. "(48K)" became "48K)")
            years.append(match.group()[1:-1][:4])
            # The pattern is anchored with "^", so search the sliced remainder
            match_company = regex_company.search(file[match.end():])
            if match_company:
                companies.append(match_company.group()[1:-1])
            else:
                companies.append("-")
        else:
            years.append("0")
            names.append(file)
            companies.append("-")
        logging.info(f"Datos extraídos -> Nombre: {names[-1]}, Año: {years[-1]}, Compañía: {companies[-1]}")
def print_results():
    """Print file, name, year and company of every collected file.

    Nothing is printed when the global ``opt_print`` switch is falsy.
    """
    if not opt_print:
        return
    report = "File: {}\nName: {}\nYear: {}\nCompany: {}\n"
    for i, item in enumerate(files):
        print(report.format(item, names[i], years[i], companies[i]))
def first_letter(name):
    """Return the index bucket used to group a game name.

    Falsy names and names whose first character is a digit map to "0-9";
    every other name maps to its first character, upper-cased.
    """
    if name:
        initial = name[0]
        if not initial.isdigit():
            return initial.upper()
    return "0-9"
def safe_int_conversion(value):
    """Convert *value* to ``int``, returning "unknown" when that is impossible.

    Both malformed text (``ValueError``, e.g. "19xx") and values of an
    unsupported type (``TypeError``, e.g. ``None``) yield "unknown".

    Args:
        value: Value to convert, typically a year string.

    Returns:
        The integer value, or the string "unknown".
    """
    try:
        return int(value)
    except (TypeError, ValueError):
        return "unknown"
def get_modern_and_classic_folder(year):
    """Return the era sub-folder a game released in *year* belongs to.

    Args:
        year: Release year as extracted from the file name.

    Returns:
        "" when ``opt_split_modern_and_classic`` is disabled (no extra folder
        level at all); otherwise "unknown" when the year is not numeric,
        ``modern_folder_name`` when it is greater than ``last_classic_year``
        and ``classic_folder_name`` in any other case.
    """
    # Check the switch first: with splitting disabled no era folder must be
    # produced, not even "unknown"
    if not opt_split_modern_and_classic:
        return ""
    year = safe_int_conversion(year)
    if year == "unknown":
        return "unknown"
    # NOTE(review): the placeholder year "0" used for files without a year
    # converts to 0 and therefore lands in the classic folder - confirm this
    # is intended
    if year > last_classic_year:
        return modern_folder_name
    return classic_folder_name
def copy_files():
    """Copy every collected file into its final folder.

    The target of each file is
    ``<destination_path>/<era folder>/<initial>/<name> (<year>)/<file>``;
    missing folders are created on the way. Nothing is done when the global
    ``opt_create_dirs`` switch is falsy.
    """
    if not opt_create_dirs:
        return
    total_files = len(files)
    for i, file_name in enumerate(files):
        logging.info("({} de {}) {}".format(i + 1, total_files, file_name))
        game_dir = f"{names[i]} ({years[i]})"
        era_folder = get_modern_and_classic_folder(years[i])
        dst_path = os.path.join(
            destination_path, era_folder, first_letter(names[i]), game_dir
        )
        if not os.path.exists(dst_path):
            os.makedirs(dst_path)
            logging.info(f"Directorio creado: {dst_path}")
        shutil.copyfile(
            os.path.join(paths[i], file_name),
            os.path.join(dst_path, file_name),
        )
def main():
    """Rebuild the destination tree from the configured source directories."""
    global paths, files

    # Start from an empty destination folder
    remove_destination_folder(destination_path)
    create_destination_folder(destination_path)

    # Collect the source files and parse the game data out of their names
    paths, files = get_file_list(source_path)
    extract_game_data()

    # print_results() can be called here to dump the extracted data
    # Copy the files to their final location
    copy_files()


if __name__ == "__main__":
    main()
-424
View File
@@ -1,424 +0,0 @@
## Script para descargar ficheros de spectrum a partir de zxdb
## Imports utilizados en el script
import os
import mysql.connector
import requests
import time
import random
import zipfile
import shutil
from mysql.connector import errorcode
from urllib.parse import urlparse
from urllib.request import urlretrieve
## Base URLs the files are downloaded from
url_prefix = {
"spectrum_computing": r"https://spectrumcomputing.co.uk",
"wos": r"https://php.sustancia.synology.me/wos",
"nvg": r"https://php.sustancia.synology.me/nvg",
}
## Local paths where the results are stored
destination_path = r"/home/sergio/zx/zxdb/games/"
cache_path = r"/home/sergio/zx/zxdb/cache/games/"
temp_file = r"/tmp/zxdb.download.tmp"
## Configuration parameters
should_clear_destination_path = True # Whether the destination folder is cleared first
wait = True # Whether a random pause is made between downloads
min_wait = 2 # Minimum number of seconds to wait between downloads
max_wait = min_wait + 1 # Maximum number of seconds to wait between downloads
# One dict per row returned by the query; filled by select()
elements = []
filetypes_on_root = [
"Tape image",
"Disk image",
"Snapshot image",
"POK pokes file",
] # File types that are stored in the root folder of the game
def select(cursor):
    """Run the selected ZXDB query and append one dict per row to ``elements``.

    Three queries are defined and the one at index ``selected_query`` is
    executed. Every row is stored with the keys ``title``, ``developer``,
    ``release_year``, ``url`` and ``filetype``.

    Args:
        cursor: Database cursor used to execute the query and iterate its rows.
    """
    ## Query 0: every game (applications, books, etc. filtered out) with all
    ## the files associated to those games
    all_files_sql = """
SELECT DISTINCT
e.title, l.name, r.release_year, d.file_link, f.text
FROM
((((((publishers p
INNER JOIN entries e ON
p.entry_id = e.id)
INNER JOIN labels l ON
p.label_id = l.id)
INNER JOIN genretypes g ON
e.genretype_id = g.id)
INNER JOIN downloads d ON
e.id = d.entry_id)
INNER JOIN filetypes f ON
d.filetype_id = f.id)
INNER JOIN releases r ON
e.id = r.entry_id AND
p.release_seq = r.release_seq)
WHERE
(e.availabletype_id = 'A' OR e.availabletype_id = 'D') AND
(f.text <> 'Remote link' AND f.text <> '?') AND
r.release_seq = 0 AND
(g.text like '%Game:%' AND g.text not like 'Casual%')
ORDER BY
e.title;"""

    ## Query 1: query 0 narrowed down further, here to labels whose name
    ## starts with 'ZOSYA'
    label_filtered_sql = """
SELECT DISTINCT
e.title, l.name, r.release_year, d.file_link, f.text
FROM
((((((publishers p
INNER JOIN entries e ON
p.entry_id = e.id)
INNER JOIN labels l ON
p.label_id = l.id)
INNER JOIN genretypes g ON
e.genretype_id = g.id)
INNER JOIN downloads d ON
e.id = d.entry_id)
INNER JOIN filetypes f ON
d.filetype_id = f.id)
INNER JOIN releases r ON
e.id = r.entry_id AND
p.release_seq = r.release_seq)
WHERE
(e.availabletype_id = 'A' OR e.availabletype_id = 'D') AND
(f.text <> 'Remote link' AND f.text <> '?') AND
r.release_seq = 0 AND
l.name like 'ZOSYA%' AND
(g.text like '%Game:%' AND g.text not like 'Casual%')
ORDER BY
e.title;"""
    ## Alternative filters for query 1 (disabled):
    #(r.release_year >= '1986' AND r.release_year <= '1991') AND
    #l.name in ('Dinamic Software', 'Aventuras AD S.A.', 'Arcadia Soft', 'Creepsoft', 'Dro Soft', 'Erbe Software S.A.', 'Iber Software', 'MCM Software S.A.', 'Made in Spain', 'New Frontier', 'Opera Soft S.A.', 'System 4', 'Topo Soft', 'Zigurat Software') AND
    #(l.country_id = 'ES' AND l.labeltype_id = 'Z') AND
    #l.name in ('Ocean Software Ltd', 'Imagine Software Ltd', 'Palace Software', 'Gremlin Graphics Software Ltd', 'Elite Systems Ltd', 'Melbourne House', 'Ultimate Play The Game', 'Durell Software Ltd', 'Codemasters Ltd') AND
    #e.title = 'Arkanoid - Revenge of Doh' AND

    ## Query 2: every game (applications, books, etc. filtered out) with ONLY
    ## the tape, disk, snapshot and pokes files
    media_only_sql = """
SELECT DISTINCT
e.title, l.name, r.release_year, d.file_link, f.text
FROM
((((((publishers p
INNER JOIN entries e ON
p.entry_id = e.id)
INNER JOIN labels l ON
p.label_id = l.id)
INNER JOIN genretypes g ON
e.genretype_id = g.id)
INNER JOIN downloads d ON
e.id = d.entry_id)
INNER JOIN filetypes f ON
d.filetype_id = f.id)
INNER JOIN releases r ON
e.id = r.entry_id AND
p.release_seq = r.release_seq)
WHERE
(e.availabletype_id = 'A' OR e.availabletype_id = 'D') AND
(f.text IN ('Tape image','Disk image','Snapshot image','POK pokes file')) AND
r.release_seq = 0 AND
(g.text like '%Game:%' AND g.text not like 'Casual%')
ORDER BY
e.title;"""

    query = [all_files_sql, label_filtered_sql, media_only_sql]
    selected_query = 0
    cursor.execute(query[selected_query])
    for row in cursor:
        elements.append(
            {
                "title": row[0],
                "developer": row[1],
                "release_year": row[2],
                "url": row[3],
                "filetype": row[4],
            }
        )
## Opens the database connection and runs the query
def connect():
    """Connect to the ``zxdb`` MySQL database, run ``select`` and clean up.

    Connector errors are reported on stdout and not re-raised. The cursor and
    the connection are released in ``finally``; both start as ``None`` so that
    a failed ``connect()`` call no longer raises a second error there because
    of unbound names.
    """
    # NOTE(review): the database credentials are hard-coded in source control;
    # consider loading them from the environment or a private config file
    config = {
        "user": "root",
        "password": "unJEPimbJddHP8",
        "host": "127.0.0.1",
        "database": "zxdb",
        "raise_on_warnings": True,
    }
    connection = None
    cursor = None
    try:
        connection = mysql.connector.connect(**config)
        cursor = connection.cursor()
        select(cursor)
    except mysql.connector.Error as err:
        if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
            print("Something is wrong with your user name or password")
        elif err.errno == errorcode.ER_BAD_DB_ERROR:
            print("Database does not exist")
        else:
            print(err)
    finally:
        # Release the cursor before the connection that owns it
        if connection is not None and connection.is_connected():
            if cursor is not None:
                cursor.close()
            connection.close()
## Processes every element, filling in and adjusting its fields
def process_elements():
    """Derive folder, file name and download URL data for every element.

    Each dict of the global ``elements`` list is updated in place with the
    keys ``root_folder``, ``file_name``, ``subfolder``, ``is_zip`` and
    ``non_zip_file_name``, and its ``url`` gets the matching site prefix.
    """
    for element in elements:
        # Root folder name: "<title> (<year>)(<developer>)"
        element["root_folder"] = (
            element["title"]
            + " ("
            + str(element["release_year"])
            + ")("
            + element["developer"]
            + ")"
        )
        element["root_folder"] = normalize_path(element["root_folder"])

        # File name taken from the download URL
        file_name = url_filename(element["url"])
        element["file_name"] = file_name

        # File types not listed in filetypes_on_root go to a sub-folder named
        # after the file type
        element["subfolder"] = ""
        if element["filetype"] not in filetypes_on_root:
            element["subfolder"] = normalize_path(element["filetype"])

        # Zip detection, and the file name without the ".zip" suffix
        is_zip = file_name.endswith(".zip")
        element["is_zip"] = is_zip
        element["non_zip_file_name"] = file_name[:-4] if is_zip else file_name

        # Add the site prefix to the URL ("/pub" and "/nvg" are dropped first)
        url = element["url"]
        if url.startswith("/zxdb"):
            element["url"] = url_prefix["spectrum_computing"] + str(url)
        elif url.startswith("/pub"):
            element["url"] = url_prefix["wos"] + str(url[4:])
        elif url.startswith("/nvg"):
            element["url"] = url_prefix["nvg"] + str(url[4:])
## Returns the file name that forms the last part of a URL
def url_filename(url):
    """Return the last component of the path of *url*.

    Only the path part of the URL is considered, so a query string or a
    fragment never ends up in the result.
    """
    return os.path.basename(urlparse(url).path)
## Downloads a file from a URL
def download_file(url, dest, timeout=30):
    """Download *url* into the local file *dest*.

    Args:
        url: Address to fetch.
        dest: Path of the file the response body is written to.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` waits indefinitely, so a stalled server
            would hang the whole run and the ``Timeout`` handler below could
            never trigger.

    Returns:
        True when the server answered 200 and the body was written; False
        for any other status code, on timeout or on too many redirects.

    Raises:
        SystemExit: On any other ``requests`` error.
    """
    try:
        r = requests.get(url, timeout=timeout)
        if r.status_code != 200:
            return False
        with open(dest, "wb") as f:
            f.write(r.content)
        return True
    except requests.exceptions.Timeout:
        # Maybe set up for a retry, or continue in a retry loop
        print("Timeout: {}".format(url))
    except requests.exceptions.TooManyRedirects:
        # Tell the user their URL was bad and try a different one
        print("Bad URL: {}".format(url))
    except requests.exceptions.RequestException as e:
        # catastrophic error. bail.
        raise SystemExit(e)
    # Handled failures report False explicitly instead of an implicit None
    return False
## Extracts the files whose extension is in the list of extensions
def unzip_file(src, dst):
    """Extract the snapshot, tape and disk image files of a zip archive.

    Only members ending in .z80, .sna, .tzx, .tap, .dsk or .trd are
    extracted. The comparison ignores case, so mixed-case names such as
    "Game.Tap" are included, and the leading dot is required for every
    extension (the previous "dsk" entry also matched names like "notadsk").

    Args:
        src: Path of the zip archive.
        dst: Directory the selected members are extracted into.
    """
    extensions = (".z80", ".sna", ".tzx", ".tap", ".dsk", ".trd")
    # The context manager closes the archive even when an extraction fails
    with zipfile.ZipFile(src, "r") as zip_file:
        for member in zip_file.namelist():
            if member.lower().endswith(extensions):
                zip_file.extract(member, dst)
## Gets the files of the query from the internet or from the cache
## and places them in the destination folder, unzipping the archives that need it
## NOTE(review): the nesting of the branches below cannot be recovered from the
## flattened text - confirm which `if` each `else`, the progress `print` calls and
## the `wait` pause belong to before changing this function
def get_files():
# Variables used for the on-screen progress report of the download
current_file = 0
total_files = len(elements)
total_files_width = len(str(total_files))
last_game_folder = ""
for element in elements:
# Game folder in the destination and in the cache
game_folder = element["root_folder"]
destination_folder = os.path.join(destination_path, element["root_folder"])
destination_subfolder = os.path.join(destination_folder, element["subfolder"])
cache_folder = os.path.join(cache_path, element["root_folder"])
cache_subfolder = os.path.join(cache_folder, element["subfolder"])
# Full path to the destination file and to the cache file
destination_file = os.path.join(destination_subfolder, element["file_name"])
cache_file = os.path.join(cache_subfolder, element["file_name"])
# Update the progress variables
current_file = current_file + 1
# Print the game folder name when it differs from the one of the previous element
if game_folder != last_game_folder:
print("\n{}".format(game_folder))
last_game_folder = game_folder
#print(
# "(WORKING : {} ({})".format(
# element["file_name"],
# element["filetype"]
# )
#)
# Check whether the file to download already exists, either under its own
# name or under its name without the ".zip" suffix
if not os.path.isfile(destination_file) and (
not os.path.isfile(
os.path.join(destination_subfolder, element["non_zip_file_name"])
)
):
# Check whether the file already exists in the cache
if os.path.isfile(cache_file):
# If the file is found in the cache, create the destination folders and copy or extract it
if not os.path.isdir(destination_folder):
os.mkdir(destination_folder)
if not os.path.isdir(destination_subfolder):
os.mkdir(destination_subfolder)
# Zip archives with an empty subfolder are extracted; anything else is copied as-is
if cache_file.endswith(".zip") and element["subfolder"] == "":
unzip_file(cache_file, destination_subfolder)
else:
shutil.copyfile(cache_file, destination_file)
print(
"({:{width}} / {}) : cached : {} ({})".format(
current_file,
total_files,
element["file_name"],
element["filetype"],
width=total_files_width,
)
)
# The file is not in the cache
else:
status = "not found "
if download_file(element["url"], temp_file):
status = "downloaded"
if os.path.isfile(temp_file):
# Copy the temporary file to the cache
if not os.path.isdir(cache_folder):
os.mkdir(cache_folder)
if not os.path.isdir(cache_subfolder):
os.mkdir(cache_subfolder)
shutil.copyfile(temp_file, cache_file)
os.remove(temp_file)
# Copy the file from the cache to the destination
if os.path.isfile(cache_file):
if not os.path.isdir(destination_folder):
os.mkdir(destination_folder)
if not os.path.isdir(destination_subfolder):
os.mkdir(destination_subfolder)
if (
cache_file.endswith(".zip")
and element["subfolder"] == ""
):
unzip_file(cache_file, destination_folder)
else:
shutil.copyfile(cache_file, destination_file)
print(
"({:{width}} / {}) : {} : {} ({})".format(
current_file,
total_files,
status,
element["file_name"],
element["filetype"],
width=total_files_width,
)
)
# Pause a random whole number of seconds between min_wait and max_wait
if wait:
time.sleep(random.randint(min_wait, max_wait))
# The file already exists in the destination
else:
print(
"({:{width}} / {}) : skipping : {} ({})".format(
current_file,
total_files,
element["file_name"],
element["filetype"],
width=total_files_width,
)
)
## Replaces the illegal characters of the text string
def normalize_path(path):
    """Return *path* with every character of the illegal set replaced by "_"."""
    illegal_chars = ["<", ">", ":", '"', "/", "\\", "|", "?", "*"]
    replace_with = "_"
    return "".join(replace_with if char in illegal_chars else char for char in path)
## Clears the destination folder
def clear_destination_folder():
    """Delete everything inside ``destination_path``.

    Files and links are unlinked and directories are removed recursively; a
    failure on one entry is printed and the remaining entries are still
    processed. Nothing is done when ``should_clear_destination_path`` is
    falsy.
    """
    if not should_clear_destination_path:
        return
    print("Clear destination folder ...")
    for filename in os.listdir(destination_path):
        file_path = os.path.join(destination_path, filename)
        try:
            if os.path.isfile(file_path) or os.path.islink(file_path):
                os.unlink(file_path)
            elif os.path.isdir(file_path):
                shutil.rmtree(file_path)
        except Exception as e:
            print('Failed to delete %s. Reason: %s' % (file_path, e))
## Main entry point
def main():
    """Run the whole download: query, prepare, clear destination, fetch."""
    pipeline = (connect, process_elements, clear_destination_folder, get_files)
    for step in pipeline:
        step()


if __name__ == "__main__":
    main()