Agentive
自動化ラボ

AI通知システムの構築 — Discord/Slack/メールへの自動アラート

約5分で読めます

AI通知システムの構築

AI処理の結果をリアルタイムで通知する仕組みは、自動化パイプラインの「目」と「口」にあたる。処理が完了しても通知がなければ気づけない。エラーが発生しても通知がなければ放置される。Discord Webhook、Slack Bot、SMTPメールの3チャネルで通知システムを構築する。

通知システムの設計原則

  • 重要度で分類: INFO/WARNING/ERROR/CRITICALの4段階
  • チャネル分離: 重要度によって通知先を分ける
  • レート制限: 同じ通知の連続送信を防ぐ
from enum import Enum
from dataclasses import dataclass
from datetime import datetime

class Severity(Enum):
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"

Discord Webhook通知

基本的なWebhook送信

import urllib.request
import json

class DiscordNotifier:
    COLORS = {
        Severity.INFO: 0x3498DB,
        Severity.WARNING: 0xF39C12,
        Severity.ERROR: 0xE74C3C,
        Severity.CRITICAL: 0x8E44AD,
    }

    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url

    def send(self, notification: Notification):
        payload = json.dumps({
            "embeds": [{
                "title": notification.title,
                "description": notification.message,
                "color": self.COLORS[notification.severity],
                "timestamp": notification.timestamp.isoformat(),
                "footer": {"text": notification.source}
            }]
        }).encode()
        req = urllib.request.Request(
            self.webhook_url, data=payload,
            headers={"Content-Type": "application/json"}
        )
        urllib.request.urlopen(req)

ファイル添付付き通知

def send_with_file(webhook_url, message, file_path):
    boundary = "----FormBoundary7MA4YWxk"
    file_data = open(file_path, "rb").read()
    file_name = file_path.split("/")[-1]
    header_part = (
        "--" + boundary + "\r\n"
        + "Content-Disposition: form-data; name=\"content\"\r\n\r\n"
        + message + "\r\n--" + boundary + "--\r\n"
    )
    body = header_part.encode() + file_data
    req = urllib.request.Request(webhook_url, data=body,
        headers={"Content-Type": "multipart/form-data; boundary=" + boundary})
    urllib.request.urlopen(req)

Slack Bot通知

Slack Webhook通知

class SlackNotifier:
    EMOJI = {
        Severity.INFO: ":information_source:",
        Severity.WARNING: ":warning:",
        Severity.ERROR: ":x:",
        Severity.CRITICAL: ":rotating_light:",
    }

    def __init__(self, webhook_url):
        self.webhook_url = webhook_url

    def send(self, notification):
        emoji = self.EMOJI[notification.severity]
        payload = json.dumps({
            "blocks": [
                {"type": "header",
                 "text": {"type": "plain_text",
                          "text": emoji + " " + notification.title}},
                {"type": "section",
                 "text": {"type": "mrkdwn", "text": notification.message}},
                {"type": "context",
                 "elements": [{"type": "mrkdwn",
                               "text": "Source: " + notification.source}]}
            ]
        }).encode()
        req = urllib.request.Request(self.webhook_url, data=payload,
            headers={"Content-Type": "application/json"})
        urllib.request.urlopen(req)

Node.jsでのSlack通知

const https = require("https");

function sendSlackNotification(webhookUrl, title, message, severity) {
  const colorMap = {
    info: "#3498db", warning: "#f39c12",
    error: "#e74c3c", critical: "#8e44ad"
  };
  const payload = JSON.stringify({
    attachments: [{
      color: colorMap[severity] || "#3498db",
      blocks: [
        { type: "header", text: { type: "plain_text", text: title } },
        { type: "section", text: { type: "mrkdwn", text: message } }
      ]
    }]
  });
  const url = new URL(webhookUrl);
  const req = https.request({
    hostname: url.hostname, path: url.pathname,
    method: "POST", headers: { "Content-Type": "application/json" }
  });
  req.write(payload);
  req.end();
}

メール通知(SMTP)

HTML形式のメール通知

import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

class EmailNotifier:
    def __init__(self, smtp_host, smtp_port, username, password):
        self.smtp_host = smtp_host
        self.smtp_port = smtp_port
        self.username = username
        self.password = password

    def send(self, notification, recipients):
        msg = MIMEMultipart("alternative")
        severity_label = notification.severity.value.upper()
        msg["Subject"] = "[" + severity_label + "] " + notification.title
        msg["From"] = self.username
        msg["To"] = ", ".join(recipients)
        html = ("<html><body><h2>" + notification.title + "</h2>"
                + "<p>" + notification.message + "</p></body></html>")
        msg.attach(MIMEText(html, "html"))
        with smtplib.SMTP(self.smtp_host, self.smtp_port) as server:
            server.starttls()
            server.login(self.username, self.password)
            server.sendmail(self.username, recipients, msg.as_string())

統合通知マネージャー

重要度に応じて複数チャネルに同時通知する統合マネージャー。

class NotificationManager:
    def __init__(self):
        self.channels = {}
        self.rules = {}

    def add_channel(self, name, notifier):
        self.channels[name] = notifier

    def add_rule(self, severity, channel_names):
        self.rules[severity] = channel_names

    def notify(self, notification):
        channels = self.rules.get(notification.severity, [])
        for ch_name in channels:
            try:
                self.channels[ch_name].send(notification)
            except Exception as e:
                print("通知失敗: " + ch_name)

# 使用例
manager = NotificationManager()
manager.add_channel("discord", DiscordNotifier(DISCORD_WEBHOOK))
manager.add_channel("slack", SlackNotifier(SLACK_WEBHOOK))
manager.add_rule(Severity.INFO, ["discord"])
manager.add_rule(Severity.WARNING, ["discord"])
manager.add_rule(Severity.ERROR, ["discord", "slack"])
manager.add_rule(Severity.CRITICAL, ["discord", "slack"])

レート制限の実装

同一内容の通知が連続送信されるのを防ぐ。

from collections import defaultdict
import hashlib

class RateLimiter:
    def __init__(self, cooldown_seconds=300):
        self.cooldown = cooldown_seconds
        self.last_sent = defaultdict(lambda: datetime.min)

    def should_send(self, notification):
        key_str = notification.title + ":" + notification.severity.value
        key = hashlib.md5(key_str.encode()).hexdigest()
        elapsed = (datetime.now() - self.last_sent[key]).total_seconds()
        if elapsed < self.cooldown:
            return False
        self.last_sent[key] = datetime.now()
        return True

AI処理との統合例

async def run_ai_task_with_notification(task_name, task_func):
    try:
        result = await task_func()
        manager.notify(Notification(
            title=task_name + " 完了",
            message="処理OK",
            severity=Severity.INFO))
    except Exception as e:
        manager.notify(Notification(
            title=task_name + " エラー",
            message="エラー: " + str(e),
            severity=Severity.ERROR))
        raise

関連記事

A

Agentive 編集部

AIエージェントを実際に使い倒す個人開発者。サイト制作の自動化を実践しながら、その知見を発信しています。