Lesson CH08-L03
Pydantic Graph
状態と遷移を持つグラフでマルチステップ処理を組む。
- 読了目安
- 12 min
- Colab目安
- 18 min
- 合計
- 30 min
- 前提
- Hand-off
関連: 公式ドキュメント
一行サマリ
pydantic_graph の BaseNode でノードを定義し、Graph(nodes=[...]) に集めて await graph.run(start_node, state=...) で実行する。各ノードの async def run(ctx) -> NextNode | End[T] の戻り型がそのままグラフの辺 になる、型で書くワークフローエンジン。
ヒーロー: 状態 + 遷移を持つフロー
Hand-off の連鎖が 4 段以上になったり、条件分岐 / ループ / 永続化 / 再開 が必要になると、Python の if / await 記述ではすぐに保守不能になります。Pydantic Graph は 「ノード = 工程、辺 = 戻り型、状態 = dataclass」 という 3 概念だけで、この複雑さを型で扱える形に整理します。
概念: 4 つの基本パーツ
| パーツ | 役割 |
|---|---|
BaseNode[State, Deps, ReturnT] | 1 工程を表すクラス。async def run(ctx) を実装 |
GraphRunContext[State] | ctx.state (共有状態) と ctx.deps を提供 |
End[T] | グラフ終了マーカー。run の戻り値として返す |
Graph(nodes=[A, B, C]) | ノード集合 + 実行エンジン |
ノードは @dataclass で書くのが定石。フィールドはそのノードへの 入力パラメータ になります。
コード: 3 つのパターン
パターン 1: 最小グラフ (固定の 2 ノード直列)
from dataclasses import dataclass, field
from pydantic_graph import BaseNode, End, Graph, GraphRunContext
@dataclass
class CounterState:
count: int = 0
@dataclass
class Increment(BaseNode[CounterState]):
"""状態のカウンタを 1 増やして次のノードへ。"""
async def run(self, ctx: GraphRunContext[CounterState]) -> 'CheckDone':
ctx.state.count += 1
return CheckDone()
@dataclass
class CheckDone(BaseNode[CounterState, None, int]):
"""3 まで来たら終了、未満ならまた Increment へ。"""
async def run(self, ctx: GraphRunContext[CounterState]) -> Increment | End[int]:
if ctx.state.count >= 3:
return End(ctx.state.count)
return Increment()
graph = Graph(nodes=[Increment, CheckDone])
async def main():
result = await graph.run(Increment(), state=CounterState())
print(f'最終カウント: {result.output}')
# import asyncio; asyncio.run(main())ポイント:
BaseNode[State]の 第 1 ジェネリック引数 = 共有 state の型BaseNode[State, Deps, ReturnT]で End を返すノードは ReturnT を明示- 戻り値型注釈
-> Increment | End[int]が そのまま辺の宣言
パターン 2: Agent をノードに埋め込む (LLM 併用)
from dataclasses import dataclass, field
from pydantic_ai import Agent
from pydantic_ai.models.google import GoogleModel
from pydantic_graph import BaseNode, End, Graph, GraphRunContext
model = GoogleModel('gemini-3-flash-preview')
writer = Agent(model, instructions='与えられたお題で短い日本語の見出しを 1 つ作ってください。')
critic = Agent(model, instructions='与えられた見出しを「OK」か「NG」で 1 単語だけ返してください。')
@dataclass
class WriteState:
topic: str
headline: str = ''
review: str = ''
iteration: int = 0
@dataclass
class Write(BaseNode[WriteState]):
async def run(self, ctx: GraphRunContext[WriteState]) -> 'Review':
r = await writer.run(ctx.state.topic)
ctx.state.headline = r.output
ctx.state.iteration += 1
return Review()
@dataclass
class Review(BaseNode[WriteState, None, str]):
async def run(self, ctx: GraphRunContext[WriteState]) -> Write | End[str]:
r = await critic.run(ctx.state.headline)
ctx.state.review = r.output.strip()
if ctx.state.review.startswith('OK') or ctx.state.iteration >= 3:
return End(ctx.state.headline)
return Write() # NG なら再生成
graph = Graph(nodes=[Write, Review])
async def main():
state = WriteState(topic='生成 AI のセキュリティ')
result = await graph.run(Write(), state=state)
print(f'採用見出し: {result.output} (試行 {state.iteration} 回)')LLM ベースの 「書く → レビュー → 必要なら書き直す」 ループが 2 ノード + 状態 + 戻り型だけで表現できます。
パターン 3: Graph.iter で 1 ノードずつ進める (人間入力待ち)
ユーザー入力が要るゲームや承認フローでは graph.iter を使い、ノード単位で外部入力を差し込めます。
from dataclasses import dataclass
from pydantic_graph import BaseNode, End, Graph, GraphRunContext
@dataclass
class GuessState:
target: int = 7
last_guess: int | None = None
attempts: int = 0
@dataclass
class AskGuess(BaseNode[GuessState]):
async def run(self, ctx: GraphRunContext[GuessState]) -> 'CheckGuess':
ctx.state.attempts += 1
# 実際は input() / Web フォーム入力等
return CheckGuess()
@dataclass
class CheckGuess(BaseNode[GuessState, None, str]):
async def run(self, ctx: GraphRunContext[GuessState]) -> AskGuess | End[str]:
if ctx.state.last_guess == ctx.state.target:
return End(f'{ctx.state.attempts} 回で正解!')
return AskGuess()
graph = Graph(nodes=[AskGuess, CheckGuess])
async def play():
state = GuessState()
async with graph.iter(AskGuess(), state=state) as run:
async for node in run:
if isinstance(node, AskGuess):
state.last_guess = int(input('数を入力: '))
if isinstance(node, End):
print(node.data)
breakiter は ノードに到達するたびに yield されるので、UI / Web リクエスト境界での停止 + 再開が自然に書けます。
観察: 状態の永続化と再開
pydantic_graph.persistence.file.FileStatePersistence を使うと、ノード進行を JSON で保存して、Web リクエスト境界やプロセス再起動を跨いで再開できます。長時間ジョブや人間承認の挟まるワークフローと相性が良い機能。
from pathlib import Path
from pydantic_graph.persistence.file import FileStatePersistence
persistence = FileStatePersistence(Path('graph_state.json'))
await graph.initialize(start, state=state, persistence=persistence)
async with graph.iter_from_persistence(persistence) as run:
node = await run.next()
if isinstance(node, End):
print('完了:', node.data)まとめ
BaseNodeでノードを書き、戻り型がそのままグラフの辺になる- 共有 state は
@dataclassで定義、ctx.stateでアクセス Graph(nodes=[...])+await graph.run(start, state=...)で実行- ループ / 条件分岐 / LLM ノード混在のワークフローに強い
Graph.iterで人間入力やリクエスト境界の停止 + 再開FileStatePersistenceで長時間ワークフローを永続化
🎉 Ch8 完結 (Multi-Agent) — Advanced グループ完結
- ✅ L01 Delegation (親 → 子) / L02 Hand-off (制御移譲) / L03 Pydantic Graph (グラフ)
- ✅ Advanced グループ (Ch6-Ch8) 全 9 レッスン完走
次の Production グループ (Ch9-Ch10) からは、本番運用に必要な 観測 (Logfire) / 評価 (Pydantic Evals) / 永続実行 (Durable) / UI 連携 / A2A に進みます。
Colab で実際に動かす
本レッスンの内容を Google Colab 上で実行できるノートブックを用意しています。下のボタンから自分のColab環境に開けます (要 Google アカウント / GOOGLE_API_KEY)。
notebooks/ch08/03-pydantic-graph.ipynb