3677e4aace
FW-01: job_subscriber.py now has on_disconnect callback (5-arg paho v2 signature), reconnect_delay_set(1,16) for exponential backoff, and with_retry-wrapped initial connect (5 attempts). paho loop_start() handles auto-reconnect internally. FW-05: publish_event.py signs payloads with HMAC-SHA256 using auth_token as key (replaces plaintext token in wire). mqtt_common.py adds verify_hmac() helper using hmac.compare_digest (timing-safe). job_subscriber.py validates incoming events via verify_hmac. PoC mode (auth_token=None) skips verification — backward compatible. Reviewed by agy-existing (PASS) and claude-existing (FAIL: on_disconnect 4-arg signature → fixed to 5-arg matching paho v2 CallbackAPIVersion).
232 lines
8.4 KiB
Python
Executable File
232 lines
8.4 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""publish_event.py — the single entry point for emitting a Job event.
|
|
|
|
Loads the job record from the registry, resolves its broker, assigns the next
|
|
monotonic ``seq``, builds the schema-v1 JSON payload, and publishes it to
|
|
``<topic_prefix>/events`` over QoS 1 with exponential-backoff retry.
|
|
|
|
Silent by design: nothing is printed to stdout. Diagnostics go to stderr via
|
|
logging. Terminal events (``completed``/``error``) publish with retain=True so
|
|
a late subscriber still observes the final state (production hardening).
|
|
|
|
Exit codes:
|
|
0 published successfully
|
|
1 parameter / registry error (bad args, unknown job, no pending job)
|
|
2 publish failed after retries (network / broker / ACK timeout)
|
|
|
|
Usage:
|
|
publish_event.py --job <id> --event started [--detail "..."] [--data '{...}']
|
|
publish_event.py --pick-pending --agent-session tmux:claude --event completed
|
|
publish_event.py --job <id> --event completed --retained
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import hashlib
|
|
import hmac
|
|
import json
|
|
import logging
|
|
import sys
|
|
import time
|
|
from typing import Any, Dict, Optional
|
|
|
|
import mqtt_common
|
|
import registry
|
|
from mqtt_common import (
|
|
DEFAULT_REGISTRY_DIR,
|
|
SCHEMA_VERSION,
|
|
broker_config_from_job,
|
|
events_topic_for,
|
|
load_job,
|
|
make_client,
|
|
next_seq,
|
|
with_retry,
|
|
)
|
|
|
|
logger = logging.getLogger("delegate_job.publish_event")

# Events that end a job; these are published retained (see ``main``).
TERMINAL_EVENTS = ("completed", "error")

# Every event name accepted by ``--event``; terminal events come last.
VALID_EVENTS = ("started", "permission_required", "progress") + TERMINAL_EVENTS

# Event name -> registry status written as a best-effort side effect after a
# successful publish. Events without an entry leave the status untouched.
EVENT_TO_STATUS = dict(
    started="running",
    completed="completed",
    error="error",
)

# Seconds to wait for the broker's CONNACK.
CONNECT_ACK_TIMEOUT = 10
# Seconds to wait for the QoS-1 PUBACK.
PUBLISH_ACK_TIMEOUT = 5
|
|
|
|
|
|
def build_payload(
    job_id: str,
    seq: int,
    event: str,
    detail: str,
    data: Optional[Dict[str, Any]],
    auth_token: Optional[str],
) -> Dict[str, Any]:
    """Assemble the event payload and, when a token is supplied, sign it.

    The returned dict carries ``schema_version``, ``seq``, ``job_id``,
    ``event``, ``timestamp``, ``detail`` and ``data``. ``data`` is a shallow
    copy of the caller's mapping (or an empty dict), so the caller's object is
    never mutated.

    When ``auth_token`` is truthy, the token itself is NOT placed in the
    payload. It is used as the key of an HMAC-SHA256 computed over the
    compact, key-sorted JSON encoding of the payload, and the hex digest is
    stored under ``data["hmac_sig"]``. Any ``hmac_sig`` already present in
    ``data`` is left out of the signed bytes and then overwritten. When no
    token is given the payload is returned unsigned.
    """
    payload: Dict[str, Any] = {
        "schema_version": SCHEMA_VERSION,
        "seq": seq,
        "job_id": job_id,
        "event": event,
        "timestamp": mqtt_common._utcnow(),
        "detail": detail,
        "data": dict(data) if data else {},
    }
    if not auth_token:
        return payload

    event_data = payload["data"]
    # The signature slot cannot be part of what it signs, so drop it from the
    # copy that gets serialized.
    signable_data = {
        key: value for key, value in event_data.items() if key != "hmac_sig"
    }
    signable = {**payload, "data": signable_data}
    # Canonical form: sorted keys, no whitespace. NOTE(review): presumably the
    # subscriber-side verifier rebuilds exactly this serialization — keep the
    # two in sync when changing either.
    canonical = json.dumps(signable, sort_keys=True, separators=(",", ":")).encode()
    digest = hmac.new(auth_token.encode(), canonical, hashlib.sha256).hexdigest()
    event_data["hmac_sig"] = digest
    return payload
