Pipeline lifecycle presets (std/lifecycle)
The pipeline DSL accepts a single on_finish callback that runs after
the pipeline's declared steps complete and before the pipeline returns
its value to the host. The callback signature is
fn(harness, return_value) and its return value replaces the pipeline's
return value. Register a callback with the global pipeline_on_finish
builtin from anywhere inside the pipeline body.
import { on_finish_drain } from "std/lifecycle"
pipeline default() {
pipeline_on_finish(on_finish_drain)
return "ok"
}
Harness lifecycle methods
harness.unsettled_state() returns one snapshot dict with five lists:
suspended_subagents, queued_triggers, partial_handoffs,
in_flight_llm_calls, and pool_pending_tasks. Suspended subagents
come from the host worker registry, queued triggers come from the
trigger inbox and worker queue event-log records, partial handoffs come
from harness.handoff_to, in-flight LLM calls come from the live LLM
call registry, and pool pending tasks come from the thread-local pool
registry (queued + running tasks created by pool.submit(...)).
harness.is_empty(state?) returns true when every unsettled bucket is
empty. Pass an already-captured snapshot to keep decisions consistent
inside one callback, or omit the argument to ask the harness for a fresh
snapshot.
harness.counts(state?) returns {suspended, queued, partial, in_flight, pool_pending} for either the supplied snapshot or a fresh
harness snapshot. Use it for audit payloads and branch decisions instead
of recomputing bucket lengths manually.
harness.summary(state?) returns a single operator-readable line such
as no unsettled work or a per-bucket count summary. It is intended for
logs and audit records, not as a machine-readable contract.
harness.resume_subagent(handle, input?) delegates to the host worker
resume primitive for suspended workers. When input is supplied and the
worker is an awaiting retriggerable subagent, the harness falls back to
the existing send-input primitive.
harness.cancel_subagent(handle, reason?) delegates to
__host_worker_close and returns that primitive's worker summary or
error. The optional reason is reserved for callers that want to include
the reason in their own audit payload before cancelling.
harness.handoff_to(target_pipeline, payload?) records a partial
handoff envelope and returns {status: "queued", envelope}. The envelope
appears in subsequent partial_handoffs snapshots with from, to,
payload_summary, queued_at_ms, and age_ms fields.
harness.acknowledge_trigger(id) settles one queued trigger item. Worker
queue items are acknowledged with the existing queue ack event, while
trigger inbox items are settled by appending the existing dispatcher
cancel request; missing ids return {status: "not_found"}.
harness.defer_trigger(id, target_pipeline?) acknowledges the queued
trigger and records a partial handoff envelope for the supplied target
(default deferred-triggers). If the trigger cannot be acknowledged,
the returned receipt keeps the failed acknowledgement result.
harness.acknowledge_handoff(envelope_id, decision?) removes a partial
handoff envelope from the unsettled registry and emits a
handoff_acknowledged lifecycle audit entry with the caller's decision
payload.
harness.wait_for_any_settlement(max_duration?) takes a fresh snapshot
and returns {status, timed_out, state}. It returns settled when the
snapshot is empty and unsettled otherwise; callers that need a richer
drain loop should delegate to the drain preset or settlement agent.
harness.emit_audit(kind, payload?) records a typed
LifecycleAuditEntry with a monotonic per-run seq number. When an
active event log exists, the same entry is persisted to the
pipeline.lifecycle.audit topic alongside run-record data.
harness.finalize(disposition?) stores the pipeline disposition for the
current lifecycle run and emits a pipeline_finalized audit entry. The
receipt is shaped {status: "finalized", method: "finalize", entry}.
harness.spawn_settlement_agent(unsettled, return_value) is the handoff
point used by on_finish_drain when unsettled work remains. The
settlement-agent loop and constrained drain tool surface are tracked by
harn#1856; until that lands this method returns a typed unsupported
receipt rather than silently dropping the work.
harness.current_pipeline_id() returns the current run id when a
mutation session is installed, otherwise the session id, otherwise
nil. Handoff producers use this to stamp the origin pipeline.
Four canonical presets ship from std/lifecycle. Each is a pure
function (or a pure factory returning one) with no captured state, so
chaining and reuse are safe.
on_finish_abandon(harness, return_value)
Reproduces today's no-callback behavior, but emits a
pipeline_abandoned_unsettled audit entry when unsettled state is
non-empty so the lost work is at least observable. Returns
return_value unchanged.
Use this preset when the pipeline's contract is "fire and forget" — deferred work that survives the pipeline's exit is acceptable and any downstream cleanup is the host's responsibility.
import { on_finish_abandon } from "std/lifecycle"
pipeline default() {
pipeline_on_finish(on_finish_abandon)
return "ok"
}
on_finish_drain(harness, return_value)
The recommended default. Scans the harness unsettled state and either
finalizes the pipeline immediately (when nothing is deferred) or
delegates the per-item disposition to a settlement agent via
harness.spawn_settlement_agent. The settlement-agent loop itself
lands in harn#1856 (P-03); until that ticket ships, the harness method
returns a typed unsupported receipt and the preset surfaces that
receipt as the pipeline's return value so callers can detect the gap
deterministically.
import { on_finish_drain } from "std/lifecycle"
pipeline default() {
pipeline_on_finish(on_finish_drain)
return "triage complete"
}
on_finish_block_until_settled(timeout, fallback?)
Returns a callback that asks the harness to wait for unsettled work to
drain naturally. If everything settles within timeout, the callback
emits pipeline_finalized:settled_within_timeout and returns the
unchanged return_value. On timeout it emits settlement_timeout and
delegates to fallback (default on_finish_drain). The fallback may
itself be any callback, so chains compose cleanly.
import { on_finish_block_until_settled } from "std/lifecycle"
pipeline default() {
pipeline_on_finish(on_finish_block_until_settled(30s))
return "ok"
}
on_finish_handoff_to(target_pipeline, options?)
Returns a callback that packages the current unsettled-state snapshot
into a typed envelope (with origin, unsettled, and any
caller-supplied options) and hands it to a target pipeline via
harness.handoff_to. When there is nothing unsettled, the callback
short-circuits to pipeline_finalized and returns the unchanged
return_value — the typical case where the handoff pipeline does not
need to run at all.
import { on_finish_handoff_to } from "std/lifecycle"
pipeline default() {
pipeline_on_finish(on_finish_handoff_to("nightly-drain"))
return "triage complete"
}
Composing presets
The factories accept other callbacks as their fallback / target argument, so composition is just function nesting. A common production chain — "wait briefly, otherwise hand off to a long-running pipeline, otherwise drain immediately" — reads top-to-bottom:
import {
on_finish_block_until_settled,
on_finish_handoff_to,
} from "std/lifecycle"
pipeline default() {
pipeline_on_finish(
on_finish_block_until_settled(
30s,
on_finish_handoff_to("nightly-drain"),
),
)
return "ok"
}
Each preset emits typed audit entries via harness.emit_audit. The
entries live on a per-pipeline-run audit log that conformance fixtures
and replay oracles can drain via lifecycle_audit_log_take() (or peek
at without consuming via lifecycle_audit_log_snapshot()).
Callback combinators (std/lifecycle/combinators)
The combinators in std/lifecycle/combinators wrap any
(harness, return_value) -> return_value-shaped callback — hook
handlers, resume_by callbacks, the presets above, and any custom
on_finish body. All six are pure factories that return closures with
no captured mutable state, so they nest freely:
compose([cb, ...])runs each callback sequentially, threading the prior callback's return value into the next callback'sreturn_value.first_available([cb, ...])invokes callbacks in order and returns the first non-nil result (fallback chains).with_telemetry(cb, span_name?)opens aSpanKind::FnCallOTel span and emits paired{span_name}_started/_completed/_erroredaudit entries.with_timeout(cb, ms)is a soft, clock-aware deadline. On overrun the wrapper returns a sentinel dict{__timed_out: true, timeout_ms, elapsed_ms, return_value}and emits alifecycle_callback_timed_outaudit.if_unsettled(cb)invokes the wrapped callback only whenharness.unsettled_state()reports pending work (exactly one snapshot per call).when(predicate, cb)guards on an arbitrarypredicate(harness, return_value)and short-circuits to the inbound return value when false.
A realistic composition wraps on_finish_drain with telemetry, a
soft deadline, and an unsettled-state guard so the drain only runs
when there is real work to do:
import {
compose, if_unsettled, with_telemetry, with_timeout,
} from "std/lifecycle/combinators"
import { on_finish_drain } from "std/lifecycle"
pipeline default() {
pipeline_on_finish(
if_unsettled(
with_telemetry(with_timeout(on_finish_drain, 30000), "drain"),
),
)
return "ok"
}
Budget-threshold callbacks (std/lifecycle/on_budget)
std/lifecycle/on_budget ships three named callback strategies for the
OnBudgetThreshold lifecycle event. Each follows the same
(harness, budget_state) -> result shape as the rest of the lifecycle
layer, so they compose freely with std/lifecycle/combinators:
terminate(harness, budget_state)emits abudget_exceededaudit entry and then throws a structuredbudget_exceededexception so the enclosing agent loop / pipeline unwinds. The thrown payload carries{category: "budget_exceeded", kind: "terminal", reason: "on_budget_terminate", strategy: "terminate", budget_state, message}.graceful_exit(harness, budget_state)emits abudget_graceful_exitaudit entry and returns a deterministic exit envelope shaped{status: "budget_exhausted", strategy: "graceful_exit", reason: "on_budget_graceful_exit", budget_state, message}. Unliketerminateit does not throw — the envelope becomes the callback's return value so downstream combinators and the pipeline'son_finishchain can drain in-flight work and surface it as the pipeline's value.warn_and_continue(harness, budget_state)emits abudget_warn_and_continueaudit entry, injects a 1-turnbudget_warningsystem_reminder viatool_hooks_inject_reminder, and returns the originalbudget_stateunchanged so combinator chains see a passthrough.
The OnBudget() factory returns a dict of all three strategies so
callers can use dotted access (OnBudget.terminate,
OnBudget.graceful_exit, OnBudget.warn_and_continue) after a single
import, mirroring the QueueStrategy() / Backpressure() factories.
import { OnBudget, warn_and_continue } from "std/lifecycle/on_budget"
import { compose, with_telemetry } from "std/lifecycle/combinators"
register_persona_hook(
"*",
"OnBudgetThreshold",
compose([warn_and_continue, with_telemetry(my_logger)]),
)
Inspecting unsettled state directly
Custom on_finish callbacks have full access to the harness:
import { counts, summary } from "std/lifecycle"
pipeline default() {
pipeline_on_finish(
{ harness, return_value ->
let state = harness.unsettled_state()
if !harness.is_empty(state) {
harness.emit_audit(
"custom_drain",
{counts: counts(state), summary: summary(state)},
)
}
return return_value
},
)
return "ok"
}