Lesson CH10-L02
UI Event Streams
AgentイベントをUI向けに流し、リアルタイム表示する。
- 読了目安
- 10 min
- Colab目安
- 14 min
- 合計
- 24 min
- 前提
- Durable Execution
関連: 公式ドキュメント
一行サマリ
async with agent.run_stream(...) as result: ブロック内で async for chunk in result.stream_text(delta=True): するだけで、Agent 出力を chunk ごとに 取り出して UI にリアルタイム表示できる。agent.iter() を使えば node 単位 で tool 呼出 / モデル応答を細かく介入することも可能。
ヒーロー: 「ChatGPT のあのタイピング表示」を作る
agent.run_sync(...) は応答完成を待ってから返します。ChatGPT のように 1 文字ずつ出るあの体験 は、内部で delta streaming が動いています。PydanticAI でも agent.run_stream で同じ体験を提供でき、長文応答の 「待ち時間ゼロ感」 を作れます。
概念: 3 つのストリーミング API
| API | 何が取れる | 用途 |
|---|---|---|
result.stream_text(delta=True) | 新規分のテキスト chunk | チャット UI のタイピング表示 |
result.stream_output() | 部分検証された 構造化出力 | フォーム自動入力 / 段階的表示 |
agent.iter() | ノード単位 のイベント (tool 呼出 / モデル応答) | tool 進捗バー / 内部状態の可視化 |
最も使うのは stream_text(delta=True)。チャット UI の 90% はこれで足ります。stream_output は構造化出力 (Ch4) を 完成前から段階表示 したいときに。
コード: 3 つのパターン
パターン 1: テキストを chunk ごとに print
import asyncio
from pydantic_ai import Agent
from pydantic_ai.models.google import GoogleModel
agent = Agent(
GoogleModel('gemini-3-flash-preview'),
instructions='与えられたお題で短い物語を日本語で書いてください。',
)
async def main():
async with agent.run_stream('森の中を歩く猫の話') as result:
async for chunk in result.stream_text(delta=True):
print(chunk, end='', flush=True)
print() # 末尾改行
asyncio.run(main())ポイント:
async with agent.run_stream(...) as result:で 接続のライフサイクル を管理delta=Trueで 新規分のみ 流れる (Falseにすると累積文字列が毎回流れる)flush=Trueで stdout バッファを毎 chunk フラッシュ → ターミナルでもタイピング体験
パターン 2: 構造化出力を段階的に表示
from pydantic import BaseModel, Field
from pydantic_ai import Agent
from pydantic_ai.models.google import GoogleModel
class Profile(BaseModel):
name: str = Field(description='氏名')
role: str = Field(description='役職')
bio_ja: str = Field(description='100 字以内のバイオ')
agent = Agent(
GoogleModel('gemini-3-flash-preview'),
output_type=Profile,
instructions='与えられた人物名で架空のプロフィールを作ってください。',
)
async def main():
async with agent.run_stream('山田太郎') as result:
async for partial in result.stream_output():
print(f'name={partial.name!r}, role={partial.role!r}, bio={partial.bio_ja!r}')
asyncio.run(main())
# name='', role='', bio=''
# name='山田太郎', role='', bio=''
# name='山田太郎', role='エンジニア', bio=''
# name='山田太郎', role='エンジニア', bio='Python と Web 開発が...'部分検証された Pydantic モデルが field が埋まるたび 流れてくるので、フォームを段階表示したり、進捗インジケータを出したりできます。
パターン 3: agent.iter() で tool 呼出も観測
import asyncio
from pydantic_ai import Agent, RunContext
from pydantic_ai.models.google import GoogleModel
agent = Agent(GoogleModel('gemini-3-flash-preview'))
@agent.tool
async def search(ctx: RunContext, query: str) -> str:
"""検索ツール (擬似)。"""
return f'{query} の検索結果: ...'
async def main():
async with agent.iter('東京の天気を検索して') as run:
async for node in run:
print(f' → node: {type(node).__name__}')
if Agent.is_model_request_node(node):
# モデルの応答を chunk で観測
async with node.stream(run.ctx) as stream:
async for event in stream:
# FunctionToolCallEvent / PartDeltaEvent / FinalResultEvent etc.
print(f' event: {type(event).__name__}')
asyncio.run(main())agent.iter() は 最も低レベル のループ。tool が呼ばれた瞬間に UI で「○○を実行中...」と出したり、モデル応答 chunk を細かく加工したりに使います。
観察: キャンセルとエラー時の状態
ユーザーが UI で「停止」を押したら await result.cancel()。応答は state='interrupted' になり、部分的な tool 呼出引数が含まれることがある ので、後段処理は state チェックを必ず入れます。
async with agent.run_stream(prompt) as result:
async for chunk in result.stream_text(delta=True):
if user_pressed_cancel:
await result.cancel()
break
if result.state == 'interrupted':
# 部分結果は使わない、または明示的にクリーンアップ
...Web で配信するなら Server-Sent Events (SSE) か WebSocket で chunk を JSON 化して流すのが一般的。FastAPI なら StreamingResponse と組み合わせる。
まとめ
async with agent.run_stream(...) as result:+result.stream_text(delta=True)がチャット UI の基本- 構造化出力の段階表示は
result.stream_output() - tool 呼出や内部 event を細かく見るなら
agent.iter()+node.stream(run.ctx) - キャンセル時は
await result.cancel()+state == 'interrupted'チェック - Web では SSE / WebSocket と組み合わせて chunk を配信
次レッスンでは A2A — agent.to_a2a() で Agent を標準プロトコルのサーバーとして公開し、他社・他言語の Agent と相互運用するパターンを扱います。
Colab で実際に動かす
本レッスンの内容を Google Colab 上で実行できるノートブックを用意しています。下のボタンから自分のColab環境に開けます (要 Google アカウント / GOOGLE_API_KEY)。
notebooks/ch10/02-ui-event-streams.ipynb