Recipe: Mid-stream abort
Scenario. A turn is mid-generation - text deltas are arriving from the provider - and something external (user clicks “stop”, a new prompt arrives, a deadline elapses) needs to cancel it. The upstream HTTP connection must close, the loop must end cleanly, and any partial deltas already emitted are kept.
This is a one-line addition to a normal loop: pipe the conversation
stream through Stream.interruptWhen(Deferred.await(abort)). When the
deferred completes, the stream interrupts, its scope closes, and
Effect’s structured concurrency tears down the HTTP response - which
signals AbortController on the underlying fetch so the TCP
connection drops.
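To make that chain concrete, here is a minimal self-contained sketch that swaps the provider for a fake stream. The names fakeDeltas, closed, and seen, and the 100 ms / 350 ms timings, are hypothetical stand-ins rather than part of the recipe; only the Stream.interruptWhen(Deferred.await(abort)) line mirrors the real pattern. It assumes Effect 3.x.

```typescript
import { Deferred, Effect, Ref, Schedule, Stream } from "effect"

// Hypothetical stand-in for the provider stream: one fake delta every 100 ms.
const fakeDeltas = (closed: Ref.Ref<boolean>) =>
  Stream.fromSchedule(Schedule.spaced("100 millis")).pipe(
    Stream.map((n) => `delta-${n}`),
    // Finalizer: runs when the stream ends for any reason, including
    // interruption. In the real recipe this is where the HTTP teardown lives.
    Stream.ensuring(Ref.set(closed, true)),
  )

const program = Effect.gen(function* () {
  const closed = yield* Ref.make(false)
  const seen = yield* Ref.make<ReadonlyArray<string>>([])
  const abort = yield* Deferred.make<void>()

  // External trigger: a forked fiber completes the deferred after 350 ms.
  yield* Effect.fork(
    Effect.sleep("350 millis").pipe(
      Effect.zipRight(Deferred.succeed(abort, undefined)),
    ),
  )

  // The one-line addition: interrupt the stream when the deferred completes.
  yield* Stream.runForEach(
    fakeDeltas(closed).pipe(Stream.interruptWhen(Deferred.await(abort))),
    (delta) => Ref.update(seen, (xs) => [...xs, delta]),
  )

  // Deltas emitted before the abort are still here; the finalizer has run.
  return { deltas: yield* Ref.get(seen), closed: yield* Ref.get(closed) }
})
```

Running program with Effect.runPromise should resolve with the deltas collected before the abort and closed set to true, with the finalizer playing the role of the HTTP teardown.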
What it shows
- Wiring an external trigger (Deferred<void>) to a running stream with
  Stream.interruptWhen. Any effect that completes will work here - a
  Promise, a wall-clock timer, an event from a queue.
- The cleanup chain runs end-to-end: the Stream.ensuring finalizer on
  the provider’s streamTurn fires when the consumer interrupts. In
  production this is what the HTTP client uses to call
  AbortController.abort().
- Partial deltas that already crossed the boundary stay with the
  consumer. No turn_complete is emitted because the turn never
  finished, so the recipe’s normal completion path (advance state /
  stop) doesn’t run.
The pattern
    const conversation = pipe(
      initial,
      loop((state) =>
        Effect.gen(function* () {
          const oai = yield* Responses
          return oai
            .streamTurn(state.history, { reasoning: { effort: "low" } })
            .pipe(streamUntilComplete(() => Effect.sync(() => stop)))
        }),
      ),
    )

    // External abort trigger; completing it interrupts the stream below.
    const abort = yield* Deferred.make<void>()

    yield* Stream.runForEach(
      conversation.pipe(Stream.interruptWhen(Deferred.await(abort))),
      (event) => Effect.log(event), // log delta, log completion, ...
    )

The trigger lives outside the loop; in the demo a forked fiber sleeps
for one second then completes the deferred. Replace that with whatever
fits your app - a UI signal, a Queue.take, an AbortSignal bridged
into Effect via Effect.async.
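For the AbortSignal case, a bridge might look like the following sketch. untilAborted is a hypothetical helper name, not something the recipe exports, and the sketch assumes Effect 3.x, where Effect.async and Effect.void are available.

```typescript
import { Effect } from "effect"

// Hypothetical helper: an effect that completes once the signal aborts.
const untilAborted = (signal: AbortSignal): Effect.Effect<void> =>
  Effect.async<void>((resume) => {
    // Already aborted: complete immediately, no listener needed.
    if (signal.aborted) {
      resume(Effect.void)
      return
    }
    const onAbort = () => resume(Effect.void)
    signal.addEventListener("abort", onAbort, { once: true })
    // Cleanup effect: runs if the waiting fiber is interrupted first,
    // so the listener does not leak.
    return Effect.sync(() => signal.removeEventListener("abort", onAbort))
  })
```

The result can go wherever the recipe uses Deferred.await(abort), for example Stream.interruptWhen(untilAborted(controller.signal)).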
State and partial completions
The recipe’s body never sees a turn_complete when abort fires, so
state stays at its pre-turn value. If you need the partial assistant
text to survive abort, two options:
- Capture deltas as they stream - keep a running buffer in a Ref
  outside the loop. The interrupted stream still emitted them, so they
  land in the buffer before the abort.
- Synthesize a partial Turn in the consumer when the stream ends
  without turn_complete. The deltas you saw are enough to construct an
  assistant message; treat it as if the turn had finished with
  stop_reason: "stop".
Neither is wired here - the demo just logs deltas and lets the partial output go.
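As a rough illustration of the second option, the sketch below folds the events a consumer saw into an assistant turn. The StreamEvent and AssistantTurn shapes, the synthesizeTurn name, and the partial flag are all hypothetical; the recipe’s real types may differ.

```typescript
// Hypothetical event and turn shapes, for illustration only.
type StreamEvent =
  | { readonly type: "delta"; readonly text: string }
  | { readonly type: "turn_complete"; readonly stopReason: string }

interface AssistantTurn {
  readonly role: "assistant"
  readonly content: string
  readonly stop_reason: string
  readonly partial: boolean // true when no turn_complete was seen
}

// Fold the observed events into a turn. If no turn_complete arrived, the
// stream was cut short, so the result is marked partial and given "stop".
function synthesizeTurn(
  events: ReadonlyArray<StreamEvent>,
): AssistantTurn | undefined {
  let content = ""
  let stopReason: string | undefined
  for (const event of events) {
    if (event.type === "delta") content += event.text
    else stopReason = event.stopReason
  }
  // Aborted before any delta arrived: nothing worth keeping.
  if (stopReason === undefined && content === "") return undefined
  return {
    role: "assistant",
    content,
    stop_reason: stopReason ?? "stop",
    partial: stopReason === undefined,
  }
}

// Example: two deltas arrived, then the abort fired.
const seenEvents: StreamEvent[] = [
  { type: "delta", text: "Hel" },
  { type: "delta", text: "lo" },
]
const partialTurn = synthesizeTurn(seenEvents)
```

Keeping a partial flag on the synthesized turn is one way to let later code distinguish an aborted answer from a finished one; whether to append it to history is an application decision.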
Run it
    OPENAI_API_KEY=sk-... pnpm tsx recipes/mid-stream-abort/index.ts

You’ll see a stream of delta log lines, then abort fired after 3 seconds, then loop ended. No turn_complete line.
The full source lives next to this README at
index.ts.