refactor: rename skills from tmux-agent-orchestrate-* to multi-agent-mux-* in backplane scripts and documents
This commit is contained in:
@@ -0,0 +1,334 @@
|
||||
"""Job registry for the multi-agent-mux-delegate-job skill.
|
||||
|
||||
A job record is the single source of truth for one delegated unit of work:
|
||||
its id, prompt, owning agent session, broker connection, timeouts, and status.
|
||||
Records live as ``<registry_dir>/<job_id>.json`` with an append-only event log
|
||||
``<registry_dir>/<job_id>.events.log`` and a shared ``<registry_dir>/.lock``.
|
||||
|
||||
Concurrency is handled via the fcntl lock in :mod:`mqtt_common` (PoC). For
|
||||
multi-host delegation, migrate to SQLite WAL — see references/registry.md.
|
||||
|
||||
Importable as a library and runnable as a CLI (``register``/``list``/``get``/
|
||||
``status``/``pick``) so the ``multi-agent-mux-delegate-job`` bash wrapper can shell out.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import logging
|
||||
import sys
|
||||
import uuid
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
import mqtt_common
|
||||
from mqtt_common import (
|
||||
DEFAULT_REGISTRY_DIR,
|
||||
SCHEMA_VERSION,
|
||||
_atomic_write_record,
|
||||
_utcnow,
|
||||
broker_config_from_env,
|
||||
load_job,
|
||||
registry_lock,
|
||||
topic_prefix_for,
|
||||
)
|
||||
|
||||
logger = logging.getLogger("delegate_job.registry")
|
||||
|
||||
TERMINAL_STATUSES = ("completed", "error", "cancelled")
|
||||
VALID_STATUSES = ("pending", "running", "completed", "error", "cancelled")
|
||||
|
||||
|
||||
def generate_job_id(bits: int = 32) -> str:
|
||||
"""PoC: 32-bit hex (8 chars). Production: 128-bit (full uuid4 hex)."""
|
||||
if bits >= 128:
|
||||
return uuid.uuid4().hex
|
||||
nibbles = max(1, bits // 4)
|
||||
return uuid.uuid4().hex[:nibbles]
|
||||
|
||||
|
||||
def register_job(
|
||||
prompt: str,
|
||||
agent: str = "claude-code",
|
||||
agent_session: str = "tmux:claude",
|
||||
broker: Optional[Dict[str, Any]] = None,
|
||||
timeout_sec: int = 3600,
|
||||
idle_timeout_sec: int = 120,
|
||||
registry_dir: str = DEFAULT_REGISTRY_DIR,
|
||||
job_id: Optional[str] = None,
|
||||
expected_artifacts: Optional[List[str]] = None,
|
||||
bits: int = 32,
|
||||
auth_token: Optional[str] = None,
|
||||
) -> str:
|
||||
"""Create a new ``pending`` job record and return its id.
|
||||
|
||||
``broker`` defaults to the current environment's resolved broker block, so
|
||||
the registry alone is enough for ``publish_event.py`` to connect later.
|
||||
"""
|
||||
job_id = job_id or generate_job_id(bits)
|
||||
if broker is None:
|
||||
broker = broker_config_from_env().to_registry_block()
|
||||
if auth_token is None:
|
||||
# Auto-generate token if secure broker configuration (TLS or username) is detected
|
||||
if broker.get("tls") or broker.get("username"):
|
||||
import secrets
|
||||
auth_token = secrets.token_urlsafe(32)
|
||||
now = _utcnow()
|
||||
record: Dict[str, Any] = {
|
||||
"schema_version": SCHEMA_VERSION,
|
||||
"job_id": job_id,
|
||||
"status": "pending",
|
||||
"created_at": now,
|
||||
"updated_at": now,
|
||||
"prompt": prompt,
|
||||
"agent": agent,
|
||||
"agent_session": agent_session,
|
||||
"broker": broker,
|
||||
"topic_prefix": topic_prefix_for(job_id),
|
||||
"timeout_sec": int(timeout_sec),
|
||||
"idle_timeout_sec": int(idle_timeout_sec),
|
||||
"expected_artifacts": expected_artifacts or [],
|
||||
"last_seq": 0,
|
||||
"auth_token": auth_token,
|
||||
}
|
||||
with registry_lock(registry_dir):
|
||||
if mqtt_common._job_path(job_id, registry_dir).exists():
|
||||
raise FileExistsError(f"job already exists: {job_id}")
|
||||
_atomic_write_record(job_id, registry_dir, record)
|
||||
# Seed the persistent audit log (meta.json + status.json + a "registered"
|
||||
# event). Best-effort inside init_job_log — never blocks registration.
|
||||
mqtt_common.init_job_log(job_id, meta=record)
|
||||
logger.info("registered job %s (agent=%s session=%s)", job_id, agent, agent_session)
|
||||
return job_id
|
||||
|
||||
|
||||
def pick_pending(agent_session: str, registry_dir: str = DEFAULT_REGISTRY_DIR) -> Optional[str]:
|
||||
"""Claim the oldest ``pending`` job for ``agent_session``, flipping it to
|
||||
``running`` atomically under the lock. Returns the job id, or None if no
|
||||
pending job matches. This is how each tmux session takes only its own work
|
||||
without two sessions grabbing the same job."""
|
||||
with registry_lock(registry_dir):
|
||||
candidates = []
|
||||
for record in _iter_records(registry_dir):
|
||||
if record.get("status") == "pending" and record.get("agent_session") == agent_session:
|
||||
candidates.append(record)
|
||||
if not candidates:
|
||||
return None
|
||||
candidates.sort(key=lambda r: r.get("created_at", ""))
|
||||
chosen = candidates[0]
|
||||
chosen["status"] = "running"
|
||||
chosen["updated_at"] = _utcnow()
|
||||
_atomic_write_record(chosen["job_id"], registry_dir, chosen)
|
||||
logger.info("session %s picked job %s", agent_session, chosen["job_id"])
|
||||
job_id = chosen["job_id"]
|
||||
updated_at = chosen["updated_at"]
|
||||
# pick_pending writes the record directly (not via update_job_status), so it
|
||||
# mirrors the pending->running transition into the audit log here. Best-effort.
|
||||
mqtt_common.update_logged_status(job_id, "running", updated_at=updated_at)
|
||||
mqtt_common.append_event(job_id, {
|
||||
"event": "status_changed",
|
||||
"from": "pending",
|
||||
"to": "running",
|
||||
"by": agent_session,
|
||||
"timestamp": updated_at,
|
||||
})
|
||||
return job_id
|
||||
|
||||
|
||||
def update_status(job_id: str, registry_dir: str, status: str) -> Dict[str, Any]:
|
||||
if status not in VALID_STATUSES:
|
||||
raise ValueError(f"invalid status {status!r}; expected one of {VALID_STATUSES}")
|
||||
return mqtt_common.update_job_status(job_id, registry_dir, status=status)
|
||||
|
||||
|
||||
def list_jobs(registry_dir: str = DEFAULT_REGISTRY_DIR, status: Optional[str] = None) -> List[Dict[str, Any]]:
|
||||
records = list(_iter_records(registry_dir))
|
||||
if status:
|
||||
records = [r for r in records if r.get("status") == status]
|
||||
records.sort(key=lambda r: r.get("created_at", ""))
|
||||
return records
|
||||
|
||||
|
||||
def append_event(job_id: str, registry_dir: str, payload: Dict[str, Any]) -> None:
|
||||
"""Append one event payload as a JSON line to the job's events log. Best
|
||||
effort, debug-only; failures are logged but never raised to the caller."""
|
||||
try:
|
||||
Path(registry_dir).mkdir(parents=True, exist_ok=True)
|
||||
log_path = Path(registry_dir) / f"{job_id}.events.log"
|
||||
with open(log_path, "a", encoding="utf-8") as fh:
|
||||
fh.write(json.dumps(payload, ensure_ascii=False) + "\n")
|
||||
except OSError as exc: # pragma: no cover - best effort
|
||||
logger.warning("could not append event for %s: %s", job_id, exc)
|
||||
|
||||
|
||||
# convenience re-export so callers can `from registry import load_job`
|
||||
__all__ = [
|
||||
"register_job", "pick_pending", "update_status", "load_job",
|
||||
"list_jobs", "append_event", "generate_job_id",
|
||||
]
|
||||
|
||||
|
||||
def _iter_records(registry_dir: str):
|
||||
base = Path(registry_dir)
|
||||
if not base.exists():
|
||||
return
|
||||
for path in sorted(base.glob("*.json")):
|
||||
try:
|
||||
with open(path, "r", encoding="utf-8") as fh:
|
||||
yield json.load(fh)
|
||||
except (OSError, json.JSONDecodeError) as exc:
|
||||
logger.warning("skipping unreadable record %s: %s", path, exc)
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------
|
||||
# CLI (so the bash wrapper can shell out without inline python)
|
||||
# --------------------------------------------------------------------------
|
||||
def _build_parser() -> argparse.ArgumentParser:
|
||||
parser = argparse.ArgumentParser(description="multi-agent-mux-delegate-job registry CLI")
|
||||
parser.add_argument("--registry-dir", default=DEFAULT_REGISTRY_DIR)
|
||||
sub = parser.add_subparsers(dest="command", required=True)
|
||||
|
||||
p_reg = sub.add_parser("register", help="create a pending job; prints the job id")
|
||||
p_reg.add_argument("--prompt", required=True)
|
||||
p_reg.add_argument("--agent", default="claude-code")
|
||||
p_reg.add_argument("--agent-session", default="tmux:claude")
|
||||
p_reg.add_argument("--timeout", type=int, default=3600)
|
||||
p_reg.add_argument("--idle-timeout", type=int, default=120)
|
||||
p_reg.add_argument("--bits", type=int, default=32, help="32 (PoC) or 128 (prod)")
|
||||
p_reg.add_argument("--artifact", action="append", default=[], dest="artifacts")
|
||||
p_reg.add_argument("--auth-token", default=None, help="HMAC auth token for the job (auto-generated if secure broker is detected)")
|
||||
|
||||
p_list = sub.add_parser("list", help="list jobs (optionally by status)")
|
||||
p_list.add_argument("--status", default=None)
|
||||
p_list.add_argument("--json", action="store_true")
|
||||
|
||||
p_get = sub.add_parser("get", help="print one job record as JSON")
|
||||
p_get.add_argument("--job", required=True)
|
||||
|
||||
p_status = sub.add_parser("status", help="set a job status")
|
||||
p_status.add_argument("--job", required=True)
|
||||
p_status.add_argument("--set", required=True, dest="status")
|
||||
|
||||
p_pick = sub.add_parser("pick", help="claim a pending job for a session; prints id")
|
||||
p_pick.add_argument("--agent-session", default="tmux:claude")
|
||||
|
||||
p_logs = sub.add_parser(
|
||||
"logs",
|
||||
help="show the persistent audit log for a job, or --list every logged job",
|
||||
)
|
||||
p_logs.add_argument("job_id", nargs="?", default=None,
|
||||
help="job id whose events.ndjson to print")
|
||||
p_logs.add_argument("--list", action="store_true", dest="list_all",
|
||||
help="summarise every job under the logs dir instead")
|
||||
p_logs.add_argument("--logs-dir", default=None,
|
||||
help="override the audit-log root (default: $DELEGATE_JOB_LOGS_DIR "
|
||||
"or <cwd>/.mam/delegate_job_logs)")
|
||||
p_logs.add_argument("--tail", type=int, default=0,
|
||||
help="show only the last N events (0 = all)")
|
||||
p_logs.add_argument("--json", action="store_true",
|
||||
help="emit raw JSON lines / records instead of a table")
|
||||
|
||||
return parser
|
||||
|
||||
|
||||
def main(argv: Optional[List[str]] = None) -> int:
|
||||
mqtt_common.setup_logging(logging.INFO)
|
||||
args = _build_parser().parse_args(argv)
|
||||
rd = args.registry_dir
|
||||
|
||||
if args.command == "register":
|
||||
job_id = register_job(
|
||||
prompt=args.prompt,
|
||||
agent=args.agent,
|
||||
agent_session=args.agent_session,
|
||||
timeout_sec=args.timeout,
|
||||
idle_timeout_sec=args.idle_timeout,
|
||||
registry_dir=rd,
|
||||
expected_artifacts=args.artifacts,
|
||||
bits=args.bits,
|
||||
auth_token=args.auth_token,
|
||||
)
|
||||
print(job_id)
|
||||
return 0
|
||||
|
||||
if args.command == "list":
|
||||
records = list_jobs(rd, status=args.status)
|
||||
if args.json:
|
||||
print(json.dumps(records, ensure_ascii=False, indent=2))
|
||||
else:
|
||||
if not records:
|
||||
print("(no jobs)")
|
||||
for r in records:
|
||||
print(f"{r['job_id']} {r.get('status','?'):10s} {r.get('agent_session','')}"
|
||||
f" {r.get('prompt','')[:48]}")
|
||||
return 0
|
||||
|
||||
if args.command == "get":
|
||||
try:
|
||||
print(json.dumps(load_job(args.job, rd), ensure_ascii=False, indent=2))
|
||||
except FileNotFoundError as exc:
|
||||
print(str(exc), file=sys.stderr)
|
||||
return 1
|
||||
return 0
|
||||
|
||||
if args.command == "status":
|
||||
try:
|
||||
update_status(args.job, rd, args.status)
|
||||
except (FileNotFoundError, ValueError) as exc:
|
||||
print(str(exc), file=sys.stderr)
|
||||
return 1
|
||||
return 0
|
||||
|
||||
if args.command == "pick":
|
||||
job_id = pick_pending(args.agent_session, rd)
|
||||
if job_id is None:
|
||||
return 3 # no pending job for this session
|
||||
print(job_id)
|
||||
return 0
|
||||
|
||||
if args.command == "logs":
|
||||
return _cmd_logs(args)
|
||||
|
||||
return 1
|
||||
|
||||
|
||||
def _cmd_logs(args) -> int:
|
||||
"""Pretty-print one job's events.ndjson, or summarise all logged jobs."""
|
||||
logs_dir = args.logs_dir or mqtt_common.LOGS_DIR
|
||||
|
||||
if args.list_all:
|
||||
jobs = mqtt_common.list_logged_jobs(logs_dir)
|
||||
if args.json:
|
||||
print(json.dumps(jobs, ensure_ascii=False, indent=2))
|
||||
return 0
|
||||
if not jobs:
|
||||
print(f"(no logged jobs under {logs_dir})")
|
||||
return 0
|
||||
for m in jobs:
|
||||
print(f"{m.get('job_id','?')} {m.get('status','?'):10s} "
|
||||
f"{m.get('created_at','-'):20s} {(m.get('prompt') or '')[:48]}")
|
||||
return 0
|
||||
|
||||
if not args.job_id:
|
||||
print("logs requires a <job_id> or --list", file=sys.stderr)
|
||||
return 1
|
||||
|
||||
events = list(mqtt_common.iter_logged_events(args.job_id, logs_dir))
|
||||
if not events and not mqtt_common.job_log_dir(args.job_id, logs_dir).exists():
|
||||
print(f"no audit log for job {args.job_id} under {logs_dir}", file=sys.stderr)
|
||||
return 1
|
||||
if args.tail and args.tail > 0:
|
||||
events = events[-args.tail:]
|
||||
if args.json:
|
||||
for e in events:
|
||||
print(json.dumps(e, ensure_ascii=False))
|
||||
return 0
|
||||
for e in events:
|
||||
ts = e.get("logged_at") or e.get("timestamp") or "-"
|
||||
extra = e.get("detail") or e.get("to") or e.get("source_event") or ""
|
||||
print(f"{ts:24s} {e.get('event','?'):<16s} {extra}")
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
Reference in New Issue
Block a user