Files

572 lines
21 KiB
Python

#!/usr/bin/env python3
"""repoman — lista repos en un servidor Gitea y clona los que falten en local."""
from __future__ import annotations

import argparse
import http.client
import json
import os
import shutil
import subprocess
import sys
import threading
import time
import tomllib
import urllib.error
import urllib.parse
import urllib.request
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from pathlib import Path

import readchar
from rich.console import Console, Group
from rich.live import Live
from rich.table import Table
from rich.text import Text
# Version string reported by `--version` and shown in the startup banner.
__version__ = "1.0.5"
# Shared rich console used for all terminal output in this module.
console = Console()
# Config file location: $XDG_CONFIG_HOME/repoman/config.toml, falling back to ~/.config.
CONFIG_PATH = Path(os.environ.get("XDG_CONFIG_HOME", Path.home() / ".config")) / "repoman" / "config.toml"
# Template written by `--create-default-config` and printed when no config file is found.
CONFIG_EXAMPLE = """\
# ~/.config/repoman/config.toml
[server]
url = "https://gitea.example.com"
# token = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" # opcional, para repos privados
# default_owner = "tu-usuario" # opcional, omite el argumento
# clone_protocol = "https" # "https" (por defecto) o "ssh"
"""
@dataclass
class Config:
    """Server settings read from the `[server]` table of the config file."""

    # Base URL of the Gitea server; load_config() strips any trailing slash.
    url: str
    # API token; when set it is sent in the Authorization header of API requests.
    token: str | None = None
    # Owner used when none is given on the command line.
    default_owner: str | None = None
    # Which remote URL is used for cloning: "https" or "ssh".
    clone_protocol: str = "https"
@dataclass
class RemoteRepo:
    """One repository as described by the server's repo-list JSON."""

    # Short repository name; also used as the local directory name when cloning.
    name: str
    # Value of the `full_name` field in the API response.
    full_name: str
    # Clone URL used when clone_protocol is "https".
    clone_url: str
    # Clone URL used when clone_protocol is "ssh".
    ssh_url: str
    # Description collapsed to a single line for display in the table.
    description: str = ""
    # True when the server marks the repo as private (shown with a lock icon).
    private: bool = False
    # Default branch reported by the server; "main" when missing.
    default_branch: str = "main"
@dataclass
class RepoEntry:
    """One row of the TUI: a remote repo paired with its local checkout, plus UI state."""

    name: str
    full_name: str
    # Remote metadata; None is treated throughout as "local only / not on the server".
    remote: RemoteRepo | None
    local_path: Path | None  # None if the repo is not cloned locally
    # True while the row is marked for cloning.
    selected: bool = False
    # True while a clone of this repo is in progress.
    cloning: bool = False
    cloned_ok: bool | None = None  # None = not attempted, True/False = outcome
    # Failure reason shown in the description column when cloned_ok is False.
    error: str = ""
# --- config -----------------------------------------------------------------
def create_default_config() -> int:
    """Write the example config to CONFIG_PATH without overwriting an existing file.

    The file is created exclusively, with owner-only permissions (0600) and
    UTF-8 encoding: it is meant to hold an API token, and `tomllib` only reads
    UTF-8.

    Returns:
        0 if the file was created, 1 if it already exists or could not be written.
    """
    if CONFIG_PATH.exists():
        console.print(f"[yellow]Ya existe[/yellow] [bold]{CONFIG_PATH}[/bold] — no se sobreescribe.")
        return 1
    try:
        CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True)
        # O_EXCL closes the window between the exists() check above and the
        # write: a file created in the meantime is never truncated.
        fd = os.open(CONFIG_PATH, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600)
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            fh.write(CONFIG_EXAMPLE)
    except OSError as e:
        console.print(f"[red]No se pudo escribir {CONFIG_PATH}: {e}[/red]")
        return 1
    console.print(f"[green]Config creada en[/green] [bold]{CONFIG_PATH}[/bold]")
    console.print("[dim]Edítala y descomenta token / default_owner / clone_protocol si los necesitas.[/dim]")
    return 0
