Agentive
自動化ラボ

AIでデータパイプライン構築 — ETL処理をClaude Codeで自動化

約6分で読めます

AIでデータパイプライン構築

手動でCSVを加工してDBに投入する作業は、エンジニアの時間を最も無駄にする反復作業のひとつだ。AIを使えば、ETL(Extract・Transform・Load)パイプラインを数分で構築できる。

ETLパイプラインの全体像

データソース(CSV/API/Web)

Extract: データ取得・読込

Transform: クレンジング・型変換・正規化

Load: DB投入・ファイル出力

Verify: 件数チェック・整合性検証

Claude Codeに「このCSVをPostgreSQLに取り込むETLスクリプトを書いて」と指示するだけで、スキーマ推論からバリデーションまで含めた実用的なコードが生成される。

Extract: データソースからの取得

CSVファイルの読込

import pandas as pd
from pathlib import Path

def extract_csv(file_path: str, encoding: str = "utf-8") -> pd.DataFrame:
    """CSVファイルの読込。文字コード自動検出付き"""
    try:
        df = pd.read_csv(file_path, encoding=encoding)
    except UnicodeDecodeError:
        # 日本語CSVでよくあるShift_JISフォールバック
        df = pd.read_csv(file_path, encoding="shift_jis")

    print(f"読込完了: {len(df)}行 x {len(df.columns)}列")
    print(f"カラム: {list(df.columns)}")
    return df

Web APIからの取得

import requests

def extract_from_api(url: str, headers: dict = None) -> list[dict]:
    """REST APIからデータを取得。ページネーション対応"""
    all_data = []
    page = 1

    while True:
        response = requests.get(
            url, headers=headers, params={"page": page, "per_page": 100}
        )
        response.raise_for_status()
        data = response.json()

        if not data:
            break

        all_data.extend(data)
        page += 1

    print(f"API取得完了: {len(all_data)}件")
    return all_data

Webスクレイピングによる取得

from playwright.async_api import async_playwright

async def extract_from_web(url: str, selector: str) -> list[dict]:
    """Playwrightでレンダリング済みページからデータ取得"""
    async with async_playwright() as p:
        browser = await p.chromium.launch()
        page = await browser.new_page()
        await page.goto(url, wait_until="networkidle")

        elements = await page.query_selector_all(selector)
        data = []
        for el in elements:
            text = await el.text_content()
            data.append({"text": text.strip()})

        await browser.close()
    return data

Transform: データの変換とクレンジング

基本的なデータクレンジング

def transform_data(df: pd.DataFrame) -> pd.DataFrame:
    """データクレンジングと型変換"""
    # 1. カラム名を正規化(全角→半角、スペース→アンダースコア)
    df.columns = [
        col.strip().replace(" ", "_").replace("\u3000", "_").lower()
        for col in df.columns
    ]

    # 2. 空白行の除去
    df = df.dropna(how="all")

    # 3. 日付カラムの型変換
    date_cols = [c for c in df.columns if "date" in c or "日付" in c]
    for col in date_cols:
        df[col] = pd.to_datetime(df[col], errors="coerce")

    # 4. 数値カラムのクレンジング(カンマ除去)
    for col in df.select_dtypes(include="object").columns:
        try:
            df[col] = df[col].str.replace(",", "").astype(float)
        except (ValueError, AttributeError):
            pass

    # 5. 重複除去
    before = len(df)
    df = df.drop_duplicates()
    print(f"重複除去: {before - len(df)}行削除")

    return df

AIによるデータ分類・補完

Claude APIを使って、テキストデータの自動分類やカテゴリ補完を行う。

import anthropic

client = anthropic.Anthropic()

def ai_classify(texts: list[str], categories: list[str]) -> list[str]:
    """AIでテキストをカテゴリ分類"""
    results = []
    for text in texts:
        response = client.messages.create(
            model="claude-haiku-35-20241022",
            max_tokens=50,
            messages=[{
                "role": "user",
                "content": f"以下のテキストを{categories}のいずれかに分類してください。"
                           f"カテゴリ名のみ回答:\n{text}"
            }]
        )
        results.append(response.content[0].text.strip())
    return results

Load: データベースへの投入

PostgreSQLへのロード

from sqlalchemy import create_engine, text

def load_to_postgres(df: pd.DataFrame, table_name: str, db_url: str):
    """DataFrameをPostgreSQLに投入"""
    engine = create_engine(db_url)

    # テーブルが存在しない場合は自動作成
    df.to_sql(table_name, engine, if_exists="append", index=False)

    # 件数検証
    with engine.connect() as conn:
        count = conn.execute(
            text(f"SELECT COUNT(*) FROM {table_name}")
        ).scalar()
    print(f"DB投入完了: {table_name} = {count}件")

Supabase(PostgreSQL)へのロード

// Node.js: Supabaseへのバッチ投入
const { createClient } = require('@supabase/supabase-js');

const supabase = createClient(
  process.env.SUPABASE_URL,
  process.env.SUPABASE_KEY
);

async function loadToSupabase(data, tableName) {
  const batchSize = 1000;
  let inserted = 0;

  for (let i = 0; i < data.length; i += batchSize) {
    const batch = data.slice(i, i + batchSize);
    const { error } = await supabase
      .from(tableName)
      .insert(batch);

    if (error) throw new Error(`Batch ${i}: ${error.message}`);
    inserted += batch.length;
  }

  console.log(`${tableName}: ${inserted}件投入完了`);
}

Verify: データ整合性の検証

ETL完了後に、データの正確性を自動検証する。

def verify_etl(source_df: pd.DataFrame, table_name: str, engine):
    """ETL結果の整合性チェック"""
    checks = []

    # 件数一致チェック
    with engine.connect() as conn:
        db_count = conn.execute(
            text(f"SELECT COUNT(*) FROM {table_name}")
        ).scalar()
    checks.append({
        "check": "行数一致",
        "source": len(source_df),
        "db": db_count,
        "pass": len(source_df) == db_count
    })

    for c in checks:
        status = "PASS" if c["pass"] else "FAIL"
        print(f"  [{status}] {c['check']}")

    return all(c["pass"] for c in checks)

完全なETLパイプラインの組み立て

上記のコンポーネントを組み合わせた完全なパイプライン。

def run_etl_pipeline(csv_path: str, db_url: str, table_name: str):
    """ETLパイプライン一気通貫実行"""
    print("=== ETL Pipeline Start ===")

    # Extract
    df = extract_csv(csv_path)

    # Transform
    df = transform_data(df)

    # Load
    load_to_postgres(df, table_name, db_url)

    # Verify
    engine = create_engine(db_url)
    success = verify_etl(df, table_name, engine)

    status = "SUCCESS" if success else "FAILED"
    print(f"=== ETL Pipeline {status} ===")
    return success

# 実行
run_etl_pipeline(
    csv_path="data/sales_2026.csv",
    db_url="postgresql://user:pass@localhost:5432/mydb",
    table_name="sales"
)

Claude Codeでの活用Tips

Claude Codeを使えば、ETLパイプラインの構築は対話的に行える。

  • スキーマ推論: 「このCSVの内容を分析して最適なテーブルスキーマを提案して」
  • バリデーション生成: 「このデータに必要なバリデーションルールを作って」
  • エラーハンドリング: 「このETLにリトライとエラー通知を追加して」

手作業で30分かかるCSV→DB投入が、AIなら5分で完成する。

関連記事

A

Agentive 編集部

AIエージェントを実際に使い倒す個人開発者。サイト制作の自動化を実践しながら、その知見を発信しています。