Files
multi-agent-mux/skills/tmux-agent-orchestrate-delegate-job/scripts/publish_event.py
T
Godopu 3677e4aace feat(delegate-job): add subscriber auto-reconnect (FW-01) + HMAC-SHA256 event signing (FW-05)
FW-01: job_subscriber.py now has on_disconnect callback (5-arg paho v2
  signature), reconnect_delay_set(1,16) for exponential backoff, and
  with_retry-wrapped initial connect (5 attempts). paho loop_start()
  handles auto-reconnect internally.

FW-05: publish_event.py signs payloads with HMAC-SHA256 using auth_token
  as key (replaces plaintext token in wire). mqtt_common.py adds
  verify_hmac() helper using hmac.compare_digest (timing-safe).
  job_subscriber.py validates incoming events via verify_hmac.
  PoC mode (auth_token=None) skips verification — backward compatible.

Reviewed by agy-existing (PASS) and claude-existing (FAIL: on_disconnect
  4-arg signature → fixed to 5-arg matching paho v2 CallbackAPIVersion).
2026-06-21 06:31:39 +00:00

232 lines
8.4 KiB
Python
Executable File

#!/usr/bin/env python3
"""publish_event.py — the single entry point for emitting a Job event.

Loads the job record from the registry, resolves its broker, assigns the next
monotonic ``seq``, builds the schema-v1 JSON payload, and publishes it to
``<topic_prefix>/events`` over QoS 1 with exponential-backoff retry.

When the job record carries an ``auth_token``, the payload is signed with
HMAC-SHA256 and the hex digest is sent in ``data.hmac_sig``; the token itself
is never put on the wire.

Silent by design: nothing is printed to stdout. Diagnostics go to stderr via
logging. Terminal events (``completed``/``error``) publish with retain=True so
a late subscriber still observes the final state (production hardening).

Exit codes:
    0  published successfully
    1  parameter / registry error (bad args, unknown job, no pending job)
    2  publish failed after retries (network / broker / ACK timeout)

Usage:
    publish_event.py --job <id> --event started [--detail "..."] [--data '{...}']
    publish_event.py --pick-pending --agent-session tmux:claude --event completed
    publish_event.py --job <id> --event completed --retained
"""
from __future__ import annotations
import argparse
import hashlib
import hmac
import json
import logging
import sys
import time
from typing import Any, Dict, Optional
import mqtt_common
import registry
from mqtt_common import (
DEFAULT_REGISTRY_DIR,
SCHEMA_VERSION,
broker_config_from_job,
events_topic_for,
load_job,
make_client,
next_seq,
with_retry,
)
# Module logger; diagnostics only (the script prints nothing to stdout).
logger = logging.getLogger("delegate_job.publish_event")

# Event names accepted by the ``--event`` option.
VALID_EVENTS = (
    "started",
    "permission_required",
    "progress",
    "completed",
    "error",
)

# Events that end a job; these are always published with retain=True.
TERMINAL_EVENTS = (
    "completed",
    "error",
)

# Event name -> registry status written as a best-effort side effect after a
# successful publish. Events that are absent here leave the status untouched.
EVENT_TO_STATUS = {
    "started": "running",
    "completed": "completed",
    "error": "error",
}

