refactor(security,concurrency): resolve structural issues, enforce Claude permission skip, update docs
This commit is contained in:
@@ -328,24 +328,24 @@ def update_job_status(job_id: str, registry_dir: str = DEFAULT_REGISTRY_DIR, **f
|
||||
|
||||
This is the single chokepoint for status writes (both ``registry.update_status``
|
||||
and ``publish_event.py``'s status sync route through here), so it also mirrors
|
||||
any ``status`` change into the persistent audit log — best-effort, after the
|
||||
registry lock is released so a slow/failed log write never blocks the record."""
|
||||
any ``status`` change into the persistent audit log. We perform the log mirror
|
||||
under the lock to guarantee sequential consistency in audit history."""
|
||||
with registry_lock(registry_dir):
|
||||
record = load_job(job_id, registry_dir)
|
||||
old_status = record.get("status")
|
||||
record.update(fields)
|
||||
record["updated_at"] = _utcnow()
|
||||
_atomic_write_record(job_id, registry_dir, record)
|
||||
if "status" in fields:
|
||||
new_status = record.get("status")
|
||||
update_logged_status(job_id, new_status, updated_at=record["updated_at"])
|
||||
if old_status != new_status:
|
||||
append_event(job_id, {
|
||||
"event": "status_changed",
|
||||
"from": old_status,
|
||||
"to": new_status,
|
||||
"timestamp": record["updated_at"],
|
||||
})
|
||||
if "status" in fields:
|
||||
new_status = record.get("status")
|
||||
update_logged_status(job_id, new_status, updated_at=record["updated_at"])
|
||||
if old_status != new_status:
|
||||
append_event(job_id, {
|
||||
"event": "status_changed",
|
||||
"from": old_status,
|
||||
"to": new_status,
|
||||
"timestamp": record["updated_at"],
|
||||
})
|
||||
return record
|
||||
|
||||
|
||||
@@ -410,6 +410,21 @@ def _file_lock(fh):
|
||||
fcntl.flock(fh.fileno(), fcntl.LOCK_UN)
|
||||
|
||||
|
||||
def _redact_dict(d: Any) -> Any:
|
||||
"""Recursively mask sensitive values (passwords, secrets, tokens) inside logs."""
|
||||
if isinstance(d, dict):
|
||||
redacted = {}
|
||||
for k, v in d.items():
|
||||
if any(s in k.lower() for s in ("password", "token", "secret", "auth_token", "key")):
|
||||
redacted[k] = "[REDACTED]"
|
||||
else:
|
||||
redacted[k] = _redact_dict(v)
|
||||
return redacted
|
||||
elif isinstance(d, list):
|
||||
return [_redact_dict(item) for item in d]
|
||||
return d
|
||||
|
||||
|
||||
def append_event(job_id: str, event_dict: Dict[str, Any], logs_dir: Optional[str] = None) -> None:
|
||||
"""Append one event as a JSON line to ``<logs>/<job_id>/events.ndjson``.
|
||||
|
||||
@@ -418,7 +433,7 @@ def append_event(job_id: str, event_dict: Dict[str, Any], logs_dir: Optional[str
|
||||
try:
|
||||
path = job_log_path(job_id, EVENTS_FILENAME, logs_dir)
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
record = dict(event_dict)
|
||||
record = _redact_dict(dict(event_dict))
|
||||
record.setdefault("logged_at", _utcnow_precise())
|
||||
line = json.dumps(record, ensure_ascii=False) + "\n"
|
||||
with open(path, "a", encoding="utf-8") as fh:
|
||||
@@ -453,8 +468,9 @@ def init_job_log(job_id: str, meta: Dict[str, Any], logs_dir: Optional[str] = No
|
||||
try:
|
||||
d = job_log_dir(job_id, logs_dir)
|
||||
d.mkdir(parents=True, exist_ok=True)
|
||||
meta_redacted = _redact_dict(meta)
|
||||
with open(d / META_FILENAME, "w", encoding="utf-8") as fh:
|
||||
json.dump(meta, fh, ensure_ascii=False, indent=2)
|
||||
json.dump(meta_redacted, fh, ensure_ascii=False, indent=2)
|
||||
fh.write("\n")
|
||||
status = meta.get("status", "pending")
|
||||
update_logged_status(
|
||||
|
||||
Reference in New Issue
Block a user