SpyBara
Go Premium

agent-sdk/streaming-output.md 2026-06-16 21:57 UTC to 2026-06-17 17:02 UTC

1 added, 0 removed.

2026
Tue 30 23:02 Mon 29 23:02 Sat 27 01:01 Fri 26 23:00 Thu 25 23:58 Wed 24 22:02 Tue 23 22:00 Mon 22 23:59 Fri 19 22:58 Thu 18 22:00 Wed 17 17:02 Tue 16 21:57 Mon 15 23:02 Sat 13 21:59 Fri 12 22:00 Thu 11 23:01 Wed 10 23:57 Tue 9 06:34 Mon 8 06:52 Sat 6 06:24 Fri 5 06:45 Thu 4 06:52 Wed 3 06:53 Tue 2 06:51

Потоковая передача ответов в реальном времени

Получайте ответы в реальном времени от Agent SDK по мере поступления текста и вызовов инструментов

По умолчанию Agent SDK выдает полные объекты AssistantMessage после того, как Claude завершит генерацию каждого ответа. Чтобы получать добавочные обновления по мере генерации текста и вызовов инструментов, включите потоковую передачу частичных сообщений, установив include_partial_messages (Python) или includePartialMessages (TypeScript) в значение true в ваших параметрах.

Включение потоковой передачи выходных данных

Чтобы включить потоковую передачу, установите include_partial_messages (Python) или includePartialMessages (TypeScript) в значение true в ваших параметрах. Это заставляет SDK выдавать сообщения StreamEvent, содержащие необработанные события API по мере их поступления, в дополнение к обычным AssistantMessage и ResultMessage.

Ваш код затем должен:

  1. Проверить тип каждого сообщения, чтобы отличить StreamEvent от других типов сообщений
  2. Для StreamEvent извлечь поле event и проверить его type
  3. Искать события content_block_delta, где delta.type — это text_delta, которые содержат фактические текстовые фрагменты

Пример ниже включает потоковую передачу и выводит текстовые фрагменты по мере их поступления. Обратите внимание на вложенные проверки типов: сначала для StreamEvent, затем для content_block_delta, затем для text_delta:

from claude_agent_sdk import query, ClaudeAgentOptions
from claude_agent_sdk.types import StreamEvent
import asyncio


async def stream_response():
options = ClaudeAgentOptions(
include_partial_messages=True,
allowed_tools=["Bash", "Read"],
)

async for message in query(prompt="List the files in my project", options=options):
if isinstance(message, StreamEvent):
event = message.event
if event.get("type") == "content_block_delta":
delta = event.get("delta", {})
if delta.get("type") == "text_delta":
print(delta.get("text", ""), end="", flush=True)


asyncio.run(stream_response())

Справочник StreamEvent

Когда включены частичные сообщения, вы получаете необработанные события потоковой передачи Claude API, завернутые в объект. Тип имеет разные имена в каждом SDK:

  • Python: StreamEvent (импортируется из claude_agent_sdk.types)
  • TypeScript: SDKPartialAssistantMessage с type: 'stream_event'

Оба содержат необработанные события Claude API, а не накопленный текст. Вам нужно самостоятельно извлекать и накапливать текстовые дельты. Вот структура каждого типа:

@dataclass
class StreamEvent:
uuid: str  # Unique identifier for this event
session_id: str  # Session identifier
event: dict[str, Any]  # The raw Claude API stream event
parent_tool_use_id: str | None  # Parent tool ID if from a subagent

Поле event содержит необработанное событие потоковой передачи из Claude API. Распространенные типы событий включают:

Тип события Описание
message_start Начало нового сообщения
content_block_start Начало нового блока содержимого (текст или использование инструмента)
content_block_delta Добавочное обновление содержимого
content_block_stop Конец блока содержимого
message_delta Обновления на уровне сообщения (причина остановки, использование)
message_stop Конец сообщения

Поток сообщений

С включенными частичными сообщениями вы получаете сообщения в этом порядке:

StreamEvent (message_start)
StreamEvent (content_block_start) - text block
StreamEvent (content_block_delta) - text chunks...
StreamEvent (content_block_stop)
StreamEvent (content_block_start) - tool_use block
StreamEvent (content_block_delta) - tool input chunks...
StreamEvent (content_block_stop)
StreamEvent (message_delta)
StreamEvent (message_stop)
AssistantMessage - complete message with all content
... tool executes ...
... more streaming events for next turn ...
ResultMessage - final result

Без включенных частичных сообщений (include_partial_messages в Python, includePartialMessages в TypeScript) вы получаете все типы сообщений, кроме StreamEvent. Распространенные типы включают SystemMessage (инициализация сеанса), AssistantMessage (полные ответы), ResultMessage (финальный результат) и компактное граничное сообщение, указывающее на то, когда история разговора была сжата (SDKCompactBoundaryMessage в TypeScript; SystemMessage с подтипом "compact_boundary" в Python).

