Reubicados los archivos en carpetas

This commit is contained in:
2023-10-28 13:24:05 +02:00
parent e8efb7c4d0
commit 7c7bca5667
45 changed files with 93 additions and 80 deletions
+175
View File
@@ -0,0 +1,175 @@
# Script para copiar roms a partir de un xml de fbneo
# Copia las roms por desarrollador y sin clones
# Por hacer:
# pasar por parametro si se quieren clones
import os
import shutil
import sys
import getopt
from xml.dom import minidom
# Option defaults. Flags are kept as the strings "yes"/"no" because the rest
# of the script compares against those literals.
opt_manufacturer = ""
opt_list = "no"
opt_dat = ""
opt_src_roms = ""
opt_dst_roms = ""
opt_copy = "no"
opt_sort = "no"
# Descriptions containing any of these substrings are skipped by the
# per-manufacturer copy.
ignore_list = ["DECO Cassette"]

# Usage text derived from the options that are actually accepted below.
_usage = (
    "usage: {} -d <dat> [-l] [-m <manufacturer>] "
    "[-c -i <src_roms_dir> -o <dst_roms_dir> [-s]]"
).format(os.path.basename(sys.argv[0]))

# Parse the command line.
try:
    opts, args = getopt.getopt(
        sys.argv[1:],
        "hlm:d:ci:o:s",
        ["help", "list", "manufacturer=", "dat=", "copy", "input=", "output=", "sort"],
    )
except getopt.GetoptError as err:
    # Tell the user which option was rejected before showing the usage.
    print(err)
    print(_usage)
    sys.exit(2)

for opt, arg in opts:
    if opt in ("-h", "--help"):
        print(_usage)
        sys.exit()
    elif opt in ("-l", "--list"):
        opt_list = "yes"
    elif opt in ("-m", "--manufacturer"):
        opt_manufacturer = arg
    elif opt in ("-d", "--dat"):
        opt_dat = arg
    elif opt in ("-c", "--copy"):
        opt_copy = "yes"
    elif opt in ("-i", "--input"):
        opt_src_roms = arg
    elif opt in ("-o", "--output"):
        opt_dst_roms = arg
    elif opt in ("-s", "--sort"):
        opt_sort = "yes"
# Load the XML dat. Nothing else can run without it, so explain the failure
# instead of exiting silently.
if opt_dat == "" or not os.path.isfile(opt_dat):
    print(
        "Error: an existing dat file must be given with -d/--dat",
        file=sys.stderr,
    )
    sys.exit(2)
print("Parsing {} file".format(opt_dat))
file = minidom.parse(opt_dat)
# List every manufacturer present in the dat (unique, alphabetically sorted).
if opt_list == "yes" and opt_manufacturer == "":
    print("List of all manufacturers:")
    games = file.getElementsByTagName("game")
    manufacturers = sorted(
        {
            entry.getElementsByTagName("manufacturer")[0].firstChild.data
            for entry in games
        }
    )
    for maker in manufacturers:
        print(maker)

# List the parent (non-clone, non-BIOS) games of the selected manufacturer.
if opt_list == "yes" and opt_manufacturer != "":
    print("List of all {} games".format(opt_manufacturer))
    for game in file.getElementsByTagName("game"):
        cloneof = game.getAttribute("cloneof")
        isbios = game.getAttribute("isbios")
        manufacturer = game.getElementsByTagName("manufacturer")[0]
        description = game.getElementsByTagName("description")[0]
        if cloneof or isbios:
            continue
        if manufacturer.firstChild.data == opt_manufacturer:
            print("%s" % (description.firstChild.data))
# Copy the parent games of the selected manufacturer.
if (
    opt_copy == "yes"
    and os.path.isdir(opt_src_roms)
    and os.path.isdir(opt_dst_roms)
    and opt_manufacturer != ""
):
    print("Copying all {} games".format(opt_manufacturer))
    notfound = []
    ignored_games = []
    games = file.getElementsByTagName("game")
    for game in games:
        name = game.getAttribute("name") + ".zip"
        cloneof = game.getAttribute("cloneof")
        isbios = game.getAttribute("isbios")
        manufacturer = game.getElementsByTagName("manufacturer")[0]
        description = game.getElementsByTagName("description")[0]
        # Only parent sets of the requested manufacturer are of interest.
        if manufacturer.firstChild.data != opt_manufacturer or cloneof or isbios:
            continue

        title = description.firstChild.data
        # any() records an ignored title once, even if several patterns match.
        if any(element in title for element in ignore_list):
            ignored_games.append(title)
            continue

        src = os.path.join(opt_src_roms, name)
        if opt_sort == "yes":
            # "/" cannot appear in a directory name, so it is replaced.
            folder = manufacturer.firstChild.data.replace("/", "-")
            os.makedirs(os.path.join(opt_dst_roms, folder), exist_ok=True)
            dst = os.path.join(opt_dst_roms, folder, name)
        else:
            dst = os.path.join(opt_dst_roms, name)

        if os.path.isfile(src):
            shutil.copyfile(src, dst)
            print("%s" % (title))
        else:
            notfound.append(title)

    print("\nMissing games:")
    for game in notfound:
        print(game)
    print("\nIgnored games:")
    for game in ignored_games:
        print(game)
