AI通知システムの構築 — Discord/Slack/メールへの自動アラート
約5分で読めます
AI通知システムの構築
AI処理の結果をリアルタイムで通知する仕組みは、自動化パイプラインの「目」と「口」にあたる。処理が完了しても通知がなければ気づけない。エラーが発生しても通知がなければ放置される。Discord Webhook、Slack Bot、SMTPメールの3チャネルで通知システムを構築する。
通知システムの設計原則
- 重要度で分類: INFO/WARNING/ERROR/CRITICALの4段階
- チャネル分離: 重要度によって通知先を分ける
- レート制限: 同じ通知の連続送信を防ぐ
from enum import Enum
from dataclasses import dataclass
from datetime import datetime
class Severity(Enum):
INFO = "info"
WARNING = "warning"
ERROR = "error"
CRITICAL = "critical"
Discord Webhook通知
基本的なWebhook送信
import urllib.request
import json
class DiscordNotifier:
COLORS = {
Severity.INFO: 0x3498DB,
Severity.WARNING: 0xF39C12,
Severity.ERROR: 0xE74C3C,
Severity.CRITICAL: 0x8E44AD,
}
def __init__(self, webhook_url: str):
self.webhook_url = webhook_url
def send(self, notification: Notification):
payload = json.dumps({
"embeds": [{
"title": notification.title,
"description": notification.message,
"color": self.COLORS[notification.severity],
"timestamp": notification.timestamp.isoformat(),
"footer": {"text": notification.source}
}]
}).encode()
req = urllib.request.Request(
self.webhook_url, data=payload,
headers={"Content-Type": "application/json"}
)
urllib.request.urlopen(req)
ファイル添付付き通知
def send_with_file(webhook_url, message, file_path):
boundary = "----FormBoundary7MA4YWxk"
file_data = open(file_path, "rb").read()
file_name = file_path.split("/")[-1]
header_part = (
"--" + boundary + "\r\n"
+ "Content-Disposition: form-data; name=\"content\"\r\n\r\n"
+ message + "\r\n--" + boundary + "--\r\n"
)
body = header_part.encode() + file_data
req = urllib.request.Request(webhook_url, data=body,
headers={"Content-Type": "multipart/form-data; boundary=" + boundary})
urllib.request.urlopen(req)
Slack Bot通知
Slack Webhook通知
class SlackNotifier:
EMOJI = {
Severity.INFO: ":information_source:",
Severity.WARNING: ":warning:",
Severity.ERROR: ":x:",
Severity.CRITICAL: ":rotating_light:",
}
def __init__(self, webhook_url):
self.webhook_url = webhook_url
def send(self, notification):
emoji = self.EMOJI[notification.severity]
payload = json.dumps({
"blocks": [
{"type": "header",
"text": {"type": "plain_text",
"text": emoji + " " + notification.title}},
{"type": "section",
"text": {"type": "mrkdwn", "text": notification.message}},
{"type": "context",
"elements": [{"type": "mrkdwn",
"text": "Source: " + notification.source}]}
]
}).encode()
req = urllib.request.Request(self.webhook_url, data=payload,
headers={"Content-Type": "application/json"})
urllib.request.urlopen(req)
Node.jsでのSlack通知
const https = require("https");
function sendSlackNotification(webhookUrl, title, message, severity) {
const colorMap = {
info: "#3498db", warning: "#f39c12",
error: "#e74c3c", critical: "#8e44ad"
};
const payload = JSON.stringify({
attachments: [{
color: colorMap[severity] || "#3498db",
blocks: [
{ type: "header", text: { type: "plain_text", text: title } },
{ type: "section", text: { type: "mrkdwn", text: message } }
]
}]
});
const url = new URL(webhookUrl);
const req = https.request({
hostname: url.hostname, path: url.pathname,
method: "POST", headers: { "Content-Type": "application/json" }
});
req.write(payload);
req.end();
}
メール通知(SMTP)
HTML形式のメール通知
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
class EmailNotifier:
def __init__(self, smtp_host, smtp_port, username, password):
self.smtp_host = smtp_host
self.smtp_port = smtp_port
self.username = username
self.password = password
def send(self, notification, recipients):
msg = MIMEMultipart("alternative")
severity_label = notification.severity.value.upper()
msg["Subject"] = "[" + severity_label + "] " + notification.title
msg["From"] = self.username
msg["To"] = ", ".join(recipients)
html = ("<html><body><h2>" + notification.title + "</h2>"
+ "<p>" + notification.message + "</p></body></html>")
msg.attach(MIMEText(html, "html"))
with smtplib.SMTP(self.smtp_host, self.smtp_port) as server:
server.starttls()
server.login(self.username, self.password)
server.sendmail(self.username, recipients, msg.as_string())
統合通知マネージャー
重要度に応じて複数チャネルに同時通知する統合マネージャー。
class NotificationManager:
def __init__(self):
self.channels = {}
self.rules = {}
def add_channel(self, name, notifier):
self.channels[name] = notifier
def add_rule(self, severity, channel_names):
self.rules[severity] = channel_names
def notify(self, notification):
channels = self.rules.get(notification.severity, [])
for ch_name in channels:
try:
self.channels[ch_name].send(notification)
except Exception as e:
print("通知失敗: " + ch_name)
# 使用例
manager = NotificationManager()
manager.add_channel("discord", DiscordNotifier(DISCORD_WEBHOOK))
manager.add_channel("slack", SlackNotifier(SLACK_WEBHOOK))
manager.add_rule(Severity.INFO, ["discord"])
manager.add_rule(Severity.WARNING, ["discord"])
manager.add_rule(Severity.ERROR, ["discord", "slack"])
manager.add_rule(Severity.CRITICAL, ["discord", "slack"])
レート制限の実装
同一内容の通知が連続送信されるのを防ぐ。
from collections import defaultdict
import hashlib
class RateLimiter:
def __init__(self, cooldown_seconds=300):
self.cooldown = cooldown_seconds
self.last_sent = defaultdict(lambda: datetime.min)
def should_send(self, notification):
key_str = notification.title + ":" + notification.severity.value
key = hashlib.md5(key_str.encode()).hexdigest()
elapsed = (datetime.now() - self.last_sent[key]).total_seconds()
if elapsed < self.cooldown:
return False
self.last_sent[key] = datetime.now()
return True
AI処理との統合例
async def run_ai_task_with_notification(task_name, task_func):
try:
result = await task_func()
manager.notify(Notification(
title=task_name + " 完了",
message="処理OK",
severity=Severity.INFO))
except Exception as e:
manager.notify(Notification(
title=task_name + " エラー",
message="エラー: " + str(e),
severity=Severity.ERROR))
raise
関連記事
A
Agentive 編集部
AIエージェントを実際に使い倒す個人開発者。サイト制作の自動化を実践しながら、その知見を発信しています。