335 lines
13 KiB
Python
Executable File
335 lines
13 KiB
Python
Executable File
"""Job registry for the multi-agent-mux-delegate-job skill.
|
|
|
|
A job record is the single source of truth for one delegated unit of work:
|
|
its id, prompt, owning agent session, broker connection, timeouts, and status.
|
|
Records live as ``<registry_dir>/<job_id>.json`` with an append-only event log
|
|
``<registry_dir>/<job_id>.events.log`` and a shared ``<registry_dir>/.lock``.
|
|
|
|
Concurrency is handled via the fcntl lock in :mod:`mqtt_common` (PoC). For
|
|
multi-host delegation, migrate to SQLite WAL — see references/registry.md.
|
|
|
|
Importable as a library and runnable as a CLI (``register``/``list``/``get``/
|
|
``status``/``pick``/``logs``) so the ``multi-agent-mux-delegate-job`` bash wrapper can shell out.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import logging
|
|
import sys
|
|
import uuid
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
import mqtt_common
|
|
from mqtt_common import (
|
|
DEFAULT_REGISTRY_DIR,
|
|
SCHEMA_VERSION,
|
|
_atomic_write_record,
|
|
_utcnow,
|
|
broker_config_from_env,
|
|
load_job,
|
|
registry_lock,
|
|
topic_prefix_for,
|
|
)
|
|
|
|
logger = logging.getLogger("delegate_job.registry")

# Statuses that mark a job as finished.
TERMINAL_STATUSES = ("completed", "error", "cancelled")
# Every status ``update_status`` accepts: the two live states followed by the
# terminal ones, i.e. ("pending", "running", "completed", "error", "cancelled").
VALID_STATUSES = ("pending", "running") + TERMINAL_STATUSES
|
|
|
|
|
|
def generate_job_id(bits: int = 32) -> str:
    """Return a random lowercase-hex job id carrying roughly ``bits`` bits.

    PoC default: 32 bits (8 hex chars). Production: 128 bits (32 hex chars).

    The id is ``max(1, bits // 4)`` hex characters long, capped at 32, which
    keeps the length identical to earlier releases for every ``bits`` value.
    Every character comes from :mod:`secrets`. Unlike a ``uuid4().hex``
    prefix, no position is pinned to the UUID version/variant nibbles, so a
    32-char id really carries 128 random bits (``uuid4`` only carries 122).
    """
    import secrets  # local import, matching register_job's token generation

    nibbles = min(32, max(1, bits // 4))
    # token_hex(n) yields 2*n hex chars; round up to whole bytes, then trim.
    return secrets.token_hex((nibbles + 1) // 2)[:nibbles]
|
|
|
|
|
|
def register_job(
    prompt: str,
    agent: str = "claude-code",
    agent_session: str = "tmux:claude",
    broker: Optional[Dict[str, Any]] = None,
    timeout_sec: int = 3600,
    idle_timeout_sec: int = 120,
    registry_dir: str = DEFAULT_REGISTRY_DIR,
    job_id: Optional[str] = None,
    expected_artifacts: Optional[List[str]] = None,
    bits: int = 32,
    auth_token: Optional[str] = None,
) -> str:
    """Create a new ``pending`` job record and return its id.

    ``broker`` defaults to the current environment's resolved broker block, so
    the registry alone is enough for ``publish_event.py`` to connect later.

    Args:
        prompt: Work description stored on the record.
        agent: Agent kind stored on the record.
        agent_session: Session expected to claim the job (``pick_pending``
            matches on this field).
        broker: Broker connection block; resolved from the environment when
            None.
        timeout_sec: Overall timeout, stored as an int.
        idle_timeout_sec: Idle timeout, stored as an int.
        registry_dir: Directory holding ``<job_id>.json`` records.
        job_id: Explicit id; when falsy a fresh one comes from
            ``generate_job_id(bits)``.
        expected_artifacts: Artifact names to record. The list is copied, so
            later changes to the caller's list do not leak into the record.
        bits: Id size passed to ``generate_job_id`` when no ``job_id`` is given.
        auth_token: Per-job token. When None and the broker block has ``tls``
            or ``username`` set, a random token is generated.

    Returns:
        The job id.

    Raises:
        FileExistsError: A record for ``job_id`` already exists.
    """
    job_id = job_id or generate_job_id(bits)
    if broker is None:
        broker = broker_config_from_env().to_registry_block()
    if auth_token is None and (broker.get("tls") or broker.get("username")):
        # Auto-generate a token when a secure broker configuration (TLS or
        # username) is detected.
        import secrets
        auth_token = secrets.token_urlsafe(32)

    now = _utcnow()
    record: Dict[str, Any] = {
        "schema_version": SCHEMA_VERSION,
        "job_id": job_id,
        "status": "pending",
        "created_at": now,
        "updated_at": now,
        "prompt": prompt,
        "agent": agent,
        "agent_session": agent_session,
        "broker": broker,
        "topic_prefix": topic_prefix_for(job_id),
        "timeout_sec": int(timeout_sec),
        "idle_timeout_sec": int(idle_timeout_sec),
        "expected_artifacts": list(expected_artifacts or []),
        "last_seq": 0,
        "auth_token": auth_token,
    }

    with registry_lock(registry_dir):
        # Check-and-write under the lock so two registrations cannot race on
        # the same id.
        if mqtt_common._job_path(job_id, registry_dir).exists():
            raise FileExistsError(f"job already exists: {job_id}")
        _atomic_write_record(job_id, registry_dir, record)

    # Seed the persistent audit log (meta.json + status.json + a "registered"
    # event). Best-effort inside init_job_log — never blocks registration.
    # The auth token is a secret: it stays in the registry record only, and the
    # audit log receives a copy without it (the meta is persisted and can be
    # printed by `logs --list --json`).
    audit_meta = {key: value for key, value in record.items() if key != "auth_token"}
    mqtt_common.init_job_log(job_id, meta=audit_meta)
    logger.info("registered job %s (agent=%s session=%s)", job_id, agent, agent_session)
    return job_id
|
|
|
|
|
|
def pick_pending(agent_session: str, registry_dir: str = DEFAULT_REGISTRY_DIR) -> Optional[str]:
    """Atomically claim the oldest ``pending`` job addressed to ``agent_session``.

    While the registry lock is held, every record is scanned. Those that are
    ``pending`` and belong to ``agent_session`` are kept, and the one with the
    earliest ``created_at`` is chosen (ties go to the first record yielded).
    That record is flipped to ``running`` and written back. The whole
    read-modify-write happens under one lock, so two sessions cannot grab the
    same job.

    Returns:
        The claimed job id, or None when nothing is pending for the session.
    """
    with registry_lock(registry_dir):
        mine = [
            rec
            for rec in _iter_records(registry_dir)
            if rec.get("status") == "pending" and rec.get("agent_session") == agent_session
        ]
        if not mine:
            return None
        # min() returns the first of several equal keys, like a stable sort's head.
        oldest = min(mine, key=lambda rec: rec.get("created_at", ""))
        claimed_at = _utcnow()
        oldest.update(status="running", updated_at=claimed_at)
        picked_id = oldest["job_id"]
        _atomic_write_record(picked_id, registry_dir, oldest)
        logger.info("session %s picked job %s", agent_session, picked_id)

    # The record was written directly rather than through update_job_status,
    # so the pending->running transition is mirrored into the audit log here.
    # Best-effort.
    mqtt_common.update_logged_status(picked_id, "running", updated_at=claimed_at)
    mqtt_common.append_event(picked_id, {
        "event": "status_changed",
        "from": "pending",
        "to": "running",
        "by": agent_session,
        "timestamp": claimed_at,
    })
    return picked_id
|
|
|
|
|
|
def update_status(job_id: str, registry_dir: str, status: str) -> Dict[str, Any]:
    """Validate ``status`` and persist it via ``mqtt_common.update_job_status``.

    Returns whatever ``update_job_status`` returns (annotated as the record dict).

    Raises:
        ValueError: ``status`` is not one of ``VALID_STATUSES``.
    """
    if status in VALID_STATUSES:
        return mqtt_common.update_job_status(job_id, registry_dir, status=status)
    raise ValueError(f"invalid status {status!r}; expected one of {VALID_STATUSES}")
|
|
|
|
|
|
def list_jobs(registry_dir: str = DEFAULT_REGISTRY_DIR, status: Optional[str] = None) -> List[Dict[str, Any]]:
    """Return every readable job record, oldest ``created_at`` first.

    A truthy ``status`` keeps only records whose ``status`` equals it; None or
    an empty string disables filtering.
    """
    def wanted(rec: Dict[str, Any]) -> bool:
        return not status or rec.get("status") == status

    return sorted(
        filter(wanted, _iter_records(registry_dir)),
        key=lambda rec: rec.get("created_at", ""),
    )
|
|
|
|
|
|
def append_event(job_id: str, registry_dir: str, payload: Dict[str, Any]) -> None:
    """Append ``payload`` as one JSON line to ``<registry_dir>/<job_id>.events.log``.

    Best effort, debug-only. ``registry_dir`` is created if needed. Failures
    are logged as a warning and never raised to the caller. This covers an
    unwritable directory, and also a payload that ``json`` cannot serialize
    (``TypeError``) or that contains circular references or unencodable text
    (``ValueError``).
    """
    try:
        # Serialize first so a bad payload fails before the file is touched.
        line = json.dumps(payload, ensure_ascii=False) + "\n"
        log_dir = Path(registry_dir)
        log_dir.mkdir(parents=True, exist_ok=True)
        with open(log_dir / f"{job_id}.events.log", "a", encoding="utf-8") as fh:
            fh.write(line)
    except (OSError, TypeError, ValueError) as exc:  # pragma: no cover - best effort
        logger.warning("could not append event for %s: %s", job_id, exc)
|
|
|
|
|
|
# Public API. ``load_job`` is re-exported from mqtt_common so callers can
# write ``from registry import load_job``.
__all__ = [
    "register_job",
    "pick_pending",
    "update_status",
    "load_job",
    "list_jobs",
    "append_event",
    "generate_job_id",
]
|
|
|
|
|
|
def _iter_records(registry_dir: str):
    """Yield each job record (a dict) stored as ``*.json`` in ``registry_dir``.

    Files are visited in sorted path order, and a missing directory yields
    nothing. A file is logged and skipped when it cannot be read, is not valid
    UTF-8, is not valid JSON, or its top-level value is not a JSON object. One
    corrupt record therefore cannot break ``list_jobs`` or ``pick_pending``.
    """
    base = Path(registry_dir)
    if not base.exists():
        return
    for path in sorted(base.glob("*.json")):
        try:
            with open(path, "r", encoding="utf-8") as fh:
                record = json.load(fh)
        except (OSError, ValueError) as exc:
            # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
            logger.warning("skipping unreadable record %s: %s", path, exc)
            continue
        if not isinstance(record, dict):
            # Callers use record.get(...), so anything but an object is unusable.
            logger.warning("skipping non-object record %s", path)
            continue
        yield record
|
|
|
|
|
|
# --------------------------------------------------------------------------
|
|
# CLI (so the bash wrapper can shell out without inline python)
|
|
# --------------------------------------------------------------------------
|
|
def _build_parser() -> argparse.ArgumentParser:
    """Build the CLI parser with register/list/get/status/pick/logs subcommands.

    ``--registry-dir`` belongs to the top-level parser, so on the command line
    it must appear before the subcommand name.
    """
    cli = argparse.ArgumentParser(description="multi-agent-mux-delegate-job registry CLI")
    cli.add_argument("--registry-dir", default=DEFAULT_REGISTRY_DIR)
    commands = cli.add_subparsers(dest="command", required=True)

    # register: create a pending job and print its id.
    register_cmd = commands.add_parser("register", help="create a pending job; prints the job id")
    register_cmd.add_argument("--prompt", required=True)
    register_cmd.add_argument("--agent", default="claude-code")
    register_cmd.add_argument("--agent-session", default="tmux:claude")
    for flag, fallback in (("--timeout", 3600), ("--idle-timeout", 120)):
        register_cmd.add_argument(flag, type=int, default=fallback)
    register_cmd.add_argument("--bits", type=int, default=32, help="32 (PoC) or 128 (prod)")
    register_cmd.add_argument("--artifact", action="append", default=[], dest="artifacts")
    register_cmd.add_argument(
        "--auth-token",
        default=None,
        help="HMAC auth token for the job (auto-generated if secure broker is detected)",
    )

    # list: table (or --json) of jobs, optionally filtered by status.
    list_cmd = commands.add_parser("list", help="list jobs (optionally by status)")
    list_cmd.add_argument("--status", default=None)
    list_cmd.add_argument("--json", action="store_true")

    # get / status: read or mutate a single job record.
    get_cmd = commands.add_parser("get", help="print one job record as JSON")
    get_cmd.add_argument("--job", required=True)

    status_cmd = commands.add_parser("status", help="set a job status")
    status_cmd.add_argument("--job", required=True)
    status_cmd.add_argument("--set", required=True, dest="status")

    # pick: claim the oldest pending job for a session.
    pick_cmd = commands.add_parser("pick", help="claim a pending job for a session; prints id")
    pick_cmd.add_argument("--agent-session", default="tmux:claude")

    # logs: inspect the persistent audit log.
    logs_cmd = commands.add_parser(
        "logs",
        help="show the persistent audit log for a job, or --list every logged job",
    )
    logs_cmd.add_argument(
        "job_id", nargs="?", default=None,
        help="job id whose events.ndjson to print",
    )
    logs_cmd.add_argument(
        "--list", action="store_true", dest="list_all",
        help="summarise every job under the logs dir instead",
    )
    logs_cmd.add_argument(
        "--logs-dir", default=None,
        help="override the audit-log root (default: $DELEGATE_JOB_LOGS_DIR "
             "or <cwd>/.mam/delegate_job_logs)",
    )
    logs_cmd.add_argument(
        "--tail", type=int, default=0,
        help="show only the last N events (0 = all)",
    )
    logs_cmd.add_argument(
        "--json", action="store_true",
        help="emit raw JSON lines / records instead of a table",
    )
    return cli
|
|
|
|
|
|
def main(argv: Optional[List[str]] = None) -> int:
    """CLI entry point used by the bash wrapper.

    Exit codes:
        0: Success.
        1: User-facing error (unknown job, invalid status, job-id collision on
           ``register``).
        3: ``pick`` found no pending job for the session.
    ``logs`` returns whatever ``_cmd_logs`` returns.
    """
    mqtt_common.setup_logging(logging.INFO)
    args = _build_parser().parse_args(argv)
    rd = args.registry_dir

    if args.command == "register":
        try:
            job_id = register_job(
                prompt=args.prompt,
                agent=args.agent,
                agent_session=args.agent_session,
                timeout_sec=args.timeout,
                idle_timeout_sec=args.idle_timeout,
                registry_dir=rd,
                expected_artifacts=args.artifacts,
                bits=args.bits,
                auth_token=args.auth_token,
            )
        except FileExistsError as exc:
            # A freshly generated id collided with an existing record (ids are
            # only 32 bits by default). Report it like other CLI errors rather
            # than dumping a traceback.
            print(str(exc), file=sys.stderr)
            return 1
        print(job_id)
        return 0

    if args.command == "list":
        records = list_jobs(rd, status=args.status)
        if args.json:
            print(json.dumps(records, ensure_ascii=False, indent=2))
        else:
            if not records:
                print("(no jobs)")
            for r in records:
                # Tolerate missing/None fields: ':10s' raises on None.
                status = r.get("status")
                status = "?" if status is None else str(status)
                # Flatten multi-line prompts so each job stays on one line.
                prompt = " ".join(str(r.get("prompt") or "").splitlines())
                print(f"{r.get('job_id', '?')} {status:10s} {r.get('agent_session') or ''}"
                      f" {prompt[:48]}")
        return 0

    if args.command == "get":
        try:
            print(json.dumps(load_job(args.job, rd), ensure_ascii=False, indent=2))
        except FileNotFoundError as exc:
            print(str(exc), file=sys.stderr)
            return 1
        return 0

    if args.command == "status":
        try:
            update_status(args.job, rd, args.status)
        except (FileNotFoundError, ValueError) as exc:
            print(str(exc), file=sys.stderr)
            return 1
        return 0

    if args.command == "pick":
        job_id = pick_pending(args.agent_session, rd)
        if job_id is None:
            return 3  # no pending job for this session
        print(job_id)
        return 0

    if args.command == "logs":
        return _cmd_logs(args)

    # Unreachable while the subparsers are required; kept as a safe default.
    return 1
|
|
|
|
|
|
def _as_column_text(value: Any, default: str) -> str:
    """Render ``value`` as text for a fixed-width column; ``default`` when it is None."""
    return default if value is None else str(value)


def _cmd_logs(args) -> int:
    """Handle ``logs``: print one job's audit events, or summarise all logged jobs.

    Reads from ``args.logs_dir``, or ``mqtt_common.LOGS_DIR`` when that is unset.

    With ``--list``, prints one line per job from ``list_logged_jobs``.
    Otherwise ``args.job_id`` is required and that job's events are printed,
    limited to the last ``--tail`` entries when that is positive. ``--json``
    switches both modes to raw JSON output.

    Returns 0 on success, 1 when no job id was given or the job has no audit log.
    """
    logs_dir = args.logs_dir or mqtt_common.LOGS_DIR

    if args.list_all:
        jobs = mqtt_common.list_logged_jobs(logs_dir)
        if args.json:
            print(json.dumps(jobs, ensure_ascii=False, indent=2))
            return 0
        if not jobs:
            print(f"(no logged jobs under {logs_dir})")
            return 0
        for m in jobs:
            # Fixed-width specs (':10s', ':20s') raise on None/non-str values,
            # so coerce every column to text first.
            status = _as_column_text(m.get("status"), "?")
            created = _as_column_text(m.get("created_at"), "-")
            prompt = " ".join(str(m.get("prompt") or "").splitlines())
            print(f"{_as_column_text(m.get('job_id'), '?')} {status:10s} "
                  f"{created:20s} {prompt[:48]}")
        return 0

    if not args.job_id:
        print("logs requires a <job_id> or --list", file=sys.stderr)
        return 1

    events = list(mqtt_common.iter_logged_events(args.job_id, logs_dir))
    if not events and not mqtt_common.job_log_dir(args.job_id, logs_dir).exists():
        print(f"no audit log for job {args.job_id} under {logs_dir}", file=sys.stderr)
        return 1
    if args.tail and args.tail > 0:
        events = events[-args.tail:]
    if args.json:
        for e in events:
            print(json.dumps(e, ensure_ascii=False))
        return 0
    for e in events:
        # Timestamps may be non-strings (e.g. numbers); ':24s' would raise.
        ts = _as_column_text(e.get("logged_at") or e.get("timestamp"), "-")
        event_name = _as_column_text(e.get("event"), "?")
        extra = e.get("detail") or e.get("to") or e.get("source_event") or ""
        print(f"{ts:24s} {event_name:<16s} {extra}")
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # main()'s integer return value becomes the process exit status.
    raise SystemExit(main())
|