# Job Registry

The registry is the **single source of truth** for delegated work. Job metadata
(id, prompt, broker, status, timeouts) lives in files, **not** environment
variables, so one tmux session can handle many jobs sequentially or in
parallel without collisions, and `publish_event.py` / `job_subscriber.py` can
reconstruct everything they need from the registry alone.

Reference implementation: [`./scripts/registry.py`](./scripts/registry.py)
(library + CLI) over the primitives in
[`./scripts/mqtt_common.py`](./scripts/mqtt_common.py).

---

## 1. Directory layout

```
.hermes/jobs/
  <job_id>.json          # job metadata record (schema below)
  <job_id>.events.log    # append-only JSON-lines event log (debug, optional)
  .lock                  # shared advisory lock (fcntl) for the whole registry
```

`registry_dir` defaults to `.hermes/jobs` and is overridable everywhere via
`--registry-dir`.

---

## 2. Job record schema

```json
{
  "schema_version": 1,
  "job_id": "abc12345",
  "status": "pending | running | completed | error | cancelled",
  "created_at": "2026-06-19T09:30:00Z",
  "updated_at": "2026-06-19T09:32:00Z",
  "prompt": "Create 10 sorting problems and save them as sort_problems.md…",
  "agent": "claude-code",
  "agent_session": "tmux:claude",
  "broker": {
    "host": "broker.hivemq.com",
    "port": 1883,
    "tls": false,
    "username": null,
    "password": null
  },
  "topic_prefix": "python/mqtt/jobs/abc12345",
  "timeout_sec": 600,
  "idle_timeout_sec": 120,
  "expected_artifacts": ["sort_problems.md"],
  "last_seq": 0,
  "auth_token": null
}
```

- `broker` lets `publish_event.py` connect from the record alone (environment
  variables such as `MQTT_TLS` still override individual toggles).
- `topic_prefix` determines the events topic, which is `<topic_prefix>/events`.
- `last_seq` backs the monotonic `seq` counter so it survives process restarts.
- `expected_artifacts` lists the files a user-supplied `validate.sh` can check (existence/content).
- `auth_token` is `null` in the PoC; production sets it to `secrets.token_urlsafe(32)`.
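
To make the field notes above concrete, here is a minimal sketch of building a record with this shape. `new_job_record` and `events_topic` are hypothetical helper names, and the 8-character hex id is an assumption modelled on the `abc12345` example; the actual `registry.py` may generate ids and defaults differently.

```python
import json
import secrets
from datetime import datetime, timezone


def new_job_record(prompt, agent_session, *, production=False):
    """Build a dict shaped like the schema above (hypothetical helper)."""
    job_id = secrets.token_hex(4)  # assumption: 8 hex chars, like "abc12345"
    now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    return {
        "schema_version": 1,
        "job_id": job_id,
        "status": "pending",
        "created_at": now,
        "updated_at": now,
        "prompt": prompt,
        "agent": "claude-code",
        "agent_session": agent_session,
        "broker": {"host": "broker.hivemq.com", "port": 1883, "tls": False,
                   "username": None, "password": None},
        "topic_prefix": f"python/mqtt/jobs/{job_id}",
        "timeout_sec": 600,
        "idle_timeout_sec": 120,
        "expected_artifacts": [],
        "last_seq": 0,
        # null in the PoC; a random URL-safe token in production
        "auth_token": secrets.token_urlsafe(32) if production else None,
    }


def events_topic(record):
    """The events topic is always <topic_prefix>/events."""
    return f"{record['topic_prefix']}/events"


if __name__ == "__main__":
    print(json.dumps(new_job_record("demo prompt", "tmux:claude"), indent=2))
```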

---

## 3. Concurrency rules

### PoC — fcntl advisory lock

Every read-modify-write (`register_job`, `pick_pending`, `update_status`,
`next_seq`) runs inside `registry_lock(registry_dir)`, an exclusive
`fcntl.flock` over `.lock`. The lock only works on a single host, which is
enough for many tmux sessions on one machine.
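
A lock of this kind can be sketched as a context manager. This is an illustration under assumptions, not the code in `registry.py`: only the name `registry_lock` and the `.lock` file come from the text above, and the body is one plausible way to wrap `fcntl.flock`.

```python
import fcntl
import os
from contextlib import contextmanager


@contextmanager
def registry_lock(registry_dir=".hermes/jobs"):
    """Hold an exclusive advisory lock on <registry_dir>/.lock (sketch)."""
    os.makedirs(registry_dir, exist_ok=True)
    fd = os.open(os.path.join(registry_dir, ".lock"), os.O_RDWR | os.O_CREAT, 0o644)
    try:
        fcntl.flock(fd, fcntl.LOCK_EX)  # blocks until no other holder remains
        yield
    finally:
        fcntl.flock(fd, fcntl.LOCK_UN)
        os.close(fd)


if __name__ == "__main__":
    with registry_lock():
        # any read-modify-write on the registry would go here
        print("holding the registry lock")
```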
### Production — SQLite WAL
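
When delegation spans **multiple hosts**, the file lock no longer serialises
across machines. Migrate the same operations to a SQLite database in WAL mode
(`PRAGMA journal_mode=WAL`) with a transaction per claim. The function
signatures stay identical; only the storage backend changes. Note that WAL
mode requires every process that opens the database to run on the same host
(it does not work over a network filesystem), so in a multi-host setup the
database file has to live on one machine, with other hosts going through a
process there rather than opening the file directly.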
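
The "transaction per claim" idea might look like the following sketch. The `jobs` table, its columns, and the explicit `conn` argument are all assumptions made for illustration; the source only specifies WAL mode and one transaction per claim.

```python
import sqlite3


def open_registry(db_path):
    """Open the database in WAL mode and create an assumed jobs table."""
    conn = sqlite3.connect(db_path, isolation_level=None)  # explicit transactions
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute(
        "CREATE TABLE IF NOT EXISTS jobs ("
        " job_id TEXT PRIMARY KEY, agent_session TEXT NOT NULL,"
        " status TEXT NOT NULL, created_at TEXT NOT NULL)"
    )
    return conn


def pick_pending(conn, agent_session):
    """Claim the oldest pending job for this session, or return None."""
    conn.execute("BEGIN IMMEDIATE")  # take the write lock before reading
    try:
        row = conn.execute(
            "SELECT job_id FROM jobs"
            " WHERE status = 'pending' AND agent_session = ?"
            " ORDER BY created_at LIMIT 1",
            (agent_session,),
        ).fetchone()
        if row is not None:
            conn.execute(
                "UPDATE jobs SET status = 'running' WHERE job_id = ?", (row[0],)
            )
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")
        raise
    return row[0] if row else None
```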

---

## 4. How multiple sessions take only their own work

Each tmux session carries an `agent_session` label (`tmux:claude`,
`tmux:claude-a`, `tmux:claude-b`, …). `pick_pending(agent_session)`:

1. acquires the registry lock,
2. scans for the **oldest** record with `status == "pending"` **and**
   matching `agent_session`,
3. flips it to `running` and writes it back **atomically**,
4. releases the lock and returns the `job_id` (or `None`).

Because the scan + flip happen under one lock, two sessions can never claim the
same job. Sessions with distinct labels naturally partition the work; sessions
sharing a label compete safely: the first to acquire the lock wins, and the
other sees the job already `running` and moves on.

```bash
# session A only ever runs its own pending jobs
PY=.venv/bin/python
$PY scripts/registry.py pick --agent-session tmux:claude-a   # prints id or exits 3
```
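
The four steps can be sketched in a few lines of Python. This is a simplified illustration rather than the code in `registry.py`: it assumes "oldest" means the smallest `created_at`, inlines the lock instead of calling `registry_lock`, and omits the `fsync` and `updated_at` refresh for brevity.

```python
import fcntl
import json
import os
from pathlib import Path


def pick_pending(agent_session, registry_dir=".hermes/jobs"):
    """Claim the oldest pending job for agent_session; return its id or None."""
    reg = Path(registry_dir)
    reg.mkdir(parents=True, exist_ok=True)
    with open(reg / ".lock", "a") as lock:
        fcntl.flock(lock, fcntl.LOCK_EX)        # 1. acquire the registry lock
        try:
            candidates = []
            for path in reg.glob("*.json"):     # 2. scan for matching pending records
                rec = json.loads(path.read_text(encoding="utf-8"))
                if (rec.get("status") == "pending"
                        and rec.get("agent_session") == agent_session):
                    candidates.append((rec["created_at"], path, rec))
            if not candidates:
                return None
            _, path, rec = min(candidates, key=lambda c: c[0])
            rec["status"] = "running"           # 3. flip, then swap the file in place
            tmp = path.with_name(f".{path.stem}.tmp")
            tmp.write_text(json.dumps(rec, ensure_ascii=False), encoding="utf-8")
            os.replace(tmp, path)
            return rec["job_id"]
        finally:
            fcntl.flock(lock, fcntl.LOCK_UN)    # 4. release the lock
```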

---

## 5. Atomic status updates

All writes use a temp-file + `os.replace` rename, which is atomic on POSIX:

1. take the registry lock,
2. load the current record,
3. mutate fields + refresh `updated_at` (and `last_seq` for `next_seq`),
4. write to `.<job_id>.<rand>.tmp` in the **same directory**, `fsync`,
5. `os.replace(tmp, <job_id>.json)`,
6. release the lock.

A reader therefore always sees either the old or the new complete record, never
a half-written file. This is the file-based equivalent of the rename trick
(`pending.<session>` → `running.<session>`) and maps cleanly onto a single
SQLite transaction when you migrate.
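
Steps 4 and 5 are the part most easily gotten wrong, so here is a minimal sketch of them. `write_record_atomic` is a hypothetical name, and the caller is assumed to already hold the registry lock (steps 1 and 6).

```python
import json
import os
import tempfile


def write_record_atomic(registry_dir, job_id, record):
    """Replace <job_id>.json so readers never see a partial file (sketch)."""
    final_path = os.path.join(registry_dir, f"{job_id}.json")
    # The temp file lives in the same directory so os.replace never
    # crosses a filesystem boundary, which would break atomicity.
    fd, tmp_path = tempfile.mkstemp(
        prefix=f".{job_id}.", suffix=".tmp", dir=registry_dir
    )
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(record, f, ensure_ascii=False, indent=2)
            f.flush()
            os.fsync(f.fileno())  # push the bytes to disk before the rename
        os.replace(tmp_path, final_path)
    except BaseException:
        # Leave no stray temp file behind if anything failed before the rename.
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise
```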

---

## 6. CLI quick reference

```bash
PY=.venv/bin/python
$PY scripts/registry.py register --prompt "…" --agent claude-code \
    --agent-session tmux:claude --timeout 600 --idle-timeout 120   # → prints job_id
$PY scripts/registry.py list                                # human table
$PY scripts/registry.py list --json                         # full records
$PY scripts/registry.py get --job <id>                      # one record
$PY scripts/registry.py status --job <id> --set completed   # set status
$PY scripts/registry.py pick --agent-session tmux:claude    # claim → running
```

Exit codes: `0` ok, `1` not found / bad status, `3` (`pick`) no pending job for
that session.
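
When driving the CLI from another Python process, the exit codes can be mapped back to values as in this sketch. Both function names are hypothetical, and it assumes `pick` prints only the job id on stdout when it succeeds.

```python
import subprocess


def interpret_pick(returncode, stdout):
    """Turn a `pick` exit code into a job id or None, per the codes above."""
    if returncode == 0:
        return stdout.strip()
    if returncode == 3:
        return None  # no pending job for that session
    raise RuntimeError(f"registry.py pick failed with exit code {returncode}")


def pick(agent_session, python=".venv/bin/python"):
    """Run the documented `pick` subcommand and interpret its result."""
    proc = subprocess.run(
        [python, "scripts/registry.py", "pick", "--agent-session", agent_session],
        capture_output=True,
        text=True,
    )
    return interpret_pick(proc.returncode, proc.stdout)
```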

---

## 7. Persistent audit log

Separate from the registry, every job is also mirrored to a durable append-only
audit log at `.hermes/delegate_job_logs/<job_id>/`. The base directory defaults
to `<cwd>/.hermes/delegate_job_logs` and can be overridden with
`DELEGATE_JOB_LOGS_DIR`. The registry is **live state** mutated in place; the
audit log is **history** that survives even after the registry dir is cleaned
up. The audit log directory is git-ignored.

```
.hermes/delegate_job_logs/<job_id>/
  meta.json       # registration snapshot (the full job record at register time)
  events.ndjson   # append-only, one JSON event per line, time-ordered
  status.json     # current status only (fast point-query)
```

`events.ndjson` lines are written automatically at four points:

| Trigger | `event` value | Source |
|---------|---------------|--------|
| `register_job` | `registered` | `registry.register_job` → `mqtt_common.init_job_log` |
| status change (`update_status`, `pick`, publish status sync) | `status_changed` (`from`/`to`) | `mqtt_common.update_job_status` / `pick_pending` |
| event published | `published` (embeds the exact payload) | `publish_event.py` |
| event received | `received` | `job_subscriber.py` |

Helpers live in [`./scripts/mqtt_common.py`](./scripts/mqtt_common.py):
`LOGS_DIR`, `job_log_path`, `init_job_log`, `append_event` (fcntl-locked,
concurrent-append safe), `update_logged_status`, and the readers
`read_logged_meta` / `read_logged_status` / `iter_logged_events` /
`list_logged_jobs`. Every writer is **best-effort and isolated**: each is
wrapped in `try/except` with a `logger.warning`, so an audit-log failure never
breaks the registry write, the publish, or the subscribe it shadows.
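
The best-effort pattern described above could look like the following sketch. The name `append_event` comes from the helper list, but its signature, the `logs_dir` parameter, and the boolean return value are assumptions; the real helper in `mqtt_common.py` may differ.

```python
import fcntl
import json
import logging
import os

logger = logging.getLogger(__name__)


def append_event(job_id, event, logs_dir=".hermes/delegate_job_logs"):
    """Append one JSON line to events.ndjson and never raise (sketch)."""
    try:
        job_dir = os.path.join(logs_dir, job_id)
        os.makedirs(job_dir, exist_ok=True)
        path = os.path.join(job_dir, "events.ndjson")
        with open(path, "a", encoding="utf-8") as f:
            fcntl.flock(f, fcntl.LOCK_EX)  # serialise concurrent appenders
            try:
                f.write(json.dumps(event, ensure_ascii=False) + "\n")
                f.flush()
            finally:
                fcntl.flock(f, fcntl.LOCK_UN)
        return True
    except Exception as exc:
        # Isolation: a logging failure is reported, never propagated.
        logger.warning("audit log append failed for %s: %s", job_id, exc)
        return False
```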

Read the audit log via the CLI:

```bash
PY=.venv/bin/python
$PY scripts/registry.py logs <job_id>             # pretty timeline
$PY scripts/registry.py logs <job_id> --tail 20   # last 20 events
$PY scripts/registry.py logs <job_id> --json      # raw JSON lines
$PY scripts/registry.py logs --list               # every job, live status
```