Потоковая передача текстовых ответов

Чтобы отобразить текст по мере его генерации, ищите события content_block_delta, где delta.type — это text_delta. Они содержат добавочные текстовые фрагменты. Пример ниже выводит каждый фрагмент по мере его поступления:

from claude_agent_sdk import query, ClaudeAgentOptions
from claude_agent_sdk.types import StreamEvent
import asyncio


async def stream_text():
options = ClaudeAgentOptions(include_partial_messages=True)

async for message in query(prompt="Explain how databases work", options=options):
if isinstance(message, StreamEvent):
event = message.event
if event.get("type") == "content_block_delta":
delta = event.get("delta", {})
if delta.get("type") == "text_delta":
# Print each text chunk as it arrives
print(delta.get("text", ""), end="", flush=True)

print()  # Final newline


asyncio.run(stream_text())

Потоковая передача вызовов инструментов

Вызовы инструментов также передаются потоком добавочно. Вы можете отследить, когда инструменты начинают работу, получить их входные данные по мере их генерации и увидеть, когда они завершаются. Пример ниже отслеживает текущий вызываемый инструмент и накапливает входные данные JSON по мере их поступления. Он использует три типа событий:

  • content_block_start: инструмент начинает работу
  • content_block_delta с input_json_delta: поступают фрагменты входных данных
  • content_block_stop: вызов инструмента завершен
from claude_agent_sdk import query, ClaudeAgentOptions
from claude_agent_sdk.types import StreamEvent
import asyncio


async def stream_tool_calls():
options = ClaudeAgentOptions(
include_partial_messages=True,
allowed_tools=["Read", "Bash"],
)

# Track the current tool and accumulate its input JSON
current_tool = None
tool_input = ""

async for message in query(prompt="Read the README.md file", options=options):
if isinstance(message, StreamEvent):
event = message.event
event_type = event.get("type")

if event_type == "content_block_start":
# New tool call is starting
content_block = event.get("content_block", {})
if content_block.get("type") == "tool_use":
current_tool = content_block.get("name")
tool_input = ""
print(f"Starting tool: {current_tool}")

elif event_type == "content_block_delta":
delta = event.get("delta", {})
if delta.get("type") == "input_json_delta":
# Accumulate JSON input as it streams in
chunk = delta.get("partial_json", "")
tool_input += chunk
print(f"  Input chunk: {chunk}")

elif event_type == "content_block_stop":
# Tool call complete - show final input
if current_tool:
print(f"Tool {current_tool} called with: {tool_input}")
current_tool = None


asyncio.run(stream_tool_calls())

Создание пользовательского интерфейса потоковой передачи

Этот пример объединяет потоковую передачу текста и инструментов в единый пользовательский интерфейс. Он отслеживает, выполняет ли агент в настоящее время инструмент (используя флаг in_tool), чтобы показать индикаторы состояния, такие как [Using Read...], пока работают инструменты. Текст передается потоком нормально, когда инструмент не используется, а завершение инструмента вызывает сообщение "done". Этот паттерн полезен для интерфейсов чата, которым нужно показывать прогресс во время многошаговых задач агента.

from claude_agent_sdk import query, ClaudeAgentOptions, ResultMessage
from claude_agent_sdk.types import StreamEvent
import asyncio
import sys


async def streaming_ui():
options = ClaudeAgentOptions(
include_partial_messages=True,
allowed_tools=["Read", "Bash", "Grep"],
)

# Track whether we're currently in a tool call
in_tool = False

async for message in query(
prompt="Find all TODO comments in the codebase", options=options
):
if isinstance(message, StreamEvent):
event = message.event
event_type = event.get("type")

if event_type == "content_block_start":
content_block = event.get("content_block", {})
if content_block.get("type") == "tool_use":
# Tool call is starting - show status indicator
tool_name = content_block.get("name")
print(f"\n[Using {tool_name}...]", end="", flush=True)
in_tool = True

elif event_type == "content_block_delta":
delta = event.get("delta", {})
# Only stream text when not executing a tool
if delta.get("type") == "text_delta" and not in_tool:
sys.stdout.write(delta.get("text", ""))
sys.stdout.flush()

elif event_type == "content_block_stop":
if in_tool:
# Tool call finished
print(" done", flush=True)
in_tool = False

elif isinstance(message, ResultMessage):
# Agent finished all work
print(f"\n\n--- Complete ---")


asyncio.run(streaming_ui())

Известные ограничения

  • Structured output: результат JSON появляется только в финальном ResultMessage.structured_output, а не как потоковые дельта-обновления. Подробнее см. в разделе structured outputs.

Следующие шаги

Теперь, когда вы можете передавать текст и вызовы инструментов потоком в реальном времени, изучите эти связанные темы: