Files
multi-agent-mux/skills/tmux-agent-orchestrate-delegate-job/scripts/registry.py
T
Godopu a6f7c045bc feat(delegate-job): bump default --timeout 600s -> 3600s (1h wall-clock budget)
Changed 11 locations across 5 files:
- scripts/registry.py: timeout_sec dataclass default + argparse default
- scripts/job_subscriber.py: help text + fallback default
- SKILL.md: 4 recommended invocation examples
- registry.md: JSON example + CLI example
- tmux-agent-orchestrate-delegate-job: bash wrapper TIMEOUT var

--idle-timeout 120s preserved unchanged.
Rationale: 10min default was too short for deep analysis / multi-file
generation tasks; 1h aligns with long-running agent delegation patterns.
2026-06-21 06:08:49 +00:00

328 lines
12 KiB
Python
Executable File

"""Job registry for the tmux-agent-orchestrate-delegate-job skill.

A job record is the single source of truth for one delegated unit of work:
its id, prompt, owning agent session, broker connection, timeouts, and status.
Records live as ``<registry_dir>/<job_id>.json`` with an append-only event log
``<registry_dir>/<job_id>.events.log`` and a shared ``<registry_dir>/.lock``.
A separate persistent audit log is seeded on registration via
``mqtt_common.init_job_log`` and can be read back with the ``logs`` subcommand.

Concurrency is handled via the fcntl lock in :mod:`mqtt_common` (PoC). For
multi-host delegation, migrate to SQLite WAL — see references/registry.md.

Importable as a library and runnable as a CLI (``register``/``list``/``get``/
``status``/``pick``/``logs``) so the ``tmux-agent-orchestrate-delegate-job``
bash wrapper can shell out.
"""
from __future__ import annotations
import argparse
import json
import logging
import sys
import uuid
from pathlib import Path
from typing import Any, Dict, List, Optional
import mqtt_common
from mqtt_common import (
DEFAULT_REGISTRY_DIR,
SCHEMA_VERSION,
_atomic_write_record,
_utcnow,
broker_config_from_env,
load_job,
registry_lock,
topic_prefix_for,
)
# Module logger. A fixed name (rather than __name__) keeps records under the
# "delegate_job" logger hierarchy however this file is imported or executed.
logger = logging.getLogger("delegate_job.registry")
# Statuses that mark a job as finished. Not referenced in this module's visible
# code; presumably consumed by other scripts of the skill — confirm before editing.
TERMINAL_STATUSES = ("completed", "error", "cancelled")
# Every status update_status() accepts; anything else is rejected with ValueError.
VALID_STATUSES = ("pending", "running", "completed", "error", "cancelled")
def generate_job_id(bits: int = 32) -> str:
    """Return a random lowercase-hex job id cut from a fresh ``uuid4``.

    ``bits >= 128`` returns the full 32-character uuid4 hex (production);
    smaller values keep the first ``bits // 4`` characters, never fewer than
    one. The PoC default of 32 bits yields 8 characters.

    NOTE: uuid4 fixes its version nibble at hex index 12, so ids longer than
    12 characters carry slightly fewer random bits than ``bits`` suggests.
    """
    full_hex = uuid.uuid4().hex
    if bits < 128:
        return full_hex[: max(1, bits // 4)]
    return full_hex
def register_job(
    prompt: str,
    agent: str = "claude-code",
    agent_session: str = "tmux:claude",
    broker: Optional[Dict[str, Any]] = None,
    timeout_sec: int = 3600,
    idle_timeout_sec: int = 120,
    registry_dir: str = DEFAULT_REGISTRY_DIR,
    job_id: Optional[str] = None,
    expected_artifacts: Optional[List[str]] = None,
    bits: int = 32,
    auth_token: Optional[str] = None,
) -> str:
    """Persist a new job record in ``pending`` state and return its id.

    Args:
        prompt: Work description handed to the agent.
        agent: Agent kind recorded on the job.
        agent_session: Session expected to claim the job via pick_pending().
        broker: Broker connection block stored on the record. When omitted,
            the block resolved from the current environment is stored, so the
            registry alone is enough for ``publish_event.py`` to connect later.
        timeout_sec: Wall-clock timeout, stored as an int.
        idle_timeout_sec: Idle timeout, stored as an int.
        registry_dir: Directory holding the ``<job_id>.json`` records.
        job_id: Explicit id; a falsy value means "generate one".
        expected_artifacts: Artifact names stored on the record ([] if omitted).
        bits: Size handed to generate_job_id() when no id is supplied.
        auth_token: Optional token stored verbatim on the record.

    Raises:
        FileExistsError: a record for the id already exists.
    """
    new_id = job_id if job_id else generate_job_id(bits)
    broker_block = broker if broker is not None else broker_config_from_env().to_registry_block()
    stamp = _utcnow()
    # Keyword order fixes the key order of the JSON written to disk.
    record: Dict[str, Any] = dict(
        schema_version=SCHEMA_VERSION,
        job_id=new_id,
        status="pending",
        created_at=stamp,
        updated_at=stamp,
        prompt=prompt,
        agent=agent,
        agent_session=agent_session,
        broker=broker_block,
        topic_prefix=topic_prefix_for(new_id),
        timeout_sec=int(timeout_sec),
        idle_timeout_sec=int(idle_timeout_sec),
        expected_artifacts=expected_artifacts or [],
        last_seq=0,
        auth_token=auth_token,
    )
    with registry_lock(registry_dir):
        # Existence check and write happen under one lock hold, so (assuming
        # registry_lock is exclusive) two registrations of one id cannot both win.
        if mqtt_common._job_path(new_id, registry_dir).exists():
            raise FileExistsError(f"job already exists: {new_id}")
        _atomic_write_record(new_id, registry_dir, record)
    # Seed the persistent audit log (meta.json + status.json + a "registered"
    # event). Best-effort inside init_job_log — never blocks registration.
    # NOTE(review): the whole record, auth_token included, is passed as meta;
    # if init_job_log persists it verbatim the token ends up in the audit log.
    mqtt_common.init_job_log(new_id, meta=record)
    logger.info("registered job %s (agent=%s session=%s)", new_id, agent, agent_session)
    return new_id
def pick_pending(agent_session: str, registry_dir: str = DEFAULT_REGISTRY_DIR) -> Optional[str]:
    """Atomically claim the oldest ``pending`` job owned by ``agent_session``.

    While holding the registry lock, every record is scanned. Those with status
    ``pending`` and a matching ``agent_session`` are candidates. The one with
    the smallest ``created_at`` is flipped to ``running`` and rewritten; ties go
    to whichever record _iter_records yields first. Because claim-and-write
    happens under the lock, two tmux sessions cannot grab the same job.

    Returns:
        The claimed job id, or ``None`` when no pending job matches.
    """
    with registry_lock(registry_dir):
        mine = [
            rec for rec in _iter_records(registry_dir)
            if rec.get("status") == "pending" and rec.get("agent_session") == agent_session
        ]
        if not mine:
            return None
        # min() keeps the first minimal element, same as a stable sort + [0].
        claimed = min(mine, key=lambda rec: rec.get("created_at", ""))
        claimed["status"] = "running"
        claimed["updated_at"] = _utcnow()
        _atomic_write_record(claimed["job_id"], registry_dir, claimed)
        logger.info("session %s picked job %s", agent_session, claimed["job_id"])
        picked_id = claimed["job_id"]
        picked_at = claimed["updated_at"]
    # The record was written directly (not via update_job_status), so the
    # pending->running transition is mirrored into the audit log here. Best-effort.
    mqtt_common.update_logged_status(picked_id, "running", updated_at=picked_at)
    mqtt_common.append_event(picked_id, {
        "event": "status_changed",
        "from": "pending",
        "to": "running",
        "by": agent_session,
        "timestamp": picked_at,
    })
    return picked_id
def update_status(job_id: str, registry_dir: str, status: str) -> Dict[str, Any]:
    """Validate ``status`` and persist it on job ``job_id``.

    The write is delegated to ``mqtt_common.update_job_status``, and its return
    value (presumably the updated record — confirm) is passed through.

    Raises:
        ValueError: ``status`` is not one of VALID_STATUSES.
    """
    if status in VALID_STATUSES:
        return mqtt_common.update_job_status(job_id, registry_dir, status=status)
    raise ValueError(f"invalid status {status!r}; expected one of {VALID_STATUSES}")
def list_jobs(registry_dir: str = DEFAULT_REGISTRY_DIR, status: Optional[str] = None) -> List[Dict[str, Any]]:
    """Return readable job records ordered by ``created_at`` (oldest first).

    When ``status`` is truthy, only records with exactly that status are kept;
    ``None`` or ``""`` disables filtering.
    """
    return sorted(
        (rec for rec in _iter_records(registry_dir) if not status or rec.get("status") == status),
        key=lambda rec: rec.get("created_at", ""),
    )
def append_event(job_id: str, registry_dir: str, payload: Dict[str, Any]) -> None:
    """Append ``payload`` as one JSON line to ``<registry_dir>/<job_id>.events.log``.

    Best effort, debug-only: any failure is logged as a warning and never
    raised to the caller. This covers an unwritable directory (OSError) and a
    payload ``json.dumps`` rejects (TypeError for unserialisable values,
    ValueError for circular references). The registry directory is created on
    demand.

    The payload is serialised before the file is touched, so a bad payload
    creates neither the directory nor an empty log file.
    """
    try:
        line = json.dumps(payload, ensure_ascii=False) + "\n"
    except (TypeError, ValueError) as exc:
        logger.warning("could not serialise event for %s: %s", job_id, exc)
        return
    base = Path(registry_dir)
    try:
        base.mkdir(parents=True, exist_ok=True)
        with open(base / f"{job_id}.events.log", "a", encoding="utf-8") as fh:
            fh.write(line)
    except OSError as exc:  # pragma: no cover - best effort
        logger.warning("could not append event for %s: %s", job_id, exc)
# Public API for ``from registry import *``. ``load_job`` comes from mqtt_common
# and is listed here as a convenience re-export, so callers can treat this module
# as their single entry point (e.g. ``from registry import load_job``).
__all__ = [
"register_job", "pick_pending", "update_status", "load_job",
"list_jobs", "append_event", "generate_job_id",
]
def _iter_records(registry_dir: str):
    """Yield every readable job record stored as ``<registry_dir>/*.json``.

    Files are visited in sorted path order, and a missing directory yields
    nothing. A file is skipped with a warning, instead of aborting the whole
    scan, when it cannot be opened, is not valid UTF-8, is not valid JSON, or
    whose top level is not a JSON object. Callers such as list_jobs() and
    pick_pending() call ``record.get(...)`` and would otherwise crash on one
    corrupt file.
    """
    base = Path(registry_dir)
    if not base.exists():
        return
    for path in sorted(base.glob("*.json")):
        try:
            with open(path, "r", encoding="utf-8") as fh:
                record = json.load(fh)
        except (OSError, ValueError) as exc:
            # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
            logger.warning("skipping unreadable record %s: %s", path, exc)
            continue
        if not isinstance(record, dict):
            logger.warning("skipping non-object record %s", path)
            continue
        # Yield outside the try so consumer-side errors are never swallowed here.
        yield record
# --------------------------------------------------------------------------
# CLI (so the bash wrapper can shell out without inline python)
# --------------------------------------------------------------------------
def _build_parser() -> argparse.ArgumentParser:
    """Build the registry CLI parser used by the bash wrapper.

    Subcommands, in registration order: ``register``, ``list``, ``get``,
    ``status``, ``pick`` and ``logs``. One of them is required. The top-level
    ``--registry-dir`` (default DEFAULT_REGISTRY_DIR) applies to all of them;
    ``logs`` additionally accepts ``--logs-dir`` for the audit-log root.
    """
    cli = argparse.ArgumentParser(description="tmux-agent-orchestrate-delegate-job registry CLI")
    cli.add_argument("--registry-dir", default=DEFAULT_REGISTRY_DIR)
    commands = cli.add_subparsers(dest="command", required=True)

    # register: create a pending job record.
    reg_cmd = commands.add_parser("register", help="create a pending job; prints the job id")
    reg_cmd.add_argument("--prompt", required=True)
    reg_cmd.add_argument("--agent", default="claude-code")
    reg_cmd.add_argument("--agent-session", default="tmux:claude")
    reg_cmd.add_argument("--timeout", type=int, default=3600)
    reg_cmd.add_argument("--idle-timeout", type=int, default=120)
    reg_cmd.add_argument("--bits", type=int, default=32, help="32 (PoC) or 128 (prod)")
    # Repeatable: each --artifact appends one name to args.artifacts.
    reg_cmd.add_argument("--artifact", action="append", default=[], dest="artifacts")

    # list / get / status / pick: read or mutate registry records.
    list_cmd = commands.add_parser("list", help="list jobs (optionally by status)")
    list_cmd.add_argument("--status", default=None)
    list_cmd.add_argument("--json", action="store_true")

    get_cmd = commands.add_parser("get", help="print one job record as JSON")
    get_cmd.add_argument("--job", required=True)

    status_cmd = commands.add_parser("status", help="set a job status")
    status_cmd.add_argument("--job", required=True)
    status_cmd.add_argument("--set", required=True, dest="status")

    pick_cmd = commands.add_parser("pick", help="claim a pending job for a session; prints id")
    pick_cmd.add_argument("--agent-session", default="tmux:claude")

    # logs: read the persistent audit log rather than the registry itself.
    logs_cmd = commands.add_parser(
        "logs",
        help="show the persistent audit log for a job, or --list every logged job",
    )
    logs_cmd.add_argument(
        "job_id", nargs="?", default=None,
        help="job id whose events.ndjson to print",
    )
    logs_cmd.add_argument(
        "--list", action="store_true", dest="list_all",
        help="summarise every job under the logs dir instead",
    )
    logs_cmd.add_argument(
        "--logs-dir", default=None,
        help="override the audit-log root (default: $DELEGATE_JOB_LOGS_DIR or <cwd>/.hermes/delegate_job_logs)",
    )
    logs_cmd.add_argument(
        "--tail", type=int, default=0,
        help="show only the last N events (0 = all)",
    )
    logs_cmd.add_argument(
        "--json", action="store_true",
        help="emit raw JSON lines / records instead of a table",
    )
    return cli
def main(argv: Optional[List[str]] = None) -> int:
    """Run the registry CLI and return a process exit code.

    Args:
        argv: Arguments to parse; ``None`` means ``sys.argv[1:]``.

    Returns:
        One of the following:

        * 0 on success.
        * 1 when ``get``/``status`` fail on a missing or unreadable job or an
          invalid status, or when the command is unrecognised.
        * 3 when ``pick`` finds no pending job for the session.
        * For ``logs``, whatever _cmd_logs() returns.

        argparse itself exits with status 2 on malformed arguments.
    """
    mqtt_common.setup_logging(logging.INFO)
    args = _build_parser().parse_args(argv)
    rd = args.registry_dir

    if args.command == "register":
        job_id = register_job(
            prompt=args.prompt,
            agent=args.agent,
            agent_session=args.agent_session,
            timeout_sec=args.timeout,
            idle_timeout_sec=args.idle_timeout,
            registry_dir=rd,
            expected_artifacts=args.artifacts,
            bits=args.bits,
        )
        # Only the id is printed, so the wrapper can capture it from stdout.
        print(job_id)
        return 0

    if args.command == "list":
        records = list_jobs(rd, status=args.status)
        if args.json:
            print(json.dumps(records, ensure_ascii=False, indent=2))
            return 0
        if not records:
            print("(no jobs)")
        for r in records:
            # Fall back to placeholders (as _cmd_logs does) so one record with a
            # missing job_id or a null status/prompt cannot crash the listing:
            # r["job_id"], ``None:10s`` and ``None[:48]`` would all raise.
            status = str(r.get("status") or "?")
            prompt = str(r.get("prompt") or "")
            print(f"{r.get('job_id', '?')} {status:10s} {r.get('agent_session') or ''}"
                  f" {prompt[:48]}")
        return 0

    if args.command == "get":
        try:
            record = load_job(args.job, rd)
        except (FileNotFoundError, ValueError) as exc:
            # ValueError presumably signals a corrupt record file
            # (json.JSONDecodeError is a ValueError subclass) — report, don't trace.
            print(str(exc), file=sys.stderr)
            return 1
        print(json.dumps(record, ensure_ascii=False, indent=2))
        return 0

    if args.command == "status":
        try:
            update_status(args.job, rd, args.status)
        except (FileNotFoundError, ValueError) as exc:
            print(str(exc), file=sys.stderr)
            return 1
        return 0

    if args.command == "pick":
        job_id = pick_pending(args.agent_session, rd)
        if job_id is None:
            return 3  # no pending job for this session
        print(job_id)
        return 0

    if args.command == "logs":
        return _cmd_logs(args)
    return 1
def _cmd_logs(args) -> int:
    """Handle ``logs``: print one job's audit events, or summarise every logged job.

    ``args`` is the parsed ``logs`` namespace (``job_id``, ``list_all``,
    ``logs_dir``, ``tail``, ``json``). The audit-log root is ``--logs-dir`` if
    given, else ``mqtt_common.LOGS_DIR``.

    Returns:
        0 on success. 1 when neither a job id nor ``--list`` was given, or when
        the job has no events and no log directory.
    """
    logs_dir = args.logs_dir or mqtt_common.LOGS_DIR
    if args.list_all:
        jobs = mqtt_common.list_logged_jobs(logs_dir)
        if args.json:
            print(json.dumps(jobs, ensure_ascii=False, indent=2))
            return 0
        if not jobs:
            print(f"(no logged jobs under {logs_dir})")
            return 0
        for m in jobs:
            # Width specs such as ``:10s`` raise on None or non-str values, so
            # every formatted cell is coerced with str(... or placeholder).
            status = str(m.get("status") or "?")
            created = str(m.get("created_at") or "-")
            prompt = str(m.get("prompt") or "")
            print(f"{m.get('job_id', '?')} {status:10s} "
                  f"{created:20s} {prompt[:48]}")
        return 0
    if not args.job_id:
        print("logs requires a <job_id> or --list", file=sys.stderr)
        return 1
    events = list(mqtt_common.iter_logged_events(args.job_id, logs_dir))
    # An existing but empty log directory is not an error; a missing one is.
    if not events and not mqtt_common.job_log_dir(args.job_id, logs_dir).exists():
        print(f"no audit log for job {args.job_id} under {logs_dir}", file=sys.stderr)
        return 1
    if args.tail and args.tail > 0:
        events = events[-args.tail:]
    if args.json:
        for e in events:
            print(json.dumps(e, ensure_ascii=False))
        return 0
    for e in events:
        # logged_at/timestamp may not be strings (e.g. numeric epochs), hence str().
        ts = str(e.get("logged_at") or e.get("timestamp") or "-")
        kind = str(e.get("event") or "?")
        extra = e.get("detail") or e.get("to") or e.get("source_event") or ""
        print(f"{ts:24s} {kind:<16s} {extra}")
    return 0
if __name__ == "__main__":
    # Script entry: main()'s return value becomes the process exit status.
    raise SystemExit(main())