#!/usr/bin/env python3 """gitswarm — TUI para listar y actualizar repos git de primer nivel en una carpeta.""" from __future__ import annotations import argparse import os import subprocess import sys import threading import time from concurrent.futures import ThreadPoolExecutor, as_completed from dataclasses import dataclass, field from pathlib import Path import readchar from rich.console import Console, Group from rich.live import Live from rich.progress import ( BarColumn, MofNCompleteColumn, Progress, SpinnerColumn, TextColumn, TimeElapsedColumn, ) from rich.prompt import Prompt from rich.table import Table from rich.text import Text __version__ = "1.2.0" console = Console() # --- env anti-bloqueo para git ---------------------------------------------- NONINTERACTIVE_ENV = { "GIT_TERMINAL_PROMPT": "0", "GIT_ASKPASS": "/bin/true", "SSH_ASKPASS": "/bin/true", # SSH: no preguntar, time out rápido si el host no responde "GIT_SSH_COMMAND": ( "ssh -o BatchMode=yes -o StrictHostKeyChecking=accept-new " "-o ConnectTimeout=5 -o ServerAliveInterval=5 -o ServerAliveCountMax=2" ), } # Config de git pasada con `-c` para acotar timeouts de red en HTTP/HTTPS. # lowSpeedTime=10s + lowSpeedLimit=1KB/s aborta si no llegan datos en 10s. NETWORK_GIT_CONFIG = [ "-c", "http.lowSpeedLimit=1000", "-c", "http.lowSpeedTime=10", ] def _git_env() -> dict[str, str]: return {**os.environ, **NONINTERACTIVE_ENV} def run_git(repo: Path, *args: str, timeout: int = 30, network: bool = False) -> tuple[int, str, str]: cmd = ["git"] if network: cmd.extend(NETWORK_GIT_CONFIG) cmd.extend(args) result = subprocess.run( cmd, cwd=repo, capture_output=True, text=True, timeout=timeout, stdin=subprocess.DEVNULL, env=_git_env(), ) return result.returncode, result.stdout.strip(), result.stderr.strip() def looks_like_auth_error(msg: str) -> bool: m = msg.lower() return any( token in m for token in ( "authentication failed", "could not read username", "could not read password", "permission denied (publickey)", "host key verification failed", "terminal prompts disabled", ) ) def looks_like_network_error(msg: str) -> bool: m = msg.lower() return any( token in m for token in ( "could not resolve host", "name or service not known", "connection refused", "connection timed out", "operation timed out", "network is unreachable", "no route to host", "unable to access", "failed to connect", ) ) # --- modelo ----------------------------------------------------------------- @dataclass class RepoStatus: name: str path: Path branch: str = "-" last_commit: str = "-" dirty: bool = False ahead: int = 0 behind: int = 0 has_upstream: bool = True auth_required: bool = False unreachable: bool = False error: str | None = None dirty_files: list[str] = field(default_factory=list) selected: bool = False busy: bool = False last_action: str = "" def is_git_repo(path: Path) -> bool: return path.is_dir() and (path / ".git").exists() def gather_status(path: Path, fetch: bool = True) -> RepoStatus: s = RepoStatus(name=path.name, path=path) try: rc, out, err = run_git(path, "rev-parse", "--abbrev-ref", "HEAD") if rc != 0: s.error = err or "no se pudo leer HEAD" return s s.branch = "(detached)" if out == "HEAD" else out rc, out, _ = run_git(path, "log", "-1", "--pretty=format:%h %s") if rc == 0 and out: s.last_commit = out rc, out, _ = run_git(path, "status", "--porcelain") if out: s.dirty = True s.dirty_files = out.splitlines() if fetch: try: rc, _, ferr = run_git( path, "fetch", "--quiet", "--all", timeout=30, network=True, ) if rc != 0: if looks_like_auth_error(ferr): s.auth_required = True elif looks_like_network_error(ferr): s.unreachable = True except subprocess.TimeoutExpired: # El fetch superó 30s: lo damos por host inalcanzable s.unreachable = True rc, out, _ = run_git( path, "rev-list", "--left-right", "--count", "@{u}...HEAD" ) if rc != 0: s.has_upstream = False else: try: behind_s, ahead_s = out.split() s.behind = int(behind_s) s.ahead = int(ahead_s) except ValueError: s.has_upstream = False except subprocess.TimeoutExpired: s.error = "timeout" except FileNotFoundError: s.error = "git no encontrado en el PATH" except Exception as e: s.error = str(e) return s def scan(base: Path, fetch: bool = True) -> list[RepoStatus]: try: candidates = sorted(p for p in base.iterdir() if is_git_repo(p)) except PermissionError as e: console.print(f"[red]Sin permisos para leer {base}: {e}[/red]") return [] if not candidates: return [] results: dict[Path, RepoStatus] = {} label = "Analizando" + (" (con fetch)" if fetch else "") with Progress( SpinnerColumn(), TextColumn("[cyan]{task.description}[/cyan]"), BarColumn(), MofNCompleteColumn(), TextColumn("[dim]{task.fields[current]}[/dim]"), TimeElapsedColumn(), console=console, transient=True, ) as progress: task = progress.add_task(label, total=len(candidates), current="") with ThreadPoolExecutor(max_workers=min(8, len(candidates))) as ex: futures = {ex.submit(gather_status, p, fetch): p for p in candidates} for fut in as_completed(futures): p = futures[fut] results[p] = fut.result() progress.update(task, advance=1, current=p.name) return [results[p] for p in candidates] # --- render ----------------------------------------------------------------- SPINNER_FRAMES = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"] def spinner_frame() -> str: return SPINNER_FRAMES[int(time.monotonic() * 10) % len(SPINNER_FRAMES)] def compute_page(cursor: int, total: int) -> tuple[int, int, int, int, int]: """Devuelve (start, end, page_idx, total_pages, page_size) para la ventana visible.""" h = max(10, console.size.height) # Márgenes: título + cabecera + bordes + leyenda + status + colchón page_size = max(5, h - 8) if total == 0: return 0, 0, 0, 1, page_size page = cursor // page_size start = page * page_size end = min(start + page_size, total) total_pages = (total + page_size - 1) // page_size return start, end, page, total_pages, page_size def status_cell(s: RepoStatus) -> Text: if s.busy: return Text(f"{spinner_frame()} trabajando…", style="bold blue") if s.error: return Text(f"ERROR: {s.error}", style="bold red") if s.auth_required: return Text("🔒 auth", style="bold yellow") if s.unreachable: return Text("🔌 sin red", style="bold yellow") if not s.has_upstream: return Text("sin upstream", style="yellow") if s.behind == 0 and s.ahead == 0: return Text("✓ al día", style="bold green") parts: list[Text] = [] if s.behind: parts.append(Text(f"↓{s.behind}", style="bold red")) if s.ahead: parts.append(Text(f"↑{s.ahead}", style="bold cyan")) return Text(" ").join(parts) def progress_bar(done: int, total: int, width: int = 30) -> Text: if total <= 0: return Text("") filled = int(width * done / total) pct = int(100 * done / total) bar = Text() bar.append("█" * filled, style="bold green") bar.append("░" * (width - filled), style="dim") return Text.assemble( Text("["), bar, Text(f"] {done}/{total} ({pct}%)", style="bold"), ) def render( repos: list[RepoStatus], cursor: int, base: Path, status_msg: str = "", progress: tuple[int, int] | None = None, ) -> Group: start, end, page_idx, total_pages, _ = compute_page(cursor, len(repos)) page_label = f" — página {page_idx + 1}/{total_pages}" if total_pages > 1 else "" marked = sum(1 for r in repos if r.selected) marked_label = f" · [bold cyan]{marked} marcado(s)[/bold cyan]" if marked else "" table = Table( show_header=True, header_style="bold magenta", title=f"[bold]gitswarm[/bold] — [bold]{base}[/bold][dim]{page_label}[/dim]{marked_label}", title_style="white", expand=True, ) table.add_column(" ", width=1, no_wrap=True) table.add_column("#", justify="right", style="dim", width=4) table.add_column("Repo", style="bold", no_wrap=True, overflow="ellipsis") table.add_column("Rama", style="cyan", no_wrap=True, overflow="ellipsis") table.add_column("Estado", no_wrap=True) table.add_column("Local", no_wrap=True) table.add_column("Último commit", no_wrap=True, overflow="ellipsis", ratio=1) for i in range(start, end): s = repos[i] caret = Text("▶", style="bold yellow") if i == cursor else Text(" ") if s.selected: mark = Text("☑", style="bold cyan") else: mark = Text("○", style="dim") if s.behind else Text(" ") # Repo: cursor + marca + nombre name = Text(s.name) if i == cursor: name.stylize("bold underline") repo_cell = Text.assemble(mark, " ", name) dirty = ( Text(f"● {len(s.dirty_files)}", style="bold yellow") if s.dirty else Text("limpio", style="dim green") ) last = s.last_action or s.last_commit last_style = "italic dim" if s.last_action else "" table.add_row( caret, str(i + 1), repo_cell, s.branch, status_cell(s), dirty, Text(last, style=last_style) if last_style else Text(last), ) legend = Text.assemble( ("↑↓ j/k", "bold cyan"), " mover ", ("←→ h/l", "bold cyan"), " página ", ("Space", "bold cyan"), " marcar ", ("a/n", "bold cyan"), " todos/ninguno ", ("Enter", "bold green"), " update ", ("s", "bold cyan"), " status ", ("c", "bold cyan"), " commit ", ("r", "bold cyan"), " refresh ", ("?", "bold cyan"), " ayuda ", ("q", "bold cyan"), " salir", style="dim", ) status = Text(status_msg, style="dim italic") if status_msg else Text("") if progress is not None: done, total = progress bar = Text.assemble( Text("Progreso ", style="bold cyan"), progress_bar(done, total), ) return Group(table, legend, bar, status) return Group(table, legend, status) # --- acciones --------------------------------------------------------------- def update_repo(s: RepoStatus) -> tuple[bool, str]: if s.error: return False, f"error previo: {s.error}" if not s.has_upstream: return False, "sin upstream" if s.dirty: return False, "cambios locales sin commitear, abortado" if s.behind == 0: return True, "ya estaba al día" try: rc, out, err = run_git(s.path, "pull", "--ff-only", "--quiet", timeout=90, network=True) except subprocess.TimeoutExpired: return False, "🔌 timeout (host no responde)" if rc != 0: msg = (err or out or "git pull --ff-only falló").splitlines()[0] if looks_like_auth_error(err): return False, "🔒 necesita credenciales" if looks_like_network_error(err): return False, "🔌 host inalcanzable" return False, msg return True, f"actualizado (+{s.behind} commit(s))" def run_update_queue(repos: list[RepoStatus], live: Live, cursor: int, base: Path) -> None: pending = [r for r in repos if r.selected and r.behind > 0 and not r.dirty and r.has_upstream and not r.error] if not pending: live.update(render(repos, cursor, base, "Nada que actualizar (marca algún repo behind con Space)."), refresh=True) return for r in pending: r.busy = True r.selected = False total = len(pending) stop = threading.Event() state = {"ok": 0, "done": 0} def animate() -> None: while not stop.is_set(): in_progress = total - state["done"] msg = f"Actualizando · {in_progress} en curso · {state['ok']} ok" live.update( render(repos, cursor, base, msg, progress=(state["done"], total)), refresh=True, ) stop.wait(0.1) anim = threading.Thread(target=animate, daemon=True) anim.start() try: with ThreadPoolExecutor(max_workers=min(4, total)) as ex: futures = {ex.submit(update_repo, r): r for r in pending} for fut in as_completed(futures): r = futures[fut] ok, msg = fut.result() r.busy = False r.last_action = ("✓ " if ok else "✗ ") + msg if ok: state["ok"] += 1 state["done"] += 1 # Re-leer ahead/behind sin fetch (el pull ya lo bajó) fresh = gather_status(r.path, fetch=False) fresh.selected = False fresh.last_action = r.last_action idx = repos.index(r) repos[idx] = fresh finally: stop.set() anim.join() live.update(render(repos, cursor, base, f"Listo: {state['ok']}/{total} actualizados."), refresh=True) def show_status(s: RepoStatus, live: Live) -> None: """Suspende el Live y muestra `git status` del repo.""" live.stop() try: console.clear() console.rule(f"[bold]{s.name}[/bold] — git status") try: result = subprocess.run( ["git", "-c", "color.status=always", "status"], cwd=s.path, stdin=subprocess.DEVNULL, env=_git_env(), timeout=15, ) if result.returncode != 0: console.print(f"[red]git status devolvió {result.returncode}[/red]") except subprocess.TimeoutExpired: console.print("[red]timeout ejecutando git status[/red]") console.rule("[dim]pulsa cualquier tecla para volver[/dim]") readchar.readkey() finally: live.start(refresh=True) def do_commit(s: RepoStatus, live: Live) -> str: """Suspende el Live, hace add -A y commit. Devuelve mensaje de resultado.""" if s.error: return f"✗ error previo: {s.error}" if s.branch == "(detached)": return "✗ HEAD detached, no se puede commitear" if not s.dirty: return "nada que commitear" live.stop() msg_result = "" try: console.clear() console.rule(f"[bold]{s.name}[/bold] — commit") # Resumen de lo que se va a stagear added = modified = deleted = 0 for line in s.dirty_files: code = line[:2] if "?" in code: added += 1 elif "D" in code: deleted += 1 else: modified += 1 console.print( f"Se va a hacer [bold]git add -A[/bold] e incluir: " f"[green]+{added} nuevos[/green], [yellow]~{modified} modificados[/yellow], " f"[red]-{deleted} borrados[/red]" ) try: cm = Prompt.ask("[bold]Mensaje de commit[/bold] (vacío = cancelar)", default="").strip() except (EOFError, KeyboardInterrupt): cm = "" if not cm: msg_result = "commit cancelado" else: rc, _, err = run_git(s.path, "add", "-A", timeout=60) if rc != 0: msg_result = f"✗ git add falló: {(err or '').splitlines()[0] if err else 'sin detalle'}" else: rc, out, err = run_git(s.path, "commit", "-m", cm, timeout=60) if rc != 0: line = (err or out or "commit falló").splitlines()[0] msg_result = f"✗ commit falló: {line}" else: msg_result = "✓ commit creado" console.rule("[dim]pulsa cualquier tecla para volver[/dim]") readchar.readkey() finally: live.start(refresh=True) return msg_result # --- ayuda ------------------------------------------------------------------ HELP_TEXT = """[bold]gitswarm — atajos de teclado[/bold] [cyan]↑ / ↓ / j / k[/cyan] mover el cursor [cyan]← / → / h / l[/cyan] cambiar de página [cyan]g / G[/cyan] ir al inicio / final [cyan]Space[/cyan] marcar/desmarcar el repo bajo el cursor [cyan]a[/cyan] marcar todos los que estén behind [cyan]n[/cyan] desmarcar todos [cyan]Enter[/cyan] ejecuta [bold]git pull --ff-only[/bold] en los marcados [cyan]s[/cyan] ver [bold]git status[/bold] del repo bajo cursor [cyan]c[/cyan] commit en el repo bajo cursor (git add -A + mensaje) [cyan]r[/cyan] refrescar (re-fetch de todos) [cyan]?[/cyan] esta ayuda [cyan]q[/cyan] salir (también Ctrl-C / Ctrl-D) """ def show_help(live: Live) -> None: live.stop() try: console.clear() console.print(HELP_TEXT) console.rule("[dim]pulsa cualquier tecla para volver[/dim]") readchar.readkey() finally: live.start(refresh=True) # --- TUI loop --------------------------------------------------------------- def tui(repos: list[RepoStatus], base: Path) -> None: if not repos: console.print(f"[yellow]No se han encontrado repos git en {base}[/yellow]") return cursor = 0 status_msg = "" with Live(render(repos, cursor, base, status_msg), console=console, screen=False, auto_refresh=False) as live: while True: live.update(render(repos, cursor, base, status_msg), refresh=True) try: key = readchar.readkey() except KeyboardInterrupt: return status_msg = "" if key in ("q", "Q", readchar.key.CTRL_C, readchar.key.CTRL_D): return elif key in (readchar.key.UP, "k"): cursor = (cursor - 1) % len(repos) elif key in (readchar.key.DOWN, "j"): cursor = (cursor + 1) % len(repos) elif key in (readchar.key.LEFT, readchar.key.PAGE_UP, "h"): _, _, page_idx, total_pages, page_size = compute_page(cursor, len(repos)) new_page = (page_idx - 1) % total_pages cursor = min(new_page * page_size, len(repos) - 1) elif key in (readchar.key.RIGHT, readchar.key.PAGE_DOWN, "l"): _, _, page_idx, total_pages, page_size = compute_page(cursor, len(repos)) new_page = (page_idx + 1) % total_pages cursor = min(new_page * page_size, len(repos) - 1) elif key in (readchar.key.HOME, "g"): cursor = 0 elif key in (readchar.key.END, "G"): cursor = len(repos) - 1 elif key == " ": r = repos[cursor] if r.behind > 0 and not r.dirty and r.has_upstream and not r.error: r.selected = not r.selected else: status_msg = "ese repo no se puede actualizar (limpio, sin upstream, dirty o con error)" elif key == "a": for r in repos: if r.behind > 0 and not r.dirty and r.has_upstream and not r.error: r.selected = True elif key == "n": for r in repos: r.selected = False elif key in (readchar.key.ENTER, "\r", "\n"): run_update_queue(repos, live, cursor, base) elif key in ("s", "S"): show_status(repos[cursor], live) elif key in ("c", "C"): msg = do_commit(repos[cursor], live) # refresca el estado del repo tras el commit fresh = gather_status(repos[cursor].path, fetch=False) fresh.last_action = msg repos[cursor] = fresh elif key in ("r", "R"): status_msg = "refrescando…" live.update(render(repos, cursor, base, status_msg), refresh=True) new_repos = scan(base, fetch=True) if new_repos: repos[:] = new_repos cursor = min(cursor, len(repos) - 1) status_msg = f"refrescado: {len(repos)} repo(s)" elif key == "?": show_help(live) # --- entrypoint ------------------------------------------------------------- def main() -> int: parser = argparse.ArgumentParser( prog="gitswarm", description="TUI para listar y actualizar repos git de primer nivel en una carpeta.", ) parser.add_argument( "-v", "--version", action="version", version=f"gitswarm v{__version__}", ) parser.add_argument("path", type=Path, help="Carpeta a escanear") args = parser.parse_args() if not args.path.is_dir(): console.print(f"[red]No es un directorio válido: {args.path}[/red]") return 1 base = args.path.resolve() console.print(f"[dim]gitswarm v{__version__} — escaneando[/dim] [bold]{base}[/bold]") repos = scan(base, fetch=True) try: tui(repos, base) except KeyboardInterrupt: pass return 0 if __name__ == "__main__": sys.exit(main())