Lesson CH10-L01
Durable Execution
途中障害から再開できる耐久実行をAgentに与える。
- 読了目安
- 9 min
- Colab目安
- 12 min
- 合計
- 21 min
- 前提
- LLM-Judge
関連: 公式ドキュメント
一行サマリ
TemporalAgent(agent, models={...}) で既存 Agent をラップし、@workflow.defn のワークフロー内から呼ぶことで、プロセス・サーバが落ちても自動再開 できる耐久実行 (Durable Execution) を Agent に付与できる。Agent の name と Toolset の id を 安定的に固定 することが必須。
ヒーロー: 「落ちても続きから」
通常の agent.run_sync(...) は プロセスが死んだら最初から。承認待ち / 長時間レンダリング / 複数 Agent のオーケストレーション ── 30 秒で終わらない処理は 「落ちる前提で書く」 のが本番運用の鉄則です。Durable Execution プラットフォーム (Temporal / DBOS / Prefect 等) と PydanticAI を組み合わせると、ワークフローの状態が外部に永続化 され、自動でリプレイ再開されます。
概念: Temporal 統合の 3 部品
| 部品 | 役割 |
|---|---|
TemporalAgent | 既存 Agent をワークフローから呼べる形にラップ |
@workflow.defn クラス | 決定的ロジック (Agent 呼び出しを並べる場所) |
| Worker プロセス | ワークフローを実行する常駐プロセス |
PydanticAI 側では モデル呼び出し / tool 実行 / MCP 通信 が Activity として自動的に外出しされ、各 Activity の入出力が Temporal のイベント履歴に保存されます。Worker が落ちても、別 Worker が履歴をリプレイして 未完了 Activity から再開 します。
コード: 3 つのパターン
パターン 1: 既存 Agent を TemporalAgent でラップ
from pydantic_ai import Agent
from pydantic_ai.models.google import GoogleModel
from pydantic_ai.durable_exec.temporal import TemporalAgent
# 普通の Agent
base_agent = Agent(
GoogleModel('gemini-3-flash-preview'),
name='customer_helper', # ← 必須: Activity 名の安定化
instructions='ユーザーの質問に丁寧な日本語で答えてください。',
)
# Temporal 用にラップ
temporal_agent = TemporalAgent(
base_agent,
models={'gemini-3-flash-preview': GoogleModel('gemini-3-flash-preview')},
)ポイント:
- 既存 Agent コードは ほとんど無変更。ラッパーで包むだけ
name='customer_helper'を設定 → Temporal Activity 名が安定 (デプロイ間で変わると履歴と不整合)models={...}でモデルインスタンスを 事前登録 (Worker 側でシリアライズ可能にするため)
パターン 2: ワークフロー内から呼ぶ
from datetime import timedelta
from temporalio import workflow
@workflow.defn
class CustomerHelpWorkflow:
@workflow.run
async def run(self, question: str) -> str:
# ワークフロー内では agent.run() がそのまま使える
result = await temporal_agent.run(
question,
# 個別 Activity のタイムアウト設定
start_to_close_timeout=timedelta(seconds=60),
)
return result.outputワークフロー側のコードは 「Agent を呼ぶ普通の async 関数」 として書けます。Temporal が水面下で Activity 化しているため、開発体験を壊さずに永続化が手に入る形です。
パターン 3: タイムアウト・リトライ・Logfire 連携
from datetime import timedelta
from temporalio.common import RetryPolicy
from pydantic_ai.durable_exec.temporal import TemporalAgent
temporal_agent = TemporalAgent(
base_agent,
models={...},
# 全 Activity 共通の設定
activity_config={
'start_to_close_timeout': timedelta(minutes=2),
'retry_policy': RetryPolicy(maximum_attempts=3),
},
# モデル呼び出しだけ別設定
model_activity_config={
'start_to_close_timeout': timedelta(seconds=30),
},
)タイムアウトとリトライポリシーは Activity 単位 で細かく制御できます。Logfire (Ch9-L01) は Temporal と PydanticAI の両方をまたいだ trace を出してくれるので、本番運用では LogfirePlugin() を Temporal Client にも設定するのが定石。
観察: 何が永続化されるか
Temporal が記録するのは Activity の入出力イベント履歴:
- ユーザー入力
- 各モデル呼び出しのリクエスト・レスポンス
- 各 tool 呼び出しの引数・戻り値
- 中断時の例外スタック
これらをリプレイすることで「最後に成功した Activity の 次 から再開」が成立します。ワークフロー本体は決定的 に書く必要があり、random.random() のような非決定値は Activity に追い出すのが鉄則。
まとめ
TemporalAgent(agent, models={...})で既存 Agent を耐久実行に対応化- ワークフロー内では
await temporal_agent.run(...)を普通に呼ぶ - Activity 名 = Agent name → デプロイ間で固定 が鉄則
- タイムアウト / リトライは
activity_config/model_activity_configで制御 - Logfire (Ch9-L01) と組み合わせると 1 trace で全貌が見える
次レッスンでは UI Event Streams — agent.run_stream で出力を chunk ごとにユーザーに届けるパターンを扱います。
Colab で実際に動かす
本レッスンの内容を Google Colab 上で実行できるノートブックを用意しています。下のボタンから自分のColab環境に開けます (要 Google アカウント / GOOGLE_API_KEY)。
notebooks/ch10/01-durable-execution.ipynb