def load_config() -> Config:
    """Load and validate the `[server]` table from CONFIG_PATH.

    On any problem (missing file, unreadable or invalid TOML, missing or
    ill-typed settings) a message is printed and the process exits with
    status 1, so callers always receive a usable Config.

    Returns:
        Config with the trailing slash stripped from `url` and
        `clone_protocol` lower-cased ("https" or "ssh").
    """
    if not CONFIG_PATH.exists():
        console.print(f"[red]No se ha encontrado la configuración en[/red] [bold]{CONFIG_PATH}[/bold]")
        console.print("[yellow]Crea el fichero con este contenido mínimo:[/yellow]\n")
        console.print(CONFIG_EXAMPLE)
        sys.exit(1)
    try:
        with CONFIG_PATH.open("rb") as fh:
            raw = tomllib.load(fh)
    # UnicodeDecodeError: tomllib decodes the file as UTF-8 before parsing.
    except (OSError, tomllib.TOMLDecodeError, UnicodeDecodeError) as e:
        console.print(f"[red]Error leyendo {CONFIG_PATH}: {e}[/red]")
        sys.exit(1)

    server = raw.get("server") or {}
    # `server = "x"` is valid TOML but not a table; reject it instead of
    # crashing on `.get` below.
    if not isinstance(server, dict):
        console.print(f"[red][bold]server[/bold] debe ser una tabla en {CONFIG_PATH}[/red]")
        sys.exit(1)

    url = server.get("url")
    if not url:
        console.print(f"[red]Falta [bold]server.url[/bold] en {CONFIG_PATH}[/red]")
        sys.exit(1)

    # Every setting is used as a string (rstrip/lower/header value). Falsy
    # values are still tolerated and treated as "unset", as before.
    for key in ("url", "token", "default_owner", "clone_protocol"):
        value = server.get(key)
        if value and not isinstance(value, str):
            console.print(f"[red]server.{key} debe ser una cadena de texto en {CONFIG_PATH}[/red]")
            sys.exit(1)

    proto = (server.get("clone_protocol") or "https").lower()
    if proto not in ("https", "ssh"):
        console.print(f"[red]server.clone_protocol debe ser 'https' o 'ssh' (encontrado: {proto})[/red]")
        sys.exit(1)
    return Config(
        url=url.rstrip("/"),
        token=server.get("token") or None,
        default_owner=server.get("default_owner") or None,
        clone_protocol=proto,
    )
# --- cliente Gitea ----------------------------------------------------------
def gitea_get(cfg: Config, path: str, params: dict[str, str] | None = None) -> tuple[int, bytes, dict[str, str]]:
    """Perform a GET against the server's `/api/v1` API.

    Args:
        cfg: Supplies the base `url` and the optional `token`.
        path: API path starting with "/", appended after `/api/v1`.
        params: Optional query parameters, URL-encoded into the query string.

    Returns:
        `(status, body, headers)`. HTTP error statuses (4xx/5xx) are returned
        the same way rather than raised, so callers can branch on the code.

    Raises:
        RuntimeError: The URL is malformed or the server could not be reached
            or read from (DNS, refused connection, timeout, truncated reply).
    """
    query = f"?{urllib.parse.urlencode(params)}" if params else ""
    url = f"{cfg.url}/api/v1{path}{query}"
    try:
        # Request() rejects URLs without a scheme with ValueError.
        req = urllib.request.Request(url, headers={"Accept": "application/json"})
    except ValueError as e:
        raise RuntimeError(f"URL no válida ({url}): {e}") from e
    if cfg.token:
        req.add_header("Authorization", f"token {cfg.token}")
    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            return resp.status, resp.read(), dict(resp.headers)
    except urllib.error.HTTPError as e:
        return e.code, e.read() if e.fp else b"", dict(e.headers) if e.headers else {}
    except urllib.error.URLError as e:
        raise RuntimeError(f"no se pudo contactar con {cfg.url}: {e.reason}") from e
    except (OSError, http.client.HTTPException) as e:
        # Failures while reading the body (timeout, reset, truncated reply)
        # are not wrapped in URLError; convert them so callers that only
        # catch RuntimeError do not crash.
        raise RuntimeError(f"no se pudo contactar con {cfg.url}: {e}") from e
def _oneline(s: str) -> str:
"""Col·lapsa qualsevol salt de línia o whitespace múltiple en un sol espai."""
return " ".join((s or "").split())
def _parse_repo_list(body: bytes) -> list[RemoteRepo]:
items = json.loads(body or b"[]")
return [
RemoteRepo(
name=item.get("name", ""),
full_name=item.get("full_name", ""),
clone_url=item.get("clone_url", ""),
ssh_url=item.get("ssh_url", ""),
description=_oneline(item.get("description") or ""),
private=bool(item.get("private", False)),
default_branch=item.get("default_branch") or "main",
)
for item in items
]
def _total_count(headers: dict[str, str]) -> int | None:
    """Return the `X-Total-Count` response header as an int, or None if absent/invalid."""
    for key, value in headers.items():
        if key.lower() == "x-total-count":
            try:
                return int(value)
            except (TypeError, ValueError):
                return None
    return None


