0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

MySQLとDiscord Botを連携させる

Posted at

はじめに

自分はこの記事のようにこれまでDiscord Botを色々と作成してきました。

Discord Botを用いると色々なことができますが、自分は以下のようなことをやってきました。

  • YouTubeで新規投稿があったら通知する
  • あるサイトで新しい記事ができたら通知する
  • サーバメンバーの欲しいものリストを管理する
  • サークルの予定を通知する

これらのことを実現するために、最初はBot単体でどうにかやっていたのですが、運用しているうちにBotだけではしんどい場面がかなり多くなってきました。特にデータの管理が厳しいなと思っていました。

そこで、データベースと連携されることを思いつき、それを実行してみました。すると格段に動作が良くなったうえに安定性を得ることもできました。

しかし、そこまでの過程はかなり難しく、トライ&エラーの連続でした。この記事では、Discord BotとMySQLを連携し、いい感じになるまで試行錯誤した記録を残そうと思います。

下準備

実際に連携させていく過程を紹介するわけですが、これを参考に何かを作る場合は事前にDiscord Bot及びMySQLの準備を行っておきましょう。

以下のことができればOKです。

  • Discord Botを何かしらのDiscordサーバ上で動かせる
    • トークンを取得
    • 権限の付与
  • MySQLがインストールされており、アクセスが可能

例として用いるサービス

今回はこれを例とします。このDiscord BotはMARVEL公式サイトをスクレイピングし、何かしら新しいニュースが来たらそのリンクをサーバに送るという仕組みになっています。

この「新しいニュース」ということを「まだ通知していないニュース」という意味に読み替え、さらにそれを「データベース上に未登録のニュース」ということとしています。

すなわち、以下のような仕組みで動いているわけです。

  1. トップページからニュースへのリンクを取得
  2. 各リンクがデータベースにあるか調べる
  3. 無いリンクを通知、データベースに新規登録
  4. 時間をおいて1に戻る

2でSELECT文、3でINSERT文が使われるわけです。

ここからは各ファイルの説明をしていきます。基本的には冒頭の記事の通りDockerを用いてBotをホスティングする形となっていますが、上記の記事から内容がアップデートされているのでご注意ください。

使うテーブルのDDL

今回用いるテーブルのDDL(データベースの定義)は以下となります。

