Files
multi-agent-mux/skills/agent-sessions-monitor/scripts/reconcile.sh
T
Godopu 0eb1d94a9c refactor(skills): cleanup dead code + full workflow A→B→C→D integration
Cleanup:
- Remove unused validate_yaml() helper from lib.sh
- Remove USER_MANUAL.html + mqtt-broker-setup.html (no refs found)

Workflow A (create_session ↔ delegate-job):
- Add --submit-job <prompt> option to create_session.sh
- Auto-register session in delegate-job registry, store delegate_job_id in YAML

Workflow B (push-based monitor):
- Migrate reconcile.sh to MQTT subscriber mode (polling fallback preserved)

Workflow C (unified status):
- status.sh now shows session + delegate-job state in single column

Workflow D (audit log + perms):
- JSON job files chmod 600
- create/delete/resume now publish lifecycle events to delegate-job
2026-06-19 14:27:29 +00:00

398 lines
15 KiB
Bash
Executable File

#!/usr/bin/env bash
# reconcile.sh — agent-sessions-monitor 의 부속 스크립트
# YAML ↔ tmux ↔ 디스크 artifact 간 drift 감지 (+ YAML 자동 갱신).
#
# Usage:
# bash reconcile.sh --once --emit-diff # drift 감지 + 갱신
# bash reconcile.sh --once --emit-diff --dry-run # drift 만 계산, 쓰기 안 함 (P1-E)
#
# --dry-run: 부수효과 없는 read-only. "지금 뭐 돌고 있지?" 질문에 안전.
# multi-agent-status 스킬이 이걸 재사용.
#
# 출력 (JSON): {timestamp, yaml_path, tmux_sessions_alive, tmux_confirmed, drifts, actions}
#
# Exit codes: 0 = ok | 1 = YAML not found | 2 = error
set -euo pipefail
source "$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)/lib.sh"
STATE_DIR="${AGENT_SESSIONS_STATE_DIR:-$HOME/.cache/agent-sessions-monitor}"
ONCE=0
EMIT_DIFF=0
DRY_RUN=0
SUBSCRIBE=0
while [ $# -gt 0 ]; do
case "$1" in
--once) ONCE=1; shift ;;
--emit-diff) EMIT_DIFF=1; shift ;;
--dry-run) DRY_RUN=1; shift ;;
--subscribe) SUBSCRIBE=1; shift ;;
-h|--help) echo "Usage: $0 [--once] [--emit-diff] [--dry-run] [--subscribe]"; exit 0 ;;
*) echo "ERROR: unknown arg: $1" >&2; exit 2 ;;
esac
done
[ -f "$AGENT_SESSIONS_YAML" ] || { echo "ERROR: $AGENT_SESSIONS_YAML not found" >&2; exit 1; }
if [ "$SUBSCRIBE" = "1" ]; then
SUBSCRIBE_MODE=1 env_python "$AGENT_SESSIONS_YAML" <<'PYEOF'
import os, sys, json, fcntl, tempfile, subprocess
from datetime import datetime, timezone
import yaml
yaml_path = os.environ['YAML_PATH']
home = os.environ['HOME_DIR']
# Add skills/delegate-job/scripts to path to import mqtt_common
script_dir = os.path.dirname(os.path.abspath(__file__)) if '__file__' in globals() else os.getcwd()
path_candidate = os.path.join('/home/godopu16/PuKi/laa/canary_projects/advanced_multi_agent', 'skills', 'delegate-job', 'scripts')
if os.path.isdir(path_candidate):
sys.path.append(path_candidate)
else:
d = script_dir
found = False
while d != '/' and d:
p = os.path.join(d, 'skills', 'delegate-job', 'scripts')
if os.path.isdir(p):
sys.path.append(p)
found = True
break
p2 = os.path.join(d, 'delegate-job', 'scripts')
if os.path.isdir(p2):
sys.path.append(p2)
found = True
break
d = os.path.dirname(d)
import mqtt_common
cfg = mqtt_common.broker_config_from_env()
client = mqtt_common.make_client("monitor_sub", cfg)
def on_message(client, userdata, msg):
try:
payload = json.loads(msg.payload.decode("utf-8"))
jid = payload.get("job_id")
event = payload.get("event")
if not jid or not event:
return
if event in ("completed", "error"):
print(f"MQTT Monitor: received terminal event {event} for job {jid}", flush=True)
update_session_by_job(jid, event)
except Exception as e:
print(f"MQTT Monitor error parsing message: {e}", flush=True)
def update_session_by_job(jid, event):
lock_path = yaml_path + '.lock'
lock_fh = open(lock_path, 'w')
fcntl.flock(lock_fh, fcntl.LOCK_EX)
try:
if os.path.exists(yaml_path):
with open(yaml_path) as f:
d_local = yaml.safe_load(f) or {}
else:
d_local = {}
sessions = d_local.setdefault('tmux_sessions', [])
updated = False
for s in sessions:
if s.get('delegate_job_id') == jid and s.get('status') == 'running':
s['status'] = 'terminated'
now_iso = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
s['terminated_at'] = now_iso
s['terminated_at_epoch'] = int(datetime.now(timezone.utc).timestamp())
s['termination_mode'] = f"auto-detected (MQTT {event})"
name = s.get('name')
srv = s.get('tmux_server') or 'default'
kill_tmux_session(name, srv)
updated = True
if updated:
dir_ = os.path.dirname(yaml_path) or '.'
fd, tmp = tempfile.mkstemp(dir=dir_, prefix='.agent-sessions.', suffix='.tmp')
try:
with os.fdopen(fd, 'w') as f:
yaml.safe_dump(d_local, f, default_flow_style=False, sort_keys=False,
allow_unicode=True, width=4096)
os.replace(tmp, yaml_path)
print(f"MQTT Monitor: updated YAML for job {jid} to terminated", flush=True)
except Exception as e:
if os.path.exists(tmp):
os.remove(tmp)
print(f"MQTT Monitor error writing YAML: {e}", flush=True)
finally:
fcntl.flock(lock_fh, fcntl.LOCK_UN)
lock_fh.close()
def kill_tmux_session(name, srv):
try:
cmd = ['tmux']
if srv != 'default':
cmd += ['-L', srv]
cmd += ['kill-session', '-t', name]
subprocess.run(cmd, capture_output=True)
print(f"MQTT Monitor: killed tmux session {name} on server {srv}", flush=True)
except Exception as e:
print(f"MQTT Monitor error killing tmux: {e}", flush=True)
client.on_message = on_message
def on_connect(_c, _u, _flags, reason_code, _props):
rc = mqtt_common.reason_code_value(reason_code)
if rc == 0:
_c.subscribe("python/mqtt/jobs/+/events", qos=1)
print("MQTT Monitor: subscribed to python/mqtt/jobs/+/events", flush=True)
else:
print(f"MQTT Monitor connection failed: {rc}", flush=True)
client.on_connect = on_connect
print(f"MQTT Monitor: connecting to {cfg.host}:{cfg.port} (TLS={cfg.tls})...", flush=True)
client.connect(cfg.host, cfg.port, cfg.keepalive)
client.loop_forever()
PYEOF
exit 0
fi
mkdir -p "$STATE_DIR"
# 모든 비교 로직을 단일 소스로 둔다. dry-run 은 env_python(읽기전용), 그 외엔
# atomic_dump_yaml(flock + temp+rename) 로 같은 소스를 돌린다. atomic 래퍼에서는
# 'actions' 가 없으면 SystemExit(0) 으로 쓰기를 건너뛴다 (불필요한 재포맷 방지).
read -r -d '' RECON_SRC <<'PYEOF' || true
import os, json, glob, subprocess, time
from datetime import datetime, timezone
import yaml
yaml_path = os.environ['YAML_PATH']
home = os.environ['HOME_DIR']
now_iso = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
# atomic 래퍼에서는 d 가 이미 로드돼 있음. env_python(dry-run)에서는 여기서 로드.
try:
d
except NameError:
with open(yaml_path) as f:
d = yaml.safe_load(f) or {}
drifts = []
actions = []
# === 현재 tmux 상태 — transient 실패를 'no sessions' 와 구분 (P1-E) ===
tmux_sessions = []
tmux_confirmed = True
# YAML 에 등록된 고유한 tmux_server 목록 수집 + 환경변수 TMUX_SERVER_NAME 포함
unique_servers = {'default'}
if 'TMUX_SERVER_NAME' in os.environ:
unique_servers.add(os.environ['TMUX_SERVER_NAME'])
for s in d.get('tmux_sessions', []):
srv = s.get('tmux_server') or 'default'
unique_servers.add(srv)
try:
for srv in sorted(unique_servers):
cmd = ['tmux']
if srv != 'default':
cmd += ['-L', srv]
cmd += ['ls', '-F', '#{session_name}|#{session_created}']
r = subprocess.run(cmd, capture_output=True, text=True)
if r.returncode == 0:
for line in r.stdout.strip().split('\n'):
if not line:
continue
name, created = line.split('|', 1)
tmux_sessions.append({'name': name, 'created': int(created), 'server': srv})
else:
err = (r.stderr or '').lower()
is_empty = ('no server running' in err) or ('no sessions' in err) or ('failed to connect' in err)
if not is_empty:
tmux_confirmed = False
except Exception:
tmux_confirmed = False
def pane_meta(session, srv):
try:
cmd = ['tmux']
if srv != 'default':
cmd += ['-L', srv]
cmd += ['list-panes', '-t', session, '-F',
'#{pane_pid}|#{pane_current_path}|#{pane_current_command}']
out = subprocess.check_output(cmd, text=True)
parts = out.strip().split('\n')[0].split('|')
return {'pid': int(parts[0]), 'cwd': parts[1], 'cmd': parts[2]}
except Exception:
return None
yaml_sessions = d.get('tmux_sessions', [])
yaml_session_names = {s['name'] for s in yaml_sessions if s.get('name')}
alive_set = {(t['name'], t.get('server', 'default')) for t in tmux_sessions}
# === drift A: tmux dead + YAML running → auto-terminate ===
# tmux 응답을 확정했을 때만. transient 실패 시 모두 terminated 로 마크하지 않음 (P1-E)
if tmux_confirmed:
for s in yaml_sessions:
name = s.get('name')
if not name:
continue
if s.get('status') in ('terminated', 'archived'):
continue
srv = s.get('tmux_server') or 'default'
if (name, srv) not in alive_set:
s['status'] = 'terminated'
s['terminated_at'] = now_iso
s['terminated_at_epoch'] = int(datetime.now(timezone.utc).timestamp())
s['termination_mode'] = 'auto-detected (tmux gone)'
pane = s.get('pane') or {}
drifts.append({'class': 'A', 'name': name,
'msg': f"{name}: tmux gone (was pane {pane.get('pid')}, cmd {pane.get('cmd')}). Marked terminated."})
actions.append(f"terminated: {name}")
# === drift B: tmux alive + not in YAML → auto-register ===
if tmux_confirmed:
for t in tmux_sessions:
name = t['name']
if name in yaml_session_names:
continue
if not (name.endswith('-creator-claude') or name.endswith('-creator-agy')):
continue
srv = t.get('server', 'default')
pm = pane_meta(name, srv)
if not pm:
continue
agent = 'claude' if name.endswith('-creator-claude') else 'agy'
cmd_full = 'claude' if agent == 'claude' else 'agy --dangerously-skip-permissions'
server_opt = f"-L {srv} " if srv != 'default' else ""
entry = {
'name': name,
'status': 'running',
'tmux_session_created_at': datetime.fromtimestamp(t['created'], tz=timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ'),
'tmux_session_epoch': t['created'],
'tmux_server': srv,
'pane': {'index': 0, 'pid': pm['pid'], 'cmd': agent, 'cmd_full': cmd_full, 'cwd': pm['cwd']},
# P2: cwd 인용
'start_command': f'tmux {server_opt}new-session -d -s "{name}" -x 140 -y 40 -c "{pm["cwd"]}" "{cmd_full}"',
'attach_command': f'tmux {server_opt}attach -t {name}',
'kill_command': f'tmux {server_opt}kill-session -t {name}',
'last_visible_status': 'auto-registered by monitor',
}
if agent == 'claude':
entry['tui'] = {'model': '(unknown — capture after first message)', 'provider': 'anthropic',
'plan': '(unknown)', 'account': '(unknown)', 'version': '(unknown)'}
entry['claude_session_id_own'] = None
else:
entry['child_pid'] = 0
entry['agy_conversation_id_own'] = None
entry['mcp_attachments'] = [
{
'name': 'stitch',
'transport': 'mcp-remote',
'endpoint': 'https://stitch.googleapis.com/mcp'
}
]
d.setdefault('tmux_sessions', []).append(entry)
yaml_session_names.add(name)
drifts.append({'class': 'B', 'name': name,
'msg': f"{name}: tmux found but not in YAML. Auto-registered (pane {pm['pid']}, cmd {pm['cmd']}, cwd {pm['cwd']})."})
actions.append(f"registered: {name}")
# === drift C: claude 새 session id materialize (per-row own id) ===
for s in d.get('tmux_sessions', []):
if not s.get('name', '').endswith('-creator-claude'):
continue
if s.get('status') != 'running':
continue
if s.get('claude_session_id_own'):
continue
cwd = (s.get('pane') or {}).get('cwd', '')
if not cwd:
continue
proj_key = cwd.replace('/', '-').replace('_', '-')
proj_dir = f"{home}/.claude/projects/{proj_key}"
if not os.path.isdir(proj_dir):
continue
jsonls = sorted(glob.glob(f"{proj_dir}/*.jsonl"), key=os.path.getmtime, reverse=True)
if not jsonls:
continue
latest = jsonls[0]
if time.time() - os.path.getmtime(latest) > 300:
continue
try:
with open(latest) as f:
first = f.readline().strip()
if not first:
continue
sid = json.loads(first).get('sessionId')
if not sid:
continue
except Exception:
continue
s['claude_session_id_own'] = sid
drifts.append({'class': 'C', 'name': s['name'], 'msg': f"{s['name']}: session id materialized: {sid}"})
actions.append(f"updated session id: {sid}")
# === drift C (agy): agy 새 session id materialize (per-row own id) ===
for s in d.get('tmux_sessions', []):
if not s.get('name', '').endswith('-creator-agy'):
continue
if s.get('status') != 'running':
continue
if s.get('agy_conversation_id_own'):
continue
cwd = (s.get('pane') or {}).get('cwd', '')
if not cwd:
continue
lc = f"{home}/.gemini/antigravity-cli/cache/last_conversations.json"
if os.path.exists(lc):
try:
with open(lc) as f:
lc_data = json.load(f)
cid = lc_data.get(cwd)
if cid and os.path.exists(f"{home}/.gemini/antigravity-cli/conversations/{cid}.db"):
s['agy_conversation_id_own'] = cid
drifts.append({'class': 'C', 'name': s['name'], 'msg': f"{s['name']}: conversation id materialized: {cid}"})
actions.append(f"updated conversation id: {cid}")
except Exception:
pass
# === drift D: stale UUID (cache 의 artifact 가 사라짐) — 보고만, 변경 없음 ===
ai = d.get('agent_identities', {}) or {}
cl = (ai.get('claude') or {})
if cl.get('session_id'):
sid = cl['session_id']
if not glob.glob(f"{home}/.claude/projects/*/{sid}.jsonl"):
drifts.append({'class': 'D', 'name': '(claude identity cache)',
'msg': f"stale UUID in agent_identities.claude.session_id: {sid} (jsonl missing)"})
ag = (ai.get('agy') or {})
if ag.get('conversation_id'):
cid = ag['conversation_id']
if not os.path.exists(f"{home}/.gemini/antigravity-cli/conversations/{cid}.db"):
drifts.append({'class': 'D', 'name': '(agy identity cache)',
'msg': f"stale UUID in agent_identities.agy.conversation_id: {cid} (.db missing)"})
result = {
'timestamp': now_iso,
'yaml_path': yaml_path,
'tmux_sessions_alive': sorted(f"{t['name']}|{t.get('server', 'default')}" for t in tmux_sessions),
'tmux_confirmed': tmux_confirmed,
'drifts': drifts,
'actions': actions,
}
print(json.dumps(result, indent=2, ensure_ascii=False))
# atomic 래퍼: actions 가 없으면 쓰기를 건너뛴다. env_python(dry-run)에선 무해.
if not actions:
raise SystemExit(0)
PYEOF
if [ "$DRY_RUN" = "1" ]; then
printf '%s' "$RECON_SRC" | env_python "$AGENT_SESSIONS_YAML"
else
printf '%s' "$RECON_SRC" | atomic_dump_yaml "$AGENT_SESSIONS_YAML"
fi