Pipeline lifecycle cookbook
End-to-end patterns for the callback-first pipeline lifecycle. Each
recipe is a self-contained, copy-paste starting point. For the surface
reference see Pipeline lifecycle; for the
per-preset stdlib reference see
Pipeline lifecycle presets; for the
LLM-friendly quickref see the "Pipeline lifecycle" section in
docs/llm/harn-quickref.md.
The recipes use harn,ignore fences because they wire up full
multi-pipeline topologies (producer, drain, trigger handlers, host glue).
Each fragment type-checks; the orchestration loop assumes a host that
runs the constituent pipelines, such as harn orchestrator.
Nightly settlement handoff
A live ingest pipeline triages events synchronously up to a watermark, then hands deferred work off to a separate nightly settlement pipeline. The handoff envelope carries the unsettled snapshot; the nightly pipeline drains it under its own schedule and budget.
import { trigger_register } from "std/triggers"
import { on_finish_handoff_to } from "std/lifecycle"

// --- live ingest pipeline ---
pipeline ingest(events) {
  // Any deferred items become partial-handoff envelopes via
  // harness.handoff_to so they show up in unsettled_state().
  pipeline_on_finish(on_finish_handoff_to("nightly-settle", {
    note: "live ingest deferred bucket",
  }))

  for event in events {
    let outcome = try_process_now(event)
    if outcome.deferred {
      // Stage the item directly on the harness so the on_finish
      // callback picks it up from harness.unsettled_state().
      __handoff_to_settle(event)
    }
  }
  return {status: "ingest_complete", count: len(events)}
}

// --- nightly settlement pipeline ---
pipeline nightly_settle(envelope) {
  let items = envelope.unsettled.partial_handoffs
  for item in items {
    settle_one(item.payload)
  }
  return {status: "settled", count: len(items)}
}

fn __handoff_to_settle(event) {
  // The harness builtin is provided by the runtime inside any
  // pipeline body; declared as a helper so the snippet reads
  // top-to-bottom.
  current_harness().handoff_to("nightly-settle", event)
}
Why on_finish_handoff_to over emitting a channel: the handoff
envelope carries the full unsettled snapshot, with origin and timing
metadata, and the nightly pipeline gets the bucket in one call instead
of replaying N channel emits. The handoff is recorded as a
partial_handoff_envelope audit, so the replay oracle can verify the
same envelope crosses both pipelines on a second run.
Why a separate pipeline: the nightly run owns its own budget, hook
chain, and on_finish policy. A failure mid-settle does not unwind the
live ingest run, and a pre_finish block on the nightly pipeline
holds finish open until the per-item drain completes.
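The envelope-versus-channel trade-off described above can be modelled outside Harn. The following is a minimal Python sketch, not the runtime's implementation: HandoffEnvelope, build_envelope, and settle are hypothetical names, and the field layout only loosely mirrors the envelope.unsettled.partial_handoffs shape used in the recipe. It shows the deferred bucket travelling as one object with origin and timing metadata, and the receiver draining it in a single pass.

```python
from dataclasses import dataclass, field


@dataclass
class HandoffEnvelope:
    """Hypothetical stand-in for a handoff envelope; field names are illustrative."""
    target: str
    origin: str
    created_at_ms: int
    note: str
    partial_handoffs: list = field(default_factory=list)


def build_envelope(deferred, *, origin, target, now_ms, note=""):
    """Bundle every deferred item into one envelope (a single delivery)."""
    items = [{"payload": item} for item in deferred]
    return HandoffEnvelope(target, origin, now_ms, note, items)


def settle(envelope, settle_one):
    """Drain the envelope in one pass, as the nightly pipeline does."""
    for item in envelope.partial_handoffs:
        settle_one(item["payload"])
    return {"status": "settled", "count": len(envelope.partial_handoffs)}


deferred = ["evt-1", "evt-2", "evt-3"]
envelope = build_envelope(
    deferred,
    origin="ingest",
    target="nightly-settle",
    now_ms=1_700_000_000_000,
    note="live ingest deferred bucket",
)
settled = []
result = settle(envelope, settled.append)
```

Because the receiver sees the whole bucket at once, it can report a count and compare the envelope across runs without reassembling it from individual messages.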
Audit every drain decision to a custom store
A regulated team needs every disposition the settlement-agent loop
makes to land in an external audit store (Splunk, BigQuery, S3), not
just the per-run pipeline.lifecycle.audit topic. A
register_session_hook("on_drain_decision", ...) callback observes
each decision before it persists, and a composed on_finish callback
runs the drain loop with telemetry around it.
import { on_finish_drain } from "std/lifecycle"
import { compose, with_telemetry } from "std/lifecycle/combinators"

pipeline triage(input) {
  // 1. Mirror every drain disposition to our external store.
  register_session_hook("on_drain_decision", { event ->
    audit_store_push({
      pipeline: event.pipeline_id,
      bucket: event.bucket,
      item_id: event.item_id,
      disposition: event.disposition,
      seq: event.seq,
      at_ms: event.at_ms,
    })
    // Return nil to allow; we are not vetoing or rewriting.
    return nil
  })

  // 2. Drain on finish, wrapped in an OTel span so dashboards can
  //    pin on the duration distribution.
  pipeline_on_finish(
    with_telemetry(on_finish_drain, "triage_drain"),
  )

  return process(input)
}

fn audit_store_push(record) {
  // Replace with the real client.
  __external_audit_emit(record)
}
Why a hook over inline logging in a custom on_finish: the
on_drain_decision gate fires once per item the settlement-agent loop
processes, regardless of which preset or custom callback drives the
loop. A hook captures every disposition uniformly — including from
future presets — without forking the drain logic.
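To make the "one gate, many drivers" argument concrete, here is a small Python model. It is a sketch under assumptions: HookRegistry and drain are hypothetical names, and the event fields are a subset of those used in the recipe. Two different drain policies run through the same loop, and the single registered hook observes every decision.

```python
class HookRegistry:
    """Hypothetical registry: maps a gate name to its observers."""

    def __init__(self):
        self._hooks = {}

    def register(self, name, fn):
        self._hooks.setdefault(name, []).append(fn)

    def fire(self, name, event):
        for fn in self._hooks.get(name, []):
            fn(event)


def drain(hooks, items, decide):
    """Shared drain loop: every driver funnels through the same gate."""
    decisions = []
    for seq, item in enumerate(items):
        event = {"item_id": item, "disposition": decide(item), "seq": seq}
        hooks.fire("on_drain_decision", event)
        decisions.append(event)
    return decisions


hooks = HookRegistry()
audit_store = []
hooks.register("on_drain_decision", audit_store.append)

# Two different drivers (a preset-like policy and a custom one) share the gate.
drain(hooks, ["a", "b"], lambda item: "settle")
drain(hooks, ["c"], lambda item: "defer")
```

The audit code is written once, at the gate, so adding a third driver later needs no change to it.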
Why with_telemetry outside the drain: the wrapper emits
triage_drain_started / _completed / _errored audit entries with
a stable span name, so the external store sees a matched pair of
records bracketing the per-item entries.
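The bracketing behaviour can be illustrated with a plain decorator-style wrapper. This Python sketch is an assumption about the general shape, not the std/lifecycle/combinators implementation: with_audit_brackets is a hypothetical name, and it takes an explicit audit_log list that the real combinator does not.

```python
def with_audit_brackets(callback, span_name, audit_log):
    """Wrap callback so started/completed/errored records surround its work."""

    def wrapped(*args, **kwargs):
        audit_log.append(f"{span_name}_started")
        try:
            result = callback(*args, **kwargs)
        except Exception:
            audit_log.append(f"{span_name}_errored")
            raise
        audit_log.append(f"{span_name}_completed")
        return result

    return wrapped


log = []


def drain_stub(items):
    # Stand-in for the per-item drain; records one entry per item.
    for item in items:
        log.append(f"item:{item}")
    return len(items)


wrapped_drain = with_audit_brackets(drain_stub, "triage_drain", log)
count = wrapped_drain(["a", "b"])
```

Wrapping outside the drain keeps the per-item entries between the start and end records, which is what lets a dashboard compute a duration per drain.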
Long-paused agent with resume-continuity reminder
A research agent self-parks via agent_await_resumption and may
sleep for hours. When it resumes, two things must happen exactly
once: a continuity reminder lands on the first resumed turn (the
runtime injects this by default), and a custom telemetry envelope
fires so the operator dashboard shows the pause duration.
import { spawn_agent, parse_resume_conditions } from "std/agent/workers"

pipeline research_runner(task) {
  // Bracket the suspend / resume gates with telemetry. The runtime
  // also fires its built-in `resume_continuity` reminder; this hook
  // wraps it with structured operator telemetry.
  register_session_hook("post_resume", { event ->
    operator_dashboard_emit("agent.resumed", {
      handle: event.worker.handle,
      suspended_at_ms: event.suspended_at_ms,
      resumed_at_ms: event.resumed_at_ms,
      duration_ms: event.resumed_at_ms - event.suspended_at_ms,
      reason: event.suspend_reason,
      resume_cause: event.resume_cause,
    })
    return nil
  })

  let resume_when = parse_resume_conditions({
    timeout: {duration_minutes: 240, on_timeout: "resume_with_summary"},
    on_event: "operator.resume",
  })

  let worker = spawn_agent({
    prompt: task,
    system: "Pause when you need a long-running external lookup.",
    options: {resume_when: resume_when},
  })
  return wait_agent(worker)
}
Why a post_resume hook over polling the worker registry: the hook
runs once per resume, on the right session, with the suspend and
resume timestamps already paired. Polling drifts and can double-fire
across worker restarts.
Why parse resume_when upfront: the parsed dict is captured on the
suspend snapshot, so a cold-restore (harn run --resume <path>)
reconstitutes the same wake conditions instead of re-evaluating the
shape at resume time.
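The parse-once, snapshot, restore idea can be sketched in a few lines of Python. Everything here is an illustrative assumption: normalize_resume_conditions is a hypothetical helper (not the Harn parse_resume_conditions), the normalized field names are invented, and JSON stands in for whatever format the suspend snapshot actually uses.

```python
import json


def normalize_resume_conditions(raw):
    """Validate the wake conditions once, up front, and return a plain dict."""
    timeout = raw.get("timeout") or {}
    minutes = timeout.get("duration_minutes")
    if not isinstance(minutes, int) or minutes <= 0:
        raise ValueError("timeout.duration_minutes must be a positive integer")
    on_event = raw.get("on_event")
    if not isinstance(on_event, str) or not on_event:
        raise ValueError("on_event must be a non-empty string")
    return {
        "timeout_ms": minutes * 60_000,
        "on_timeout": timeout.get("on_timeout"),
        "on_event": on_event,
    }


parsed = normalize_resume_conditions({
    "timeout": {"duration_minutes": 240, "on_timeout": "resume_with_summary"},
    "on_event": "operator.resume",
})

# Suspend: the already-validated dict goes into the snapshot as data.
snapshot = json.dumps({"resume_when": parsed}, sort_keys=True)

# Cold restore: read the data back; no re-parsing of the original shape.
restored = json.loads(snapshot)["resume_when"]
```

A malformed shape fails at spawn time, where the author can fix it, rather than hours later when the worker tries to wake.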
Hook-based supervision: deny suspend during business hours
A SaaS team wants to prevent any agent from self-parking between 9 AM
and 5 PM local time (so the on-call rotation can intervene with full
context). A pre_suspend hook denies the suspend; the runtime keeps
the worker running and the agent gets a reminder explaining the policy.
fn during_business_hours(now_ms) -> bool {
  let hour = wall_clock_hour_local(now_ms)
  return hour >= 9 && hour < 17
}

pipeline supervised_agents() {
  register_session_hook("pre_suspend", { event ->
    if !during_business_hours(now_ms()) {
      return nil  // allow
    }
    return {
      block: true,
      reason: "agents may not self-park during business hours; ask on-call",
      reminder: {
        body: "Your suspend was denied (business-hours policy). Continue working or escalate via the on-call channel.",
        tags: ["policy_violation", "business_hours_no_suspend"],
        ttl_turns: 1,
        dedupe_key: "policy.no_suspend_business_hours",
      },
    }
  })
  // ... rest of the setup pipeline; agents spawned under this hook
  // chain inherit the deny policy.
  return nil
}
Why pre_suspend over OnPersonaPaused: pre_suspend is the
gate; it can veto the suspend and keep the worker running. The
Paused events fire after the suspend is already committed.
Why a paired reminder: the agent that tried to self-park needs to see why it was denied, not just continue blindly. The 1-turn TTL ensures the reminder lands on the very next turn and then evaporates so it does not pollute later context.
Why the dedupe key: if the agent retries the suspend three times in one turn, the dedupe collapses the reminder to one entry rather than stacking three identical bodies.
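The interaction between the dedupe key and the 1-turn TTL can be modelled with a small queue. This is a Python sketch under assumptions: ReminderQueue and its methods are hypothetical, and the real runtime may age or merge reminders differently. It shows three denials in one turn collapsing to a single reminder that is visible for exactly one turn.

```python
class ReminderQueue:
    """Hypothetical reminder store keyed by dedupe_key."""

    def __init__(self):
        self._pending = {}  # dedupe_key -> {"body": ..., "ttl_turns": ...}

    def post(self, body, dedupe_key, ttl_turns):
        # Posting under an existing key overwrites it, so repeats collapse.
        self._pending[dedupe_key] = {"body": body, "ttl_turns": ttl_turns}

    def next_turn(self):
        """Return the bodies visible this turn, then age every reminder."""
        visible = [r["body"] for r in self._pending.values()]
        for key in list(self._pending):
            self._pending[key]["ttl_turns"] -= 1
            if self._pending[key]["ttl_turns"] <= 0:
                del self._pending[key]
        return visible


queue = ReminderQueue()
for _ in range(3):  # the agent retries the suspend three times in one turn
    queue.post(
        "Your suspend was denied (business-hours policy).",
        dedupe_key="policy.no_suspend_business_hours",
        ttl_turns=1,
    )

first_turn = queue.next_turn()
second_turn = queue.next_turn()
```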
Replay-deterministic test harness for a multi-suspend pipeline
A pipeline suspends a worker, waits for an external event, resumes, suspends again, and finally drains. The conformance fixture needs to replay deterministically — same audit entries, same disposition order, same handoff envelopes — across two runs of the same pipeline.
import { mock_time, advance_time } from "std/clock"
import { flush_trigger_aggregations } from "std/triggers/testing"
import { on_finish_drain, pipeline_lifecycle_audit_log_take } from "std/lifecycle"
import { compose, with_telemetry } from "std/lifecycle/combinators"

pipeline multi_suspend_fixture() {
  // 1. Pin wall-clock so queued_at_ms / age_ms are reproducible.
  mock_time(1_700_000_000_000)

  // 2. Capture audits via the per-run log instead of wall-clock spans.
  pipeline_on_finish(
    with_telemetry(on_finish_drain, "fixture_drain"),
  )

  // 3. Run the workload. Each suspend / resume / drain step records
  //    a typed audit entry.
  let worker = spawn_research_worker()
  emit_external_event("operator.resume", {})
  advance_time(60_000)
  flush_trigger_aggregations()

  let worker2 = spawn_followup_worker(worker)
  emit_external_event("operator.resume", {})
  advance_time(120_000)
  flush_trigger_aggregations()

  return wait_agent(worker2)
}

pipeline assert_replay_determinism() {
  // 4. Drain the audit log; the conformance harness compares this
  //    byte-for-byte against the recorded fixture.
  let entries = pipeline_lifecycle_audit_log_take()
  return {audits: entries, count: len(entries)}
}
Why mock_time + advance_time over wall-clock: the harness banlist
(make lint-test-patterns) prohibits std::thread::sleep,
Instant::now() polling, and short recv_timeout calls in tests.
Mocking the clock lets queued_at_ms and age_ms fields land at
deterministic offsets that the replay oracle can compare directly.
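The effect of a mocked clock on timestamp fields can be shown with a tiny fake. This Python sketch is illustrative only: FakeClock and run_once are hypothetical, and they mirror the fixture's start time and the two advances (60 000 ms and 120 000 ms) rather than calling any Harn API.

```python
class FakeClock:
    """Hypothetical test clock: time moves only when the test says so."""

    def __init__(self, start_ms):
        self._now_ms = start_ms

    def now_ms(self):
        return self._now_ms

    def advance(self, delta_ms):
        if delta_ms < 0:
            raise ValueError("time cannot move backwards")
        self._now_ms += delta_ms


def run_once():
    clock = FakeClock(1_700_000_000_000)
    queued_at_ms = clock.now_ms()
    clock.advance(60_000)   # first wait
    clock.advance(120_000)  # second wait
    return {"queued_at_ms": queued_at_ms, "age_ms": clock.now_ms() - queued_at_ms}


first_run = run_once()
second_run = run_once()
```

Both runs produce identical records, which is the property a byte-for-byte replay comparison depends on.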
Why flush_trigger_aggregations before the second suspend: batched
trigger handlers buffer events until the aggregation window closes.
Explicit flushes turn an inherently time-driven boundary into a
synchronous one so each step's audit entries land before the next
step opens.
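The role of an explicit flush can be seen in a minimal model of a batching handler. This is a Python sketch with hypothetical names (AggregatingTrigger, emit, flush); the real aggregation window logic is not shown. It demonstrates that nothing reaches the audit log until the flush, and that flushing before the next step fixes the order of entries.

```python
class AggregatingTrigger:
    """Hypothetical batching handler: buffers events until flushed."""

    def __init__(self, handler):
        self._handler = handler
        self._buffer = []

    def emit(self, event):
        self._buffer.append(event)

    def flush(self):
        # Hand the whole buffer to the handler and start a fresh one.
        if self._buffer:
            batch, self._buffer = self._buffer, []
            self._handler(batch)


audit = []
trigger = AggregatingTrigger(lambda batch: audit.append(("batch", tuple(batch))))

trigger.emit("operator.resume#1")
before_flush = list(audit)  # still empty: the event is only buffered

trigger.flush()             # synchronous boundary: the batch lands now
audit.append(("step", "second_suspend"))
```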
Why pipeline_lifecycle_audit_log_take over snapshot: the take
variant drains the log so a subsequent assertion starts from a clean
slate. The conformance fixture asserts on the drained list as a
whole, not on a snapshot that might include leftovers from a previous
test.
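The difference between taking and snapshotting a log is easy to show in isolation. This Python sketch uses a hypothetical AuditLog class; it does not reproduce the std/lifecycle functions, only the drain-on-read semantics the paragraph describes.

```python
class AuditLog:
    """Hypothetical per-run log with both read styles."""

    def __init__(self):
        self._entries = []

    def record(self, entry):
        self._entries.append(entry)

    def snapshot(self):
        # Copy without clearing: later readers still see these entries.
        return list(self._entries)

    def take(self):
        # Return the entries and leave the log empty for the next test.
        entries, self._entries = self._entries, []
        return entries


log = AuditLog()
log.record("test_a.drain")
log.snapshot()                # test A only peeks; its entry stays behind
log.record("test_b.drain")

polluted = log.snapshot()     # test B sees test A's leftover entry
taken = log.take()            # draining hands everything over once
after_take = log.take()       # the next assertion starts from a clean slate
```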
Picking the right primitive
For any of these recipes, the wrong tool is also worth knowing:
- Don't use a post_finish hook to block. post_finish is advisory; the value is already captured. Use pre_finish (which rejects block and points you at the right primitive) or on_finish_block_until_settled to delay finish until work drains.
- Don't write a custom drain loop unless you have to. on_finish_drain already walks the buckets in the canonical order, respects HARN-DRN-001 ordering enforcement, and fires on_drain_decision per item. A custom loop has to re-derive every one of those properties.
- Don't reach for pre_suspend denials to throttle suspends. The runtime emits an audit per attempt; a deny chain quickly becomes noise. Use a worker-side budget (OnBudget.terminate or graceful_exit) instead.
- Don't poll harness.unsettled_state() from a tight loop. Each call walks the live registries plus the event log. Use harness.wait_for_any_settlement(max_duration) or on_finish_block_until_settled to wait on a single coalesced snapshot.
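The last point, waiting on a notification instead of polling, is a general pattern that can be sketched with Python's threading.Condition. SettlementTracker is a hypothetical class, and its wait_for_any_settlement method only borrows the name from the list above; it is not the harness API. The waiter sleeps until the pending set changes or the timeout expires, rather than re-reading state in a loop.

```python
import threading


class SettlementTracker:
    """Hypothetical tracker: waiters block until the pending set changes."""

    def __init__(self, pending):
        self._pending = set(pending)
        self._cond = threading.Condition()

    def settle(self, item):
        with self._cond:
            self._pending.discard(item)
            self._cond.notify_all()

    def wait_for_any_settlement(self, known_pending, timeout_s):
        """Block until pending differs from known_pending or the timeout hits."""
        with self._cond:
            changed = self._cond.wait_for(
                lambda: self._pending != known_pending, timeout=timeout_s
            )
            return changed, set(self._pending)


tracker = SettlementTracker({"a", "b"})

# Another thread settles one item shortly after the wait begins.
timer = threading.Timer(0.05, tracker.settle, args=("a",))
timer.start()

changed, remaining = tracker.wait_for_any_settlement({"a", "b"}, timeout_s=2.0)
timer.join()
```

The waiter reads the state once per wake-up, so the cost scales with the number of settlements rather than with how long the wait lasts.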