feat(delegate-job): add subscriber auto-reconnect (FW-01) + HMAC-SHA256 event signing (FW-05)
FW-01: job_subscriber.py now has an on_disconnect callback (5-arg paho v2 signature), reconnect_delay_set(1,16) for exponential backoff, and a with_retry-wrapped initial connect (5 attempts). paho loop_start() handles auto-reconnect internally. FW-05: publish_event.py signs payloads with HMAC-SHA256 using auth_token as the key (replaces the plaintext token on the wire). mqtt_common.py adds a verify_hmac() helper using hmac.compare_digest (timing-safe). job_subscriber.py validates incoming events via verify_hmac. PoC mode (auth_token=None) skips verification — backward compatible. Reviewed by agy-existing (PASS) and claude-existing (FAIL: on_disconnect 4-arg signature → fixed to 5-arg matching paho v2 CallbackAPIVersion).
This commit is contained in:
@@ -84,11 +84,9 @@ class _Watcher:
|
|||||||
return
|
return
|
||||||
# --- production auth check: data.auth_token must match if expected ---
|
# --- production auth check: data.auth_token must match if expected ---
|
||||||
expected_token = self.tokens.get(jid)
|
expected_token = self.tokens.get(jid)
|
||||||
if expected_token is not None:
|
if not mqtt_common.verify_hmac(payload, expected_token):
|
||||||
got = (payload.get("data") or {}).get("auth_token")
|
logger.warning("drop event for job %s: auth_token mismatch", jid)
|
||||||
if got != expected_token:
|
return
|
||||||
logger.warning("drop event for job %s: auth_token mismatch", jid)
|
|
||||||
return
|
|
||||||
# Persistent audit log from the *subscriber's* vantage point: every event
|
# Persistent audit log from the *subscriber's* vantage point: every event
|
||||||
# that survives defensive parsing is recorded here, including ones a
|
# that survives defensive parsing is recorded here, including ones a
|
||||||
# different host published. This is the external-observer record that
|
# different host published. This is the external-observer record that
|
||||||
@@ -170,8 +168,18 @@ def main(argv=None) -> int:
|
|||||||
_c.subscribe(topic, qos=1)
|
_c.subscribe(topic, qos=1)
|
||||||
logger.info("subscribed to %s", topic)
|
logger.info("subscribed to %s", topic)
|
||||||
|
|
||||||
|
def on_disconnect(_c, _u, _flags, reason_code, _props):
    """Log a warning when the broker connection ends with a non-zero reason code.

    Takes the five positional arguments of the disconnect callback; only
    ``reason_code`` is used. Nothing is logged when the normalized reason
    code equals 0.
    """
    if mqtt_common.reason_code_value(reason_code) == 0:
        return
    # Log the original reason_code object rather than the normalized int.
    logger.warning("broker disconnected (rc=%s); will retry reconnect", reason_code)
|
||||||
|
|
||||||
client.on_connect = on_connect
|
client.on_connect = on_connect
|
||||||
client.connect(config.host, config.port, config.keepalive)
|
client.on_disconnect = on_disconnect
|
||||||
|
client.reconnect_delay_set(min_delay=1, max_delay=16)
|
||||||
|
mqtt_common.with_retry(
|
||||||
|
lambda: client.connect(config.host, config.port, config.keepalive),
|
||||||
|
attempts=5, base_delay=1.0, max_delay=16.0
|
||||||
|
)()
|
||||||
client.loop_start()
|
client.loop_start()
|
||||||
|
|
||||||
terminal: Dict[str, str] = {} # job_id -> "completed"/"error"
|
terminal: Dict[str, str] = {} # job_id -> "completed"/"error"
|
||||||
|
|||||||
@@ -16,6 +16,8 @@ values, never code (see references/mqtt-broker-setup.md).
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import functools
|
import functools
|
||||||
|
import hashlib
|
||||||
|
import hmac
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
@@ -193,6 +195,20 @@ def reason_code_value(rc: Any) -> int:
|
|||||||
return int(getattr(rc, "value", rc))
|
return int(getattr(rc, "value", rc))
|
||||||
|
|
||||||
|
|
||||||
|
def verify_hmac(payload: dict, auth_token: Optional[str]) -> bool:
    """Verify the HMAC-SHA256 signature stored at ``payload["data"]["hmac_sig"]``.

    The expected signature is recomputed over the canonical JSON encoding of
    ``payload`` (sorted keys, compact separators) with ``hmac_sig`` removed
    from ``data``, keyed by ``auth_token``.

    Args:
        payload: Decoded event; ``data`` is expected to be a dict.
        auth_token: Shared secret used as the HMAC key. A falsy value
            disables verification (PoC mode).

    Returns:
        True if the signature is valid or no token is set. False if the
        signature is missing, malformed, or does not match. Malformed
        ``data`` / ``hmac_sig`` values are rejected instead of raising, so a
        hostile message cannot crash the caller.
    """
    if not auth_token:
        return True  # PoC mode — no auth

    data = payload.get("data")
    if not isinstance(data, dict):
        # "data" missing, null, or not an object: there is nothing to verify.
        return False

    sig = data.get("hmac_sig")
    # compare_digest() raises TypeError for non-str or non-ASCII str input,
    # so reject those shapes up front; a hex digest is always ASCII.
    if not isinstance(sig, str) or not sig or not sig.isascii():
        return False

    sign_payload = {k: v for k, v in payload.items() if k != "data"}
    sign_payload["data"] = {k: v for k, v in data.items() if k != "hmac_sig"}
    msg = json.dumps(sign_payload, sort_keys=True, separators=(",", ":")).encode()
    expected = hmac.new(auth_token.encode(), msg, hashlib.sha256).hexdigest()
    return hmac.compare_digest(sig, expected)
|
||||||
|
|
||||||
|
|
||||||
def topic_prefix_for(job_id: str, root: str = DEFAULT_TOPIC_ROOT) -> str:
|
def topic_prefix_for(job_id: str, root: str = DEFAULT_TOPIC_ROOT) -> str:
|
||||||
return f"{root}/{job_id}"
|
return f"{root}/{job_id}"
|
||||||
|
|
||||||
|
|||||||
@@ -22,6 +22,8 @@ Usage:
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import argparse
|
import argparse
|
||||||
|
import hashlib
|
||||||
|
import hmac
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import sys
|
import sys
|
||||||
@@ -79,7 +81,11 @@ def build_payload(
|
|||||||
# registry stores the per-job token in `auth_token`; only include it on the
|
# registry stores the per-job token in `auth_token`; only include it on the
|
||||||
# wire when set so the public broker (no auth) doesn't leak anything.
|
# wire when set so the public broker (no auth) doesn't leak anything.
|
||||||
if auth_token:
|
if auth_token:
|
||||||
payload["data"]["auth_token"] = auth_token
|
sign_payload = {k: v for k, v in payload.items() if k != "data"}
|
||||||
|
sign_payload["data"] = {k: v for k, v in payload.get("data", {}).items() if k != "hmac_sig"}
|
||||||
|
msg = json.dumps(sign_payload, sort_keys=True, separators=(",", ":")).encode()
|
||||||
|
sig = hmac.new(auth_token.encode(), msg, hashlib.sha256).hexdigest()
|
||||||
|
payload["data"]["hmac_sig"] = sig
|
||||||
return payload
|
return payload
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user