# Copy every parent game, regardless of manufacturer.
if (
    opt_copy == "yes"
    and opt_manufacturer == ""
    and os.path.isdir(opt_src_roms)
    and os.path.isdir(opt_dst_roms)
):
    print("Copying all games")
    notfound = []
    games = file.getElementsByTagName("game")
    for game in games:
        name = game.getAttribute("name") + ".zip"
        cloneof = game.getAttribute("cloneof")
        isbios = game.getAttribute("isbios")
        manufacturer = game.getElementsByTagName("manufacturer")[0]
        description = game.getElementsByTagName("description")[0]
        # Clones and BIOS sets are never copied.
        if cloneof or isbios:
            continue

        src = os.path.join(opt_src_roms, name)
        if opt_sort != "yes":
            dst = os.path.join(opt_dst_roms, name)
        else:
            # One sub-folder per manufacturer; "/" is not valid in a folder name.
            x = manufacturer.firstChild.data.replace(r"/", r"-")
            maker_dir = os.path.join(opt_dst_roms, x)
            dst = os.path.join(maker_dir, name)
            if not os.path.isdir(maker_dir):
                os.mkdir(maker_dir)

        if not os.path.isfile(src):
            notfound.append(description.firstChild.data)
        else:
            shutil.copyfile(src, dst)
            print("%s" % (description.firstChild.data))

    print("\nMissing games:")
    for game in notfound:
        print(game)
+392
View File
@@ -0,0 +1,392 @@
import requests
import urllib3
import sys
import time
import threading
import logging
import os
import errno
from concurrent.futures import ThreadPoolExecutor
from dotenv import load_dotenv
from vmware.vapi.vsphere.client import create_vsphere_client
from com.vmware.vcenter_client import VM
from com.vmware.vcenter.vm_client import Power as PowerHardware
from com.vmware.vcenter.vm.guest_client import Power as PowerGuest
# Settings (the wait_* and timeout_* values are in seconds: they are passed to
# time.sleep() or added to time.time())
wait_power_on = 60 # Pause between machine groups when powering on
wait_power_off = 60 # Pause between machine groups when powering off
wait_tools_on = 120 # How long to wait for the guest tools to become ready
wait_tools_off = 120 # How long to wait for the guest tools to stop being ready
timeout_on = 1200 # Time allowed for a machine to be considered powered on (NOTE(review): not referenced in the visible code)
timeout_off = 1200 # Time allowed before the machine is forcibly powered off
operation = "none" # Operation mode: "on", "off" or "none"
# Name of the machine group the power operation is applied to
group_selected = "none"
vms = "none" # Machine groups used by this run (set by checkArgs)
# TEST
vms_test_g1 = ["test1", "test2"]
vms_test_g2 = ["test3", "test4", "test5"]
vms_test_g3 = ["test6", "test7", "test8", "test9", "test10"]
# TEST2
vms_test_g4 = ["test1", "test2", "test3", "test4",
"test5", "test6", "test7", "test8", "test9", "test10"]
# PRODUCTION
# NOTE(review): vm_apagadas, vm_grupo_0 and vm_grupo_1 are not part of any
# ordered list below, so they are never selected by the visible code.
vm_sin_grupo = ["alarmas_ayto", "alarmas_cpd", "cita_previa", "daloradius_biblioteca", "deep_freeze2", "editrans2", "edu_latorre", "gestion_switches", "impresoras_ricoh", "inattend2", "lucia", "micollab-mbg", "micollab9.4", "museo_domus", "ofimatico2007", "ofimatico2019",
"pulse_secure", "tarificador", "tc_aplicaciones", "tc_balanceador", "tc_bbdd", "tc_wserver", "untangle_biblioteca", "web_pre", "webdmz", "wifimovi", "tao_consultores1", "tao_consultores2", "tao_consultores3", "temporizador_pleno", "wazuh4.3.6"]
vm_apagadas = ["adnperros", "autocad2010", "control_biblio", "gisabsis", "glpi", "gwdmz15", "helpdesk",
"inventario_igs", "oes2018_pruebas", "open_erp", "pacmer", "presencia", "severino", "verticales_telefonica_camaras"]
vm_grupo_0 = ["datacore-cpd", "datacore-pol", "vcsa7"]
vm_grupo_1 = ["sacrista", "paco", "synology", "hp1440"]
vm_grupo_2 = ["torero", "oraserver19", "oraserver19_pre",
"ndsmaster", "storeonce1", "storeonce2"]
vm_grupo_3 = ["ayto_web_int", "documentum2016", "geoserver_lan", "peseta2",
"taoactuate", "abaco", "intranet2016", "intranet2_pre", "petrolio2"]
vm_grupo_4 = ["ayto_web_ext2", "geoserver_pre",
"tao1", "tao1_pre", "tirisiti2"]
vm_grupo_5 = ["clasico", "gw01-19", "gw02-19", "geoserver_pro", "otrs", "taoapps1_wildfly", "taoapps2_wildfly", "taoapps3_wildfly", "taoapps_pre_wildfly",
"tereseta2", "fortianalyzer", "fortimail", "veeam", "taobalanceador_wildfly", "taosede_9", "taosede_9_pre", "filr4"]
vm_grupo_6 = ["datasync_gms15", "gwdmz"]
vm_grupo_7 = ["centreon2", "zabbix"]
# ORACLE
vm_oracle_0 = ["oraserver19", "oraserver19_pre"]
vm_oracle_1 = ["ayto_web_int", "documentum2016", "geoserver_lan", "peseta2",
"taoactuate", "abaco", "intranet2016", "intranet2_pre"]
vm_oracle_2 = ["ayto_web_ext2", "geoserver_pre", "tao1", "tao1_pre"]
vm_oracle_3 = ["geoserver_pro", "taoapps1_wildfly", "taoapps2_wildfly", "taoapps3_wildfly",
"taoapps_pre_wildfly", "taobalanceador_wildfly", "taosede_9", "taosede_9_pre"]
# GEOSERVERS
g1 = ["geoserver_lan"]
g2 = ["geoserver_pre"]
g3 = ["geoserver_pro"]
# Ordered sub-groups: main_loop walks each list front-to-back when powering on
# and back-to-front when powering off
vms_test1 = [vms_test_g1, vms_test_g2, vms_test_g3]
vms_test2 = [vms_test_g4]
vms_oracle = [vm_oracle_0, vm_oracle_1, vm_oracle_2, vm_oracle_3]
vms_produccion = [vm_grupo_2, vm_grupo_3, vm_grupo_4,
vm_grupo_5, vm_grupo_6, vm_sin_grupo, vm_grupo_7]
vms_geo = [g1, g2, g3]
# Selectable groups; group_names[i] is the command-line name of groups[i]
groups = [vms_test1, vms_test2, vms_oracle, vms_produccion, vms_geo]
group_names = ["TEST1", "TEST2", "ORACLE", "PRODUCCION", "GEOSERVERS"]
def get_vm(client, vm_name):
    """Return the identifier of the VM called *vm_name*, or None if not found.

    Only the first match is used, so the name is assumed to be unique.
    """
    matches = client.vcenter.VM.list(VM.FilterSpec(names={vm_name}))
    if len(matches) != 0:
        return matches[0].vm
    return None
