コンテンツにスキップ

ストリーミングパイプライン(Python)

Python SDK は、Python の非同期エコシステムに適応した、Rust ランタイムと同じオペレーターベースのパイプラインアーキテクチャを実装しています。

Raw Bytes → Decoder → Selector → Accumulator → FanOut → EventMapper → StreamingEvent

HTTP レスポンスバイトを JSON フレームに変換します:

デコーダークラスプロバイダー形式
SseDecoder標準 SSE(OpenAI、Groq など)
JsonLinesDecoder改行区切り JSON
AnthropicSseDecoderAnthropic のカスタム SSE

デコーダーはマニフェストの streaming.decoder.format に基づいて選択されます。

マニフェストの JSONPath 式を使用して JSON フレームをフィルタリングします:

# 内部的に、パイプラインはマニフェストルールからセレクターを作成します:
# match: "$.choices[0].delta.content" → emit: "PartialContentDelta"

JSONPath 式の評価に jsonpath-ng を使用します。

部分的なツール呼び出しを完全な呼び出しに組み立てます:

# プロバイダーのストリーム:
# {"tool_calls": [{"index": 0, "function": {"arguments": '{"ci'}}]}
# {"tool_calls": [{"index": 0, "function": {"arguments": 'ty":"T'}}]}
# {"tool_calls": [{"index": 0, "function": {"arguments": 'okyo"}'}}]}
# アキュムレーターが完全な {"city": "Tokyo"} を生成

マルチ候補レスポンス(n > 1)の場合、候補ごとのストリームに展開します。

3 つのマッパー実装:

マッパー説明
ProtocolEventMapperマニフェストの event_map ルール(JSONPath → イベント型)を使用
DefaultEventMapperOpenAI 互換プロバイダー用フォールバック
AnthropicEventMapperAnthropic の独自イベント構造を処理

パイプラインはイベントを非同期イテレーターとして公開します:

async for event in client.chat().user("Hello").stream():
if event.is_content_delta:
text = event.as_content_delta.text
print(text, end="")
elif event.is_tool_call_started:
call = event.as_tool_call_started
print(f"\nTool: {call.name}")
elif event.is_stream_end:
end = event.as_stream_end
print(f"\nFinish: {end.finish_reason}")

ストリームはグラースフルなキャンセルをサポートします:

from ai_lib_python import CancelToken
token = CancelToken()
async for event in client.chat().user("...").stream(cancel_token=token):
# 十分なコンテンツを受信したらキャンセル
if total_chars > 1000:
token.cancel()
break