fix(skills): claude review items 4-7 (subscribe timeout, atomic_dump_yaml, hardcoded paths, lifecycle helper)

Item 4: --subscribe gains --timeout/--idle-timeout (idle default raised
        120s->600s, 0=disable); connect-error AND non-zero CONNACK now fall
        back to a polling loop. SKILL.md matches actual behaviour.
Item 5: --subscribe terminal-event YAML writes routed through
        lib.sh::atomic_dump_yaml (flock + schema-validate + .bak).
Item 6: removed hardcoded /home/godopu16/PuKi fallbacks in lib.sh,
        status.sh (x2) and reconcile.sh; paths now BASH_SOURCE-relative.
Item 7: lib.sh::delegate_publish_event helper consolidates the 4 duplicated
        lifecycle publish blocks; delete cwd|jid parser replaced with JSON.

Also: subscribe loop runs under the project venv python (paho) and delegates
all YAML work to atomic_dump_yaml on system python3 (PyYAML), since neither
interpreter has both modules — the original env_python path could never import
paho. Items 3 + 8 out of scope (per user). Verified on -L claude-phase4-test
(kill-server after).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-19 15:11:09 +00:00
parent 0eb1d94a9c
commit 06f076e9cc
7 changed files with 237 additions and 197 deletions
@@ -22,6 +22,11 @@ ONCE=0
EMIT_DIFF=0
DRY_RUN=0
SUBSCRIBE=0
# --subscribe controls (review item 4): 0 = no overall timeout; idle default 600s
# (raised from the old hardcoded 120s); idle 0 = never idle-out.
SUB_TIMEOUT=0
SUB_IDLE_TIMEOUT=600
POLL_INTERVAL="${RECONCILE_POLL_INTERVAL:-15}"
while [ $# -gt 0 ]; do
case "$1" in
@@ -29,7 +34,9 @@ while [ $# -gt 0 ]; do
--emit-diff) EMIT_DIFF=1; shift ;;
--dry-run) DRY_RUN=1; shift ;;
--subscribe) SUBSCRIBE=1; shift ;;
-h|--help) echo "Usage: $0 [--once] [--emit-diff] [--dry-run] [--subscribe]"; exit 0 ;;
--timeout) SUB_TIMEOUT="$2"; shift 2 ;;
--idle-timeout) SUB_IDLE_TIMEOUT="$2"; shift 2 ;;
-h|--help) echo "Usage: $0 [--once] [--emit-diff] [--dry-run] [--subscribe [--timeout N] [--idle-timeout N]]"; exit 0 ;;
*) echo "ERROR: unknown arg: $1" >&2; exit 2 ;;
esac
done
@@ -37,122 +44,176 @@ done
[ -f "$AGENT_SESSIONS_YAML" ] || { echo "ERROR: $AGENT_SESSIONS_YAML not found" >&2; exit 1; }
if [ "$SUBSCRIBE" = "1" ]; then
SUBSCRIBE_MODE=1 env_python "$AGENT_SESSIONS_YAML" <<'PYEOF'
import os, sys, json, fcntl, tempfile, subprocess
from datetime import datetime, timezone
import yaml
# Paths resolved relative to this script (review item 6): skills/ dir + lib.sh.
SKILLS_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)"
LIB_SH="$SKILLS_DIR/lib.sh"
# MQTT client lives in the project venv (has paho). All YAML work is delegated
# to lib.sh::atomic_dump_yaml, which runs the system python3 (has PyYAML) — so
# no single interpreter needs both paho and PyYAML (review items 4/5/6).
PYBIN="$(_delegate_py_bin)"
yaml_path = os.environ['YAML_PATH']
home = os.environ['HOME_DIR']
# The MQTT subscribe loop exits 3 to signal "broker unavailable → poll instead".
set +e
YAML_PATH="$AGENT_SESSIONS_YAML" HOME_DIR="$HOME" \
SUB_TIMEOUT="$SUB_TIMEOUT" SUB_IDLE_TIMEOUT="$SUB_IDLE_TIMEOUT" \
SKILLS_DIR="$SKILLS_DIR" LIB_SH="$LIB_SH" \
"$PYBIN" - <<'PYEOF'
import os, sys, json, time, subprocess
# Add skills/delegate-job/scripts to path to import mqtt_common
script_dir = os.path.dirname(os.path.abspath(__file__)) if '__file__' in globals() else os.getcwd()
path_candidate = os.path.join('/home/godopu16/PuKi/laa/canary_projects/advanced_multi_agent', 'skills', 'delegate-job', 'scripts')
if os.path.isdir(path_candidate):
sys.path.append(path_candidate)
lib_sh = os.environ.get('LIB_SH', '')
skills_dir = os.environ.get('SKILLS_DIR', '')
timeout = int(os.environ.get('SUB_TIMEOUT', '0') or '0') # 0 = no overall timeout
idle_timeout = int(os.environ.get('SUB_IDLE_TIMEOUT', '600') or '0') # 0 = no idle timeout
# Locate skills/delegate-job/scripts to import mqtt_common — relative first, then
# an upward walk from cwd. No hardcoded absolute path (review item 6).
cand = os.path.join(skills_dir, 'delegate-job', 'scripts') if skills_dir else ''
if cand and os.path.isdir(cand):
sys.path.append(cand)
else:
d = script_dir
found = False
while d != '/' and d:
p = os.path.join(d, 'skills', 'delegate-job', 'scripts')
if os.path.isdir(p):
sys.path.append(p)
found = True
break
p2 = os.path.join(d, 'delegate-job', 'scripts')
if os.path.isdir(p2):
sys.path.append(p2)
found = True
d = os.getcwd()
while d and d != '/':
hit = None
for sub in (('skills', 'delegate-job', 'scripts'), ('delegate-job', 'scripts')):
p = os.path.join(d, *sub)
if os.path.isdir(p):
hit = p
break
if hit:
sys.path.append(hit)
break
d = os.path.dirname(d)
import mqtt_common
cfg = mqtt_common.broker_config_from_env()
client = mqtt_common.make_client("monitor_sub", cfg)
# Executed INSIDE lib.sh::atomic_dump_yaml (system python3 + PyYAML), under the
# YAML flock with schema-validate + .bak (review item 5). Marks matching running
# sessions terminated and kills their tmux (review item 3 behaviour preserved),
# or aborts the write entirely when nothing matches. The untrusted MQTT job id /
# event arrive via env (MQTT_JID / MQTT_EVENT) — never spliced into source (P1-B).
_MUTATION = r'''
import os, subprocess
from datetime import datetime, timezone
_jid = os.environ['MQTT_JID']
_event = os.environ['MQTT_EVENT']
_now = datetime.now(timezone.utc)
_changed = False
for s in d.get('tmux_sessions', []):
if s.get('delegate_job_id') == _jid and s.get('status') == 'running':
s['status'] = 'terminated'
s['terminated_at'] = _now.strftime('%Y-%m-%dT%H:%M:%SZ')
s['terminated_at_epoch'] = int(_now.timestamp())
s['termination_mode'] = 'auto-detected (MQTT ' + _event + ')'
_name = s.get('name')
_srv = s.get('tmux_server') or 'default'
_cmd = ['tmux'] + (['-L', _srv] if _srv != 'default' else []) + ['kill-session', '-t', _name]
subprocess.run(_cmd, capture_output=True)
print('MQTT Monitor: terminated + killed ' + str(_name) + ' on ' + str(_srv), flush=True)
_changed = True
if not _changed:
raise SystemExit(0) # nothing matched — skip the write entirely
'''
def on_message(client, userdata, msg):
def handle_terminal(jid, event):
if not lib_sh or not os.path.isfile(lib_sh):
print('MQTT Monitor: lib.sh not found, cannot update YAML', flush=True)
return
env = dict(os.environ)
env['MQTT_JID'] = jid
env['MQTT_EVENT'] = event
cmd = ['bash', '-c',
'source "$LIB_SH"; atomic_dump_yaml "$YAML_PATH" MQTT_JID="$MQTT_JID" MQTT_EVENT="$MQTT_EVENT"']
r = subprocess.run(cmd, input=_MUTATION, text=True, env=env, capture_output=True)
if (r.stdout or '').strip():
print(r.stdout.strip(), flush=True)
if r.returncode != 0 and (r.stderr or '').strip():
print('MQTT Monitor: atomic_dump_yaml stderr: ' + r.stderr.strip(), flush=True)
state = {'last_msg': time.time(), 'connected': False, 'failed': False}
def on_message(_client, _userdata, msg):
state['last_msg'] = time.time()
try:
payload = json.loads(msg.payload.decode("utf-8"))
jid = payload.get("job_id")
event = payload.get("event")
if not jid or not event:
return
if event in ("completed", "error"):
if jid and event in ("completed", "error"):
print(f"MQTT Monitor: received terminal event {event} for job {jid}", flush=True)
update_session_by_job(jid, event)
handle_terminal(jid, event)
except Exception as e:
print(f"MQTT Monitor error parsing message: {e}", flush=True)
def update_session_by_job(jid, event):
lock_path = yaml_path + '.lock'
lock_fh = open(lock_path, 'w')
fcntl.flock(lock_fh, fcntl.LOCK_EX)
try:
if os.path.exists(yaml_path):
with open(yaml_path) as f:
d_local = yaml.safe_load(f) or {}
else:
d_local = {}
sessions = d_local.setdefault('tmux_sessions', [])
updated = False
for s in sessions:
if s.get('delegate_job_id') == jid and s.get('status') == 'running':
s['status'] = 'terminated'
now_iso = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
s['terminated_at'] = now_iso
s['terminated_at_epoch'] = int(datetime.now(timezone.utc).timestamp())
s['termination_mode'] = f"auto-detected (MQTT {event})"
name = s.get('name')
srv = s.get('tmux_server') or 'default'
kill_tmux_session(name, srv)
updated = True
if updated:
dir_ = os.path.dirname(yaml_path) or '.'
fd, tmp = tempfile.mkstemp(dir=dir_, prefix='.agent-sessions.', suffix='.tmp')
try:
with os.fdopen(fd, 'w') as f:
yaml.safe_dump(d_local, f, default_flow_style=False, sort_keys=False,
allow_unicode=True, width=4096)
os.replace(tmp, yaml_path)
print(f"MQTT Monitor: updated YAML for job {jid} to terminated", flush=True)
except Exception as e:
if os.path.exists(tmp):
os.remove(tmp)
print(f"MQTT Monitor error writing YAML: {e}", flush=True)
finally:
fcntl.flock(lock_fh, fcntl.LOCK_UN)
lock_fh.close()
def kill_tmux_session(name, srv):
try:
cmd = ['tmux']
if srv != 'default':
cmd += ['-L', srv]
cmd += ['kill-session', '-t', name]
subprocess.run(cmd, capture_output=True)
print(f"MQTT Monitor: killed tmux session {name} on server {srv}", flush=True)
except Exception as e:
print(f"MQTT Monitor error killing tmux: {e}", flush=True)
client.on_message = on_message
def on_connect(_c, _u, _flags, reason_code, _props):
rc = mqtt_common.reason_code_value(reason_code)
if rc == 0:
state['connected'] = True
_c.subscribe("python/mqtt/jobs/+/events", qos=1)
print("MQTT Monitor: subscribed to python/mqtt/jobs/+/events", flush=True)
else:
print(f"MQTT Monitor connection failed: {rc}", flush=True)
state['failed'] = True
print(f"MQTT Monitor connection failed: rc={rc}", flush=True)
cfg = mqtt_common.broker_config_from_env()
client = mqtt_common.make_client("monitor_sub", cfg)
client.on_message = on_message
client.on_connect = on_connect
print(f"MQTT Monitor: connecting to {cfg.host}:{cfg.port} (TLS={cfg.tls})...", flush=True)
client.connect(cfg.host, cfg.port, cfg.keepalive)
client.loop_forever()
# Connection failure → fall back to polling (review item 4).
try:
client.connect(cfg.host, cfg.port, cfg.keepalive)
except Exception as e:
print(f"MQTT Monitor: connect failed ({e}); falling back to polling", flush=True)
sys.exit(3)
client.loop_start()
_wait = time.time()
while time.time() - _wait < 5 and not state['connected'] and not state['failed']:
time.sleep(0.1)
if not state['connected']:
print("MQTT Monitor: broker did not accept connection; falling back to polling", flush=True)
client.loop_stop()
sys.exit(3)
start = time.time()
try:
while True:
now = time.time()
if timeout and (now - start) >= timeout:
print(f"MQTT Monitor: --timeout {timeout}s reached, exiting", flush=True)
break
if idle_timeout and (now - state['last_msg']) >= idle_timeout:
print(f"MQTT Monitor: --idle-timeout {idle_timeout}s reached, exiting", flush=True)
break
time.sleep(0.5)
finally:
client.loop_stop()
try:
client.disconnect()
except Exception:
pass
sys.exit(0)
PYEOF
sub_rc=$?
set -e
if [ "$sub_rc" = "3" ]; then
echo "MQTT Monitor: broker unavailable — falling back to polling (interval ${POLL_INTERVAL}s)" >&2
_self="$SKILLS_DIR/agent-sessions-monitor/scripts/reconcile.sh"
_start=$(date +%s)
while :; do
bash "$_self" --once --emit-diff >/dev/null 2>&1 || true
if [ "$SUB_TIMEOUT" != "0" ] && [ "$(( $(date +%s) - _start ))" -ge "$SUB_TIMEOUT" ]; then
break
fi
sleep "$POLL_INTERVAL"
done
fi
exit 0
fi