Ir al contenido

Canalización de streaming (Python)

El SDK Python implementa la misma arquitectura de canalización basada en operadores que el tiempo de ejecución Rust, adaptada al ecosistema asíncrono de Python.

Raw Bytes → Decoder → Selector → Accumulator → FanOut → EventMapper → StreamingEvent

Convierte bytes de respuesta HTTP en frames JSON:

Decoder ClassProvider Format
SseDecoderSSE estándar (OpenAI, Groq, etc.)
JsonLinesDecoderJSON delimitado por líneas nuevas
AnthropicSseDecoderSSE personalizado de Anthropic

El decodificador se selecciona según streaming.decoder.format del manifiesto.

Filtra frames JSON usando expresiones JSONPath del manifiesto:

# Internally, the pipeline creates selectors from manifest rules:
# match: "$.choices[0].delta.content" → emit: "PartialContentDelta"

Utiliza jsonpath-ng para la evaluación de expresiones JSONPath.

Ensambla llamadas a herramientas parciales en invocaciones completas:

# Provider streams:
# {"tool_calls": [{"index": 0, "function": {"arguments": '{"ci'}}]}
# {"tool_calls": [{"index": 0, "function": {"arguments": 'ty":"T'}}]}
# {"tool_calls": [{"index": 0, "function": {"arguments": 'okyo"}'}}]}
# Accumulator produces complete: {"city": "Tokyo"}

Para respuestas multicandidato (n > 1), expande en flujos por candidato.

Tres implementaciones de mapeador:

MapperDescription
ProtocolEventMapperUsa reglas event_map del manifiesto (JSONPath → tipo de evento)
DefaultEventMapperRespaldo para proveedores compatibles con OpenAI
AnthropicEventMapperManeja la estructura de eventos única de Anthropic

La canalización expone los eventos como un iterador asíncrono:

async for event in client.chat().user("Hello").stream():
if event.is_content_delta:
text = event.as_content_delta.text
print(text, end="")
elif event.is_tool_call_started:
call = event.as_tool_call_started
print(f"\nTool: {call.name}")
elif event.is_stream_end:
end = event.as_stream_end
print(f"\nFinish: {end.finish_reason}")

Los flujos soportan cancelación elegante:

from ai_lib_python import CancelToken
token = CancelToken()
async for event in client.chat().user("...").stream(cancel_token=token):
# Cancel after receiving enough content
if total_chars > 1000:
token.cancel()
break