#!/usr/bin/env python3 """job_subscriber.py — the single entry point for observing Job events. Subscribes to one job's ``/events`` (or, with ``--wait-any``, the events of every running/pending job in the registry), prints one line to stdout per accepted event, and exits on a terminal event or a timeout. Design points (all flagged in the PLAN review): - terminal state machine: ``completed``/``error`` is acted on exactly once per job, so QoS-1 duplicates or an ``error``-after-``completed`` reorder are safe. - dual timeouts: a wall-clock ``--timeout`` (total budget, started at subscribe time so a cold start can't hang forever) AND an idle ``--idle-timeout`` (no new event for N seconds). - defensive parsing: undecodable payloads, ``schema_version`` mismatches, and ``job_id`` values we did not subscribe for are logged and dropped. stdout = event lines only. Diagnostics go to stderr via logging. Exit codes: 0 all watched jobs reached ``completed`` 1 any watched job reached ``error`` 2 timed out (wall-clock or idle) before all jobs finished """ from __future__ import annotations import argparse import json import logging import queue import sys import time from typing import Any, Dict, List, Optional, Set, Tuple import mqtt_common import registry from mqtt_common import ( DEFAULT_REGISTRY_DIR, SCHEMA_VERSION, broker_config_from_job, load_job, make_client, ) logger = logging.getLogger("delegate_job.job_subscriber") TERMINAL_EVENTS = ("completed", "error") def _format_line(topic: str, payload: Dict[str, Any]) -> str: return ( f"{payload.get('timestamp','-')} " f"job={payload.get('job_id','?')} " f"seq={payload.get('seq','?')} " f"{payload.get('event','?'):<20} " f"{payload.get('detail','')}" ) class _Watcher: """Holds the shared queue + the set of job_ids we accept events for.""" def __init__(self, expected_job_ids: Set[str], expected_tokens: Dict[str, Optional[str]]): self.events: "queue.Queue[Tuple[str, Dict[str, Any]]]" = queue.Queue() self.expected = set(expected_job_ids) self.tokens = expected_tokens # job_id -> expected auth_token (or None) def on_message(self, _client, _userdata, msg) -> None: # --- defensive parsing ------------------------------------------- try: payload = json.loads(msg.payload.decode("utf-8")) except (UnicodeDecodeError, json.JSONDecodeError) as exc: logger.warning("drop unparseable payload on %s: %s", msg.topic, exc) return if not isinstance(payload, dict): logger.warning("drop non-object payload on %s", msg.topic) return if payload.get("schema_version") != SCHEMA_VERSION: logger.warning("drop event with schema_version=%r (expected %d)", payload.get("schema_version"), SCHEMA_VERSION) return jid = payload.get("job_id") if jid not in self.expected: logger.warning("drop event for unexpected job_id=%r on %s", jid, msg.topic) return # --- production auth check: data.auth_token must match if expected --- expected_token = self.tokens.get(jid) if expected_token is not None: got = (payload.get("data") or {}).get("auth_token") if got != expected_token: logger.warning("drop event for job %s: auth_token mismatch", jid) return # Persistent audit log from the *subscriber's* vantage point: every event # that survives defensive parsing is recorded here, including ones a # different host published. This is the external-observer record that # backstops the publisher's own "published" line if it never wrote one. mqtt_common.append_event(jid, { "event": "received", "source_event": payload.get("event"), "seq": payload.get("seq"), "topic": msg.topic, "timestamp": payload.get("timestamp"), "detail": payload.get("detail", ""), }) self.events.put((msg.topic, payload)) def _collect_jobs(args) -> List[Dict[str, Any]]: """Resolve the list of job records this invocation should watch.""" if args.wait_any: jobs = [r for r in registry.list_jobs(args.registry_dir) if r.get("status") in ("pending", "running")] if not jobs: logger.error("no pending/running jobs to wait for") return jobs job = load_job(args.job, args.registry_dir) # raises FileNotFoundError return [job] def main(argv=None) -> int: parser = argparse.ArgumentParser(description="Subscribe to Job events on MQTT") target = parser.add_mutually_exclusive_group(required=True) target.add_argument("--job", help="job id to watch") target.add_argument("--wait-any", action="store_true", help="watch every pending/running job in the registry") parser.add_argument("--timeout", type=float, default=None, help="wall-clock budget in seconds (default: job.timeout_sec or 3600)") parser.add_argument("--idle-timeout", type=float, default=None, help="max seconds with no new event (default: job.idle_timeout_sec or 120)") parser.add_argument("--expect-retention", action="store_true", help="warn if no retained terminal event arrives promptly") parser.add_argument("--registry-dir", default=DEFAULT_REGISTRY_DIR) parser.add_argument("-v", "--verbose", action="store_true") args = parser.parse_args(argv) mqtt_common.setup_logging(logging.DEBUG if args.verbose else logging.WARNING) try: jobs = _collect_jobs(args) except FileNotFoundError as exc: logger.error("%s", exc) return 2 if not jobs: return 2 expected_ids: Set[str] = {j["job_id"] for j in jobs} tokens = {j["job_id"]: j.get("auth_token") for j in jobs} watcher = _Watcher(expected_ids, tokens) # Resolve timeouts from CLI, falling back to the (first) job's settings. base_job = jobs[0] wall_timeout = args.timeout if args.timeout is not None else float(base_job.get("timeout_sec", 3600)) idle_timeout = args.idle_timeout if args.idle_timeout is not None else float(base_job.get("idle_timeout_sec", 120)) # All watched jobs share a broker in practice; connect using the first # job's broker and subscribe to each job's events topic. config = broker_config_from_job(base_job) client = make_client("subscriber", config) client.on_message = watcher.on_message subscribed_topics = [] for job in jobs: prefix = job.get("topic_prefix") or mqtt_common.topic_prefix_for(job["job_id"]) subscribed_topics.append(f"{prefix}/events") def on_connect(_c, _u, _flags, reason_code, _props): if mqtt_common.reason_code_value(reason_code) != 0: logger.error("broker connection failed: rc=%s", reason_code) return for topic in subscribed_topics: _c.subscribe(topic, qos=1) logger.info("subscribed to %s", topic) client.on_connect = on_connect client.connect(config.host, config.port, config.keepalive) client.loop_start() terminal: Dict[str, str] = {} # job_id -> "completed"/"error" pending: Set[str] = set(expected_ids) start = time.monotonic() wall_deadline = start + wall_timeout last_event = start retention_checked = not args.expect_retention try: while pending: now = time.monotonic() if now >= wall_deadline: logger.error("wall-clock timeout (%.0fs); still pending: %s", wall_timeout, ", ".join(sorted(pending))) return 2 idle_left = idle_timeout - (now - last_event) if idle_left <= 0: logger.error("idle timeout (%.0fs, no events); still pending: %s", idle_timeout, ", ".join(sorted(pending))) return 2 wait = min(wall_deadline - now, idle_left, 1.0) try: topic, payload = watcher.events.get(timeout=wait) except queue.Empty: if not retention_checked and (now - start) > 3.0: logger.warning("--expect-retention set but no retained " "terminal event observed yet") retention_checked = True continue last_event = time.monotonic() retention_checked = True print(_format_line(topic, payload), flush=True) jid = payload["job_id"] event = payload.get("event") if event in TERMINAL_EVENTS: if jid in terminal: # Already finalised: ignore duplicates / late reorders. logger.info("ignoring duplicate terminal %s for %s", event, jid) continue terminal[jid] = event pending.discard(jid) finally: client.loop_stop() try: client.disconnect() except Exception: # pragma: no cover pass # All jobs reached a terminal state. error wins over completed. if any(state == "error" for state in terminal.values()): return 1 return 0 if __name__ == "__main__": sys.exit(main())