# Seconds to wait for the broker's CONNACK.
CONNECT_ACK_TIMEOUT = 10
# Seconds to wait for the QoS-1 PUBACK.
PUBLISH_ACK_TIMEOUT = 5
def build_payload(
    job_id: str,
    seq: int,
    event: str,
    detail: str,
    data: Optional[Dict[str, Any]],
    auth_token: Optional[str],
) -> Dict[str, Any]:
    """Build the schema-v1 event payload, signing it when a token is set.

    Args:
        job_id: Identifier of the job the event belongs to.
        seq: Sequence number assigned to this event.
        event: Event name (e.g. ``"started"``, ``"completed"``).
        detail: Free-form human-readable detail string.
        data: Optional extra fields; shallow-copied, so the caller's dict is
            never mutated. ``None`` or an empty dict becomes ``{}``.
        auth_token: Per-job secret used as the HMAC key. When falsy (PoC
            mode) the payload is returned unsigned.

    Returns:
        The payload dict. When ``auth_token`` is set, ``data["hmac_sig"]``
        holds the hex HMAC-SHA256 digest of the canonical payload.
    """
    payload: Dict[str, Any] = {
        "schema_version": SCHEMA_VERSION,
        "seq": seq,
        "job_id": job_id,
        "event": event,
        "timestamp": mqtt_common._utcnow(),
        "detail": detail,
        "data": dict(data) if data else {},
    }
    if not auth_token:
        # No per-job token: publish unsigned so nothing secret-derived is
        # sent to a broker that has no auth configured.
        return payload

    # The token is used only as the HMAC key; it is never placed on the wire.
    # The signature covers the whole payload except the signature field
    # itself, so any caller-supplied ``hmac_sig`` is excluded from the signed
    # bytes and then overwritten below.
    unsigned_data = {k: v for k, v in payload["data"].items() if k != "hmac_sig"}
    # Canonical form: sorted keys and compact separators make the bytes
    # independent of dict insertion order. A verifier has to rebuild exactly
    # these bytes before comparing digests.
    canonical = json.dumps(
        {**payload, "data": unsigned_data},
        sort_keys=True,
        separators=(",", ":"),
    ).encode()
    signature = hmac.new(auth_token.encode(), canonical, hashlib.sha256).hexdigest()
    payload["data"]["hmac_sig"] = signature
    return payload
def _publish_once(config, topic: str, body: bytes, retain: bool) -> None:
    """Run one connect / publish / disconnect cycle for a single message.

    The message is sent at QoS 1 and the call waits for the broker's
    acknowledgement. Every failure is raised rather than swallowed so that
    ``with_retry`` can repeat the whole sequence on a brand-new connection.

    Raises:
        TimeoutError: no CONNACK arrived within ``CONNECT_ACK_TIMEOUT``
            seconds, or the publish was not acknowledged within
            ``PUBLISH_ACK_TIMEOUT`` seconds.
        ConnectionError: the broker answered with a non-zero reason code.
    """
    mqtt_client = make_client("publisher", config)
    connack_rc = None

    def _record_connack(_client, _userdata, _flags, reason_code, _properties):
        # Runs on the network-loop thread; just stash the CONNACK result.
        nonlocal connack_rc
        connack_rc = reason_code

    mqtt_client.on_connect = _record_connack
    mqtt_client.connect(config.host, config.port, config.keepalive)
    mqtt_client.loop_start()
    try:
        # Poll for the CONNACK so auth/TLS problems surface quickly.
        give_up_at = time.monotonic() + CONNECT_ACK_TIMEOUT
        while connack_rc is None and time.monotonic() < give_up_at:
            time.sleep(0.05)
        if connack_rc is None:
            raise TimeoutError("no CONNACK from broker")
        if mqtt_common.reason_code_value(connack_rc) != 0:
            raise ConnectionError(f"broker refused connection: rc={connack_rc}")

        receipt = mqtt_client.publish(topic, payload=body, qos=1, retain=retain)
        receipt.wait_for_publish(timeout=PUBLISH_ACK_TIMEOUT)
        if not receipt.is_published():
            raise TimeoutError("publish not acknowledged within timeout")
    finally:
        # Always tear the connection down, whether or not the publish worked.
        mqtt_client.loop_stop()
        try:
            mqtt_client.disconnect()
        except Exception:  # pragma: no cover - disconnect best effort
            pass
def _resolve_job_id(args) -> Optional[str]:
    """Return the job id selected by the parsed command-line arguments.

    Without ``--pick-pending`` this is simply the ``--job`` value. With it,
    the id is whatever ``registry.pick_pending`` returns for
    ``args.agent_session`` in ``args.registry_dir``.
    """
    if not args.pick_pending:
        return args.job
    return registry.pick_pending(args.agent_session, args.registry_dir)