def power_off_vm(client, vm, name):
    """Shut down a VM, forcing a hard power-off if it does not stop in time.

    While the VM reports POWERED_ON, a guest shutdown is requested whenever
    the guest is running and its tools report ready. If the VM is still on
    ``timeout_off`` seconds after the call started, it is powered off hard.

    Args:
        client: vSphere client used for every API call.
        vm: VM identifier as returned by ``get_vm``, or None if it was not found.
        name: VM name, used only in log messages.

    Returns:
        False when ``vm`` is None, True otherwise.
    """
    if vm is None:
        logging.info("%s: No existe", name)
        return False

    status = client.vcenter.vm.Power.get(vm)
    timeout_shutdown = time.time() + timeout_off
    # Repeat until the VM is off or the overall timeout expires.
    while status.state == PowerHardware.State.POWERED_ON:
        status = client.vcenter.vm.Power.get(vm)
        guest_status = client.vcenter.vm.guest.Power.get(vm).state
        if guest_status == PowerGuest.State.RUNNING:
            tools = client.vcenter.vm.guest.Power.get(vm).operations_ready
            if tools:
                logging.info("%s: Enviada señal de apagado", name)
                client.vcenter.vm.guest.Power.shutdown(vm)
                # Wait until the tools stop reporting ready, at most
                # wait_tools_off seconds.
                timeout_tools = time.time() + wait_tools_off
                while tools:
                    tools = client.vcenter.vm.guest.Power.get(
                        vm).operations_ready
                    time.sleep(1)
                    if time.time() > timeout_tools:
                        logging.info("%s: Timeout tools", name)
                        break
        logging.info("%s: Apagandose ...", name)
        time.sleep(5)
        if time.time() > timeout_shutdown:
            logging.info("%s: Forzando apagado ...", name)
            # Use the same Power service as get(); the original lowercase
            # "power" attribute path is not used anywhere else in this file.
            client.vcenter.vm.Power.stop(vm)
            break
    logging.info("%s: Apagada", name)
    return True
def power_on_vm(client, vm, name):
    """Power on a VM and wait for its guest tools to report ready.

    Args:
        client: vSphere client used for every API call.
        vm: VM identifier as returned by ``get_vm``, or None if it was not found.
        name: VM name, used only in log messages.

    Returns:
        False when ``vm`` is None, True otherwise.
    """
    if vm is None:
        logging.warning("%s: no existe", name)
        return False

    power_state = client.vcenter.vm.Power.get(vm)
    # Keep issuing the start request for as long as the VM reports POWERED_OFF.
    while power_state.state == PowerHardware.State.POWERED_OFF:
        client.vcenter.vm.Power.start(vm)
        logging.info("%s: Enviada señal de encendido", name)

        # Poll the guest tools until they are ready or wait_tools_on expires.
        tools_deadline = time.time() + wait_tools_on
        tools = client.vcenter.vm.guest.Power.get(vm).operations_ready
        while tools == False:
            time.sleep(5)
            tools = client.vcenter.vm.guest.Power.get(vm).operations_ready
            logging.info("%s: Encendiendose", name)
            if time.time() > tools_deadline:
                logging.info("%s: Timeout tools", name)
                break

        # Refresh the power state for the next loop test.
        power_state = client.vcenter.vm.Power.get(vm)

    logging.info("%s: Encendida", name)
    return True
def printHelp(prog):
    """Print the command-line help, using *prog* as the script name."""
    help_lines = (
        "\nFUNCIONAMIENTO",
        "\tpython3 " + prog + " OPCIONES\n",
        "OPCIONES",
        "\t -h, --help:",
        "\t\tMuestra esta ayuda\n",
        "\t -l, --list:",
        "\t\tLista los posibles grupos de maquinas virtuales para operar\n",
        "\t -l, --list NOMBRE_GRUPO:",
        "\t\tLista los nombres de las máquinas virtuales que pertenecen al grupo NOMBRE_GRUPO\n",
        "\t -g, --group NOMBRE_GRUPO:",
        "\t\tEstablece el grupo de maquinas virtuales sobre el cual se aplicará el modo de energía\n",
        "\t -p, --power on|off:",
        "\t\tIndica el modo de operación de energía que se aplicará a las máquians virtuales",
        "\t\t--power on: Enciende las máquinas virtuales del grupo seleccionado",
        "\t\t--power off: Apaga las máquinas virtuales del grupo seleccionado\n",
        "EJEMPLO",
        "\tpython3 " + prog + " --power on --group ORACLE\n",
    )
    # A single print of the joined lines emits the same bytes as one print
    # per line.
    print("\n".join(help_lines))
