feat(monitor): consolidate per-job watchdogs into shared wildcard subscriber (FW-W3)
This commit is contained in:
@@ -55,16 +55,32 @@ if [ "$SUBSCRIBE" = "1" ]; then
|
||||
# The MQTT subscribe loop exits 3 to signal "broker unavailable → poll instead".
|
||||
set +e
|
||||
YAML_PATH="$AGENT_SESSIONS_YAML" HOME_DIR="$HOME_DIR" CLAUDE_PROJECT_DIR="$CLAUDE_PROJECT_DIR" LOCAL_BIN="$LOCAL_BIN" \
|
||||
SUB_TIMEOUT="$SUB_TIMEOUT" SUB_IDLE_TIMEOUT="$SUB_IDLE_TIMEOUT" \
|
||||
WORKSPACE_ROOT="$WORKSPACE_ROOT" SUB_TIMEOUT="$SUB_TIMEOUT" SUB_IDLE_TIMEOUT="$SUB_IDLE_TIMEOUT" \
|
||||
SKILLS_DIR="$SKILLS_DIR" LIB_SH="$LIB_SH" \
|
||||
"$PYBIN" - <<'PYEOF'
|
||||
import os, sys, json, time, subprocess
|
||||
|
||||
lib_sh = os.environ.get('LIB_SH', '')
|
||||
skills_dir = os.environ.get('SKILLS_DIR', '')
|
||||
yaml_path = os.environ.get('YAML_PATH', '')
|
||||
workspace_root = os.environ.get('WORKSPACE_ROOT', '')
|
||||
timeout = int(os.environ.get('SUB_TIMEOUT', '0') or '0') # 0 = no overall timeout
|
||||
idle_timeout = int(os.environ.get('SUB_IDLE_TIMEOUT', '3600') or '0') # 0 = no idle timeout
|
||||
|
||||
# Prevent duplicate wildcard subscribers for this workspace (concurrency race)
|
||||
import fcntl
|
||||
lock_file_path = os.path.join(workspace_root or '.', '.mam', 'monitor.lock')
|
||||
try:
|
||||
os.makedirs(os.path.dirname(lock_file_path), exist_ok=True)
|
||||
lock_file = open(lock_file_path, 'w')
|
||||
fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||
except BlockingIOError:
|
||||
print("MQTT Monitor: another subscriber is already running for this workspace. Exiting.", flush=True)
|
||||
sys.exit(0)
|
||||
except Exception as e:
|
||||
print(f"MQTT Monitor: failed to acquire monitor lock ({e}). Exiting.", flush=True)
|
||||
sys.exit(1)
|
||||
|
||||
# Locate skills/multi-agent-mux-delegate-job/scripts to import mqtt_common — relative first, then
|
||||
# an upward walk from cwd. No hardcoded absolute path (review item 6).
|
||||
cand = os.path.join(skills_dir, 'multi-agent-mux-delegate-job', 'scripts') if skills_dir else ''
|
||||
@@ -85,6 +101,7 @@ else:
|
||||
d = os.path.dirname(d)
|
||||
|
||||
import mqtt_common
|
||||
import registry
|
||||
|
||||
# Executed INSIDE lib.sh::atomic_dump_yaml (system python3 + PyYAML), under the
|
||||
# YAML flock with schema-validate + .bak (review item 5). Marks matching running
|
||||
@@ -132,6 +149,7 @@ def handle_terminal(jid, event):
|
||||
|
||||
|
||||
state = {'last_msg': time.time(), 'connected': False, 'failed': False}
|
||||
last_seqs = {}
|
||||
|
||||
|
||||
def on_message(_client, _userdata, msg):
|
||||
@@ -140,7 +158,48 @@ def on_message(_client, _userdata, msg):
|
||||
payload = json.loads(msg.payload.decode("utf-8"))
|
||||
jid = payload.get("job_id")
|
||||
event = payload.get("event")
|
||||
if jid and event in ("completed", "error"):
|
||||
if not jid or not event:
|
||||
return
|
||||
|
||||
if workspace_root:
|
||||
registry_dir = os.path.join(workspace_root, '.mam', 'jobs')
|
||||
else:
|
||||
yaml_dir = os.path.dirname(yaml_path) if yaml_path else ""
|
||||
registry_dir = os.path.join(yaml_dir, 'jobs') if yaml_dir else '.mam/jobs'
|
||||
|
||||
try:
|
||||
job = registry.load_job(jid, registry_dir)
|
||||
except FileNotFoundError:
|
||||
# Silently ignore events for jobs not in the local registry
|
||||
return
|
||||
|
||||
expected_token = job.get("auth_token")
|
||||
if not mqtt_common.verify_hmac(payload, expected_token):
|
||||
print(f"MQTT Monitor: drop event for job {jid}: HMAC verify failed", flush=True)
|
||||
return
|
||||
|
||||
seq = payload.get("seq")
|
||||
if seq is None or not isinstance(seq, int):
|
||||
print(f"MQTT Monitor: drop event for job {jid}: missing or invalid seq", flush=True)
|
||||
return
|
||||
if seq <= last_seqs.get(jid, 0):
|
||||
print(f"MQTT Monitor: drop event for job {jid}: seq {seq} not monotonic (last {last_seqs.get(jid, 0)})", flush=True)
|
||||
return
|
||||
last_seqs[jid] = seq
|
||||
|
||||
# Append the event to events.ndjson audit trail
|
||||
mqtt_common.append_event(jid, {
|
||||
"event": "received",
|
||||
"source_event": event,
|
||||
"seq": seq,
|
||||
"topic": msg.topic,
|
||||
"timestamp": payload.get("timestamp"),
|
||||
"detail": payload.get("detail", ""),
|
||||
})
|
||||
|
||||
print(f"MQTT Monitor: recorded event {event} for job {jid} (seq={seq})", flush=True)
|
||||
|
||||
if event in ("completed", "error"):
|
||||
print(f"MQTT Monitor: received terminal event {event} for job {jid}", flush=True)
|
||||
handle_terminal(jid, event)
|
||||
except Exception as e:
|
||||
|
||||
Reference in New Issue
Block a user