Streaming

Agent Express streams responses in real time via StreamEvent objects. Every agent.run() and session.run() returns an AgentRun — a dual interface that supports both streaming and awaiting the final result.

AgentRun implements AsyncIterable<StreamEvent>. Use for await to process events as they arrive:

const run = agent.run("Hello")

// Stream: handle events as they arrive
for await (const event of run) {
  switch (event.type) {
    case "model:chunk":
      process.stdout.write(event.text)
      break
    case "tool:start":
      console.log(`Calling ${event.tool}...`)
      break
    case "error":
      console.error(event.error)
      break
  }
}

// Or: await the final result directly
const result = await run.result

Both interfaces can be used on the same AgentRun instance. The .result promise resolves when the session completes, after all events have been emitted.


AgentRun is the return value of agent.run() and session.run(). Its dual interface is inspired by fetch().

class AgentRun implements AsyncIterable<StreamEvent> {
  /** Promise resolving to the final RunResult when the session completes. */
  readonly result: Promise<RunResult>

  /** Async iterator yielding StreamEvents as they arrive. */
  [Symbol.asyncIterator](): AsyncIterator<StreamEvent>
}

Internally, AgentRun uses an EventBus that buffers events in memory. If a consumer is already waiting, events are delivered immediately. When the run completes, the iterator drains remaining buffered events and then returns.

The EventBus is single-consumer only — each AgentRun is designed for one for await loop at a time. Multiple consumers on the same run will miss events.
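The buffering behaviour described above can be sketched as a small single-consumer queue. This is only an illustration of the idea, not the library's actual EventBus: the class name `BufferedEventQueue` and its `push` and `close` methods are hypothetical.

```typescript
// Hypothetical sketch of a buffered, single-consumer event queue.
// It is NOT the real EventBus; names and details are assumptions.
class BufferedEventQueue<T> implements AsyncIterable<T> {
  private buffer: T[] = [];
  private waiting: ((result: IteratorResult<T>) => void) | null = null;
  private closed = false;

  /** Hand the event to a waiting consumer if there is one, otherwise buffer it. */
  push(event: T): void {
    if (this.closed) return;
    if (this.waiting) {
      const resolve = this.waiting;
      this.waiting = null;
      resolve({ value: event, done: false });
    } else {
      this.buffer.push(event);
    }
  }

  /** Mark the run as complete. Already-buffered events can still be drained. */
  close(): void {
    this.closed = true;
    if (this.waiting) {
      const resolve = this.waiting;
      this.waiting = null;
      resolve({ value: undefined, done: true });
    }
  }

  [Symbol.asyncIterator](): AsyncIterator<T> {
    return {
      next: (): Promise<IteratorResult<T>> => {
        // Drain buffered events first, even after close().
        if (this.buffer.length > 0) {
          return Promise.resolve({ value: this.buffer.shift() as T, done: false });
        }
        if (this.closed) {
          return Promise.resolve({ value: undefined, done: true });
        }
        // Only one pending resolver is stored, so a second concurrent
        // consumer would overwrite the first one's resolver.
        return new Promise<IteratorResult<T>>((resolve) => {
          this.waiting = resolve;
        });
      },
    };
  }
}
```

Because each event is removed from the buffer or handed to the single stored resolver exactly once, two loops over the same queue would each see only part of the stream. That is the failure mode the single-consumer warning refers to.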


StreamEvent is a discriminated union on the type field. Events follow the lifecycle nesting: session > turn > model/tool.
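Collected into one declaration, the event shapes documented in this section look roughly like the union below. The sketch also shows why a discriminated union is convenient: a `switch` on `type` narrows each branch, and a `never` check makes the compiler flag any unhandled event. The field types of `RunResult` and the helper name `summarize` are assumptions made for illustration; the library's own exported types may differ.

```typescript
// Assumed shape: the page only says RunResult has text, state, and optional data.
interface RunResult { text: string; state: unknown; data?: unknown }

// Event shapes copied from the reference entries on this page.
type StreamEvent =
  | { type: "session:start"; sessionId: string }
  | { type: "session:end"; sessionId: string; result: RunResult }
  | { type: "turn:start"; turnIndex: number; turnId: string }
  | { type: "turn:end"; turnIndex: number; turnId: string; text: string }
  | { type: "model:start"; model: string; callIndex: number }
  | { type: "model:chunk"; callIndex: number; text: string }
  | { type: "model:end"; callIndex: number; finishReason: string }
  | { type: "tool:start"; tool: string; args: Record<string, unknown>; callId: string }
  | { type: "tool:end"; tool: string; callId: string; result: unknown }
  | { type: "error"; error: Error };

// Hypothetical helper: one log line per event, with exhaustive narrowing.
function summarize(event: StreamEvent): string {
  switch (event.type) {
    case "session:start": return `session ${event.sessionId} started`;
    case "session:end": return `session ${event.sessionId} ended`;
    case "turn:start": return `turn ${event.turnIndex} started`;
    case "turn:end": return `turn ${event.turnIndex} ended`;
    case "model:start": return `model call ${event.callIndex} started (${event.model})`;
    case "model:chunk": return `model call ${event.callIndex} chunk: ${event.text}`;
    case "model:end": return `model call ${event.callIndex} ended (${event.finishReason})`;
    case "tool:start": return `tool ${event.tool} started`;
    case "tool:end": return `tool ${event.tool} ended`;
    case "error": return `error: ${event.error.message}`;
    default: {
      // If a new member is added to the union, this assignment stops compiling.
      const unhandled: never = event;
      throw new Error(`Unhandled event: ${JSON.stringify(unhandled)}`);
    }
  }
}
```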

session:start

Emitted when a session begins.

{ type: "session:start"; sessionId: string }

| Field | Type | Description |
| --- | --- | --- |
| sessionId | string | Unique session identifier. |

session:end

Emitted when a session completes successfully. This is the last event before the iterator closes.

{ type: "session:end"; sessionId: string; result: RunResult }

| Field | Type | Description |
| --- | --- | --- |
| sessionId | string | Session identifier. |
| result | RunResult | Final result with text, state, and optional data. |

turn:start

Emitted when a new turn begins. A turn is one cycle of a user message followed by the assistant's response.

{ type: "turn:start"; turnIndex: number; turnId: string }

| Field | Type | Description |
| --- | --- | --- |
| turnIndex | number | Turn number within this session (0-based). |
| turnId | string | Unique turn identifier. |

turn:end

Emitted when a turn completes.

{ type: "turn:end"; turnIndex: number; turnId: string; text: string }

| Field | Type | Description |
| --- | --- | --- |
| turnIndex | number | Turn number (0-based). |
| turnId | string | Unique turn identifier. |
| text | string | The assistant's final text output for this turn. |

model:start

Emitted before each LLM call.

{ type: "model:start"; model: string; callIndex: number }

| Field | Type | Description |
| --- | --- | --- |
| model | string | Model identifier (e.g., "anthropic/claude-sonnet-4-6"). |
| callIndex | number | Which model call in this turn (0-based). A turn with tool use will have multiple model calls. |

model:chunk

Emitted for each text chunk during streaming. Use this for real-time text display.

{ type: "model:chunk"; callIndex: number; text: string }

| Field | Type | Description |
| --- | --- | --- |
| callIndex | number | Which model call in this turn (0-based). |
| text | string | Text chunk from the model. |
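Since a turn with tool use contains several model calls, it can help to group chunks by `callIndex` rather than concatenating everything. A minimal sketch, assuming the event shapes documented on this page; the helper name `textByCall` is hypothetical, and because `callIndex` is scoped to a turn, the map should be rebuilt for each turn.

```typescript
// Subset of the event union, with shapes taken from this page.
type ModelEvent =
  | { type: "model:start"; model: string; callIndex: number }
  | { type: "model:chunk"; callIndex: number; text: string }
  | { type: "model:end"; callIndex: number; finishReason: string };

// Hypothetical helper: concatenate chunk text per model call within one turn.
function textByCall(events: readonly ModelEvent[]): Map<number, string> {
  const texts = new Map<number, string>();
  for (const event of events) {
    if (event.type !== "model:chunk") continue;
    texts.set(event.callIndex, (texts.get(event.callIndex) ?? "") + event.text);
  }
  return texts;
}
```

In the example sequence later on this page, the text reported by turn:end matches the chunks of the last model call only, so keeping calls separate avoids mixing pre-tool commentary into the final answer.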

model:end

Emitted when an LLM call completes.

{ type: "model:end"; callIndex: number; finishReason: string }

| Field | Type | Description |
| --- | --- | --- |
| callIndex | number | Which model call in this turn (0-based). |
| finishReason | string | Why the model stopped: "stop", "tool-calls", "length", "content-filter", "error", "other". |

tool:start

Emitted before a tool executes.

{ type: "tool:start"; tool: string; args: Record<string, unknown>; callId: string }

| Field | Type | Description |
| --- | --- | --- |
| tool | string | Tool name. |
| args | Record<string, unknown> | Arguments the model passed to the tool. |
| callId | string | Tool call ID linking this call to its result. |

tool:end

Emitted when a tool execution completes.

{ type: "tool:end"; tool: string; callId: string; result: unknown }

| Field | Type | Description |
| --- | --- | --- |
| tool | string | Tool name. |
| callId | string | Tool call ID linking to the original tool:start. |
| result | unknown | Value returned by the tool (or error message if it failed). |
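The `callId` field is what lets a consumer match a tool:end to its tool:start. The sketch below pairs the two events into one record per call. `ToolCallTracker` and `ToolCallRecord` are hypothetical names introduced for illustration, and the event shapes are copied from this page.

```typescript
type ToolStart = { type: "tool:start"; tool: string; args: Record<string, unknown>; callId: string };
type ToolEnd = { type: "tool:end"; tool: string; callId: string; result: unknown };
// Stand-in for every other event type; the tracker ignores these.
type OtherEvent = { type: "model:chunk"; callIndex: number; text: string };

interface ToolCallRecord {
  tool: string;
  args: Record<string, unknown>;
  result: unknown;
}

// Hypothetical helper: joins tool:start and tool:end events by callId.
class ToolCallTracker {
  private pending = new Map<string, ToolStart>();
  readonly completed: ToolCallRecord[] = [];

  handle(event: ToolStart | ToolEnd | OtherEvent): void {
    if (event.type === "tool:start") {
      this.pending.set(event.callId, event);
    } else if (event.type === "tool:end") {
      const start = this.pending.get(event.callId);
      if (!start) return; // tool:end without a known tool:start is ignored
      this.pending.delete(event.callId);
      this.completed.push({ tool: start.tool, args: start.args, result: event.result });
    }
  }

  /** Call IDs that have started but not yet finished. */
  inFlight(): string[] {
    return Array.from(this.pending.keys());
  }
}
```

Calling `tracker.handle(event)` inside the `for await` loop keeps `completed` up to date, and `inFlight()` shows which tools are still running.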

error

Emitted when an unhandled error occurs. This is the last event before the iterator closes on failure.

{ type: "error"; error: Error }

| Field | Type | Description |
| --- | --- | --- |
| error | Error | The error that caused the failure. Typically an AgentExpressError subclass. |

A typical single-turn run with one tool call produces this event sequence:

session:start { sessionId: "s-abc123" }
  turn:start { turnIndex: 0, turnId: "t-def456" }
    model:start { model: "anthropic/claude-sonnet-4-6", callIndex: 0 }
    model:chunk { callIndex: 0, text: "Let me " }
    model:chunk { callIndex: 0, text: "check that..." }
    model:end { callIndex: 0, finishReason: "tool-calls" }
    tool:start { tool: "get_weather", args: { city: "Paris" }, callId: "tc-1" }
    tool:end { tool: "get_weather", callId: "tc-1", result: { temperature: "22C" } }
    model:start { model: "anthropic/claude-sonnet-4-6", callIndex: 1 }
    model:chunk { callIndex: 1, text: "The weather " }
    model:chunk { callIndex: 1, text: "in Paris is 22C." }
    model:end { callIndex: 1, finishReason: "stop" }
  turn:end { turnIndex: 0, turnId: "t-def456", text: "The weather in Paris is 22C." }
session:end { sessionId: "s-abc123", result: { text: "...", state: {...} } }
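The nesting in this sequence (session, then turn, then model or tool) can be checked mechanically, which is handy in tests that record a run's event types. The checker below is a sketch under two assumptions that this page does not confirm: tool calls run one at a time rather than in parallel, and an error event may arrive while scopes are still open. The function name `findNestingError` is hypothetical.

```typescript
// Parent scope for each event family; null means top level.
const PARENT_SCOPE = new Map<string, string | null>([
  ["session", null],
  ["turn", "session"],
  ["model", "turn"],
  ["tool", "turn"],
]);

// Hypothetical helper: returns null if the sequence nests correctly,
// otherwise a short description of the first problem found.
function findNestingError(types: readonly string[]): string | null {
  const open: string[] = []; // stack of currently open scopes
  let index = 0;
  for (const type of types) {
    if (type === "error") {
      // Assumed terminal: nothing may follow an error event.
      return index === types.length - 1 ? null : `event after "error" at index ${index + 1}`;
    }
    const sep = type.indexOf(":");
    const scope = type.slice(0, sep);
    const phase = type.slice(sep + 1);
    const top = open.length > 0 ? open[open.length - 1] : null;
    if (!PARENT_SCOPE.has(scope)) return `unknown event "${type}" at index ${index}`;
    if (phase === "start") {
      if (top !== PARENT_SCOPE.get(scope)) return `"${type}" at index ${index} is outside its parent scope`;
      open.push(scope);
    } else if (phase === "end") {
      if (top !== scope) return `"${type}" at index ${index} closes a scope that is not open`;
      open.pop();
    } else if (phase === "chunk" && scope === "model") {
      if (top !== "model") return `"${type}" at index ${index} is outside a model call`;
    } else {
      return `unknown event "${type}" at index ${index}`;
    }
    index++;
  }
  return open.length === 0 ? null : `unclosed scope "${open[open.length - 1]}"`;
}
```

If the library can run tools concurrently, the stack check for tool:start and tool:end would need to be replaced with a set keyed by `callId`.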

A common pattern is filtering for specific event types:

const run = agent.run("What's the weather?")
for await (const event of run) {
  // Real-time text streaming
  if (event.type === "model:chunk") {
    process.stdout.write(event.text)
  }
  // Track tool usage
  if (event.type === "tool:start") {
    console.log(`[tool] ${event.tool}(${JSON.stringify(event.args)})`)
  }
  // Handle errors
  if (event.type === "error") {
    console.error(`Run failed: ${event.error.message}`)
  }
}
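When only one event type matters, the filtering can be factored into a small typed helper. The sketch below is not part of the library: `ofType` is a hypothetical name, and `fakeRun` is a stand-in async generator used so the example runs without an agent. Since a run supports a single consumer, wrapping it this way discards every other event type.

```typescript
// Subset of the event union, with shapes taken from this page.
type StreamEvent =
  | { type: "model:chunk"; callIndex: number; text: string }
  | { type: "tool:start"; tool: string; args: Record<string, unknown>; callId: string }
  | { type: "error"; error: Error };

// Hypothetical helper: yields only events whose type matches, narrowed accordingly.
async function* ofType<E extends { type: string }, K extends E["type"]>(
  source: AsyncIterable<E>,
  type: K,
): AsyncGenerator<Extract<E, { type: K }>> {
  for await (const event of source) {
    if (event.type === type) {
      yield event as Extract<E, { type: K }>;
    }
  }
}

// Stand-in for agent.run(): emits a fixed event sequence.
async function* fakeRun(): AsyncGenerator<StreamEvent> {
  yield { type: "model:chunk", callIndex: 0, text: "Hel" };
  yield { type: "tool:start", tool: "get_weather", args: { city: "Paris" }, callId: "tc-1" };
  yield { type: "model:chunk", callIndex: 1, text: "lo" };
}

// Collect streamed text; chunk.text is typed as string with no manual narrowing.
async function collectText(source: AsyncIterable<StreamEvent>): Promise<string> {
  let text = "";
  for await (const chunk of ofType(source, "model:chunk")) {
    text += chunk.text;
  }
  return text;
}
```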

For HTTP streaming, use createHandler() from agent-express/http to convert events to Server-Sent Events. See HTTP & Web Frameworks for full framework integration examples.

import { createHandler } from "agent-express/http"
const handler = createHandler(agent)
// Produces SSE with each StreamEvent as a JSON data frame
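To make "each StreamEvent as a JSON data frame" concrete, the sketch below formats an event as a Server-Sent Events frame and parses it back. It illustrates the general SSE framing only. The exact output of createHandler() is not specified on this page, so the function names `toSseFrame` and `parseSseFrame`, the absence of `event:` or `id:` fields, and the way Error objects are serialized are all assumptions.

```typescript
// Hypothetical helper: one event becomes one SSE frame with a single data line.
function toSseFrame(event: { type: string; [key: string]: unknown }): string {
  // Error instances serialize to "{}" by default, so copy the useful fields.
  const json = JSON.stringify(event, (_key, value) =>
    value instanceof Error ? { name: value.name, message: value.message } : value,
  );
  // JSON.stringify without indentation emits no raw newlines, so one data line suffices.
  return `data: ${json}\n\n`;
}

// Hypothetical client-side counterpart: extract and parse the data payload.
function parseSseFrame(frame: string): unknown {
  const payload = frame
    .split("\n")
    .filter((line) => line.indexOf("data:") === 0)
    .map((line) => line.replace(/^data: ?/, ""))
    .join("\n");
  return JSON.parse(payload);
}
```

In a browser, `EventSource` performs the frame parsing itself and delivers the payload as `message.data`, leaving only the `JSON.parse` step to the application.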