def checkArgs(args):
    """Parse the command line and set the globals operation, group_selected, vms.

    Args:
        args: full argument vector; ``args[0]`` is the program name.

    All three globals are reset to "none" first. ``--list`` and ``--help``
    print their output directly and force ``operation`` back to "none".
    """
    global operation, group_selected, vms
    operation = "none"
    group_selected = "none"
    vms = "none"

    last = len(args) - 1
    for index, arg in enumerate(args):
        # Value following the current flag, if there is one.
        following = args[index + 1] if index < last else None

        # POWER
        if arg in ("--power", "-p"):
            if following in ("on", "off"):
                operation = following

        # LIST
        if arg in ("--list", "-l"):
            operation = "none"
            if following in group_names:
                # A valid group name follows: list its machines by sub-group.
                members = groups[group_names.index(following)]
                for num_group, group in enumerate(members, start=1):
                    print("Grupo", num_group)
                    for mv in group:
                        print("\t" + mv)
            else:
                # No (valid) group name follows: list the available groups.
                print("Los grupos de máquinas disponibles son los siguientes:")
                for group in group_names:
                    print("- "+group)

        # GROUP
        if arg in ("--group", "-g"):
            group_selected = "none"
            if following in group_names:
                group_selected = following
                vms = groups[group_names.index(group_selected)]

        # HELP
        if arg in ("--help", "-h"):
            operation = "none"
            printHelp(args[0])

    # Called with no options at all: show the help.
    if len(args) == 1:
        operation = "none"
        printHelp(args[0])
def main():
    """Parse arguments, set up logging, connect to vCenter and run the operation.

    Exits without doing anything when no power operation or no valid group
    was selected on the command line.
    """
    # Parse the arguments; this sets the globals operation and vms.
    checkArgs(sys.argv)
    print("operation", operation)
    # The debugging override "vms = vms_test1" was removed: it created a local
    # that hid the selected group and made the "none" check below useless.
    print("vms", vms)
    # Leave if a required parameter is missing.
    if operation == "none" or vms == "none":
        sys.exit()

    # Create the log directory if it does not exist yet.
    os.makedirs('log', exist_ok=True)
    # One log file per run, named after the start time.
    ini_timestr = time.strftime("%Y%m%d-%H%M%S")
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
                        encoding='utf-8',
                        datefmt='%m-%d %H:%M',
                        filename='log/' + ini_timestr + '.log',
                        filemode='w')
    # Also send INFO and above to the console, in a simpler format.
    console = logging.StreamHandler()
    console.setLevel(logging.INFO)
    console.setFormatter(logging.Formatter('%(levelname)-8s>> %(message)s'))
    logging.getLogger().addHandler(console)

    session = requests.session()
    # SECURITY: certificate verification is disabled and the matching warning
    # is silenced. This is not recommended in a production environment.
    session.verify = False
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    # Connection details come from the environment / .env file.
    load_dotenv()
    vcenter_server = os.getenv('VCENTER_SERVER')
    vcenter_username = os.getenv('VCENTER_USERNAME')
    vcenter_password = os.getenv('VCENTER_PASSWORD')
    print("vcenter_server", vcenter_server)

    # Connect to vCenter using username and password.
    client = create_vsphere_client(
        server=vcenter_server, username=vcenter_username, password=vcenter_password, session=session)

    # Main loop
    main_loop(client, operation)
def main_loop(client, operation):
    """Apply the power operation to every sub-group of the global ``vms``.

    Sub-groups are handled first-to-last for "on" and last-to-first for "off".
    Each sub-group is processed in parallel (one thread per machine) and then
    walked again sequentially to count the machines for the summary line.
    A pause separates consecutive sub-groups.

    Args:
        client: vSphere client used for every API call.
        operation: "on" or "off"; any other value does nothing.
    """
    # Power machines on
    if operation == "on":
        logging.info("Modo de operación: ENCENDER MÁQUINAS\n")
        for group in range(len(vms)):
            members = vms[group]
            logging.info("Grupo %s: PROCESANDO", group)
            # One worker per machine; leaving the "with" waits for all of them.
            with ThreadPoolExecutor(len(members)) as executor:
                for vm_name in members:
                    executor.submit(
                        power_on_vm, client, get_vm(client, vm_name), vm_name)
            logging.info("Grupo %s: TERMINADO\n", group)

            # Summary
            logging.info("Grupo %s: RESUMEN", group)
            num_mv = sum(
                1 for vm_name in members
                if power_on_vm(client, get_vm(client, vm_name), vm_name)
            )
            logging.info("%s de %s máquinas encendidas\n",
                         num_mv, len(members))

            if group + 1 < len(vms):
                logging.info("Esperando %s segundos ...\n", wait_power_on)
                time.sleep(wait_power_on)

    # Power machines off
    if operation == "off":
        logging.info("Modo de operación: APAGAR MÁQUINAS\n")
        for group in reversed(range(len(vms))):
            members = vms[group]
            logging.info("Grupo %s: PROCESANDO", group)
            # One worker per machine; leaving the "with" waits for all of them.
            with ThreadPoolExecutor(len(members)) as executor:
                for vm_name in members:
                    executor.submit(
                        power_off_vm, client, get_vm(client, vm_name), vm_name)
            logging.info("Grupo %s: TERMINADO\n", group)

            # Summary
            logging.info("Grupo %s: RESUMEN", group)
            num_mv = sum(
                1 for vm_name in members
                if power_off_vm(client, get_vm(client, vm_name), vm_name)
            )
            logging.info("%s de %s máquinas apagadas\n",
                         num_mv, len(members))

            if group > 0:
                logging.info("Esperando %s segundos ...\n", wait_power_off)
                time.sleep(wait_power_off)


