#!/usr/bin/env python3 """repoman — lista repos en un servidor Gitea y clona los que falten en local.""" from __future__ import annotations import argparse import json import os import shutil import subprocess import sys import tomllib import urllib.error import urllib.parse import urllib.request from concurrent.futures import ThreadPoolExecutor, as_completed from dataclasses import dataclass, field from pathlib import Path import readchar from rich.console import Console, Group from rich.live import Live from rich.table import Table from rich.text import Text __version__ = "1.0.0" console = Console() CONFIG_PATH = Path(os.environ.get("XDG_CONFIG_HOME", Path.home() / ".config")) / "repoman" / "config.toml" CONFIG_EXAMPLE = """\ # ~/.config/repoman/config.toml [server] url = "https://gitea.example.com" # token = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" # opcional, para repos privados # default_owner = "tu-usuario" # opcional, omite el argumento # clone_protocol = "https" # "https" (por defecto) o "ssh" """ @dataclass class Config: url: str token: str | None = None default_owner: str | None = None clone_protocol: str = "https" @dataclass class RemoteRepo: name: str full_name: str clone_url: str ssh_url: str description: str = "" private: bool = False default_branch: str = "main" @dataclass class RepoEntry: name: str full_name: str remote: RemoteRepo | None local_path: Path | None # None si no está clonado selected: bool = False cloning: bool = False cloned_ok: bool | None = None # None = no intentado, True/False = resultado error: str = "" # --- config ----------------------------------------------------------------- def create_default_config() -> int: if CONFIG_PATH.exists(): console.print(f"[yellow]Ya existe[/yellow] [bold]{CONFIG_PATH}[/bold] — no se sobreescribe.") return 1 try: CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True) CONFIG_PATH.write_text(CONFIG_EXAMPLE) except OSError as e: console.print(f"[red]No se pudo escribir {CONFIG_PATH}: {e}[/red]") return 1 console.print(f"[green]Config creada en[/green] [bold]{CONFIG_PATH}[/bold]") console.print("[dim]Edítala y descomenta token / default_owner / clone_protocol si los necesitas.[/dim]") return 0 def load_config() -> Config: if not CONFIG_PATH.exists(): console.print(f"[red]No se ha encontrado la configuración en[/red] [bold]{CONFIG_PATH}[/bold]") console.print("[yellow]Crea el fichero con este contenido mínimo:[/yellow]\n") console.print(CONFIG_EXAMPLE) sys.exit(1) try: with CONFIG_PATH.open("rb") as fh: raw = tomllib.load(fh) except (OSError, tomllib.TOMLDecodeError) as e: console.print(f"[red]Error leyendo {CONFIG_PATH}: {e}[/red]") sys.exit(1) server = raw.get("server") or {} url = server.get("url") if not url: console.print(f"[red]Falta [bold]server.url[/bold] en {CONFIG_PATH}[/red]") sys.exit(1) proto = (server.get("clone_protocol") or "https").lower() if proto not in ("https", "ssh"): console.print(f"[red]server.clone_protocol debe ser 'https' o 'ssh' (encontrado: {proto})[/red]") sys.exit(1) return Config( url=url.rstrip("/"), token=server.get("token") or None, default_owner=server.get("default_owner") or None, clone_protocol=proto, ) # --- cliente Gitea ---------------------------------------------------------- def gitea_get(cfg: Config, path: str, params: dict[str, str] | None = None) -> tuple[int, bytes, dict[str, str]]: qs = ("?" + urllib.parse.urlencode(params)) if params else "" url = f"{cfg.url}/api/v1{path}{qs}" req = urllib.request.Request(url, headers={"Accept": "application/json"}) if cfg.token: req.add_header("Authorization", f"token {cfg.token}") try: with urllib.request.urlopen(req, timeout=30) as resp: return resp.status, resp.read(), dict(resp.headers) except urllib.error.HTTPError as e: return e.code, e.read() if e.fp else b"", dict(e.headers) if e.headers else {} except urllib.error.URLError as e: raise RuntimeError(f"no se pudo contactar con {cfg.url}: {e.reason}") from e def _oneline(s: str) -> str: """Col·lapsa qualsevol salt de línia o whitespace múltiple en un sol espai.""" return " ".join((s or "").split()) def _parse_repo_list(body: bytes) -> list[RemoteRepo]: items = json.loads(body or b"[]") return [ RemoteRepo( name=item.get("name", ""), full_name=item.get("full_name", ""), clone_url=item.get("clone_url", ""), ssh_url=item.get("ssh_url", ""), description=_oneline(item.get("description") or ""), private=bool(item.get("private", False)), default_branch=item.get("default_branch") or "main", ) for item in items ] def fetch_remote_repos(cfg: Config, owner: str) -> list[RemoteRepo]: """Lista los repos de un owner. Prueba primero como organización, luego como usuario.""" for endpoint in (f"/orgs/{owner}/repos", f"/users/{owner}/repos"): repos: list[RemoteRepo] = [] page = 1 found = False while True: status, body, _ = gitea_get(cfg, endpoint, {"limit": "50", "page": str(page)}) if status == 401: raise RuntimeError("401 no autorizado — revisa server.token en la config") if status == 404: break # no es este tipo, probamos el siguiente if status >= 400: raise RuntimeError(f"Gitea devolvió HTTP {status}: {body[:200].decode('utf-8', 'replace')}") found = True items = _parse_repo_list(body) repos.extend(items) if len(items) < 50: break page += 1 if found: return repos raise RuntimeError(f"'{owner}' no existe como usuario ni como organización en el servidor") # --- escaneo local ---------------------------------------------------------- def is_git_repo(path: Path) -> bool: return path.is_dir() and (path / ".git").exists() def local_remote_url(path: Path) -> str: try: out = subprocess.run( ["git", "-C", str(path), "remote", "get-url", "origin"], capture_output=True, text=True, timeout=5, ) return out.stdout.strip() if out.returncode == 0 else "" except (subprocess.TimeoutExpired, FileNotFoundError): return "" def scan_local(base: Path) -> dict[str, Path]: """Devuelve {url_normalizada_or_name: ruta_local} para cada repo git en `base`.""" found: dict[str, Path] = {} try: entries = [p for p in base.iterdir() if is_git_repo(p)] except (PermissionError, FileNotFoundError): return found for p in entries: url = local_remote_url(p) if url: found[normalize_url(url)] = p # Siempre indexamos también por nombre como fallback found.setdefault(f"name::{p.name}", p) return found def normalize_url(url: str) -> str: """Normaliza una URL git para comparar (quita .git, lower, ssh→https-like).""" u = url.strip().lower() if u.endswith(".git"): u = u[:-4] # git@host:owner/repo -> host/owner/repo if u.startswith("git@"): u = u[4:].replace(":", "/", 1) # https://host/owner/repo -> host/owner/repo for prefix in ("https://", "http://", "ssh://git@", "ssh://"): if u.startswith(prefix): u = u[len(prefix):] break return u # --- emparejado local/remoto ------------------------------------------------ def build_entries(remote_repos: list[RemoteRepo], local_index: dict[str, Path]) -> list[RepoEntry]: entries: list[RepoEntry] = [] for r in remote_repos: local: Path | None = None for candidate_url in (r.clone_url, r.ssh_url): key = normalize_url(candidate_url) if key in local_index: local = local_index[key] break if local is None and f"name::{r.name}" in local_index: local = local_index[f"name::{r.name}"] entries.append( RepoEntry( name=r.name, full_name=r.full_name, remote=r, local_path=local, ) ) entries.sort(key=lambda e: e.name.lower()) return entries # --- render ----------------------------------------------------------------- def render(entries: list[RepoEntry], cursor: int, base: Path, owner: str, status_msg: str = "") -> Group: table = Table( show_header=True, header_style="bold magenta", title=f"[bold]repoman[/bold] — [cyan]{owner}[/cyan] → [bold]{base}[/bold]", title_style="white", expand=True, ) table.add_column(" ", width=1, no_wrap=True) table.add_column("Estado", width=14, no_wrap=True) table.add_column("Repo", style="bold", no_wrap=True, overflow="ellipsis") table.add_column("Descripción", no_wrap=True, overflow="ellipsis", ratio=1) for i, e in enumerate(entries): # Cursor caret = Text("▶", style="bold yellow") if i == cursor else Text(" ") # Estado if e.cloning: state = Text("⟳ clonando…", style="bold blue") elif e.cloned_ok is True: state = Text("✓ clonado", style="bold green") elif e.cloned_ok is False: state = Text(f"✗ {e.error[:11]}", style="bold red") elif e.local_path is not None and e.remote is None: state = Text("◇ sólo local", style="dim yellow") elif e.local_path is not None: state = Text("● en local", style="green") elif e.selected: state = Text("☑ marcado", style="bold cyan") else: state = Text("○ remoto", style="dim") # Nombre con marca de privado name = Text(e.name) if e.remote and e.remote.private: name = Text("🔒 ", style="yellow") + name if i == cursor: name.stylize("bold underline") desc = (e.remote.description if e.remote else "(no está en el servidor)") or "—" table.add_row(caret, state, name, desc) legend = Text.assemble( ("↑/↓ j/k", "bold cyan"), " mover ", ("Space", "bold cyan"), " marcar/desmarcar ", ("a", "bold cyan"), " marcar todos remotos ", ("n", "bold cyan"), " ninguno ", ("Enter", "bold green"), " clonar marcados ", ("r", "bold cyan"), " refrescar ", ("q", "bold cyan"), " salir", style="dim", ) status = Text(status_msg, style="dim italic") if status_msg else Text("") return Group(table, legend, status) # --- clonado ---------------------------------------------------------------- def clone_one(entry: RepoEntry, base: Path, cfg: Config) -> tuple[RepoEntry, bool, str]: if entry.remote is None: return entry, False, "sin info remota" url = entry.remote.ssh_url if cfg.clone_protocol == "ssh" else entry.remote.clone_url if not url: return entry, False, "URL vacía" target = base / entry.name if target.exists(): return entry, False, "ya existe en local" try: result = subprocess.run( ["git", "clone", "--quiet", url, str(target)], capture_output=True, text=True, timeout=600, ) if result.returncode != 0: err = (result.stderr or result.stdout or "git clone falló").strip().splitlines() return entry, False, err[-1][:80] if err else "error" return entry, True, "ok" except subprocess.TimeoutExpired: return entry, False, "timeout" except FileNotFoundError: return entry, False, "git no encontrado" def run_clone_queue(entries: list[RepoEntry], base: Path, cfg: Config, live: Live, cursor: int, owner: str) -> int: pending = [e for e in entries if e.selected and e.local_path is None and e.remote is not None] if not pending: live.update(render(entries, cursor, base, owner, "Nada marcado para clonar.")) return 0 for e in pending: e.cloning = True e.selected = False live.update(render(entries, cursor, base, owner, f"Clonando {len(pending)} repo(s)…")) ok = 0 max_workers = min(4, len(pending)) with ThreadPoolExecutor(max_workers=max_workers) as ex: futures = {ex.submit(clone_one, e, base, cfg): e for e in pending} for fut in as_completed(futures): entry, success, msg = fut.result() entry.cloning = False entry.cloned_ok = success entry.error = "" if success else msg if success: entry.local_path = base / entry.name ok += 1 live.update(render(entries, cursor, base, owner, f"Clonando… {ok}/{len(pending)} listos")) live.update(render(entries, cursor, base, owner, f"Hecho: {ok}/{len(pending)} clonados.")) return ok # --- TUI loop --------------------------------------------------------------- def tui(entries: list[RepoEntry], base: Path, cfg: Config, owner: str) -> None: if not entries: console.print("[yellow]No hay repos para mostrar.[/yellow]") return cursor = 0 status_msg = "" with Live(render(entries, cursor, base, owner, status_msg), console=console, screen=False, auto_refresh=False) as live: while True: live.update(render(entries, cursor, base, owner, status_msg), refresh=True) try: key = readchar.readkey() except KeyboardInterrupt: return status_msg = "" if key in ("q", "Q", readchar.key.CTRL_C, readchar.key.CTRL_D): return elif key in (readchar.key.UP, "k"): cursor = (cursor - 1) % len(entries) elif key in (readchar.key.DOWN, "j"): cursor = (cursor + 1) % len(entries) elif key in (readchar.key.PAGE_UP,): cursor = max(0, cursor - 10) elif key in (readchar.key.PAGE_DOWN,): cursor = min(len(entries) - 1, cursor + 10) elif key in (readchar.key.HOME, "g"): cursor = 0 elif key in (readchar.key.END, "G"): cursor = len(entries) - 1 elif key == " ": e = entries[cursor] if e.local_path is not None or e.remote is None: status_msg = "ese repo no se puede clonar (ya está en local o sólo es local)" else: e.selected = not e.selected elif key == "a": for e in entries: if e.local_path is None and e.remote is not None: e.selected = True elif key == "n": for e in entries: e.selected = False elif key in (readchar.key.ENTER, "\r", "\n"): run_clone_queue(entries, base, cfg, live, cursor, owner) elif key in ("r", "R"): status_msg = "refrescando…" live.update(render(entries, cursor, base, owner, status_msg), refresh=True) try: remote_repos = fetch_remote_repos(cfg, owner) except RuntimeError as e: status_msg = f"error al refrescar: {e}" continue local_index = scan_local(base) new_entries = build_entries(remote_repos, local_index) entries[:] = new_entries cursor = min(cursor, len(entries) - 1) if entries else 0 status_msg = f"refrescado: {len(entries)} repo(s)" # --- entrypoint ------------------------------------------------------------- def main() -> int: parser = argparse.ArgumentParser( prog="repoman", description="Lista repos en un servidor Gitea y clona los que faltan en local.", ) parser.add_argument("-v", "--version", action="version", version=f"repoman v{__version__}") parser.add_argument( "--create-default-config", action="store_true", help=f"Crea un fichero de config por defecto en {CONFIG_PATH} y sale", ) parser.add_argument("path", type=Path, nargs="?", help="Carpeta local donde se clonarán los repos") parser.add_argument("owner", nargs="?", help="Usuario u organización en Gitea (opcional si default_owner está en la config)") args = parser.parse_args() if args.create_default_config: return create_default_config() if args.path is None: parser.error("falta el argumento path") if shutil.which("git") is None: console.print("[red]git no está instalado o no se encuentra en el PATH[/red]") return 1 cfg = load_config() owner = args.owner or cfg.default_owner if not owner: console.print("[red]Falta el argumento [bold]owner[/bold] (o define default_owner en la config)[/red]") return 2 base: Path = args.path.expanduser().resolve() if not base.exists(): console.print(f"[yellow]La carpeta {base} no existe.[/yellow]") try: ans = console.input("[bold]¿Crearla? [s/N][/bold] ").strip().lower() except (EOFError, KeyboardInterrupt): console.print() return 1 if ans not in ("s", "si", "sí", "y", "yes"): return 1 base.mkdir(parents=True, exist_ok=True) elif not base.is_dir(): console.print(f"[red]{base} no es un directorio[/red]") return 1 console.print(f"[dim]repoman v{__version__} — servidor[/dim] [bold]{cfg.url}[/bold] [dim]· owner[/dim] [cyan]{owner}[/cyan]") with console.status("[cyan]Consultando Gitea…[/cyan]", spinner="dots"): try: remote_repos = fetch_remote_repos(cfg, owner) except RuntimeError as e: console.print(f"[red]{e}[/red]") return 1 if not remote_repos: console.print(f"[yellow]El servidor no devolvió repos para [bold]{owner}[/bold].[/yellow]") with console.status("[cyan]Escaneando local…[/cyan]", spinner="dots"): local_index = scan_local(base) entries = build_entries(remote_repos, local_index) if not entries: console.print("[yellow]No hay repos remotos ni locales que mostrar.[/yellow]") return 0 try: tui(entries, base, cfg, owner) except KeyboardInterrupt: pass return 0 if __name__ == "__main__": sys.exit(main())