refactor: migrate skills/ directory to .agents/skills/

This commit is contained in:
2026-06-21 14:42:12 +00:00
parent e1d998e1ef
commit 30e447189e
28 changed files with 44 additions and 44 deletions
@@ -0,0 +1,252 @@
#!/usr/bin/env python3
"""job_subscriber.py — the single entry point for observing Job events.
Subscribes to one job's ``<topic_prefix>/events`` (or, with ``--wait-any``, the
events of every running/pending job in the registry), prints one line to stdout
per accepted event, and exits on a terminal event or a timeout.
Design points (all flagged in the PLAN review):
- terminal state machine: ``completed``/``error`` is acted on exactly once per
job, so QoS-1 duplicates or an ``error``-after-``completed`` reorder are safe.
- dual timeouts: a wall-clock ``--timeout`` (total budget, started at
subscribe time so a cold start can't hang forever) AND an idle
``--idle-timeout`` (no new event for N seconds).
- defensive parsing: undecodable payloads, ``schema_version`` mismatches, and
``job_id`` values we did not subscribe for are logged and dropped.
stdout = event lines only. Diagnostics go to stderr via logging.
Exit codes:
0 all watched jobs reached ``completed``
1 any watched job reached ``error``
2 timed out (wall-clock or idle) before all jobs finished
"""
from __future__ import annotations
import argparse
import json
import logging
import queue
import sys
import time
from typing import Any, Dict, List, Optional, Set, Tuple
import mqtt_common
import registry
from mqtt_common import (
DEFAULT_REGISTRY_DIR,
SCHEMA_VERSION,
broker_config_from_job,
load_job,
make_client,
)
logger = logging.getLogger("delegate_job.job_subscriber")
TERMINAL_EVENTS = ("completed", "error")
def _format_line(topic: str, payload: Dict[str, Any]) -> str:
return (
f"{payload.get('timestamp','-')} "
f"job={payload.get('job_id','?')} "
f"seq={payload.get('seq','?')} "
f"{payload.get('event','?'):<20} "
f"{payload.get('detail','')}"
)
class _Watcher:
"""Holds the shared queue + the set of job_ids we accept events for."""
def __init__(self, expected_job_ids: Set[str], expected_tokens: Dict[str, Optional[str]]):
self.events: "queue.Queue[Tuple[str, Dict[str, Any]]]" = queue.Queue()
self.expected = set(expected_job_ids)
self.tokens = expected_tokens # job_id -> expected auth_token (or None)
self.last_seq: Dict[str, int] = {jid: 0 for jid in expected_job_ids}
def on_message(self, _client, _userdata, msg) -> None:
# --- defensive parsing -------------------------------------------
try:
payload = json.loads(msg.payload.decode("utf-8"))
except (UnicodeDecodeError, json.JSONDecodeError) as exc:
logger.warning("drop unparseable payload on %s: %s", msg.topic, exc)
return
if not isinstance(payload, dict):
logger.warning("drop non-object payload on %s", msg.topic)
return
if payload.get("schema_version") != SCHEMA_VERSION:
logger.warning("drop event with schema_version=%r (expected %d)",
payload.get("schema_version"), SCHEMA_VERSION)
return
jid = payload.get("job_id")
if jid not in self.expected:
logger.warning("drop event for unexpected job_id=%r on %s", jid, msg.topic)
return
# --- production auth check: data.auth_token must match if expected ---
expected_token = self.tokens.get(jid)
if not mqtt_common.verify_hmac(payload, expected_token):
logger.warning("drop event for job %s: HMAC verify failed", jid)
return
# --- replay attack defense: check monotonic sequence ---
seq = payload.get("seq")
if seq is None or not isinstance(seq, int):
logger.warning("drop event for job %s: missing or invalid seq", jid)
return
if seq <= self.last_seq.get(jid, 0):
logger.warning("drop event for job %s: seq %d is not monotonically increasing (last %d)",
jid, seq, self.last_seq.get(jid, 0))
return
self.last_seq[jid] = seq
# Persistent audit log from the *subscriber's* vantage point: every event
# that survives defensive parsing is recorded here, including ones a
# different host published. This is the external-observer record that
# backstops the publisher's own "published" line if it never wrote one.
mqtt_common.append_event(jid, {
"event": "received",
"source_event": payload.get("event"),
"seq": payload.get("seq"),
"topic": msg.topic,
"timestamp": payload.get("timestamp"),
"detail": payload.get("detail", ""),
})
self.events.put((msg.topic, payload))
def _collect_jobs(args) -> List[Dict[str, Any]]:
"""Resolve the list of job records this invocation should watch."""
if args.wait_any:
jobs = [r for r in registry.list_jobs(args.registry_dir)
if r.get("status") in ("pending", "running")]
if not jobs:
logger.error("no pending/running jobs to wait for")
return jobs
job = load_job(args.job, args.registry_dir) # raises FileNotFoundError
return [job]
def main(argv=None) -> int:
parser = argparse.ArgumentParser(description="Subscribe to Job events on MQTT")
target = parser.add_mutually_exclusive_group(required=True)
target.add_argument("--job", help="job id to watch")
target.add_argument("--wait-any", action="store_true",
help="watch every pending/running job in the registry")
parser.add_argument("--timeout", type=float, default=None,
help="wall-clock budget in seconds (default: job.timeout_sec or 3600)")
parser.add_argument("--idle-timeout", type=float, default=None,
help="max seconds with no new event (default: job.idle_timeout_sec or 120)")
parser.add_argument("--expect-retention", action="store_true",
help="warn if no retained terminal event arrives promptly")
parser.add_argument("--registry-dir", default=DEFAULT_REGISTRY_DIR)
parser.add_argument("-v", "--verbose", action="store_true")
args = parser.parse_args(argv)
mqtt_common.setup_logging(logging.DEBUG if args.verbose else logging.WARNING)
try:
jobs = _collect_jobs(args)
except FileNotFoundError as exc:
logger.error("%s", exc)
return 2
if not jobs:
return 2
expected_ids: Set[str] = {j["job_id"] for j in jobs}
tokens = {j["job_id"]: j.get("auth_token") for j in jobs}
watcher = _Watcher(expected_ids, tokens)
# Resolve timeouts from CLI, falling back to the (first) job's settings.
base_job = jobs[0]
wall_timeout = args.timeout if args.timeout is not None else float(base_job.get("timeout_sec", 3600))
idle_timeout = args.idle_timeout if args.idle_timeout is not None else float(base_job.get("idle_timeout_sec", 120))
# All watched jobs share a broker in practice; connect using the first
# job's broker and subscribe to each job's events topic.
config = broker_config_from_job(base_job)
client = make_client("subscriber", config)
client.on_message = watcher.on_message
subscribed_topics = []
for job in jobs:
prefix = job.get("topic_prefix") or mqtt_common.topic_prefix_for(job["job_id"])
subscribed_topics.append(f"{prefix}/events")
def on_connect(_c, _u, _flags, reason_code, _props):
if mqtt_common.reason_code_value(reason_code) != 0:
logger.error("broker connection failed: rc=%s", reason_code)
return
for topic in subscribed_topics:
_c.subscribe(topic, qos=1)
logger.info("subscribed to %s", topic)
def on_disconnect(_c, _u, _flags, reason_code, _props):
rc = mqtt_common.reason_code_value(reason_code)
if rc != 0:
logger.warning("broker disconnected (rc=%s); will retry reconnect", reason_code)
client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.reconnect_delay_set(min_delay=1, max_delay=16)
mqtt_common.with_retry(
lambda: client.connect(config.host, config.port, config.keepalive),
attempts=5, base_delay=1.0, max_delay=16.0
)()
client.loop_start()
terminal: Dict[str, str] = {} # job_id -> "completed"/"error"
pending: Set[str] = set(expected_ids)
start = time.monotonic()
wall_deadline = start + wall_timeout
last_event = start
retention_checked = not args.expect_retention
try:
while pending:
now = time.monotonic()
if now >= wall_deadline:
logger.error("wall-clock timeout (%.0fs); still pending: %s",
wall_timeout, ", ".join(sorted(pending)))
return 2
idle_left = idle_timeout - (now - last_event)
if idle_left <= 0:
logger.error("idle timeout (%.0fs, no events); still pending: %s",
idle_timeout, ", ".join(sorted(pending)))
return 2
wait = min(wall_deadline - now, idle_left, 1.0)
try:
topic, payload = watcher.events.get(timeout=wait)
except queue.Empty:
if not retention_checked and (now - start) > 3.0:
logger.warning("--expect-retention set but no retained "
"terminal event observed yet")
retention_checked = True
continue
last_event = time.monotonic()
retention_checked = True
print(_format_line(topic, payload), flush=True)
jid = payload["job_id"]
event = payload.get("event")
if event in TERMINAL_EVENTS:
if jid in terminal:
# Already finalised: ignore duplicates / late reorders.
logger.info("ignoring duplicate terminal %s for %s", event, jid)
continue
terminal[jid] = event
pending.discard(jid)
finally:
client.loop_stop()
try:
client.disconnect()
except Exception: # pragma: no cover
pass
# All jobs reached a terminal state. error wins over completed.
if any(state == "error" for state in terminal.values()):
return 1
return 0
if __name__ == "__main__":
sys.exit(main())
@@ -0,0 +1,600 @@
"""Shared MQTT + registry helpers for the tmux-agent-orchestrate-delegate-job skill.
Single entry point for:
- broker configuration (env -> dataclass),
- paho client construction (auth + TLS + unique client id),
- monotonic per-job sequence counters,
- retry-with-exponential-backoff,
- atomic registry record load/update under an fcntl lock.
Requires paho-mqtt >= 2.0 (uses CallbackAPIVersion.VERSION2).
This module is the *only* place that talks to the broker config and to the
raw job record file, so PoC -> production migration touches just env/registry
values, never code (see references/mqtt-broker-setup.md).
"""
from __future__ import annotations
import functools
import hashlib
import hmac
import json
import logging
import os
import tempfile
import time
import uuid
from contextlib import contextmanager
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Any, Callable, Dict, Iterable, List, Optional
import paho.mqtt.client as mqtt
logger = logging.getLogger("delegate_job.mqtt_common")
def _load_dotenv(workspace_dir: str = None) -> None:
"""Load .env file from workspace if it exists and env var not already set.
This ensures Python scripts get the same env vars as the shell wrapper
scripts that source .env. Only sets vars that are not already in os.environ
(i.e. OS env takes precedence over .env file).
"""
import os
if workspace_dir is None:
# Walk up from this script to find workspace root
d = os.path.dirname(os.path.abspath(__file__))
for _ in range(5):
if os.path.isfile(os.path.join(d, ".env")):
break
d = os.path.dirname(d)
else:
d = workspace_dir
env_path = os.path.join(d, ".env")
if not os.path.isfile(env_path):
return
with open(env_path, "r") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
if "=" in line:
key, _, val = line.partition("=")
key = key.strip()
val = val.strip().strip('"').strip("'")
if key and key not in os.environ:
os.environ[key] = val
_load_dotenv()
# --------------------------------------------------------------------------
# Constants
# --------------------------------------------------------------------------
SCHEMA_VERSION = 1
DEFAULT_REGISTRY_DIR = ".hermes/jobs"
DEFAULT_TOPIC_ROOT = "python/mqtt/jobs"
LOCK_FILENAME = ".lock"
# Persistent audit-log layout: .hermes/delegate_job_logs/<job_id>/{meta,events,status}.
# This is a *separate* artifact from the registry: the registry is the live job
# record (mutated in place), the audit log is an append-only history that
# survives even if the registry dir is cleaned up.
META_FILENAME = "meta.json"
EVENTS_FILENAME = "events.ndjson"
STATUS_FILENAME = "status.json"
def _default_logs_dir() -> str:
"""Audit-log root. Overridable with ``DELEGATE_JOB_LOGS_DIR``; otherwise
``<cwd>/.hermes/delegate_job_logs`` — we keep audit logs next to the
live registry (``.hermes/jobs/``) so the two runtime artifacts sit
under the same parent dir and follow the same ``.gitignore`` rule.
The cwd of whichever process emits events (the bash wrapper and
scripts) is used as the anchor."""
env = os.environ.get("DELEGATE_JOB_LOGS_DIR")
if env and env.strip():
return env
return os.path.join(os.getcwd(), ".hermes", "delegate_job_logs")
LOGS_DIR = _default_logs_dir()
# --------------------------------------------------------------------------
# Broker configuration
# --------------------------------------------------------------------------
@dataclass
class BrokerConfig:
"""Resolved broker connection settings.
PoC defaults target the public HiveMQ broker. Production overrides arrive
either from environment variables or from a job record's ``broker.*`` block
(see ``broker_config_from_job``).
"""
host: str = "broker.hivemq.com"
port: int = 1883
tls: bool = False
username: Optional[str] = None
password: Optional[str] = None
client_id_prefix: str = "hermes"
# TLS material (only consulted when tls is True).
ca_certs: Optional[str] = None
certfile: Optional[str] = None
keyfile: Optional[str] = None
keepalive: int = 60
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
def to_registry_block(self) -> Dict[str, Any]:
"""The subset that gets persisted into a job record's broker block."""
return {
"host": self.host,
"port": self.port,
"tls": self.tls,
"username": self.username,
"password": self.password,
}
def _env_bool(name: str, default: bool = False) -> bool:
raw = os.environ.get(name)
if raw is None:
return default
return raw.strip().lower() in ("1", "true", "yes", "on")
def _env_int(name: str, default: int) -> int:
raw = os.environ.get(name)
if raw is None or raw.strip() == "":
return default
try:
return int(raw)
except ValueError:
logger.warning("invalid int for %s=%r; using default %d", name, raw, default)
return default
def broker_config_from_env(overrides: Optional[Dict[str, Any]] = None) -> BrokerConfig:
"""Build a :class:`BrokerConfig` from environment variables.
Recognised vars (all optional, PoC defaults shown):
MQTT_BROKER (broker.hivemq.com), MQTT_PORT (1883), MQTT_TLS (0),
MQTT_USERNAME, MQTT_PASSWORD, MQTT_CLIENT_ID_PREFIX (hermes),
MQTT_CA_CERTS, MQTT_CERTFILE, MQTT_KEYFILE, MQTT_KEEPALIVE (60).
``overrides`` (e.g. a job record's broker block) wins over the env values
for any key it specifies with a non-None value.
"""
cfg = BrokerConfig(
host=os.environ.get("MQTT_BROKER", "broker.hivemq.com"),
port=_env_int("MQTT_PORT", 1883),
tls=_env_bool("MQTT_TLS", False),
username=os.environ.get("MQTT_USERNAME") or None,
password=os.environ.get("MQTT_PASSWORD") or None,
client_id_prefix=os.environ.get("MQTT_CLIENT_ID_PREFIX", "hermes"),
ca_certs=os.environ.get("MQTT_CA_CERTS") or None,
certfile=os.environ.get("MQTT_CERTFILE") or None,
keyfile=os.environ.get("MQTT_KEYFILE") or None,
keepalive=_env_int("MQTT_KEEPALIVE", 60),
)
if overrides:
for key, value in overrides.items():
if value is not None and hasattr(cfg, key):
setattr(cfg, key, value)
return cfg
def broker_config_from_job(job: Dict[str, Any]) -> BrokerConfig:
"""Resolve broker config for a job: env defaults, then the job's broker.*
block overrides. This lets ``publish_event.py`` connect from the registry
alone, while still honouring environment toggles (e.g. MQTT_TLS=1)."""
return broker_config_from_env(overrides=job.get("broker") or {})
def make_client(role: str, config: Optional[BrokerConfig] = None) -> mqtt.Client:
"""Return a configured paho ``Client`` (not yet connected).
The client id is ``f"{prefix}-{role}-{uuid8}"`` so concurrent publishers /
subscribers never collide on the broker. Auth and TLS are applied when the
config supplies them.
"""
config = config or broker_config_from_env()
client_id = f"{config.client_id_prefix}-{role}-{uuid.uuid4().hex[:8]}"
client = mqtt.Client(
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
client_id=client_id,
)
if config.username:
client.username_pw_set(config.username, config.password)
if config.tls:
# If ca_certs is None paho uses the system trust store (good enough for
# public CAs); a private CA bundle path is passed through unchanged.
client.tls_set(
ca_certs=config.ca_certs,
certfile=config.certfile,
keyfile=config.keyfile,
)
logger.debug("built client id=%s tls=%s host=%s", client_id, config.tls, config.host)
return client
def reason_code_value(rc: Any) -> int:
"""Normalise a paho v2 connect reason code to an int.
paho-mqtt 2.x hands callbacks a ``ReasonCode`` object (not an int); older
paths may pass a plain int. ``ReasonCode`` exposes ``.value``; 0 == success.
"""
return int(getattr(rc, "value", rc))
def verify_hmac(payload: dict, auth_token: Optional[str]) -> bool:
"""Verify HMAC-SHA256 signature. Returns True if valid or no token set."""
if not auth_token:
return True # PoC mode — no auth
sig = payload.get("data", {}).get("hmac_sig")
if not sig:
return False
sign_payload = {k: v for k, v in payload.items() if k != "data"}
sign_payload["data"] = {k: v for k, v in payload.get("data", {}).items() if k != "hmac_sig"}
msg = json.dumps(sign_payload, sort_keys=True, separators=(",", ":")).encode()
expected = hmac.new(auth_token.encode(), msg, hashlib.sha256).hexdigest()
return hmac.compare_digest(sig, expected)
def topic_prefix_for(job_id: str, root: str = DEFAULT_TOPIC_ROOT) -> str:
return f"{root}/{job_id}"
def events_topic_for(job_id: str, root: str = DEFAULT_TOPIC_ROOT) -> str:
return f"{topic_prefix_for(job_id, root)}/events"
# --------------------------------------------------------------------------
# Registry primitives (single source of truth for raw record I/O)
# --------------------------------------------------------------------------
def _job_path(job_id: str, registry_dir: str) -> Path:
return Path(registry_dir) / f"{job_id}.json"
def _lock_path(registry_dir: str) -> Path:
return Path(registry_dir) / LOCK_FILENAME
@contextmanager
def registry_lock(registry_dir: str):
"""Advisory exclusive lock over the whole registry dir via fcntl.
PoC-grade single-host concurrency control. Multiple tmux sessions / scripts
serialise their read-modify-write of job records through this lock so two
sessions never claim the same pending job. For multi-host delegation move
to SQLite WAL (see references/registry.md)."""
import fcntl # POSIX only; imported lazily so import works on Windows.
Path(registry_dir).mkdir(parents=True, exist_ok=True)
lock_file = _lock_path(registry_dir)
fh = open(lock_file, "a+")
try:
fcntl.flock(fh.fileno(), fcntl.LOCK_EX)
yield
finally:
try:
fcntl.flock(fh.fileno(), fcntl.LOCK_UN)
finally:
fh.close()
def load_job(job_id: str, registry_dir: str = DEFAULT_REGISTRY_DIR) -> Dict[str, Any]:
"""Load and parse a job record. Raises FileNotFoundError if absent."""
path = _job_path(job_id, registry_dir)
if not path.exists():
raise FileNotFoundError(f"job record not found: {path}")
with open(path, "r", encoding="utf-8") as fh:
return json.load(fh)
def _atomic_write_record(job_id: str, registry_dir: str, record: Dict[str, Any]) -> None:
"""Write a record atomically: temp file in the same dir + os.replace.
The rename is atomic on POSIX, so readers never observe a half-written
file. Callers MUST already hold ``registry_lock`` for read-modify-write
correctness."""
Path(registry_dir).mkdir(parents=True, exist_ok=True)
path = _job_path(job_id, registry_dir)
fd, tmp = tempfile.mkstemp(dir=str(path.parent), prefix=f".{job_id}.", suffix=".tmp")
try:
with os.fdopen(fd, "w", encoding="utf-8") as fh:
json.dump(record, fh, ensure_ascii=False, indent=2)
fh.write("\n")
fh.flush()
os.fsync(fh.fileno())
os.replace(tmp, path)
try:
os.chmod(path, 0o600)
except Exception:
pass
except BaseException:
if os.path.exists(tmp):
os.unlink(tmp)
raise
def update_job_status(job_id: str, registry_dir: str = DEFAULT_REGISTRY_DIR, **fields: Any) -> Dict[str, Any]:
"""Atomically merge ``fields`` into a job record under the registry lock.
Always refreshes ``updated_at``. Returns the new record. Raises
FileNotFoundError if the job does not exist.
This is the single chokepoint for status writes (both ``registry.update_status``
and ``publish_event.py``'s status sync route through here), so it also mirrors
any ``status`` change into the persistent audit log — best-effort, after the
registry lock is released so a slow/failed log write never blocks the record."""
with registry_lock(registry_dir):
record = load_job(job_id, registry_dir)
old_status = record.get("status")
record.update(fields)
record["updated_at"] = _utcnow()
_atomic_write_record(job_id, registry_dir, record)
if "status" in fields:
new_status = record.get("status")
update_logged_status(job_id, new_status, updated_at=record["updated_at"])
if old_status != new_status:
append_event(job_id, {
"event": "status_changed",
"from": old_status,
"to": new_status,
"timestamp": record["updated_at"],
})
return record
def next_seq(job_id: str, registry_dir: str = DEFAULT_REGISTRY_DIR) -> int:
"""Return the next monotonic sequence number for a job, persisted in the
record's ``last_seq`` field so it stays consistent across process restarts.
First call returns 1."""
with registry_lock(registry_dir):
record = load_job(job_id, registry_dir)
seq = int(record.get("last_seq", 0)) + 1
record["last_seq"] = seq
record["updated_at"] = _utcnow()
_atomic_write_record(job_id, registry_dir, record)
return seq
def _utcnow() -> str:
"""ISO-8601 UTC timestamp with trailing Z (payload `timestamp` field)."""
return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
def _utcnow_precise() -> str:
"""ISO-8601 UTC timestamp with millisecond resolution. Used for the audit
log's ``logged_at`` so events sort cleanly even within the same second."""
now = time.time()
base = time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(now))
return f"{base}.{int((now % 1) * 1000):03d}Z"
# --------------------------------------------------------------------------
# Persistent audit log (.hermes/delegate_job_logs/<job_id>/...)
#
# Every function here is idempotent, concurrency-safe, and *best-effort*: a
# logging failure is swallowed with a logger.warning and never propagated, so it
# can never break a publish, a subscribe, or a registry write. stdout is never
# touched (it is reserved for data output).
# --------------------------------------------------------------------------
def job_log_dir(job_id: str, logs_dir: Optional[str] = None) -> Path:
return Path(logs_dir or LOGS_DIR) / job_id
def job_log_path(job_id: str, kind: str, logs_dir: Optional[str] = None) -> Path:
"""Path to one audit-log file for a job. ``kind`` is a filename, e.g. the
module constants META_FILENAME / EVENTS_FILENAME / STATUS_FILENAME."""
return job_log_dir(job_id, logs_dir) / kind
@contextmanager
def _file_lock(fh):
"""Best-effort exclusive lock over a single open file via fcntl, so two
processes appending to events.ndjson never interleave a line. A no-op where
fcntl is unavailable (Windows); a short append is atomic enough there."""
try:
import fcntl
except ImportError: # pragma: no cover - non-POSIX
yield
return
fcntl.flock(fh.fileno(), fcntl.LOCK_EX)
try:
yield
finally:
fcntl.flock(fh.fileno(), fcntl.LOCK_UN)
def append_event(job_id: str, event_dict: Dict[str, Any], logs_dir: Optional[str] = None) -> None:
"""Append one event as a JSON line to ``<logs>/<job_id>/events.ndjson``.
Concurrency-safe (fcntl lock over the file) and best-effort. A millisecond
``logged_at`` is stamped when the caller did not supply one."""
try:
path = job_log_path(job_id, EVENTS_FILENAME, logs_dir)
path.parent.mkdir(parents=True, exist_ok=True)
record = dict(event_dict)
record.setdefault("logged_at", _utcnow_precise())
line = json.dumps(record, ensure_ascii=False) + "\n"
with open(path, "a", encoding="utf-8") as fh:
with _file_lock(fh):
fh.write(line)
fh.flush()
except Exception as exc: # pragma: no cover - best effort
logger.warning("append_event failed for job %s: %s", job_id, exc)
def update_logged_status(job_id: str, status: str, logs_dir: Optional[str] = None, **extras: Any) -> None:
"""Rewrite ``<logs>/<job_id>/status.json`` (current status for fast point
queries) atomically. Best-effort; merges any ``extras``."""
try:
path = job_log_path(job_id, STATUS_FILENAME, logs_dir)
path.parent.mkdir(parents=True, exist_ok=True)
record: Dict[str, Any] = {"job_id": job_id, "status": status, "updated_at": _utcnow()}
record.update(extras)
tmp = path.with_name(path.name + ".tmp")
with open(tmp, "w", encoding="utf-8") as fh:
json.dump(record, fh, ensure_ascii=False, indent=2)
fh.write("\n")
os.replace(tmp, path)
except Exception as exc: # pragma: no cover - best effort
logger.warning("update_logged_status failed for job %s: %s", job_id, exc)
def init_job_log(job_id: str, meta: Dict[str, Any], logs_dir: Optional[str] = None) -> None:
"""Seed the per-job audit-log dir: write meta.json, status.json, and a first
``registered`` line in events.ndjson. Idempotent (the ``registered`` line is
written only when events.ndjson does not yet exist) and best-effort."""
try:
d = job_log_dir(job_id, logs_dir)
d.mkdir(parents=True, exist_ok=True)
with open(d / META_FILENAME, "w", encoding="utf-8") as fh:
json.dump(meta, fh, ensure_ascii=False, indent=2)
fh.write("\n")
status = meta.get("status", "pending")
update_logged_status(
job_id, status, logs_dir=logs_dir,
created_at=meta.get("created_at"), prompt=meta.get("prompt"),
)
events_path = d / EVENTS_FILENAME
first_time = not events_path.exists()
events_path.touch(exist_ok=True)
if first_time:
append_event(job_id, {
"event": "registered",
"status": status,
"agent": meta.get("agent"),
"agent_session": meta.get("agent_session"),
"topic_prefix": meta.get("topic_prefix"),
"timestamp": meta.get("created_at"),
}, logs_dir=logs_dir)
except Exception as exc: # pragma: no cover - best effort
logger.warning("init_job_log failed for job %s: %s", job_id, exc)
def read_logged_meta(job_id: str, logs_dir: Optional[str] = None) -> Optional[Dict[str, Any]]:
"""Return a job's audit meta.json (registration snapshot), or None."""
try:
with open(job_log_path(job_id, META_FILENAME, logs_dir), "r", encoding="utf-8") as fh:
return json.load(fh)
except (OSError, json.JSONDecodeError):
return None
def read_logged_status(job_id: str, logs_dir: Optional[str] = None) -> Optional[Dict[str, Any]]:
"""Return a job's current status.json, or None. This is the fast point-query
file (current status only), separate from the registration-time meta.json."""
try:
with open(job_log_path(job_id, STATUS_FILENAME, logs_dir), "r", encoding="utf-8") as fh:
return json.load(fh)
except (OSError, json.JSONDecodeError):
return None
def iter_logged_events(job_id: str, logs_dir: Optional[str] = None):
"""Yield each parsed event from a job's events.ndjson in file (time) order.
Malformed lines are skipped with a warning."""
path = job_log_path(job_id, EVENTS_FILENAME, logs_dir)
if not path.exists():
return
with open(path, "r", encoding="utf-8") as fh:
for line in fh:
line = line.strip()
if not line:
continue
try:
yield json.loads(line)
except json.JSONDecodeError:
logger.warning("skipping malformed audit line in %s", path)
def list_logged_jobs(logs_dir: Optional[str] = None) -> List[Dict[str, Any]]:
"""Return one meta record per job directory under the logs root, oldest
first. Falls back to ``{"job_id": <dir>}`` when meta.json is missing."""
base = Path(logs_dir or LOGS_DIR)
out: List[Dict[str, Any]] = []
if not base.exists():
return out
for d in sorted(base.iterdir()):
if not d.is_dir():
continue
meta = read_logged_meta(d.name, logs_dir) or {"job_id": d.name}
# Overlay the live status.json so the summary reflects current state, not
# the registration-time snapshot frozen in meta.json.
status = read_logged_status(d.name, logs_dir)
if status:
meta = {**meta,
"status": status.get("status", meta.get("status")),
"updated_at": status.get("updated_at", meta.get("updated_at"))}
out.append(meta)
out.sort(key=lambda m: m.get("created_at") or "")
return out
# --------------------------------------------------------------------------
# Retry helper
# --------------------------------------------------------------------------
def with_retry(
fn: Optional[Callable] = None,
*,
attempts: int = 3,
base_delay: float = 0.5,
factor: float = 2.0,
max_delay: float = 8.0,
exceptions: Iterable[type] = (Exception,),
) -> Callable:
"""Retry ``fn`` with exponential backoff.
Usable two ways::
result = with_retry(do_publish, attempts=3)() # wrap-and-call
@with_retry(attempts=5, base_delay=1.0) # decorator
def do_publish(): ...
Re-raises the last exception once ``attempts`` is exhausted.
"""
exc_tuple = tuple(exceptions)
def decorate(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> Any:
delay = base_delay
last_exc: Optional[BaseException] = None
for attempt in range(1, attempts + 1):
try:
return func(*args, **kwargs)
except exc_tuple as exc:
last_exc = exc
if attempt >= attempts:
break
logger.warning(
"attempt %d/%d failed: %s; retrying in %.1fs",
attempt, attempts, exc, delay,
)
time.sleep(delay)
delay = min(delay * factor, max_delay)
assert last_exc is not None
raise last_exc
return wrapper
if fn is not None:
return decorate(fn)
return decorate
def setup_logging(level: int = logging.WARNING) -> None:
"""Configure root logging to stderr. stdout is reserved for data output
(subscriber event lines, registry ids)."""
import sys
logging.basicConfig(
level=level,
stream=sys.stderr,
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
@@ -0,0 +1,229 @@
#!/usr/bin/env python3
"""publish_event.py — the single entry point for emitting a Job event.
Loads the job record from the registry, resolves its broker, assigns the next
monotonic ``seq``, builds the schema-v1 JSON payload, and publishes it to
``<topic_prefix>/events`` over QoS 1 with exponential-backoff retry.
Silent by design: nothing is printed to stdout. Diagnostics go to stderr via
logging. Terminal events (``completed``/``error``) publish with retain=True so
a late subscriber still observes the final state (production hardening).
Exit codes:
0 published successfully
1 parameter / registry error (bad args, unknown job, no pending job)
2 publish failed after retries (network / broker / ACK timeout)
Usage:
publish_event.py --job <id> --event started [--detail "..."] [--data '{...}']
publish_event.py --pick-pending --agent-session tmux:claude --event completed
publish_event.py --job <id> --event completed --retained
"""
from __future__ import annotations
import argparse
import hashlib
import hmac
import json
import logging
import sys
import time
from typing import Any, Dict, Optional
import mqtt_common
import registry
from mqtt_common import (
DEFAULT_REGISTRY_DIR,
SCHEMA_VERSION,
broker_config_from_job,
events_topic_for,
load_job,
make_client,
next_seq,
with_retry,
)
logger = logging.getLogger("delegate_job.publish_event")
VALID_EVENTS = ("started", "permission_required", "progress", "completed", "error")
TERMINAL_EVENTS = ("completed", "error")
# event -> registry status to sync as a best-effort side effect
EVENT_TO_STATUS = {
"started": "running",
"completed": "completed",
"error": "error",
}
CONNECT_ACK_TIMEOUT = 10 # seconds to wait for CONNACK
PUBLISH_ACK_TIMEOUT = 5 # seconds to wait for QoS-1 PUBACK
def build_payload(
job_id: str,
seq: int,
event: str,
detail: str,
data: Optional[Dict[str, Any]],
auth_token: Optional[str],
) -> Dict[str, Any]:
payload: Dict[str, Any] = {
"schema_version": SCHEMA_VERSION,
"seq": seq,
"job_id": job_id,
"event": event,
"timestamp": mqtt_common._utcnow(),
"detail": detail,
"data": dict(data) if data else {},
}
# Production: carry the per-job HMAC-SHA256 signature in `data.hmac_sig` so
# the subscriber can verify the publisher without exposing the secret token.
# The signature is calculated over the entire payload (with `data.hmac_sig` excluded).
if auth_token:
sign_payload = {k: v for k, v in payload.items() if k != "data"}
sign_payload["data"] = {k: v for k, v in payload.get("data", {}).items() if k != "hmac_sig"}
msg = json.dumps(sign_payload, sort_keys=True, separators=(",", ":")).encode()
sig = hmac.new(auth_token.encode(), msg, hashlib.sha256).hexdigest()
payload["data"]["hmac_sig"] = sig
return payload
def _publish_once(config, topic: str, body: bytes, retain: bool) -> None:
"""Connect, publish one QoS-1 message, wait for the broker ACK, disconnect.
Raises on any failure so ``with_retry`` can re-run the whole sequence (a
fresh connection per attempt is the robust choice for a PoC)."""
client = make_client("publisher", config)
connected = {"rc": None}
def on_connect(_c, _u, _flags, reason_code, _props):
connected["rc"] = reason_code
client.on_connect = on_connect
client.connect(config.host, config.port, config.keepalive)
client.loop_start()
try:
# Wait for CONNACK so we fail fast on auth/TLS errors.
deadline = time.monotonic() + CONNECT_ACK_TIMEOUT
while connected["rc"] is None and time.monotonic() < deadline:
time.sleep(0.05)
if connected["rc"] is None:
raise TimeoutError("no CONNACK from broker")
if mqtt_common.reason_code_value(connected["rc"]) != 0:
raise ConnectionError(f"broker refused connection: rc={connected['rc']}")
info = client.publish(topic, payload=body, qos=1, retain=retain)
info.wait_for_publish(timeout=PUBLISH_ACK_TIMEOUT)
if not info.is_published():
raise TimeoutError("publish not acknowledged within timeout")
finally:
client.loop_stop()
try:
client.disconnect()
except Exception: # pragma: no cover - disconnect best effort
pass
def _resolve_job_id(args) -> Optional[str]:
if args.pick_pending:
return registry.pick_pending(args.agent_session, args.registry_dir)
return args.job
def main(argv=None) -> int:
parser = argparse.ArgumentParser(description="Publish a Job event to MQTT")
target = parser.add_mutually_exclusive_group(required=True)
target.add_argument("--job", help="job id to publish for")
target.add_argument("--pick-pending", action="store_true",
help="auto-select a pending job for --agent-session")
parser.add_argument("--agent-session", default="tmux:claude",
help="session label used with --pick-pending")
parser.add_argument("--event", default="progress", choices=VALID_EVENTS)
parser.add_argument("--detail", default="")
parser.add_argument("--data", default=None, help="optional JSON object string")
parser.add_argument("--retained", action="store_true",
help="force retain=True (auto for completed/error)")
parser.add_argument("--registry-dir", default=DEFAULT_REGISTRY_DIR)
parser.add_argument("--attempts", type=int, default=3)
parser.add_argument("-v", "--verbose", action="store_true")
args = parser.parse_args(argv)
mqtt_common.setup_logging(logging.DEBUG if args.verbose else logging.WARNING)
# --- parse optional data JSON (parameter error -> exit 1) ---
data: Optional[Dict[str, Any]] = None
if args.data:
try:
data = json.loads(args.data)
if not isinstance(data, dict):
raise ValueError("--data must be a JSON object")
except (ValueError, json.JSONDecodeError) as exc:
logger.error("invalid --data: %s", exc)
return 1
job_id = _resolve_job_id(args)
if not job_id:
logger.error("no job to publish for (unknown --job or no pending job)")
return 1
try:
job = load_job(job_id, args.registry_dir)
except FileNotFoundError as exc:
logger.error("%s", exc)
return 1
config = broker_config_from_job(job)
topic = job.get("topic_prefix")
topic = f"{topic}/events" if topic else events_topic_for(job_id)
seq = next_seq(job_id, args.registry_dir)
payload = build_payload(
job_id=job_id,
seq=seq,
event=args.event,
detail=args.detail,
data=data,
auth_token=job.get("auth_token"),
)
body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
retain = args.retained or args.event in TERMINAL_EVENTS
publish = with_retry(
_publish_once,
attempts=args.attempts,
exceptions=(OSError, TimeoutError, ConnectionError, ValueError),
)
try:
publish(config, topic, body, retain)
except Exception as exc:
logger.error("publish failed after %d attempts: %s", args.attempts, exc)
return 2
# Persistent audit log: record the exact payload we put on the wire so the
# publish is reproducible from the log alone. Best-effort (isolated inside
# append_event) — never fails the publish.
mqtt_common.append_event(job_id, {
"event": "published",
"source_event": args.event,
"seq": seq,
"topic": topic,
"retain": retain,
"timestamp": payload["timestamp"],
"detail": args.detail,
"payload": payload,
})
# Best-effort side effects: registry status sync + (debug) event log. Never
# fail the publish on these.
registry.append_event(job_id, args.registry_dir, payload)
new_status = EVENT_TO_STATUS.get(args.event)
if new_status:
try:
mqtt_common.update_job_status(job_id, args.registry_dir, status=new_status)
except Exception as exc: # pragma: no cover - best effort
logger.warning("status sync failed: %s", exc)
logger.info("published %s seq=%d job=%s retain=%s", args.event, seq, job_id, retain)
return 0
if __name__ == "__main__":
sys.exit(main())
@@ -0,0 +1,334 @@
"""Job registry for the tmux-agent-orchestrate-delegate-job skill.
A job record is the single source of truth for one delegated unit of work:
its id, prompt, owning agent session, broker connection, timeouts, and status.
Records live as ``<registry_dir>/<job_id>.json`` with an append-only event log
``<registry_dir>/<job_id>.events.log`` and a shared ``<registry_dir>/.lock``.
Concurrency is handled via the fcntl lock in :mod:`mqtt_common` (PoC). For
multi-host delegation, migrate to SQLite WAL — see references/registry.md.
Importable as a library and runnable as a CLI (``register``/``list``/``get``/
``status``/``pick``) so the ``tmux-agent-orchestrate-delegate-job`` bash wrapper can shell out.
"""
from __future__ import annotations
import argparse
import json
import logging
import sys
import uuid
from pathlib import Path
from typing import Any, Dict, List, Optional
import mqtt_common
from mqtt_common import (
DEFAULT_REGISTRY_DIR,
SCHEMA_VERSION,
_atomic_write_record,
_utcnow,
broker_config_from_env,
load_job,
registry_lock,
topic_prefix_for,
)
logger = logging.getLogger("delegate_job.registry")
TERMINAL_STATUSES = ("completed", "error", "cancelled")
VALID_STATUSES = ("pending", "running", "completed", "error", "cancelled")
def generate_job_id(bits: int = 32) -> str:
"""PoC: 32-bit hex (8 chars). Production: 128-bit (full uuid4 hex)."""
if bits >= 128:
return uuid.uuid4().hex
nibbles = max(1, bits // 4)
return uuid.uuid4().hex[:nibbles]
def register_job(
prompt: str,
agent: str = "claude-code",
agent_session: str = "tmux:claude",
broker: Optional[Dict[str, Any]] = None,
timeout_sec: int = 3600,
idle_timeout_sec: int = 120,
registry_dir: str = DEFAULT_REGISTRY_DIR,
job_id: Optional[str] = None,
expected_artifacts: Optional[List[str]] = None,
bits: int = 32,
auth_token: Optional[str] = None,
) -> str:
"""Create a new ``pending`` job record and return its id.
``broker`` defaults to the current environment's resolved broker block, so
the registry alone is enough for ``publish_event.py`` to connect later.
"""
job_id = job_id or generate_job_id(bits)
if broker is None:
broker = broker_config_from_env().to_registry_block()
if auth_token is None:
# Auto-generate token if secure broker configuration (TLS or username) is detected
if broker.get("tls") or broker.get("username"):
import secrets
auth_token = secrets.token_urlsafe(32)
now = _utcnow()
record: Dict[str, Any] = {
"schema_version": SCHEMA_VERSION,
"job_id": job_id,
"status": "pending",
"created_at": now,
"updated_at": now,
"prompt": prompt,
"agent": agent,
"agent_session": agent_session,
"broker": broker,
"topic_prefix": topic_prefix_for(job_id),
"timeout_sec": int(timeout_sec),
"idle_timeout_sec": int(idle_timeout_sec),
"expected_artifacts": expected_artifacts or [],
"last_seq": 0,
"auth_token": auth_token,
}
with registry_lock(registry_dir):
if mqtt_common._job_path(job_id, registry_dir).exists():
raise FileExistsError(f"job already exists: {job_id}")
_atomic_write_record(job_id, registry_dir, record)
# Seed the persistent audit log (meta.json + status.json + a "registered"
# event). Best-effort inside init_job_log — never blocks registration.
mqtt_common.init_job_log(job_id, meta=record)
logger.info("registered job %s (agent=%s session=%s)", job_id, agent, agent_session)
return job_id
def pick_pending(agent_session: str, registry_dir: str = DEFAULT_REGISTRY_DIR) -> Optional[str]:
"""Claim the oldest ``pending`` job for ``agent_session``, flipping it to
``running`` atomically under the lock. Returns the job id, or None if no
pending job matches. This is how each tmux session takes only its own work
without two sessions grabbing the same job."""
with registry_lock(registry_dir):
candidates = []
for record in _iter_records(registry_dir):
if record.get("status") == "pending" and record.get("agent_session") == agent_session:
candidates.append(record)
if not candidates:
return None
candidates.sort(key=lambda r: r.get("created_at", ""))
chosen = candidates[0]
chosen["status"] = "running"
chosen["updated_at"] = _utcnow()
_atomic_write_record(chosen["job_id"], registry_dir, chosen)
logger.info("session %s picked job %s", agent_session, chosen["job_id"])
job_id = chosen["job_id"]
updated_at = chosen["updated_at"]
# pick_pending writes the record directly (not via update_job_status), so it
# mirrors the pending->running transition into the audit log here. Best-effort.
mqtt_common.update_logged_status(job_id, "running", updated_at=updated_at)
mqtt_common.append_event(job_id, {
"event": "status_changed",
"from": "pending",
"to": "running",
"by": agent_session,
"timestamp": updated_at,
})
return job_id
def update_status(job_id: str, registry_dir: str, status: str) -> Dict[str, Any]:
if status not in VALID_STATUSES:
raise ValueError(f"invalid status {status!r}; expected one of {VALID_STATUSES}")
return mqtt_common.update_job_status(job_id, registry_dir, status=status)
def list_jobs(registry_dir: str = DEFAULT_REGISTRY_DIR, status: Optional[str] = None) -> List[Dict[str, Any]]:
records = list(_iter_records(registry_dir))
if status:
records = [r for r in records if r.get("status") == status]
records.sort(key=lambda r: r.get("created_at", ""))
return records
def append_event(job_id: str, registry_dir: str, payload: Dict[str, Any]) -> None:
"""Append one event payload as a JSON line to the job's events log. Best
effort, debug-only; failures are logged but never raised to the caller."""
try:
Path(registry_dir).mkdir(parents=True, exist_ok=True)
log_path = Path(registry_dir) / f"{job_id}.events.log"
with open(log_path, "a", encoding="utf-8") as fh:
fh.write(json.dumps(payload, ensure_ascii=False) + "\n")
except OSError as exc: # pragma: no cover - best effort
logger.warning("could not append event for %s: %s", job_id, exc)
# convenience re-export so callers can `from registry import load_job`
__all__ = [
"register_job", "pick_pending", "update_status", "load_job",
"list_jobs", "append_event", "generate_job_id",
]
def _iter_records(registry_dir: str):
base = Path(registry_dir)
if not base.exists():
return
for path in sorted(base.glob("*.json")):
try:
with open(path, "r", encoding="utf-8") as fh:
yield json.load(fh)
except (OSError, json.JSONDecodeError) as exc:
logger.warning("skipping unreadable record %s: %s", path, exc)
# --------------------------------------------------------------------------
# CLI (so the bash wrapper can shell out without inline python)
# --------------------------------------------------------------------------
def _build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="tmux-agent-orchestrate-delegate-job registry CLI")
parser.add_argument("--registry-dir", default=DEFAULT_REGISTRY_DIR)
sub = parser.add_subparsers(dest="command", required=True)
p_reg = sub.add_parser("register", help="create a pending job; prints the job id")
p_reg.add_argument("--prompt", required=True)
p_reg.add_argument("--agent", default="claude-code")
p_reg.add_argument("--agent-session", default="tmux:claude")
p_reg.add_argument("--timeout", type=int, default=3600)
p_reg.add_argument("--idle-timeout", type=int, default=120)
p_reg.add_argument("--bits", type=int, default=32, help="32 (PoC) or 128 (prod)")
p_reg.add_argument("--artifact", action="append", default=[], dest="artifacts")
p_reg.add_argument("--auth-token", default=None, help="HMAC auth token for the job (auto-generated if secure broker is detected)")
p_list = sub.add_parser("list", help="list jobs (optionally by status)")
p_list.add_argument("--status", default=None)
p_list.add_argument("--json", action="store_true")
p_get = sub.add_parser("get", help="print one job record as JSON")
p_get.add_argument("--job", required=True)
p_status = sub.add_parser("status", help="set a job status")
p_status.add_argument("--job", required=True)
p_status.add_argument("--set", required=True, dest="status")
p_pick = sub.add_parser("pick", help="claim a pending job for a session; prints id")
p_pick.add_argument("--agent-session", default="tmux:claude")
p_logs = sub.add_parser(
"logs",
help="show the persistent audit log for a job, or --list every logged job",
)
p_logs.add_argument("job_id", nargs="?", default=None,
help="job id whose events.ndjson to print")
p_logs.add_argument("--list", action="store_true", dest="list_all",
help="summarise every job under the logs dir instead")
p_logs.add_argument("--logs-dir", default=None,
help="override the audit-log root (default: $DELEGATE_JOB_LOGS_DIR "
"or <cwd>/.hermes/delegate_job_logs)")
p_logs.add_argument("--tail", type=int, default=0,
help="show only the last N events (0 = all)")
p_logs.add_argument("--json", action="store_true",
help="emit raw JSON lines / records instead of a table")
return parser
def main(argv: Optional[List[str]] = None) -> int:
mqtt_common.setup_logging(logging.INFO)
args = _build_parser().parse_args(argv)
rd = args.registry_dir
if args.command == "register":
job_id = register_job(
prompt=args.prompt,
agent=args.agent,
agent_session=args.agent_session,
timeout_sec=args.timeout,
idle_timeout_sec=args.idle_timeout,
registry_dir=rd,
expected_artifacts=args.artifacts,
bits=args.bits,
auth_token=args.auth_token,
)
print(job_id)
return 0
if args.command == "list":
records = list_jobs(rd, status=args.status)
if args.json:
print(json.dumps(records, ensure_ascii=False, indent=2))
else:
if not records:
print("(no jobs)")
for r in records:
print(f"{r['job_id']} {r.get('status','?'):10s} {r.get('agent_session','')}"
f" {r.get('prompt','')[:48]}")
return 0
if args.command == "get":
try:
print(json.dumps(load_job(args.job, rd), ensure_ascii=False, indent=2))
except FileNotFoundError as exc:
print(str(exc), file=sys.stderr)
return 1
return 0
if args.command == "status":
try:
update_status(args.job, rd, args.status)
except (FileNotFoundError, ValueError) as exc:
print(str(exc), file=sys.stderr)
return 1
return 0
if args.command == "pick":
job_id = pick_pending(args.agent_session, rd)
if job_id is None:
return 3 # no pending job for this session
print(job_id)
return 0
if args.command == "logs":
return _cmd_logs(args)
return 1
def _cmd_logs(args) -> int:
"""Pretty-print one job's events.ndjson, or summarise all logged jobs."""
logs_dir = args.logs_dir or mqtt_common.LOGS_DIR
if args.list_all:
jobs = mqtt_common.list_logged_jobs(logs_dir)
if args.json:
print(json.dumps(jobs, ensure_ascii=False, indent=2))
return 0
if not jobs:
print(f"(no logged jobs under {logs_dir})")
return 0
for m in jobs:
print(f"{m.get('job_id','?')} {m.get('status','?'):10s} "
f"{m.get('created_at','-'):20s} {(m.get('prompt') or '')[:48]}")
return 0
if not args.job_id:
print("logs requires a <job_id> or --list", file=sys.stderr)
return 1
events = list(mqtt_common.iter_logged_events(args.job_id, logs_dir))
if not events and not mqtt_common.job_log_dir(args.job_id, logs_dir).exists():
print(f"no audit log for job {args.job_id} under {logs_dir}", file=sys.stderr)
return 1
if args.tail and args.tail > 0:
events = events[-args.tail:]
if args.json:
for e in events:
print(json.dumps(e, ensure_ascii=False))
return 0
for e in events:
ts = e.get("logged_at") or e.get("timestamp") or "-"
extra = e.get("detail") or e.get("to") or e.get("source_event") or ""
print(f"{ts:24s} {e.get('event','?'):<16s} {extra}")
return 0
if __name__ == "__main__":
sys.exit(main())