# Run only when executed as a script.
if __name__ == "__main__":
    main()
+101
View File
@@ -0,0 +1,101 @@
import os
import re # regexp
import shutil
from pathlib import Path
def first_letter(x):
    """Return the index folder for the name *x*.

    Empty names and names starting with an ASCII digit map to "0-9"; any
    other name maps to its first character, upper-cased.
    """
    if not x or x[0] in "0123456789":
        return "0-9"
    return x[0].upper()
# Settings
source_path = [Path("/home/sergio/zx/Games"), Path("/home/sergio/zx/Pokes")]
destination_path = Path("/home/sergio/tmp/final")
opt_print = "no"
opt_create_dirs = "yes"

# Remove any previous destination tree; a failure is reported, not fatal.
try:
    print(f"Directory: {destination_path} -> deleting...")
    shutil.rmtree(destination_path)
    print(f"Directory: {destination_path} -> removed successfully")
except OSError as rm_error:
    print(f"Error, {rm_error.strerror}: {destination_path}")

# Re-create the (now empty) destination directory.
try:
    os.mkdir(destination_path)
    print(f"Directory: {destination_path} -> created successfully")
except OSError as mk_error:
    print(mk_error)
# Parallel lists: index k of each list describes the same file.
paths = []  # Directory the file was found in
files = []  # File name
names = []  # Game name
years = []  # Game year
companies = []  # Game company or distributor

# Collect the regular files from each source directory.
for path in source_path:
    for file_name in os.listdir(path):
        if not os.path.isfile(os.path.join(path, file_name)):
            continue
        paths.append(path)
        files.append(file_name)

# Extract name, year and company from each file name.
regex_year = r"\(\d.*?\)"
regex_company = r"^\(.*?\)"
for file_name in files:
    year_match = re.search(regex_year, file_name)
    if year_match is None:
        # No "(year...)" group: keep the whole file name as the game name.
        years.append("0")
        names.append(file_name)
        companies.append("-")
        continue

    # The name is everything before the year group.
    names.append(file_name[: year_match.start()].strip())
    # The company, if present, is the "(...)" group right after the year.
    company_match = re.search(regex_company, file_name[year_match.end():])
    if company_match:
        companies.append(company_match.group()[1:-1])
    else:
        companies.append("-")
    # Keep only the four characters after the opening parenthesis.
    years.append(year_match.group()[1:5])
# Optionally print what was parsed for each file.
if opt_print == "yes":
    for item, game_name, game_year, game_company in zip(
        files, names, years, companies
    ):
        print(
            "File: {}\nName: {}\nYear: {}\nCompany: {}\n".format(
                item, game_name, game_year, game_company
            )
        )

# Copy each file into <destination>/<first letter>/<name (year)>/.
total_files = len(files)
if opt_create_dirs == "yes":
    entries = zip(paths, files, names, years)
    for number, (src_dir, file_name, game_name, game_year) in enumerate(
        entries, start=1
    ):
        print("({} de {}) {}".format(number, total_files, file_name))
        game_dir = game_name + " (" + game_year + ")"
        dst_path = os.path.join(
            destination_path, first_letter(game_name), game_dir
        )
        if not os.path.exists(dst_path):
            os.makedirs(dst_path)
        shutil.copyfile(
            os.path.join(src_dir, file_name),
            os.path.join(dst_path, file_name),
        )
+135
View File
@@ -0,0 +1,135 @@
import json
import requests
import os
import time
import random
import shutil
from urllib.parse import urlparse
# Settings that control how the program runs
json_file = r"/home/sergio/zx/zxart/picture.json"
destination_path = r"/home/sergio/zx/zxart/pictures/"
cache_path = r"/home/sergio/zx/zxart/cache/pictures/"
wait = False # Whether to pause for a random time between downloads
min_wait = 1 # Minimum number of seconds to wait between downloads
max_wait = 3 # Maximum number of seconds to wait between downloads
tags = [
"Loading Screen",
"Game",
] # Tags of the pictures to select. Leave empty to select all pictures
# Build the list of download URLs from a JSON file
def get_urls(path=None, selected_tags=None):
    """Return the sorted, de-duplicated ".scr" URLs listed in the JSON file.

    Args:
        path: JSON file to read; defaults to the module-level ``json_file``.
        selected_tags: tags to filter on; defaults to the module-level
            ``tags``. An empty sequence selects every picture.

    Returns:
        A sorted list of unique ``originalUrl`` values ending in "scr". When
        tags are given, only entries whose "tags" contain at least one of
        them are considered.
    """
    if path is None:
        path = json_file
    if selected_tags is None:
        selected_tags = tags

    # The context manager closes the file even if parsing fails.
    with open(path, encoding="utf-8") as f:
        data = json.load(f)

    urls = set()
    for picture in data["zxPicture"]:
        if selected_tags:
            if "tags" not in picture:
                continue
            if not any(tag in picture["tags"] for tag in selected_tags):
                continue
        url = picture.get("originalUrl")
        if isinstance(url, str) and url.endswith("scr"):
            urls.add(url)
    return sorted(urls)
# Derive the local file name from a full URL
def url_filename(url):
    """Return the text after the last "/" of *url*, with "filename:" replaced by "pic_"."""
    return url.rsplit("/", 1)[-1].replace("filename:", "pic_")
