refactor(skills): rename multi-agent-* + agent-sessions-monitor + delegate-job to tmux-agent-orchestrate-*
Renamed 6 skills directories to tmux-agent-orchestrate-* prefix: - multi-agent-create → tmux-agent-orchestrate-create - multi-agent-resume → tmux-agent-orchestrate-resume - multi-agent-delete → tmux-agent-orchestrate-delete - multi-agent-status → tmux-agent-orchestrate-status - agent-sessions-monitor → tmux-agent-orchestrate-monitor - delegate-job → tmux-agent-orchestrate-delegate-job Updated: - skills/lib.sh internal paths (delegate_submit_job etc.) - skills/tmux-agent-orchestrate-status/scripts/status.sh (monitor path) - skills/tmux-agent-orchestrate-monitor/scripts/reconcile.sh - .gitignore (HTML ignore patterns) - 6 SKILL.md frontmatter (name, related_skills, prereq_skills) and body - All script headers and Korean comments Notes: - tmux session naming convention unchanged (<slug>-creator-<agent>) — workspace identifier based, kept for backward compatibility - Existing 2 sessions in -L multi-agent-canary untouched - YAML delegate_job_id / agent-session (tmux:canary-...) preserved for log history compatibility Verified on isolated server -L agy-rename-test (kill-server after).
This commit is contained in:
@@ -0,0 +1,233 @@
|
||||
#!/usr/bin/env python3
|
||||
"""job_subscriber.py — the single entry point for observing Job events.
|
||||
|
||||
Subscribes to one job's ``<topic_prefix>/events`` (or, with ``--wait-any``, the
|
||||
events of every running/pending job in the registry), prints one line to stdout
|
||||
per accepted event, and exits on a terminal event or a timeout.
|
||||
|
||||
Design points (all flagged in the PLAN review):
|
||||
- terminal state machine: ``completed``/``error`` is acted on exactly once per
|
||||
job, so QoS-1 duplicates or an ``error``-after-``completed`` reorder are safe.
|
||||
- dual timeouts: a wall-clock ``--timeout`` (total budget, started at
|
||||
subscribe time so a cold start can't hang forever) AND an idle
|
||||
``--idle-timeout`` (no new event for N seconds).
|
||||
- defensive parsing: undecodable payloads, ``schema_version`` mismatches, and
|
||||
``job_id`` values we did not subscribe for are logged and dropped.
|
||||
|
||||
stdout = event lines only. Diagnostics go to stderr via logging.
|
||||
|
||||
Exit codes:
|
||||
0 all watched jobs reached ``completed``
|
||||
1 any watched job reached ``error``
|
||||
2 timed out (wall-clock or idle) before all jobs finished
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import logging
|
||||
import queue
|
||||
import sys
|
||||
import time
|
||||
from typing import Any, Dict, List, Optional, Set, Tuple
|
||||
|
||||
import mqtt_common
|
||||
import registry
|
||||
from mqtt_common import (
|
||||
DEFAULT_REGISTRY_DIR,
|
||||
SCHEMA_VERSION,
|
||||
broker_config_from_job,
|
||||
load_job,
|
||||
make_client,
|
||||
)
|
||||
|
||||
logger = logging.getLogger("delegate_job.job_subscriber")
|
||||
|
||||
TERMINAL_EVENTS = ("completed", "error")
|
||||
|
||||
|
||||
def _format_line(topic: str, payload: Dict[str, Any]) -> str:
|
||||
return (
|
||||
f"{payload.get('timestamp','-')} "
|
||||
f"job={payload.get('job_id','?')} "
|
||||
f"seq={payload.get('seq','?')} "
|
||||
f"{payload.get('event','?'):<20} "
|
||||
f"{payload.get('detail','')}"
|
||||
)
|
||||
|
||||
|
||||
class _Watcher:
|
||||
"""Holds the shared queue + the set of job_ids we accept events for."""
|
||||
|
||||
def __init__(self, expected_job_ids: Set[str], expected_tokens: Dict[str, Optional[str]]):
|
||||
self.events: "queue.Queue[Tuple[str, Dict[str, Any]]]" = queue.Queue()
|
||||
self.expected = set(expected_job_ids)
|
||||
self.tokens = expected_tokens # job_id -> expected auth_token (or None)
|
||||
|
||||
def on_message(self, _client, _userdata, msg) -> None:
|
||||
# --- defensive parsing -------------------------------------------
|
||||
try:
|
||||
payload = json.loads(msg.payload.decode("utf-8"))
|
||||
except (UnicodeDecodeError, json.JSONDecodeError) as exc:
|
||||
logger.warning("drop unparseable payload on %s: %s", msg.topic, exc)
|
||||
return
|
||||
if not isinstance(payload, dict):
|
||||
logger.warning("drop non-object payload on %s", msg.topic)
|
||||
return
|
||||
if payload.get("schema_version") != SCHEMA_VERSION:
|
||||
logger.warning("drop event with schema_version=%r (expected %d)",
|
||||
payload.get("schema_version"), SCHEMA_VERSION)
|
||||
return
|
||||
jid = payload.get("job_id")
|
||||
if jid not in self.expected:
|
||||
logger.warning("drop event for unexpected job_id=%r on %s", jid, msg.topic)
|
||||
return
|
||||
# --- production auth check: data.auth_token must match if expected ---
|
||||
expected_token = self.tokens.get(jid)
|
||||
if expected_token is not None:
|
||||
got = (payload.get("data") or {}).get("auth_token")
|
||||
if got != expected_token:
|
||||
logger.warning("drop event for job %s: auth_token mismatch", jid)
|
||||
return
|
||||
# Persistent audit log from the *subscriber's* vantage point: every event
|
||||
# that survives defensive parsing is recorded here, including ones a
|
||||
# different host published. This is the external-observer record that
|
||||
# backstops the publisher's own "published" line if it never wrote one.
|
||||
mqtt_common.append_event(jid, {
|
||||
"event": "received",
|
||||
"source_event": payload.get("event"),
|
||||
"seq": payload.get("seq"),
|
||||
"topic": msg.topic,
|
||||
"timestamp": payload.get("timestamp"),
|
||||
"detail": payload.get("detail", ""),
|
||||
})
|
||||
self.events.put((msg.topic, payload))
|
||||
|
||||
|
||||
def _collect_jobs(args) -> List[Dict[str, Any]]:
|
||||
"""Resolve the list of job records this invocation should watch."""
|
||||
if args.wait_any:
|
||||
jobs = [r for r in registry.list_jobs(args.registry_dir)
|
||||
if r.get("status") in ("pending", "running")]
|
||||
if not jobs:
|
||||
logger.error("no pending/running jobs to wait for")
|
||||
return jobs
|
||||
job = load_job(args.job, args.registry_dir) # raises FileNotFoundError
|
||||
return [job]
|
||||
|
||||
|
||||
def main(argv=None) -> int:
|
||||
parser = argparse.ArgumentParser(description="Subscribe to Job events on MQTT")
|
||||
target = parser.add_mutually_exclusive_group(required=True)
|
||||
target.add_argument("--job", help="job id to watch")
|
||||
target.add_argument("--wait-any", action="store_true",
|
||||
help="watch every pending/running job in the registry")
|
||||
parser.add_argument("--timeout", type=float, default=None,
|
||||
help="wall-clock budget in seconds (default: job.timeout_sec or 600)")
|
||||
parser.add_argument("--idle-timeout", type=float, default=None,
|
||||
help="max seconds with no new event (default: job.idle_timeout_sec or 120)")
|
||||
parser.add_argument("--expect-retention", action="store_true",
|
||||
help="warn if no retained terminal event arrives promptly")
|
||||
parser.add_argument("--registry-dir", default=DEFAULT_REGISTRY_DIR)
|
||||
parser.add_argument("-v", "--verbose", action="store_true")
|
||||
args = parser.parse_args(argv)
|
||||
|
||||
mqtt_common.setup_logging(logging.DEBUG if args.verbose else logging.WARNING)
|
||||
|
||||
try:
|
||||
jobs = _collect_jobs(args)
|
||||
except FileNotFoundError as exc:
|
||||
logger.error("%s", exc)
|
||||
return 2
|
||||
if not jobs:
|
||||
return 2
|
||||
|
||||
expected_ids: Set[str] = {j["job_id"] for j in jobs}
|
||||
tokens = {j["job_id"]: j.get("auth_token") for j in jobs}
|
||||
watcher = _Watcher(expected_ids, tokens)
|
||||
|
||||
# Resolve timeouts from CLI, falling back to the (first) job's settings.
|
||||
base_job = jobs[0]
|
||||
wall_timeout = args.timeout if args.timeout is not None else float(base_job.get("timeout_sec", 600))
|
||||
idle_timeout = args.idle_timeout if args.idle_timeout is not None else float(base_job.get("idle_timeout_sec", 120))
|
||||
|
||||
# All watched jobs share a broker in practice; connect using the first
|
||||
# job's broker and subscribe to each job's events topic.
|
||||
config = broker_config_from_job(base_job)
|
||||
client = make_client("subscriber", config)
|
||||
client.on_message = watcher.on_message
|
||||
|
||||
subscribed_topics = []
|
||||
for job in jobs:
|
||||
prefix = job.get("topic_prefix") or mqtt_common.topic_prefix_for(job["job_id"])
|
||||
subscribed_topics.append(f"{prefix}/events")
|
||||
|
||||
def on_connect(_c, _u, _flags, reason_code, _props):
|
||||
if mqtt_common.reason_code_value(reason_code) != 0:
|
||||
logger.error("broker connection failed: rc=%s", reason_code)
|
||||
return
|
||||
for topic in subscribed_topics:
|
||||
_c.subscribe(topic, qos=1)
|
||||
logger.info("subscribed to %s", topic)
|
||||
|
||||
client.on_connect = on_connect
|
||||
client.connect(config.host, config.port, config.keepalive)
|
||||
client.loop_start()
|
||||
|
||||
terminal: Dict[str, str] = {} # job_id -> "completed"/"error"
|
||||
pending: Set[str] = set(expected_ids)
|
||||
start = time.monotonic()
|
||||
wall_deadline = start + wall_timeout
|
||||
last_event = start
|
||||
retention_checked = not args.expect_retention
|
||||
|
||||
try:
|
||||
while pending:
|
||||
now = time.monotonic()
|
||||
if now >= wall_deadline:
|
||||
logger.error("wall-clock timeout (%.0fs); still pending: %s",
|
||||
wall_timeout, ", ".join(sorted(pending)))
|
||||
return 2
|
||||
idle_left = idle_timeout - (now - last_event)
|
||||
if idle_left <= 0:
|
||||
logger.error("idle timeout (%.0fs, no events); still pending: %s",
|
||||
idle_timeout, ", ".join(sorted(pending)))
|
||||
return 2
|
||||
wait = min(wall_deadline - now, idle_left, 1.0)
|
||||
try:
|
||||
topic, payload = watcher.events.get(timeout=wait)
|
||||
except queue.Empty:
|
||||
if not retention_checked and (now - start) > 3.0:
|
||||
logger.warning("--expect-retention set but no retained "
|
||||
"terminal event observed yet")
|
||||
retention_checked = True
|
||||
continue
|
||||
|
||||
last_event = time.monotonic()
|
||||
retention_checked = True
|
||||
print(_format_line(topic, payload), flush=True)
|
||||
|
||||
jid = payload["job_id"]
|
||||
event = payload.get("event")
|
||||
if event in TERMINAL_EVENTS:
|
||||
if jid in terminal:
|
||||
# Already finalised: ignore duplicates / late reorders.
|
||||
logger.info("ignoring duplicate terminal %s for %s", event, jid)
|
||||
continue
|
||||
terminal[jid] = event
|
||||
pending.discard(jid)
|
||||
finally:
|
||||
client.loop_stop()
|
||||
try:
|
||||
client.disconnect()
|
||||
except Exception: # pragma: no cover
|
||||
pass
|
||||
|
||||
# All jobs reached a terminal state. error wins over completed.
|
||||
if any(state == "error" for state in terminal.values()):
|
||||
return 1
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
@@ -0,0 +1,550 @@
|
||||
"""Shared MQTT + registry helpers for the tmux-agent-orchestrate-delegate-job skill.
|
||||
|
||||
Single entry point for:
|
||||
- broker configuration (env -> dataclass),
|
||||
- paho client construction (auth + TLS + unique client id),
|
||||
- monotonic per-job sequence counters,
|
||||
- retry-with-exponential-backoff,
|
||||
- atomic registry record load/update under an fcntl lock.
|
||||
|
||||
Requires paho-mqtt >= 2.0 (uses CallbackAPIVersion.VERSION2).
|
||||
|
||||
This module is the *only* place that talks to the broker config and to the
|
||||
raw job record file, so PoC -> production migration touches just env/registry
|
||||
values, never code (see references/mqtt-broker-setup.md).
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import functools
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import tempfile
|
||||
import time
|
||||
import uuid
|
||||
from contextlib import contextmanager
|
||||
from dataclasses import asdict, dataclass
|
||||
from pathlib import Path
|
||||
from typing import Any, Callable, Dict, Iterable, List, Optional
|
||||
|
||||
import paho.mqtt.client as mqtt
|
||||
|
||||
logger = logging.getLogger("delegate_job.mqtt_common")
|
||||
|
||||
# --------------------------------------------------------------------------
|
||||
# Constants
|
||||
# --------------------------------------------------------------------------
|
||||
SCHEMA_VERSION = 1
|
||||
DEFAULT_REGISTRY_DIR = ".hermes/jobs"
|
||||
DEFAULT_TOPIC_ROOT = "python/mqtt/jobs"
|
||||
LOCK_FILENAME = ".lock"
|
||||
|
||||
# Persistent audit-log layout: .hermes/delegate_job_logs/<job_id>/{meta,events,status}.
|
||||
# This is a *separate* artifact from the registry: the registry is the live job
|
||||
# record (mutated in place), the audit log is an append-only history that
|
||||
# survives even if the registry dir is cleaned up.
|
||||
META_FILENAME = "meta.json"
|
||||
EVENTS_FILENAME = "events.ndjson"
|
||||
STATUS_FILENAME = "status.json"
|
||||
|
||||
|
||||
def _default_logs_dir() -> str:
|
||||
"""Audit-log root. Overridable with ``DELEGATE_JOB_LOGS_DIR``; otherwise
|
||||
``<cwd>/.hermes/delegate_job_logs`` — we keep audit logs next to the
|
||||
live registry (``.hermes/jobs/``) so the two runtime artifacts sit
|
||||
under the same parent dir and follow the same ``.gitignore`` rule.
|
||||
The cwd of whichever process emits events (the bash wrapper and
|
||||
scripts) is used as the anchor."""
|
||||
env = os.environ.get("DELEGATE_JOB_LOGS_DIR")
|
||||
if env and env.strip():
|
||||
return env
|
||||
return os.path.join(os.getcwd(), ".hermes", "delegate_job_logs")
|
||||
|
||||
|
||||
LOGS_DIR = _default_logs_dir()
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------
|
||||
# Broker configuration
|
||||
# --------------------------------------------------------------------------
|
||||
@dataclass
|
||||
class BrokerConfig:
|
||||
"""Resolved broker connection settings.
|
||||
|
||||
PoC defaults target the public HiveMQ broker. Production overrides arrive
|
||||
either from environment variables or from a job record's ``broker.*`` block
|
||||
(see ``broker_config_from_job``).
|
||||
"""
|
||||
|
||||
host: str = "broker.hivemq.com"
|
||||
port: int = 1883
|
||||
tls: bool = False
|
||||
username: Optional[str] = None
|
||||
password: Optional[str] = None
|
||||
client_id_prefix: str = "hermes"
|
||||
# TLS material (only consulted when tls is True).
|
||||
ca_certs: Optional[str] = None
|
||||
certfile: Optional[str] = None
|
||||
keyfile: Optional[str] = None
|
||||
keepalive: int = 60
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return asdict(self)
|
||||
|
||||
def to_registry_block(self) -> Dict[str, Any]:
|
||||
"""The subset that gets persisted into a job record's broker block."""
|
||||
return {
|
||||
"host": self.host,
|
||||
"port": self.port,
|
||||
"tls": self.tls,
|
||||
"username": self.username,
|
||||
"password": self.password,
|
||||
}
|
||||
|
||||
|
||||
def _env_bool(name: str, default: bool = False) -> bool:
|
||||
raw = os.environ.get(name)
|
||||
if raw is None:
|
||||
return default
|
||||
return raw.strip().lower() in ("1", "true", "yes", "on")
|
||||
|
||||
|
||||
def _env_int(name: str, default: int) -> int:
|
||||
raw = os.environ.get(name)
|
||||
if raw is None or raw.strip() == "":
|
||||
return default
|
||||
try:
|
||||
return int(raw)
|
||||
except ValueError:
|
||||
logger.warning("invalid int for %s=%r; using default %d", name, raw, default)
|
||||
return default
|
||||
|
||||
|
||||
def broker_config_from_env(overrides: Optional[Dict[str, Any]] = None) -> BrokerConfig:
|
||||
"""Build a :class:`BrokerConfig` from environment variables.
|
||||
|
||||
Recognised vars (all optional, PoC defaults shown):
|
||||
MQTT_BROKER (broker.hivemq.com), MQTT_PORT (1883), MQTT_TLS (0),
|
||||
MQTT_USERNAME, MQTT_PASSWORD, MQTT_CLIENT_ID_PREFIX (hermes),
|
||||
MQTT_CA_CERTS, MQTT_CERTFILE, MQTT_KEYFILE, MQTT_KEEPALIVE (60).
|
||||
|
||||
``overrides`` (e.g. a job record's broker block) wins over the env values
|
||||
for any key it specifies with a non-None value.
|
||||
"""
|
||||
cfg = BrokerConfig(
|
||||
host=os.environ.get("MQTT_BROKER", "broker.hivemq.com"),
|
||||
port=_env_int("MQTT_PORT", 1883),
|
||||
tls=_env_bool("MQTT_TLS", False),
|
||||
username=os.environ.get("MQTT_USERNAME") or None,
|
||||
password=os.environ.get("MQTT_PASSWORD") or None,
|
||||
client_id_prefix=os.environ.get("MQTT_CLIENT_ID_PREFIX", "hermes"),
|
||||
ca_certs=os.environ.get("MQTT_CA_CERTS") or None,
|
||||
certfile=os.environ.get("MQTT_CERTFILE") or None,
|
||||
keyfile=os.environ.get("MQTT_KEYFILE") or None,
|
||||
keepalive=_env_int("MQTT_KEEPALIVE", 60),
|
||||
)
|
||||
if overrides:
|
||||
for key, value in overrides.items():
|
||||
if value is not None and hasattr(cfg, key):
|
||||
setattr(cfg, key, value)
|
||||
return cfg
|
||||
|
||||
|
||||
def broker_config_from_job(job: Dict[str, Any]) -> BrokerConfig:
|
||||
"""Resolve broker config for a job: env defaults, then the job's broker.*
|
||||
block overrides. This lets ``publish_event.py`` connect from the registry
|
||||
alone, while still honouring environment toggles (e.g. MQTT_TLS=1)."""
|
||||
return broker_config_from_env(overrides=job.get("broker") or {})
|
||||
|
||||
|
||||
def make_client(role: str, config: Optional[BrokerConfig] = None) -> mqtt.Client:
|
||||
"""Return a configured paho ``Client`` (not yet connected).
|
||||
|
||||
The client id is ``f"{prefix}-{role}-{uuid8}"`` so concurrent publishers /
|
||||
subscribers never collide on the broker. Auth and TLS are applied when the
|
||||
config supplies them.
|
||||
"""
|
||||
config = config or broker_config_from_env()
|
||||
client_id = f"{config.client_id_prefix}-{role}-{uuid.uuid4().hex[:8]}"
|
||||
client = mqtt.Client(
|
||||
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
|
||||
client_id=client_id,
|
||||
)
|
||||
if config.username:
|
||||
client.username_pw_set(config.username, config.password)
|
||||
if config.tls:
|
||||
# If ca_certs is None paho uses the system trust store (good enough for
|
||||
# public CAs); a private CA bundle path is passed through unchanged.
|
||||
client.tls_set(
|
||||
ca_certs=config.ca_certs,
|
||||
certfile=config.certfile,
|
||||
keyfile=config.keyfile,
|
||||
)
|
||||
logger.debug("built client id=%s tls=%s host=%s", client_id, config.tls, config.host)
|
||||
return client
|
||||
|
||||
|
||||
def reason_code_value(rc: Any) -> int:
|
||||
"""Normalise a paho v2 connect reason code to an int.
|
||||
|
||||
paho-mqtt 2.x hands callbacks a ``ReasonCode`` object (not an int); older
|
||||
paths may pass a plain int. ``ReasonCode`` exposes ``.value``; 0 == success.
|
||||
"""
|
||||
return int(getattr(rc, "value", rc))
|
||||
|
||||
|
||||
def topic_prefix_for(job_id: str, root: str = DEFAULT_TOPIC_ROOT) -> str:
|
||||
return f"{root}/{job_id}"
|
||||
|
||||
|
||||
def events_topic_for(job_id: str, root: str = DEFAULT_TOPIC_ROOT) -> str:
|
||||
return f"{topic_prefix_for(job_id, root)}/events"
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------
|
||||
# Registry primitives (single source of truth for raw record I/O)
|
||||
# --------------------------------------------------------------------------
|
||||
def _job_path(job_id: str, registry_dir: str) -> Path:
|
||||
return Path(registry_dir) / f"{job_id}.json"
|
||||
|
||||
|
||||
def _lock_path(registry_dir: str) -> Path:
|
||||
return Path(registry_dir) / LOCK_FILENAME
|
||||
|
||||
|
||||
@contextmanager
|
||||
def registry_lock(registry_dir: str):
|
||||
"""Advisory exclusive lock over the whole registry dir via fcntl.
|
||||
|
||||
PoC-grade single-host concurrency control. Multiple tmux sessions / scripts
|
||||
serialise their read-modify-write of job records through this lock so two
|
||||
sessions never claim the same pending job. For multi-host delegation move
|
||||
to SQLite WAL (see references/registry.md)."""
|
||||
import fcntl # POSIX only; imported lazily so import works on Windows.
|
||||
|
||||
Path(registry_dir).mkdir(parents=True, exist_ok=True)
|
||||
lock_file = _lock_path(registry_dir)
|
||||
fh = open(lock_file, "a+")
|
||||
try:
|
||||
fcntl.flock(fh.fileno(), fcntl.LOCK_EX)
|
||||
yield
|
||||
finally:
|
||||
try:
|
||||
fcntl.flock(fh.fileno(), fcntl.LOCK_UN)
|
||||
finally:
|
||||
fh.close()
|
||||
|
||||
|
||||
def load_job(job_id: str, registry_dir: str = DEFAULT_REGISTRY_DIR) -> Dict[str, Any]:
|
||||
"""Load and parse a job record. Raises FileNotFoundError if absent."""
|
||||
path = _job_path(job_id, registry_dir)
|
||||
if not path.exists():
|
||||
raise FileNotFoundError(f"job record not found: {path}")
|
||||
with open(path, "r", encoding="utf-8") as fh:
|
||||
return json.load(fh)
|
||||
|
||||
|
||||
def _atomic_write_record(job_id: str, registry_dir: str, record: Dict[str, Any]) -> None:
|
||||
"""Write a record atomically: temp file in the same dir + os.replace.
|
||||
|
||||
The rename is atomic on POSIX, so readers never observe a half-written
|
||||
file. Callers MUST already hold ``registry_lock`` for read-modify-write
|
||||
correctness."""
|
||||
Path(registry_dir).mkdir(parents=True, exist_ok=True)
|
||||
path = _job_path(job_id, registry_dir)
|
||||
fd, tmp = tempfile.mkstemp(dir=str(path.parent), prefix=f".{job_id}.", suffix=".tmp")
|
||||
try:
|
||||
with os.fdopen(fd, "w", encoding="utf-8") as fh:
|
||||
json.dump(record, fh, ensure_ascii=False, indent=2)
|
||||
fh.write("\n")
|
||||
fh.flush()
|
||||
os.fsync(fh.fileno())
|
||||
os.replace(tmp, path)
|
||||
try:
|
||||
os.chmod(path, 0o600)
|
||||
except Exception:
|
||||
pass
|
||||
except BaseException:
|
||||
if os.path.exists(tmp):
|
||||
os.unlink(tmp)
|
||||
raise
|
||||
|
||||
|
||||
def update_job_status(job_id: str, registry_dir: str = DEFAULT_REGISTRY_DIR, **fields: Any) -> Dict[str, Any]:
|
||||
"""Atomically merge ``fields`` into a job record under the registry lock.
|
||||
|
||||
Always refreshes ``updated_at``. Returns the new record. Raises
|
||||
FileNotFoundError if the job does not exist.
|
||||
|
||||
This is the single chokepoint for status writes (both ``registry.update_status``
|
||||
and ``publish_event.py``'s status sync route through here), so it also mirrors
|
||||
any ``status`` change into the persistent audit log — best-effort, after the
|
||||
registry lock is released so a slow/failed log write never blocks the record."""
|
||||
with registry_lock(registry_dir):
|
||||
record = load_job(job_id, registry_dir)
|
||||
old_status = record.get("status")
|
||||
record.update(fields)
|
||||
record["updated_at"] = _utcnow()
|
||||
_atomic_write_record(job_id, registry_dir, record)
|
||||
if "status" in fields:
|
||||
new_status = record.get("status")
|
||||
update_logged_status(job_id, new_status, updated_at=record["updated_at"])
|
||||
if old_status != new_status:
|
||||
append_event(job_id, {
|
||||
"event": "status_changed",
|
||||
"from": old_status,
|
||||
"to": new_status,
|
||||
"timestamp": record["updated_at"],
|
||||
})
|
||||
return record
|
||||
|
||||
|
||||
def next_seq(job_id: str, registry_dir: str = DEFAULT_REGISTRY_DIR) -> int:
|
||||
"""Return the next monotonic sequence number for a job, persisted in the
|
||||
record's ``last_seq`` field so it stays consistent across process restarts.
|
||||
First call returns 1."""
|
||||
with registry_lock(registry_dir):
|
||||
record = load_job(job_id, registry_dir)
|
||||
seq = int(record.get("last_seq", 0)) + 1
|
||||
record["last_seq"] = seq
|
||||
record["updated_at"] = _utcnow()
|
||||
_atomic_write_record(job_id, registry_dir, record)
|
||||
return seq
|
||||
|
||||
|
||||
def _utcnow() -> str:
|
||||
"""ISO-8601 UTC timestamp with trailing Z (payload `timestamp` field)."""
|
||||
return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
|
||||
|
||||
|
||||
def _utcnow_precise() -> str:
|
||||
"""ISO-8601 UTC timestamp with millisecond resolution. Used for the audit
|
||||
log's ``logged_at`` so events sort cleanly even within the same second."""
|
||||
now = time.time()
|
||||
base = time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(now))
|
||||
return f"{base}.{int((now % 1) * 1000):03d}Z"
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------
|
||||
# Persistent audit log (.hermes/delegate_job_logs/<job_id>/...)
|
||||
#
|
||||
# Every function here is idempotent, concurrency-safe, and *best-effort*: a
|
||||
# logging failure is swallowed with a logger.warning and never propagated, so it
|
||||
# can never break a publish, a subscribe, or a registry write. stdout is never
|
||||
# touched (it is reserved for data output).
|
||||
# --------------------------------------------------------------------------
|
||||
def job_log_dir(job_id: str, logs_dir: Optional[str] = None) -> Path:
|
||||
return Path(logs_dir or LOGS_DIR) / job_id
|
||||
|
||||
|
||||
def job_log_path(job_id: str, kind: str, logs_dir: Optional[str] = None) -> Path:
|
||||
"""Path to one audit-log file for a job. ``kind`` is a filename, e.g. the
|
||||
module constants META_FILENAME / EVENTS_FILENAME / STATUS_FILENAME."""
|
||||
return job_log_dir(job_id, logs_dir) / kind
|
||||
|
||||
|
||||
@contextmanager
|
||||
def _file_lock(fh):
|
||||
"""Best-effort exclusive lock over a single open file via fcntl, so two
|
||||
processes appending to events.ndjson never interleave a line. A no-op where
|
||||
fcntl is unavailable (Windows); a short append is atomic enough there."""
|
||||
try:
|
||||
import fcntl
|
||||
except ImportError: # pragma: no cover - non-POSIX
|
||||
yield
|
||||
return
|
||||
fcntl.flock(fh.fileno(), fcntl.LOCK_EX)
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
fcntl.flock(fh.fileno(), fcntl.LOCK_UN)
|
||||
|
||||
|
||||
def append_event(job_id: str, event_dict: Dict[str, Any], logs_dir: Optional[str] = None) -> None:
|
||||
"""Append one event as a JSON line to ``<logs>/<job_id>/events.ndjson``.
|
||||
|
||||
Concurrency-safe (fcntl lock over the file) and best-effort. A millisecond
|
||||
``logged_at`` is stamped when the caller did not supply one."""
|
||||
try:
|
||||
path = job_log_path(job_id, EVENTS_FILENAME, logs_dir)
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
record = dict(event_dict)
|
||||
record.setdefault("logged_at", _utcnow_precise())
|
||||
line = json.dumps(record, ensure_ascii=False) + "\n"
|
||||
with open(path, "a", encoding="utf-8") as fh:
|
||||
with _file_lock(fh):
|
||||
fh.write(line)
|
||||
fh.flush()
|
||||
except Exception as exc: # pragma: no cover - best effort
|
||||
logger.warning("append_event failed for job %s: %s", job_id, exc)
|
||||
|
||||
|
||||
def update_logged_status(job_id: str, status: str, logs_dir: Optional[str] = None, **extras: Any) -> None:
|
||||
"""Rewrite ``<logs>/<job_id>/status.json`` (current status for fast point
|
||||
queries) atomically. Best-effort; merges any ``extras``."""
|
||||
try:
|
||||
path = job_log_path(job_id, STATUS_FILENAME, logs_dir)
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
record: Dict[str, Any] = {"job_id": job_id, "status": status, "updated_at": _utcnow()}
|
||||
record.update(extras)
|
||||
tmp = path.with_name(path.name + ".tmp")
|
||||
with open(tmp, "w", encoding="utf-8") as fh:
|
||||
json.dump(record, fh, ensure_ascii=False, indent=2)
|
||||
fh.write("\n")
|
||||
os.replace(tmp, path)
|
||||
except Exception as exc: # pragma: no cover - best effort
|
||||
logger.warning("update_logged_status failed for job %s: %s", job_id, exc)
|
||||
|
||||
|
||||
def init_job_log(job_id: str, meta: Dict[str, Any], logs_dir: Optional[str] = None) -> None:
|
||||
"""Seed the per-job audit-log dir: write meta.json, status.json, and a first
|
||||
``registered`` line in events.ndjson. Idempotent (the ``registered`` line is
|
||||
written only when events.ndjson does not yet exist) and best-effort."""
|
||||
try:
|
||||
d = job_log_dir(job_id, logs_dir)
|
||||
d.mkdir(parents=True, exist_ok=True)
|
||||
with open(d / META_FILENAME, "w", encoding="utf-8") as fh:
|
||||
json.dump(meta, fh, ensure_ascii=False, indent=2)
|
||||
fh.write("\n")
|
||||
status = meta.get("status", "pending")
|
||||
update_logged_status(
|
||||
job_id, status, logs_dir=logs_dir,
|
||||
created_at=meta.get("created_at"), prompt=meta.get("prompt"),
|
||||
)
|
||||
events_path = d / EVENTS_FILENAME
|
||||
first_time = not events_path.exists()
|
||||
events_path.touch(exist_ok=True)
|
||||
if first_time:
|
||||
append_event(job_id, {
|
||||
"event": "registered",
|
||||
"status": status,
|
||||
"agent": meta.get("agent"),
|
||||
"agent_session": meta.get("agent_session"),
|
||||
"topic_prefix": meta.get("topic_prefix"),
|
||||
"timestamp": meta.get("created_at"),
|
||||
}, logs_dir=logs_dir)
|
||||
except Exception as exc: # pragma: no cover - best effort
|
||||
logger.warning("init_job_log failed for job %s: %s", job_id, exc)
|
||||
|
||||
|
||||
def read_logged_meta(job_id: str, logs_dir: Optional[str] = None) -> Optional[Dict[str, Any]]:
|
||||
"""Return a job's audit meta.json (registration snapshot), or None."""
|
||||
try:
|
||||
with open(job_log_path(job_id, META_FILENAME, logs_dir), "r", encoding="utf-8") as fh:
|
||||
return json.load(fh)
|
||||
except (OSError, json.JSONDecodeError):
|
||||
return None
|
||||
|
||||
|
||||
def read_logged_status(job_id: str, logs_dir: Optional[str] = None) -> Optional[Dict[str, Any]]:
|
||||
"""Return a job's current status.json, or None. This is the fast point-query
|
||||
file (current status only), separate from the registration-time meta.json."""
|
||||
try:
|
||||
with open(job_log_path(job_id, STATUS_FILENAME, logs_dir), "r", encoding="utf-8") as fh:
|
||||
return json.load(fh)
|
||||
except (OSError, json.JSONDecodeError):
|
||||
return None
|
||||
|
||||
|
||||
def iter_logged_events(job_id: str, logs_dir: Optional[str] = None):
|
||||
"""Yield each parsed event from a job's events.ndjson in file (time) order.
|
||||
Malformed lines are skipped with a warning."""
|
||||
path = job_log_path(job_id, EVENTS_FILENAME, logs_dir)
|
||||
if not path.exists():
|
||||
return
|
||||
with open(path, "r", encoding="utf-8") as fh:
|
||||
for line in fh:
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
try:
|
||||
yield json.loads(line)
|
||||
except json.JSONDecodeError:
|
||||
logger.warning("skipping malformed audit line in %s", path)
|
||||
|
||||
|
||||
def list_logged_jobs(logs_dir: Optional[str] = None) -> List[Dict[str, Any]]:
|
||||
"""Return one meta record per job directory under the logs root, oldest
|
||||
first. Falls back to ``{"job_id": <dir>}`` when meta.json is missing."""
|
||||
base = Path(logs_dir or LOGS_DIR)
|
||||
out: List[Dict[str, Any]] = []
|
||||
if not base.exists():
|
||||
return out
|
||||
for d in sorted(base.iterdir()):
|
||||
if not d.is_dir():
|
||||
continue
|
||||
meta = read_logged_meta(d.name, logs_dir) or {"job_id": d.name}
|
||||
# Overlay the live status.json so the summary reflects current state, not
|
||||
# the registration-time snapshot frozen in meta.json.
|
||||
status = read_logged_status(d.name, logs_dir)
|
||||
if status:
|
||||
meta = {**meta,
|
||||
"status": status.get("status", meta.get("status")),
|
||||
"updated_at": status.get("updated_at", meta.get("updated_at"))}
|
||||
out.append(meta)
|
||||
out.sort(key=lambda m: m.get("created_at") or "")
|
||||
return out
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------
|
||||
# Retry helper
|
||||
# --------------------------------------------------------------------------
|
||||
def with_retry(
|
||||
fn: Optional[Callable] = None,
|
||||
*,
|
||||
attempts: int = 3,
|
||||
base_delay: float = 0.5,
|
||||
factor: float = 2.0,
|
||||
max_delay: float = 8.0,
|
||||
exceptions: Iterable[type] = (Exception,),
|
||||
) -> Callable:
|
||||
"""Retry ``fn`` with exponential backoff.
|
||||
|
||||
Usable two ways::
|
||||
|
||||
result = with_retry(do_publish, attempts=3)() # wrap-and-call
|
||||
@with_retry(attempts=5, base_delay=1.0) # decorator
|
||||
def do_publish(): ...
|
||||
|
||||
Re-raises the last exception once ``attempts`` is exhausted.
|
||||
"""
|
||||
exc_tuple = tuple(exceptions)
|
||||
|
||||
def decorate(func: Callable) -> Callable:
|
||||
@functools.wraps(func)
|
||||
def wrapper(*args: Any, **kwargs: Any) -> Any:
|
||||
delay = base_delay
|
||||
last_exc: Optional[BaseException] = None
|
||||
for attempt in range(1, attempts + 1):
|
||||
try:
|
||||
return func(*args, **kwargs)
|
||||
except exc_tuple as exc:
|
||||
last_exc = exc
|
||||
if attempt >= attempts:
|
||||
break
|
||||
logger.warning(
|
||||
"attempt %d/%d failed: %s; retrying in %.1fs",
|
||||
attempt, attempts, exc, delay,
|
||||
)
|
||||
time.sleep(delay)
|
||||
delay = min(delay * factor, max_delay)
|
||||
assert last_exc is not None
|
||||
raise last_exc
|
||||
|
||||
return wrapper
|
||||
|
||||
if fn is not None:
|
||||
return decorate(fn)
|
||||
return decorate
|
||||
|
||||
|
||||
def setup_logging(level: int = logging.WARNING) -> None:
|
||||
"""Configure root logging to stderr. stdout is reserved for data output
|
||||
(subscriber event lines, registry ids)."""
|
||||
import sys
|
||||
|
||||
logging.basicConfig(
|
||||
level=level,
|
||||
stream=sys.stderr,
|
||||
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
|
||||
)
|
||||
@@ -0,0 +1,225 @@
|
||||
#!/usr/bin/env python3
|
||||
"""publish_event.py — the single entry point for emitting a Job event.
|
||||
|
||||
Loads the job record from the registry, resolves its broker, assigns the next
|
||||
monotonic ``seq``, builds the schema-v1 JSON payload, and publishes it to
|
||||
``<topic_prefix>/events`` over QoS 1 with exponential-backoff retry.
|
||||
|
||||
Silent by design: nothing is printed to stdout. Diagnostics go to stderr via
|
||||
logging. Terminal events (``completed``/``error``) publish with retain=True so
|
||||
a late subscriber still observes the final state (production hardening).
|
||||
|
||||
Exit codes:
|
||||
0 published successfully
|
||||
1 parameter / registry error (bad args, unknown job, no pending job)
|
||||
2 publish failed after retries (network / broker / ACK timeout)
|
||||
|
||||
Usage:
|
||||
publish_event.py --job <id> --event started [--detail "..."] [--data '{...}']
|
||||
publish_event.py --pick-pending --agent-session tmux:claude --event completed
|
||||
publish_event.py --job <id> --event completed --retained
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import logging
|
||||
import sys
|
||||
import time
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
import mqtt_common
|
||||
import registry
|
||||
from mqtt_common import (
|
||||
DEFAULT_REGISTRY_DIR,
|
||||
SCHEMA_VERSION,
|
||||
broker_config_from_job,
|
||||
events_topic_for,
|
||||
load_job,
|
||||
make_client,
|
||||
next_seq,
|
||||
with_retry,
|
||||
)
|
||||
|
||||
logger = logging.getLogger("delegate_job.publish_event")
|
||||
|
||||
VALID_EVENTS = ("started", "permission_required", "progress", "completed", "error")
|
||||
TERMINAL_EVENTS = ("completed", "error")
|
||||
# event -> registry status to sync as a best-effort side effect
|
||||
EVENT_TO_STATUS = {
|
||||
"started": "running",
|
||||
"completed": "completed",
|
||||
"error": "error",
|
||||
}
|
||||
|
||||
CONNECT_ACK_TIMEOUT = 10 # seconds to wait for CONNACK
|
||||
PUBLISH_ACK_TIMEOUT = 5 # seconds to wait for QoS-1 PUBACK
|
||||
|
||||
|
||||
def build_payload(
|
||||
job_id: str,
|
||||
seq: int,
|
||||
event: str,
|
||||
detail: str,
|
||||
data: Optional[Dict[str, Any]],
|
||||
auth_token: Optional[str],
|
||||
) -> Dict[str, Any]:
|
||||
payload: Dict[str, Any] = {
|
||||
"schema_version": SCHEMA_VERSION,
|
||||
"seq": seq,
|
||||
"job_id": job_id,
|
||||
"event": event,
|
||||
"timestamp": mqtt_common._utcnow(),
|
||||
"detail": detail,
|
||||
"data": dict(data) if data else {},
|
||||
}
|
||||
# Production: carry the per-job auth token so the subscriber can verify the
|
||||
# publisher. The token is compared in plain text (bearer-token style) by the
|
||||
# subscriber — NOT an HMAC. See SKILL.md "Auth token" and PLAN 8.2. The
|
||||
# registry stores the per-job token in `auth_token`; only include it on the
|
||||
# wire when set so the public broker (no auth) doesn't leak anything.
|
||||
if auth_token:
|
||||
payload["data"]["auth_token"] = auth_token
|
||||
return payload
|
||||
|
||||
|
||||
def _publish_once(config, topic: str, body: bytes, retain: bool) -> None:
|
||||
"""Connect, publish one QoS-1 message, wait for the broker ACK, disconnect.
|
||||
|
||||
Raises on any failure so ``with_retry`` can re-run the whole sequence (a
|
||||
fresh connection per attempt is the robust choice for a PoC)."""
|
||||
client = make_client("publisher", config)
|
||||
connected = {"rc": None}
|
||||
|
||||
def on_connect(_c, _u, _flags, reason_code, _props):
|
||||
connected["rc"] = reason_code
|
||||
|
||||
client.on_connect = on_connect
|
||||
client.connect(config.host, config.port, config.keepalive)
|
||||
client.loop_start()
|
||||
try:
|
||||
# Wait for CONNACK so we fail fast on auth/TLS errors.
|
||||
deadline = time.monotonic() + CONNECT_ACK_TIMEOUT
|
||||
while connected["rc"] is None and time.monotonic() < deadline:
|
||||
time.sleep(0.05)
|
||||
if connected["rc"] is None:
|
||||
raise TimeoutError("no CONNACK from broker")
|
||||
if mqtt_common.reason_code_value(connected["rc"]) != 0:
|
||||
raise ConnectionError(f"broker refused connection: rc={connected['rc']}")
|
||||
|
||||
info = client.publish(topic, payload=body, qos=1, retain=retain)
|
||||
info.wait_for_publish(timeout=PUBLISH_ACK_TIMEOUT)
|
||||
if not info.is_published():
|
||||
raise TimeoutError("publish not acknowledged within timeout")
|
||||
finally:
|
||||
client.loop_stop()
|
||||
try:
|
||||
client.disconnect()
|
||||
except Exception: # pragma: no cover - disconnect best effort
|
||||
pass
|
||||
|
||||
|
||||
def _resolve_job_id(args) -> Optional[str]:
|
||||
if args.pick_pending:
|
||||
return registry.pick_pending(args.agent_session, args.registry_dir)
|
||||
return args.job
|
||||
|
||||
|
||||
def main(argv=None) -> int:
|
||||
parser = argparse.ArgumentParser(description="Publish a Job event to MQTT")
|
||||
target = parser.add_mutually_exclusive_group(required=True)
|
||||
target.add_argument("--job", help="job id to publish for")
|
||||
target.add_argument("--pick-pending", action="store_true",
|
||||
help="auto-select a pending job for --agent-session")
|
||||
parser.add_argument("--agent-session", default="tmux:claude",
|
||||
help="session label used with --pick-pending")
|
||||
parser.add_argument("--event", default="progress", choices=VALID_EVENTS)
|
||||
parser.add_argument("--detail", default="")
|
||||
parser.add_argument("--data", default=None, help="optional JSON object string")
|
||||
parser.add_argument("--retained", action="store_true",
|
||||
help="force retain=True (auto for completed/error)")
|
||||
parser.add_argument("--registry-dir", default=DEFAULT_REGISTRY_DIR)
|
||||
parser.add_argument("--attempts", type=int, default=3)
|
||||
parser.add_argument("-v", "--verbose", action="store_true")
|
||||
args = parser.parse_args(argv)
|
||||
|
||||
mqtt_common.setup_logging(logging.DEBUG if args.verbose else logging.WARNING)
|
||||
|
||||
# --- parse optional data JSON (parameter error -> exit 1) ---
|
||||
data: Optional[Dict[str, Any]] = None
|
||||
if args.data:
|
||||
try:
|
||||
data = json.loads(args.data)
|
||||
if not isinstance(data, dict):
|
||||
raise ValueError("--data must be a JSON object")
|
||||
except (ValueError, json.JSONDecodeError) as exc:
|
||||
logger.error("invalid --data: %s", exc)
|
||||
return 1
|
||||
|
||||
job_id = _resolve_job_id(args)
|
||||
if not job_id:
|
||||
logger.error("no job to publish for (unknown --job or no pending job)")
|
||||
return 1
|
||||
|
||||
try:
|
||||
job = load_job(job_id, args.registry_dir)
|
||||
except FileNotFoundError as exc:
|
||||
logger.error("%s", exc)
|
||||
return 1
|
||||
|
||||
config = broker_config_from_job(job)
|
||||
topic = job.get("topic_prefix")
|
||||
topic = f"{topic}/events" if topic else events_topic_for(job_id)
|
||||
seq = next_seq(job_id, args.registry_dir)
|
||||
payload = build_payload(
|
||||
job_id=job_id,
|
||||
seq=seq,
|
||||
event=args.event,
|
||||
detail=args.detail,
|
||||
data=data,
|
||||
auth_token=job.get("auth_token"),
|
||||
)
|
||||
body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
|
||||
retain = args.retained or args.event in TERMINAL_EVENTS
|
||||
|
||||
publish = with_retry(
|
||||
_publish_once,
|
||||
attempts=args.attempts,
|
||||
exceptions=(OSError, TimeoutError, ConnectionError, ValueError),
|
||||
)
|
||||
try:
|
||||
publish(config, topic, body, retain)
|
||||
except Exception as exc:
|
||||
logger.error("publish failed after %d attempts: %s", args.attempts, exc)
|
||||
return 2
|
||||
|
||||
# Persistent audit log: record the exact payload we put on the wire so the
|
||||
# publish is reproducible from the log alone. Best-effort (isolated inside
|
||||
# append_event) — never fails the publish.
|
||||
mqtt_common.append_event(job_id, {
|
||||
"event": "published",
|
||||
"source_event": args.event,
|
||||
"seq": seq,
|
||||
"topic": topic,
|
||||
"retain": retain,
|
||||
"timestamp": payload["timestamp"],
|
||||
"detail": args.detail,
|
||||
"payload": payload,
|
||||
})
|
||||
|
||||
# Best-effort side effects: registry status sync + (debug) event log. Never
|
||||
# fail the publish on these.
|
||||
registry.append_event(job_id, args.registry_dir, payload)
|
||||
new_status = EVENT_TO_STATUS.get(args.event)
|
||||
if new_status:
|
||||
try:
|
||||
mqtt_common.update_job_status(job_id, args.registry_dir, status=new_status)
|
||||
except Exception as exc: # pragma: no cover - best effort
|
||||
logger.warning("status sync failed: %s", exc)
|
||||
|
||||
logger.info("published %s seq=%d job=%s retain=%s", args.event, seq, job_id, retain)
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
@@ -0,0 +1,327 @@
|
||||
"""Job registry for the tmux-agent-orchestrate-delegate-job skill.
|
||||
|
||||
A job record is the single source of truth for one delegated unit of work:
|
||||
its id, prompt, owning agent session, broker connection, timeouts, and status.
|
||||
Records live as ``<registry_dir>/<job_id>.json`` with an append-only event log
|
||||
``<registry_dir>/<job_id>.events.log`` and a shared ``<registry_dir>/.lock``.
|
||||
|
||||
Concurrency is handled via the fcntl lock in :mod:`mqtt_common` (PoC). For
|
||||
multi-host delegation, migrate to SQLite WAL — see references/registry.md.
|
||||
|
||||
Importable as a library and runnable as a CLI (``register``/``list``/``get``/
|
||||
``status``/``pick``) so the ``tmux-agent-orchestrate-delegate-job`` bash wrapper can shell out.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import logging
|
||||
import sys
|
||||
import uuid
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
import mqtt_common
|
||||
from mqtt_common import (
|
||||
DEFAULT_REGISTRY_DIR,
|
||||
SCHEMA_VERSION,
|
||||
_atomic_write_record,
|
||||
_utcnow,
|
||||
broker_config_from_env,
|
||||
load_job,
|
||||
registry_lock,
|
||||
topic_prefix_for,
|
||||
)
|
||||
|
||||
logger = logging.getLogger("delegate_job.registry")
|
||||
|
||||
TERMINAL_STATUSES = ("completed", "error", "cancelled")
|
||||
VALID_STATUSES = ("pending", "running", "completed", "error", "cancelled")
|
||||
|
||||
|
||||
def generate_job_id(bits: int = 32) -> str:
|
||||
"""PoC: 32-bit hex (8 chars). Production: 128-bit (full uuid4 hex)."""
|
||||
if bits >= 128:
|
||||
return uuid.uuid4().hex
|
||||
nibbles = max(1, bits // 4)
|
||||
return uuid.uuid4().hex[:nibbles]
|
||||
|
||||
|
||||
def register_job(
|
||||
prompt: str,
|
||||
agent: str = "claude-code",
|
||||
agent_session: str = "tmux:claude",
|
||||
broker: Optional[Dict[str, Any]] = None,
|
||||
timeout_sec: int = 600,
|
||||
idle_timeout_sec: int = 120,
|
||||
registry_dir: str = DEFAULT_REGISTRY_DIR,
|
||||
job_id: Optional[str] = None,
|
||||
expected_artifacts: Optional[List[str]] = None,
|
||||
bits: int = 32,
|
||||
auth_token: Optional[str] = None,
|
||||
) -> str:
|
||||
"""Create a new ``pending`` job record and return its id.
|
||||
|
||||
``broker`` defaults to the current environment's resolved broker block, so
|
||||
the registry alone is enough for ``publish_event.py`` to connect later.
|
||||
"""
|
||||
job_id = job_id or generate_job_id(bits)
|
||||
if broker is None:
|
||||
broker = broker_config_from_env().to_registry_block()
|
||||
now = _utcnow()
|
||||
record: Dict[str, Any] = {
|
||||
"schema_version": SCHEMA_VERSION,
|
||||
"job_id": job_id,
|
||||
"status": "pending",
|
||||
"created_at": now,
|
||||
"updated_at": now,
|
||||
"prompt": prompt,
|
||||
"agent": agent,
|
||||
"agent_session": agent_session,
|
||||
"broker": broker,
|
||||
"topic_prefix": topic_prefix_for(job_id),
|
||||
"timeout_sec": int(timeout_sec),
|
||||
"idle_timeout_sec": int(idle_timeout_sec),
|
||||
"expected_artifacts": expected_artifacts or [],
|
||||
"last_seq": 0,
|
||||
"auth_token": auth_token,
|
||||
}
|
||||
with registry_lock(registry_dir):
|
||||
if mqtt_common._job_path(job_id, registry_dir).exists():
|
||||
raise FileExistsError(f"job already exists: {job_id}")
|
||||
_atomic_write_record(job_id, registry_dir, record)
|
||||
# Seed the persistent audit log (meta.json + status.json + a "registered"
|
||||
# event). Best-effort inside init_job_log — never blocks registration.
|
||||
mqtt_common.init_job_log(job_id, meta=record)
|
||||
logger.info("registered job %s (agent=%s session=%s)", job_id, agent, agent_session)
|
||||
return job_id
|
||||
|
||||
|
||||
def pick_pending(agent_session: str, registry_dir: str = DEFAULT_REGISTRY_DIR) -> Optional[str]:
|
||||
"""Claim the oldest ``pending`` job for ``agent_session``, flipping it to
|
||||
``running`` atomically under the lock. Returns the job id, or None if no
|
||||
pending job matches. This is how each tmux session takes only its own work
|
||||
without two sessions grabbing the same job."""
|
||||
with registry_lock(registry_dir):
|
||||
candidates = []
|
||||
for record in _iter_records(registry_dir):
|
||||
if record.get("status") == "pending" and record.get("agent_session") == agent_session:
|
||||
candidates.append(record)
|
||||
if not candidates:
|
||||
return None
|
||||
candidates.sort(key=lambda r: r.get("created_at", ""))
|
||||
chosen = candidates[0]
|
||||
chosen["status"] = "running"
|
||||
chosen["updated_at"] = _utcnow()
|
||||
_atomic_write_record(chosen["job_id"], registry_dir, chosen)
|
||||
logger.info("session %s picked job %s", agent_session, chosen["job_id"])
|
||||
job_id = chosen["job_id"]
|
||||
updated_at = chosen["updated_at"]
|
||||
# pick_pending writes the record directly (not via update_job_status), so it
|
||||
# mirrors the pending->running transition into the audit log here. Best-effort.
|
||||
mqtt_common.update_logged_status(job_id, "running", updated_at=updated_at)
|
||||
mqtt_common.append_event(job_id, {
|
||||
"event": "status_changed",
|
||||
"from": "pending",
|
||||
"to": "running",
|
||||
"by": agent_session,
|
||||
"timestamp": updated_at,
|
||||
})
|
||||
return job_id
|
||||
|
||||
|
||||
def update_status(job_id: str, registry_dir: str, status: str) -> Dict[str, Any]:
|
||||
if status not in VALID_STATUSES:
|
||||
raise ValueError(f"invalid status {status!r}; expected one of {VALID_STATUSES}")
|
||||
return mqtt_common.update_job_status(job_id, registry_dir, status=status)
|
||||
|
||||
|
||||
def list_jobs(registry_dir: str = DEFAULT_REGISTRY_DIR, status: Optional[str] = None) -> List[Dict[str, Any]]:
|
||||
records = list(_iter_records(registry_dir))
|
||||
if status:
|
||||
records = [r for r in records if r.get("status") == status]
|
||||
records.sort(key=lambda r: r.get("created_at", ""))
|
||||
return records
|
||||
|
||||
|
||||
def append_event(job_id: str, registry_dir: str, payload: Dict[str, Any]) -> None:
|
||||
"""Append one event payload as a JSON line to the job's events log. Best
|
||||
effort, debug-only; failures are logged but never raised to the caller."""
|
||||
try:
|
||||
Path(registry_dir).mkdir(parents=True, exist_ok=True)
|
||||
log_path = Path(registry_dir) / f"{job_id}.events.log"
|
||||
with open(log_path, "a", encoding="utf-8") as fh:
|
||||
fh.write(json.dumps(payload, ensure_ascii=False) + "\n")
|
||||
except OSError as exc: # pragma: no cover - best effort
|
||||
logger.warning("could not append event for %s: %s", job_id, exc)
|
||||
|
||||
|
||||
# convenience re-export so callers can `from registry import load_job`
|
||||
__all__ = [
|
||||
"register_job", "pick_pending", "update_status", "load_job",
|
||||
"list_jobs", "append_event", "generate_job_id",
|
||||
]
|
||||
|
||||
|
||||
def _iter_records(registry_dir: str):
|
||||
base = Path(registry_dir)
|
||||
if not base.exists():
|
||||
return
|
||||
for path in sorted(base.glob("*.json")):
|
||||
try:
|
||||
with open(path, "r", encoding="utf-8") as fh:
|
||||
yield json.load(fh)
|
||||
except (OSError, json.JSONDecodeError) as exc:
|
||||
logger.warning("skipping unreadable record %s: %s", path, exc)
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------
|
||||
# CLI (so the bash wrapper can shell out without inline python)
|
||||
# --------------------------------------------------------------------------
|
||||
def _build_parser() -> argparse.ArgumentParser:
|
||||
parser = argparse.ArgumentParser(description="tmux-agent-orchestrate-delegate-job registry CLI")
|
||||
parser.add_argument("--registry-dir", default=DEFAULT_REGISTRY_DIR)
|
||||
sub = parser.add_subparsers(dest="command", required=True)
|
||||
|
||||
p_reg = sub.add_parser("register", help="create a pending job; prints the job id")
|
||||
p_reg.add_argument("--prompt", required=True)
|
||||
p_reg.add_argument("--agent", default="claude-code")
|
||||
p_reg.add_argument("--agent-session", default="tmux:claude")
|
||||
p_reg.add_argument("--timeout", type=int, default=600)
|
||||
p_reg.add_argument("--idle-timeout", type=int, default=120)
|
||||
p_reg.add_argument("--bits", type=int, default=32, help="32 (PoC) or 128 (prod)")
|
||||
p_reg.add_argument("--artifact", action="append", default=[], dest="artifacts")
|
||||
|
||||
p_list = sub.add_parser("list", help="list jobs (optionally by status)")
|
||||
p_list.add_argument("--status", default=None)
|
||||
p_list.add_argument("--json", action="store_true")
|
||||
|
||||
p_get = sub.add_parser("get", help="print one job record as JSON")
|
||||
p_get.add_argument("--job", required=True)
|
||||
|
||||
p_status = sub.add_parser("status", help="set a job status")
|
||||
p_status.add_argument("--job", required=True)
|
||||
p_status.add_argument("--set", required=True, dest="status")
|
||||
|
||||
p_pick = sub.add_parser("pick", help="claim a pending job for a session; prints id")
|
||||
p_pick.add_argument("--agent-session", default="tmux:claude")
|
||||
|
||||
p_logs = sub.add_parser(
|
||||
"logs",
|
||||
help="show the persistent audit log for a job, or --list every logged job",
|
||||
)
|
||||
p_logs.add_argument("job_id", nargs="?", default=None,
|
||||
help="job id whose events.ndjson to print")
|
||||
p_logs.add_argument("--list", action="store_true", dest="list_all",
|
||||
help="summarise every job under the logs dir instead")
|
||||
p_logs.add_argument("--logs-dir", default=None,
|
||||
help="override the audit-log root (default: $DELEGATE_JOB_LOGS_DIR "
|
||||
"or <cwd>/.hermes/delegate_job_logs)")
|
||||
p_logs.add_argument("--tail", type=int, default=0,
|
||||
help="show only the last N events (0 = all)")
|
||||
p_logs.add_argument("--json", action="store_true",
|
||||
help="emit raw JSON lines / records instead of a table")
|
||||
|
||||
return parser
|
||||
|
||||
|
||||
def main(argv: Optional[List[str]] = None) -> int:
|
||||
mqtt_common.setup_logging(logging.INFO)
|
||||
args = _build_parser().parse_args(argv)
|
||||
rd = args.registry_dir
|
||||
|
||||
if args.command == "register":
|
||||
job_id = register_job(
|
||||
prompt=args.prompt,
|
||||
agent=args.agent,
|
||||
agent_session=args.agent_session,
|
||||
timeout_sec=args.timeout,
|
||||
idle_timeout_sec=args.idle_timeout,
|
||||
registry_dir=rd,
|
||||
expected_artifacts=args.artifacts,
|
||||
bits=args.bits,
|
||||
)
|
||||
print(job_id)
|
||||
return 0
|
||||
|
||||
if args.command == "list":
|
||||
records = list_jobs(rd, status=args.status)
|
||||
if args.json:
|
||||
print(json.dumps(records, ensure_ascii=False, indent=2))
|
||||
else:
|
||||
if not records:
|
||||
print("(no jobs)")
|
||||
for r in records:
|
||||
print(f"{r['job_id']} {r.get('status','?'):10s} {r.get('agent_session','')}"
|
||||
f" {r.get('prompt','')[:48]}")
|
||||
return 0
|
||||
|
||||
if args.command == "get":
|
||||
try:
|
||||
print(json.dumps(load_job(args.job, rd), ensure_ascii=False, indent=2))
|
||||
except FileNotFoundError as exc:
|
||||
print(str(exc), file=sys.stderr)
|
||||
return 1
|
||||
return 0
|
||||
|
||||
if args.command == "status":
|
||||
try:
|
||||
update_status(args.job, rd, args.status)
|
||||
except (FileNotFoundError, ValueError) as exc:
|
||||
print(str(exc), file=sys.stderr)
|
||||
return 1
|
||||
return 0
|
||||
|
||||
if args.command == "pick":
|
||||
job_id = pick_pending(args.agent_session, rd)
|
||||
if job_id is None:
|
||||
return 3 # no pending job for this session
|
||||
print(job_id)
|
||||
return 0
|
||||
|
||||
if args.command == "logs":
|
||||
return _cmd_logs(args)
|
||||
|
||||
return 1
|
||||
|
||||
|
||||
def _cmd_logs(args) -> int:
|
||||
"""Pretty-print one job's events.ndjson, or summarise all logged jobs."""
|
||||
logs_dir = args.logs_dir or mqtt_common.LOGS_DIR
|
||||
|
||||
if args.list_all:
|
||||
jobs = mqtt_common.list_logged_jobs(logs_dir)
|
||||
if args.json:
|
||||
print(json.dumps(jobs, ensure_ascii=False, indent=2))
|
||||
return 0
|
||||
if not jobs:
|
||||
print(f"(no logged jobs under {logs_dir})")
|
||||
return 0
|
||||
for m in jobs:
|
||||
print(f"{m.get('job_id','?')} {m.get('status','?'):10s} "
|
||||
f"{m.get('created_at','-'):20s} {(m.get('prompt') or '')[:48]}")
|
||||
return 0
|
||||
|
||||
if not args.job_id:
|
||||
print("logs requires a <job_id> or --list", file=sys.stderr)
|
||||
return 1
|
||||
|
||||
events = list(mqtt_common.iter_logged_events(args.job_id, logs_dir))
|
||||
if not events and not mqtt_common.job_log_dir(args.job_id, logs_dir).exists():
|
||||
print(f"no audit log for job {args.job_id} under {logs_dir}", file=sys.stderr)
|
||||
return 1
|
||||
if args.tail and args.tail > 0:
|
||||
events = events[-args.tail:]
|
||||
if args.json:
|
||||
for e in events:
|
||||
print(json.dumps(e, ensure_ascii=False))
|
||||
return 0
|
||||
for e in events:
|
||||
ts = e.get("logged_at") or e.get("timestamp") or "-"
|
||||
extra = e.get("detail") or e.get("to") or e.get("source_event") or ""
|
||||
print(f"{ts:24s} {e.get('event','?'):<16s} {extra}")
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
Reference in New Issue
Block a user