Files
multi-agent-mux/skills/tmux-agent-orchestrate-delegate-job/scripts/job_subscriber.py
T
Godopu a6f7c045bc feat(delegate-job): bump default --timeout 600s -> 3600s (1h wall-clock budget)
Changed 11 locations across 5 files:
- scripts/registry.py: timeout_sec dataclass default + argparse default
- scripts/job_subscriber.py: help text + fallback default
- SKILL.md: 4 recommended invocation examples
- registry.md: JSON example + CLI example
- tmux-agent-orchestrate-delegate-job: bash wrapper TIMEOUT var

--idle-timeout 120s preserved unchanged.
Rationale: 10min default was too short for deep analysis / multi-file
generation tasks; 1h aligns with long-running agent delegation patterns.
2026-06-21 06:08:49 +00:00

234 lines
9.3 KiB
Python
Executable File

#!/usr/bin/env python3
"""job_subscriber.py — the single entry point for observing Job events.
Subscribes to one job's ``<topic_prefix>/events`` (or, with ``--wait-any``, the
events of every running/pending job in the registry), prints one line to stdout
per accepted event, and exits on a terminal event or a timeout.
Design points (all flagged in the PLAN review):
- terminal state machine: ``completed``/``error`` is acted on exactly once per
job, so QoS-1 duplicates or an ``error``-after-``completed`` reorder are safe.
- dual timeouts: a wall-clock ``--timeout`` (total budget, started at
subscribe time so a cold start can't hang forever) AND an idle
``--idle-timeout`` (no new event for N seconds).
- defensive parsing: undecodable payloads, ``schema_version`` mismatches, and
``job_id`` values we did not subscribe for are logged and dropped.
stdout = event lines only. Diagnostics go to stderr via logging.
Exit codes:
0 all watched jobs reached ``completed``
1 any watched job reached ``error``
2 timed out (wall-clock or idle) before all jobs finished
"""
from __future__ import annotations
import argparse
import json
import logging
import queue
import sys
import time
from typing import Any, Dict, List, Optional, Set, Tuple
import mqtt_common
import registry
from mqtt_common import (
DEFAULT_REGISTRY_DIR,
SCHEMA_VERSION,
broker_config_from_job,
load_job,
make_client,
)
# Module-level logger; all diagnostics in this file are emitted through it so
# that stdout carries event lines only.
logger = logging.getLogger("delegate_job.job_subscriber")
# Event names that finalise a job. main() records the first one it sees per
# job and ignores any later terminal event for that same job.
TERMINAL_EVENTS = ("completed", "error")
def _format_line(topic: str, payload: Dict[str, Any]) -> str:
return (
f"{payload.get('timestamp','-')} "
f"job={payload.get('job_id','?')} "
f"seq={payload.get('seq','?')} "
f"{payload.get('event','?'):<20} "
f"{payload.get('detail','')}"
)
class _Watcher:
"""Holds the shared queue + the set of job_ids we accept events for."""
def __init__(self, expected_job_ids: Set[str], expected_tokens: Dict[str, Optional[str]]):
self.events: "queue.Queue[Tuple[str, Dict[str, Any]]]" = queue.Queue()
self.expected = set(expected_job_ids)
self.tokens = expected_tokens # job_id -> expected auth_token (or None)
def on_message(self, _client, _userdata, msg) -> None:
# --- defensive parsing -------------------------------------------
try:
payload = json.loads(msg.payload.decode("utf-8"))
except (UnicodeDecodeError, json.JSONDecodeError) as exc:
logger.warning("drop unparseable payload on %s: %s", msg.topic, exc)
return
if not isinstance(payload, dict):
logger.warning("drop non-object payload on %s", msg.topic)
return
if payload.get("schema_version") != SCHEMA_VERSION:
logger.warning("drop event with schema_version=%r (expected %d)",
payload.get("schema_version"), SCHEMA_VERSION)
return
jid = payload.get("job_id")
if jid not in self.expected:
logger.warning("drop event for unexpected job_id=%r on %s", jid, msg.topic)
return
# --- production auth check: data.auth_token must match if expected ---
expected_token = self.tokens.get(jid)
if expected_token is not None:
got = (payload.get("data") or {}).get("auth_token")
if got != expected_token:
logger.warning("drop event for job %s: auth_token mismatch", jid)
return
# Persistent audit log from the *subscriber's* vantage point: every event
# that survives defensive parsing is recorded here, including ones a
# different host published. This is the external-observer record that
# backstops the publisher's own "published" line if it never wrote one.
mqtt_common.append_event(jid, {
"event": "received",
"source_event": payload.get("event"),
"seq": payload.get("seq"),
"topic": msg.topic,
"timestamp": payload.get("timestamp"),
"detail": payload.get("detail", ""),
})
self.events.put((msg.topic, payload))
def _collect_jobs(args) -> List[Dict[str, Any]]:
    """Return the job records this invocation should watch.

    Args:
        args: Parsed CLI namespace; reads ``wait_any``, ``job`` and
            ``registry_dir``.

    Returns:
        With ``--wait-any``: every registry record whose ``status`` is
        ``pending`` or ``running`` (possibly empty, in which case an error is
        logged). Otherwise: a one-element list holding the requested job.

    Raises:
        FileNotFoundError: Propagated from ``load_job`` in single-job mode.
    """
    if not args.wait_any:
        # Single-job mode: load exactly the requested record.
        return [load_job(args.job, args.registry_dir)]

    active_states = ("pending", "running")
    active = [
        record
        for record in registry.list_jobs(args.registry_dir)
        if record.get("status") in active_states
    ]
    if not active:
        logger.error("no pending/running jobs to wait for")
    return active