# Download a file from a URL to a specific destination
def download_file(url, dest, timeout=30):
    """Download *url* into the file *dest*.

    Args:
        url: address to fetch.
        dest: path of the file to write.
        timeout: seconds passed to ``requests.get``; without it a stalled
            server would block the run indefinitely.

    Returns:
        True if the file was written. False on a timeout, on too many
        redirects, or on a non-200 response; nothing is written in those
        cases, so error pages never end up in the cache.

    Raises:
        SystemExit: on any other ``requests`` error.
    """
    try:
        r = requests.get(url, timeout=timeout)
    except requests.exceptions.Timeout:
        print("Timeout: {}".format(url))
        return False
    except requests.exceptions.TooManyRedirects:
        print("Bad URL: {}".format(url))
        return False
    except requests.exceptions.RequestException as e:
        # Unrecoverable request error: stop the program.
        raise SystemExit(e)

    if r.status_code != 200:
        print("HTTP {}: {}".format(r.status_code, url))
        return False
    with open(dest, "wb") as f:
        f.write(r.content)
    return True
# Fetch every file of a URL list
def get_files(urls):
    """Make sure every URL in *urls* has a file in the destination folder.

    For each URL, a file already present in the destination is skipped, a
    file present in the cache is copied from there, and anything else is
    downloaded into the cache first and then copied. One status line is
    printed per URL.

    Args:
        urls: iterable of download URLs; it must support ``len()``.
    """
    # Neither folder is created anywhere else, so a first run would fail.
    os.makedirs(destination_path, exist_ok=True)
    os.makedirs(cache_path, exist_ok=True)

    total = len(urls)
    for count, url in enumerate(urls, start=1):
        downloaded_file = url_filename(url)
        destination_file = os.path.join(destination_path, downloaded_file)
        cache_file = os.path.join(cache_path, downloaded_file)

        if os.path.isfile(destination_file):
            status = "skipping"
        elif os.path.isfile(cache_file):
            shutil.copyfile(cache_file, destination_file)
            status = "cached"
        else:
            download_file(url, cache_file)
            # Only report success if the download really produced a file.
            if os.path.isfile(cache_file):
                shutil.copyfile(cache_file, destination_file)
                status = "downloaded"
            else:
                status = "failed"
            # Throttle only after the network has actually been used.
            if wait:
                time.sleep(random.randint(min_wait, max_wait))

        print(
            "{} : {:{width}} ({} / {})".format(
                status, downloaded_file, count, total, width=50
            )
        )
def main():
    """Read the URL list and fetch every file in it."""
    get_files(get_urls())


# Run only when executed as a script.
if __name__ == "__main__":
    main()
+424
View File
@@ -0,0 +1,424 @@
## Script para descargar ficheros de spectrum a partir de zxdb
## Imports utilizados en el script
import os
import mysql.connector
import requests
import time
import random
import zipfile
import shutil
from mysql.connector import errorcode
from urllib.parse import urlparse
from urllib.request import urlretrieve
## Internet addresses the data is downloaded from
url_prefix = {
"spectrum_computing": r"https://spectrumcomputing.co.uk",
"wos": r"https://php.sustancia.synology.me/wos",
"nvg": r"https://php.sustancia.synology.me/nvg",
}
## Local paths where the results are stored
destination_path = r"/home/sergio/zx/zxdb/games/"
cache_path = r"/home/sergio/zx/zxdb/cache/games/"
temp_file = r"/tmp/zxdb.download.tmp"
## Configuration parameters
should_clear_destination_path = True # Whether the destination folder is emptied first
wait = True # Whether to pause for a random time between downloads
min_wait = 2 # Minimum number of seconds to wait between downloads
max_wait = min_wait + 1 # Maximum number of seconds to wait between downloads
elements = []
filetypes_on_root = [
"Tape image",
"Disk image",
"Snapshot image",
"POK pokes file",
] # File types that are stored in the game's root folder
def select(cursor):
    """Run the selected ZXDB query and append one dict per row to ``elements``.

    Three queries are prepared, but only ``query[selected_query]`` is
    executed. Each result row becomes a dict with the keys ``title``,
    ``developer``, ``release_year``, ``url`` and ``filetype``.

    Args:
        cursor: database cursor the query is executed on and rows are read from.
    """
    selected_query = 0

    ## 0: every game (applications, books, etc. filtered out) with all the
    ## files associated with it
    all_files_sql = """
SELECT DISTINCT
e.title, l.name, r.release_year, d.file_link, f.text
FROM
((((((publishers p
INNER JOIN entries e ON
p.entry_id = e.id)
INNER JOIN labels l ON
p.label_id = l.id)
INNER JOIN genretypes g ON
e.genretype_id = g.id)
INNER JOIN downloads d ON
e.id = d.entry_id)
INNER JOIN filetypes f ON
d.filetype_id = f.id)
INNER JOIN releases r ON
e.id = r.entry_id AND
p.release_seq = r.release_seq)
WHERE
(e.availabletype_id = 'A' OR e.availabletype_id = 'D') AND
(f.text <> 'Remote link' AND f.text <> '?') AND
r.release_seq = 0 AND
(g.text like '%Game:%' AND g.text not like 'Casual%')
ORDER BY
e.title;"""

    ## 1: same as query 0 with one extra filter, used to narrow the selection
    narrowed_sql = """
SELECT DISTINCT
e.title, l.name, r.release_year, d.file_link, f.text
FROM
((((((publishers p
INNER JOIN entries e ON
p.entry_id = e.id)
INNER JOIN labels l ON
p.label_id = l.id)
INNER JOIN genretypes g ON
e.genretype_id = g.id)
INNER JOIN downloads d ON
e.id = d.entry_id)
INNER JOIN filetypes f ON
d.filetype_id = f.id)
INNER JOIN releases r ON
e.id = r.entry_id AND
p.release_seq = r.release_seq)
WHERE
(e.availabletype_id = 'A' OR e.availabletype_id = 'D') AND
(f.text <> 'Remote link' AND f.text <> '?') AND
r.release_seq = 0 AND
l.name like 'ZOSYA%' AND
(g.text like '%Game:%' AND g.text not like 'Casual%')
ORDER BY
e.title;"""
    # Alternative filters kept for reference (swap into the WHERE clause above):
    #(r.release_year >= '1986' AND r.release_year <= '1991') AND
    #l.name in ('Dinamic Software', 'Aventuras AD S.A.', 'Arcadia Soft', 'Creepsoft', 'Dro Soft', 'Erbe Software S.A.', 'Iber Software', 'MCM Software S.A.', 'Made in Spain', 'New Frontier', 'Opera Soft S.A.', 'System 4', 'Topo Soft', 'Zigurat Software') AND
    #(l.country_id = 'ES' AND l.labeltype_id = 'Z') AND
    #l.name in ('Ocean Software Ltd', 'Imagine Software Ltd', 'Palace Software', 'Gremlin Graphics Software Ltd', 'Elite Systems Ltd', 'Melbourne House', 'Ultimate Play The Game', 'Durell Software Ltd', 'Codemasters Ltd') AND
    #e.title = 'Arkanoid - Revenge of Doh' AND

    ## 2: every game, but ONLY tape, disk, snapshot and pokes files
    media_only_sql = """
SELECT DISTINCT
e.title, l.name, r.release_year, d.file_link, f.text
FROM
((((((publishers p
INNER JOIN entries e ON
p.entry_id = e.id)
INNER JOIN labels l ON
p.label_id = l.id)
INNER JOIN genretypes g ON
e.genretype_id = g.id)
INNER JOIN downloads d ON
e.id = d.entry_id)
INNER JOIN filetypes f ON
d.filetype_id = f.id)
INNER JOIN releases r ON
e.id = r.entry_id AND
p.release_seq = r.release_seq)
WHERE
(e.availabletype_id = 'A' OR e.availabletype_id = 'D') AND
(f.text IN ('Tape image','Disk image','Snapshot image','POK pokes file')) AND
r.release_seq = 0 AND
(g.text like '%Game:%' AND g.text not like 'Casual%')
ORDER BY
e.title;"""

    query = [all_files_sql, narrowed_sql, media_only_sql]
    cursor.execute(query[selected_query])
    for row in cursor:
        elements.append(
            {
                "title": row[0],
                "developer": row[1],
                "release_year": row[2],
                "url": row[3],
                "filetype": row[4],
            }
        )
