
Streaming Pipeline (Python)

The Python SDK implements the same operator-based pipeline architecture as the Rust runtime, adapted for Python’s async ecosystem.

Raw Bytes → Decoder → Selector → Accumulator → FanOut → EventMapper → StreamingEvent
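One way to picture this stage chain is as async generators, where each stage consumes the previous stage's async iterator. The sketch below assumes that structure for illustration only. `fake_http_body`, `decode_lines`, and `select_text` are hypothetical names, not SDK functions, and the real operators may be organized differently:

```python
import asyncio
import json
from typing import AsyncIterator


async def fake_http_body() -> AsyncIterator[bytes]:
    # Simulates a response body arriving in arbitrary byte chunks.
    for part in (b'{"delta": "Hel', b'lo"}\n{"del', b'ta": " world"}\n'):
        yield part


async def decode_lines(chunks: AsyncIterator[bytes]) -> AsyncIterator[dict]:
    # Stage 1 (decoder): buffer bytes, emit one JSON frame per complete line.
    buffer = b""
    async for chunk in chunks:
        buffer += chunk
        while b"\n" in buffer:
            line, buffer = buffer.split(b"\n", 1)
            if line.strip():
                yield json.loads(line)
    if buffer.strip():  # flush a final line that lacks a trailing newline
        yield json.loads(buffer)


async def select_text(frames: AsyncIterator[dict]) -> AsyncIterator[str]:
    # Stage 2 (selector): keep only frames that carry a text delta.
    async for frame in frames:
        if frame.get("delta"):
            yield frame["delta"]


async def main() -> str:
    # Stages compose by passing one async iterator into the next.
    return "".join([text async for text in select_text(decode_lines(fake_http_body()))])


result = asyncio.run(main())
print(result)  # prints "Hello world"
```

Because every stage is lazy, a frame flows through the whole chain as soon as its bytes arrive, and no stage needs the full response in memory.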

The Decoder stage converts raw HTTP response bytes into JSON frames:

| Decoder class | Provider format |
|---|---|
| SseDecoder | Standard SSE (OpenAI, Groq, etc.) |
| JsonLinesDecoder | Newline-delimited JSON |
| AnthropicSseDecoder | Anthropic's custom SSE |

The decoder is selected based on the manifest’s streaming.decoder.format.
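To show what standard SSE decoding involves, here is a minimal sketch. It is not the SDK's `SseDecoder`. It assumes OpenAI-style streams where each event's `data:` payload is JSON and a `[DONE]` payload ends the stream. It also ignores `event:`, `id:`, and comment lines for brevity. `decode_sse` is a hypothetical name:

```python
import json
from typing import Iterable, Iterator


def decode_sse(chunks: Iterable[bytes]) -> Iterator[dict]:
    """Yield the JSON payload of each SSE event, tolerating events split across chunks."""
    buffer = b""
    data_lines: list[str] = []
    for chunk in chunks:
        buffer += chunk
        # Split on newlines in bytes so multi-byte UTF-8 characters are never cut in half.
        while b"\n" in buffer:
            raw, buffer = buffer.split(b"\n", 1)
            line = raw.decode("utf-8").rstrip("\r")
            if line == "":
                # A blank line terminates the current event.
                if data_lines:
                    payload = "\n".join(data_lines)
                    data_lines = []
                    if payload == "[DONE]":  # OpenAI-style end-of-stream sentinel
                        return
                    yield json.loads(payload)
            elif line.startswith("data:"):
                value = line[5:]
                if value.startswith(" "):  # SSE strips exactly one leading space
                    value = value[1:]
                data_lines.append(value)
```

A frame is emitted only after its terminating blank line arrives, so a JSON object split across two network reads is still parsed once and whole.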

The Selector stage filters JSON frames using JSONPath expressions taken from the manifest:

```python
# Internally, the pipeline creates selectors from manifest rules:
# match: "$.choices[0].delta.content" → emit: "PartialContentDelta"
```

JSONPath expressions are evaluated with the jsonpath-ng library.
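A selector rule pairs a path with an event type, like the manifest rule in the comments above. The sketch below keeps the example dependency-free by swapping in a small hand-written evaluator in place of jsonpath-ng. That evaluator supports only `$`, `.key`, and `[index]` steps. `select`, `RULES`, and `apply_rules` are hypothetical names:

```python
import re
from typing import Any

# Matches one path step: ".key" or "[index]".
_STEP = re.compile(r"\.([A-Za-z_][A-Za-z0-9_]*)|\[(\d+)\]")


def select(path: str, frame: Any) -> Any:
    """Resolve a restricted JSONPath against a frame; return None if any step is missing."""
    if not path.startswith("$"):
        raise ValueError(f"path must start with '$': {path}")
    pos, node = 1, frame
    while pos < len(path):
        m = _STEP.match(path, pos)
        if m is None:
            raise ValueError(f"unsupported syntax at {path[pos:]!r}")
        key, index = m.group(1), m.group(2)
        if key is not None:
            if not isinstance(node, dict) or key not in node:
                return None
            node = node[key]
        else:
            i = int(index)
            if not isinstance(node, list) or i >= len(node):
                return None
            node = node[i]
        pos = m.end()
    return node


# Hypothetical rule table mirroring the manifest rule shown in the comments.
RULES = [("$.choices[0].delta.content", "PartialContentDelta")]


def apply_rules(frame: dict) -> list[tuple[str, Any]]:
    """Emit (event_type, value) for every rule whose path resolves in this frame."""
    out = []
    for path, event_type in RULES:
        value = select(path, frame)
        if value is not None:
            out.append((event_type, value))
    return out
```

Returning `None` for a missing path, rather than raising, lets frames that do not match a rule (keep-alives, role-only deltas) pass through silently.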

The Accumulator stage assembles partial tool-call fragments into complete invocations:

```python
# The provider streams argument fragments for the same tool call (index 0):
# {"tool_calls": [{"index": 0, "function": {"arguments": '{"ci'}}]}
# {"tool_calls": [{"index": 0, "function": {"arguments": 'ty":"T'}}]}
# {"tool_calls": [{"index": 0, "function": {"arguments": 'okyo"}'}}]}
# The accumulator joins the fragments and produces: {"city": "Tokyo"}
```
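The fragment joining shown above can be sketched as follows, assuming OpenAI-style `tool_calls` deltas keyed by `index`. `ToolCallAccumulator`, `feed`, and `finish` are hypothetical names, and so is the tool name `get_weather` used when trying it out:

```python
import json
from collections import defaultdict


class ToolCallAccumulator:
    """Buffers streamed tool-call fragments per index and parses them at the end."""

    def __init__(self) -> None:
        self._names: dict[int, str] = {}
        self._args: dict[int, list[str]] = defaultdict(list)

    def feed(self, delta: dict) -> None:
        # One delta may carry fragments for several calls, distinguished by "index".
        for call in delta.get("tool_calls") or []:
            idx = call["index"]
            fn = call.get("function") or {}
            if fn.get("name"):
                self._names[idx] = fn["name"]
            if fn.get("arguments"):
                self._args[idx].append(fn["arguments"])

    def finish(self) -> dict[int, dict]:
        # Only the joined string is valid JSON, so parsing waits until the stream ends.
        result = {}
        for idx in sorted(set(self._names) | set(self._args)):
            raw = "".join(self._args.get(idx, [])) or "{}"
            result[idx] = {"name": self._names.get(idx), "arguments": json.loads(raw)}
        return result
```

Deferring `json.loads` to `finish()` is deliberate: a fragment such as `'ty":"T'` is not valid JSON on its own, so arguments can only be decoded once the call is complete.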

The FanOut stage expands multi-candidate responses (n > 1) into one stream per candidate.
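A minimal sketch of the fan-out idea follows. It assumes OpenAI-style chunks in which each entry of `choices` carries its candidate `index`. For simplicity it collects frames into per-candidate lists instead of producing live streams. `fan_out` is a hypothetical name:

```python
from collections import defaultdict
from typing import Iterable


def fan_out(frames: Iterable[dict]) -> dict[int, list[dict]]:
    """Split chunks with several choices into one frame list per candidate index."""
    streams: dict[int, list[dict]] = defaultdict(list)
    for frame in frames:
        for choice in frame.get("choices", []):
            # Keep the chunk's other fields (id, model, ...) but only this one choice,
            # so downstream stages can assume a single candidate per frame.
            single = {**frame, "choices": [choice]}
            streams[choice.get("index", 0)].append(single)
    return dict(streams)
```

Rewriting each frame to hold a single choice means later stages can keep reading `choices[0]` regardless of how many candidates were requested.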

The EventMapper stage turns selected frames into StreamingEvent objects. Three mapper implementations are available:

| Mapper | Description |
|---|---|
| ProtocolEventMapper | Uses the manifest's event_map rules (JSONPath → event type) |
| DefaultEventMapper | Fallback for OpenAI-compatible providers |
| AnthropicEventMapper | Handles Anthropic's unique event structure |
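To illustrate what a fallback mapper for OpenAI-compatible providers might do, here is a sketch that maps one chat-completion chunk to events. The event classes and `map_openai_chunk` are hypothetical. Their fields (`text`, `name`, `finish_reason`) follow the attributes used in the iteration example on this page, but they are not the SDK's actual types:

```python
from dataclasses import dataclass
from typing import Union


# Hypothetical event types, not the SDK's StreamingEvent classes.
@dataclass
class ContentDelta:
    text: str


@dataclass
class ToolCallStarted:
    index: int
    name: str


@dataclass
class StreamEnd:
    finish_reason: str


Event = Union[ContentDelta, ToolCallStarted, StreamEnd]


def map_openai_chunk(frame: dict) -> list[Event]:
    """Map one OpenAI-compatible streaming chunk to zero or more events."""
    events: list[Event] = []
    for choice in frame.get("choices", []):
        delta = choice.get("delta") or {}
        if delta.get("content"):
            events.append(ContentDelta(delta["content"]))
        for call in delta.get("tool_calls") or []:
            name = (call.get("function") or {}).get("name")
            if name:  # in OpenAI-style streams the name arrives with the first fragment
                events.append(ToolCallStarted(call["index"], name))
        if choice.get("finish_reason"):
            events.append(StreamEnd(choice["finish_reason"]))
    return events
```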

The pipeline exposes events as an async iterator:

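```python
# `client` is assumed to be an already-configured ai_lib_python client;
# `async for` must run inside a coroutine.
async def print_stream(client):
    async for event in client.chat().user("Hello").stream():
        if event.is_content_delta:
            # Incremental text generated by the model
            text = event.as_content_delta.text
            print(text, end="")
        elif event.is_tool_call_started:
            call = event.as_tool_call_started
            print(f"\nTool: {call.name}")
        elif event.is_stream_end:
            end = event.as_stream_end
            print(f"\nFinish: {end.finish_reason}")
```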

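Streams support graceful cancellation through a CancelToken:

```python
from ai_lib_python import CancelToken

# `client` is assumed to be an already-configured ai_lib_python client.
async def stream_with_limit(client):
    token = CancelToken()
    total_chars = 0
    async for event in client.chat().user("...").stream(cancel_token=token):
        if event.is_content_delta:
            total_chars += len(event.as_content_delta.text)
        # Cancel after receiving enough content
        if total_chars > 1000:
            token.cancel()
            break
```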
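The internals of `CancelToken` are not described here. The following sketch shows cooperative cancellation in general, assuming a token that the producer polls between chunks. `SimpleCancelToken` and `fake_stream` are hypothetical stand-ins built on `asyncio.Event`, not the SDK's implementation:

```python
import asyncio
from typing import AsyncIterator


class SimpleCancelToken:
    """Hypothetical cancel token: a thin wrapper around asyncio.Event."""

    def __init__(self) -> None:
        self._event = asyncio.Event()

    def cancel(self) -> None:
        self._event.set()

    @property
    def cancelled(self) -> bool:
        return self._event.is_set()


async def fake_stream(token: SimpleCancelToken, closed: list) -> AsyncIterator[str]:
    # Producer checks the token before each chunk and stops once it is set.
    try:
        while not token.cancelled:
            await asyncio.sleep(0)  # stand-in for awaiting the next network chunk
            yield "x" * 100
    finally:
        closed.append(True)  # where a real stream would release its HTTP connection


async def consume() -> tuple:
    token = SimpleCancelToken()
    closed: list = []
    total_chars = 0
    stream = fake_stream(token, closed)
    async for chunk in stream:
        total_chars += len(chunk)
        if total_chars > 1000:
            token.cancel()
            break
    await stream.aclose()  # run the producer's cleanup now, not at garbage collection
    return total_chars, closed


total, closed = asyncio.run(consume())
```

Breaking out of `async for` does not immediately finalize an async generator. Calling `aclose()` makes its `finally` block, and therefore connection cleanup, run deterministically. The token matters when cancellation is triggered from outside the loop, for example by another task.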