a0ef53922e
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
110 lines
3.4 KiB
Python
110 lines
3.4 KiB
Python
import os
|
|
import subprocess
|
|
from typing import Callable
|
|
|
|
from core.sync_engine import SyncEngine
|
|
|
|
|
|
class RobocopySyncEngine(SyncEngine):
|
|
"""Motor de sincronización basado en robocopy (Windows)."""
|
|
|
|
def is_available(self) -> bool:
|
|
try:
|
|
result = subprocess.run(
|
|
["robocopy", "/?"],
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.DEVNULL,
|
|
)
|
|
# robocopy devuelve 16 en error fatal, cualquier otro código es OK
|
|
return result.returncode != 16
|
|
except FileNotFoundError:
|
|
return False
|
|
|
|
def sync_folder(
|
|
self,
|
|
src: str,
|
|
dst: str,
|
|
on_file: Callable[[str], None],
|
|
on_summary: Callable[[str], None],
|
|
) -> None:
|
|
if not os.path.isdir(src):
|
|
on_summary(" ⚠️ Carpeta no existe (omitido)")
|
|
on_file("(carpeta no existe)")
|
|
return
|
|
|
|
os.makedirs(dst, exist_ok=True)
|
|
|
|
cmd = ["robocopy", src, dst, "/MIR", "/NP"]
|
|
|
|
process = subprocess.Popen(
|
|
cmd,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.STDOUT,
|
|
text=True,
|
|
universal_newlines=True,
|
|
)
|
|
|
|
files_copied = 0
|
|
dirs_copied = 0
|
|
total_bytes = 0
|
|
|
|
for line in process.stdout:
|
|
line = line.strip()
|
|
|
|
if line and not line.startswith("---") and not line.startswith("Total"):
|
|
if len(line) > 5 and not any(
|
|
x in line
|
|
for x in ["Files :", "Dirs :", "Bytes :", "Speed :", "Times :"]
|
|
):
|
|
on_file(line)
|
|
|
|
if "Files :" in line:
|
|
parts = line.split()
|
|
try:
|
|
idx = parts.index("Files")
|
|
if idx + 2 < len(parts):
|
|
files_copied = int(parts[idx + 2])
|
|
except (ValueError, IndexError):
|
|
pass
|
|
|
|
elif "Dirs :" in line:
|
|
parts = line.split()
|
|
try:
|
|
idx = parts.index("Dirs")
|
|
if idx + 2 < len(parts):
|
|
dirs_copied = int(parts[idx + 2])
|
|
except (ValueError, IndexError):
|
|
pass
|
|
|
|
elif "Bytes :" in line:
|
|
parts = line.split()
|
|
try:
|
|
idx = parts.index("Bytes")
|
|
if idx + 2 < len(parts):
|
|
bytes_str = parts[idx + 2].replace(",", "").replace(".", "")
|
|
bytes_str = "".join(c for c in bytes_str if c.isdigit())
|
|
if bytes_str:
|
|
total_bytes = int(bytes_str)
|
|
except (ValueError, IndexError):
|
|
pass
|
|
|
|
process.wait()
|
|
|
|
if files_copied > 0 or dirs_copied > 0:
|
|
size_str = self._format_bytes(total_bytes)
|
|
on_summary(f" ✓ {files_copied} archivos, {dirs_copied} carpetas ({size_str})")
|
|
else:
|
|
on_summary(" ✓ Sin cambios (ya sincronizado)")
|
|
|
|
on_file("-")
|
|
|
|
@staticmethod
|
|
def _format_bytes(n: int) -> str:
|
|
if n < 1024:
|
|
return f"{n} B"
|
|
if n < 1024 ** 2:
|
|
return f"{n / 1024:.2f} KB"
|
|
if n < 1024 ** 3:
|
|
return f"{n / 1024 ** 2:.2f} MB"
|
|
return f"{n / 1024 ** 3:.2f} GB"
|