## Open the database connection and run the query
def connect():
    """Connect to the local ``zxdb`` database and run ``select`` on a cursor.

    Connection errors are reported on stdout and are not raised. The cursor
    and the connection are always released, including when the connection
    could not be opened at all.
    """
    config = {
        "user": "root",
        # SECURITY NOTE(review): credential hard-coded in source; consider
        # reading it from the environment or a config file kept out of the
        # repository.
        "password": "unJEPimbJddHP8",
        "host": "127.0.0.1",
        "database": "zxdb",
        "raise_on_warnings": True,
    }
    # Pre-bind both names so the cleanup below is safe even if connect() or
    # cursor() fails.
    connection = None
    cursor = None
    try:
        connection = mysql.connector.connect(**config)
        cursor = connection.cursor()
        select(cursor)
    except mysql.connector.Error as err:
        if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
            print("Something is wrong with your user name or password")
        elif err.errno == errorcode.ER_BAD_DB_ERROR:
            print("Database does not exist")
        else:
            print(err)
    finally:
        # Close the cursor before the connection it belongs to.
        if cursor is not None:
            cursor.close()
        if connection is not None and connection.is_connected():
            connection.close()
## Post-process every element, filling in its derived fields
def process_elements():
    """Add folder, file-name and zip information to each entry of ``elements``.

    Each element gains the keys ``root_folder``, ``file_name``, ``subfolder``,
    ``is_zip`` and ``non_zip_file_name``, and its ``url`` is turned into an
    absolute address when it starts with a known prefix.
    """
    for element in elements:
        # Root folder name: "<title> (<year>)(<developer>)", made path-safe.
        raw_folder = "".join(
            (
                element["title"],
                " (",
                str(element["release_year"]),
                ")(",
                element["developer"],
                ")",
            )
        )
        element["root_folder"] = normalize_path(raw_folder)

        # File name taken from the download URL.
        file_name = url_filename(element["url"])
        element["file_name"] = file_name

        # File types not kept in the root go into a sub-folder named after them.
        if element["filetype"] in filetypes_on_root:
            element["subfolder"] = ""
        else:
            element["subfolder"] = normalize_path(element["filetype"])

        # For zip files, also record the name without the ".zip" suffix.
        is_zip = file_name.endswith(".zip")
        element["is_zip"] = is_zip
        element["non_zip_file_name"] = file_name[:-4] if is_zip else file_name

        # Prepend the server prefix to the URL.
        url = element["url"]
        if url.startswith("/zxdb"):
            element["url"] = url_prefix["spectrum_computing"] + str(url)
        elif url.startswith("/pub"):
            element["url"] = url_prefix["wos"] + str(url[4:])
        elif url.startswith("/nvg"):
            element["url"] = url_prefix["nvg"] + str(url[4:])
## Return the file name that forms the last part of a URL
def url_filename(url):
    """Return the base name of the path component of *url*."""
    return os.path.basename(urlparse(url).path)