|
|
|
|
|
|
def _publish_once(config, topic: str, body: bytes, retain: bool) -> None:
    """Connect, publish one QoS-1 message, wait for the broker ACK, disconnect.

    A fresh client is created for every call, so a retry wrapper can simply
    re-run the whole sequence after a failure.

    Args:
        config: broker settings; ``host``, ``port`` and ``keepalive`` are read
            here and the object is also handed to ``make_client``.
        topic: topic to publish on.
        body: already-encoded message bytes.
        retain: whether the broker should retain the message.

    Raises:
        TimeoutError: no CONNACK arrived within ``CONNECT_ACK_TIMEOUT``, or
            the publish was not acknowledged within ``PUBLISH_ACK_TIMEOUT``.
        ConnectionError: the broker refused the connection, or the client
            reported that the message could not be published.
        Exception: anything else raised by the client calls (for example
            ``OSError`` from ``connect``) propagates unchanged.
    """
    client = make_client("publisher", config)
    connected = {"rc": None}

    def on_connect(_c, _u, _flags, reason_code, _props):
        # Runs on the network-loop thread; only records the CONNACK result.
        connected["rc"] = reason_code

    client.on_connect = on_connect
    client.connect(config.host, config.port, config.keepalive)
    client.loop_start()
    try:
        # Wait for CONNACK so we fail fast on auth/TLS errors.
        deadline = time.monotonic() + CONNECT_ACK_TIMEOUT
        while connected["rc"] is None and time.monotonic() < deadline:
            time.sleep(0.05)
        if connected["rc"] is None:
            raise TimeoutError("no CONNACK from broker")
        if mqtt_common.reason_code_value(connected["rc"]) != 0:
            raise ConnectionError(f"broker refused connection: rc={connected['rc']}")

        info = client.publish(topic, payload=body, qos=1, retain=retain)
        try:
            info.wait_for_publish(timeout=PUBLISH_ACK_TIMEOUT)
        except RuntimeError as exc:
            # paho's wait_for_publish raises RuntimeError when the publish was
            # rejected client-side (e.g. the connection dropped right after
            # CONNACK). Surface it as ConnectionError so it is handled like
            # every other transient connection failure.
            raise ConnectionError(f"publish failed: {exc}") from exc
        if not info.is_published():
            raise TimeoutError("publish not acknowledged within timeout")
    finally:
        client.loop_stop()
        try:
            client.disconnect()
        except Exception:  # pragma: no cover - disconnect best effort
            pass
|
|
|
|
|
|
def _resolve_job_id(args) -> Optional[str]:
    """Return the job id the parsed CLI arguments refer to.

    Without ``--pick-pending`` this is simply the ``--job`` value. With it,
    the id comes from ``registry.pick_pending`` for the given agent session
    and registry directory.
    """
    if not args.pick_pending:
        return args.job
    return registry.pick_pending(args.agent_session, args.registry_dir)
