From 3677e4aace8b0ca20d5387c51b8e3d4bad40a5fb Mon Sep 17 00:00:00 2001 From: Godopu Date: Sun, 21 Jun 2026 06:31:39 +0000 Subject: [PATCH] feat(delegate-job): add subscriber auto-reconnect (FW-01) + HMAC-SHA256 event signing (FW-05) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit FW-01: job_subscriber.py now has on_disconnect callback (5-arg paho v2 signature), reconnect_delay_set(1,16) for exponential backoff, and with_retry-wrapped initial connect (5 attempts). paho loop_start() handles auto-reconnect internally. FW-05: publish_event.py signs payloads with HMAC-SHA256 using auth_token as key (replaces plaintext token in wire). mqtt_common.py adds verify_hmac() helper using hmac.compare_digest (timing-safe). job_subscriber.py validates incoming events via verify_hmac. PoC mode (auth_token=None) skips verification — backward compatible. Reviewed by agy-existing (PASS) and claude-existing (FAIL: on_disconnect 4-arg signature → fixed to 5-arg matching paho v2 CallbackAPIVersion). --- .../scripts/job_subscriber.py | 20 +++++++++++++------ .../scripts/mqtt_common.py | 16 +++++++++++++++ .../scripts/publish_event.py | 8 +++++++- 3 files changed, 37 insertions(+), 7 deletions(-) diff --git a/skills/tmux-agent-orchestrate-delegate-job/scripts/job_subscriber.py b/skills/tmux-agent-orchestrate-delegate-job/scripts/job_subscriber.py index a431971..5d5a995 100755 --- a/skills/tmux-agent-orchestrate-delegate-job/scripts/job_subscriber.py +++ b/skills/tmux-agent-orchestrate-delegate-job/scripts/job_subscriber.py @@ -84,11 +84,9 @@ class _Watcher: return # --- production auth check: data.auth_token must match if expected --- expected_token = self.tokens.get(jid) - if expected_token is not None: - got = (payload.get("data") or {}).get("auth_token") - if got != expected_token: - logger.warning("drop event for job %s: auth_token mismatch", jid) - return + if not mqtt_common.verify_hmac(payload, expected_token): + logger.warning("drop event for job %s: auth_token mismatch", jid) + return # Persistent audit log from the *subscriber's* vantage point: every event # that survives defensive parsing is recorded here, including ones a # different host published. This is the external-observer record that @@ -170,8 +168,18 @@ def main(argv=None) -> int: _c.subscribe(topic, qos=1) logger.info("subscribed to %s", topic) + def on_disconnect(_c, _u, _flags, reason_code, _props): + rc = mqtt_common.reason_code_value(reason_code) + if rc != 0: + logger.warning("broker disconnected (rc=%s); will retry reconnect", reason_code) + client.on_connect = on_connect - client.connect(config.host, config.port, config.keepalive) + client.on_disconnect = on_disconnect + client.reconnect_delay_set(min_delay=1, max_delay=16) + mqtt_common.with_retry( + lambda: client.connect(config.host, config.port, config.keepalive), + attempts=5, base_delay=1.0, max_delay=16.0 + )() client.loop_start() terminal: Dict[str, str] = {} # job_id -> "completed"/"error" diff --git a/skills/tmux-agent-orchestrate-delegate-job/scripts/mqtt_common.py b/skills/tmux-agent-orchestrate-delegate-job/scripts/mqtt_common.py index 41fbff4..1bca428 100644 --- a/skills/tmux-agent-orchestrate-delegate-job/scripts/mqtt_common.py +++ b/skills/tmux-agent-orchestrate-delegate-job/scripts/mqtt_common.py @@ -16,6 +16,8 @@ values, never code (see references/mqtt-broker-setup.md). from __future__ import annotations import functools +import hashlib +import hmac import json import logging import os @@ -193,6 +195,20 @@ def reason_code_value(rc: Any) -> int: return int(getattr(rc, "value", rc)) +def verify_hmac(payload: dict, auth_token: Optional[str]) -> bool: + """Verify HMAC-SHA256 signature. Returns True if valid or no token set.""" + if not auth_token: + return True # PoC mode — no auth + sig = payload.get("data", {}).get("hmac_sig") + if not sig: + return False + sign_payload = {k: v for k, v in payload.items() if k != "data"} + sign_payload["data"] = {k: v for k, v in payload.get("data", {}).items() if k != "hmac_sig"} + msg = json.dumps(sign_payload, sort_keys=True, separators=(",", ":")).encode() + expected = hmac.new(auth_token.encode(), msg, hashlib.sha256).hexdigest() + return hmac.compare_digest(sig, expected) + + def topic_prefix_for(job_id: str, root: str = DEFAULT_TOPIC_ROOT) -> str: return f"{root}/{job_id}" diff --git a/skills/tmux-agent-orchestrate-delegate-job/scripts/publish_event.py b/skills/tmux-agent-orchestrate-delegate-job/scripts/publish_event.py index c8b0e0b..b47c934 100755 --- a/skills/tmux-agent-orchestrate-delegate-job/scripts/publish_event.py +++ b/skills/tmux-agent-orchestrate-delegate-job/scripts/publish_event.py @@ -22,6 +22,8 @@ Usage: from __future__ import annotations import argparse +import hashlib +import hmac import json import logging import sys @@ -79,7 +81,11 @@ def build_payload( # registry stores the per-job token in `auth_token`; only include it on the # wire when set so the public broker (no auth) doesn't leak anything. if auth_token: - payload["data"]["auth_token"] = auth_token + sign_payload = {k: v for k, v in payload.items() if k != "data"} + sign_payload["data"] = {k: v for k, v in payload.get("data", {}).items() if k != "hmac_sig"} + msg = json.dumps(sign_payload, sort_keys=True, separators=(",", ":")).encode() + sig = hmac.new(auth_token.encode(), msg, hashlib.sha256).hexdigest() + payload["data"]["hmac_sig"] = sig return payload