Streams
Streams are lazy, single-pass values produced by gen fn. A stream
emits values over time and can be consumed with for, .next(), or
.iter().
gen fn numbers(start: int, end: int) -> Stream<int> {
var n = start
while n < end {
emit n
n = n + 1
}
}
for n in numbers(1, 4) {
log(n)
}
parallel each can also return a stream instead of waiting for the
whole batch. The streamed form preserves the max_concurrent cap but
emits each result as soon as that task completes:
let results = parallel each [30, 5, 10] with { max_concurrent: 2 } { ms ->
sleep(ms)
return ms
} as stream
for result in results {
log(result) // 5, then 10, then 30
}
Use parallel_race(items, callable, options?) when only the first
successful result matters. It returns the first plain value or
Result.Ok payload, cancels remaining work, and throws an aggregate
error when every task throws or returns Result.Err.
gen is contextual in the gen fn declaration form, so existing
identifiers named gen remain valid. emit expr is only valid inside
gen fn. It sends one value to the consumer and then the function
continues when the consumer asks for the next item. Existing yield
behavior is unchanged; use emit for streams.
Stream<T> is distinct from Generator<T> in the checker. Regular
functions that already use yield keep returning Generator<T>.
gen fn returns Stream<T>.
gen fn chunks() -> Stream<string> {
emit "one"
emit "two"
}
let s: Stream<string> = chunks()
let first = s.next()
log(first.value) // one
log(first.done) // false
Errors thrown inside a stream propagate to the consumer at the point where the next value is pulled:
gen fn broken() -> Stream<int> {
emit 1
throw "failed"
}
try {
for n in broken() {
log(n)
}
} catch err {
log("caught ${err}")
}
Breaking out of a for loop stops consuming the stream. Stream
operators such as map/filter/merge/throttle and built-in LLM token
streaming are separate runtime features layered on top of this base
value type.
The event_log namespace exposes the active runtime EventLog as stream
values. event_log.subscribe({topic, from_cursor, kind_prefix?})
returns a Stream<dict> whose events include {id, cursor, topic, kind, payload, headers, occurred_at_ms}. kind_prefix filters the stream to
matching event kinds before delivery. Dropping the stream closes the
underlying subscription.