CREATE TABLE `sent_urls` (
  `id` int NOT NULL AUTO_INCREMENT,
  `url` varchar(255) DEFAULT NULL,
  `title` varchar(255) DEFAULT NULL,
  `category` varchar(255) DEFAULT NULL,
  `service` varchar(255) DEFAULT NULL,
  `created_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
  `updated_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1402 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci

送信したURL、ページタイトル、カテゴリ、どのサービスに関連するものなのか(複数サービスが同時に走っているので区別のため)などが情報として保存されます。

Dockerfile

FROM python:3.13.5-alpine3.22
WORKDIR /usr/src/app
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt

このファイルはあまり説明することはないと思います。今回Pythonは3.13.5を利用しています。

ワーキングディレクトリを指定し、必要なライブラリをインストールするという感じです。

docker-compose.yaml

version: '3.8'
services:
  bot:
    build: .
    volumes:
      - ./bot:/usr/src/app:rw
    restart: unless-stopped
    env_file:
      - .env
    environment:
      - TZ=Asia/Tokyo
    command: ["python3", "friday.py"]
    tty: true

これも以前の記事で説明した通りですが、環境変数をいちいち書いて指定するのではなく、env_file.envというファイルが環境変数設定ファイルだよ、という指示をしています。これによって、環境変数を変えた場合でもdocker-compose.yamlを変更しなくて良くなります。

.env

TOKEN=""
DISCORD_CHANNEL_ID=""
DB_HOST="xxx.xxx.xxx.xxx"
DB_USERNAME=""
DB_PASSWORD=""
DB_NAME=""

ここで環境変数を設定します。

Bot用のトークン、運用するチャンネルのIDは以前説明した通りですが、新たに以下のものが加わります。

  • データベースがあるところのIPアドレス
  • データベースのユーザネーム
  • データベースのパスワード
  • データベース名

こういった接続情報も秘匿するべきなので、ここに記載します。

friday.py

さて、メインプログラムは以下の通りです。

import asyncio
import os
import time
import traceback
import requests
import discord
from discord.ext import commands
from bs4 import BeautifulSoup
import mysql.connector

# Discordの接続設定
TOKEN = os.getenv("TOKEN")
DISCORD_CHANNEL_ID = int(os.environ.get("DISCORD_CHANNEL_ID"))
intent = discord.Intents.default()
intent.message_content = True
client = commands.Bot(command_prefix="-", intents=intent)
task = None
target_url = "https://marvel.disney.co.jp/news"


# MySQLへの接続
def get_connection():
    return mysql.connector.connect(
        host=os.getenv("DB_HOST"),
        user=os.getenv("DB_USER"),
        password=os.getenv("DB_PASSWORD"),
        database=os.getenv("DB_NAME"),
    )


async def run_sql(sql: str, params: tuple):
    conn = get_connection()
    cursor = conn.cursor(buffered=True)
    if params != ():
        cursor.execute(sql, params)
    else:
        cursor.execute(sql)
    if sql.strip().upper().startswith("SELECT"):
        result = cursor.fetchall()
        cursor.close()
        conn.close()
        return result
    else:
        conn.commit()
        cursor.close()
        conn.close()
        return


async def get_new_articles():
    try:
        time.sleep(1)
        response = requests.get(target_url)
        soup = BeautifulSoup(response.text, "html.parser")
        targets = soup.find_all("div", class_="text-content")
        new_articles = []
        for target in targets:
            new_articles.append(target.find("a").get("href"))
        return new_articles
    except Exception as e:
        print(e)
        return "ERROR"


async def get_article_title(url):
    try:
        time.sleep(1)
        response = requests.get(url)
        soup = BeautifulSoup(response.text, "html.parser")
        title = soup.find("title").text.strip()
        return title
    except Exception as e:
        print(e)
        return "ERROR"


async def send_new_article(new_articles):
    channel = client.get_channel(DISCORD_CHANNEL_ID)
    sent_urls = await run_sql(
        "SELECT url FROM sent_urls WHERE service = 'FRIDAY'",
        (),
    )
    for i in range(len(sent_urls)):
        if type(sent_urls[i]) is tuple:
            sent_urls[i] = sent_urls[i][0]
    for article in new_articles:
        if article not in sent_urls:
            await channel.send(article)
            while True:
                title = await get_article_title(article)
                if title != "ERROR":
                    break
            await run_sql(
                "INSERT INTO sent_urls (url, title, category, service) VALUES (%s,  %s, %s, %s)",
                (article, title, "new_article", "FRIDAY"),
            )


async def main():
    while True:
        try:
            new_articles = await get_new_articles()
            if new_articles != "ERROR":
                await send_new_article(new_articles)
        except Exception as e:
            print(f"Error: {e}")
            traceback.print_exc()
        await asyncio.sleep(60)


@client.command()
async def test(ctx):
    if ctx.channel.id == DISCORD_CHANNEL_ID:
        await ctx.send("F.R.I.D.A.Y. is working!")


@client.event
async def on_ready():
    global task
    print("F.R.I.D.A.Y. is ready!")
    if task is None or task.done():
        task = asyncio.create_task(main())


client.run(TOKEN)

ここからは各関数ごとに説明をしていきます。

ライブラリ

import asyncio
import os
import time
import traceback
import requests
import discord
from discord.ext import commands
from bs4 import BeautifulSoup
import mysql.connector

MySQLとの連携のために、新たにmysql.connectorが必要となります。

requirements.txtは以下のようになります。スクレイピング関連ライブラリを入れることも忘れないようにします。

py-cord
discord.py
beautifulsoup4
requests
mysql-connector-python

get_connection

def get_connection():
    return mysql.connector.connect(
        host=os.getenv("DB_HOST"),
        user=os.getenv("DB_USER"),
        password=os.getenv("DB_PASSWORD"),
        database=os.getenv("DB_NAME"),
    )

MySQLへ実際に接続する部分です。

接続時、この関数を実行して接続状況を受け取るわけです。

はじめはBot起動時にMySQLサーバとのコネクションを確立し、それを持ち続ける方法を取っていましたが、それだと一定時間経つとコネクションが切れ、Botが停止してしまうという問題が起きました。

というわけで、SQLを実行するときだけコネクションを確立し、終わったら切断するという形にしています。

run_sql

async def run_sql(sql: str, params: tuple):
    conn = get_connection()
    cursor = conn.cursor(buffered=True)
    if params != ():
        cursor.execute(sql, params)
    else:
        cursor.execute(sql)
    if sql.strip().upper().startswith("SELECT"):
        result = cursor.fetchall()
        cursor.close()
        conn.close()
        return result
    else:
        conn.commit()
        cursor.close()
        conn.close()
        return

SQL文を受け取り、それを実行する部分です。

connというのが接続情報で、cursorというのはデータベースを操作するカーソルとなります。

そして、パラメータの有無を確認してSQLを実行し、SQLがSELECTならその結果を返し、それ以外ならコミットしてデータベースを更新するということをします。

それが終わったらカーソル及び接続を切ります。

スクレイピング部分

async def get_new_articles():
    try:
        time.sleep(1)
        response = requests.get(target_url)
        soup = BeautifulSoup(response.text, "html.parser")
        targets = soup.find_all("div", class_="text-content")
        new_articles = []
        for target in targets:
            new_articles.append(target.find("a").get("href"))
        return new_articles
    except Exception as e:
        print(e)
        return "ERROR"


async def get_article_title(url):
    try:
        time.sleep(1)
        response = requests.get(url)
        soup = BeautifulSoup(response.text, "html.parser")
        title = soup.find("title").text.strip()
        return title
    except Exception as e:
        print(e)
        return "ERROR"

サイトに対してスクレイピングを行い、情報を得る部分です。

ニュース一覧ページ内に掲載されているニュース及び各ニュースのタイトルを取得しています。

本題から逸れるので、詳しい説明は省きます。

send_new_article

async def send_new_article(new_articles):
    channel = client.get_channel(DISCORD_CHANNEL_ID)
    sent_urls = await run_sql(
        "SELECT url FROM sent_urls WHERE service = 'FRIDAY'",
        (),
    )
    for i in range(len(sent_urls)):
        if type(sent_urls[i]) is tuple:
            sent_urls[i] = sent_urls[i][0]
    for article in new_articles:
        if article not in sent_urls:
            await channel.send(article)
            while True:
                title = await get_article_title(article)
                if title != "ERROR":
                    break
            await run_sql(
                "INSERT INTO sent_urls (url, title, category, service) VALUES (%s,  %s, %s, %s)",
                (article, title, "new_article", "FRIDAY"),
            )

過去保存したURLをSELECTですべて持って来て、スクレイピングした記事1つ1つがそこに含まれているかを確認します。もしなかったら送信して新規登録(INSERT)します。

いちいち過去の履歴全てを参照しているのであまり効率が良くないかもしれません。要改良ですね。

メインループ関連

task = None


async def main():
    while True:
        try:
            new_articles = await get_new_articles()
            if new_articles != "ERROR":
                await send_new_article(new_articles)
        except Exception as e:
            print(f"Error: {e}")
            traceback.print_exc()
        await asyncio.sleep(60)


@client.command()
async def test(ctx):
    if ctx.channel.id == DISCORD_CHANNEL_ID:
        await ctx.send("F.R.I.D.A.Y. is working!")


@client.event
async def on_ready():
    global task
    print("F.R.I.D.A.Y. is ready!")
    if task is None or task.done():
        task = asyncio.create_task(main())


client.run(TOKEN)

main関数で、60秒に一度記事一覧をスクレイピングして新規記事があるかのチェックを繰り返す処理をしています。

また、try構文を取り入れることにより、エラーをキャッチして急に停止しないようにしています。たまに接続が一時的に切れる(電波の乱れとか)ことがあるので、これをしないと何時の間にか止まっている、ということになってしまいます。

加えて、エラー原因がログからわかるようにtracebackを入れてエラー原因を出力するようにしています。

今度詳しく記事に書く予定ですが、常設して無限ループを回すタイプのBotはなぜか二重起動し、同じ動作を二回連続で行ってしまうことがあります(二回同じものをチャンネルに送信するなど)。原因はおそらくBotとネットワークとの接続が切れ、再接続した際にon_ready関数が再度実行され無限ループが複製されてしまうことと考えられます。

これを防止するために、グローバル変数taskを用意し、Bot起動時にtask = asyncio.create_task(main())でタスクを作ります。こうしておくと、Botのプログラム自体が止まらない限り再度on_readyが実行されても無限ループ(main関数)が複製されることを防げます。

おわりに

Discord BotとMySQLを上手く連携させると、個人的にかなり「それっぽい」システムが出来上がります。

Discord Botを作っては見たものの何か物足りない、と感じたらMySQLなどのデータベースとの連携を考えてみてはいかがでしょうか。

それではまた。

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?