4 Commits

4 changed files with 528 additions and 139 deletions
+48 -14
View File
@@ -1,6 +1,6 @@
# gitswarm
Lista y actualiza repos git de primer nivel dentro de una carpeta. REPL interactivo con colores y tabla.
TUI para listar y actualizar repos git de primer nivel dentro de una carpeta. Soporta muchos repos (paginación automática) y nunca se bloquea pidiendo credenciales.
## Uso
@@ -8,16 +8,51 @@ Lista y actualiza repos git de primer nivel dentro de una carpeta. REPL interact
gitswarm /ruta/a/escanear
```
Te muestra una tabla con los repos detectados y entra a un prompt:
Te muestra una tabla paginada con todos los repos detectados y un cursor para navegarlos. La paginación se ajusta sola al alto del terminal.
- `list` / `ls` — vuelve a mostrar la tabla
- `refresh` / `r` — re-escanea haciendo `git fetch`
- `update <repo>``git pull --ff-only` (acepta nº, nombre exacto o fragmento)
- `update all` — actualiza todos los que estén behind
- `help` / `?`
- `quit` / `q` (también Ctrl-D / Ctrl-C)
### Atajos
El estado se calcula con `git fetch` al arrancar y al hacer `refresh`. `update` aborta si el repo tiene cambios locales sin commitear.
| Tecla | Acción |
|---|---|
| `↑` `↓` `j` `k` | mover cursor |
| `←` `→` `h` `l` | cambiar de página |
| `g` `G` | inicio / final |
| `Space` | marcar/desmarcar el repo bajo el cursor |
| `a` `n` | marcar todos los que estén behind / desmarcar todos |
| `Enter` | `git pull --ff-only` en los marcados (en paralelo) |
| `s` | mostrar `git status` del repo bajo cursor |
| `c` | commit en el repo bajo cursor (`git add -A` + mensaje) |
| `r` | refrescar (re-fetch) |
| `?` | ayuda |
| `q` | salir (también `Ctrl-C` / `Ctrl-D`) |
El estado se calcula con `git fetch` al arrancar y en cada `refresh`. `Enter` aborta cualquier repo con cambios locales sin commitear.
## Robustez de red
gitswarm **nunca se cuelga**: ejecuta git con timeouts cortos (SSH `ConnectTimeout=5`, HTTP aborta si no llegan datos en 10s, fetch global cap de 30s), `GIT_TERMINAL_PROMPT=0` y askpass desactivado. Cada repo problemático se marca y se pasa al siguiente:
- 🔒 **`auth`** — el repo necesita credenciales que no están configuradas.
- 🔌 **`sin red`** — el host del remote no resuelve, no responde o tarda demasiado.
- **`ERROR: …`** — fallo de git distinto a los anteriores.
## Credenciales (repos privados)
Para que funcionen los repos privados, configura un credential helper de git **una sola vez** (no es trabajo de gitswarm, esto es la forma estándar de git):
```bash
# Opción 1 — store sencillo en disco (~/.git-credentials, sin cifrar):
git config --global credential.helper store
# Opción 2 — libsecret (gnome-keyring, KDE wallet, etc., recomendado en Linux):
sudo apt install libsecret-1-0 libsecret-1-dev
sudo make -C /usr/share/doc/git/contrib/credential/libsecret
git config --global credential.helper /usr/share/doc/git/contrib/credential/libsecret/git-credential-libsecret
```
La primera vez que hagas `git pull` a mano sobre un repo privado, git te pedirá usuario/token y el helper lo guardará. A partir de ahí gitswarm puede actualizarlo sin intervención.
Para repos por SSH lo equivalente es tener la clave cargada en `ssh-agent`.
## Requisitos en el sistema
@@ -26,7 +61,7 @@ Necesarios siempre:
- `python3` (≥ 3.10)
- `git`
Para desarrollar (con venv y rich pip-installed):
Para desarrollar (con venv y dependencias pip-installed):
- `python3-venv`
@@ -47,7 +82,7 @@ sudo apt install python3 python3-venv python3-dev git gcc patchelf
```bash
python3 -m venv .venv
.venv/bin/pip install -r requirements.txt
./gitswarm /ruta/a/escanear # wrapper que usa el venv
.venv/bin/python gitswarm.py /ruta/a/escanear
```
## Compilar a binario standalone
@@ -56,7 +91,7 @@ python3 -m venv .venv
./build.sh
```
Genera `dist/gitswarm`, un único ejecutable autocontenido (lleva Python + rich dentro). Para instalarlo en tu PATH:
Genera `dist/gitswarm`, un único ejecutable autocontenido (lleva Python + rich + readchar dentro). Para instalarlo en tu PATH:
```bash
cp dist/gitswarm ~/.local/bin/
@@ -65,8 +100,7 @@ cp dist/gitswarm ~/.local/bin/
## Ficheros
- `gitswarm.py` — script principal
- `gitswarm` — wrapper bash que invoca el venv local (para desarrollo)
- `requirements.txt` — dependencias Python (`rich`)
- `requirements.txt` — dependencias Python (`rich`, `readchar`)
- `build.sh` — compila el binario con Nuitka
- `.venv/` — virtualenv local (no en git)
- `dist/` — binarios compilados (no en git)
+5 -2
View File
@@ -6,7 +6,7 @@ set -euo pipefail
HERE="$(cd "$(dirname "$0")" && pwd)"
cd "$HERE"
VERSION="$(grep -oP '__version__\s*=\s*"\K[^"]+' gitswarm.py)"
VERSION="$(sed -n 's/^__version__[[:space:]]*=[[:space:]]*"\([^"]*\)".*/\1/p' gitswarm.py | head -n1)"
if [ -z "$VERSION" ]; then
echo "[build] no se pudo leer __version__ de gitswarm.py" >&2
exit 1
@@ -19,9 +19,11 @@ if [ ! -d .venv ]; then
echo "[build] creando venv…"
python3 -m venv .venv
.venv/bin/pip install --quiet --upgrade pip
.venv/bin/pip install --quiet -r requirements.txt
fi
echo "[build] sincronizando dependencias…"
.venv/bin/pip install --quiet -r requirements.txt
if ! .venv/bin/python -c "import nuitka" 2>/dev/null; then
echo "[build] instalando nuitka en el venv…"
.venv/bin/pip install --quiet nuitka
@@ -40,6 +42,7 @@ echo "[build] compilando (esto puede tardar 1-2 min)…"
--remove-output \
--lto=yes \
--include-package=rich \
--include-package=readchar \
gitswarm.py
echo "[build] empaquetando release ${RELEASE_NAME}.tar.gz…"
+473 -123
View File
@@ -1,26 +1,115 @@
#!/usr/bin/env python3
"""gitswarm — lista y actualiza repos git de primer nivel en una carpeta."""
"""gitswarm — TUI para listar y actualizar repos git de primer nivel en una carpeta."""
from __future__ import annotations
import argparse
import shlex
import os
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from pathlib import Path
from rich.console import Console
import readchar
from rich.console import Console, Group
from rich.live import Live
from rich.progress import (
BarColumn,
MofNCompleteColumn,
Progress,
SpinnerColumn,
TextColumn,
TimeElapsedColumn,
)
from rich.prompt import Prompt
from rich.table import Table
from rich.text import Text
__version__ = "1.0.0"
__version__ = "1.2.0"
console = Console()
# --- env anti-bloqueo para git ----------------------------------------------
NONINTERACTIVE_ENV = {
"GIT_TERMINAL_PROMPT": "0",
"GIT_ASKPASS": "/bin/true",
"SSH_ASKPASS": "/bin/true",
# SSH: no preguntar, time out rápido si el host no responde
"GIT_SSH_COMMAND": (
"ssh -o BatchMode=yes -o StrictHostKeyChecking=accept-new "
"-o ConnectTimeout=5 -o ServerAliveInterval=5 -o ServerAliveCountMax=2"
),
}
# Config de git pasada con `-c` para acotar timeouts de red en HTTP/HTTPS.
# lowSpeedTime=10s + lowSpeedLimit=1KB/s aborta si no llegan datos en 10s.
NETWORK_GIT_CONFIG = [
"-c", "http.lowSpeedLimit=1000",
"-c", "http.lowSpeedTime=10",
]
def _git_env() -> dict[str, str]:
return {**os.environ, **NONINTERACTIVE_ENV}
def run_git(repo: Path, *args: str, timeout: int = 30, network: bool = False) -> tuple[int, str, str]:
cmd = ["git"]
if network:
cmd.extend(NETWORK_GIT_CONFIG)
cmd.extend(args)
result = subprocess.run(
cmd,
cwd=repo,
capture_output=True,
text=True,
timeout=timeout,
stdin=subprocess.DEVNULL,
env=_git_env(),
)
return result.returncode, result.stdout.strip(), result.stderr.strip()
def looks_like_auth_error(msg: str) -> bool:
m = msg.lower()
return any(
token in m
for token in (
"authentication failed",
"could not read username",
"could not read password",
"permission denied (publickey)",
"host key verification failed",
"terminal prompts disabled",
)
)
def looks_like_network_error(msg: str) -> bool:
m = msg.lower()
return any(
token in m
for token in (
"could not resolve host",
"name or service not known",
"connection refused",
"connection timed out",
"operation timed out",
"network is unreachable",
"no route to host",
"unable to access",
"failed to connect",
)
)
# --- modelo -----------------------------------------------------------------
@dataclass
class RepoStatus:
name: str
@@ -31,19 +120,13 @@ class RepoStatus:
ahead: int = 0
behind: int = 0
has_upstream: bool = True
auth_required: bool = False
unreachable: bool = False
error: str | None = None
dirty_files: list[str] = field(default_factory=list)
def run_git(repo: Path, *args: str, timeout: int = 30) -> tuple[int, str, str]:
result = subprocess.run(
["git", *args],
cwd=repo,
capture_output=True,
text=True,
timeout=timeout,
)
return result.returncode, result.stdout.strip(), result.stderr.strip()
selected: bool = False
busy: bool = False
last_action: str = ""
def is_git_repo(path: Path) -> bool:
@@ -69,7 +152,19 @@ def gather_status(path: Path, fetch: bool = True) -> RepoStatus:
s.dirty_files = out.splitlines()
if fetch:
run_git(path, "fetch", "--quiet", "--all", timeout=120)
try:
rc, _, ferr = run_git(
path, "fetch", "--quiet", "--all",
timeout=30, network=True,
)
if rc != 0:
if looks_like_auth_error(ferr):
s.auth_required = True
elif looks_like_network_error(ferr):
s.unreachable = True
except subprocess.TimeoutExpired:
# El fetch superó 30s: lo damos por host inalcanzable
s.unreachable = True
rc, out, _ = run_git(
path, "rev-list", "--left-right", "--count", "@{u}...HEAD"
@@ -100,15 +195,60 @@ def scan(base: Path, fetch: bool = True) -> list[RepoStatus]:
return []
if not candidates:
return []
msg = f"[cyan]Analizando {len(candidates)} repo(s){' (con fetch)' if fetch else ''}…[/cyan]"
with console.status(msg, spinner="dots"):
results: dict[Path, RepoStatus] = {}
label = "Analizando" + (" (con fetch)" if fetch else "")
with Progress(
SpinnerColumn(),
TextColumn("[cyan]{task.description}[/cyan]"),
BarColumn(),
MofNCompleteColumn(),
TextColumn("[dim]{task.fields[current]}[/dim]"),
TimeElapsedColumn(),
console=console,
transient=True,
) as progress:
task = progress.add_task(label, total=len(candidates), current="")
with ThreadPoolExecutor(max_workers=min(8, len(candidates))) as ex:
return list(ex.map(lambda p: gather_status(p, fetch=fetch), candidates))
futures = {ex.submit(gather_status, p, fetch): p for p in candidates}
for fut in as_completed(futures):
p = futures[fut]
results[p] = fut.result()
progress.update(task, advance=1, current=p.name)
return [results[p] for p in candidates]
# --- render -----------------------------------------------------------------
SPINNER_FRAMES = ["", "", "", "", "", "", "", "", "", ""]
def spinner_frame() -> str:
return SPINNER_FRAMES[int(time.monotonic() * 10) % len(SPINNER_FRAMES)]
def compute_page(cursor: int, total: int) -> tuple[int, int, int, int, int]:
"""Devuelve (start, end, page_idx, total_pages, page_size) para la ventana visible."""
h = max(10, console.size.height)
# Márgenes: título + cabecera + bordes + leyenda + status + colchón
page_size = max(5, h - 8)
if total == 0:
return 0, 0, 0, 1, page_size
page = cursor // page_size
start = page * page_size
end = min(start + page_size, total)
total_pages = (total + page_size - 1) // page_size
return start, end, page, total_pages, page_size
def status_cell(s: RepoStatus) -> Text:
if s.busy:
return Text(f"{spinner_frame()} trabajando…", style="bold blue")
if s.error:
return Text(f"ERROR: {s.error}", style="bold red")
if s.auth_required:
return Text("🔒 auth", style="bold yellow")
if s.unreachable:
return Text("🔌 sin red", style="bold yellow")
if not s.has_upstream:
return Text("sin upstream", style="yellow")
if s.behind == 0 and s.ahead == 0:
@@ -121,153 +261,357 @@ def status_cell(s: RepoStatus) -> Text:
return Text(" ").join(parts)
def render_table(repos: list[RepoStatus]) -> Table:
def progress_bar(done: int, total: int, width: int = 30) -> Text:
if total <= 0:
return Text("")
filled = int(width * done / total)
pct = int(100 * done / total)
bar = Text()
bar.append("" * filled, style="bold green")
bar.append("" * (width - filled), style="dim")
return Text.assemble(
Text("["),
bar,
Text(f"] {done}/{total} ({pct}%)", style="bold"),
)
def render(
repos: list[RepoStatus],
cursor: int,
base: Path,
status_msg: str = "",
progress: tuple[int, int] | None = None,
) -> Group:
start, end, page_idx, total_pages, _ = compute_page(cursor, len(repos))
page_label = f" — página {page_idx + 1}/{total_pages}" if total_pages > 1 else ""
marked = sum(1 for r in repos if r.selected)
marked_label = f" · [bold cyan]{marked} marcado(s)[/bold cyan]" if marked else ""
table = Table(
show_header=True,
header_style="bold magenta",
title="Repositorios git",
title_style="bold white",
title=f"[bold]gitswarm[/bold] — [bold]{base}[/bold][dim]{page_label}[/dim]{marked_label}",
title_style="white",
expand=True,
)
table.add_column("#", justify="right", style="dim")
table.add_column(" ", width=1, no_wrap=True)
table.add_column("#", justify="right", style="dim", width=4)
table.add_column("Repo", style="bold", no_wrap=True, overflow="ellipsis")
table.add_column("Rama", style="cyan", no_wrap=True, overflow="ellipsis")
table.add_column("Estado", no_wrap=True)
table.add_column("Local", no_wrap=True)
table.add_column("Último commit", no_wrap=True, overflow="ellipsis", ratio=1)
for i, s in enumerate(repos, 1):
for i in range(start, end):
s = repos[i]
caret = Text("", style="bold yellow") if i == cursor else Text(" ")
if s.selected:
mark = Text("", style="bold cyan")
else:
mark = Text("", style="dim") if s.behind else Text(" ")
# Repo: cursor + marca + nombre
name = Text(s.name)
if i == cursor:
name.stylize("bold underline")
repo_cell = Text.assemble(mark, " ", name)
dirty = (
Text(f"{len(s.dirty_files)} cambio(s)", style="bold yellow")
Text(f"{len(s.dirty_files)}", style="bold yellow")
if s.dirty
else Text("limpio", style="dim green")
)
last = s.last_action or s.last_commit
last_style = "italic dim" if s.last_action else ""
table.add_row(
str(i),
s.name,
caret,
str(i + 1),
repo_cell,
s.branch,
status_cell(s),
dirty,
s.last_commit,
Text(last, style=last_style) if last_style else Text(last),
)
return table
legend = Text.assemble(
("↑↓ j/k", "bold cyan"), " mover ",
("←→ h/l", "bold cyan"), " página ",
("Space", "bold cyan"), " marcar ",
("a/n", "bold cyan"), " todos/ninguno ",
("Enter", "bold green"), " update ",
("s", "bold cyan"), " status ",
("c", "bold cyan"), " commit ",
("r", "bold cyan"), " refresh ",
("?", "bold cyan"), " ayuda ",
("q", "bold cyan"), " salir",
style="dim",
)
status = Text(status_msg, style="dim italic") if status_msg else Text("")
if progress is not None:
done, total = progress
bar = Text.assemble(
Text("Progreso ", style="bold cyan"),
progress_bar(done, total),
)
return Group(table, legend, bar, status)
return Group(table, legend, status)
# --- acciones ---------------------------------------------------------------
def update_repo(s: RepoStatus) -> tuple[bool, str]:
if s.error:
return False, f"no se puede actualizar (error previo: {s.error})"
return False, f"error previo: {s.error}"
if not s.has_upstream:
return False, "no tiene upstream configurado"
return False, "sin upstream"
if s.dirty:
return False, "tiene cambios locales sin commitear, abortado"
return False, "cambios locales sin commitear, abortado"
if s.behind == 0:
return True, "ya estaba al día"
rc, out, err = run_git(s.path, "pull", "--ff-only", "--quiet", timeout=180)
try:
rc, out, err = run_git(s.path, "pull", "--ff-only", "--quiet", timeout=90, network=True)
except subprocess.TimeoutExpired:
return False, "🔌 timeout (host no responde)"
if rc != 0:
return False, (err or out or "git pull --ff-only falló").splitlines()[0]
msg = (err or out or "git pull --ff-only falló").splitlines()[0]
if looks_like_auth_error(err):
return False, "🔒 necesita credenciales"
if looks_like_network_error(err):
return False, "🔌 host inalcanzable"
return False, msg
return True, f"actualizado (+{s.behind} commit(s))"
def find_repo(repos: list[RepoStatus], token: str) -> RepoStatus | None:
if token.isdigit():
idx = int(token) - 1
return repos[idx] if 0 <= idx < len(repos) else None
exact = [r for r in repos if r.name == token]
if exact:
return exact[0]
partial = [r for r in repos if token.lower() in r.name.lower()]
if len(partial) == 1:
return partial[0]
if len(partial) > 1:
names = ", ".join(r.name for r in partial)
console.print(f"[yellow]'{token}' es ambiguo: {names}[/yellow]")
return None
def cmd_update(repos: list[RepoStatus], args: list[str]) -> None:
if not args:
console.print("[red]Uso: update <repo|nº|all>[/red]")
def run_update_queue(repos: list[RepoStatus], live: Live, cursor: int, base: Path) -> None:
pending = [r for r in repos if r.selected and r.behind > 0 and not r.dirty and r.has_upstream and not r.error]
if not pending:
live.update(render(repos, cursor, base, "Nada que actualizar (marca algún repo behind con Space)."), refresh=True)
return
if args == ["all"]:
targets = list(repos)
else:
targets = []
for token in args:
r = find_repo(repos, token)
if r is None:
console.print(f"[red]✗ repo no encontrado: {token}[/red]")
continue
targets.append(r)
if not targets:
return
for r in targets:
with console.status(f"[cyan]actualizando {r.name}…[/cyan]", spinner="dots"):
ok, msg = update_repo(r)
icon = "[green]✓[/green]" if ok else "[red]✗[/red]"
console.print(f" {icon} [bold]{r.name}[/bold]: {msg}")
idx = repos.index(r)
repos[idx] = gather_status(r.path, fetch=False)
for r in pending:
r.busy = True
r.selected = False
total = len(pending)
stop = threading.Event()
state = {"ok": 0, "done": 0}
def animate() -> None:
while not stop.is_set():
in_progress = total - state["done"]
msg = f"Actualizando · {in_progress} en curso · {state['ok']} ok"
live.update(
render(repos, cursor, base, msg, progress=(state["done"], total)),
refresh=True,
)
stop.wait(0.1)
anim = threading.Thread(target=animate, daemon=True)
anim.start()
try:
with ThreadPoolExecutor(max_workers=min(4, total)) as ex:
futures = {ex.submit(update_repo, r): r for r in pending}
for fut in as_completed(futures):
r = futures[fut]
ok, msg = fut.result()
r.busy = False
r.last_action = ("" if ok else "") + msg
if ok:
state["ok"] += 1
state["done"] += 1
# Re-leer ahead/behind sin fetch (el pull ya lo bajó)
fresh = gather_status(r.path, fetch=False)
fresh.selected = False
fresh.last_action = r.last_action
idx = repos.index(r)
repos[idx] = fresh
finally:
stop.set()
anim.join()
live.update(render(repos, cursor, base, f"Listo: {state['ok']}/{total} actualizados."), refresh=True)
HELP = """[bold]Comandos:[/bold]
[cyan]list[/cyan] / [cyan]ls[/cyan] Vuelve a mostrar la tabla
[cyan]refresh[/cyan] / [cyan]r[/cyan] Re-escanea haciendo git fetch
[cyan]update <repo>[/cyan] git pull --ff-only en ese repo (acepta nº, nombre o trozo)
[cyan]update all[/cyan] Actualiza todos los que estén behind
[cyan]version[/cyan] / [cyan]v[/cyan] Muestra la versión
[cyan]help[/cyan] / [cyan]?[/cyan] Esta ayuda
[cyan]quit[/cyan] / [cyan]q[/cyan] Salir (también Ctrl-D / Ctrl-C)
def show_status(s: RepoStatus, live: Live) -> None:
"""Suspende el Live y muestra `git status` del repo."""
live.stop()
try:
console.clear()
console.rule(f"[bold]{s.name}[/bold] — git status")
try:
result = subprocess.run(
["git", "-c", "color.status=always", "status"],
cwd=s.path,
stdin=subprocess.DEVNULL,
env=_git_env(),
timeout=15,
)
if result.returncode != 0:
console.print(f"[red]git status devolvió {result.returncode}[/red]")
except subprocess.TimeoutExpired:
console.print("[red]timeout ejecutando git status[/red]")
console.rule("[dim]pulsa cualquier tecla para volver[/dim]")
readchar.readkey()
finally:
live.start(refresh=True)
def do_commit(s: RepoStatus, live: Live) -> str:
"""Suspende el Live, hace add -A y commit. Devuelve mensaje de resultado."""
if s.error:
return f"✗ error previo: {s.error}"
if s.branch == "(detached)":
return "✗ HEAD detached, no se puede commitear"
if not s.dirty:
return "nada que commitear"
live.stop()
msg_result = ""
try:
console.clear()
console.rule(f"[bold]{s.name}[/bold] — commit")
# Resumen de lo que se va a stagear
added = modified = deleted = 0
for line in s.dirty_files:
code = line[:2]
if "?" in code:
added += 1
elif "D" in code:
deleted += 1
else:
modified += 1
console.print(
f"Se va a hacer [bold]git add -A[/bold] e incluir: "
f"[green]+{added} nuevos[/green], [yellow]~{modified} modificados[/yellow], "
f"[red]-{deleted} borrados[/red]"
)
try:
cm = Prompt.ask("[bold]Mensaje de commit[/bold] (vacío = cancelar)", default="").strip()
except (EOFError, KeyboardInterrupt):
cm = ""
if not cm:
msg_result = "commit cancelado"
else:
rc, _, err = run_git(s.path, "add", "-A", timeout=60)
if rc != 0:
msg_result = f"✗ git add falló: {(err or '').splitlines()[0] if err else 'sin detalle'}"
else:
rc, out, err = run_git(s.path, "commit", "-m", cm, timeout=60)
if rc != 0:
line = (err or out or "commit falló").splitlines()[0]
msg_result = f"✗ commit falló: {line}"
else:
msg_result = "✓ commit creado"
console.rule("[dim]pulsa cualquier tecla para volver[/dim]")
readchar.readkey()
finally:
live.start(refresh=True)
return msg_result
# --- ayuda ------------------------------------------------------------------
HELP_TEXT = """[bold]gitswarm — atajos de teclado[/bold]
[cyan]↑ / ↓ / j / k[/cyan] mover el cursor
[cyan]← / → / h / l[/cyan] cambiar de página
[cyan]g / G[/cyan] ir al inicio / final
[cyan]Space[/cyan] marcar/desmarcar el repo bajo el cursor
[cyan]a[/cyan] marcar todos los que estén behind
[cyan]n[/cyan] desmarcar todos
[cyan]Enter[/cyan] ejecuta [bold]git pull --ff-only[/bold] en los marcados
[cyan]s[/cyan] ver [bold]git status[/bold] del repo bajo cursor
[cyan]c[/cyan] commit en el repo bajo cursor (git add -A + mensaje)
[cyan]r[/cyan] refrescar (re-fetch de todos)
[cyan]?[/cyan] esta ayuda
[cyan]q[/cyan] salir (también Ctrl-C / Ctrl-D)
"""
def repl(base: Path) -> None:
console.print(
f"[dim]gitswarm v{__version__} — escaneando[/dim] [bold]{base}[/bold]"
)
repos = scan(base, fetch=True)
def show_help(live: Live) -> None:
live.stop()
try:
console.clear()
console.print(HELP_TEXT)
console.rule("[dim]pulsa cualquier tecla para volver[/dim]")
readchar.readkey()
finally:
live.start(refresh=True)
# --- TUI loop ---------------------------------------------------------------
def tui(repos: list[RepoStatus], base: Path) -> None:
if not repos:
console.print(f"[yellow]No se han encontrado repos git en {base}[/yellow]")
return
console.print(render_table(repos))
console.print("[dim]Escribe 'help' para ver los comandos.[/dim]")
while True:
try:
raw = Prompt.ask("\n[bold blue]gitswarm[/bold blue]").strip()
except (EOFError, KeyboardInterrupt):
console.print()
return
if not raw:
continue
try:
parts = shlex.split(raw)
except ValueError as e:
console.print(f"[red]Comando mal formado: {e}[/red]")
continue
cmd, *args = parts
cmd = cmd.lower()
if cmd in ("quit", "exit", "q"):
return
elif cmd in ("help", "?", "h"):
console.print(HELP)
elif cmd in ("list", "ls"):
console.print(render_table(repos))
elif cmd in ("version", "v"):
console.print(f"gitswarm v{__version__}")
elif cmd in ("refresh", "r"):
repos = scan(base, fetch=True)
console.print(render_table(repos))
elif cmd == "update":
cmd_update(repos, args)
console.print(render_table(repos))
else:
console.print(
f"[red]Comando desconocido: {cmd}[/red] (prueba [cyan]help[/cyan])"
)
cursor = 0
status_msg = ""
with Live(render(repos, cursor, base, status_msg), console=console, screen=False, auto_refresh=False) as live:
while True:
live.update(render(repos, cursor, base, status_msg), refresh=True)
try:
key = readchar.readkey()
except KeyboardInterrupt:
return
status_msg = ""
if key in ("q", "Q", readchar.key.CTRL_C, readchar.key.CTRL_D):
return
elif key in (readchar.key.UP, "k"):
cursor = (cursor - 1) % len(repos)
elif key in (readchar.key.DOWN, "j"):
cursor = (cursor + 1) % len(repos)
elif key in (readchar.key.LEFT, readchar.key.PAGE_UP, "h"):
_, _, page_idx, total_pages, page_size = compute_page(cursor, len(repos))
new_page = (page_idx - 1) % total_pages
cursor = min(new_page * page_size, len(repos) - 1)
elif key in (readchar.key.RIGHT, readchar.key.PAGE_DOWN, "l"):
_, _, page_idx, total_pages, page_size = compute_page(cursor, len(repos))
new_page = (page_idx + 1) % total_pages
cursor = min(new_page * page_size, len(repos) - 1)
elif key in (readchar.key.HOME, "g"):
cursor = 0
elif key in (readchar.key.END, "G"):
cursor = len(repos) - 1
elif key == " ":
r = repos[cursor]
if r.behind > 0 and not r.dirty and r.has_upstream and not r.error:
r.selected = not r.selected
else:
status_msg = "ese repo no se puede actualizar (limpio, sin upstream, dirty o con error)"
elif key == "a":
for r in repos:
if r.behind > 0 and not r.dirty and r.has_upstream and not r.error:
r.selected = True
elif key == "n":
for r in repos:
r.selected = False
elif key in (readchar.key.ENTER, "\r", "\n"):
run_update_queue(repos, live, cursor, base)
elif key in ("s", "S"):
show_status(repos[cursor], live)
elif key in ("c", "C"):
msg = do_commit(repos[cursor], live)
# refresca el estado del repo tras el commit
fresh = gather_status(repos[cursor].path, fetch=False)
fresh.last_action = msg
repos[cursor] = fresh
elif key in ("r", "R"):
status_msg = "refrescando…"
live.update(render(repos, cursor, base, status_msg), refresh=True)
new_repos = scan(base, fetch=True)
if new_repos:
repos[:] = new_repos
cursor = min(cursor, len(repos) - 1)
status_msg = f"refrescado: {len(repos)} repo(s)"
elif key == "?":
show_help(live)
# --- entrypoint -------------------------------------------------------------
def main() -> int:
parser = argparse.ArgumentParser(
prog="gitswarm",
description="Lista y actualiza repos git de primer nivel en una carpeta.",
description="TUI para listar y actualizar repos git de primer nivel en una carpeta.",
)
parser.add_argument(
"-v",
@@ -280,7 +624,13 @@ def main() -> int:
if not args.path.is_dir():
console.print(f"[red]No es un directorio válido: {args.path}[/red]")
return 1
repl(args.path.resolve())
base = args.path.resolve()
console.print(f"[dim]gitswarm v{__version__} — escaneando[/dim] [bold]{base}[/bold]")
repos = scan(base, fetch=True)
try:
tui(repos, base)
except KeyboardInterrupt:
pass
return 0
+2
View File
@@ -1,2 +1,4 @@
rich>=13.0
readchar>=4.0
# Usat per Nuitka per a comprimir el binari --onefile (build.sh)
zstandard>=0.22