AIエージェントのレート制限対策 — API制限を賢く回避する
約5分で読めます
レート制限とは
APIプロバイダーは過負荷を防ぐためにリクエスト数を制限している。Anthropic APIの場合、Tier別にRPM(リクエスト/分)とTPM(トークン/分)の上限がある。エージェントを安定稼働させるには、この制限を理解した上で対策を実装する必要がある。
主要APIのレート制限一覧
| プロバイダー | Tier | RPM | TPM |
|---|---|---|---|
| Anthropic | Tier 1 | 50 | 40,000 |
| Anthropic | Tier 4 | 4,000 | 400,000 |
| OpenAI | Tier 1 | 60 | 60,000 |
| OpenAI | Tier 4 | 10,000 | 2,000,000 |
エージェントが複数のサブエージェントを並列実行すると、あっという間にRPM上限に到達する。
指数バックオフの実装
レート制限に引っかかったら、待機時間を指数的に増やしてリトライする。
import time
import random
from anthropic import Anthropic, RateLimitError
client = Anthropic()
def call_with_backoff(messages, max_retries=5):
"""指数バックオフ付きAPI呼び出し"""
for attempt in range(max_retries):
try:
return client.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=1024,
messages=messages
)
except RateLimitError as e:
if attempt == max_retries - 1:
raise
# 指数バックオフ + ジッター
wait = min(2 ** attempt + random.uniform(0, 1), 60)
print(f"Rate limited. Retry in {wait:.1f}s "
f"(attempt {attempt + 1}/{max_retries})")
time.sleep(wait)
raise Exception("Max retries exceeded")
ジッター(ランダムな揺らぎ)を加えることで、複数エージェントが同時にリトライして再び制限に引っかかる「thundering herd」問題を防ぐ。
リクエストキューの実装
並列実行するサブエージェントのリクエストを、キューで制御する。
import asyncio
from collections import deque
from time import time
class RequestQueue:
"""RPM制限を守るリクエストキュー"""
def __init__(self, max_rpm=50):
self.max_rpm = max_rpm
self.timestamps = deque()
self.lock = asyncio.Lock()
async def acquire(self):
"""スロットが空くまで待機"""
async with self.lock:
now = time()
# 60秒より古いタイムスタンプを除去
while self.timestamps and \
self.timestamps[0] < now - 60:
self.timestamps.popleft()
if len(self.timestamps) >= self.max_rpm:
# 最古のリクエストから60秒後まで待機
wait = 60 - (now - self.timestamps[0])
await asyncio.sleep(max(wait, 0.1))
self.timestamps.append(time())
async def call_api(self, messages):
"""キュー制御付きAPI呼び出し"""
await self.acquire()
return await async_api_call(messages)
# 使用例
queue = RequestQueue(max_rpm=45) # 余裕を持って45に設定
async def run_agents():
tasks = [
queue.call_api(msg) for msg in agent_messages
]
return await asyncio.gather(*tasks)
max_rpmは公式上限より少し低く設定するのがコツ。バッファを持たせることで、制限到達を予防する。
モデルフォールバック戦略
高性能モデルが制限に達したら、軽量モデルに自動切替する。
MODEL_FALLBACK_CHAIN = [
{
"model": "claude-opus-4-20250514",
"use_for": "complex_reasoning",
"rpm_limit": 50
},
{
"model": "claude-sonnet-4-20250514",
"use_for": "general_tasks",
"rpm_limit": 100
},
{
"model": "claude-haiku-35-20241022",
"use_for": "simple_tasks",
"rpm_limit": 200
}
]
class ModelRouter:
def __init__(self):
self.queues = {
m["model"]: RequestQueue(m["rpm_limit"])
for m in MODEL_FALLBACK_CHAIN
}
self.blocked = set()
async def call(self, messages, complexity="general"):
"""複雑度に応じたモデル選択 + フォールバック"""
chain = self._get_chain(complexity)
for model_config in chain:
model = model_config["model"]
if model in self.blocked:
continue
try:
return await self.queues[model].call_api(
messages, model=model
)
except RateLimitError:
self.blocked.add(model)
# 60秒後にブロック解除
asyncio.create_task(
self._unblock(model, 60)
)
continue
raise Exception("All models exhausted")
複雑なタスクはOpusから試行し、制限に達したらSonnet、さらにHaikuへフォールバックする。タスクの複雑度に応じて開始モデルを変えることで、コストも最適化できる。
レスポンスヘッダの活用
APIレスポンスには残りリクエスト数の情報が含まれる。これを監視して事前に制御する。
def check_rate_headers(response):
"""レスポンスヘッダからレート状況を確認"""
remaining = int(response.headers.get(
"x-ratelimit-remaining", 0
))
reset_at = response.headers.get(
"x-ratelimit-reset"
)
if remaining < 5:
print(f"Warning: Only {remaining} requests left. "
f"Resets at {reset_at}")
return "throttle"
return "ok"
実運用でのベストプラクティス
| 対策 | 効果 | 実装難度 |
|---|---|---|
| 指数バックオフ | リトライ成功率95%以上 | 低 |
| リクエストキュー | RPM超過を完全防止 | 中 |
| モデルフォールバック | 可用性99.9%達成 | 中 |
| レスポンスヘッダ監視 | 事前警告で予防 | 低 |
| プロンプトキャッシュ | リクエスト数を50%削減 | 低 |
| バッチAPI活用 | RPM消費を大幅削減 | 中 |
関連記事
A
Agentive 編集部
AIエージェントを実際に使い倒す個人開発者。サイト制作の自動化を実践しながら、その知見を発信しています。