feat(security): implement FW-N5, FW-N6, FW-N7 (HMAC-SHA256 protocol docs, auto-generate token, replay attack defense)
This commit is contained in:
@@ -83,14 +83,10 @@ The payload shape is unchanged; the transport and trust model tighten. See
|
||||
|
||||
- **Auth / ACL** — username/password + per-topic ACL. `jobs/+/events` publish is
|
||||
granted to the worker credential, subscribe to the Hermes credential.
|
||||
- **`auth_token` (the bonus field)** — each job record carries a per-job
|
||||
`auth_token` (`secrets.token_urlsafe(32)`). The publisher copies it into
|
||||
**`data.auth_token`**; the subscriber compares it against the registry's
|
||||
expected token and **drops mismatches**. This is an integrity check on top of
|
||||
the broker ACL, useful while still on a shared/public broker.
|
||||
- **HMAC Signature Verification (`data.hmac_sig`)** — to authenticate the publisher and verify message integrity without exposing the raw secret token over the wire, each job record contains a per-job `auth_token` (`secrets.token_urlsafe(32)`). The publisher computes an HMAC-SHA256 signature over the serialized payload (excluding `data.hmac_sig` itself) using the `auth_token` as the key, and appends it to **`data.hmac_sig`**. The subscriber reconstructs this signature and **drops any message that does not match or lacks a valid signature**.
|
||||
|
||||
```json
|
||||
{ "...": "...", "data": { "auth_token": "9f3c…", "build_id": "42" } }
|
||||
{ "...": "...", "data": { "hmac_sig": "d2f3...", "build_id": "42" } }
|
||||
```
|
||||
|
||||
- **TLS** — port 8883 + private CA. Toggled with `MQTT_TLS=1` (+ `MQTT_CA_CERTS`);
|
||||
|
||||
@@ -63,6 +63,7 @@ class _Watcher:
|
||||
self.events: "queue.Queue[Tuple[str, Dict[str, Any]]]" = queue.Queue()
|
||||
self.expected = set(expected_job_ids)
|
||||
self.tokens = expected_tokens # job_id -> expected auth_token (or None)
|
||||
self.last_seq: Dict[str, int] = {jid: 0 for jid in expected_job_ids}
|
||||
|
||||
def on_message(self, _client, _userdata, msg) -> None:
|
||||
# --- defensive parsing -------------------------------------------
|
||||
@@ -87,6 +88,16 @@ class _Watcher:
|
||||
if not mqtt_common.verify_hmac(payload, expected_token):
|
||||
logger.warning("drop event for job %s: HMAC verify failed", jid)
|
||||
return
|
||||
# --- replay attack defense: check monotonic sequence ---
|
||||
seq = payload.get("seq")
|
||||
if seq is None or not isinstance(seq, int):
|
||||
logger.warning("drop event for job %s: missing or invalid seq", jid)
|
||||
return
|
||||
if seq <= self.last_seq.get(jid, 0):
|
||||
logger.warning("drop event for job %s: seq %d is not monotonically increasing (last %d)",
|
||||
jid, seq, self.last_seq.get(jid, 0))
|
||||
return
|
||||
self.last_seq[jid] = seq
|
||||
# Persistent audit log from the *subscriber's* vantage point: every event
|
||||
# that survives defensive parsing is recorded here, including ones a
|
||||
# different host published. This is the external-observer record that
|
||||
|
||||
@@ -68,6 +68,11 @@ def register_job(
|
||||
job_id = job_id or generate_job_id(bits)
|
||||
if broker is None:
|
||||
broker = broker_config_from_env().to_registry_block()
|
||||
if auth_token is None:
|
||||
# Auto-generate token if secure broker configuration (TLS or username) is detected
|
||||
if broker.get("tls") or broker.get("username"):
|
||||
import secrets
|
||||
auth_token = secrets.token_urlsafe(32)
|
||||
now = _utcnow()
|
||||
record: Dict[str, Any] = {
|
||||
"schema_version": SCHEMA_VERSION,
|
||||
@@ -191,6 +196,7 @@ def _build_parser() -> argparse.ArgumentParser:
|
||||
p_reg.add_argument("--idle-timeout", type=int, default=120)
|
||||
p_reg.add_argument("--bits", type=int, default=32, help="32 (PoC) or 128 (prod)")
|
||||
p_reg.add_argument("--artifact", action="append", default=[], dest="artifacts")
|
||||
p_reg.add_argument("--auth-token", default=None, help="HMAC auth token for the job (auto-generated if secure broker is detected)")
|
||||
|
||||
p_list = sub.add_parser("list", help="list jobs (optionally by status)")
|
||||
p_list.add_argument("--status", default=None)
|
||||
@@ -240,6 +246,7 @@ def main(argv: Optional[List[str]] = None) -> int:
|
||||
registry_dir=rd,
|
||||
expected_artifacts=args.artifacts,
|
||||
bits=args.bits,
|
||||
auth_token=args.auth_token,
|
||||
)
|
||||
print(job_id)
|
||||
return 0
|
||||
|
||||
Reference in New Issue
Block a user