PydanticAI ビジュアルガイド

Lesson CH10-L01

Durable Execution

途中障害から再開できる耐久実行をAgentに与える。

読了目安
9 min
Colab目安
12 min
合計
21 min
前提
LLM-Judge

関連: 公式ドキュメント

一行サマリ

TemporalAgent(agent, models={...}) で既存 Agent をラップし、@workflow.defn のワークフロー内から呼ぶことで、プロセス・サーバが落ちても自動再開 できる耐久実行 (Durable Execution) を Agent に付与できる。Agent の name と Toolset の id安定的に固定 することが必須。

ヒーロー: 「落ちても続きから」

通常の agent.run_sync(...)プロセスが死んだら最初から。承認待ち / 長時間レンダリング / 複数 Agent のオーケストレーション ── 30 秒で終わらない処理は 「落ちる前提で書く」 のが本番運用の鉄則です。Durable Execution プラットフォーム (Temporal / DBOS / Prefect 等) と PydanticAI を組み合わせると、ワークフローの状態が外部に永続化 され、自動でリプレイ再開されます。

図を読み込み中…
図1. 通常実行 vs Durable Execution

概念: Temporal 統合の 3 部品

部品役割
TemporalAgent既存 Agent をワークフローから呼べる形にラップ
@workflow.defn クラス決定的ロジック (Agent 呼び出しを並べる場所)
Worker プロセスワークフローを実行する常駐プロセス

PydanticAI 側では モデル呼び出し / tool 実行 / MCP 通信Activity として自動的に外出しされ、各 Activity の入出力が Temporal のイベント履歴に保存されます。Worker が落ちても、別 Worker が履歴をリプレイして 未完了 Activity から再開 します。

コード: 3 つのパターン

パターン 1: 既存 Agent を TemporalAgent でラップ

from pydantic_ai import Agent
from pydantic_ai.models.google import GoogleModel
from pydantic_ai.durable_exec.temporal import TemporalAgent
 
# 普通の Agent
base_agent = Agent(
    GoogleModel('gemini-3-flash-preview'),
    name='customer_helper',  # ← 必須: Activity 名の安定化
    instructions='ユーザーの質問に丁寧な日本語で答えてください。',
)
 
# Temporal 用にラップ
temporal_agent = TemporalAgent(
    base_agent,
    models={'gemini-3-flash-preview': GoogleModel('gemini-3-flash-preview')},
)

ポイント:

  • 既存 Agent コードは ほとんど無変更。ラッパーで包むだけ
  • name='customer_helper' を設定 → Temporal Activity 名が安定 (デプロイ間で変わると履歴と不整合)
  • models={...} でモデルインスタンスを 事前登録 (Worker 側でシリアライズ可能にするため)

パターン 2: ワークフロー内から呼ぶ

from datetime import timedelta
from temporalio import workflow
 
@workflow.defn
class CustomerHelpWorkflow:
    @workflow.run
    async def run(self, question: str) -> str:
        # ワークフロー内では agent.run() がそのまま使える
        result = await temporal_agent.run(
            question,
            # 個別 Activity のタイムアウト設定
            start_to_close_timeout=timedelta(seconds=60),
        )
        return result.output

ワークフロー側のコードは 「Agent を呼ぶ普通の async 関数」 として書けます。Temporal が水面下で Activity 化しているため、開発体験を壊さずに永続化が手に入る形です。

パターン 3: タイムアウト・リトライ・Logfire 連携

from datetime import timedelta
from temporalio.common import RetryPolicy
from pydantic_ai.durable_exec.temporal import TemporalAgent
 
temporal_agent = TemporalAgent(
    base_agent,
    models={...},
    # 全 Activity 共通の設定
    activity_config={
        'start_to_close_timeout': timedelta(minutes=2),
        'retry_policy': RetryPolicy(maximum_attempts=3),
    },
    # モデル呼び出しだけ別設定
    model_activity_config={
        'start_to_close_timeout': timedelta(seconds=30),
    },
)

タイムアウトとリトライポリシーは Activity 単位 で細かく制御できます。Logfire (Ch9-L01) は Temporal と PydanticAI の両方をまたいだ trace を出してくれるので、本番運用では LogfirePlugin() を Temporal Client にも設定するのが定石。

図を読み込み中…
図2. Durable Execution の選択ガイド

観察: 何が永続化されるか

Temporal が記録するのは Activity の入出力イベント履歴:

  • ユーザー入力
  • 各モデル呼び出しのリクエスト・レスポンス
  • 各 tool 呼び出しの引数・戻り値
  • 中断時の例外スタック

これらをリプレイすることで「最後に成功した Activity の から再開」が成立します。ワークフロー本体は決定的 に書く必要があり、random.random() のような非決定値は Activity に追い出すのが鉄則。

まとめ

  • TemporalAgent(agent, models={...}) で既存 Agent を耐久実行に対応化
  • ワークフロー内では await temporal_agent.run(...) を普通に呼ぶ
  • Activity 名 = Agent name → デプロイ間で固定 が鉄則
  • タイムアウト / リトライは activity_config / model_activity_config で制御
  • Logfire (Ch9-L01) と組み合わせると 1 trace で全貌が見える

次レッスンでは UI Event Streamsagent.run_stream で出力を chunk ごとにユーザーに届けるパターンを扱います。

Colab で実際に動かす

本レッスンの内容を Google Colab 上で実行できるノートブックを用意しています。下のボタンから自分のColab環境に開けます (要 Google アカウント / GOOGLE_API_KEY)。

Open in Colab

notebooks/ch10/01-durable-execution.ipynb