Files
multi-agent-paper/.agents/skills/multi-agent-mux-delegate-job/scripts/registry.py
T
2026-06-25 12:19:24 +09:00

335 lines
13 KiB
Python
Executable File

"""Job registry for the multi-agent-mux-delegate-job skill.
A job record is the single source of truth for one delegated unit of work:
its id, prompt, owning agent session, broker connection, timeouts, and status.
Records live as ``<registry_dir>/<job_id>.json`` with an append-only event log
``<registry_dir>/<job_id>.events.log`` and a shared ``<registry_dir>/.lock``.
Concurrency is handled via the fcntl lock in :mod:`mqtt_common` (PoC). For
multi-host delegation, migrate to SQLite WAL — see references/registry.md.
Importable as a library and runnable as a CLI (``register``/``list``/``get``/
``status``/``pick``/``logs``) so the ``multi-agent-mux-delegate-job`` bash wrapper can shell out.
"""
from __future__ import annotations
import argparse
import json
import logging
import sys
import uuid
from pathlib import Path
from typing import Any, Dict, List, Optional
import mqtt_common
from mqtt_common import (
DEFAULT_REGISTRY_DIR,
SCHEMA_VERSION,
_atomic_write_record,
_utcnow,
broker_config_from_env,
load_job,
registry_lock,
topic_prefix_for,
)
logger = logging.getLogger("delegate_job.registry")

