Trigger stdlib
The trigger stdlib exposes the live runtime registry to Harn scripts. Use it to inspect installed bindings, register new bindings at runtime, fire synthetic events for tests/manual invocations, replay a recorded event by id, and inspect the current dead-letter queue (DLQ).
Import the shared types from std/triggers when you want typed handles and
payloads:
import "std/triggers"
Builtins
trigger_list()
Return the current live registry snapshot as list<TriggerBinding>.
Each binding includes:
idversionsource("manifest"or"dynamic")kindproviderautonomy_tierhandler_kindstatemetrics
metrics is a typed TriggerMetrics record with counters for received,
dispatched, failed, dlq, in_flight, and the cost snapshot fields.
trigger_register(config)
Register a trigger dynamically and return its TriggerHandle.
TriggerConfig uses the same broad shape as manifest-loaded bindings:
idkindproviderautonomy_tierhandlerwhenwhen_budgetretrymatchoreventsdedupe_keyfilterallow_cleartextbudgetmanifest_pathpackage_name
Dynamic trigger_register(...) currently supports the legacy budget fields but
does not yet accept manifest-only flow-control tables such as concurrency,
throttle, rate_limit, debounce, singleton, batch, or keyed
priority.
The runtime currently accepts two handler forms:
- Local Harn closures / function references
- Remote URI strings with
a2a://...orworker://...
allow_cleartext is optional and only applies to a2a://... handlers. Set it
to true when you intentionally want HTTP A2A discovery / dispatch, for
example when talking to a local harn serve process during development.
retry is optional. The current stdlib surface accepts:
{max: N, backoff: "svix"}{max: N, backoff: "immediate"}
Example:
import "std/triggers"
fn handle_issue(event: TriggerEvent) -> dict {
return {kind: event.kind, provider: event.provider}
}
let handle: TriggerHandle = trigger_register({
id: "github-new-issue",
kind: "issue.opened",
provider: "github",
autonomy_tier: "act_with_approval",
handler: handle_issue,
allow_cleartext: nil,
when: nil,
when_budget: nil,
match: {events: ["issue.opened"]},
events: nil,
dedupe_key: nil,
filter: nil,
budget: nil,
manifest_path: nil,
package_name: nil,
})
trigger_fire(handle, event)
Fire a synthetic TriggerEvent into a binding and return a
DispatchHandle.
The builtin accepts either:
- A
TriggerHandle/TriggerBindingdict - A plain trigger id string
If the event dict omits low-level envelope fields such as id,
received_at, trace_id, or provider_payload, the runtime fills them with
synthetic defaults.
Current behavior:
- Execution routes through the trigger dispatcher, so local handlers inherit dispatcher retries, lifecycle events, action-graph updates, and DLQ moves.
whenpredicates execute before the handler and can still short-circuit a dispatch.when_budgetaccepts{max_cost_usd, tokens_max, timeout}and applies fail-closed per-predicate LLM cost governance.- When a manifest-installed binding uses
batch = { ... }, the selected leader event carries the coalesced member list inevent.batch. a2a://...handlers return either the inline remote result or a pending task handle, depending on the peer response.worker://...handlers return an enqueue receipt inDispatchHandle.resultwith{queue, job_event_id, response_topic}.
trigger_replay(event_id)
Replay a previously recorded event from the EventLog by id and return a
DispatchHandle.
Current replay behavior:
- Fetch the prior event from the
triggers.eventstopic - Re-dispatch it through the trigger dispatcher using the recorded binding
- Preserve
replay_of_event_idon the returnedDispatchHandle - Resolve the pending stdlib DLQ entry when a replay succeeds
trigger_replay(...) is still not the full deterministic T-14 replay engine.
It replays the recorded trigger event through the current dispatcher/runtime
state rather than a sandboxed drift-detecting environment.
trigger_inspect_dlq()
Return the current DLQ snapshot as list<DlqEntry>.
Each DlqEntry includes:
- The failed
event - Trigger identity (
binding_id,binding_version) - Current
state - Latest
error - Derived
error_class(provider_5xx,predicate_panic,handler_panic,handler_timeout,auth_failed,budget_exhausted, orunknown) retry_history
retry_history records every DLQ attempt, including replay attempts.
trigger_inspect_lifecycle(kind?)
Return the trigger lifecycle stream as a list of {kind, headers, payload}
records. Pass a kind such as predicate.evaluated,
predicate.budget_exceeded, or DispatchStarted to filter on the runtime
side.
trigger_inspect_action_graph(trace_id?)
Return streamed observability.action_graph records as a list of
{kind, headers, payload} records. Pass a trace_id to filter the stream to
one trigger delivery and its downstream predicate, dispatch, retry, worker,
A2A, and DLQ nodes.
handler_context()
Return the current dispatch context as HandlerContext | nil.
Inside a trigger handler, the returned record includes:
agentactiontrace_idreplay_of_event_idautonomy_tiertrigger_event
Outside trigger dispatch, the builtin returns nil.
Stream helpers
std/triggers includes small declarative helpers for stream handlers:
stream_fork(source, branches)returns aStreamForkPlanstream_join(source, join?)returns aStreamJoinPlanwindow_by(events, window)returns aStreamWindowPlanllm_classify(input, labels, options?)returns aStreamLlmClassifyPlan
These helpers do not start broker consumers by themselves. They give handlers a typed, serializable plan shape for fan-out/fan-in, manifest-aligned windowing, and cached classifier steps.
import "std/triggers"
pub fn on_quotes(event: TriggerEvent) -> dict {
let forked = stream_fork([event.provider_payload.raw], ["risk", "ledger"])
let windowed = window_by(
[event.provider_payload.raw],
{mode: "sliding", key: "event.provider_payload.key", size: "5m", every: "1m", gap: nil, max_items: 500},
)
let classified = llm_classify(event.provider_payload.raw, ["ignore", "review"], {cache: "quotes:v1"})
return {forked: forked, windowed: windowed, classified: classified}
}
trust_record(agent, action, approver, outcome, tier)
Append a manual OpenTrustGraph TrustRecord to the trust graph. Scripts usually
rely on the dispatcher's automatic end-of-handler records, but this builtin is
available for control-plane events such as promotions, demotions, or manual
audit entries. The returned record includes chain_index, previous_hash, and
entry_hash, and the runtime also mirrors a compact projection to
trust_graph.records.
trust_graph_record(decision)
Append a decision dict to the trust graph and return its TrustEntryId
(record_id). The dict accepts agent/actor_id, action, approver,
outcome, trace_id, autonomy_tier/autonomy_tier_at_time (or tier),
evidence_refs, cost_usd, and metadata.
trust_graph_query(agent, action)
Return a TrustScore for an agent and optional action. Scores include outcome
counts, success rate, latest outcome, effective autonomy tier, and a capability
policy that handlers can use as a gate.
trust_graph_policy_for(agent)
Return only the derived CapabilityPolicy for an agent.
trust_graph_verify_chain()
Verify the active trust graph hash chain and return a report with verified,
root_hash, broken_at_event_id, and errors.
trust_query(filters)
Query historical trust records from Harn code.
Supported filter keys:
agentactionsinceuntiltieroutcomelimitgrouped_by_trace
limit keeps only the newest N matching records. When grouped_by_trace is
true, the builtin returns list<{trace_id, records}> trace buckets instead of
the default flat list<TrustRecord>.
For new code, prefer import "std/trust" and call query(...) or
trust.query(...) to receive the compact issue-facing TrustGraphRecord shape
with actor_id, evidence_refs, and autonomy_tier_at_time.
Example
import "std/triggers"
fn fail_handler(event: TriggerEvent) -> any {
throw("manual failure: " + event.kind)
}
let handle = trigger_register({
id: "manual-dlq",
kind: "issue.opened",
provider: "github",
handler: fail_handler,
when: nil,
when_budget: nil,
retry: {max: 1, backoff: "immediate"},
match: nil,
events: ["issue.opened"],
dedupe_key: nil,
filter: nil,
budget: nil,
manifest_path: nil,
package_name: nil,
})
let fired = trigger_fire(handle, {provider: "github", kind: "issue.opened"})
let dlq = trigger_inspect_dlq().filter({ entry -> entry.binding_id == handle.id })
let replay = trigger_replay(fired.event_id)
log(fired.status) // "dlq"
log(len(dlq[0].retry_history)) // 1
log(replay.replay_of_event_id) // original event id
Notes
- Dynamic registrations are runtime-local.
trigger_register(...)updates the live registry in the current process; it does not rewriteharn.toml. a2a://...bindings default to HTTPS-only. Useallow_cleartext: trueonly for intentional local or otherwise trusted HTTP peers.TriggerConfig.autonomy_tierdefaults toact_autowhen omitted.trigger_fire(...)andtrigger_replay(...)need an active EventLog to persisttriggers.eventsandtriggers.dlq. If the runtime did not already install one, the stdlib wrapper falls back to an in-memory log for the current thread.- Predicate replay is deterministic for
llm_call(...): cached predicate responses are reused from the request cache plus the per-eventtrigger.inboxrecord rather than calling the live provider again. - Every terminal dispatch appends one
TrustRecordtotrust.graphplus the per-agent topictrust.graph.<agent_id>. - When
workflow_execute(...)runs inside a replayed trigger dispatch, the runtime carries the replay pointer into run metadata so derived observability can render areplay_chainedge back to the original event.