|
|
|
|
|
|
def main(argv=None) -> int:
    """Parse CLI arguments, publish one job event, and return the exit code.

    Args:
        argv: argument list to parse; ``None`` lets argparse read
            ``sys.argv``.

    Returns:
        0 when the event was published, 1 on a parameter or registry error
        (invalid ``--data`` or ``--attempts``, no resolvable job, job record
        not found), 2 when publishing failed.

    After a successful publish the payload is recorded in the audit/event
    logs and the registry status is synced for events listed in
    ``EVENT_TO_STATUS``. Those side effects are best effort: a failure is
    logged as a warning and does not change the exit code.
    """
    parser = argparse.ArgumentParser(description="Publish a Job event to MQTT")
    target = parser.add_mutually_exclusive_group(required=True)
    target.add_argument("--job", help="job id to publish for")
    target.add_argument("--pick-pending", action="store_true",
                        help="auto-select a pending job for --agent-session")
    parser.add_argument("--agent-session", default="tmux:claude",
                        help="session label used with --pick-pending")
    parser.add_argument("--event", default="progress", choices=VALID_EVENTS)
    parser.add_argument("--detail", default="")
    parser.add_argument("--data", default=None, help="optional JSON object string")
    parser.add_argument("--retained", action="store_true",
                        help="force retain=True (auto for completed/error)")
    parser.add_argument("--registry-dir", default=DEFAULT_REGISTRY_DIR)
    parser.add_argument("--attempts", type=int, default=3)
    parser.add_argument("-v", "--verbose", action="store_true")
    args = parser.parse_args(argv)

    mqtt_common.setup_logging(logging.DEBUG if args.verbose else logging.WARNING)

    # A non-positive attempt count would leave it to the retry helper whether
    # anything is published at all; treat it as a parameter error instead.
    if args.attempts < 1:
        logger.error("--attempts must be >= 1 (got %d)", args.attempts)
        return 1

    # --- parse optional data JSON (parameter error -> exit 1) ---
    data: Optional[Dict[str, Any]] = None
    if args.data:
        try:
            data = json.loads(args.data)
            if not isinstance(data, dict):
                raise ValueError("--data must be a JSON object")
        except ValueError as exc:  # json.JSONDecodeError is a ValueError
            logger.error("invalid --data: %s", exc)
            return 1

    job_id = _resolve_job_id(args)
    if not job_id:
        logger.error("no job to publish for (unknown --job or no pending job)")
        return 1

    try:
        job = load_job(job_id, args.registry_dir)
    except FileNotFoundError as exc:
        logger.error("%s", exc)
        return 1

    config = broker_config_from_job(job)
    # Prefer the topic prefix stored on the job; otherwise derive the topic
    # from the job id.
    topic = job.get("topic_prefix")
    topic = f"{topic}/events" if topic else events_topic_for(job_id)
    seq = next_seq(job_id, args.registry_dir)
    payload = build_payload(
        job_id=job_id,
        seq=seq,
        event=args.event,
        detail=args.detail,
        data=data,
        auth_token=job.get("auth_token"),
    )
    body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    # Terminal events are always retained so a late subscriber still sees the
    # final state.
    retain = args.retained or args.event in TERMINAL_EVENTS

    publish = with_retry(
        _publish_once,
        attempts=args.attempts,
        exceptions=(OSError, TimeoutError, ConnectionError, ValueError),
    )
    try:
        publish(config, topic, body, retain)
    except Exception as exc:
        logger.error("publish failed after %d attempts: %s", args.attempts, exc)
        return 2

    # Persistent audit log: record the exact payload we put on the wire so the
    # publish is reproducible from the log alone. Best-effort (isolated inside
    # append_event) — never fails the publish.
    mqtt_common.append_event(job_id, {
        "event": "published",
        "source_event": args.event,
        "seq": seq,
        "topic": topic,
        "retain": retain,
        "timestamp": payload["timestamp"],
        "detail": args.detail,
        "payload": payload,
    })

    # Best-effort side effects: registry status sync + (debug) event log. Never
    # fail the publish on these — the message is already on the broker, so an
    # error here must not turn a successful publish into a crash.
    try:
        registry.append_event(job_id, args.registry_dir, payload)
    except Exception as exc:  # pragma: no cover - best effort
        logger.warning("registry event log failed: %s", exc)
    new_status = EVENT_TO_STATUS.get(args.event)
    if new_status:
        try:
            mqtt_common.update_job_status(job_id, args.registry_dir, status=new_status)
        except Exception as exc:  # pragma: no cover - best effort
            logger.warning("status sync failed: %s", exc)

    logger.info("published %s seq=%d job=%s retain=%s", args.event, seq, job_id, retain)
    return 0
|
|
|
|
|
|
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|