PydanticAI ビジュアルガイド

Lesson CH10-L02

UI Event Streams

AgentイベントをUI向けに流し、リアルタイム表示する。

読了目安
10 min
Colab目安
14 min
合計
24 min
前提
Durable Execution

関連: 公式ドキュメント

一行サマリ

async with agent.run_stream(...) as result: ブロック内で async for chunk in result.stream_text(delta=True): するだけで、Agent 出力を chunk ごとに 取り出して UI にリアルタイム表示できる。agent.iter() を使えば node 単位 で tool 呼出 / モデル応答を細かく介入することも可能。

ヒーロー: 「ChatGPT のあのタイピング表示」を作る

agent.run_sync(...) は応答完成を待ってから返します。ChatGPT のように 1 文字ずつ出るあの体験 は、内部で delta streaming が動いています。PydanticAI でも agent.run_stream で同じ体験を提供でき、長文応答の 「待ち時間ゼロ感」 を作れます。

図を読み込み中…
図1. run_sync vs run_stream のレイテンシ体感

概念: 3 つのストリーミング API

API何が取れる用途
result.stream_text(delta=True)新規分のテキスト chunkチャット UI のタイピング表示
result.stream_output()部分検証された 構造化出力フォーム自動入力 / 段階的表示
agent.iter()ノード単位 のイベント (tool 呼出 / モデル応答)tool 進捗バー / 内部状態の可視化

最も使うのは stream_text(delta=True)。チャット UI の 90% はこれで足ります。stream_output は構造化出力 (Ch4) を 完成前から段階表示 したいときに。

コード: 3 つのパターン

パターン 1: テキストを chunk ごとに print

import asyncio
from pydantic_ai import Agent
from pydantic_ai.models.google import GoogleModel
 
agent = Agent(
    GoogleModel('gemini-3-flash-preview'),
    instructions='与えられたお題で短い物語を日本語で書いてください。',
)
 
async def main():
    async with agent.run_stream('森の中を歩く猫の話') as result:
        async for chunk in result.stream_text(delta=True):
            print(chunk, end='', flush=True)
        print()  # 末尾改行
 
asyncio.run(main())

ポイント:

  • async with agent.run_stream(...) as result:接続のライフサイクル を管理
  • delta=True新規分のみ 流れる (False にすると累積文字列が毎回流れる)
  • flush=True で stdout バッファを毎 chunk フラッシュ → ターミナルでもタイピング体験

パターン 2: 構造化出力を段階的に表示

from pydantic import BaseModel, Field
from pydantic_ai import Agent
from pydantic_ai.models.google import GoogleModel
 
class Profile(BaseModel):
    name: str = Field(description='氏名')
    role: str = Field(description='役職')
    bio_ja: str = Field(description='100 字以内のバイオ')
 
agent = Agent(
    GoogleModel('gemini-3-flash-preview'),
    output_type=Profile,
    instructions='与えられた人物名で架空のプロフィールを作ってください。',
)
 
async def main():
    async with agent.run_stream('山田太郎') as result:
        async for partial in result.stream_output():
            print(f'name={partial.name!r}, role={partial.role!r}, bio={partial.bio_ja!r}')
 
asyncio.run(main())
# name='', role='', bio=''
# name='山田太郎', role='', bio=''
# name='山田太郎', role='エンジニア', bio=''
# name='山田太郎', role='エンジニア', bio='Python と Web 開発が...'

部分検証された Pydantic モデルが field が埋まるたび 流れてくるので、フォームを段階表示したり、進捗インジケータを出したりできます。

パターン 3: agent.iter() で tool 呼出も観測

import asyncio
from pydantic_ai import Agent, RunContext
from pydantic_ai.models.google import GoogleModel
 
agent = Agent(GoogleModel('gemini-3-flash-preview'))
 
@agent.tool
async def search(ctx: RunContext, query: str) -> str:
    """検索ツール (擬似)。"""
    return f'{query} の検索結果: ...'
 
async def main():
    async with agent.iter('東京の天気を検索して') as run:
        async for node in run:
            print(f'  → node: {type(node).__name__}')
            if Agent.is_model_request_node(node):
                # モデルの応答を chunk で観測
                async with node.stream(run.ctx) as stream:
                    async for event in stream:
                        # FunctionToolCallEvent / PartDeltaEvent / FinalResultEvent etc.
                        print(f'    event: {type(event).__name__}')
 
asyncio.run(main())

agent.iter()最も低レベル のループ。tool が呼ばれた瞬間に UI で「○○を実行中...」と出したり、モデル応答 chunk を細かく加工したりに使います。

図を読み込み中…
図2. ストリーミング API の選び方

観察: キャンセルとエラー時の状態

ユーザーが UI で「停止」を押したら await result.cancel()。応答は state='interrupted' になり、部分的な tool 呼出引数が含まれることがある ので、後段処理は state チェックを必ず入れます。

async with agent.run_stream(prompt) as result:
    async for chunk in result.stream_text(delta=True):
        if user_pressed_cancel:
            await result.cancel()
            break
 
if result.state == 'interrupted':
    # 部分結果は使わない、または明示的にクリーンアップ
    ...

Web で配信するなら Server-Sent Events (SSE)WebSocket で chunk を JSON 化して流すのが一般的。FastAPI なら StreamingResponse と組み合わせる。

まとめ

  • async with agent.run_stream(...) as result: + result.stream_text(delta=True) がチャット UI の基本
  • 構造化出力の段階表示は result.stream_output()
  • tool 呼出や内部 event を細かく見るなら agent.iter() + node.stream(run.ctx)
  • キャンセル時は await result.cancel() + state == 'interrupted' チェック
  • Web では SSE / WebSocket と組み合わせて chunk を配信

次レッスンでは A2Aagent.to_a2a() で Agent を標準プロトコルのサーバーとして公開し、他社・他言語の Agent と相互運用するパターンを扱います。

Colab で実際に動かす

本レッスンの内容を Google Colab 上で実行できるノートブックを用意しています。下のボタンから自分のColab環境に開けます (要 Google アカウント / GOOGLE_API_KEY)。

Open in Colab

notebooks/ch10/02-ui-event-streams.ipynb