# Statuses a job ends in. Nothing in this module enforces that they are final;
# presumably callers use this tuple for that — confirm before relying on it.
TERMINAL_STATUSES = ("completed", "error", "cancelled")
# Every status update_status() accepts: the two in-flight states followed by
# the terminal ones (pending, running, completed, error, cancelled).
VALID_STATUSES = ("pending", "running") + TERMINAL_STATUSES
def generate_job_id(bits: int = 32) -> str:
    """Return a random lowercase-hex job id carrying about ``bits`` bits.

    PoC default: 32 bits (8 hex chars). Production: 128 bits (32 hex chars).
    The request is rounded *up* to whole hex digits, so the id never has less
    entropy than asked for. It is clamped to at least 1 digit and at most 32
    digits, so ``bits >= 128`` still yields a 32-character id.

    Digits come from :func:`secrets.token_hex` instead of a truncated
    ``uuid4().hex``. uuid4 pins a version nibble (index 12, always ``4``) and
    a variant nibble (index 16), so longer truncations carried fewer random
    bits than requested.
    """
    import secrets

    # Ceil-divide bits by 4, then clamp the digit count to [1, 32].
    nibbles = min(32, max(1, -(-bits // 4)))
    # token_hex(n) yields 2n hex chars; take exactly `nibbles` of them.
    return secrets.token_hex((nibbles + 1) // 2)[:nibbles]
def register_job(
    prompt: str,
    agent: str = "claude-code",
    agent_session: str = "tmux:claude",
    broker: Optional[Dict[str, Any]] = None,
    timeout_sec: int = 3600,
    idle_timeout_sec: int = 120,
    registry_dir: str = DEFAULT_REGISTRY_DIR,
    job_id: Optional[str] = None,
    expected_artifacts: Optional[List[str]] = None,
    bits: int = 32,
    auth_token: Optional[str] = None,
) -> str:
    """Create a new ``pending`` job record and return its id.

    ``broker`` defaults to the current environment's resolved broker block, so
    the registry alone is enough for ``publish_event.py`` to connect later.
    When no ``auth_token`` is given and the broker block has ``tls`` or
    ``username`` set, a random token is generated with ``secrets``.

    The record is written under the registry lock. Afterwards the persistent
    audit log is seeded with a copy whose ``auth_token`` is masked.

    Raises:
        FileExistsError: a record for ``job_id`` already exists.
    """
    job_id = job_id or generate_job_id(bits)
    if broker is None:
        broker = broker_config_from_env().to_registry_block()
    if auth_token is None and (broker.get("tls") or broker.get("username")):
        # A secure broker configuration was detected: mint a per-job token.
        import secrets
        auth_token = secrets.token_urlsafe(32)
    now = _utcnow()
    record: Dict[str, Any] = {
        "schema_version": SCHEMA_VERSION,
        "job_id": job_id,
        "status": "pending",
        "created_at": now,
        "updated_at": now,
        "prompt": prompt,
        "agent": agent,
        "agent_session": agent_session,
        "broker": broker,
        "topic_prefix": topic_prefix_for(job_id),
        "timeout_sec": int(timeout_sec),
        "idle_timeout_sec": int(idle_timeout_sec),
        # Copy so later mutation of the caller's list can't alter the record.
        "expected_artifacts": list(expected_artifacts) if expected_artifacts else [],
        "last_seq": 0,
        "auth_token": auth_token,
    }
    with registry_lock(registry_dir):
        if mqtt_common._job_path(job_id, registry_dir).exists():
            raise FileExistsError(f"job already exists: {job_id}")
        _atomic_write_record(job_id, registry_dir, record)
    # Seed the persistent audit log (meta.json + status.json + a "registered"
    # event). Best-effort inside init_job_log — never blocks registration.
    # The token is the job's HMAC secret, and the audit metadata may be read
    # back and printed (e.g. `logs --list --json`), so mask it there. The
    # registry record above keeps the real value.
    audit_meta = dict(record)
    if audit_meta.get("auth_token") is not None:
        audit_meta["auth_token"] = "<redacted>"
    mqtt_common.init_job_log(job_id, meta=audit_meta)
    logger.info("registered job %s (agent=%s session=%s)", job_id, agent, agent_session)
    return job_id
def pick_pending(agent_session: str, registry_dir: str = DEFAULT_REGISTRY_DIR) -> Optional[str]:
    """Claim the oldest ``pending`` job owned by ``agent_session``.

    Selection and the ``pending`` -> ``running`` rewrite both happen while the
    registry lock is held, so two tmux sessions polling at once cannot claim
    the same record. "Oldest" means the smallest ``created_at``. Ties keep the
    order in which :func:`_iter_records` yielded the records.

    This function writes the record directly instead of going through
    ``mqtt_common.update_job_status``, so it mirrors the transition into the
    audit log itself (best-effort).

    Returns:
        The claimed job id, or ``None`` if no pending job matches the session.
    """
    def _is_mine(rec: Dict[str, Any]) -> bool:
        return rec.get("status") == "pending" and rec.get("agent_session") == agent_session

    with registry_lock(registry_dir):
        mine = [rec for rec in _iter_records(registry_dir) if _is_mine(rec)]
        if not mine:
            return None
        # min() returns the first of several equal keys, exactly like sorted(...)[0].
        claimed = min(mine, key=lambda rec: rec.get("created_at", ""))
        claimed.update(status="running", updated_at=_utcnow())
        claimed_id = claimed["job_id"]
        _atomic_write_record(claimed_id, registry_dir, claimed)
        logger.info("session %s picked job %s", agent_session, claimed_id)
        claimed_at = claimed["updated_at"]

    mqtt_common.update_logged_status(claimed_id, "running", updated_at=claimed_at)
    mqtt_common.append_event(claimed_id, {
        "event": "status_changed",
        "from": "pending",
        "to": "running",
        "by": agent_session,
        "timestamp": claimed_at,
    })
    return claimed_id
def update_status(job_id: str, registry_dir: str, status: str) -> Dict[str, Any]:
    """Set ``job_id``'s status after checking it against ``VALID_STATUSES``.

    Delegates the write to ``mqtt_common.update_job_status`` and returns its
    result (presumably the updated record — confirm in mqtt_common).

    Raises:
        ValueError: ``status`` is not one of ``VALID_STATUSES``.
    """
    if status in VALID_STATUSES:
        return mqtt_common.update_job_status(job_id, registry_dir, status=status)
    raise ValueError(f"invalid status {status!r}; expected one of {VALID_STATUSES}")
def list_jobs(registry_dir: str = DEFAULT_REGISTRY_DIR, status: Optional[str] = None) -> List[Dict[str, Any]]:
    """Return the readable job records in ``registry_dir``, oldest first.

    A falsy ``status`` (``None`` or ``""``) disables filtering. Otherwise only
    records whose ``status`` equals it are kept. Records are ordered by
    ``created_at``; the sort is stable, so ties keep their iteration order.
    """
    def _wanted(rec: Dict[str, Any]) -> bool:
        return not status or rec.get("status") == status

    return sorted(
        (rec for rec in _iter_records(registry_dir) if _wanted(rec)),
        key=lambda rec: rec.get("created_at", ""),
    )
def append_event(job_id: str, registry_dir: str, payload: Dict[str, Any]) -> None:
    """Append ``payload`` as one JSON line to ``<registry_dir>/<job_id>.events.log``.

    Best effort and debug-only: failures are logged as warnings and never
    raised to the caller. This covers filesystem errors, payloads
    ``json.dumps`` cannot serialise (TypeError/ValueError), and text that
    cannot be encoded as UTF-8 (UnicodeEncodeError, a ValueError).
    ``registry_dir`` is created if missing.
    """
    # Serialise before touching the filesystem so a bad payload leaves no trace.
    try:
        line = json.dumps(payload, ensure_ascii=False) + "\n"
    except (TypeError, ValueError) as exc:
        logger.warning("could not serialise event for %s: %s", job_id, exc)
        return
    base = Path(registry_dir)
    try:
        base.mkdir(parents=True, exist_ok=True)
        with open(base / f"{job_id}.events.log", "a", encoding="utf-8") as fh:
            # ensure_ascii=False can emit lone surrogates; encoding them raises
            # UnicodeEncodeError (a ValueError) here rather than an OSError.
            fh.write(line)
    except (OSError, ValueError) as exc:  # pragma: no cover - best effort
        logger.warning("could not append event for %s: %s", job_id, exc)
# Public API of this module. ``load_job`` is not defined here: it is
# re-exported from mqtt_common so callers can `from registry import load_job`.
__all__ = [
    "register_job",
    "pick_pending",
    "update_status",
    "load_job",
    "list_jobs",
    "append_event",
    "generate_job_id",
]
def _iter_records(registry_dir: str):
    """Yield each readable job record (``*.json``) in ``registry_dir``.

    Files are visited in sorted filename order. A missing directory yields
    nothing. Any file that cannot be read, is not valid UTF-8, is not valid
    JSON, or does not hold a JSON object is skipped with a warning. Callers
    such as ``list``/``pick`` therefore keep working past one corrupt record.
    """
    base = Path(registry_dir)
    if not base.exists():
        return
    for path in sorted(base.glob("*.json")):
        try:
            with open(path, "r", encoding="utf-8") as fh:
                record = json.load(fh)
        except (OSError, ValueError) as exc:
            # ValueError covers json.JSONDecodeError *and* UnicodeDecodeError;
            # the latter is not a JSONDecodeError and used to escape.
            logger.warning("skipping unreadable record %s: %s", path, exc)
            continue
        if not isinstance(record, dict):
            # Callers immediately call record.get(...); a list or scalar would crash them.
            logger.warning("skipping non-object record %s", path)
            continue
        # Yield outside the try so caller exceptions are never misreported here.
        yield record
# --------------------------------------------------------------------------
# CLI (so the bash wrapper can shell out without inline python)
# --------------------------------------------------------------------------
def _build_parser() -> argparse.ArgumentParser:
    """Build the argument parser for the registry CLI.

    Subcommands, in order: register, list, get, status, pick, logs. The bash
    wrapper shells out to this CLI, so option names, ``dest`` names, defaults
    and help strings are kept stable.
    """
    cli = argparse.ArgumentParser(description="multi-agent-mux-delegate-job registry CLI")
    cli.add_argument("--registry-dir", default=DEFAULT_REGISTRY_DIR)
    commands = cli.add_subparsers(dest="command", required=True)

    # register: create a pending job and print its id.
    register_cmd = commands.add_parser("register", help="create a pending job; prints the job id")
    register_cmd.add_argument("--prompt", required=True)
    register_cmd.add_argument("--agent", default="claude-code")
    register_cmd.add_argument("--agent-session", default="tmux:claude")
    for flag, fallback in (("--timeout", 3600), ("--idle-timeout", 120)):
        register_cmd.add_argument(flag, type=int, default=fallback)
    register_cmd.add_argument("--bits", default=32, type=int, help="32 (PoC) or 128 (prod)")
    register_cmd.add_argument("--artifact", dest="artifacts", action="append", default=[])
    register_cmd.add_argument(
        "--auth-token",
        default=None,
        help="HMAC auth token for the job (auto-generated if secure broker is detected)",
    )

    # list: show jobs, optionally filtered by status.
    list_cmd = commands.add_parser("list", help="list jobs (optionally by status)")
    list_cmd.add_argument("--status", default=None)
    list_cmd.add_argument("--json", action="store_true")

    # get: dump one record.
    get_cmd = commands.add_parser("get", help="print one job record as JSON")
    get_cmd.add_argument("--job", required=True)

    # status: overwrite a job's status.
    status_cmd = commands.add_parser("status", help="set a job status")
    status_cmd.add_argument("--job", required=True)
    status_cmd.add_argument("--set", dest="status", required=True)

    # pick: claim the oldest pending job for a session.
    pick_cmd = commands.add_parser("pick", help="claim a pending job for a session; prints id")
    pick_cmd.add_argument("--agent-session", default="tmux:claude")

    # logs: read the persistent audit log.
    logs_cmd = commands.add_parser(
        "logs", help="show the persistent audit log for a job, or --list every logged job"
    )
    logs_cmd.add_argument(
        "job_id", nargs="?", default=None, help="job id whose events.ndjson to print"
    )
    logs_cmd.add_argument(
        "--list", dest="list_all", action="store_true",
        help="summarise every job under the logs dir instead",
    )
    logs_cmd.add_argument(
        "--logs-dir", default=None,
        help="override the audit-log root (default: $DELEGATE_JOB_LOGS_DIR or <cwd>/.mam/delegate_job_logs)",
    )
    logs_cmd.add_argument(
        "--tail", type=int, default=0, help="show only the last N events (0 = all)"
    )
    logs_cmd.add_argument(
        "--json", action="store_true", help="emit raw JSON lines / records instead of a table"
    )
    return cli
def main(argv: Optional[List[str]] = None) -> int:
    """Run the registry CLI and return its process exit code.

    Exit codes:
        0 -- success.
        1 -- the job is unknown or already exists (``register``), the
             requested status is invalid, or the command is unrecognised.
        3 -- ``pick`` found no pending job for the session.
    """
    mqtt_common.setup_logging(logging.INFO)
    args = _build_parser().parse_args(argv)
    rd = args.registry_dir

    if args.command == "register":
        try:
            job_id = register_job(
                prompt=args.prompt,
                agent=args.agent,
                agent_session=args.agent_session,
                timeout_sec=args.timeout,
                idle_timeout_sec=args.idle_timeout,
                registry_dir=rd,
                expected_artifacts=args.artifacts,
                bits=args.bits,
                auth_token=args.auth_token,
            )
        except FileExistsError as exc:
            # Report an id collision the way get/status report their errors,
            # rather than surfacing a traceback to the bash wrapper.
            print(str(exc), file=sys.stderr)
            return 1
        print(job_id)
        return 0

    if args.command == "list":
        records = list_jobs(rd, status=args.status)
        if args.json:
            print(json.dumps(records, ensure_ascii=False, indent=2))
            return 0
        if not records:
            print("(no jobs)")
        for r in records:
            # Tolerate missing/null fields, and keep one job per line even when
            # its prompt spans several lines.
            preview = " ".join((r.get("prompt") or "")[:48].splitlines())
            status = r.get("status") or "?"
            print(f"{r.get('job_id', '?')} {status!s:10s} {r.get('agent_session', '')}"
                  f" {preview}")
        return 0

    if args.command == "get":
        try:
            print(json.dumps(load_job(args.job, rd), ensure_ascii=False, indent=2))
        except FileNotFoundError as exc:
            print(str(exc), file=sys.stderr)
            return 1
        return 0

    if args.command == "status":
        try:
            update_status(args.job, rd, args.status)
        except (FileNotFoundError, ValueError) as exc:
            print(str(exc), file=sys.stderr)
            return 1
        return 0

    if args.command == "pick":
        job_id = pick_pending(args.agent_session, rd)
        if job_id is None:
            return 3  # no pending job for this session
        print(job_id)
        return 0

    if args.command == "logs":
        return _cmd_logs(args)
    return 1
def _cmd_logs(args: argparse.Namespace) -> int:
    """Implement ``logs``: print one job's audit events, or summarise all jobs.

    With ``--list``, every job returned by ``mqtt_common.list_logged_jobs`` is
    summarised, as raw JSON with ``--json``. Otherwise the events of
    ``args.job_id`` are printed, limited to the last ``--tail`` entries when
    that is positive. Returns 0 on success, or 1 when no job id was given or
    the job has no audit log.
    """
    logs_dir = args.logs_dir or mqtt_common.LOGS_DIR
    if args.list_all:
        jobs = mqtt_common.list_logged_jobs(logs_dir)
        if args.json:
            print(json.dumps(jobs, ensure_ascii=False, indent=2))
            return 0
        if not jobs:
            print(f"(no logged jobs under {logs_dir})")
            return 0
        for m in jobs:
            # Width-formatted fields use `or` fallbacks plus !s so that a null
            # or non-string value in the logged JSON cannot crash the table.
            status = m.get("status") or "?"
            created = m.get("created_at") or "-"
            preview = " ".join((m.get("prompt") or "")[:48].splitlines())
            print(f"{m.get('job_id', '?')} {status!s:10s} {created!s:20s} {preview}")
        return 0
    if not args.job_id:
        print("logs requires a <job_id> or --list", file=sys.stderr)
        return 1
    events = list(mqtt_common.iter_logged_events(args.job_id, logs_dir))
    # An empty event list is still fine if the job's log directory exists.
    if not events and not mqtt_common.job_log_dir(args.job_id, logs_dir).exists():
        print(f"no audit log for job {args.job_id} under {logs_dir}", file=sys.stderr)
        return 1
    if args.tail and args.tail > 0:
        events = events[-args.tail:]
    if args.json:
        for e in events:
            print(json.dumps(e, ensure_ascii=False))
        return 0
    for e in events:
        ts = e.get("logged_at") or e.get("timestamp") or "-"
        name = e.get("event") or "?"
        extra = e.get("detail") or e.get("to") or e.get("source_event") or ""
        print(f"{ts!s:24s} {name!s:<16s} {extra}")
    return 0
if __name__ == "__main__":
    # Hand main()'s integer result to the interpreter as the exit status.
    raise SystemExit(main())