def fetch_remote_repos(cfg: Config, owner: str) -> list[RemoteRepo]:
    """List the repos of an owner, trying it first as an organization, then as a user.

    Args:
        cfg: Server configuration passed through to gitea_get().
        owner: Organization or user name.

    Returns:
        All repositories of the owner across every result page.

    Raises:
        RuntimeError: 401, any other HTTP error except 404, or the owner
            exists neither as organization nor as user.
    """
    # Percent-encode so unusual characters in the name cannot alter the path.
    quoted = urllib.parse.quote(owner, safe="")
    for endpoint in (f"/orgs/{quoted}/repos", f"/users/{quoted}/repos"):
        repos: list[RemoteRepo] = []
        page = 1
        found = False
        while True:
            status, body, headers = gitea_get(cfg, endpoint, {"limit": "50", "page": str(page)})
            if status == 401:
                raise RuntimeError("401 no autorizado — revisa server.token en la config")
            if status == 404:
                break  # not this kind of owner; try the next endpoint
            if status >= 400:
                raise RuntimeError(f"Gitea devolvió HTTP {status}: {body[:200].decode('utf-8', 'replace')}")
            found = True
            items = _parse_repo_list(body)
            repos.extend(items)
            # The server may cap `limit` below the 50 requested, so a short
            # page does not prove this was the last one. Stop on an empty
            # page, or earlier once the advertised total has been collected.
            if not items:
                break
            total = _total_count(headers)
            if total is not None and len(repos) >= total:
                break
            page += 1
        if found:
            return repos
    raise RuntimeError(f"'{owner}' no existe como usuario ni como organización en el servidor")
# --- escaneo local ----------------------------------------------------------
def is_git_repo(path: Path) -> bool:
    """Return True when *path* is a directory that contains a `.git` entry."""
    if not path.is_dir():
        return False
    git_marker = path / ".git"
    return git_marker.exists()
def local_remote_url(path: Path) -> str:
    """Return the URL of the `origin` remote of the repo at *path*.

    Returns "" when git exits with a non-zero status, takes longer than
    5 seconds, or is not installed.
    """
    command = ["git", "-C", str(path), "remote", "get-url", "origin"]
    try:
        proc = subprocess.run(command, capture_output=True, text=True, timeout=5)
    except (subprocess.TimeoutExpired, FileNotFoundError):
        return ""
    if proc.returncode != 0:
        return ""
    return proc.stdout.strip()
def scan_local(base: Path) -> dict[str, Path]:
    """Index the git repos that are direct children of `base`.

    Keys are the normalized `origin` URL, or `name::<dirname>` for repos
    without an origin so they can still be paired by name. An unreadable or
    missing `base` yields an empty index.
    """
    index: dict[str, Path] = {}
    try:
        repo_dirs = [child for child in base.iterdir() if is_git_repo(child)]
    except (PermissionError, FileNotFoundError):
        return index
    for repo_dir in repo_dirs:
        origin = local_remote_url(repo_dir)
        # No origin -> fall back to the directory name as the key.
        key = normalize_url(origin) if origin else f"name::{repo_dir.name}"
        index[key] = repo_dir
    return index
def normalize_url(url: str) -> str:
    """Reduce a git URL to a comparable `host[:port]/path` key.

    Lower-cases the URL and removes surrounding whitespace, a trailing slash,
    a trailing `.git`, the scheme and any `user[:password]@` prefix. This
    makes the HTTPS and SSH forms of the same repository compare equal, and a
    local clone whose remote embeds credentials still match the server's URL.

    Handles `scheme://[user@]host[:port]/path` and the scp-like
    `[user@]host:path`; anything else is returned lower-cased as is.
    """
    u = url.strip().lower().rstrip("/")
    if u.endswith(".git"):
        u = u[:-4]

    # scheme://[userinfo@]host[:port]/path -> host[:port]/path
    _scheme, sep, rest = u.partition("://")
    if sep:
        authority, slash, path = rest.partition("/")
        # rpartition: a password may itself contain "@".
        host = authority.rpartition("@")[2]
        return f"{host}{slash}{path}"

    # scp-like user@host:owner/repo -> host/owner/repo
    head, colon, path = u.partition(":")
    if "@" in head and "/" not in head:
        host = head.rpartition("@")[2]
        return f"{host}/{path.lstrip('/')}" if colon else host
    return u
