Files

421 lines
16 KiB
Python
Executable File

"""Job registry for the multi-agent-mux-delegate-job skill.
A job record is the single source of truth for one delegated unit of work:
its id, prompt, owning agent session, broker connection, timeouts, and status.
Records live as ``<registry_dir>/<job_id>.json`` with an append-only event log
``<registry_dir>/<job_id>.events.log`` and a shared ``<registry_dir>/.lock``.
Concurrency is handled via the fcntl lock in :mod:`mqtt_common` (PoC). For
multi-host delegation, migrate to SQLite WAL — see references/registry.md.
Importable as a library and runnable as a CLI (``register``/``list``/``get``/
``status``/``pick``) so the ``multi-agent-mux-delegate-job`` bash wrapper can shell out.
"""
from __future__ import annotations
import argparse
import json
import logging
import sys
import uuid
from pathlib import Path
from typing import Any, Dict, List, Optional
import mqtt_common
from mqtt_common import (
DEFAULT_REGISTRY_DIR,
SCHEMA_VERSION,
_atomic_write_record,
_utcnow,
broker_config_from_env,
load_job,
registry_lock,
topic_prefix_for,
)
logger = logging.getLogger("delegate_job.registry")
TERMINAL_STATUSES = ("completed", "error", "cancelled")
VALID_STATUSES = ("pending", "running", "completed", "error", "cancelled")
def generate_job_id(bits: int = 32) -> str:
"""PoC: 32-bit hex (8 chars). Production: 128-bit (full uuid4 hex)."""
if bits >= 128:
return uuid.uuid4().hex
nibbles = max(1, bits // 4)
return uuid.uuid4().hex[:nibbles]
def register_job(
prompt: str,
agent: str = "claude-code",
agent_session: str = "tmux:claude",
broker: Optional[Dict[str, Any]] = None,
timeout_sec: int = 3600,
idle_timeout_sec: int = 120,
registry_dir: str = DEFAULT_REGISTRY_DIR,
job_id: Optional[str] = None,
expected_artifacts: Optional[List[str]] = None,
bits: int = 32,
auth_token: Optional[str] = None,
job_type: str = "direct",
reviewer: Optional[str] = None,
reviewer_session: Optional[str] = None,
max_iterations: int = 5,
) -> str:
"""Create a new ``pending`` job record and return its id.
``broker`` defaults to the current environment's resolved broker block, so
the registry alone is enough for ``publish_event.py`` to connect later.
"""
job_id = job_id or generate_job_id(bits)
if broker is None:
broker = broker_config_from_env().to_registry_block()
if auth_token is None:
# Auto-generate token if secure broker configuration (TLS or username) is detected
if broker.get("tls") or broker.get("username"):
import secrets
auth_token = secrets.token_urlsafe(32)
now = _utcnow()
record: Dict[str, Any] = {
"schema_version": SCHEMA_VERSION,
"job_id": job_id,
"status": "pending",
"created_at": now,
"updated_at": now,
"prompt": prompt,
"agent": agent,
"agent_session": agent_session,
"broker": broker,
"topic_prefix": topic_prefix_for(job_id),
"timeout_sec": int(timeout_sec),
"idle_timeout_sec": int(idle_timeout_sec),
"expected_artifacts": expected_artifacts or [],
"last_seq": 0,
"auth_token": auth_token,
"job_type": job_type,
"reviewer": reviewer,
"reviewer_session": reviewer_session,
"max_iterations": int(max_iterations),
"iteration": 1,
}
with registry_lock(registry_dir):
if mqtt_common._job_path(job_id, registry_dir).exists():
raise FileExistsError(f"job already exists: {job_id}")
_atomic_write_record(job_id, registry_dir, record)
# Seed the persistent audit log (meta.json + status.json + a "registered"
# event). Best-effort inside init_job_log — never blocks registration.
mqtt_common.init_job_log(job_id, meta=record)
logger.info("registered job %s (agent=%s session=%s)", job_id, agent, agent_session)
return job_id
def pick_pending(agent_session: str, registry_dir: str = DEFAULT_REGISTRY_DIR) -> Optional[str]:
"""Claim the oldest ``pending`` job for ``agent_session``, flipping it to
``running`` atomically under the lock. Returns the job id, or None if no
pending job matches. This is how each tmux session takes only its own work
without two sessions grabbing the same job."""
with registry_lock(registry_dir):
candidates = []
for record in _iter_records(registry_dir):
if record.get("status") == "pending" and record.get("agent_session") == agent_session:
candidates.append(record)
if not candidates:
return None
candidates.sort(key=lambda r: r.get("created_at", ""))
chosen = candidates[0]
chosen["status"] = "running"
chosen["updated_at"] = _utcnow()
_atomic_write_record(chosen["job_id"], registry_dir, chosen)
logger.info("session %s picked job %s", agent_session, chosen["job_id"])
job_id = chosen["job_id"]
updated_at = chosen["updated_at"]
# pick_pending writes the record directly (not via update_job_status), so it
# mirrors the pending->running transition into the audit log here. Best-effort.
mqtt_common.update_logged_status(job_id, "running", updated_at=updated_at)
mqtt_common.append_event(job_id, {
"event": "status_changed",
"from": "pending",
"to": "running",
"by": agent_session,
"timestamp": updated_at,
})
return job_id
def update_status(job_id: str, registry_dir: str, status: str) -> Dict[str, Any]:
if status not in VALID_STATUSES:
raise ValueError(f"invalid status {status!r}; expected one of {VALID_STATUSES}")
return mqtt_common.update_job_status(job_id, registry_dir, status=status)
def list_jobs(registry_dir: str = DEFAULT_REGISTRY_DIR, status: Optional[str] = None) -> List[Dict[str, Any]]:
records = list(_iter_records(registry_dir))
if status:
records = [r for r in records if r.get("status") == status]
records.sort(key=lambda r: r.get("created_at", ""))
return records
def append_event(job_id: str, registry_dir: str, payload: Dict[str, Any]) -> None:
"""Append one event payload as a JSON line to the job's events log. Best
effort, debug-only; failures are logged but never raised to the caller."""
try:
Path(registry_dir).mkdir(parents=True, exist_ok=True)
log_path = Path(registry_dir) / f"{job_id}.events.log"
with open(log_path, "a", encoding="utf-8") as fh:
fh.write(json.dumps(payload, ensure_ascii=False) + "\n")
except OSError as exc: # pragma: no cover - best effort
logger.warning("could not append event for %s: %s", job_id, exc)
# convenience re-export so callers can `from registry import load_job`
__all__ = [
"register_job", "pick_pending", "update_status", "load_job",
"list_jobs", "append_event", "generate_job_id", "get_feedback",
]
def _iter_records(registry_dir: str):
base = Path(registry_dir)
if not base.exists():
return
for path in sorted(base.glob("*.json")):
try:
with open(path, "r", encoding="utf-8") as fh:
yield json.load(fh)
except (OSError, json.JSONDecodeError) as exc:
logger.warning("skipping unreadable record %s: %s", path, exc)
def get_feedback(job_id: str, registry_dir: str = DEFAULT_REGISTRY_DIR) -> str:
"""Read the job's audit log or events log and return the detail of the last completed/error event."""
# 1) Try the unified audit log first (ndjson) since it's written synchronously by the subscriber
try:
import mqtt_common
logs_dir = mqtt_common.LOGS_DIR
events = list(mqtt_common.iter_logged_events(job_id, logs_dir))
for e in reversed(events):
if e.get("source_event") in ("completed", "error"):
return e.get("detail", "")
if e.get("event") in ("completed", "error"):
return e.get("detail", "")
except Exception:
pass
# 2) Fallback to local .events.log
log_path = Path(registry_dir) / f"{job_id}.events.log"
if log_path.exists():
feedback = ""
try:
with open(log_path, "r", encoding="utf-8") as fh:
for line in fh:
if not line.strip():
continue
try:
payload = json.loads(line)
if payload.get("event") in ("completed", "error"):
feedback = payload.get("detail", "")
except json.JSONDecodeError:
continue
except OSError:
pass
if feedback:
return feedback
return ""
# --------------------------------------------------------------------------
# CLI (so the bash wrapper can shell out without inline python)
# --------------------------------------------------------------------------
def _build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="multi-agent-mux-delegate-job registry CLI")
parser.add_argument("--registry-dir", default=DEFAULT_REGISTRY_DIR)
sub = parser.add_subparsers(dest="command", required=True)
p_reg = sub.add_parser("register", help="create a pending job; prints the job id")
p_reg.add_argument("--prompt", required=True)
p_reg.add_argument("--agent", default="claude-code")
p_reg.add_argument("--agent-session", default="tmux:claude")
p_reg.add_argument("--timeout", type=int, default=3600)
p_reg.add_argument("--idle-timeout", type=int, default=120)
p_reg.add_argument("--bits", type=int, default=32, help="32 (PoC) or 128 (prod)")
p_reg.add_argument("--artifact", action="append", default=[], dest="artifacts")
p_reg.add_argument("--auth-token", default=None, help="HMAC auth token for the job (auto-generated if secure broker is detected)")
p_reg.add_argument("--job-type", default="direct", choices=["direct", "loop", "discuss"])
p_reg.add_argument("--reviewer", default=None)
p_reg.add_argument("--reviewer-session", default=None)
p_reg.add_argument("--max-iterations", type=int, default=5)
p_list = sub.add_parser("list", help="list jobs (optionally by status)")
p_list.add_argument("--status", default=None)
p_list.add_argument("--json", action="store_true")
p_get = sub.add_parser("get", help="print one job record as JSON")
p_get.add_argument("--job", required=True)
p_status = sub.add_parser("status", help="set a job status")
p_status.add_argument("--job", required=True)
p_status.add_argument("--set", required=True, dest="status")
p_update = sub.add_parser("update", help="update a job record")
p_update.add_argument("--job", required=True)
p_update.add_argument("--status", default=None)
p_update.add_argument("--agent-session", default=None)
p_update.add_argument("--prompt", default=None)
p_update.add_argument("--iteration", type=int, default=None)
p_feedback = sub.add_parser("get-feedback", help="get the last feedback detail (completed/error) for a job")
p_feedback.add_argument("--job", required=True)
p_pick = sub.add_parser("pick", help="claim a pending job for a session; prints id")
p_pick.add_argument("--agent-session", default="tmux:claude")
p_logs = sub.add_parser(
"logs",
help="show the persistent audit log for a job, or --list every logged job",
)
p_logs.add_argument("job_id", nargs="?", default=None,
help="job id whose events.ndjson to print")
p_logs.add_argument("--list", action="store_true", dest="list_all",
help="summarise every job under the logs dir instead")
p_logs.add_argument("--logs-dir", default=None,
help="override the audit-log root (default: $DELEGATE_JOB_LOGS_DIR "
"or <cwd>/.mam/delegate_job_logs)")
p_logs.add_argument("--tail", type=int, default=0,
help="show only the last N events (0 = all)")
p_logs.add_argument("--json", action="store_true",
help="emit raw JSON lines / records instead of a table")
return parser
def main(argv: Optional[List[str]] = None) -> int:
mqtt_common.setup_logging(logging.INFO)
args = _build_parser().parse_args(argv)
rd = args.registry_dir
if args.command == "register":
job_id = register_job(
prompt=args.prompt,
agent=args.agent,
agent_session=args.agent_session,
timeout_sec=args.timeout,
idle_timeout_sec=args.idle_timeout,
registry_dir=rd,
expected_artifacts=args.artifacts,
bits=args.bits,
auth_token=args.auth_token,
job_type=args.job_type,
reviewer=args.reviewer,
reviewer_session=args.reviewer_session,
max_iterations=args.max_iterations,
)
print(job_id)
return 0
if args.command == "list":
records = list_jobs(rd, status=args.status)
if args.json:
print(json.dumps(records, ensure_ascii=False, indent=2))
else:
if not records:
print("(no jobs)")
for r in records:
print(f"{r['job_id']} {r.get('status','?'):10s} {r.get('agent_session','')}"
f" {r.get('prompt','')[:48]}")
return 0
if args.command == "get":
try:
print(json.dumps(load_job(args.job, rd), ensure_ascii=False, indent=2))
except FileNotFoundError as exc:
print(str(exc), file=sys.stderr)
return 1
return 0
if args.command == "status":
try:
update_status(args.job, rd, args.status)
except (FileNotFoundError, ValueError) as exc:
print(str(exc), file=sys.stderr)
return 1
return 0
if args.command == "update":
fields = {}
if args.status is not None:
fields["status"] = args.status
if args.agent_session is not None:
fields["agent_session"] = args.agent_session
if args.prompt is not None:
fields["prompt"] = args.prompt
if args.iteration is not None:
fields["iteration"] = args.iteration
try:
mqtt_common.update_job_status(args.job, rd, **fields)
except FileNotFoundError as exc:
print(str(exc), file=sys.stderr)
return 1
return 0
if args.command == "get-feedback":
print(get_feedback(args.job, rd))
return 0
if args.command == "pick":
job_id = pick_pending(args.agent_session, rd)
if job_id is None:
return 3 # no pending job for this session
print(job_id)
return 0
if args.command == "logs":
return _cmd_logs(args)
return 1
def _cmd_logs(args) -> int:
"""Pretty-print one job's events.ndjson, or summarise all logged jobs."""
logs_dir = args.logs_dir or mqtt_common.LOGS_DIR
if args.list_all:
jobs = mqtt_common.list_logged_jobs(logs_dir)
if args.json:
print(json.dumps(jobs, ensure_ascii=False, indent=2))
return 0
if not jobs:
print(f"(no logged jobs under {logs_dir})")
return 0
for m in jobs:
print(f"{m.get('job_id','?')} {m.get('status','?'):10s} "
f"{m.get('created_at','-'):20s} {(m.get('prompt') or '')[:48]}")
return 0
if not args.job_id:
print("logs requires a <job_id> or --list", file=sys.stderr)
return 1
events = list(mqtt_common.iter_logged_events(args.job_id, logs_dir))
if not events and not mqtt_common.job_log_dir(args.job_id, logs_dir).exists():
print(f"no audit log for job {args.job_id} under {logs_dir}", file=sys.stderr)
return 1
if args.tail and args.tail > 0:
events = events[-args.tail:]
if args.json:
for e in events:
print(json.dumps(e, ensure_ascii=False))
return 0
for e in events:
ts = e.get("logged_at") or e.get("timestamp") or "-"
extra = e.get("detail") or e.get("to") or e.get("source_event") or ""
print(f"{ts:24s} {e.get('event','?'):<16s} {extra}")
return 0
if __name__ == "__main__":
sys.exit(main())