Files
gitswarm/gitswarm.py
T

639 lines
22 KiB
Python

#!/usr/bin/env python3
"""gitswarm — TUI para listar y actualizar repos git de primer nivel en una carpeta."""
from __future__ import annotations
import argparse
import os
import subprocess
import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from pathlib import Path
import readchar
from rich.console import Console, Group
from rich.live import Live
from rich.progress import (
BarColumn,
MofNCompleteColumn,
Progress,
SpinnerColumn,
TextColumn,
TimeElapsedColumn,
)
from rich.prompt import Prompt
from rich.table import Table
from rich.text import Text
__version__ = "1.2.0"
console = Console()
# --- env anti-bloqueo para git ----------------------------------------------
NONINTERACTIVE_ENV = {
"GIT_TERMINAL_PROMPT": "0",
"GIT_ASKPASS": "/bin/true",
"SSH_ASKPASS": "/bin/true",
# SSH: no preguntar, time out rápido si el host no responde
"GIT_SSH_COMMAND": (
"ssh -o BatchMode=yes -o StrictHostKeyChecking=accept-new "
"-o ConnectTimeout=5 -o ServerAliveInterval=5 -o ServerAliveCountMax=2"
),
}
# Config de git pasada con `-c` para acotar timeouts de red en HTTP/HTTPS.
# lowSpeedTime=10s + lowSpeedLimit=1KB/s aborta si no llegan datos en 10s.
NETWORK_GIT_CONFIG = [
"-c", "http.lowSpeedLimit=1000",
"-c", "http.lowSpeedTime=10",
]
def _git_env() -> dict[str, str]:
return {**os.environ, **NONINTERACTIVE_ENV}
def run_git(repo: Path, *args: str, timeout: int = 30, network: bool = False) -> tuple[int, str, str]:
cmd = ["git"]
if network:
cmd.extend(NETWORK_GIT_CONFIG)
cmd.extend(args)
result = subprocess.run(
cmd,
cwd=repo,
capture_output=True,
text=True,
timeout=timeout,
stdin=subprocess.DEVNULL,
env=_git_env(),
)
return result.returncode, result.stdout.strip(), result.stderr.strip()
def looks_like_auth_error(msg: str) -> bool:
m = msg.lower()
return any(
token in m
for token in (
"authentication failed",
"could not read username",
"could not read password",
"permission denied (publickey)",
"host key verification failed",
"terminal prompts disabled",
)
)
def looks_like_network_error(msg: str) -> bool:
m = msg.lower()
return any(
token in m
for token in (
"could not resolve host",
"name or service not known",
"connection refused",
"connection timed out",
"operation timed out",
"network is unreachable",
"no route to host",
"unable to access",
"failed to connect",
)
)
# --- modelo -----------------------------------------------------------------
@dataclass
class RepoStatus:
name: str
path: Path
branch: str = "-"
last_commit: str = "-"
dirty: bool = False
ahead: int = 0
behind: int = 0
has_upstream: bool = True
auth_required: bool = False
unreachable: bool = False
error: str | None = None
dirty_files: list[str] = field(default_factory=list)
selected: bool = False
busy: bool = False
last_action: str = ""
def is_git_repo(path: Path) -> bool:
return path.is_dir() and (path / ".git").exists()
def gather_status(path: Path, fetch: bool = True) -> RepoStatus:
s = RepoStatus(name=path.name, path=path)
try:
rc, out, err = run_git(path, "rev-parse", "--abbrev-ref", "HEAD")
if rc != 0:
s.error = err or "no se pudo leer HEAD"
return s
s.branch = "(detached)" if out == "HEAD" else out
rc, out, _ = run_git(path, "log", "-1", "--pretty=format:%h %s")
if rc == 0 and out:
s.last_commit = out
rc, out, _ = run_git(path, "status", "--porcelain")
if out:
s.dirty = True
s.dirty_files = out.splitlines()
if fetch:
try:
rc, _, ferr = run_git(
path, "fetch", "--quiet", "--all",
timeout=30, network=True,
)
if rc != 0:
if looks_like_auth_error(ferr):
s.auth_required = True
elif looks_like_network_error(ferr):
s.unreachable = True
except subprocess.TimeoutExpired:
# El fetch superó 30s: lo damos por host inalcanzable
s.unreachable = True
rc, out, _ = run_git(
path, "rev-list", "--left-right", "--count", "@{u}...HEAD"
)
if rc != 0:
s.has_upstream = False
else:
try:
behind_s, ahead_s = out.split()
s.behind = int(behind_s)
s.ahead = int(ahead_s)
except ValueError:
s.has_upstream = False
except subprocess.TimeoutExpired:
s.error = "timeout"
except FileNotFoundError:
s.error = "git no encontrado en el PATH"
except Exception as e:
s.error = str(e)
return s
def scan(base: Path, fetch: bool = True) -> list[RepoStatus]:
try:
candidates = sorted(p for p in base.iterdir() if is_git_repo(p))
except PermissionError as e:
console.print(f"[red]Sin permisos para leer {base}: {e}[/red]")
return []
if not candidates:
return []
results: dict[Path, RepoStatus] = {}
label = "Analizando" + (" (con fetch)" if fetch else "")
with Progress(
SpinnerColumn(),
TextColumn("[cyan]{task.description}[/cyan]"),
BarColumn(),
MofNCompleteColumn(),
TextColumn("[dim]{task.fields[current]}[/dim]"),
TimeElapsedColumn(),
console=console,
transient=True,
) as progress:
task = progress.add_task(label, total=len(candidates), current="")
with ThreadPoolExecutor(max_workers=min(8, len(candidates))) as ex:
futures = {ex.submit(gather_status, p, fetch): p for p in candidates}
for fut in as_completed(futures):
p = futures[fut]
results[p] = fut.result()
progress.update(task, advance=1, current=p.name)
return [results[p] for p in candidates]
# --- render -----------------------------------------------------------------
SPINNER_FRAMES = ["", "", "", "", "", "", "", "", "", ""]
def spinner_frame() -> str:
return SPINNER_FRAMES[int(time.monotonic() * 10) % len(SPINNER_FRAMES)]
def compute_page(cursor: int, total: int) -> tuple[int, int, int, int, int]:
"""Devuelve (start, end, page_idx, total_pages, page_size) para la ventana visible."""
h = max(10, console.size.height)
# Márgenes: título + cabecera + bordes + leyenda + status + colchón
page_size = max(5, h - 8)
if total == 0:
return 0, 0, 0, 1, page_size
page = cursor // page_size
start = page * page_size
end = min(start + page_size, total)
total_pages = (total + page_size - 1) // page_size
return start, end, page, total_pages, page_size
def status_cell(s: RepoStatus) -> Text:
if s.busy:
return Text(f"{spinner_frame()} trabajando…", style="bold blue")
if s.error:
return Text(f"ERROR: {s.error}", style="bold red")
if s.auth_required:
return Text("🔒 auth", style="bold yellow")
if s.unreachable:
return Text("🔌 sin red", style="bold yellow")
if not s.has_upstream:
return Text("sin upstream", style="yellow")
if s.behind == 0 and s.ahead == 0:
return Text("✓ al día", style="bold green")
parts: list[Text] = []
if s.behind:
parts.append(Text(f"{s.behind}", style="bold red"))
if s.ahead:
parts.append(Text(f"{s.ahead}", style="bold cyan"))
return Text(" ").join(parts)
def progress_bar(done: int, total: int, width: int = 30) -> Text:
if total <= 0:
return Text("")
filled = int(width * done / total)
pct = int(100 * done / total)
bar = Text()
bar.append("" * filled, style="bold green")
bar.append("" * (width - filled), style="dim")
return Text.assemble(
Text("["),
bar,
Text(f"] {done}/{total} ({pct}%)", style="bold"),
)
def render(
repos: list[RepoStatus],
cursor: int,
base: Path,
status_msg: str = "",
progress: tuple[int, int] | None = None,
) -> Group:
start, end, page_idx, total_pages, _ = compute_page(cursor, len(repos))
page_label = f" — página {page_idx + 1}/{total_pages}" if total_pages > 1 else ""
marked = sum(1 for r in repos if r.selected)
marked_label = f" · [bold cyan]{marked} marcado(s)[/bold cyan]" if marked else ""
table = Table(
show_header=True,
header_style="bold magenta",
title=f"[bold]gitswarm[/bold] — [bold]{base}[/bold][dim]{page_label}[/dim]{marked_label}",
title_style="white",
expand=True,
)
table.add_column(" ", width=1, no_wrap=True)
table.add_column("#", justify="right", style="dim", width=4)
table.add_column("Repo", style="bold", no_wrap=True, overflow="ellipsis")
table.add_column("Rama", style="cyan", no_wrap=True, overflow="ellipsis")
table.add_column("Estado", no_wrap=True)
table.add_column("Local", no_wrap=True)
table.add_column("Último commit", no_wrap=True, overflow="ellipsis", ratio=1)
for i in range(start, end):
s = repos[i]
caret = Text("", style="bold yellow") if i == cursor else Text(" ")
if s.selected:
mark = Text("", style="bold cyan")
else:
mark = Text("", style="dim") if s.behind else Text(" ")
# Repo: cursor + marca + nombre
name = Text(s.name)
if i == cursor:
name.stylize("bold underline")
repo_cell = Text.assemble(mark, " ", name)
dirty = (
Text(f"{len(s.dirty_files)}", style="bold yellow")
if s.dirty
else Text("limpio", style="dim green")
)
last = s.last_action or s.last_commit
last_style = "italic dim" if s.last_action else ""
table.add_row(
caret,
str(i + 1),
repo_cell,
s.branch,
status_cell(s),
dirty,
Text(last, style=last_style) if last_style else Text(last),
)
legend = Text.assemble(
("↑↓ j/k", "bold cyan"), " mover ",
("←→ h/l", "bold cyan"), " página ",
("Space", "bold cyan"), " marcar ",
("a/n", "bold cyan"), " todos/ninguno ",
("Enter", "bold green"), " update ",
("s", "bold cyan"), " status ",
("c", "bold cyan"), " commit ",
("r", "bold cyan"), " refresh ",
("?", "bold cyan"), " ayuda ",
("q", "bold cyan"), " salir",
style="dim",
)
status = Text(status_msg, style="dim italic") if status_msg else Text("")
if progress is not None:
done, total = progress
bar = Text.assemble(
Text("Progreso ", style="bold cyan"),
progress_bar(done, total),
)
return Group(table, legend, bar, status)
return Group(table, legend, status)
# --- acciones ---------------------------------------------------------------
def update_repo(s: RepoStatus) -> tuple[bool, str]:
if s.error:
return False, f"error previo: {s.error}"
if not s.has_upstream:
return False, "sin upstream"
if s.dirty:
return False, "cambios locales sin commitear, abortado"
if s.behind == 0:
return True, "ya estaba al día"
try:
rc, out, err = run_git(s.path, "pull", "--ff-only", "--quiet", timeout=90, network=True)
except subprocess.TimeoutExpired:
return False, "🔌 timeout (host no responde)"
if rc != 0:
msg = (err or out or "git pull --ff-only falló").splitlines()[0]
if looks_like_auth_error(err):
return False, "🔒 necesita credenciales"
if looks_like_network_error(err):
return False, "🔌 host inalcanzable"
return False, msg
return True, f"actualizado (+{s.behind} commit(s))"
def run_update_queue(repos: list[RepoStatus], live: Live, cursor: int, base: Path) -> None:
pending = [r for r in repos if r.selected and r.behind > 0 and not r.dirty and r.has_upstream and not r.error]
if not pending:
live.update(render(repos, cursor, base, "Nada que actualizar (marca algún repo behind con Space)."), refresh=True)
return
for r in pending:
r.busy = True
r.selected = False
total = len(pending)
stop = threading.Event()
state = {"ok": 0, "done": 0}
def animate() -> None:
while not stop.is_set():
in_progress = total - state["done"]
msg = f"Actualizando · {in_progress} en curso · {state['ok']} ok"
live.update(
render(repos, cursor, base, msg, progress=(state["done"], total)),
refresh=True,
)
stop.wait(0.1)
anim = threading.Thread(target=animate, daemon=True)
anim.start()
try:
with ThreadPoolExecutor(max_workers=min(4, total)) as ex:
futures = {ex.submit(update_repo, r): r for r in pending}
for fut in as_completed(futures):
r = futures[fut]
ok, msg = fut.result()
r.busy = False
r.last_action = ("" if ok else "") + msg
if ok:
state["ok"] += 1
state["done"] += 1
# Re-leer ahead/behind sin fetch (el pull ya lo bajó)
fresh = gather_status(r.path, fetch=False)
fresh.selected = False
fresh.last_action = r.last_action
idx = repos.index(r)
repos[idx] = fresh
finally:
stop.set()
anim.join()
live.update(render(repos, cursor, base, f"Listo: {state['ok']}/{total} actualizados."), refresh=True)
def show_status(s: RepoStatus, live: Live) -> None:
"""Suspende el Live y muestra `git status` del repo."""
live.stop()
try:
console.clear()
console.rule(f"[bold]{s.name}[/bold] — git status")
try:
result = subprocess.run(
["git", "-c", "color.status=always", "status"],
cwd=s.path,
stdin=subprocess.DEVNULL,
env=_git_env(),
timeout=15,
)
if result.returncode != 0:
console.print(f"[red]git status devolvió {result.returncode}[/red]")
except subprocess.TimeoutExpired:
console.print("[red]timeout ejecutando git status[/red]")
console.rule("[dim]pulsa cualquier tecla para volver[/dim]")
readchar.readkey()
finally:
live.start(refresh=True)
def do_commit(s: RepoStatus, live: Live) -> str:
"""Suspende el Live, hace add -A y commit. Devuelve mensaje de resultado."""
if s.error:
return f"✗ error previo: {s.error}"
if s.branch == "(detached)":
return "✗ HEAD detached, no se puede commitear"
if not s.dirty:
return "nada que commitear"
live.stop()
msg_result = ""
try:
console.clear()
console.rule(f"[bold]{s.name}[/bold] — commit")
# Resumen de lo que se va a stagear
added = modified = deleted = 0
for line in s.dirty_files:
code = line[:2]
if "?" in code:
added += 1
elif "D" in code:
deleted += 1
else:
modified += 1
console.print(
f"Se va a hacer [bold]git add -A[/bold] e incluir: "
f"[green]+{added} nuevos[/green], [yellow]~{modified} modificados[/yellow], "
f"[red]-{deleted} borrados[/red]"
)
try:
cm = Prompt.ask("[bold]Mensaje de commit[/bold] (vacío = cancelar)", default="").strip()
except (EOFError, KeyboardInterrupt):
cm = ""
if not cm:
msg_result = "commit cancelado"
else:
rc, _, err = run_git(s.path, "add", "-A", timeout=60)
if rc != 0:
msg_result = f"✗ git add falló: {(err or '').splitlines()[0] if err else 'sin detalle'}"
else:
rc, out, err = run_git(s.path, "commit", "-m", cm, timeout=60)
if rc != 0:
line = (err or out or "commit falló").splitlines()[0]
msg_result = f"✗ commit falló: {line}"
else:
msg_result = "✓ commit creado"
console.rule("[dim]pulsa cualquier tecla para volver[/dim]")
readchar.readkey()
finally:
live.start(refresh=True)
return msg_result
# --- ayuda ------------------------------------------------------------------
HELP_TEXT = """[bold]gitswarm — atajos de teclado[/bold]
[cyan]↑ / ↓ / j / k[/cyan] mover el cursor
[cyan]← / → / h / l[/cyan] cambiar de página
[cyan]g / G[/cyan] ir al inicio / final
[cyan]Space[/cyan] marcar/desmarcar el repo bajo el cursor
[cyan]a[/cyan] marcar todos los que estén behind
[cyan]n[/cyan] desmarcar todos
[cyan]Enter[/cyan] ejecuta [bold]git pull --ff-only[/bold] en los marcados
[cyan]s[/cyan] ver [bold]git status[/bold] del repo bajo cursor
[cyan]c[/cyan] commit en el repo bajo cursor (git add -A + mensaje)
[cyan]r[/cyan] refrescar (re-fetch de todos)
[cyan]?[/cyan] esta ayuda
[cyan]q[/cyan] salir (también Ctrl-C / Ctrl-D)
"""
def show_help(live: Live) -> None:
live.stop()
try:
console.clear()
console.print(HELP_TEXT)
console.rule("[dim]pulsa cualquier tecla para volver[/dim]")
readchar.readkey()
finally:
live.start(refresh=True)
# --- TUI loop ---------------------------------------------------------------
def tui(repos: list[RepoStatus], base: Path) -> None:
if not repos:
console.print(f"[yellow]No se han encontrado repos git en {base}[/yellow]")
return
cursor = 0
status_msg = ""
with Live(render(repos, cursor, base, status_msg), console=console, screen=False, auto_refresh=False) as live:
while True:
live.update(render(repos, cursor, base, status_msg), refresh=True)
try:
key = readchar.readkey()
except KeyboardInterrupt:
return
status_msg = ""
if key in ("q", "Q", readchar.key.CTRL_C, readchar.key.CTRL_D):
return
elif key in (readchar.key.UP, "k"):
cursor = (cursor - 1) % len(repos)
elif key in (readchar.key.DOWN, "j"):
cursor = (cursor + 1) % len(repos)
elif key in (readchar.key.LEFT, readchar.key.PAGE_UP, "h"):
_, _, page_idx, total_pages, page_size = compute_page(cursor, len(repos))
new_page = (page_idx - 1) % total_pages
cursor = min(new_page * page_size, len(repos) - 1)
elif key in (readchar.key.RIGHT, readchar.key.PAGE_DOWN, "l"):
_, _, page_idx, total_pages, page_size = compute_page(cursor, len(repos))
new_page = (page_idx + 1) % total_pages
cursor = min(new_page * page_size, len(repos) - 1)
elif key in (readchar.key.HOME, "g"):
cursor = 0
elif key in (readchar.key.END, "G"):
cursor = len(repos) - 1
elif key == " ":
r = repos[cursor]
if r.behind > 0 and not r.dirty and r.has_upstream and not r.error:
r.selected = not r.selected
else:
status_msg = "ese repo no se puede actualizar (limpio, sin upstream, dirty o con error)"
elif key == "a":
for r in repos:
if r.behind > 0 and not r.dirty and r.has_upstream and not r.error:
r.selected = True
elif key == "n":
for r in repos:
r.selected = False
elif key in (readchar.key.ENTER, "\r", "\n"):
run_update_queue(repos, live, cursor, base)
elif key in ("s", "S"):
show_status(repos[cursor], live)
elif key in ("c", "C"):
msg = do_commit(repos[cursor], live)
# refresca el estado del repo tras el commit
fresh = gather_status(repos[cursor].path, fetch=False)
fresh.last_action = msg
repos[cursor] = fresh
elif key in ("r", "R"):
status_msg = "refrescando…"
live.update(render(repos, cursor, base, status_msg), refresh=True)
new_repos = scan(base, fetch=True)
if new_repos:
repos[:] = new_repos
cursor = min(cursor, len(repos) - 1)
status_msg = f"refrescado: {len(repos)} repo(s)"
elif key == "?":
show_help(live)
# --- entrypoint -------------------------------------------------------------
def main() -> int:
parser = argparse.ArgumentParser(
prog="gitswarm",
description="TUI para listar y actualizar repos git de primer nivel en una carpeta.",
)
parser.add_argument(
"-v",
"--version",
action="version",
version=f"gitswarm v{__version__}",
)
parser.add_argument("path", type=Path, help="Carpeta a escanear")
args = parser.parse_args()
if not args.path.is_dir():
console.print(f"[red]No es un directorio válido: {args.path}[/red]")
return 1
base = args.path.resolve()
console.print(f"[dim]gitswarm v{__version__} — escaneando[/dim] [bold]{base}[/bold]")
repos = scan(base, fetch=True)
try:
tui(repos, base)
except KeyboardInterrupt:
pass
return 0
if __name__ == "__main__":
sys.exit(main())