Files
pocketsync/test_tkinter.py
T
2026-01-22 19:21:52 +01:00

367 lines
14 KiB
Python

import os
import json
import threading
import subprocess
import tkinter as tk
from tkinter import filedialog, messagebox
# Absolute path of this script; the configuration file is stored beside it so
# that it is found regardless of the current working directory.
_SCRIPT_PATH = os.path.abspath(__file__)
BASE_DIR = os.path.dirname(_SCRIPT_PATH)
# JSON file holding the last used paths and the selected systems.
CONFIG_FILE = os.path.join(BASE_DIR, "config.json")
class DirectorySelectorApp:
    """Tkinter front end that mirrors selected ROM systems with robocopy.

    The window lets the user pick source/destination folders for ES-DE and
    for the ROM collection, select the system sub-directories found in the
    ROM source, and mirror each selected system (ROMs, gamelists and
    downloaded media) to the destination with ``robocopy /MIR``.

    NOTE: ``/MIR`` also deletes destination files that do not exist in the
    source.

    Threading: the copy runs in a daemon worker thread. The worker never
    touches Tk widgets directly; every widget update it requests is queued
    and executed by the thread that created the application (assumed to be
    the one running ``mainloop``).
    """

    # Interval, in milliseconds, between polls of the worker -> UI call queue.
    UI_POLL_MS = 50
    # Robocopy exit codes are a bitmask; any value >= 8 means a copy failed.
    ROBOCOPY_FAILURE_CODE = 8
    # Status texts longer than this are shortened to their tail.
    MAX_STATUS_CHARS = 80

    def __init__(self, root):
        """Build all widgets on *root* and restore the saved configuration.

        Args:
            root: the ``tk.Tk`` (or ``Toplevel``) window that hosts the UI.
        """
        self.root = root
        self.root.title("Selector de Directorios ES-DE / ROMs")
        self.root.geometry("700x700")

        # Worker -> UI hand-off state (see _call_in_ui / _drain_ui_calls).
        self._ui_thread = threading.current_thread()
        self._ui_lock = threading.Lock()
        self._ui_pending = []
        self._poll_id = None
        self._running = False

        # Path variables
        self.path_esde_src = tk.StringVar()
        self.path_roms_src = tk.StringVar()
        self.path_esde_dst = tk.StringVar()
        self.path_roms_dst = tk.StringVar()

        # ---------------------------
        # SOURCE PATHS
        # ---------------------------
        tk.Label(root, text="Rutas de origen", font=("Arial", 12, "bold")).pack(pady=(10, 0))
        self.create_path_selector("Origen ES-DE:", self.path_esde_src)
        self.create_path_selector("Origen ROMs:", self.path_roms_src, callback=self.update_directory_list)

        # ---------------------------
        # DIRECTORY LIST
        # ---------------------------
        tk.Label(root, text="Directorios encontrados en ROMs:", font=("Arial", 11)).pack(pady=(15, 5))
        self.listbox = tk.Listbox(root, selectmode=tk.MULTIPLE, height=10)
        self.listbox.pack(fill="both", expand=True, padx=10)

        # ---------------------------
        # DESTINATION PATHS
        # ---------------------------
        tk.Label(root, text="Rutas de destino", font=("Arial", 12, "bold")).pack(pady=(15, 0))
        self.create_path_selector("Destino ES-DE:", self.path_esde_dst)
        self.create_path_selector("Destino ROMs:", self.path_roms_dst)

        # ---------------------------
        # ROBOCOPY BUTTON
        # ---------------------------
        # Kept as an attribute so it can be disabled while a copy is running.
        self.robocopy_button = tk.Button(root, text="Robocopy", font=("Arial", 12, "bold"),
                                         command=self.run_robocopy)
        self.robocopy_button.pack(pady=10)

        # ---------------------------
        # STATUS LABELS
        # ---------------------------
        status_frame = tk.Frame(root, bg="#2a2a2a", relief="sunken", bd=2)
        status_frame.pack(fill="x", padx=10, pady=5)
        self.label_system = tk.Label(status_frame, text="Sistema: -",
                                     font=("Arial", 10, "bold"),
                                     bg="#2a2a2a", fg="#00ff00", anchor="w")
        self.label_system.pack(fill="x", padx=10, pady=3)
        self.label_phase = tk.Label(status_frame, text="Fase: -",
                                    font=("Arial", 10),
                                    bg="#2a2a2a", fg="#00aaff", anchor="w")
        self.label_phase.pack(fill="x", padx=10, pady=3)
        self.label_current = tk.Label(status_frame, text="Archivo: -",
                                      font=("Arial", 9),
                                      bg="#2a2a2a", fg="#ffaa00", anchor="w")
        self.label_current.pack(fill="x", padx=10, pady=3)

        # ---------------------------
        # SUMMARY
        # ---------------------------
        tk.Label(root, text="Resumen:", font=("Arial", 11)).pack(pady=(10, 5))
        self.summary_text = tk.Text(root, height=6, state="disabled", bg="#f0f0f0", fg="#000")
        self.summary_text.pack(fill="both", expand=False, padx=10, pady=5)

        # Window-close handler
        self.root.protocol("WM_DELETE_WINDOW", self.on_close)
        # Restore the previous configuration
        self.load_config()
        # Start polling for widget updates requested by the worker thread.
        self._poll_id = self.root.after(self.UI_POLL_MS, self._drain_ui_calls)

    # ---------------------------------------------------------
    # PATH SELECTOR
    # ---------------------------------------------------------
    def create_path_selector(self, label_text, var, callback=None):
        """Add a "label + entry + browse button" row bound to *var*.

        Args:
            label_text: caption shown at the left of the row.
            var: ``tk.StringVar`` that stores the chosen path.
            callback: optional callable invoked with the chosen path after
                the user picks a directory through the browse button.
        """
        frame = tk.Frame(self.root)
        frame.pack(fill="x", padx=10, pady=5)
        tk.Label(frame, text=label_text, width=15, anchor="w").pack(side="left")
        tk.Entry(frame, textvariable=var, width=50).pack(side="left", padx=5)
        tk.Button(frame, text="Buscar", command=lambda: self.choose_path(var, callback)).pack(side="left")

    def choose_path(self, var, callback):
        """Ask for a directory; store it in *var* and notify *callback*.

        Does nothing when the dialog is cancelled.
        """
        path = filedialog.askdirectory()
        if not path:
            return
        var.set(path)
        if callback:
            callback(path)

    # ---------------------------------------------------------
    # DIRECTORY LIST
    # ---------------------------------------------------------
    def update_directory_list(self, path):
        """Replace the listbox content with the sub-directories of *path*.

        The list is left empty when *path* is not a directory; a read error
        is reported to the user through a message box.
        """
        self.listbox.delete(0, tk.END)
        if not os.path.isdir(path):
            return
        try:
            dirs = sorted(
                d for d in os.listdir(path)
                if os.path.isdir(os.path.join(path, d))
            )
        except OSError as e:
            messagebox.showerror("Error", f"No se pudo leer la ruta:\n{e}")
            return
        for d in dirs:
            self.listbox.insert(tk.END, d)

    # ---------------------------------------------------------
    # WORKER -> UI HAND-OFF
    # ---------------------------------------------------------
    def _call_in_ui(self, func, *args):
        """Run ``func(*args)`` on the UI thread.

        Called from the UI thread it executes immediately; from any other
        thread the call is queued and executed by ``_drain_ui_calls``.
        """
        if threading.current_thread() is self._ui_thread:
            func(*args)
            return
        with self._ui_lock:
            self._ui_pending.append((func, args))

    def _drain_ui_calls(self):
        """Execute queued widget updates and re-arm the poll timer."""
        with self._ui_lock:
            pending, self._ui_pending = self._ui_pending, []
        try:
            for func, args in pending:
                func(*args)
        finally:
            # Keep polling even if one of the queued calls raised.
            self._poll_id = self.root.after(self.UI_POLL_MS, self._drain_ui_calls)

    def _set_running(self, running):
        """Record whether a copy is in progress and toggle the button."""
        self._running = running
        self.robocopy_button.config(state="disabled" if running else "normal")

    # ---------------------------------------------------------
    # STATUS LABELS AND SUMMARY
    # ---------------------------------------------------------
    def append_summary(self, text):
        """Append *text* plus a newline to the summary box (thread-safe)."""
        self._call_in_ui(self._append_summary_now, text)

    def _append_summary_now(self, text):
        """Append to the read-only summary widget; UI thread only."""
        self.summary_text.configure(state="normal")
        self.summary_text.insert(tk.END, text + "\n")
        self.summary_text.see(tk.END)
        self.summary_text.configure(state="disabled")

    def clear_summary(self):
        """Remove all text from the summary box (thread-safe)."""
        self._call_in_ui(self._clear_summary_now)

    def _clear_summary_now(self):
        """Empty the read-only summary widget; UI thread only."""
        self.summary_text.configure(state="normal")
        self.summary_text.delete("1.0", tk.END)
        self.summary_text.configure(state="disabled")

    def update_status_system(self, text):
        """Show *text* in the "system" status label (thread-safe)."""
        self._call_in_ui(self._set_label_text, self.label_system, text)

    def update_status_phase(self, text):
        """Show *text* in the "phase" status label (thread-safe)."""
        self._call_in_ui(self._set_label_text, self.label_phase, text)

    def update_status_current(self, text):
        """Show *text* in the "current file" label, shortened to fit."""
        self._call_in_ui(self._set_label_text, self.label_current,
                         self._truncate_status(text))

    @staticmethod
    def _set_label_text(label, text):
        """Set the text of *label*; UI thread only."""
        label.config(text=text)

    @classmethod
    def _truncate_status(cls, text):
        """Return *text*, keeping only its tail when it is too long.

        Texts longer than ``MAX_STATUS_CHARS`` become ``"..."`` followed by
        their last ``MAX_STATUS_CHARS - 3`` characters so they fit the window.
        """
        if len(text) > cls.MAX_STATUS_CHARS:
            return "..." + text[-(cls.MAX_STATUS_CHARS - 3):]
        return text

    @staticmethod
    def _format_size(total_bytes):
        """Format a byte count as B / KB / MB / GB (1024-based)."""
        if total_bytes < 1024:
            return f"{total_bytes} B"
        if total_bytes < 1024**2:
            return f"{total_bytes/1024:.2f} KB"
        if total_bytes < 1024**3:
            return f"{total_bytes/(1024**2):.2f} MB"
        return f"{total_bytes/(1024**3):.2f} GB"

    @staticmethod
    def _parse_copied_count(line, label):
        """Return the "Copied" column of a robocopy summary row.

        A summary row looks like ``Files :  <Total>  <Copied>  <Skipped> ...``
        so the copied count is the third token after *label* (the second is
        the colon, the first number is the total).

        Returns:
            The integer count, or ``None`` when the row cannot be parsed.
        """
        parts = line.split()
        try:
            return int(parts[parts.index(label) + 3])
        except (ValueError, IndexError):
            return None

    # ---------------------------------------------------------
    # RUN ROBOCOPY (WORKER THREAD)
    # ---------------------------------------------------------
    def run_robocopy(self):
        """Validate the inputs and start the copy in a background thread.

        All widget state is read here, on the UI thread, and handed to the
        worker as plain values. A second request while a copy is running is
        ignored.
        """
        if self._running:
            return
        selected = [self.listbox.get(i) for i in self.listbox.curselection()]
        if not selected:
            self.append_summary("❌ No hay sistemas seleccionados.")
            return
        esde_src = self.path_esde_src.get()
        roms_src = self.path_roms_src.get()
        esde_dst = self.path_esde_dst.get()
        roms_dst = self.path_roms_dst.get()
        if not all([esde_src, roms_src, esde_dst, roms_dst]):
            self.append_summary("❌ ERROR: Debes configurar todas las rutas antes de continuar.")
            return
        self._set_running(True)
        thread = threading.Thread(
            target=self._run_robocopy_thread,
            args=(selected, esde_src, roms_src, esde_dst, roms_dst),
        )
        thread.daemon = True
        thread.start()

    def _run_robocopy_thread(self, selected, esde_src, roms_src, esde_dst, roms_dst):
        """Worker entry point: copy every selected system, then re-enable the UI."""
        try:
            self._copy_systems(selected, esde_src, roms_src, esde_dst, roms_dst)
        except Exception as exc:
            # Thread boundary: report the failure instead of dying silently.
            self.append_summary(f"❌ ERROR inesperado: {exc}")
        finally:
            self._call_in_ui(self._set_running, False)

    def _copy_systems(self, selected, esde_src, roms_src, esde_dst, roms_dst):
        """Mirror ROMs, gamelists and media for each system in *selected*."""
        self.clear_summary()
        self.append_summary("=" * 60)
        self.append_summary("🚀 INICIANDO PROCESO DE COPIA")
        self.append_summary(f"📦 Total de sistemas a procesar: {len(selected)}")
        self.append_summary("=" * 60)
        total_systems = len(selected)
        all_ok = True
        for idx, system in enumerate(selected, 1):
            self.update_status_system(f"Sistema: {idx}/{total_systems} - {system.upper()}")
            self.append_summary(f"\n🎮 SISTEMA [{idx}/{total_systems}]: {system.upper()}")
            # (status label text, summary line, source dir, destination dir)
            phases = (
                ("Fase: [1/3] Copiando ROMs...",
                 " 📁 [1/3] Copiando ROMs...",
                 os.path.join(roms_src, system),
                 os.path.join(roms_dst, system)),
                ("Fase: [2/3] Copiando gamelists...",
                 " 📋 [2/3] Copiando gamelists...",
                 os.path.join(esde_src, "gamelists", system),
                 os.path.join(esde_dst, "gamelists", system)),
                ("Fase: [3/3] Copiando media...",
                 " 🖼️ [3/3] Copiando media...",
                 os.path.join(esde_src, "downloaded_media", system),
                 os.path.join(esde_dst, "downloaded_media", system)),
            )
            system_ok = True
            for phase_status, phase_log, src, dst in phases:
                self.update_status_phase(phase_status)
                self.append_summary(phase_log)
                if not self.launch_robocopy_with_log(src, dst):
                    system_ok = False
            if system_ok:
                self.append_summary(f" ✅ Sistema '{system}' completado\n")
            else:
                all_ok = False
                self.append_summary(f" ⚠️ Sistema '{system}' completado con errores\n")
        # Reset the status labels once everything has been processed
        if all_ok:
            self.update_status_system("Sistema: ✅ COMPLETADO")
        else:
            self.update_status_system("Sistema: ⚠️ COMPLETADO CON ERRORES")
        self.update_status_phase("Fase: -")
        self.update_status_current("Archivo: -")
        self.append_summary("=" * 60)
        if all_ok:
            self.append_summary("🎉 PROCESO COMPLETADO EXITOSAMENTE")
        else:
            self.append_summary("⚠️ PROCESO COMPLETADO CON ERRORES")
        self.append_summary("=" * 60)

    def launch_robocopy_with_log(self, src, dst):
        """Mirror *src* into *dst* with robocopy and log the outcome.

        Each output line is shown in the "current file" label while the
        copy runs; the copied file/dir/byte counts are taken from
        robocopy's summary table and written to the summary box.

        Args:
            src: source directory; a missing directory is skipped.
            dst: destination directory; created when it does not exist.

        Returns:
            ``True`` when the folder was skipped or robocopy finished
            without failures, ``False`` when the destination could not be
            created, robocopy could not be started, or its exit code
            reported failed copies.
        """
        if not os.path.isdir(src):
            self.append_summary(" ⚠️ Carpeta no existe (omitido)")
            self.update_status_current("Archivo: (carpeta no existe)")
            return True
        try:
            os.makedirs(dst, exist_ok=True)
        except OSError as exc:
            self.append_summary(f" ❌ No se pudo crear el destino: {exc}")
            return False
        # /NFL and /NDL are left out so files are listed as they are copied;
        # /BYTES makes the summary report sizes as plain integers.
        cmd = ["robocopy", src, dst, "/MIR", "/NP", "/BYTES"]
        try:
            process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                errors="replace",  # never abort on undecodable console bytes
            )
        except OSError as exc:
            self.append_summary(f" ❌ No se pudo ejecutar robocopy: {exc}")
            self.update_status_current("Archivo: -")
            return False

        # "Copied" column of each summary row, keyed by the row label.
        copied = {"Files": 0, "Dirs": 0, "Bytes": 0}
        summary_markers = ["Files :", "Dirs :", "Bytes :", "Speed :", "Times :"]
        # The context manager closes the pipe and waits for the process.
        with process:
            for raw_line in process.stdout:
                line = raw_line.strip()
                # Show anything that looks like a file being processed.
                if line and not line.startswith("---") and not line.startswith("Total"):
                    if len(line) > 5 and not any(x in line for x in summary_markers):
                        self.update_status_current(f"Archivo: {line}")
                # Pick the counts out of robocopy's summary table.
                for label in copied:
                    if f"{label} :" in line:
                        value = self._parse_copied_count(line, label)
                        if value is not None:
                            copied[label] = value
                        break

        succeeded = process.returncode < self.ROBOCOPY_FAILURE_CODE
        if not succeeded:
            self.append_summary(f" ❌ robocopy terminó con errores (código {process.returncode})")
        elif copied["Files"] > 0 or copied["Dirs"] > 0:
            size_str = self._format_size(copied["Bytes"])
            self.append_summary(f"{copied['Files']} archivos, {copied['Dirs']} carpetas ({size_str})")
        else:
            self.append_summary(" ✓ Sin cambios (ya sincronizado)")
        self.update_status_current("Archivo: -")
        return succeeded

    # ---------------------------------------------------------
    # PERSISTENCE
    # ---------------------------------------------------------
    def save_config(self):
        """Write the four paths and the current selection to ``CONFIG_FILE``.

        A write failure is reported to the user through a message box.
        """
        data = {
            "esde_src": self.path_esde_src.get(),
            "roms_src": self.path_roms_src.get(),
            "esde_dst": self.path_esde_dst.get(),
            "roms_dst": self.path_roms_dst.get(),
            "selected": [self.listbox.get(i) for i in self.listbox.curselection()]
        }
        try:
            with open(CONFIG_FILE, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=4)
        except OSError as e:
            messagebox.showerror("Error", f"No se pudo guardar la configuración:\n{e}")

    def load_config(self):
        """Restore paths and selection from ``CONFIG_FILE`` when possible.

        A missing, unreadable or malformed file is ignored so the window
        simply starts empty.
        """
        if not os.path.exists(CONFIG_FILE):
            return
        try:
            with open(CONFIG_FILE, "r", encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError):
            # ValueError covers both invalid JSON and undecodable bytes.
            return
        if not isinstance(data, dict):
            return

        def stored_path(key):
            # Ignore values of the wrong type instead of showing e.g. "None".
            value = data.get(key, "")
            return value if isinstance(value, str) else ""

        self.path_esde_src.set(stored_path("esde_src"))
        self.path_roms_src.set(stored_path("roms_src"))
        self.path_esde_dst.set(stored_path("esde_dst"))
        self.path_roms_dst.set(stored_path("roms_dst"))
        roms_path = self.path_roms_src.get()
        if os.path.isdir(roms_path):
            self.update_directory_list(roms_path)
        stored_selection = data.get("selected", [])
        if not isinstance(stored_selection, list):
            stored_selection = []
        selected = {name for name in stored_selection if isinstance(name, str)}
        for i in range(self.listbox.size()):
            if self.listbox.get(i) in selected:
                self.listbox.selection_set(i)

    # ---------------------------------------------------------
    # CLOSE
    # ---------------------------------------------------------
    def on_close(self):
        """Save the configuration and close the window.

        The window is destroyed even if saving raises unexpectedly.
        """
        try:
            self.save_config()
        finally:
            if self._poll_id is not None:
                self.root.after_cancel(self._poll_id)
            self.root.destroy()
if __name__ == "__main__":
    # Script entry point: create the Tk root window, attach the application
    # to it, and block in the Tk event loop until the window is closed.
    main_window = tk.Tk()
    app = DirectorySelectorApp(main_window)
    main_window.mainloop()