The loop primitive
An agent is a loop over your state.
In effect-uai, that loop is not hidden inside an Agent class. State is a
plain record, the body is a Stream, and a small tagged step type
(Value / Next / Stop) controls iteration. Each turn, tool call,
fallback, compaction, or pause is just ordinary Effect code in that body.
The loop is pull-based: there is no producer fiber and no queue buffering. The
next iteration only starts when the downstream consumer pulls the outer stream.
That keeps backpressure, cancellation, and resource cleanup aligned with normal
Effect Stream semantics.
The shape
loop<S, A, E, R>( initial: S, body: (state: S) => Stream<Step<A, S>, E, R>,): Stream<A, E, R>Both data-first (loop(initial, body)) and data-last
(pipe(initial, loop(body))) forms work. The body may also return an
Effect<Stream<...>> so it can yield services before producing the
stream.
The step type
Each pull, the body emits a chunk of Step<A, S>:
type Step<A, S> = | { _tag: "Value"; value: A } // flows downstream | { _tag: "Next"; state: S } // end this iteration, continue with new state | { _tag: "Stop" } // end the loop entirely | { _tag: "StopWith"; state: S } // end the loop AND surface a final stateA Next or Stop is terminal for the iteration. Anything emitted
in the same chunk after one is discarded - prefer the helpers below
over building events by hand.
Helpers
Loop.value(a) // wrap a value as a Step (use inside Stream.map)Loop.next(state) // single-element stream: end this iteration, continue with new stateLoop.stop() // single-element stream that ends the loopLoop.stop(state) // end the loop AND surface a final statenext and stop each emit a single terminal step, so concatenate your
values before them: emit a run of values then continue with
values.pipe(Stream.map(Loop.value), Stream.concat(Loop.next(state))),
or just Loop.next(state) when there were no values. The streaming-tool
helper Toolkit.continueWithResults bundles that
pattern: it forwards every ToolEvent as Loop.value, accumulates the
terminal Output events into a ReadonlyArray<ToolResult>, and concats
one Loop.next(build(results)) at end-of-stream.
Reach for stop(state) when the loop ending is the result you care
about — a summarised state, a tallied result, a final checkpoint.
loopWithState exposes that final state to the caller; with plain
stop() it’s discarded.
onTurnComplete
Most loop bodies wrap a provider’s Stream<TurnEvent>. The pattern is
always the same: forward events to the consumer, wait for the terminal
TurnComplete, then decide what to do with the assembled Turn.
Loop.onTurnComplete packages exactly that:
import { Effect } from "effect"import { loop, stop, onTurnComplete } from "@effect-uai/core/Loop"import { toToolCallOutput } from "@effect-uai/core/ToolResult"import * as Tool from "@effect-uai/core/Tool"import type { ToolEvent } from "@effect-uai/core/ToolEvent"import * as Toolkit from "@effect-uai/core/Toolkit"import * as Turn from "@effect-uai/core/Turn"import { Responses } from "@effect-uai/responses"
pipe( initial, loop((state) => Effect.gen(function* () { const oai = yield* Responses
return oai .streamTurn({ history: state.history, model: "gpt-5.4-mini", tools: Tool.toDescriptors(allTools), }) .pipe( onTurnComplete<State, ToolEvent>((turn) => Effect.gen(function* () { const calls = Turn.getToolCalls(turn)
// No tool calls means there is nothing to feed back. if (calls.length === 0) return stop()
return Toolkit.run(allTools, calls).pipe( Toolkit.continueWithResults((results) => // Build the next state only after every tool call has an output. Turn.appendToHistory(state, turn, results.map(toToolCallOutput)), ), ) }), ), ) }), ),)What it does:
- Each
TurnEventpasses through asLoop.value(event)— including the terminalTurnComplete, so the consumer sees turn boundaries. - Once the terminal arrives, the callback runs with the assembled
Turnand its returned event-stream is concatenated. Typically that stream comes fromToolkit.runthreaded throughcontinueWithResultsto advance — or juststop(). ToolEvents emitted by the executor (Progress,Output,ApprovalRequested) flow through alongside theTurnEvents.- Pre-pipe transforms work as you’d expect:
Stream.tapfor logging,Stream.filterto drop events you don’t care about,Stream.mapto reshape them.
If the upstream ends without a TurnComplete, the resulting stream
fails with AiError.IncompleteTurn. Catch it with Stream.catchTag if
you want to recover.
Cancellation and resources
Because the loop is pull-based, structured concurrency works without extra wiring:
- Each iteration’s body stream lives in its own forked scope. When the iteration ends (decision arrives, error, downstream interrupt), that scope closes - finalizers attached to the body run synchronously.
- The outer scope owns whichever body is currently active. Closing the
outer scope (e.g. via
Stream.interruptWhenfrom a consumer) closes the active body too. - This is why mid-stream abort is a
one-liner: the HTTP
Stream.ensuringfinalizer in the provider rides the same chain straight down toAbortController.abort()on the underlyingfetch.
Picking a provider
The loop primitive is provider-agnostic. The body just yields a
service tag and calls streamTurn. See Providers
for the OpenAI Responses and Google Gemini integrations and how to
swap between them at the layer or per-iteration.