def _build_parser() -> argparse.ArgumentParser:
    """Create the argument parser for the publish_event command line."""
    parser = argparse.ArgumentParser(description="Publish a Job event to MQTT")
    # Exactly one way of naming the target job must be supplied.
    target = parser.add_mutually_exclusive_group(required=True)
    target.add_argument("--job", help="job id to publish for")
    target.add_argument("--pick-pending", action="store_true",
                        help="auto-select a pending job for --agent-session")
    parser.add_argument("--agent-session", default="tmux:claude",
                        help="session label used with --pick-pending")
    parser.add_argument("--event", default="progress", choices=VALID_EVENTS)
    parser.add_argument("--detail", default="")
    parser.add_argument("--data", default=None, help="optional JSON object string")
    parser.add_argument("--retained", action="store_true",
                        help="force retain=True (auto for completed/error)")
    parser.add_argument("--registry-dir", default=DEFAULT_REGISTRY_DIR)
    parser.add_argument("--attempts", type=int, default=3)
    parser.add_argument("-v", "--verbose", action="store_true")
    return parser


def _parse_data_arg(raw: Optional[str]) -> Optional[Dict[str, Any]]:
    """Decode the ``--data`` string; raise ValueError unless it is a JSON object."""
    if not raw:
        return None
    decoded = json.loads(raw)
    if not isinstance(decoded, dict):
        raise ValueError("--data must be a JSON object")
    return decoded


def main(argv=None) -> int:
    """Publish one job event and return the process exit code.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read ``sys.argv``.

    Returns:
        0 when the event was published, 1 on a parameter or registry error
        (invalid ``--data``, no job resolved, job record not found), 2 when
        the publish still failed after all retry attempts.
    """
    args = _build_parser().parse_args(argv)
    mqtt_common.setup_logging(logging.DEBUG if args.verbose else logging.WARNING)

    # --- parse optional data JSON (parameter error -> exit 1) ---
    try:
        data = _parse_data_arg(args.data)
    except ValueError as exc:  # json.JSONDecodeError is a ValueError subclass
        logger.error("invalid --data: %s", exc)
        return 1

    job_id = _resolve_job_id(args)
    if not job_id:
        logger.error("no job to publish for (unknown --job or no pending job)")
        return 1
    try:
        job = load_job(job_id, args.registry_dir)
    except FileNotFoundError as exc:
        logger.error("%s", exc)
        return 1

    config = broker_config_from_job(job)
    # Prefer the topic prefix stored on the job; otherwise derive the topic
    # from the job id.
    topic = job.get("topic_prefix")
    topic = f"{topic}/events" if topic else events_topic_for(job_id)

    # NOTE(review): the sequence number is taken before the publish attempt,
    # so a failed publish presumably leaves a gap in the sequence — confirm
    # subscribers tolerate that.
    seq = next_seq(job_id, args.registry_dir)
    payload = build_payload(
        job_id=job_id,
        seq=seq,
        event=args.event,
        detail=args.detail,
        data=data,
        auth_token=job.get("auth_token"),
    )
    body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    # Terminal events are always retained so late subscribers see the outcome.
    retain = args.retained or args.event in TERMINAL_EVENTS

    publish = with_retry(
        _publish_once,
        attempts=args.attempts,
        exceptions=(OSError, TimeoutError, ConnectionError, ValueError),
    )
    try:
        publish(config, topic, body, retain)
    except Exception as exc:
        logger.error("publish failed after %d attempts: %s", args.attempts, exc)
        return 2

    # Persistent audit log: record the exact payload we put on the wire so the
    # publish is reproducible from the log alone. Best-effort (isolated inside
    # append_event) — never fails the publish.
    mqtt_common.append_event(job_id, {
        "event": "published",
        "source_event": args.event,
        "seq": seq,
        "topic": topic,
        "retain": retain,
        "timestamp": payload["timestamp"],
        "detail": args.detail,
        "payload": payload,
    })

    # Best-effort side effects: (debug) event log + registry status sync. The
    # message is already on the broker, so a failure here is only logged and
    # must not change the exit code.
    try:
        registry.append_event(job_id, args.registry_dir, payload)
    except Exception as exc:  # pragma: no cover - best effort
        logger.warning("event log append failed: %s", exc)

    new_status = EVENT_TO_STATUS.get(args.event)
    if new_status:
        try:
            mqtt_common.update_job_status(job_id, args.registry_dir, status=new_status)
        except Exception as exc:  # pragma: no cover - best effort
            logger.warning("status sync failed: %s", exc)

    logger.info("published %s seq=%d job=%s retain=%s", args.event, seq, job_id, retain)
    return 0
if __name__ == "__main__":
    # Hand main()'s return value to the interpreter as the exit status.
    raise SystemExit(main())