PydanticAI ビジュアルガイド

Lesson CH08-L03

Pydantic Graph

状態と遷移を持つグラフでマルチステップ処理を組む。

読了目安
12 min
Colab目安
18 min
合計
30 min
前提
Hand-off

関連: 公式ドキュメント

一行サマリ

pydantic_graphBaseNode でノードを定義し、Graph(nodes=[...]) に集めて await graph.run(start_node, state=...) で実行する。各ノードの async def run(ctx) -> NextNode | End[T] の戻り型がそのままグラフの辺 になる、型で書くワークフローエンジン

ヒーロー: 状態 + 遷移を持つフロー

Hand-off の連鎖が 4 段以上になったり、条件分岐 / ループ / 永続化 / 再開 が必要になると、Python の if / await 記述ではすぐに保守不能になります。Pydantic Graph「ノード = 工程、辺 = 戻り型、状態 = dataclass」 という 3 概念だけで、この複雑さを型で扱える形に整理します。

図を読み込み中…
図1. Pydantic Graph の 3 要素

概念: 4 つの基本パーツ

パーツ役割
BaseNode[State, Deps, ReturnT]1 工程を表すクラス。async def run(ctx) を実装
GraphRunContext[State]ctx.state (共有状態) と ctx.deps を提供
End[T]グラフ終了マーカー。run の戻り値として返す
Graph(nodes=[A, B, C])ノード集合 + 実行エンジン

ノードは @dataclass で書くのが定石。フィールドはそのノードへの 入力パラメータ になります。

コード: 3 つのパターン

パターン 1: 最小グラフ (固定の 2 ノード直列)

from dataclasses import dataclass, field
from pydantic_graph import BaseNode, End, Graph, GraphRunContext
 
@dataclass
class CounterState:
    count: int = 0
 
@dataclass
class Increment(BaseNode[CounterState]):
    """状態のカウンタを 1 増やして次のノードへ。"""
 
    async def run(self, ctx: GraphRunContext[CounterState]) -> 'CheckDone':
        ctx.state.count += 1
        return CheckDone()
 
@dataclass
class CheckDone(BaseNode[CounterState, None, int]):
    """3 まで来たら終了、未満ならまた Increment へ。"""
 
    async def run(self, ctx: GraphRunContext[CounterState]) -> Increment | End[int]:
        if ctx.state.count >= 3:
            return End(ctx.state.count)
        return Increment()
 
graph = Graph(nodes=[Increment, CheckDone])
 
async def main():
    result = await graph.run(Increment(), state=CounterState())
    print(f'最終カウント: {result.output}')
 
# import asyncio; asyncio.run(main())

ポイント:

  • BaseNode[State]第 1 ジェネリック引数 = 共有 state の型
  • BaseNode[State, Deps, ReturnT]End を返すノードは ReturnT を明示
  • 戻り値型注釈 -> Increment | End[int]そのまま辺の宣言

パターン 2: Agent をノードに埋め込む (LLM 併用)

from dataclasses import dataclass, field
from pydantic_ai import Agent
from pydantic_ai.models.google import GoogleModel
from pydantic_graph import BaseNode, End, Graph, GraphRunContext
 
model = GoogleModel('gemini-3-flash-preview')
writer = Agent(model, instructions='与えられたお題で短い日本語の見出しを 1 つ作ってください。')
critic = Agent(model, instructions='与えられた見出しを「OK」か「NG」で 1 単語だけ返してください。')
 
@dataclass
class WriteState:
    topic: str
    headline: str = ''
    review: str = ''
    iteration: int = 0
 
@dataclass
class Write(BaseNode[WriteState]):
    async def run(self, ctx: GraphRunContext[WriteState]) -> 'Review':
        r = await writer.run(ctx.state.topic)
        ctx.state.headline = r.output
        ctx.state.iteration += 1
        return Review()
 