# --- emparejado local/remoto ------------------------------------------------
def build_entries(remote_repos: list[RemoteRepo], local_index: dict[str, Path]) -> list[RepoEntry]:
    """Pair every remote repo with its local checkout, if one is indexed.

    A repo matches by normalized clone URL first (HTTPS, then SSH) and
    otherwise by the `name::<repo name>` key. The result is sorted by name,
    case-insensitively.
    """

    def _local_checkout(repo: RemoteRepo) -> Path | None:
        for candidate_url in (repo.clone_url, repo.ssh_url):
            url_key = normalize_url(candidate_url)
            if url_key in local_index:
                return local_index[url_key]
        name_key = f"name::{repo.name}"
        if name_key in local_index:
            return local_index[name_key]
        return None

    paired = [
        RepoEntry(
            name=repo.name,
            full_name=repo.full_name,
            remote=repo,
            local_path=_local_checkout(repo),
        )
        for repo in remote_repos
    ]
    return sorted(paired, key=lambda entry: entry.name.lower())
# --- render -----------------------------------------------------------------
# Braille "dots" animation. The frames must be distinct, non-empty,
# single-cell glyphs, otherwise the spinner shows nothing.
SPINNER_FRAMES = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]


def spinner_frame() -> str:
    """Return the spinner glyph for the current instant.

    The frame is derived from the monotonic clock and advances ten times per
    second, so every caller sees the same frame without shared state.
    """
    return SPINNER_FRAMES[int(time.monotonic() * 10) % len(SPINNER_FRAMES)]
