feat(lib): migrate to SQLite WAL backend for robust concurrency (FW-L1)

- Replaces python fcntl.flock with SQLite BEGIN IMMEDIATE.
- Status/Reconcile read from SQLite SSOT, with YAML fallback.
- Explicitly documented tradeoff: YAML is no longer a real-time view.
- Handles PRAGMA wal_checkpoint(TRUNCATE) safely outside transactions.
This commit is contained in:
2026-06-21 08:35:01 +00:00
parent 623eef814b
commit 9b797a5c8c
6 changed files with 155 additions and 52 deletions
+115 -38
View File
@@ -109,14 +109,21 @@ tmux() {
resolve_tmux_server() {
local session_name="$1"
SESSION_NAME="$session_name" env_python "$AGENT_SESSIONS_YAML" <<'PYEOF'
import os, sys, yaml
import os, sys, sqlite3, json, yaml
name = os.environ['SESSION_NAME']
yaml_path = os.environ['YAML_PATH']
if os.path.exists(yaml_path):
try:
db_path = yaml_path.replace('.yaml', '.db')
d = {}
try:
if os.path.exists(db_path):
conn = sqlite3.connect(db_path, timeout=10.0)
row = conn.execute('SELECT data FROM state WHERE id=1').fetchone()
if row: d = json.loads(row[0])
conn.close()
elif os.path.exists(yaml_path):
with open(yaml_path) as f:
d = yaml.safe_load(f) or {}
for s in d.get('tmux_sessions', []):
for s in d.get('tmux_sessions', []):
if s.get('name') == name:
server = s.get('tmux_server')
if server:
@@ -207,7 +214,6 @@ _atomic_dump_yaml_check_nfs() {
atomic_dump_yaml() {
local yaml_path="$1"; shift
_atomic_dump_yaml_check_nfs "$yaml_path"
local -a envs=("YAML_PATH=$yaml_path" "HOME_DIR=$HOME_DIR" "CLAUDE_PROJECT_DIR=$CLAUDE_PROJECT_DIR" "LOCAL_BIN=$LOCAL_BIN")
while [ $# -gt 0 ]; do
case "$1" in
@@ -217,13 +223,12 @@ atomic_dump_yaml() {
done
local mutation; mutation="$(cat)"
env "${envs[@]}" AGENT_SESSIONS_MUTATION="$mutation" python3 - <<'PYEOF'
import os, sys, fcntl, tempfile, shutil, glob, subprocess, json
import os, sys, tempfile, shutil, glob, subprocess, json, sqlite3
from datetime import datetime, timezone
import yaml
yaml_path = os.environ['YAML_PATH']
lock_path = yaml_path + '.lock'
db_path = yaml_path.replace('.yaml', '.db')
def _validate(d):
if not isinstance(d, dict):
@@ -242,41 +247,95 @@ def _validate(d):
if not isinstance(s.get('pane'), dict):
raise SystemExit(f"VALIDATE: tmux_sessions[{i}] {s.get('name')!r} missing pane")
def get_terminal_set(d):
return {s.get('name'): s.get('status') for s in d.get('tmux_sessions', []) if s.get('status') in ('stopped', 'terminated', 'archived')}
os.makedirs(os.path.dirname(db_path) or '.', exist_ok=True)
conn = sqlite3.connect(db_path, timeout=60.0)
for f in [db_path, db_path + '-wal', db_path + '-shm']:
if os.path.exists(f):
try:
os.chmod(f, 0o600)
except Exception:
pass
def is_nfs(path):
try:
df_out = subprocess.check_output(['df', '--output=target', path], text=True, stderr=subprocess.DEVNULL)
target = df_out.strip().split('\n')[-1].strip()
mount_out = subprocess.check_output(['mount'], text=True)
for line in mount_out.split('\n'):
if f" on {target} " in line and (' type nfs ' in line or ' type cifs ' in line or ' fuse.sshfs ' in line):
return True
except Exception:
pass
return False
if is_nfs(os.path.dirname(db_path) or '.'):
conn.execute('PRAGMA journal_mode=DELETE')
else:
conn.execute('PRAGMA journal_mode=WAL')
lock_fh = open(lock_path, 'w')
fcntl.flock(lock_fh, fcntl.LOCK_EX)
try:
if os.path.exists(yaml_path):
with open(yaml_path) as f:
d = yaml.safe_load(f) or {}
# Disable auto-commit by explicitly starting a transaction with BEGIN IMMEDIATE
# This prevents the read-modify-write lost update race condition.
conn.execute('BEGIN IMMEDIATE')
conn.execute('CREATE TABLE IF NOT EXISTS state (id INTEGER PRIMARY KEY, data TEXT)')
row = conn.execute('SELECT data FROM state WHERE id=1').fetchone()
if row:
d = json.loads(row[0])
else:
d = {}
# Seed from YAML
if os.path.exists(yaml_path):
with open(yaml_path) as f:
d = yaml.safe_load(f) or {}
else:
d = {}
conn.execute('INSERT INTO state (id, data) VALUES (1, ?)', (json.dumps(d),))
old_terminals = get_terminal_set(d)
# --- caller mutation (module scope: sees d, yaml, os, glob, subprocess) ---
exec(compile(os.environ['AGENT_SESSIONS_MUTATION'], '<mutation>', 'exec'), globals())
_validate(d)
if os.path.exists(yaml_path):
conn.execute('REPLACE INTO state (id, data) VALUES (1, ?)', (json.dumps(d),))
new_terminals = get_terminal_set(d)
# Write to YAML ONLY when a session transitions to a finished state
if new_terminals != old_terminals:
if os.path.exists(yaml_path):
try:
shutil.copy2(yaml_path, yaml_path + '.bak')
except Exception:
pass
dir_ = os.path.dirname(yaml_path) or '.'
fd, tmp = tempfile.mkstemp(dir=dir_, prefix='.agent-sessions.', suffix='.tmp')
try:
shutil.copy2(yaml_path, yaml_path + '.bak')
with os.fdopen(fd, 'w') as f:
yaml.safe_dump(d, f, default_flow_style=False, sort_keys=False,
allow_unicode=True, width=4096)
os.replace(tmp, yaml_path)
except Exception:
if os.path.exists(tmp):
os.remove(tmp)
raise
conn.commit()
if new_terminals != old_terminals:
try:
conn.execute('PRAGMA wal_checkpoint(TRUNCATE)')
except Exception:
pass
dir_ = os.path.dirname(yaml_path) or '.'
fd, tmp = tempfile.mkstemp(dir=dir_, prefix='.agent-sessions.', suffix='.tmp')
try:
with os.fdopen(fd, 'w') as f:
yaml.safe_dump(d, f, default_flow_style=False, sort_keys=False,
allow_unicode=True, width=4096)
os.replace(tmp, yaml_path)
except Exception:
if os.path.exists(tmp):
os.remove(tmp)
raise
except Exception:
conn.rollback()
raise
finally:
fcntl.flock(lock_fh, fcntl.LOCK_UN)
lock_fh.close()
conn.close()
PYEOF
}
@@ -298,19 +357,28 @@ find_workspace_uuid() {
local workspace="$1" agent="$2"
local abs; abs="$(cd "$workspace" 2>/dev/null && pwd)" || abs="$workspace"
WS_ABS="$abs" AGENT="$agent" env_python "$AGENT_SESSIONS_YAML" <<'PYEOF'
import os, json, glob
import os, json, glob, sqlite3
import yaml
ws = os.environ['WS_ABS']
agent = os.environ['AGENT']
home = os.environ['HOME_DIR']
yaml_path = os.environ['YAML_PATH']
db_path = yaml_path.replace('.yaml', '.db')
claude_project_dir = os.environ.get('CLAUDE_PROJECT_DIR', f"{home}/.claude/projects")
d = {}
if os.path.exists(yaml_path):
with open(yaml_path) as f:
d = yaml.safe_load(f) or {}
try:
if os.path.exists(db_path):
conn = sqlite3.connect(db_path, timeout=10.0)
row = conn.execute('SELECT data FROM state WHERE id=1').fetchone()
if row: d = json.loads(row[0])
conn.close()
elif os.path.exists(yaml_path):
with open(yaml_path) as f:
d = yaml.safe_load(f) or {}
except Exception:
pass
def jsonl_exists(uuid):
@@ -412,13 +480,22 @@ capture_conversation_id() {
is_already_stopped() {
local session_name="$1"
SESSION_NAME="$session_name" env_python "$AGENT_SESSIONS_YAML" <<'PYEOF'
import os, yaml
import os, yaml, sqlite3, json
name = os.environ['SESSION_NAME']
yaml_path = os.environ['YAML_PATH']
db_path = yaml_path.replace('.yaml', '.db')
d = {}
if os.path.exists(yaml_path):
with open(yaml_path) as f:
d = yaml.safe_load(f) or {}
try:
if os.path.exists(db_path):
conn = sqlite3.connect(db_path, timeout=10.0)
row = conn.execute('SELECT data FROM state WHERE id=1').fetchone()
if row: d = json.loads(row[0])
conn.close()
elif os.path.exists(yaml_path):
with open(yaml_path) as f:
d = yaml.safe_load(f) or {}
except Exception:
pass
for s in d.get('tmux_sessions', []):
if s.get('name') == name and s.get('status') == 'stopped':
print(f"stopped_at={s.get('stopped_at', '?')}")
@@ -237,8 +237,20 @@ now_iso = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
try:
d
except NameError:
with open(yaml_path) as f:
d = yaml.safe_load(f) or {}
import sqlite3
db_path = yaml_path.replace('.yaml', '.db')
d = {}
try:
if os.path.exists(db_path):
conn = sqlite3.connect(db_path, timeout=10.0)
row = conn.execute('SELECT data FROM state WHERE id=1').fetchone()
if row: d = json.loads(row[0])
conn.close()
elif os.path.exists(yaml_path):
with open(yaml_path) as f:
d = yaml.safe_load(f) or {}
except Exception:
pass
drifts = []
actions = []
@@ -37,8 +37,20 @@ home = os.environ['HOME_DIR']
claude_project_dir = os.environ.get('CLAUDE_PROJECT_DIR', f"{home}/.claude/projects")
drift = json.loads(os.environ['DRIFT_JSON'])
with open(yaml_path) as f:
d = yaml.safe_load(f) or {}
db_path = yaml_path.replace('.yaml', '.db')
d = {}
import sqlite3
try:
if os.path.exists(db_path):
conn = sqlite3.connect(db_path, timeout=10.0)
row = conn.execute('SELECT data FROM state WHERE id=1').fetchone()
if row: d = json.loads(row[0])
conn.close()
elif os.path.exists(yaml_path):
with open(yaml_path) as f:
d = yaml.safe_load(f) or {}
except Exception:
pass
alive = set(drift.get('tmux_sessions_alive', []))
drift_by_name = {}