@dataclass
class Review(BaseNode[WriteState, None, str]):
    async def run(self, ctx: GraphRunContext[WriteState]) -> Write | End[str]:
        r = await critic.run(ctx.state.headline)
        ctx.state.review = r.output.strip()
        if ctx.state.review.startswith('OK') or ctx.state.iteration >= 3:
            return End(ctx.state.headline)
        return Write()  # NG なら再生成
 
graph = Graph(nodes=[Write, Review])
 
async def main():
    state = WriteState(topic='生成 AI のセキュリティ')
    result = await graph.run(Write(), state=state)
    print(f'採用見出し: {result.output} (試行 {state.iteration} 回)')

LLM ベースの 「書く → レビュー → 必要なら書き直す」 ループが 2 ノード + 状態 + 戻り型だけで表現できます。

パターン 3: Graph.iter で 1 ノードずつ進める (人間入力待ち)

ユーザー入力が要るゲームや承認フローでは graph.iter を使い、ノード単位で外部入力を差し込めます。

from dataclasses import dataclass
from pydantic_graph import BaseNode, End, Graph, GraphRunContext
 
@dataclass
class GuessState:
    target: int = 7
    last_guess: int | None = None
    attempts: int = 0
 
@dataclass
class AskGuess(BaseNode[GuessState]):
    async def run(self, ctx: GraphRunContext[GuessState]) -> 'CheckGuess':
        ctx.state.attempts += 1
        # 実際は input() / Web フォーム入力等
        return CheckGuess()
 
@dataclass
class CheckGuess(BaseNode[GuessState, None, str]):
    async def run(self, ctx: GraphRunContext[GuessState]) -> AskGuess | End[str]:
        if ctx.state.last_guess == ctx.state.target:
            return End(f'{ctx.state.attempts} 回で正解!')
        return AskGuess()
 
graph = Graph(nodes=[AskGuess, CheckGuess])
 
async def play():
    state = GuessState()
    async with graph.iter(AskGuess(), state=state) as run:
        async for node in run:
            if isinstance(node, AskGuess):
                state.last_guess = int(input('数を入力: '))
            if isinstance(node, End):
                print(node.data)
                break

iterノードに到達するたびに yield されるので、UI / Web リクエスト境界での停止 + 再開が自然に書けます。

図を読み込み中…
図2. Graph の選び方の判断

観察: 状態の永続化と再開

pydantic_graph.persistence.file.FileStatePersistence を使うと、ノード進行を JSON で保存して、Web リクエスト境界やプロセス再起動を跨いで再開できます。長時間ジョブや人間承認の挟まるワークフローと相性が良い機能。

from pathlib import Path
from pydantic_graph.persistence.file import FileStatePersistence
 
persistence = FileStatePersistence(Path('graph_state.json'))
await graph.initialize(start, state=state, persistence=persistence)
 
async with graph.iter_from_persistence(persistence) as run:
    node = await run.next()
    if isinstance(node, End):
        print('完了:', node.data)

まとめ

  • BaseNode でノードを書き、戻り型がそのままグラフの辺になる
  • 共有 state は @dataclass で定義、ctx.state でアクセス
  • Graph(nodes=[...]) + await graph.run(start, state=...) で実行
  • ループ / 条件分岐 / LLM ノード混在のワークフローに強い
  • Graph.iter で人間入力やリクエスト境界の停止 + 再開
  • FileStatePersistence で長時間ワークフローを永続化

🎉 Ch8 完結 (Multi-Agent) — Advanced グループ完結

  • ✅ L01 Delegation (親 → 子) / L02 Hand-off (制御移譲) / L03 Pydantic Graph (グラフ)
  • Advanced グループ (Ch6-Ch8) 全 9 レッスン完走

次の Production グループ (Ch9-Ch10) からは、本番運用に必要な 観測 (Logfire) / 評価 (Pydantic Evals) / 永続実行 (Durable) / UI 連携 / A2A に進みます。

Colab で実際に動かす

本レッスンの内容を Google Colab 上で実行できるノートブックを用意しています。下のボタンから自分のColab環境に開けます (要 Google アカウント / GOOGLE_API_KEY)。

Open in Colab

notebooks/ch08/03-pydantic-graph.ipynb