AIでデータパイプライン構築 — ETL処理をClaude Codeで自動化
約6分で読めます
AIでデータパイプライン構築
手動でCSVを加工してDBに投入する作業は、エンジニアの時間を最も無駄にする反復作業のひとつだ。AIを使えば、ETL(Extract・Transform・Load)パイプラインを数分で構築できる。
ETLパイプラインの全体像
データソース(CSV/API/Web)
↓
Extract: データ取得・読込
↓
Transform: クレンジング・型変換・正規化
↓
Load: DB投入・ファイル出力
↓
Verify: 件数チェック・整合性検証
Claude Codeに「このCSVをPostgreSQLに取り込むETLスクリプトを書いて」と指示するだけで、スキーマ推論からバリデーションまで含めた実用的なコードが生成される。
Extract: データソースからの取得
CSVファイルの読込
import pandas as pd
from pathlib import Path
def extract_csv(file_path: str, encoding: str = "utf-8") -> pd.DataFrame:
"""CSVファイルの読込。文字コード自動検出付き"""
try:
df = pd.read_csv(file_path, encoding=encoding)
except UnicodeDecodeError:
# 日本語CSVでよくあるShift_JISフォールバック
df = pd.read_csv(file_path, encoding="shift_jis")
print(f"読込完了: {len(df)}行 x {len(df.columns)}列")
print(f"カラム: {list(df.columns)}")
return df
Web APIからの取得
import requests
def extract_from_api(url: str, headers: dict = None) -> list[dict]:
"""REST APIからデータを取得。ページネーション対応"""
all_data = []
page = 1
while True:
response = requests.get(
url, headers=headers, params={"page": page, "per_page": 100}
)
response.raise_for_status()
data = response.json()
if not data:
break
all_data.extend(data)
page += 1
print(f"API取得完了: {len(all_data)}件")
return all_data
Webスクレイピングによる取得
from playwright.async_api import async_playwright
async def extract_from_web(url: str, selector: str) -> list[dict]:
"""Playwrightでレンダリング済みページからデータ取得"""
async with async_playwright() as p:
browser = await p.chromium.launch()
page = await browser.new_page()
await page.goto(url, wait_until="networkidle")
elements = await page.query_selector_all(selector)
data = []
for el in elements:
text = await el.text_content()
data.append({"text": text.strip()})
await browser.close()
return data
Transform: データの変換とクレンジング
基本的なデータクレンジング
def transform_data(df: pd.DataFrame) -> pd.DataFrame:
"""データクレンジングと型変換"""
# 1. カラム名を正規化(全角→半角、スペース→アンダースコア)
df.columns = [
col.strip().replace(" ", "_").replace("\u3000", "_").lower()
for col in df.columns
]
# 2. 空白行の除去
df = df.dropna(how="all")
# 3. 日付カラムの型変換
date_cols = [c for c in df.columns if "date" in c or "日付" in c]
for col in date_cols:
df[col] = pd.to_datetime(df[col], errors="coerce")
# 4. 数値カラムのクレンジング(カンマ除去)
for col in df.select_dtypes(include="object").columns:
try:
df[col] = df[col].str.replace(",", "").astype(float)
except (ValueError, AttributeError):
pass
# 5. 重複除去
before = len(df)
df = df.drop_duplicates()
print(f"重複除去: {before - len(df)}行削除")
return df
AIによるデータ分類・補完
Claude APIを使って、テキストデータの自動分類やカテゴリ補完を行う。
import anthropic
client = anthropic.Anthropic()
def ai_classify(texts: list[str], categories: list[str]) -> list[str]:
"""AIでテキストをカテゴリ分類"""
results = []
for text in texts:
response = client.messages.create(
model="claude-haiku-35-20241022",
max_tokens=50,
messages=[{
"role": "user",
"content": f"以下のテキストを{categories}のいずれかに分類してください。"
f"カテゴリ名のみ回答:\n{text}"
}]
)
results.append(response.content[0].text.strip())
return results
Load: データベースへの投入
PostgreSQLへのロード
from sqlalchemy import create_engine, text
def load_to_postgres(df: pd.DataFrame, table_name: str, db_url: str):
"""DataFrameをPostgreSQLに投入"""
engine = create_engine(db_url)
# テーブルが存在しない場合は自動作成
df.to_sql(table_name, engine, if_exists="append", index=False)
# 件数検証
with engine.connect() as conn:
count = conn.execute(
text(f"SELECT COUNT(*) FROM {table_name}")
).scalar()
print(f"DB投入完了: {table_name} = {count}件")
Supabase(PostgreSQL)へのロード
// Node.js: Supabaseへのバッチ投入
const { createClient } = require('@supabase/supabase-js');
const supabase = createClient(
process.env.SUPABASE_URL,
process.env.SUPABASE_KEY
);
async function loadToSupabase(data, tableName) {
const batchSize = 1000;
let inserted = 0;
for (let i = 0; i < data.length; i += batchSize) {
const batch = data.slice(i, i + batchSize);
const { error } = await supabase
.from(tableName)
.insert(batch);
if (error) throw new Error(`Batch ${i}: ${error.message}`);
inserted += batch.length;
}
console.log(`${tableName}: ${inserted}件投入完了`);
}
Verify: データ整合性の検証
ETL完了後に、データの正確性を自動検証する。
def verify_etl(source_df: pd.DataFrame, table_name: str, engine):
"""ETL結果の整合性チェック"""
checks = []
# 件数一致チェック
with engine.connect() as conn:
db_count = conn.execute(
text(f"SELECT COUNT(*) FROM {table_name}")
).scalar()
checks.append({
"check": "行数一致",
"source": len(source_df),
"db": db_count,
"pass": len(source_df) == db_count
})
for c in checks:
status = "PASS" if c["pass"] else "FAIL"
print(f" [{status}] {c['check']}")
return all(c["pass"] for c in checks)
完全なETLパイプラインの組み立て
上記のコンポーネントを組み合わせた完全なパイプライン。
def run_etl_pipeline(csv_path: str, db_url: str, table_name: str):
"""ETLパイプライン一気通貫実行"""
print("=== ETL Pipeline Start ===")
# Extract
df = extract_csv(csv_path)
# Transform
df = transform_data(df)
# Load
load_to_postgres(df, table_name, db_url)
# Verify
engine = create_engine(db_url)
success = verify_etl(df, table_name, engine)
status = "SUCCESS" if success else "FAILED"
print(f"=== ETL Pipeline {status} ===")
return success
# 実行
run_etl_pipeline(
csv_path="data/sales_2026.csv",
db_url="postgresql://user:pass@localhost:5432/mydb",
table_name="sales"
)
Claude Codeでの活用Tips
Claude Codeを使えば、ETLパイプラインの構築は対話的に行える。
- スキーマ推論: 「このCSVの内容を分析して最適なテーブルスキーマを提案して」
- バリデーション生成: 「このデータに必要なバリデーションルールを作って」
- エラーハンドリング: 「このETLにリトライとエラー通知を追加して」
手作業で30分かかるCSV→DB投入が、AIなら5分で完成する。
関連記事
A
Agentive 編集部
AIエージェントを実際に使い倒す個人開発者。サイト制作の自動化を実践しながら、その知見を発信しています。