def compute_page(cursor: int, total: int) -> tuple[int, int, int, int, int]:
    """Compute `(start, end, page_idx, total_pages, page_size)` for the visible window.

    `start`/`end` delimit the half-open slice of rows on the page that
    contains `cursor`. The page size follows the current terminal height.
    """
    terminal_rows = max(10, console.size.height)
    # Reserved rows: title, title separator, header, two borders, legend,
    # status line and one spare row.
    page_size = max(5, terminal_rows - 8)
    if total == 0:
        return 0, 0, 0, 1, page_size
    page_idx, _ = divmod(cursor, page_size)
    start = page_idx * page_size
    end = min(start + page_size, total)
    total_pages = -(-total // page_size)  # ceiling division
    return start, end, page_idx, total_pages, page_size
def _entry_state(e: RepoEntry) -> Text:
    """Return the coloured status cell for one entry; the first matching condition wins."""
    if e.cloning:
        return Text(f"{spinner_frame()} clonant…", style="bold blue")
    if e.cloned_ok is True:
        return Text("✓ clonat", style="bold green")
    if e.cloned_ok is False:
        return Text("✗ error", style="bold red")
    if e.local_path is not None and e.remote is None:
        return Text("◇ només local", style="dim yellow")
    if e.local_path is not None:
        return Text("● en local", style="green")
    if e.selected:
        return Text("☑ marcat", style="bold cyan")
    return Text("○ remot", style="dim")


def render(entries: list[RepoEntry], cursor: int, base: Path, owner: str, status_msg: str = "") -> Group:
    """Build the renderable for one frame: repo table, key legend and status line.

    Args:
        entries: All rows; only the page containing `cursor` is drawn.
        cursor: Index of the highlighted row.
        base: Local base directory, shown in the title.
        owner: Remote owner, shown in the title.
        status_msg: Optional one-line message shown under the legend.
    """
    start, end, page_idx, total_pages, _ = compute_page(cursor, len(entries))
    page_label = f" — pàgina {page_idx + 1}/{total_pages}" if total_pages > 1 else ""
    # Assembled as Text rather than a markup string, so square brackets in
    # `owner` or `base` are displayed literally instead of parsed as tags.
    title = Text.assemble(
        ("repoman", "bold"),
        " — ",
        (owner, "cyan"),
        " → ",
        (str(base), "bold"),
        (page_label, "dim"),
    )
    table = Table(
        show_header=True,
        header_style="bold magenta",
        title=title,
        title_style="white",
        expand=True,
    )
    table.add_column(" ", width=1, no_wrap=True)
    table.add_column("Estado", width=14, no_wrap=True)
    table.add_column("Repo", style="bold", no_wrap=True, overflow="ellipsis")
    table.add_column("Descripción", no_wrap=True, overflow="ellipsis", ratio=1)
    for i in range(start, end):
        e = entries[i]
        is_current = i == cursor
        # Cursor marker: needs a visible one-cell glyph to be of any use.
        caret = Text("❯", style="bold yellow") if is_current else Text(" ")
        # Name, with a lock prefix for private repos.
        name = Text(e.name)
        if e.remote and e.remote.private:
            name = Text("🔒 ", style="yellow") + name
        if is_current:
            name.stylize("bold underline")
        # Description: the failure reason if the clone failed, else the repo's own.
        if e.cloned_ok is False and e.error:
            desc = Text(e.error, style="red")
        else:
            desc_text = (e.remote.description if e.remote else "(no està al servidor)") or ""
            desc = Text(desc_text)
        table.add_row(caret, _entry_state(e), name, desc)
    legend = Text.assemble(
        ("↑/↓ j/k", "bold cyan"), " moure ",
        ("←/→ h/l", "bold cyan"), " pàgina ",
        ("Space", "bold cyan"), " marcar ",
        ("a", "bold cyan"), " tots ",
        ("n", "bold cyan"), " cap ",
        ("Enter", "bold green"), " clona ",
        ("r", "bold cyan"), " refresca ",
        ("q", "bold cyan"), " surt",
        style="dim",
    )
    status = Text(status_msg, style="dim italic") if status_msg else Text("")
    return Group(table, legend, status)
# --- clonado ----------------------------------------------------------------
def clone_one(entry: RepoEntry, base: Path, cfg: Config) -> tuple[RepoEntry, bool, str]:
    """Clone one repository into `base / entry.name`.

    Never prompts for credentials and never raises for expected failures.

    Args:
        entry: Row to clone; `entry.remote` supplies the URLs.
        base: Directory that receives the clone.
        cfg: `clone_protocol` selects the SSH or HTTPS URL.

    Returns:
        `(entry, success, message)`; on failure `message` is a one-line reason.
    """
    if entry.remote is None:
        return entry, False, "sense info remota"
    url = entry.remote.ssh_url if cfg.clone_protocol == "ssh" else entry.remote.clone_url
    if not url:
        return entry, False, "URL buida"
    # The name comes from the server: refuse anything that is not a plain
    # directory name, so the clone (and the cleanup below) stays inside `base`.
    if entry.name in ("", ".", "..") or "/" in entry.name or "\\" in entry.name:
        return entry, False, "nom de repo no vàlid"
    target = base / entry.name
    if target.exists():
        return entry, False, "ja existeix en local"
    env = {
        **os.environ,
        "GIT_TERMINAL_PROMPT": "0",  # never ask for credentials interactively
        "GIT_ASKPASS": "/bin/true",  # no interactive helper
        "SSH_ASKPASS": "/bin/true",
    }
    try:
        result = subprocess.run(
            # "--" ends option parsing: a server-supplied URL beginning with
            # "-" must not be interpreted as a git option.
            ["git", "clone", "--quiet", "--", url, str(target)],
            capture_output=True,
            text=True,
            timeout=600,
            stdin=subprocess.DEVNULL,
            env=env,
        )
    except subprocess.TimeoutExpired:
        if target.exists():
            shutil.rmtree(target, ignore_errors=True)
        return entry, False, "timeout (>10 min)"
    except FileNotFoundError:
        return entry, False, "git no trobat"
    except OSError as e:
        return entry, False, f"no s'ha pogut executar git: {e}"
    if result.returncode != 0:
        # Remove the partial directory if git created one.
        if target.exists():
            shutil.rmtree(target, ignore_errors=True)
        err_lines = (result.stderr or result.stdout or "git clone ha fallat").strip().splitlines()
        err_msg = " ".join(line.strip() for line in err_lines if line.strip())
        return entry, False, err_msg or "error desconegut"
    return entry, True, "ok"
def run_clone_queue(entries: list[RepoEntry], base: Path, cfg: Config, live: Live, cursor: int, owner: str) -> int:
    """Clone every selected, not-yet-local entry in parallel while animating the table.

    Up to four clones run at once. A background thread redraws the table
    every 100 ms so spinners and counters stay current. Each entry's
    `cloning`, `cloned_ok`, `error` and `local_path` are updated in place.

    Returns:
        The number of repositories cloned successfully.
    """
    pending = [e for e in entries if e.selected and e.local_path is None and e.remote is not None]
    if not pending:
        live.update(render(entries, cursor, base, owner, "Res marcat per clonar."), refresh=True)
        return 0
    for e in pending:
        e.cloning = True
        e.selected = False
    total = len(pending)
    stop = threading.Event()
    state = {"ok": 0, "done": 0}

    def animate() -> None:
        while not stop.is_set():
            in_progress = total - state["done"]
            msg = f"Clonant {in_progress} en curs · {state['done']}/{total} acabats"
            live.update(render(entries, cursor, base, owner, msg), refresh=True)
            stop.wait(0.1)

    anim = threading.Thread(target=animate, daemon=True)
    anim.start()
    try:
        with ThreadPoolExecutor(max_workers=min(4, total)) as ex:
            futures = {ex.submit(clone_one, e, base, cfg): e for e in pending}
            for fut in as_completed(futures):
                entry = futures[fut]
                try:
                    _, success, msg = fut.result()
                except Exception as exc:
                    # Worker boundary: one unexpected failure must not abort
                    # the remaining clones or leave this row spinning.
                    success, msg = False, f"error inesperat: {exc}"
                entry.cloning = False
                entry.cloned_ok = success
                entry.error = "" if success else msg
                if success:
                    entry.local_path = base / entry.name
                    state["ok"] += 1
                state["done"] += 1
    finally:
        stop.set()
        anim.join()
        # If the loop was interrupted, no row may stay marked as in progress.
        for e in pending:
            e.cloning = False
    live.update(render(entries, cursor, base, owner, f"Fet: {state['ok']}/{total} clonats."), refresh=True)
    return state["ok"]
# --- TUI loop ---------------------------------------------------------------
def tui(entries: list[RepoEntry], base: Path, cfg: Config, owner: str) -> None:
    """Run the interactive key loop until the user quits.

    Keys: arrows or j/k move, left/right, h/l or PgUp/PgDn change page,
    Home/End or g/G jump, Space toggles the mark, a/n mark all/none, Enter
    clones the marked repos, r refreshes from the server, q/Ctrl-C/Ctrl-D quit.

    `entries` is mutated in place (selection, clone results, refresh).
    """
    if not entries:
        console.print("[yellow]No hay repos para mostrar.[/yellow]")
        return
    cursor = 0
    status_msg = ""
    with Live(render(entries, cursor, base, owner, status_msg), console=console, screen=False, auto_refresh=False) as live:
        while True:
            live.update(render(entries, cursor, base, owner, status_msg), refresh=True)
            try:
                key = readchar.readkey()
            except KeyboardInterrupt:
                return
            status_msg = ""
            if key in ("q", "Q", readchar.key.CTRL_C, readchar.key.CTRL_D):
                return
            if key in ("r", "R"):
                status_msg = "refrescando…"
                live.update(render(entries, cursor, base, owner, status_msg), refresh=True)
                try:
                    remote_repos = fetch_remote_repos(cfg, owner)
                except RuntimeError as exc:
                    status_msg = f"error al refrescar: {exc}"
                    continue
                entries[:] = build_entries(remote_repos, scan_local(base))
                cursor = min(cursor, len(entries) - 1) if entries else 0
                status_msg = f"refrescado: {len(entries)} repo(s)"
                continue
            if not entries:
                # A refresh can leave the list empty; every remaining key
                # indexes or takes a modulo of `entries`, so stop here.
                status_msg = "no hay repos; pulsa r para refrescar o q para salir"
                continue
            if key in (readchar.key.UP, "k"):
                cursor = (cursor - 1) % len(entries)
            elif key in (readchar.key.DOWN, "j"):
                cursor = (cursor + 1) % len(entries)
            elif key in (readchar.key.LEFT, readchar.key.PAGE_UP, "h"):
                _, _, page_idx, total_pages, page_size = compute_page(cursor, len(entries))
                new_page = (page_idx - 1) % total_pages
                cursor = min(new_page * page_size, len(entries) - 1)
            elif key in (readchar.key.RIGHT, readchar.key.PAGE_DOWN, "l"):
                _, _, page_idx, total_pages, page_size = compute_page(cursor, len(entries))
                new_page = (page_idx + 1) % total_pages
                cursor = min(new_page * page_size, len(entries) - 1)
            elif key in (readchar.key.HOME, "g"):
                cursor = 0
            elif key in (readchar.key.END, "G"):
                cursor = len(entries) - 1
            elif key == " ":
                current = entries[cursor]
                if current.local_path is not None or current.remote is None:
                    status_msg = "ese repo no se puede clonar (ya está en local o sólo es local)"
                else:
                    current.selected = not current.selected
            elif key == "a":
                for e in entries:
                    if e.local_path is None and e.remote is not None:
                        e.selected = True
            elif key == "n":
                for e in entries:
                    e.selected = False
            elif key in (readchar.key.ENTER, "\r", "\n"):
                # Count the queue first: the clone run clears the marks. The
                # summary is carried in status_msg because the redraw at the
                # top of the loop would otherwise erase it immediately.
                queued = sum(1 for e in entries if e.selected and e.local_path is None and e.remote is not None)
                cloned = run_clone_queue(entries, base, cfg, live, cursor, owner)
                status_msg = f"Fet: {cloned}/{queued} clonats." if queued else "Res marcat per clonar."
# --- entrypoint -------------------------------------------------------------
def main() -> int:
    """Command-line entry point: parse arguments, load data and start the TUI.

    Returns:
        Process exit status: 0 on success, 1 on environment, config or server
        errors or when the user declines to create the folder, 2 when no
        owner is available.
    """
    parser = argparse.ArgumentParser(
        prog="repoman",
        description="Lista repos en un servidor Gitea y clona los que faltan en local.",
    )
    parser.add_argument("-v", "--version", action="version", version=f"repoman v{__version__}")
    parser.add_argument(
        "--create-default-config",
        action="store_true",
        help=f"Crea un fichero de config por defecto en {CONFIG_PATH} y sale",
    )
    parser.add_argument("path", type=Path, nargs="?", help="Carpeta local donde se clonarán los repos")
    parser.add_argument("owner", nargs="?", help="Usuario u organización en Gitea (opcional si default_owner está en la config)")
    args = parser.parse_args()
    if args.create_default_config:
        return create_default_config()
    if args.path is None:
        parser.error("falta el argumento path")
    if shutil.which("git") is None:
        console.print("[red]git no está instalado o no se encuentra en el PATH[/red]")
        return 1
    cfg = load_config()
    owner = args.owner or cfg.default_owner
    if not owner:
        console.print("[red]Falta el argumento [bold]owner[/bold] (o define default_owner en la config)[/red]")
        return 2
    base: Path = args.path.expanduser().resolve()
    if not base.exists():
        console.print(f"[yellow]La carpeta {base} no existe.[/yellow]")
        try:
            # The bracket is escaped: rich would otherwise take "[s/N]" for a
            # style tag and drop it from the prompt.
            ans = console.input("[bold]¿Crearla? \\[s/N][/bold] ").strip().lower()
        except (EOFError, KeyboardInterrupt):
            console.print()
            return 1
        # The advertised default is "N", so an empty answer must not create anything.
        if ans not in ("s", "si", "sí", "y", "yes"):
            return 1
        try:
            base.mkdir(parents=True, exist_ok=True)
        except OSError as e:
            console.print(Text(f"No se pudo crear {base}: {e}", style="red"))
            return 1
    elif not base.is_dir():
        console.print(f"[red]{base} no es un directorio[/red]")
        return 1
    console.print(f"[dim]repoman v{__version__} — servidor[/dim] [bold]{cfg.url}[/bold] [dim]· owner[/dim] [cyan]{owner}[/cyan]")
    with console.status("[cyan]Consultando Gitea…[/cyan]", spinner="dots"):
        try:
            remote_repos = fetch_remote_repos(cfg, owner)
        except RuntimeError as e:
            # Printed as plain Text: the message can quote a server response,
            # whose brackets must not be parsed as markup.
            console.print(Text(str(e), style="red"))
            return 1
    if not remote_repos:
        console.print(f"[yellow]El servidor no devolvió repos para [bold]{owner}[/bold].[/yellow]")
    with console.status("[cyan]Escaneando local…[/cyan]", spinner="dots"):
        local_index = scan_local(base)
    entries = build_entries(remote_repos, local_index)
    if not entries:
        console.print("[yellow]No hay repos remotos ni locales que mostrar.[/yellow]")
        return 0
    try:
        tui(entries, base, cfg, owner)
    except KeyboardInterrupt:
        pass
    return 0
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())