#!/usr/bin/env python3 """publish_event.py — the single entry point for emitting a Job event. Loads the job record from the registry, resolves its broker, assigns the next monotonic ``seq``, builds the schema-v1 JSON payload, and publishes it to ``/events`` over QoS 1 with exponential-backoff retry. Silent by design: nothing is printed to stdout. Diagnostics go to stderr via logging. Terminal events (``completed``/``error``) publish with retain=True so a late subscriber still observes the final state (production hardening). Exit codes: 0 published successfully 1 parameter / registry error (bad args, unknown job, no pending job) 2 publish failed after retries (network / broker / ACK timeout) Usage: publish_event.py --job --event started [--detail "..."] [--data '{...}'] publish_event.py --pick-pending --agent-session tmux:claude --event completed publish_event.py --job --event completed --retained """ from __future__ import annotations import argparse import hashlib import hmac import json import logging import sys import time from typing import Any, Dict, Optional import mqtt_common import registry from mqtt_common import ( DEFAULT_REGISTRY_DIR, SCHEMA_VERSION, broker_config_from_job, events_topic_for, load_job, make_client, next_seq, with_retry, ) logger = logging.getLogger("delegate_job.publish_event") VALID_EVENTS = ("started", "permission_required", "progress", "completed", "error") TERMINAL_EVENTS = ("completed", "error") # event -> registry status to sync as a best-effort side effect EVENT_TO_STATUS = { "started": "running", "completed": "completed", "error": "error", } CONNECT_ACK_TIMEOUT = 10 # seconds to wait for CONNACK PUBLISH_ACK_TIMEOUT = 5 # seconds to wait for QoS-1 PUBACK def build_payload( job_id: str, seq: int, event: str, detail: str, data: Optional[Dict[str, Any]], auth_token: Optional[str], ) -> Dict[str, Any]: payload: Dict[str, Any] = { "schema_version": SCHEMA_VERSION, "seq": seq, "job_id": job_id, "event": event, "timestamp": mqtt_common._utcnow(), "detail": detail, "data": dict(data) if data else {}, } # Production: carry the per-job auth token so the subscriber can verify the # publisher. The token is compared in plain text (bearer-token style) by the # subscriber — NOT an HMAC. See SKILL.md "Auth token" and PLAN 8.2. The # registry stores the per-job token in `auth_token`; only include it on the # wire when set so the public broker (no auth) doesn't leak anything. if auth_token: sign_payload = {k: v for k, v in payload.items() if k != "data"} sign_payload["data"] = {k: v for k, v in payload.get("data", {}).items() if k != "hmac_sig"} msg = json.dumps(sign_payload, sort_keys=True, separators=(",", ":")).encode() sig = hmac.new(auth_token.encode(), msg, hashlib.sha256).hexdigest() payload["data"]["hmac_sig"] = sig return payload def _publish_once(config, topic: str, body: bytes, retain: bool) -> None: """Connect, publish one QoS-1 message, wait for the broker ACK, disconnect. Raises on any failure so ``with_retry`` can re-run the whole sequence (a fresh connection per attempt is the robust choice for a PoC).""" client = make_client("publisher", config) connected = {"rc": None} def on_connect(_c, _u, _flags, reason_code, _props): connected["rc"] = reason_code client.on_connect = on_connect client.connect(config.host, config.port, config.keepalive) client.loop_start() try: # Wait for CONNACK so we fail fast on auth/TLS errors. deadline = time.monotonic() + CONNECT_ACK_TIMEOUT while connected["rc"] is None and time.monotonic() < deadline: time.sleep(0.05) if connected["rc"] is None: raise TimeoutError("no CONNACK from broker") if mqtt_common.reason_code_value(connected["rc"]) != 0: raise ConnectionError(f"broker refused connection: rc={connected['rc']}") info = client.publish(topic, payload=body, qos=1, retain=retain) info.wait_for_publish(timeout=PUBLISH_ACK_TIMEOUT) if not info.is_published(): raise TimeoutError("publish not acknowledged within timeout") finally: client.loop_stop() try: client.disconnect() except Exception: # pragma: no cover - disconnect best effort pass def _resolve_job_id(args) -> Optional[str]: if args.pick_pending: return registry.pick_pending(args.agent_session, args.registry_dir) return args.job def main(argv=None) -> int: parser = argparse.ArgumentParser(description="Publish a Job event to MQTT") target = parser.add_mutually_exclusive_group(required=True) target.add_argument("--job", help="job id to publish for") target.add_argument("--pick-pending", action="store_true", help="auto-select a pending job for --agent-session") parser.add_argument("--agent-session", default="tmux:claude", help="session label used with --pick-pending") parser.add_argument("--event", default="progress", choices=VALID_EVENTS) parser.add_argument("--detail", default="") parser.add_argument("--data", default=None, help="optional JSON object string") parser.add_argument("--retained", action="store_true", help="force retain=True (auto for completed/error)") parser.add_argument("--registry-dir", default=DEFAULT_REGISTRY_DIR) parser.add_argument("--attempts", type=int, default=3) parser.add_argument("-v", "--verbose", action="store_true") args = parser.parse_args(argv) mqtt_common.setup_logging(logging.DEBUG if args.verbose else logging.WARNING) # --- parse optional data JSON (parameter error -> exit 1) --- data: Optional[Dict[str, Any]] = None if args.data: try: data = json.loads(args.data) if not isinstance(data, dict): raise ValueError("--data must be a JSON object") except (ValueError, json.JSONDecodeError) as exc: logger.error("invalid --data: %s", exc) return 1 job_id = _resolve_job_id(args) if not job_id: logger.error("no job to publish for (unknown --job or no pending job)") return 1 try: job = load_job(job_id, args.registry_dir) except FileNotFoundError as exc: logger.error("%s", exc) return 1 config = broker_config_from_job(job) topic = job.get("topic_prefix") topic = f"{topic}/events" if topic else events_topic_for(job_id) seq = next_seq(job_id, args.registry_dir) payload = build_payload( job_id=job_id, seq=seq, event=args.event, detail=args.detail, data=data, auth_token=job.get("auth_token"), ) body = json.dumps(payload, ensure_ascii=False).encode("utf-8") retain = args.retained or args.event in TERMINAL_EVENTS publish = with_retry( _publish_once, attempts=args.attempts, exceptions=(OSError, TimeoutError, ConnectionError, ValueError), ) try: publish(config, topic, body, retain) except Exception as exc: logger.error("publish failed after %d attempts: %s", args.attempts, exc) return 2 # Persistent audit log: record the exact payload we put on the wire so the # publish is reproducible from the log alone. Best-effort (isolated inside # append_event) — never fails the publish. mqtt_common.append_event(job_id, { "event": "published", "source_event": args.event, "seq": seq, "topic": topic, "retain": retain, "timestamp": payload["timestamp"], "detail": args.detail, "payload": payload, }) # Best-effort side effects: registry status sync + (debug) event log. Never # fail the publish on these. registry.append_event(job_id, args.registry_dir, payload) new_status = EVENT_TO_STATUS.get(args.event) if new_status: try: mqtt_common.update_job_status(job_id, args.registry_dir, status=new_status) except Exception as exc: # pragma: no cover - best effort logger.warning("status sync failed: %s", exc) logger.info("published %s seq=%d job=%s retain=%s", args.event, seq, job_id, retain) return 0 if __name__ == "__main__": sys.exit(main())