feat: implement loop and discuss task delegation types in multi-agent-mux-delegate-job

This commit is contained in:
2026-06-27 08:28:47 +09:00
parent 3b8db1eca2
commit dfd0a9483d
4 changed files with 417 additions and 50 deletions
@@ -59,6 +59,10 @@ def register_job(
expected_artifacts: Optional[List[str]] = None,
bits: int = 32,
auth_token: Optional[str] = None,
job_type: str = "direct",
reviewer: Optional[str] = None,
reviewer_session: Optional[str] = None,
max_iterations: int = 5,
) -> str:
"""Create a new ``pending`` job record and return its id.
@@ -90,6 +94,11 @@ def register_job(
"expected_artifacts": expected_artifacts or [],
"last_seq": 0,
"auth_token": auth_token,
"job_type": job_type,
"reviewer": reviewer,
"reviewer_session": reviewer_session,
"max_iterations": int(max_iterations),
"iteration": 1,
}
with registry_lock(registry_dir):
if mqtt_common._job_path(job_id, registry_dir).exists():
@@ -164,7 +173,7 @@ def append_event(job_id: str, registry_dir: str, payload: Dict[str, Any]) -> Non
# convenience re-export so callers can `from registry import load_job`
__all__ = [
"register_job", "pick_pending", "update_status", "load_job",
"list_jobs", "append_event", "generate_job_id",
"list_jobs", "append_event", "generate_job_id", "get_feedback",
]
@@ -180,6 +189,44 @@ def _iter_records(registry_dir: str):
logger.warning("skipping unreadable record %s: %s", path, exc)
def get_feedback(job_id: str, registry_dir: str = DEFAULT_REGISTRY_DIR) -> str:
"""Read the job's audit log or events log and return the detail of the last completed/error event."""
# 1) Try the unified audit log first (ndjson) since it's written synchronously by the subscriber
try:
import mqtt_common
logs_dir = mqtt_common.LOGS_DIR
events = list(mqtt_common.iter_logged_events(job_id, logs_dir))
for e in reversed(events):
if e.get("source_event") in ("completed", "error"):
return e.get("detail", "")
if e.get("event") in ("completed", "error"):
return e.get("detail", "")
except Exception:
pass
# 2) Fallback to local .events.log
log_path = Path(registry_dir) / f"{job_id}.events.log"
if log_path.exists():
feedback = ""
try:
with open(log_path, "r", encoding="utf-8") as fh:
for line in fh:
if not line.strip():
continue
try:
payload = json.loads(line)
if payload.get("event") in ("completed", "error"):
feedback = payload.get("detail", "")
except json.JSONDecodeError:
continue
except OSError:
pass
if feedback:
return feedback
return ""
# --------------------------------------------------------------------------
# CLI (so the bash wrapper can shell out without inline python)
# --------------------------------------------------------------------------
@@ -197,6 +244,10 @@ def _build_parser() -> argparse.ArgumentParser:
p_reg.add_argument("--bits", type=int, default=32, help="32 (PoC) or 128 (prod)")
p_reg.add_argument("--artifact", action="append", default=[], dest="artifacts")
p_reg.add_argument("--auth-token", default=None, help="HMAC auth token for the job (auto-generated if secure broker is detected)")
p_reg.add_argument("--job-type", default="direct", choices=["direct", "loop", "discuss"])
p_reg.add_argument("--reviewer", default=None)
p_reg.add_argument("--reviewer-session", default=None)
p_reg.add_argument("--max-iterations", type=int, default=5)
p_list = sub.add_parser("list", help="list jobs (optionally by status)")
p_list.add_argument("--status", default=None)
@@ -209,6 +260,16 @@ def _build_parser() -> argparse.ArgumentParser:
p_status.add_argument("--job", required=True)
p_status.add_argument("--set", required=True, dest="status")
p_update = sub.add_parser("update", help="update a job record")
p_update.add_argument("--job", required=True)
p_update.add_argument("--status", default=None)
p_update.add_argument("--agent-session", default=None)
p_update.add_argument("--prompt", default=None)
p_update.add_argument("--iteration", type=int, default=None)
p_feedback = sub.add_parser("get-feedback", help="get the last feedback detail (completed/error) for a job")
p_feedback.add_argument("--job", required=True)
p_pick = sub.add_parser("pick", help="claim a pending job for a session; prints id")
p_pick.add_argument("--agent-session", default="tmux:claude")
@@ -247,6 +308,10 @@ def main(argv: Optional[List[str]] = None) -> int:
expected_artifacts=args.artifacts,
bits=args.bits,
auth_token=args.auth_token,
job_type=args.job_type,
reviewer=args.reviewer,
reviewer_session=args.reviewer_session,
max_iterations=args.max_iterations,
)
print(job_id)
return 0
@@ -279,6 +344,27 @@ def main(argv: Optional[List[str]] = None) -> int:
return 1
return 0
if args.command == "update":
fields = {}
if args.status is not None:
fields["status"] = args.status
if args.agent_session is not None:
fields["agent_session"] = args.agent_session
if args.prompt is not None:
fields["prompt"] = args.prompt
if args.iteration is not None:
fields["iteration"] = args.iteration
try:
mqtt_common.update_job_status(args.job, rd, **fields)
except FileNotFoundError as exc:
print(str(exc), file=sys.stderr)
return 1
return 0
if args.command == "get-feedback":
print(get_feedback(args.job, rd))
return 0
if args.command == "pick":
job_id = pick_pending(args.agent_session, rd)
if job_id is None: