diff --git a/.gitignore b/.gitignore index 716a7db..870507d 100644 --- a/.gitignore +++ b/.gitignore @@ -31,7 +31,7 @@ $RECYCLE.BIN/ .LSOverride # Icon must end with two \r -Icon +Icon # Thumbnails ._* @@ -243,3 +243,10 @@ cython_debug/ # PyPI configuration file .pypirc +# Nuitka +*.build/ +*.dist/ +*.onefile-build/ + +# IDE +.vscode/ diff --git a/README.md b/README.md index a679c7c..b7ae864 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,112 @@ # repoman -Utilitat per clonar masivament repositoris de gitea \ No newline at end of file +> Utilitat per clonar masivament repositoris de gitea + +Lista los repositorios de un usuario u organización en un servidor **Gitea**, los cruza con los que ya tengas clonados en una carpeta local y te deja clonar los que falten desde una lista navegable. + +## Uso + +```bash +repoman /ruta/local [] +``` + +El segundo argumento es opcional si tienes `default_owner` en la config. + +### Interacción + +Una vez dentro de la lista: + +| Tecla | Acción | +|---|---| +| `↑` / `↓` / `j` / `k` | mover cursor | +| `PgUp` / `PgDn` | saltar 10 | +| `g` / `G` | inicio / final | +| `Space` | marcar / desmarcar el repo bajo el cursor | +| `a` | marcar todos los que aún no estén en local | +| `n` | desmarcar todo | +| `Enter` | clonar los marcados (en paralelo) | +| `r` | refrescar (vuelve a consultar al servidor) | +| `q` / `Ctrl-C` | salir | + +Iconos de estado: + +- `○ remoto` — está en el servidor, no clonado localmente +- `☑ marcado` — pendiente de clonar +- `⟳ clonando…` — clone en curso +- `✓ clonado` — éxito +- `✗ ` — clone fallido (motivo abreviado) +- `● en local` — ya existía en local +- `◇ sólo local` — está en local pero no en el servidor +- `🔒` — repo privado + +## Configuración + +Se carga **siempre** desde `~/.config/repoman/config.toml` (respeta `XDG_CONFIG_HOME`): + +```toml +[server] +url = "https://gitea.example.com" +# token = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" # opcional, para repos privados +# default_owner = "tu-usuario" # opcional, omite el argumento +# clone_protocol = "https" # "https" (por defecto) o "ssh" +``` + +El **token** se genera en Gitea en *Settings → Applications → Generate New Token* con el scope `read:repository` como mínimo. Sin token sólo verás repos públicos. + +## Requisitos en el sistema + +Necesarios siempre: + +- `python3` (≥ 3.11, por `tomllib` en stdlib) +- `git` + +Para desarrollar (con venv): + +- `python3-venv` + +Para compilar a binario con Nuitka (`./build.sh`): + +- `python3-dev` — cabeceras `Python.h` +- `gcc` — compilador C +- `patchelf` — necesario para el modo `--onefile` en Linux + +En Debian/Ubuntu: + +```bash +sudo apt install python3 python3-venv python3-dev git gcc patchelf +``` + +## Desarrollo (con venv) + +```bash +python3 -m venv .venv +.venv/bin/pip install -r requirements.txt +.venv/bin/python repoman.py /ruta/local mi-usuario +``` + +## Compilar a binario standalone + +```bash +./build.sh +``` + +Genera `dist/repoman`, un único ejecutable autocontenido (lleva Python, rich y readchar dentro), más un `dist/repoman-vX.Y.Z-linux-arch.tar.gz` listo para distribuir. Para instalarlo en tu PATH: + +```bash +cp dist/repoman ~/.local/bin/ +``` + +## Ficheros + +- `repoman.py` — script principal +- `requirements.txt` — dependencias Python (`rich`, `readchar`) +- `build.sh` — compila el binario con Nuitka +- `README.md` — este fichero +- `.venv/` — virtualenv local (no en git) +- `dist/` — binarios compilados (no en git) + +## Notas + +- La detección de "ya clonado" compara primero la URL del `origin` con la `clone_url`/`ssh_url` del servidor; si no hay match, cae a comparar por nombre. Eso evita falsos positivos cuando tienes carpetas que se llaman igual pero apuntan a otro servidor. +- Los clones se ejecutan en paralelo (máx. 4 a la vez). +- Se usa el endpoint `GET /api/v1/repos/search?owner=` con paginación de 50 en 50, así sirve tanto para usuarios como para organizaciones. diff --git a/build.sh b/build.sh new file mode 100755 index 0000000..7cb97a6 --- /dev/null +++ b/build.sh @@ -0,0 +1,51 @@ +#!/usr/bin/env bash +# Compila repoman a un binario standalone con Nuitka y empaqueta un tar.gz de release. +# Requisitos del sistema: python3-dev, gcc, patchelf (ver README). +set -euo pipefail + +HERE="$(cd "$(dirname "$0")" && pwd)" +cd "$HERE" + +VERSION="$(grep -oP '__version__\s*=\s*"\K[^"]+' repoman.py)" +if [ -z "$VERSION" ]; then + echo "[build] no se pudo leer __version__ de repoman.py" >&2 + exit 1 +fi +ARCH="$(uname -m)" +OS="$(uname -s | tr '[:upper:]' '[:lower:]')" +RELEASE_NAME="repoman-v${VERSION}-${OS}-${ARCH}" + +if [ ! -d .venv ]; then + echo "[build] creando venv…" + python3 -m venv .venv + .venv/bin/pip install --quiet --upgrade pip + .venv/bin/pip install --quiet -r requirements.txt +fi + +if ! .venv/bin/python -c "import nuitka" 2>/dev/null; then + echo "[build] instalando nuitka en el venv…" + .venv/bin/pip install --quiet nuitka +fi + +echo "[build] versión: v${VERSION}" +echo "[build] limpiando artefactos previos…" +rm -rf dist build repoman.build repoman.dist repoman.onefile-build + +echo "[build] compilando (esto puede tardar 1-2 min)…" +.venv/bin/python -m nuitka \ + --onefile \ + --assume-yes-for-downloads \ + --output-dir=dist \ + --output-filename=repoman \ + --remove-output \ + --lto=yes \ + --include-package=rich \ + --include-package=readchar \ + repoman.py + +echo "[build] empaquetando release ${RELEASE_NAME}.tar.gz…" +tar -czf "dist/${RELEASE_NAME}.tar.gz" -C dist repoman + +echo "[build] hecho:" +ls -lh "dist/repoman" "dist/${RELEASE_NAME}.tar.gz" +echo "[build] instalar con: cp dist/repoman ~/.local/bin/" diff --git a/repoman.py b/repoman.py new file mode 100644 index 0000000..e3bb59e --- /dev/null +++ b/repoman.py @@ -0,0 +1,483 @@ +#!/usr/bin/env python3 +"""repoman — lista repos en un servidor Gitea y clona los que falten en local.""" + +from __future__ import annotations + +import argparse +import json +import os +import shutil +import subprocess +import sys +import tomllib +import urllib.error +import urllib.parse +import urllib.request +from concurrent.futures import ThreadPoolExecutor, as_completed +from dataclasses import dataclass, field +from pathlib import Path + +import readchar +from rich.console import Console, Group +from rich.live import Live +from rich.table import Table +from rich.text import Text + +__version__ = "1.0.0" + +console = Console() + +CONFIG_PATH = Path(os.environ.get("XDG_CONFIG_HOME", Path.home() / ".config")) / "repoman" / "config.toml" + +CONFIG_EXAMPLE = """\ +# ~/.config/repoman/config.toml +[server] +url = "https://gitea.example.com" +# token = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" # opcional, para repos privados +# default_owner = "tu-usuario" # opcional, omite el argumento +# clone_protocol = "https" # "https" (por defecto) o "ssh" +""" + + +@dataclass +class Config: + url: str + token: str | None = None + default_owner: str | None = None + clone_protocol: str = "https" + + +@dataclass +class RemoteRepo: + name: str + full_name: str + clone_url: str + ssh_url: str + description: str = "" + private: bool = False + default_branch: str = "main" + + +@dataclass +class RepoEntry: + name: str + full_name: str + remote: RemoteRepo | None + local_path: Path | None # None si no está clonado + selected: bool = False + cloning: bool = False + cloned_ok: bool | None = None # None = no intentado, True/False = resultado + error: str = "" + + +# --- config ----------------------------------------------------------------- + +def load_config() -> Config: + if not CONFIG_PATH.exists(): + console.print(f"[red]No se ha encontrado la configuración en[/red] [bold]{CONFIG_PATH}[/bold]") + console.print("[yellow]Crea el fichero con este contenido mínimo:[/yellow]\n") + console.print(CONFIG_EXAMPLE) + sys.exit(1) + try: + with CONFIG_PATH.open("rb") as fh: + raw = tomllib.load(fh) + except (OSError, tomllib.TOMLDecodeError) as e: + console.print(f"[red]Error leyendo {CONFIG_PATH}: {e}[/red]") + sys.exit(1) + server = raw.get("server") or {} + url = server.get("url") + if not url: + console.print(f"[red]Falta [bold]server.url[/bold] en {CONFIG_PATH}[/red]") + sys.exit(1) + proto = (server.get("clone_protocol") or "https").lower() + if proto not in ("https", "ssh"): + console.print(f"[red]server.clone_protocol debe ser 'https' o 'ssh' (encontrado: {proto})[/red]") + sys.exit(1) + return Config( + url=url.rstrip("/"), + token=server.get("token") or None, + default_owner=server.get("default_owner") or None, + clone_protocol=proto, + ) + + +# --- cliente Gitea ---------------------------------------------------------- + +def gitea_get(cfg: Config, path: str, params: dict[str, str] | None = None) -> tuple[int, bytes, dict[str, str]]: + qs = ("?" + urllib.parse.urlencode(params)) if params else "" + url = f"{cfg.url}/api/v1{path}{qs}" + req = urllib.request.Request(url, headers={"Accept": "application/json"}) + if cfg.token: + req.add_header("Authorization", f"token {cfg.token}") + try: + with urllib.request.urlopen(req, timeout=30) as resp: + return resp.status, resp.read(), dict(resp.headers) + except urllib.error.HTTPError as e: + return e.code, e.read() if e.fp else b"", dict(e.headers) if e.headers else {} + except urllib.error.URLError as e: + raise RuntimeError(f"no se pudo contactar con {cfg.url}: {e.reason}") from e + + +def fetch_remote_repos(cfg: Config, owner: str) -> list[RemoteRepo]: + repos: list[RemoteRepo] = [] + page = 1 + while True: + status, body, _ = gitea_get( + cfg, + "/repos/search", + {"owner": owner, "limit": "50", "page": str(page)}, + ) + if status == 401: + raise RuntimeError("401 no autorizado — revisa server.token en la config") + if status >= 400: + raise RuntimeError(f"Gitea devolvió HTTP {status}: {body[:200].decode('utf-8', 'replace')}") + data = json.loads(body or b"{}") + items = data.get("data") or [] + if not items: + break + for item in items: + repos.append( + RemoteRepo( + name=item.get("name", ""), + full_name=item.get("full_name", ""), + clone_url=item.get("clone_url", ""), + ssh_url=item.get("ssh_url", ""), + description=item.get("description") or "", + private=bool(item.get("private", False)), + default_branch=item.get("default_branch") or "main", + ) + ) + if len(items) < 50: + break + page += 1 + return repos + + +# --- escaneo local ---------------------------------------------------------- + +def is_git_repo(path: Path) -> bool: + return path.is_dir() and (path / ".git").exists() + + +def local_remote_url(path: Path) -> str: + try: + out = subprocess.run( + ["git", "-C", str(path), "remote", "get-url", "origin"], + capture_output=True, + text=True, + timeout=5, + ) + return out.stdout.strip() if out.returncode == 0 else "" + except (subprocess.TimeoutExpired, FileNotFoundError): + return "" + + +def scan_local(base: Path) -> dict[str, Path]: + """Devuelve {url_normalizada_or_name: ruta_local} para cada repo git en `base`.""" + found: dict[str, Path] = {} + try: + entries = [p for p in base.iterdir() if is_git_repo(p)] + except (PermissionError, FileNotFoundError): + return found + for p in entries: + url = local_remote_url(p) + if url: + found[normalize_url(url)] = p + # Siempre indexamos también por nombre como fallback + found.setdefault(f"name::{p.name}", p) + return found + + +def normalize_url(url: str) -> str: + """Normaliza una URL git para comparar (quita .git, lower, ssh→https-like).""" + u = url.strip().lower() + if u.endswith(".git"): + u = u[:-4] + # git@host:owner/repo -> host/owner/repo + if u.startswith("git@"): + u = u[4:].replace(":", "/", 1) + # https://host/owner/repo -> host/owner/repo + for prefix in ("https://", "http://", "ssh://git@", "ssh://"): + if u.startswith(prefix): + u = u[len(prefix):] + break + return u + + +# --- emparejado local/remoto ------------------------------------------------ + +def build_entries(remote_repos: list[RemoteRepo], local_index: dict[str, Path]) -> list[RepoEntry]: + entries: list[RepoEntry] = [] + matched_paths: set[Path] = set() + for r in remote_repos: + local: Path | None = None + for candidate_url in (r.clone_url, r.ssh_url): + key = normalize_url(candidate_url) + if key in local_index: + local = local_index[key] + break + if local is None and f"name::{r.name}" in local_index: + local = local_index[f"name::{r.name}"] + if local is not None: + matched_paths.add(local) + entries.append( + RepoEntry( + name=r.name, + full_name=r.full_name, + remote=r, + local_path=local, + ) + ) + # Añadir repos locales que NO están en el servidor (por info) + for key, path in local_index.items(): + if key.startswith("name::"): + continue + if path in matched_paths: + continue + entries.append( + RepoEntry( + name=path.name, + full_name=path.name, + remote=None, + local_path=path, + ) + ) + entries.sort(key=lambda e: (e.local_path is None, e.name.lower())) + return entries + + +# --- render ----------------------------------------------------------------- + +def render(entries: list[RepoEntry], cursor: int, base: Path, owner: str, status_msg: str = "") -> Group: + table = Table( + show_header=True, + header_style="bold magenta", + title=f"[bold]repoman[/bold] — [cyan]{owner}[/cyan] → [bold]{base}[/bold]", + title_style="white", + expand=True, + ) + table.add_column(" ", width=1, no_wrap=True) + table.add_column("Estado", width=14, no_wrap=True) + table.add_column("Repo", style="bold", no_wrap=True, overflow="ellipsis") + table.add_column("Descripción", overflow="ellipsis", ratio=1) + + for i, e in enumerate(entries): + # Cursor + caret = Text("▶", style="bold yellow") if i == cursor else Text(" ") + + # Estado + if e.cloning: + state = Text("⟳ clonando…", style="bold blue") + elif e.cloned_ok is True: + state = Text("✓ clonado", style="bold green") + elif e.cloned_ok is False: + state = Text(f"✗ {e.error[:11]}", style="bold red") + elif e.local_path is not None and e.remote is None: + state = Text("◇ sólo local", style="dim yellow") + elif e.local_path is not None: + state = Text("● en local", style="green") + elif e.selected: + state = Text("☑ marcado", style="bold cyan") + else: + state = Text("○ remoto", style="dim") + + # Nombre con marca de privado + name = Text(e.name) + if e.remote and e.remote.private: + name = Text("🔒 ", style="yellow") + name + if i == cursor: + name.stylize("bold underline") + + desc = (e.remote.description if e.remote else "(no está en el servidor)") or "—" + table.add_row(caret, state, name, desc) + + legend = Text.assemble( + ("↑/↓ j/k", "bold cyan"), " mover ", + ("Space", "bold cyan"), " marcar/desmarcar ", + ("a", "bold cyan"), " marcar todos remotos ", + ("n", "bold cyan"), " ninguno ", + ("Enter", "bold green"), " clonar marcados ", + ("r", "bold cyan"), " refrescar ", + ("q", "bold cyan"), " salir", + style="dim", + ) + status = Text(status_msg, style="dim italic") if status_msg else Text("") + return Group(table, legend, status) + + +# --- clonado ---------------------------------------------------------------- + +def clone_one(entry: RepoEntry, base: Path, cfg: Config) -> tuple[RepoEntry, bool, str]: + if entry.remote is None: + return entry, False, "sin info remota" + url = entry.remote.ssh_url if cfg.clone_protocol == "ssh" else entry.remote.clone_url + if not url: + return entry, False, "URL vacía" + target = base / entry.name + if target.exists(): + return entry, False, "ya existe en local" + try: + result = subprocess.run( + ["git", "clone", "--quiet", url, str(target)], + capture_output=True, + text=True, + timeout=600, + ) + if result.returncode != 0: + err = (result.stderr or result.stdout or "git clone falló").strip().splitlines() + return entry, False, err[-1][:80] if err else "error" + return entry, True, "ok" + except subprocess.TimeoutExpired: + return entry, False, "timeout" + except FileNotFoundError: + return entry, False, "git no encontrado" + + +def run_clone_queue(entries: list[RepoEntry], base: Path, cfg: Config, live: Live, cursor: int, owner: str) -> int: + pending = [e for e in entries if e.selected and e.local_path is None and e.remote is not None] + if not pending: + live.update(render(entries, cursor, base, owner, "Nada marcado para clonar.")) + return 0 + for e in pending: + e.cloning = True + e.selected = False + live.update(render(entries, cursor, base, owner, f"Clonando {len(pending)} repo(s)…")) + ok = 0 + max_workers = min(4, len(pending)) + with ThreadPoolExecutor(max_workers=max_workers) as ex: + futures = {ex.submit(clone_one, e, base, cfg): e for e in pending} + for fut in as_completed(futures): + entry, success, msg = fut.result() + entry.cloning = False + entry.cloned_ok = success + entry.error = "" if success else msg + if success: + entry.local_path = base / entry.name + ok += 1 + live.update(render(entries, cursor, base, owner, f"Clonando… {ok}/{len(pending)} listos")) + live.update(render(entries, cursor, base, owner, f"Hecho: {ok}/{len(pending)} clonados.")) + return ok + + +# --- TUI loop --------------------------------------------------------------- + +def tui(entries: list[RepoEntry], base: Path, cfg: Config, owner: str) -> None: + if not entries: + console.print("[yellow]No hay repos para mostrar.[/yellow]") + return + cursor = 0 + status_msg = "" + with Live(render(entries, cursor, base, owner, status_msg), console=console, screen=False, auto_refresh=False) as live: + while True: + live.update(render(entries, cursor, base, owner, status_msg), refresh=True) + try: + key = readchar.readkey() + except KeyboardInterrupt: + return + status_msg = "" + if key in ("q", "Q", readchar.key.CTRL_C, readchar.key.CTRL_D): + return + elif key in (readchar.key.UP, "k"): + cursor = (cursor - 1) % len(entries) + elif key in (readchar.key.DOWN, "j"): + cursor = (cursor + 1) % len(entries) + elif key in (readchar.key.PAGE_UP,): + cursor = max(0, cursor - 10) + elif key in (readchar.key.PAGE_DOWN,): + cursor = min(len(entries) - 1, cursor + 10) + elif key in (readchar.key.HOME, "g"): + cursor = 0 + elif key in (readchar.key.END, "G"): + cursor = len(entries) - 1 + elif key == " ": + e = entries[cursor] + if e.local_path is not None or e.remote is None: + status_msg = "ese repo no se puede clonar (ya está en local o sólo es local)" + else: + e.selected = not e.selected + elif key == "a": + for e in entries: + if e.local_path is None and e.remote is not None: + e.selected = True + elif key == "n": + for e in entries: + e.selected = False + elif key in (readchar.key.ENTER, "\r", "\n"): + run_clone_queue(entries, base, cfg, live, cursor, owner) + elif key in ("r", "R"): + status_msg = "refrescando…" + live.update(render(entries, cursor, base, owner, status_msg), refresh=True) + try: + remote_repos = fetch_remote_repos(cfg, owner) + except RuntimeError as e: + status_msg = f"error al refrescar: {e}" + continue + local_index = scan_local(base) + new_entries = build_entries(remote_repos, local_index) + entries[:] = new_entries + cursor = min(cursor, len(entries) - 1) if entries else 0 + status_msg = f"refrescado: {len(entries)} repo(s)" + + +# --- entrypoint ------------------------------------------------------------- + +def main() -> int: + parser = argparse.ArgumentParser( + prog="repoman", + description="Lista repos en un servidor Gitea y clona los que faltan en local.", + ) + parser.add_argument("-v", "--version", action="version", version=f"repoman v{__version__}") + parser.add_argument("path", type=Path, help="Carpeta local donde se clonarán los repos") + parser.add_argument("owner", nargs="?", help="Usuario u organización en Gitea (opcional si default_owner está en la config)") + args = parser.parse_args() + + if shutil.which("git") is None: + console.print("[red]git no está instalado o no se encuentra en el PATH[/red]") + return 1 + + cfg = load_config() + owner = args.owner or cfg.default_owner + if not owner: + console.print("[red]Falta el argumento [bold]owner[/bold] (o define default_owner en la config)[/red]") + return 2 + + base: Path = args.path.expanduser().resolve() + if not base.exists(): + console.print(f"[yellow]La carpeta {base} no existe.[/yellow]") + try: + ans = console.input("[bold]¿Crearla? [s/N][/bold] ").strip().lower() + except (EOFError, KeyboardInterrupt): + console.print() + return 1 + if ans not in ("s", "si", "sí", "y", "yes"): + return 1 + base.mkdir(parents=True, exist_ok=True) + elif not base.is_dir(): + console.print(f"[red]{base} no es un directorio[/red]") + return 1 + + console.print(f"[dim]repoman v{__version__} — servidor[/dim] [bold]{cfg.url}[/bold] [dim]· owner[/dim] [cyan]{owner}[/cyan]") + with console.status("[cyan]Consultando Gitea…[/cyan]", spinner="dots"): + try: + remote_repos = fetch_remote_repos(cfg, owner) + except RuntimeError as e: + console.print(f"[red]{e}[/red]") + return 1 + if not remote_repos: + console.print(f"[yellow]El servidor no devolvió repos para [bold]{owner}[/bold].[/yellow]") + with console.status("[cyan]Escaneando local…[/cyan]", spinner="dots"): + local_index = scan_local(base) + entries = build_entries(remote_repos, local_index) + if not entries: + console.print("[yellow]No hay repos remotos ni locales que mostrar.[/yellow]") + return 0 + + try: + tui(entries, base, cfg, owner) + except KeyboardInterrupt: + pass + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..db4df61 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +rich>=13.0 +readchar>=4.0 +zstandard>=0.22