#!/usr/bin/env python3 """gitswarm — lista y actualiza repos git de primer nivel en una carpeta.""" from __future__ import annotations import argparse import shlex import subprocess import sys from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass, field from pathlib import Path from rich.console import Console from rich.prompt import Prompt from rich.table import Table from rich.text import Text console = Console() @dataclass class RepoStatus: name: str path: Path branch: str = "-" last_commit: str = "-" dirty: bool = False ahead: int = 0 behind: int = 0 has_upstream: bool = True error: str | None = None dirty_files: list[str] = field(default_factory=list) def run_git(repo: Path, *args: str, timeout: int = 30) -> tuple[int, str, str]: result = subprocess.run( ["git", *args], cwd=repo, capture_output=True, text=True, timeout=timeout, ) return result.returncode, result.stdout.strip(), result.stderr.strip() def is_git_repo(path: Path) -> bool: return path.is_dir() and (path / ".git").exists() def gather_status(path: Path, fetch: bool = True) -> RepoStatus: s = RepoStatus(name=path.name, path=path) try: rc, out, err = run_git(path, "rev-parse", "--abbrev-ref", "HEAD") if rc != 0: s.error = err or "no se pudo leer HEAD" return s s.branch = "(detached)" if out == "HEAD" else out rc, out, _ = run_git(path, "log", "-1", "--pretty=format:%h %s") if rc == 0 and out: s.last_commit = out rc, out, _ = run_git(path, "status", "--porcelain") if out: s.dirty = True s.dirty_files = out.splitlines() if fetch: run_git(path, "fetch", "--quiet", "--all", timeout=120) rc, out, _ = run_git( path, "rev-list", "--left-right", "--count", "@{u}...HEAD" ) if rc != 0: s.has_upstream = False else: try: behind_s, ahead_s = out.split() s.behind = int(behind_s) s.ahead = int(ahead_s) except ValueError: s.has_upstream = False except subprocess.TimeoutExpired: s.error = "timeout" except FileNotFoundError: s.error = "git no encontrado en el PATH" except Exception as e: s.error = str(e) return s def scan(base: Path, fetch: bool = True) -> list[RepoStatus]: try: candidates = sorted(p for p in base.iterdir() if is_git_repo(p)) except PermissionError as e: console.print(f"[red]Sin permisos para leer {base}: {e}[/red]") return [] if not candidates: return [] msg = f"[cyan]Analizando {len(candidates)} repo(s){' (con fetch)' if fetch else ''}…[/cyan]" with console.status(msg, spinner="dots"): with ThreadPoolExecutor(max_workers=min(8, len(candidates))) as ex: return list(ex.map(lambda p: gather_status(p, fetch=fetch), candidates)) def status_cell(s: RepoStatus) -> Text: if s.error: return Text(f"ERROR: {s.error}", style="bold red") if not s.has_upstream: return Text("sin upstream", style="yellow") if s.behind == 0 and s.ahead == 0: return Text("✓ al día", style="bold green") parts: list[Text] = [] if s.behind: parts.append(Text(f"↓{s.behind}", style="bold red")) if s.ahead: parts.append(Text(f"↑{s.ahead}", style="bold cyan")) return Text(" ").join(parts) def render_table(repos: list[RepoStatus]) -> Table: table = Table( show_header=True, header_style="bold magenta", title="Repositorios git", title_style="bold white", expand=True, ) table.add_column("#", justify="right", style="dim") table.add_column("Repo", style="bold", no_wrap=True, overflow="ellipsis") table.add_column("Rama", style="cyan", no_wrap=True, overflow="ellipsis") table.add_column("Estado", no_wrap=True) table.add_column("Local", no_wrap=True) table.add_column("Último commit", no_wrap=True, overflow="ellipsis", ratio=1) for i, s in enumerate(repos, 1): dirty = ( Text(f"● {len(s.dirty_files)} cambio(s)", style="bold yellow") if s.dirty else Text("limpio", style="dim green") ) table.add_row( str(i), s.name, s.branch, status_cell(s), dirty, s.last_commit, ) return table def update_repo(s: RepoStatus) -> tuple[bool, str]: if s.error: return False, f"no se puede actualizar (error previo: {s.error})" if not s.has_upstream: return False, "no tiene upstream configurado" if s.dirty: return False, "tiene cambios locales sin commitear, abortado" if s.behind == 0: return True, "ya estaba al día" rc, out, err = run_git(s.path, "pull", "--ff-only", "--quiet", timeout=180) if rc != 0: return False, (err or out or "git pull --ff-only falló").splitlines()[0] return True, f"actualizado (+{s.behind} commit(s))" def find_repo(repos: list[RepoStatus], token: str) -> RepoStatus | None: if token.isdigit(): idx = int(token) - 1 return repos[idx] if 0 <= idx < len(repos) else None exact = [r for r in repos if r.name == token] if exact: return exact[0] partial = [r for r in repos if token.lower() in r.name.lower()] if len(partial) == 1: return partial[0] if len(partial) > 1: names = ", ".join(r.name for r in partial) console.print(f"[yellow]'{token}' es ambiguo: {names}[/yellow]") return None def cmd_update(repos: list[RepoStatus], args: list[str]) -> None: if not args: console.print("[red]Uso: update [/red]") return if args == ["all"]: targets = list(repos) else: targets = [] for token in args: r = find_repo(repos, token) if r is None: console.print(f"[red]✗ repo no encontrado: {token}[/red]") continue targets.append(r) if not targets: return for r in targets: with console.status(f"[cyan]actualizando {r.name}…[/cyan]", spinner="dots"): ok, msg = update_repo(r) icon = "[green]✓[/green]" if ok else "[red]✗[/red]" console.print(f" {icon} [bold]{r.name}[/bold]: {msg}") idx = repos.index(r) repos[idx] = gather_status(r.path, fetch=False) HELP = """[bold]Comandos:[/bold] [cyan]list[/cyan] / [cyan]ls[/cyan] Vuelve a mostrar la tabla [cyan]refresh[/cyan] / [cyan]r[/cyan] Re-escanea haciendo git fetch [cyan]update [/cyan] git pull --ff-only en ese repo (acepta nº, nombre o trozo) [cyan]update all[/cyan] Actualiza todos los que estén behind [cyan]help[/cyan] / [cyan]?[/cyan] Esta ayuda [cyan]quit[/cyan] / [cyan]q[/cyan] Salir (también Ctrl-D / Ctrl-C) """ def repl(base: Path) -> None: console.print(f"[dim]gitswarm — escaneando[/dim] [bold]{base}[/bold]") repos = scan(base, fetch=True) if not repos: console.print(f"[yellow]No se han encontrado repos git en {base}[/yellow]") return console.print(render_table(repos)) console.print("[dim]Escribe 'help' para ver los comandos.[/dim]") while True: try: raw = Prompt.ask("\n[bold blue]gitswarm[/bold blue]").strip() except (EOFError, KeyboardInterrupt): console.print() return if not raw: continue try: parts = shlex.split(raw) except ValueError as e: console.print(f"[red]Comando mal formado: {e}[/red]") continue cmd, *args = parts cmd = cmd.lower() if cmd in ("quit", "exit", "q"): return elif cmd in ("help", "?", "h"): console.print(HELP) elif cmd in ("list", "ls"): console.print(render_table(repos)) elif cmd in ("refresh", "r"): repos = scan(base, fetch=True) console.print(render_table(repos)) elif cmd == "update": cmd_update(repos, args) console.print(render_table(repos)) else: console.print( f"[red]Comando desconocido: {cmd}[/red] (prueba [cyan]help[/cyan])" ) def main() -> int: parser = argparse.ArgumentParser( prog="gitswarm", description="Lista y actualiza repos git de primer nivel en una carpeta.", ) parser.add_argument("path", type=Path, help="Carpeta a escanear") args = parser.parse_args() if not args.path.is_dir(): console.print(f"[red]No es un directorio válido: {args.path}[/red]") return 1 repl(args.path.resolve()) return 0 if __name__ == "__main__": sys.exit(main())