"""Job registry for the tmux-agent-orchestrate-delegate-job skill. A job record is the single source of truth for one delegated unit of work: its id, prompt, owning agent session, broker connection, timeouts, and status. Records live as ``/.json`` with an append-only event log ``/.events.log`` and a shared ``/.lock``. Concurrency is handled via the fcntl lock in :mod:`mqtt_common` (PoC). For multi-host delegation, migrate to SQLite WAL — see references/registry.md. Importable as a library and runnable as a CLI (``register``/``list``/``get``/ ``status``/``pick``) so the ``tmux-agent-orchestrate-delegate-job`` bash wrapper can shell out. """ from __future__ import annotations import argparse import json import logging import sys import uuid from pathlib import Path from typing import Any, Dict, List, Optional import mqtt_common from mqtt_common import ( DEFAULT_REGISTRY_DIR, SCHEMA_VERSION, _atomic_write_record, _utcnow, broker_config_from_env, load_job, registry_lock, topic_prefix_for, ) logger = logging.getLogger("delegate_job.registry") TERMINAL_STATUSES = ("completed", "error", "cancelled") VALID_STATUSES = ("pending", "running", "completed", "error", "cancelled") def generate_job_id(bits: int = 32) -> str: """PoC: 32-bit hex (8 chars). Production: 128-bit (full uuid4 hex).""" if bits >= 128: return uuid.uuid4().hex nibbles = max(1, bits // 4) return uuid.uuid4().hex[:nibbles] def register_job( prompt: str, agent: str = "claude-code", agent_session: str = "tmux:claude", broker: Optional[Dict[str, Any]] = None, timeout_sec: int = 3600, idle_timeout_sec: int = 120, registry_dir: str = DEFAULT_REGISTRY_DIR, job_id: Optional[str] = None, expected_artifacts: Optional[List[str]] = None, bits: int = 32, auth_token: Optional[str] = None, ) -> str: """Create a new ``pending`` job record and return its id. ``broker`` defaults to the current environment's resolved broker block, so the registry alone is enough for ``publish_event.py`` to connect later. """ job_id = job_id or generate_job_id(bits) if broker is None: broker = broker_config_from_env().to_registry_block() now = _utcnow() record: Dict[str, Any] = { "schema_version": SCHEMA_VERSION, "job_id": job_id, "status": "pending", "created_at": now, "updated_at": now, "prompt": prompt, "agent": agent, "agent_session": agent_session, "broker": broker, "topic_prefix": topic_prefix_for(job_id), "timeout_sec": int(timeout_sec), "idle_timeout_sec": int(idle_timeout_sec), "expected_artifacts": expected_artifacts or [], "last_seq": 0, "auth_token": auth_token, } with registry_lock(registry_dir): if mqtt_common._job_path(job_id, registry_dir).exists(): raise FileExistsError(f"job already exists: {job_id}") _atomic_write_record(job_id, registry_dir, record) # Seed the persistent audit log (meta.json + status.json + a "registered" # event). Best-effort inside init_job_log — never blocks registration. mqtt_common.init_job_log(job_id, meta=record) logger.info("registered job %s (agent=%s session=%s)", job_id, agent, agent_session) return job_id def pick_pending(agent_session: str, registry_dir: str = DEFAULT_REGISTRY_DIR) -> Optional[str]: """Claim the oldest ``pending`` job for ``agent_session``, flipping it to ``running`` atomically under the lock. Returns the job id, or None if no pending job matches. This is how each tmux session takes only its own work without two sessions grabbing the same job.""" with registry_lock(registry_dir): candidates = [] for record in _iter_records(registry_dir): if record.get("status") == "pending" and record.get("agent_session") == agent_session: candidates.append(record) if not candidates: return None candidates.sort(key=lambda r: r.get("created_at", "")) chosen = candidates[0] chosen["status"] = "running" chosen["updated_at"] = _utcnow() _atomic_write_record(chosen["job_id"], registry_dir, chosen) logger.info("session %s picked job %s", agent_session, chosen["job_id"]) job_id = chosen["job_id"] updated_at = chosen["updated_at"] # pick_pending writes the record directly (not via update_job_status), so it # mirrors the pending->running transition into the audit log here. Best-effort. mqtt_common.update_logged_status(job_id, "running", updated_at=updated_at) mqtt_common.append_event(job_id, { "event": "status_changed", "from": "pending", "to": "running", "by": agent_session, "timestamp": updated_at, }) return job_id def update_status(job_id: str, registry_dir: str, status: str) -> Dict[str, Any]: if status not in VALID_STATUSES: raise ValueError(f"invalid status {status!r}; expected one of {VALID_STATUSES}") return mqtt_common.update_job_status(job_id, registry_dir, status=status) def list_jobs(registry_dir: str = DEFAULT_REGISTRY_DIR, status: Optional[str] = None) -> List[Dict[str, Any]]: records = list(_iter_records(registry_dir)) if status: records = [r for r in records if r.get("status") == status] records.sort(key=lambda r: r.get("created_at", "")) return records def append_event(job_id: str, registry_dir: str, payload: Dict[str, Any]) -> None: """Append one event payload as a JSON line to the job's events log. Best effort, debug-only; failures are logged but never raised to the caller.""" try: Path(registry_dir).mkdir(parents=True, exist_ok=True) log_path = Path(registry_dir) / f"{job_id}.events.log" with open(log_path, "a", encoding="utf-8") as fh: fh.write(json.dumps(payload, ensure_ascii=False) + "\n") except OSError as exc: # pragma: no cover - best effort logger.warning("could not append event for %s: %s", job_id, exc) # convenience re-export so callers can `from registry import load_job` __all__ = [ "register_job", "pick_pending", "update_status", "load_job", "list_jobs", "append_event", "generate_job_id", ] def _iter_records(registry_dir: str): base = Path(registry_dir) if not base.exists(): return for path in sorted(base.glob("*.json")): try: with open(path, "r", encoding="utf-8") as fh: yield json.load(fh) except (OSError, json.JSONDecodeError) as exc: logger.warning("skipping unreadable record %s: %s", path, exc) # -------------------------------------------------------------------------- # CLI (so the bash wrapper can shell out without inline python) # -------------------------------------------------------------------------- def _build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(description="tmux-agent-orchestrate-delegate-job registry CLI") parser.add_argument("--registry-dir", default=DEFAULT_REGISTRY_DIR) sub = parser.add_subparsers(dest="command", required=True) p_reg = sub.add_parser("register", help="create a pending job; prints the job id") p_reg.add_argument("--prompt", required=True) p_reg.add_argument("--agent", default="claude-code") p_reg.add_argument("--agent-session", default="tmux:claude") p_reg.add_argument("--timeout", type=int, default=3600) p_reg.add_argument("--idle-timeout", type=int, default=120) p_reg.add_argument("--bits", type=int, default=32, help="32 (PoC) or 128 (prod)") p_reg.add_argument("--artifact", action="append", default=[], dest="artifacts") p_list = sub.add_parser("list", help="list jobs (optionally by status)") p_list.add_argument("--status", default=None) p_list.add_argument("--json", action="store_true") p_get = sub.add_parser("get", help="print one job record as JSON") p_get.add_argument("--job", required=True) p_status = sub.add_parser("status", help="set a job status") p_status.add_argument("--job", required=True) p_status.add_argument("--set", required=True, dest="status") p_pick = sub.add_parser("pick", help="claim a pending job for a session; prints id") p_pick.add_argument("--agent-session", default="tmux:claude") p_logs = sub.add_parser( "logs", help="show the persistent audit log for a job, or --list every logged job", ) p_logs.add_argument("job_id", nargs="?", default=None, help="job id whose events.ndjson to print") p_logs.add_argument("--list", action="store_true", dest="list_all", help="summarise every job under the logs dir instead") p_logs.add_argument("--logs-dir", default=None, help="override the audit-log root (default: $DELEGATE_JOB_LOGS_DIR " "or /.hermes/delegate_job_logs)") p_logs.add_argument("--tail", type=int, default=0, help="show only the last N events (0 = all)") p_logs.add_argument("--json", action="store_true", help="emit raw JSON lines / records instead of a table") return parser def main(argv: Optional[List[str]] = None) -> int: mqtt_common.setup_logging(logging.INFO) args = _build_parser().parse_args(argv) rd = args.registry_dir if args.command == "register": job_id = register_job( prompt=args.prompt, agent=args.agent, agent_session=args.agent_session, timeout_sec=args.timeout, idle_timeout_sec=args.idle_timeout, registry_dir=rd, expected_artifacts=args.artifacts, bits=args.bits, ) print(job_id) return 0 if args.command == "list": records = list_jobs(rd, status=args.status) if args.json: print(json.dumps(records, ensure_ascii=False, indent=2)) else: if not records: print("(no jobs)") for r in records: print(f"{r['job_id']} {r.get('status','?'):10s} {r.get('agent_session','')}" f" {r.get('prompt','')[:48]}") return 0 if args.command == "get": try: print(json.dumps(load_job(args.job, rd), ensure_ascii=False, indent=2)) except FileNotFoundError as exc: print(str(exc), file=sys.stderr) return 1 return 0 if args.command == "status": try: update_status(args.job, rd, args.status) except (FileNotFoundError, ValueError) as exc: print(str(exc), file=sys.stderr) return 1 return 0 if args.command == "pick": job_id = pick_pending(args.agent_session, rd) if job_id is None: return 3 # no pending job for this session print(job_id) return 0 if args.command == "logs": return _cmd_logs(args) return 1 def _cmd_logs(args) -> int: """Pretty-print one job's events.ndjson, or summarise all logged jobs.""" logs_dir = args.logs_dir or mqtt_common.LOGS_DIR if args.list_all: jobs = mqtt_common.list_logged_jobs(logs_dir) if args.json: print(json.dumps(jobs, ensure_ascii=False, indent=2)) return 0 if not jobs: print(f"(no logged jobs under {logs_dir})") return 0 for m in jobs: print(f"{m.get('job_id','?')} {m.get('status','?'):10s} " f"{m.get('created_at','-'):20s} {(m.get('prompt') or '')[:48]}") return 0 if not args.job_id: print("logs requires a or --list", file=sys.stderr) return 1 events = list(mqtt_common.iter_logged_events(args.job_id, logs_dir)) if not events and not mqtt_common.job_log_dir(args.job_id, logs_dir).exists(): print(f"no audit log for job {args.job_id} under {logs_dir}", file=sys.stderr) return 1 if args.tail and args.tail > 0: events = events[-args.tail:] if args.json: for e in events: print(json.dumps(e, ensure_ascii=False)) return 0 for e in events: ts = e.get("logged_at") or e.get("timestamp") or "-" extra = e.get("detail") or e.get("to") or e.get("source_event") or "" print(f"{ts:24s} {e.get('event','?'):<16s} {extra}") return 0 if __name__ == "__main__": sys.exit(main())