#!/usr/bin/env python3 """repoman — lista repos en un servidor Gitea y clona los que falten en local.""" from __future__ import annotations import argparse import json import os import shutil import subprocess import sys import threading import time import tomllib import urllib.error import urllib.parse import urllib.request from concurrent.futures import ThreadPoolExecutor, as_completed from dataclasses import dataclass, field from pathlib import Path import readchar from rich.console import Console, Group from rich.live import Live from rich.table import Table from rich.text import Text __version__ = "1.0.5" console = Console() CONFIG_PATH = Path(os.environ.get("XDG_CONFIG_HOME", Path.home() / ".config")) / "repoman" / "config.toml" CONFIG_EXAMPLE = """\ # ~/.config/repoman/config.toml [server] url = "https://gitea.example.com" # token = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" # opcional, para repos privados # default_owner = "tu-usuario" # opcional, omite el argumento # clone_protocol = "https" # "https" (por defecto) o "ssh" """ @dataclass class Config: url: str token: str | None = None default_owner: str | None = None clone_protocol: str = "https" @dataclass class RemoteRepo: name: str full_name: str clone_url: str ssh_url: str description: str = "" private: bool = False default_branch: str = "main" @dataclass class RepoEntry: name: str full_name: str remote: RemoteRepo | None local_path: Path | None # None si no está clonado selected: bool = False cloning: bool = False cloned_ok: bool | None = None # None = no intentado, True/False = resultado error: str = "" # --- config ----------------------------------------------------------------- def create_default_config() -> int: if CONFIG_PATH.exists(): console.print(f"[yellow]Ya existe[/yellow] [bold]{CONFIG_PATH}[/bold] — no se sobreescribe.") return 1 try: CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True) CONFIG_PATH.write_text(CONFIG_EXAMPLE) except OSError as e: console.print(f"[red]No se pudo escribir {CONFIG_PATH}: {e}[/red]") return 1 console.print(f"[green]Config creada en[/green] [bold]{CONFIG_PATH}[/bold]") console.print("[dim]Edítala y descomenta token / default_owner / clone_protocol si los necesitas.[/dim]") return 0 def load_config() -> Config: if not CONFIG_PATH.exists(): console.print(f"[red]No se ha encontrado la configuración en[/red] [bold]{CONFIG_PATH}[/bold]") console.print("[yellow]Crea el fichero con este contenido mínimo:[/yellow]\n") console.print(CONFIG_EXAMPLE) sys.exit(1) try: with CONFIG_PATH.open("rb") as fh: raw = tomllib.load(fh) except (OSError, tomllib.TOMLDecodeError) as e: console.print(f"[red]Error leyendo {CONFIG_PATH}: {e}[/red]") sys.exit(1) server = raw.get("server") or {} url = server.get("url") if not url: console.print(f"[red]Falta [bold]server.url[/bold] en {CONFIG_PATH}[/red]") sys.exit(1) proto = (server.get("clone_protocol") or "https").lower() if proto not in ("https", "ssh"): console.print(f"[red]server.clone_protocol debe ser 'https' o 'ssh' (encontrado: {proto})[/red]") sys.exit(1) return Config( url=url.rstrip("/"), token=server.get("token") or None, default_owner=server.get("default_owner") or None, clone_protocol=proto, ) # --- cliente Gitea ---------------------------------------------------------- def gitea_get(cfg: Config, path: str, params: dict[str, str] | None = None) -> tuple[int, bytes, dict[str, str]]: qs = ("?" + urllib.parse.urlencode(params)) if params else "" url = f"{cfg.url}/api/v1{path}{qs}" req = urllib.request.Request(url, headers={"Accept": "application/json"}) if cfg.token: req.add_header("Authorization", f"token {cfg.token}") try: with urllib.request.urlopen(req, timeout=30) as resp: return resp.status, resp.read(), dict(resp.headers) except urllib.error.HTTPError as e: return e.code, e.read() if e.fp else b"", dict(e.headers) if e.headers else {} except urllib.error.URLError as e: raise RuntimeError(f"no se pudo contactar con {cfg.url}: {e.reason}") from e def _oneline(s: str) -> str: """Col·lapsa qualsevol salt de línia o whitespace múltiple en un sol espai.""" return " ".join((s or "").split()) def _parse_repo_list(body: bytes) -> list[RemoteRepo]: items = json.loads(body or b"[]") return [ RemoteRepo( name=item.get("name", ""), full_name=item.get("full_name", ""), clone_url=item.get("clone_url", ""), ssh_url=item.get("ssh_url", ""), description=_oneline(item.get("description") or ""), private=bool(item.get("private", False)), default_branch=item.get("default_branch") or "main", ) for item in items ] def fetch_remote_repos(cfg: Config, owner: str) -> list[RemoteRepo]: """Lista los repos de un owner. Prueba primero como organización, luego como usuario.""" for endpoint in (f"/orgs/{owner}/repos", f"/users/{owner}/repos"): repos: list[RemoteRepo] = [] page = 1 found = False while True: status, body, _ = gitea_get(cfg, endpoint, {"limit": "50", "page": str(page)}) if status == 401: raise RuntimeError("401 no autorizado — revisa server.token en la config") if status == 404: break # no es este tipo, probamos el siguiente if status >= 400: raise RuntimeError(f"Gitea devolvió HTTP {status}: {body[:200].decode('utf-8', 'replace')}") found = True items = _parse_repo_list(body) repos.extend(items) if len(items) < 50: break page += 1 if found: return repos raise RuntimeError(f"'{owner}' no existe como usuario ni como organización en el servidor") # --- escaneo local ---------------------------------------------------------- def is_git_repo(path: Path) -> bool: return path.is_dir() and (path / ".git").exists() def local_remote_url(path: Path) -> str: try: out = subprocess.run( ["git", "-C", str(path), "remote", "get-url", "origin"], capture_output=True, text=True, timeout=5, ) return out.stdout.strip() if out.returncode == 0 else "" except (subprocess.TimeoutExpired, FileNotFoundError): return "" def scan_local(base: Path) -> dict[str, Path]: """Devuelve {url_normalizada_or_name: ruta_local} para cada repo git en `base`.""" found: dict[str, Path] = {} try: entries = [p for p in base.iterdir() if is_git_repo(p)] except (PermissionError, FileNotFoundError): return found for p in entries: url = local_remote_url(p) if url: found[normalize_url(url)] = p else: # Sense origin → emparellem pel nom (per a repos locals sense remote) found[f"name::{p.name}"] = p return found def normalize_url(url: str) -> str: """Normaliza una URL git para comparar (quita .git, lower, ssh→https-like).""" u = url.strip().lower() if u.endswith(".git"): u = u[:-4] # git@host:owner/repo -> host/owner/repo if u.startswith("git@"): u = u[4:].replace(":", "/", 1) # https://host/owner/repo -> host/owner/repo for prefix in ("https://", "http://", "ssh://git@", "ssh://"): if u.startswith(prefix): u = u[len(prefix):] break return u # --- emparejado local/remoto ------------------------------------------------ def build_entries(remote_repos: list[RemoteRepo], local_index: dict[str, Path]) -> list[RepoEntry]: entries: list[RepoEntry] = [] for r in remote_repos: local: Path | None = None for candidate_url in (r.clone_url, r.ssh_url): key = normalize_url(candidate_url) if key in local_index: local = local_index[key] break if local is None and f"name::{r.name}" in local_index: local = local_index[f"name::{r.name}"] entries.append( RepoEntry( name=r.name, full_name=r.full_name, remote=r, local_path=local, ) ) entries.sort(key=lambda e: e.name.lower()) return entries # --- render ----------------------------------------------------------------- SPINNER_FRAMES = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"] def spinner_frame() -> str: return SPINNER_FRAMES[int(time.monotonic() * 10) % len(SPINNER_FRAMES)] def compute_page(cursor: int, total: int) -> tuple[int, int, int, int, int]: """Calcula (start, end, page_idx, total_pages, page_size) per a la finestra visible.""" h = max(10, console.size.height) # Marges: 1 títol + 1 separador títol + 1 cabecera + 2 vores + 1 llegenda + 1 status + 1 buffer page_size = max(5, h - 8) if total == 0: return 0, 0, 0, 1, page_size page = cursor // page_size start = page * page_size end = min(start + page_size, total) total_pages = (total + page_size - 1) // page_size return start, end, page, total_pages, page_size def render(entries: list[RepoEntry], cursor: int, base: Path, owner: str, status_msg: str = "") -> Group: start, end, page_idx, total_pages, _ = compute_page(cursor, len(entries)) page_label = f" — pàgina {page_idx + 1}/{total_pages}" if total_pages > 1 else "" table = Table( show_header=True, header_style="bold magenta", title=f"[bold]repoman[/bold] — [cyan]{owner}[/cyan] → [bold]{base}[/bold][dim]{page_label}[/dim]", title_style="white", expand=True, ) table.add_column(" ", width=1, no_wrap=True) table.add_column("Estado", width=14, no_wrap=True) table.add_column("Repo", style="bold", no_wrap=True, overflow="ellipsis") table.add_column("Descripción", no_wrap=True, overflow="ellipsis", ratio=1) for i in range(start, end): e = entries[i] # Cursor caret = Text("▶", style="bold yellow") if i == cursor else Text(" ") # Estat if e.cloning: state = Text(f"{spinner_frame()} clonant…", style="bold blue") elif e.cloned_ok is True: state = Text("✓ clonat", style="bold green") elif e.cloned_ok is False: state = Text("✗ error", style="bold red") elif e.local_path is not None and e.remote is None: state = Text("◇ només local", style="dim yellow") elif e.local_path is not None: state = Text("● en local", style="green") elif e.selected: state = Text("☑ marcat", style="bold cyan") else: state = Text("○ remot", style="dim") # Nom amb marca de privat name = Text(e.name) if e.remote and e.remote.private: name = Text("🔒 ", style="yellow") + name if i == cursor: name.stylize("bold underline") # Descripció: motiu de l'error si n'hi ha, si no la del repo if e.cloned_ok is False and e.error: desc = Text(e.error, style="red") else: desc_text = (e.remote.description if e.remote else "(no està al servidor)") or "—" desc = Text(desc_text) table.add_row(caret, state, name, desc) legend = Text.assemble( ("↑/↓ j/k", "bold cyan"), " moure ", ("←/→ h/l", "bold cyan"), " pàgina ", ("Space", "bold cyan"), " marcar ", ("a", "bold cyan"), " tots ", ("n", "bold cyan"), " cap ", ("Enter", "bold green"), " clona ", ("r", "bold cyan"), " refresca ", ("q", "bold cyan"), " surt", style="dim", ) status = Text(status_msg, style="dim italic") if status_msg else Text("") return Group(table, legend, status) # --- clonado ---------------------------------------------------------------- def clone_one(entry: RepoEntry, base: Path, cfg: Config) -> tuple[RepoEntry, bool, str]: if entry.remote is None: return entry, False, "sense info remota" url = entry.remote.ssh_url if cfg.clone_protocol == "ssh" else entry.remote.clone_url if not url: return entry, False, "URL buida" target = base / entry.name if target.exists(): return entry, False, "ja existeix en local" env = { **os.environ, "GIT_TERMINAL_PROMPT": "0", # mai demanar credencials interactivament "GIT_ASKPASS": "/bin/true", # cap helper interactiu "SSH_ASKPASS": "/bin/true", } try: result = subprocess.run( ["git", "clone", "--quiet", url, str(target)], capture_output=True, text=True, timeout=600, stdin=subprocess.DEVNULL, env=env, ) if result.returncode != 0: # Netegem la carpeta parcial si git la va crear if target.exists(): shutil.rmtree(target, ignore_errors=True) err_lines = (result.stderr or result.stdout or "git clone ha fallat").strip().splitlines() err_msg = " ".join(l.strip() for l in err_lines if l.strip()) return entry, False, err_msg or "error desconegut" return entry, True, "ok" except subprocess.TimeoutExpired: if target.exists(): shutil.rmtree(target, ignore_errors=True) return entry, False, "timeout (>10 min)" except FileNotFoundError: return entry, False, "git no trobat" def run_clone_queue(entries: list[RepoEntry], base: Path, cfg: Config, live: Live, cursor: int, owner: str) -> int: pending = [e for e in entries if e.selected and e.local_path is None and e.remote is not None] if not pending: live.update(render(entries, cursor, base, owner, "Res marcat per clonar."), refresh=True) return 0 for e in pending: e.cloning = True e.selected = False total = len(pending) stop = threading.Event() state = {"ok": 0, "done": 0} def animate() -> None: while not stop.is_set(): in_progress = total - state["done"] msg = f"Clonant {in_progress} en curs · {state['done']}/{total} acabats" live.update(render(entries, cursor, base, owner, msg), refresh=True) stop.wait(0.1) anim = threading.Thread(target=animate, daemon=True) anim.start() try: max_workers = min(4, total) with ThreadPoolExecutor(max_workers=max_workers) as ex: futures = {ex.submit(clone_one, e, base, cfg): e for e in pending} for fut in as_completed(futures): entry, success, msg = fut.result() entry.cloning = False entry.cloned_ok = success entry.error = "" if success else msg if success: entry.local_path = base / entry.name state["ok"] += 1 state["done"] += 1 finally: stop.set() anim.join() live.update(render(entries, cursor, base, owner, f"Fet: {state['ok']}/{total} clonats."), refresh=True) return state["ok"] # --- TUI loop --------------------------------------------------------------- def tui(entries: list[RepoEntry], base: Path, cfg: Config, owner: str) -> None: if not entries: console.print("[yellow]No hay repos para mostrar.[/yellow]") return cursor = 0 status_msg = "" with Live(render(entries, cursor, base, owner, status_msg), console=console, screen=False, auto_refresh=False) as live: while True: live.update(render(entries, cursor, base, owner, status_msg), refresh=True) try: key = readchar.readkey() except KeyboardInterrupt: return status_msg = "" if key in ("q", "Q", readchar.key.CTRL_C, readchar.key.CTRL_D): return elif key in (readchar.key.UP, "k"): cursor = (cursor - 1) % len(entries) elif key in (readchar.key.DOWN, "j"): cursor = (cursor + 1) % len(entries) elif key in (readchar.key.LEFT, readchar.key.PAGE_UP, "h"): _, _, page_idx, total_pages, page_size = compute_page(cursor, len(entries)) new_page = (page_idx - 1) % total_pages cursor = min(new_page * page_size, len(entries) - 1) elif key in (readchar.key.RIGHT, readchar.key.PAGE_DOWN, "l"): _, _, page_idx, total_pages, page_size = compute_page(cursor, len(entries)) new_page = (page_idx + 1) % total_pages cursor = min(new_page * page_size, len(entries) - 1) elif key in (readchar.key.HOME, "g"): cursor = 0 elif key in (readchar.key.END, "G"): cursor = len(entries) - 1 elif key == " ": e = entries[cursor] if e.local_path is not None or e.remote is None: status_msg = "ese repo no se puede clonar (ya está en local o sólo es local)" else: e.selected = not e.selected elif key == "a": for e in entries: if e.local_path is None and e.remote is not None: e.selected = True elif key == "n": for e in entries: e.selected = False elif key in (readchar.key.ENTER, "\r", "\n"): run_clone_queue(entries, base, cfg, live, cursor, owner) elif key in ("r", "R"): status_msg = "refrescando…" live.update(render(entries, cursor, base, owner, status_msg), refresh=True) try: remote_repos = fetch_remote_repos(cfg, owner) except RuntimeError as e: status_msg = f"error al refrescar: {e}" continue local_index = scan_local(base) new_entries = build_entries(remote_repos, local_index) entries[:] = new_entries cursor = min(cursor, len(entries) - 1) if entries else 0 status_msg = f"refrescado: {len(entries)} repo(s)" # --- entrypoint ------------------------------------------------------------- def main() -> int: parser = argparse.ArgumentParser( prog="repoman", description="Lista repos en un servidor Gitea y clona los que faltan en local.", ) parser.add_argument("-v", "--version", action="version", version=f"repoman v{__version__}") parser.add_argument( "--create-default-config", action="store_true", help=f"Crea un fichero de config por defecto en {CONFIG_PATH} y sale", ) parser.add_argument("path", type=Path, nargs="?", help="Carpeta local donde se clonarán los repos") parser.add_argument("owner", nargs="?", help="Usuario u organización en Gitea (opcional si default_owner está en la config)") args = parser.parse_args() if args.create_default_config: return create_default_config() if args.path is None: parser.error("falta el argumento path") if shutil.which("git") is None: console.print("[red]git no está instalado o no se encuentra en el PATH[/red]") return 1 cfg = load_config() owner = args.owner or cfg.default_owner if not owner: console.print("[red]Falta el argumento [bold]owner[/bold] (o define default_owner en la config)[/red]") return 2 base: Path = args.path.expanduser().resolve() if not base.exists(): console.print(f"[yellow]La carpeta {base} no existe.[/yellow]") try: ans = console.input("[bold]¿Crearla? [s/N][/bold] ").strip().lower() except (EOFError, KeyboardInterrupt): console.print() return 1 if ans not in ("s", "si", "sí", "y", "yes"): return 1 base.mkdir(parents=True, exist_ok=True) elif not base.is_dir(): console.print(f"[red]{base} no es un directorio[/red]") return 1 console.print(f"[dim]repoman v{__version__} — servidor[/dim] [bold]{cfg.url}[/bold] [dim]· owner[/dim] [cyan]{owner}[/cyan]") with console.status("[cyan]Consultando Gitea…[/cyan]", spinner="dots"): try: remote_repos = fetch_remote_repos(cfg, owner) except RuntimeError as e: console.print(f"[red]{e}[/red]") return 1 if not remote_repos: console.print(f"[yellow]El servidor no devolvió repos para [bold]{owner}[/bold].[/yellow]") with console.status("[cyan]Escaneando local…[/cyan]", spinner="dots"): local_index = scan_local(base) entries = build_entries(remote_repos, local_index) if not entries: console.print("[yellow]No hay repos remotos ni locales que mostrar.[/yellow]") return 0 try: tui(entries, base, cfg, owner) except KeyboardInterrupt: pass return 0 if __name__ == "__main__": sys.exit(main())