def main(argv=None) -> int:
    """Watch one or more jobs' event topics until they finish or time out.

    Parses the CLI, resolves the jobs to watch, subscribes to each job's
    ``<prefix>/events`` topic, prints one line per accepted event, and stops
    once every watched job has produced a terminal event or a timeout fires.

    Args:
        argv: Argument list to parse; ``None`` lets argparse use ``sys.argv``.

    Returns:
        Process exit status:
        ``0`` every watched job reached ``completed``;
        ``1`` at least one watched job reached ``error``;
        ``2`` wall-clock or idle timeout, no job could be resolved from the
        registry, or the broker connection could not be opened.
    """
    parser = argparse.ArgumentParser(description="Subscribe to Job events on MQTT")
    target = parser.add_mutually_exclusive_group(required=True)
    target.add_argument("--job", help="job id to watch")
    target.add_argument("--wait-any", action="store_true",
                        help="watch every pending/running job in the registry")
    parser.add_argument("--timeout", type=float, default=None,
                        help="wall-clock budget in seconds (default: job.timeout_sec or 3600)")
    parser.add_argument("--idle-timeout", type=float, default=None,
                        help="max seconds with no new event (default: job.idle_timeout_sec or 120)")
    parser.add_argument("--expect-retention", action="store_true",
                        help="warn if no retained terminal event arrives promptly")
    parser.add_argument("--registry-dir", default=DEFAULT_REGISTRY_DIR)
    parser.add_argument("-v", "--verbose", action="store_true")
    args = parser.parse_args(argv)
    mqtt_common.setup_logging(logging.DEBUG if args.verbose else logging.WARNING)

    try:
        jobs = _collect_jobs(args)
    except FileNotFoundError as exc:
        logger.error("%s", exc)
        return 2
    if not jobs:
        return 2

    expected_ids: Set[str] = {j["job_id"] for j in jobs}
    tokens = {j["job_id"]: j.get("auth_token") for j in jobs}
    watcher = _Watcher(expected_ids, tokens)

    # Resolve timeouts from CLI, falling back to the (first) job's settings.
    base_job = jobs[0]
    wall_timeout = args.timeout if args.timeout is not None else float(base_job.get("timeout_sec", 3600))
    idle_timeout = args.idle_timeout if args.idle_timeout is not None else float(base_job.get("idle_timeout_sec", 120))

    # All watched jobs share a broker in practice; connect using the first
    # job's broker and subscribe to each job's events topic.
    config = broker_config_from_job(base_job)
    client = make_client("subscriber", config)
    client.on_message = watcher.on_message
    subscribed_topics = []
    for job in jobs:
        prefix = job.get("topic_prefix") or mqtt_common.topic_prefix_for(job["job_id"])
        subscribed_topics.append(f"{prefix}/events")

    def on_connect(_c, _u, _flags, reason_code, _props):
        # Subscribe from the connect callback so the subscriptions are issued
        # each time this callback reports a successful connection.
        if mqtt_common.reason_code_value(reason_code) != 0:
            logger.error("broker connection failed: rc=%s", reason_code)
            return
        for topic in subscribed_topics:
            _c.subscribe(topic, qos=1)
            logger.info("subscribed to %s", topic)

    client.on_connect = on_connect
    try:
        client.connect(config.host, config.port, config.keepalive)
    except OSError as exc:
        # Refused / unreachable / unresolvable broker. Without this guard the
        # process dies with a traceback and exit status 1, which callers
        # would misread as "a job reached error". Report it like the other
        # could-not-observe outcomes instead.
        # NOTE(review): assumes make_client returns a paho-style client whose
        # connect() surfaces socket failures as OSError — confirm.
        logger.error("cannot connect to broker %s:%s: %s", config.host, config.port, exc)
        return 2
    client.loop_start()

    terminal: Dict[str, str] = {}  # job_id -> "completed"/"error"
    pending: Set[str] = set(expected_ids)
    start = time.monotonic()
    wall_deadline = start + wall_timeout
    last_event = start
    # With --expect-retention unset there is nothing to check, so mark it done.
    retention_checked = not args.expect_retention

    try:
        while pending:
            now = time.monotonic()
            if now >= wall_deadline:
                logger.error("wall-clock timeout (%.0fs); still pending: %s",
                             wall_timeout, ", ".join(sorted(pending)))
                return 2
            idle_left = idle_timeout - (now - last_event)
            if idle_left <= 0:
                logger.error("idle timeout (%.0fs, no events); still pending: %s",
                             idle_timeout, ", ".join(sorted(pending)))
                return 2
            # Wake at least once per second so both deadlines are re-checked
            # even when no event arrives.
            wait = min(wall_deadline - now, idle_left, 1.0)
            try:
                topic, payload = watcher.events.get(timeout=wait)
            except queue.Empty:
                if not retention_checked and (now - start) > 3.0:
                    logger.warning("--expect-retention set but no retained "
                                   "terminal event observed yet")
                    retention_checked = True
                continue

            last_event = time.monotonic()
            retention_checked = True
            print(_format_line(topic, payload), flush=True)

            jid = payload["job_id"]
            event = payload.get("event")
            if event in TERMINAL_EVENTS:
                if jid in terminal:
                    # Already finalised: ignore duplicates / late reorders.
                    logger.info("ignoring duplicate terminal %s for %s", event, jid)
                    continue
                terminal[jid] = event
                pending.discard(jid)
    finally:
        client.loop_stop()
        try:
            client.disconnect()
        except Exception:  # pragma: no cover
            # Best-effort cleanup: never mask the real exit status, but leave
            # a trace for debugging instead of swallowing silently.
            logger.debug("disconnect failed during cleanup", exc_info=True)

    # All jobs reached a terminal state. error wins over completed.
    if any(state == "error" for state in terminal.values()):
        return 1
    return 0
if __name__ == "__main__":
sys.exit(main())