## Download a file from a URL
def download_file(url, dest, timeout=60):
    """Download *url* into the file *dest*.

    Args:
        url: address to fetch.
        dest: path of the file to write.
        timeout: seconds passed to ``requests.get``; without it a stalled
            server would block the run indefinitely.

    Returns:
        True if the response status was 200 and the file was written.
        False on a non-200 status, a timeout, or too many redirects.

    Raises:
        SystemExit: on any other ``requests`` error.
    """
    try:
        r = requests.get(url, timeout=timeout)
    except requests.exceptions.Timeout:
        print("Timeout: {}".format(url))
        return False
    except requests.exceptions.TooManyRedirects:
        print("Bad URL: {}".format(url))
        return False
    except requests.exceptions.RequestException as e:
        # Unrecoverable request error: stop the program.
        raise SystemExit(e)

    if r.status_code != 200:
        return False
    with open(dest, "wb") as f:
        f.write(r.content)
    return True
## Extract the archive members whose extension is in the accepted list
def unzip_file(src, dst):
    """Extract the Spectrum image files contained in the zip *src* into *dst*.

    Only members ending in .z80, .sna, .tzx, .tap, .dsk or .trd are
    extracted. The comparison ignores case, and each extension includes its
    leading dot.

    Args:
        src: path of the zip archive.
        dst: directory the members are extracted into.
    """
    extensions = (".z80", ".sna", ".tzx", ".tap", ".dsk", ".trd")
    # The context manager closes the archive even if an extraction fails.
    with zipfile.ZipFile(src, "r") as zip_file:
        for member in zip_file.namelist():
            if member.lower().endswith(extensions):
                zip_file.extract(member, dst)
## Copy (or extract) a cached file into its destination folder
def _place_in_destination(element, cache_file, destination_subfolder, destination_file):
    """Deliver *cache_file* to the game's folder, creating the folder if needed."""
    os.makedirs(destination_subfolder, exist_ok=True)
    # Zip files that belong in the game's root folder are unpacked there.
    if cache_file.endswith(".zip") and element["subfolder"] == "":
        unzip_file(cache_file, destination_subfolder)
    else:
        shutil.copyfile(cache_file, destination_file)


## Fetch the files of the query from the internet or from the cache
## and place them in the destination folder, unpacking archives where needed
def get_files():
    """Deliver the file of every entry in ``elements`` to the destination tree.

    For each element, a file already present in the destination (under its
    own name or its name without ".zip") is skipped, a file present in the
    cache is copied or extracted from there, and anything else is downloaded
    to ``temp_file``, stored in the cache and then delivered. One status
    line is printed per element.
    """
    total_files = len(elements)
    total_files_width = len(str(total_files))
    last_game_folder = ""

    for current_file, element in enumerate(elements, start=1):
        # Game folder in the destination and in the cache.
        game_folder = element["root_folder"]
        destination_subfolder = os.path.join(
            destination_path, game_folder, element["subfolder"]
        )
        cache_subfolder = os.path.join(
            cache_path, game_folder, element["subfolder"]
        )
        # Full paths of the destination file and the cached file.
        destination_file = os.path.join(destination_subfolder, element["file_name"])
        cache_file = os.path.join(cache_subfolder, element["file_name"])

        # Print a heading whenever a new game starts.
        if game_folder != last_game_folder:
            print("\n{}".format(game_folder))
            last_game_folder = game_folder

        already_there = os.path.isfile(destination_file) or os.path.isfile(
            os.path.join(destination_subfolder, element["non_zip_file_name"])
        )
        if already_there:
            status = "skipping"
        elif os.path.isfile(cache_file):
            _place_in_destination(
                element, cache_file, destination_subfolder, destination_file
            )
            status = "cached"
        else:
            status = "not found "
            if download_file(element["url"], temp_file):
                status = "downloaded"
                # makedirs also creates a missing cache root.
                os.makedirs(cache_subfolder, exist_ok=True)
                shutil.copyfile(temp_file, cache_file)
                os.remove(temp_file)
                _place_in_destination(
                    element, cache_file, destination_subfolder, destination_file
                )
            # Throttle only after the network has actually been used.
            if wait:
                time.sleep(random.randint(min_wait, max_wait))

        print(
            "({:{width}} / {}) : {} : {} ({})".format(
                current_file,
                total_files,
                status,
                element["file_name"],
                element["filetype"],
                width=total_files_width,
            )
        )
## Replace the characters that are not allowed in a path component
def normalize_path(path):
    """Return *path* with each of ``< > : " / \\ | ? *`` replaced by "_"."""
    substitutions = str.maketrans({char: "_" for char in '<>:"/\\|?*'})
    return path.translate(substitutions)
## Empty the destination folder
def clear_destination_folder():
    """Delete everything inside ``destination_path`` when clearing is enabled.

    Does nothing if ``should_clear_destination_path`` is false. A failure to
    delete an entry is printed and does not stop the loop.
    """
    if not should_clear_destination_path:
        return

    print("Clear destination folder ...")
    for entry in os.listdir(destination_path):
        file_path = os.path.join(destination_path, entry)
        try:
            # Files and symlinks are unlinked; real directories are removed
            # recursively.
            if os.path.isfile(file_path) or os.path.islink(file_path):
                os.unlink(file_path)
            elif os.path.isdir(file_path):
                shutil.rmtree(file_path)
        except Exception as e:
            print('Failed to delete %s. Reason: %s' % (file_path, e))
## Entry point
def main():
    """Query the database, prepare the entries, then fetch every file."""
    steps = (connect, process_elements, clear_destination_folder, get_files)
    for step in steps:
        step()


# Run only when executed as a script.
if __name__ == "__main__":
    main()