LoginSignup
4
2

More than 1 year has passed since last update.

はじめに

こんにちは。マグロです。
今回はDiscordログインの勉強がてら、簡易的なBot作成をします。

背景

僕の代表作であるDiscordとLINEの連携Botなのですが、実は今年の3月ごろにWebUIでカスタマイズできる機能を追加していました。
その際にDiscordログインを使用してユーザーを識別させていたので、メモ程度にBotを作ろうというのがきっかけです。

環境

Python 3.10.7
Windows11

本稿ではWindowsで仮想環境を立てて実行しますが、デプロイ用にDockerfileも書きます。

使うもの

Docker
PostgreSQL
FastAPI
Pycord
asyncpg

Postgresはローカルのものでなく、supabaseのpostgresに接続します。

設計

フロント、サーバー共にFastAPIで実装します。
並列実行でBotを動かします。

OAuth設定とBot作成

Discord Developer Portalにアクセスします。

右上の「New Application(新しいアプリケーション)」をクリックします。
image.png

アプリケーションに名前を付けます。
image.png

名前を入力したら、「Create(作成)」ボタンをクリックします。

作成したら、アプリ一覧から作成したアプリの「Bot」に移行し、「Add Bot」をクリックします。

image.png

Nameとiconを好きなように設定しましょう。

image.png

Intentsは公開Botにするわけではないのですべて有効にしてもいいでしょう。
image.png

左側のメニューから「OAuth2」を選択します。
image.png

「OAuth2 General」に移動し、「Redirects(リダイレクト)」に、リダイレクト先のURLを入力します。

ローカルで実行するため「 http://localhost:5000/discord-callback/ 」とURLを指定します。
image.png

また、CLIENT IDCLIENT SECRETはここに表示されます。
後あと使うので控えておきましょう。

「OAuth2 URL Generator」に移動します。

「Scopes(スコープ)」セクションでは、アプリケーションが要求する権限を選択します。
まずはBotをサーバに追加するため、「bot」のみを選択します。

image.png

下の権限では「administrator(管理者)」を選択します。
必要に応じて権限を変えてもいいです。
下に表示されるURLに遷移し、参加させるサーバを選択して招待しましょう。
image.png

招待したら、一度スコープをリセットして、「identify,guilds,guilds.members.read」スコープを選択します。
image.png
SELECT REDIRECT URLに先ほど設定したcallback urlを指定し、その下に表示されているOAuth URLをコピーして控えておいてください。

データベースの用意

今回はPostgreSQLを使用します。
クラウド上のものを使用します。
herokuやrailwayなどが候補として挙げられますが、無料で使えるsupabaseを使用します。

登録がまだの人は登録しておきましょう。
GitHubログインをお勧めします。

ログインしたら/dashboard/projectsに遷移し、New Projectから新しいプロジェクトを作ります。

Name,Database,Password,Regionを好きなように設定してください。
作成したら、SettingのDatabaseに遷移して

  • Host
  • Database Name
  • Port
  • User
  • Password

を控えておいてください。
またPasswordはプロジェクト作成時に設定したものと同様になります。

image.png

ディレクトリ構造

$ tree
.
├── base
│   ├── aio_req.py   # 非同期での操作
│   ├── database.py  # データベース操作
│   └── guild_permission.py
├── cogs
│   ├── bin
│   │   └── activity.py
│   └── vc_count.py  # 入退室管理
├── core
│   ├── db_pickle.py   # データベースのキャッシュ
│   └── start.py     # DiscordBot起動用
├── routers
│   ├── api
│   │   ├── check  
│   │   │   └── post_user_check.py
│   │   ├── admin_success.py
│   │   └── vc_signal_success.py
│   ├── guild
│   │   ├── admin     # 管理者ページ
│   │   │   └── admin.py
│   │   ├── vc_signal     # 入退室管理ページ
│   │   │   └── vc_signal.py
│   │   └── guild.py
│   ├── session_base
│   │   └── user_session.py
│   ├── callback.py     # callbackの処理
│   ├── guilds.py
│   ├── index.py     # indexページの処理
│   ├── login.py   # 非同期での操作
│   └── logout.py
├── templates
│   ├── api
│   │   ├── admin_success.html
│   │   └── vc_signal_success.html
│   ├── guild
│   │   ├── admin
│   │   │   └── admin.html
│   │   ├── vc_signal
│   │   │   └── vc_signal.html
│   │   └── guild.py
│   ├── static
│   │   ├── img
│   │   │   └── discord_icon.jpg
│   │   └── js
│   │       ├── box_allcheck.js
│   │       ├── popover.js
│   │       ├── select_id.js
│   │       └── selects_bar.js
│   ├── guilds.html     
│   ├── index.html     # indexページ
│   └── layout.html
├── .env
├── .gitignore
├── main.py  
├── server_router.py  # fastapiのルータ
├── Dockerfile
└── requirements.txt  

ライブラリインストール

requirements.txtに以下のように書き込みます。

requirements.txt
aiohttp==3.7.4.post0
aiofiles==22.1.0
anyio==3.5.0
asyncpg==0.27.0
discord.py==1.7.3
fastapi==0.88.0
gunicorn==20.1.0
itsdangerous==2.1.0
Jinja2==3.0.3
python-dotenv==0.21.0
python-multipart==0.0.5
pydantic==1.9.1
requests==2.26.0
starlette==0.22.0
uvicorn==0.20.0

仮想環境作成

仮想環境を作成します。以下のコマンドを打ち込んでください。

python -m venv venv

作成したら、仮想環境に遷移します。

.\venv\Scripts\activate

requirements.txtに書き込んだライブラリをインストールしましょう。

pip install -r requirements.txt

加えて今回はdiscord.pyの派生ライブラリのpycordを使用します。

pip install git+https://github.com/Pycord-Development/pycord

も実行しておいてください。

Dockerfile

Windowsの仮想環境を想定しているので必要ありませんが、デプロイを想定して作成します。

Dockerfile
FROM python:3.10.7
USER root

RUN apt-get -y update && apt-get -y install locales && apt-get -y upgrade && \
    localedef -f UTF-8 -i ja_JP ja_JP.UTF-8
ENV LANG ja_JP.UTF-8
ENV LANGUAGE ja_JP:ja
ENV LC_ALL ja_JP.UTF-8
ENV TZ JST-9
ENV TERM xterm

# ./root/src ディレクトリを作成 ホームのファイルをコピーして、移動
RUN mkdir -p /root/src
COPY . /root/src
WORKDIR /root/src

# Docker内で扱うffmpegをインストール
RUN apt-get install -y ffmpeg

# pipのアップグレード、requirements.txtから必要なライブラリをインストール
RUN pip install --upgrade pip
RUN pip install --upgrade setuptools
RUN pip install -r requirements.txt
# discord.pyをpycordにアップグレード
RUN pip install git+https://github.com/Pycord-Development/pycord

.gitignore

GitHubにデプロイする際に、デプロイしないファイルやフォルダを設定します。

.gitignore
.env
venv
__pycache__/
*.pickle

環境変数

環境変数を構成します。

.env
DISCORD_BOT_TOKEN=
DISCORD_CALLBACK_URL=http://localhost:5000/discord-callback/
DISCORD_CLIENT_ID=
DISCORD_CLIENT_SECRET=
DISCORD_SCOPE=identify%20guilds%20guilds.members.read
PGDATABASE=
PGHOST=
PGPASSWORD=
PGUSER=
PORTS=5000
MIDDLE_KEY=
  • DISCORD_BOT_TOKEN
    DiscordBotのトークン。

  • DISCORD_CALLBACK_URL
    Discordログインの認証時に遷移するURL、Developerサイトに登録したものと同じ。
    (本項ではhttp://localhost:5000/discord-callback/)

  • DISCORD_CLIENT_ID
    DiscordアプリのID、BotのユーザIDと同じ。

  • DISCORD_CLIENT_SECRET
    Discordのシークレットキー、認証時に使用する。Developerサイトから発行したもの。

  • DISCORD_SCOPE
    Discordアプリに許可する権限一覧。.envの権限通りにすること。

  • PGDATABASE
    PostgreSQLのデータベース名。

  • PGHOST
    データベースのホスト名。

  • PGPASSWORD
    データベースのパスワード。

  • PGPORT
    データベースのポート番号。

  • PGUSER
    データベースのユーザー名。

  • PORTS
    サーバ立ち上げ時に使用するポート番号。
    本項では5000で起動する。

  • MIDDLE_KEY
    セッションに認証情報を保存する際の暗号キー。
    好きなように設定する。(some-randam-codeなど)

コーデイング

main.py

main.py
from core.start import DBot
import discord
import os

from dotenv import load_dotenv
load_dotenv()

from server_router import keep_alive

# サーバー立ち上げ
keep_alive()

Token = os.environ['DISCORD_BOT_TOKEN']

# Bot立ち上げ
DBot(
    token=Token,
    intents=discord.Intents.all()
).run()

server_router.py

server_router.py
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates

from starlette.middleware.sessions import SessionMiddleware
from fastapi.middleware.cors import CORSMiddleware

from threading import Thread
import uvicorn
import os

from dotenv import load_dotenv
load_dotenv()

from routers import (
    index,
    login,
    callback,
    guilds,
    logout
)
from routers.guild import guild

from routers.guild.vc_signal import vc_signal
from routers.guild.admin import admin

from routers.api import (
    vc_signal_success,
    admin_success
)

app = FastAPI(
    docs_url=None, 
    redoc_url=None, 
    openapi_url=None,
    title='FastAPIを利用したDiscordログイン',
    description='OAuth2を利用してユーザー情報を取得するトークンを発行します。',
    version='0.9 beta'
)

callback_url = os.environ.get('DISCORD_CALLBACK_URL').replace('/callback/','')

origins = [
    callback_url
]

# new テンプレート関連の設定 (jinja2)
templates = Jinja2Templates(directory="templates")
jinja_env = templates.env  #Jinja2.Environment : filterやglobalの設定用

# templates/static以下のファイルを静的に扱えるようにする
app.mount("/static", StaticFiles(directory="templates/static"), name="static")

# session使用
app.add_middleware(SessionMiddleware, secret_key=os.environ.get('MIDDLE_KEY'))
# オリジン間のリソースを共有
app.add_middleware(
    CORSMiddleware, 
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# 各パス
app.include_router(index.router)
app.include_router(login.router)
app.include_router(callback.router)
app.include_router(guild.router)
app.include_router(guilds.router)
app.include_router(logout.router)
app.include_router(vc_signal.router)
app.include_router(vc_signal_success.router)
app.include_router(admin.router)
app.include_router(admin_success.router)

# ローカル実行
def local_run():
    uvicorn.run(
        app,
        host='localhost',  
        port=int(os.getenv("PORTS", default=5000)), 
        log_level="info"
    )

# 本番環境
def run():
    uvicorn.run(
        "server_router:app",
        host="0.0.0.0", 
        port=int(os.getenv("PORT", default=8080)), 
        log_level="info"
    )

# DiscordBotと並列で立ち上げる
def keep_alive():
    if os.environ.get("PORTS") != None:
        t = Thread(target=local_run)
    else:
        t = Thread(target=run)
    t.setDaemon(True)
    t.start()

いくつか解説

# templates/static以下のファイルを静的に扱えるようにする
app.mount("/static", StaticFiles(directory="templates/static"), name="static")

こうすることでtemplates/static内のファイルを静的に扱えるようになります。
ここにcssやjsを置くと、/static/css/stlye.cssとhtml内で扱えるようになります。

# session使用
app.add_middleware(SessionMiddleware, secret_key=os.environ.get('MIDDLE_KEY'))

セッションに変数を保存できるようになります。
request.session['user'] = userのように代入できて、他のrouterでも参照できます。

base/aio_req.py

aio_req.py
from base.guild_permission import Permission
import aiohttp
import aiofiles

from typing import List,Any,Dict

import os
import io
import pickle

DISCORD_BASE_URL = "https://discord.com/api"

DISCORD_BOT_TOKEN = os.environ["DISCORD_BOT_TOKEN"]

# getリクエストを行う
async def aio_get_request(url: str, headers: dict) -> Dict:
    async with aiohttp.ClientSession() as session:
        async with session.get(
            url = url,
            headers = headers
        ) as resp:
            return await resp.json()

# postリクエストを行う
async def aio_post_request(url: str, headers: dict, data: dict) -> Dict:
    async with aiohttp.ClientSession() as session:
        async with session.post(
            url = url,
            headers = headers,
            data = data
        ) as resp:
            return await resp.json()
        
async def pickle_read(filename:str) -> Any:
    """
    pickleファイルの読み込み

    param:
    filename:str
        pickleファイルの名前

    return:
        pickleファイルの中身
    """
    # 読み取り
    async with aiofiles.open(
        file=f'{filename}.pickle',
        mode='rb'
    ) as f:
        pickled_bytes = await f.read()
        with io.BytesIO() as f:
            f.write(pickled_bytes)
            f.seek(0)
            fetch = pickle.load(f)
            return fetch

async def pickle_write(
    filename:str,
    table_fetch:List[Dict]
) -> None:
    """
    pickleファイルの書き込み

    param:
    filename    :str
        pickleファイルの名前
    
    table_fetch :List[Dict]
        SQLから取り出したデータ
    """
    # 取り出して書き込み
    dict_row = [
        dict(zip(record.keys(), record.values())) 
        for record in table_fetch
    ]
    # 書き込み
    async with aiofiles.open(
        file=f'{filename}.pickle',
        mode='wb'
    ) as f:
        await f.write(pickle.dumps(obj=dict_row))


async def search_guild(
    bot_in_guild_get:List[dict],
    user_in_guild_get:List[dict]
) -> List:
    """
    Botとログインしたユーザーが所属しているサーバーを調べ、同じものを返す

    param:
    bot_in_guild_get    :List[dict]
        Botが所属しているサーバー一覧

    user_in_guild_get   :List[dict]
        ユーザーが所属しているサーバー一覧

    return:
    List
        所属が同じサーバー一覧
    """

    bot_guild_id = []
    user_guild_id = []
    match_guild = []

    bot_guild_id = [
        bot_guild.get('id')
        for bot_guild in bot_in_guild_get
    ]

    user_guild_id = [
        user_guild.get('id')
        for user_guild in user_in_guild_get
    ]
    
    # for探索短縮のため、総数が少ない方をforinする
    if len(bot_guild_id) < len(user_guild_id):
        match_guild = [
            guild
            for guild_id,guild in zip(bot_guild_id,bot_in_guild_get)
            if guild_id in user_guild_id
        ]
        
    else:
        match_guild = [
            guild
            for guild_id,guild in zip(user_guild_id,user_in_guild_get)
            if guild_id in bot_guild_id
        ]
        

    return match_guild


async def search_role(
    guild_role_get:List[dict],
    user_role_get:List[dict]
) -> List[dict]:
    """
    ユーザがサーバ内で持っているロールの詳細を取得する

    param:
    guild_role_get  :List[dict]
        サーバにある全てのロール
    user_role_get   :List[dict]
        ユーザ情報

    return:
    List
        ユーザーが持っているロール一覧
    """
    guild_role_id = []
    user_role_id = []
    match_role = []

    for guild_role in guild_role_get:
        guild_role_id.append(guild_role['id'])

    for user_guild in user_role_get["roles"]:
        user_role_id.append(user_guild)
            
    for role_id,role in zip(guild_role_id,guild_role_get):
        if role_id in user_role_id:
            match_role.append(role)

    return match_role

async def return_permission(
    guild_id:int,
    user_id:int,
    access_token:str
) -> Permission:
    """
    指定されたユーザの権限を返す(ロールの権限も含む)

    guild_id        :int
        サーバのid
    user_id         :int
        ユーザのid
    access_token    :str
        ユーザのアクセストークン
    
    """

    # ログインユーザの情報を取得
    guild_user = await aio_get_request(
        url = DISCORD_BASE_URL + f'/guilds/{guild_id}/members/{user_id}',
        headers = {
            'Authorization': f'Bot {DISCORD_BOT_TOKEN}'
        }
    )

    # サーバのロールを取得
    guild_role = await aio_get_request(
        url = DISCORD_BASE_URL + f'/guilds/{guild_id}/roles',
        headers = {
            'Authorization': f'Bot {DISCORD_BOT_TOKEN}'
        }
    )

    # ログインユーザのロールの詳細を取得
    match_role = await search_role(
        guild_role_get = guild_role,
        user_role_get = guild_user
    )

    # ログインユーザの所属しているサーバを取得
    guild_info = await aio_get_request(
        url = DISCORD_BASE_URL + f'/users/@me/guilds',
        headers = {
            'Authorization': f'Bearer {access_token}'
        }
    )

    permission = 0
    user_permission = Permission()

    # サーバでの権限を取得
    for info in guild_info:
        if str(guild_id) == info["id"]:
            permission = info["permissions"]
            break

    await user_permission.get_permissions(permissions=permission)

    for role in match_role:
        # サーバー管理者であるかどうかを調べる
        role_permission = Permission()
        await role_permission.get_permissions(permissions=int(role["permissions"]))
        user_permission = user_permission | role_permission

    return user_permission

async def oauth_check(
    access_token:str
) -> bool:
    """
    OAuth2のトークンが有効か判断する

    param:
    access_token:str
        OAuth2のトークン

    return:
    bool
        トークンが有効な場合、True
        無効の場合、Falseが返される
    """
    oauth_data:dict = await aio_get_request(
        url = DISCORD_BASE_URL + '/users/@me', 
        headers = { 
            'Authorization': f'Bearer {access_token}' 
        }
    )
    if oauth_data.get('message') == '401: Unauthorized':
        return False
    else:
        return True

いくら非同期で接続するとはいえ、データベースへの接続には時間がかかります。
データの参照時間を軽減するため、pickleを使いデータベースのキャッシュを作ります。
return_permissionでは通常の権限だけでなく、ユーザが持っているロールからも権限を読み取ります。

base/database.py

PostgreSQLの操作をします。
asyncpgで非同期での操作を行います。

database.py
import asyncpg
from asyncpg.connection import Connection 
from asyncpg.exceptions import DuplicateTableError

from typing import List,Dict,Any,Union,Tuple

from dotenv import load_dotenv
load_dotenv()


class DataBaseNotConnect(Warning):...

class PostgresDB:
    def __init__(
            self,
            user:str, 
            password:str, 
            database:str, 
            host:str
    ):
        """
        PostgreSQLのクラス

        user    :str
            Postgresのユーザー名
        password:str
            パスワード
        database:str
            データベースの名前
        host    :str
            ホスト番号
        conn    :Connection
            データベースの接続情報
        """
        self.user = user
        self.password = password
        self.database = database
        self.host = host
        self.conn:Connection = None

    async def connect(self):
        """
        PostgreSQLへ接続
        """
        self.conn = await asyncpg.connect(
            user=self.user, 
            password=self.password, 
            database=self.database, 
            host=self.host
        )

    async def disconnect(self):
        """
        PostgreSQLの切断
        """
        if self.conn == None:
            raise DataBaseNotConnect
        await self.conn.close()

    async def create_table(self, table_name:str, columns:dict) -> str:
        """
        テーブルの作成

        table_name  :str
            作成するテーブル名
        colums      :dict
            テーブル内の名前と型
        """
        if self.conn == None:
            raise DataBaseNotConnect
        columns_str = ', '.join(
            [
                f"{column_name} {data_type}" for column_name, data_type in columns.items()
            ]
        )
        sql = f"CREATE TABLE IF NOT EXISTS {table_name} ({columns_str});"
        try:
            await self.conn.execute(sql)
            return "ok"
        except DuplicateTableError:
            return "DuplicateTableError"
        
    async def drop_table(self, table_name:str) -> None:
        """
        テーブルの削除

        table_name:str
            削除するテーブルの名前
        """
        sql = f"DROP TABLE IF EXISTS {table_name};"
        await self.conn.execute(sql)

    async def select_rows(
        self, 
        table_name:str, 
        columns:List[str]=None, 
        where_clause:dict=None
    ) -> List:
        """
        テーブルの参照
        
        table_name  :str
            参照するテーブルの名前
        columns     :List[str]
            参照する列、指定がない場合すべてを参照
        where_clause:dict
            条件、指定しない場合はすべて取得

        return:

        list        :List[Any]
        """
        if self.conn == None:
            raise DataBaseNotConnect
        if columns is None or len(columns) == 0:
            columns_str = '*'
        else:
            columns_str = ', '.join(columns)

        if where_clause is None:
            sql = f"SELECT {columns_str} FROM {table_name};"
        else:
            where_clause_str = ' AND '.join(
                [
                    f"{column}=${i+1}" for i, column in enumerate(
                        where_clause.keys()
                    )
                ]
            )
            where_clause_values = list(where_clause.values())
            sql = f"SELECT {columns_str} FROM {table_name} "
            if where_clause_str:
                sql += f"WHERE {where_clause_str};"
            else:
                sql += ";"

        try:
            return await self.conn.fetch(sql, *where_clause_values)
        except asyncpg.exceptions.UndefinedTableError:
            return [f"{table_name} does not exist"]

    async def insert_row(
        self, 
        table_name:str, 
        row_values:dict
    ) -> bool:
        """
        行の追加
        
        table_name:str
            対象のテーブルの名前
        row_values:dict
            追加する行の内容
        """
        if self.conn == None:
            raise DataBaseNotConnect
        columns_str = ', '.join(row_values.keys())
        values_str = ', '.join(
            [
                f"${i+1}" for i in range(len(row_values))
            ]
        )
        sql = f"INSERT INTO {table_name} ({columns_str}) VALUES ({values_str});"
        try:
            await self.conn.execute(sql, *row_values.values())
            return True
        except asyncpg.exceptions.UniqueViolationError:
            return False
        
    async def batch_insert_row(
        self, 
        table_name: str, 
        row_values: List[Dict[str, Any]]
    ) -> None:
        """
        行を一気に作成
        """
        if self.conn == None:
            raise DataBaseNotConnect
        
        
        columns = row_values[0].keys()

        values = [
            tuple(
                row[col] 
                for col in columns
            ) 
            for row in row_values
        ]
        
        await self.conn.copy_records_to_table(
            table_name=table_name,
            records=values
        )

    async def update_row(
        self, 
        table_name:str, 
        row_values:dict, 
        where_clause:dict
    ) -> None:
        """
        行の更新
        
        table_name  :str 
            テーブルの名前
        row_values  :dict 
            更新の内容
        where_clause:dict
            条件
        """
        if self.conn == None:
            raise DataBaseNotConnect
        
        set_clause_str = ', '.join(
            [
                f"{column}=${i+1}" for i, column in enumerate(
                    row_values.keys()
                )
            ]
        )
        
        where_clause_str = ' AND '.join(
            [
                f"{column}=${i+len(row_values)+1}" for i, column in enumerate(
                    where_clause.keys()
                )
            ]
        )
        values = list(row_values.values()) + list(where_clause.values())
        sql = f"UPDATE {table_name} SET {set_clause_str} "
        if where_clause_str:
            sql += f"WHERE {where_clause_str};"
        else:
            sql += ";"
        await self.conn.execute(sql, *values)

    async def primary_batch_update_rows(
        self, 
        table_name: str, 
        set_values_and_where_columns: List[Dict],
        table_colum:Dict
    ) -> None:
        """
        updateを複数行う

        param:
        tabel_name                  : str
            更新するテーブル名

        set_values_and_where_columns: List[Dict],
            更新する値と条件の辞書型配列
            それぞれに更新する行と、更新する内容を記述する
            必ず以下のような構造にすること
            また、where_clauseは主キーを指定し、重複させないこと

            set_values_and_where_columns = [
                {
                    'where_clause': {'channel_id': 0},
                    'row_values': {'カラム名': ''},
                },
                {
                    'where_clause': {'channel_id': 1},
                    'row_values': {'カラム名': ''},
                },
                {
                    'where_clause': {'channel_id': 2},
                    'row_values': {'カラム名': '', 'カラム名': ''},
                },
            ]

        table_colum                 : Dict
            テーブルのカラム一覧
            Postgresでcreateしたときのものを辞書型で表現すること

            table_colum = {
                'channel_id': 'NUMERIC PRIMARY KEY', 
                'guild_id': 'NUMERIC', 
                'line_ng_channel': 'boolean',
                'ng_message_type': 'VARCHAR(50)[]',
                'message_bot': 'boolean',
                'ng_users':'NUMERIC[]'
            }

        """
        # テーブル名と列名のリストを取得
        columns = table_colum.keys()

        # 主キーを取り出す
        primary_key = [
            key 
            for key,value in table_colum.items() 
            if 'PRIMARY KEY' in value
        ]

        updates = set_values_and_where_columns

        # SET 句の文字列を構築
        set_clauses = []
        # はじめに条件となる主キーを代入
        values = [w['where_clause'][primary_key[0]] for w in updates]
        values_len = [w['where_clause'][primary_key[0]] for w in updates]

        # 主キーの数でパラメータの初期値を決める
        param_count = len(values_len) + 1

        for i, column in enumerate(columns):
            set_clause = f"{column} = CASE "
            # case文に入るカラムがあるかのフラグ
            set_clause_flag = False

            for j, update in enumerate(updates):
                
                # 更新するカラムがあった場合
                if update['row_values'].get(column) is not None:
                    # フラグを挙げる
                    set_clause_flag = True
                    # $param_count $param_count + 1
                    set_clause += f"WHEN {primary_key[0]} = ${param_count} THEN ${param_count + 1} "
                    # 上記の数だけ2足す
                    param_count += 2
                    values.append(update['where_clause'][primary_key[0]])
                    values.append(update['row_values'][column])

                    #print(update['where_clause'][primary_key[0]],column,update['row_values'][column])

            set_clause += f"ELSE {column} END"
            # case文が書かれていた場合、配列に追加
            if set_clause_flag:
                set_clauses.append(set_clause)

        # WHERE 句の文字列を構築
        where_clause = f"{primary_key[0]} IN (" + ", ".join(
            f"${i + 1}" for i in range(len(values_len))
        ) + ")"

        # SQL 文の構築
        sql = f"UPDATE {table_name} SET \n{', '.join(set_clauses)} WHERE {where_clause}"

        await self.conn.execute(sql, *values)

    async def delete_row(
        self, 
        table_name:str, 
        where_clause:dict
    ) -> None:
        """
        行の削除
        
        param:
        table_name  :str 
            テーブルの名前
        where_clause:dict
            条件
        
        """
        if self.conn == None:
            raise DataBaseNotConnect
        where_clause_str = ' AND '.join(
            [
                f"{column}=${i+1}" for i, column in enumerate(
                    where_clause.keys()
                )
            ]
        )
        where_clause_values = list(where_clause.values())
        sql = f"DELETE FROM {table_name} "
        if where_clause_str:
            sql += f"WHERE {where_clause_str};"
        else:
            sql += ";"
        await self.conn.execute(sql, *where_clause_values)

    async def free_sql(self,sql_syntax:str)-> List:
        """
        PostgreSQLの構文を文字列にしてそのまま実行する

        param:
        sql_syntax:str
        sqlの構文

        return:
        List
        selectは結果が帰ってくる
        """
        if self.conn == None:
            raise DataBaseNotConnect
        return await self.conn.fetch(sql_syntax)
    
    async def get_columns_type(
        self, 
        table_name:str
    ) -> Dict:
        """
        指定されたテーブルの列の型を返す

        param:
        table_name  :str 
            テーブルの名前

        return:
        List[Tuple[str,str]]
            行名と型の配列

        """
        query = f"""
            SELECT column_name, data_type 
            FROM information_schema.columns 
            WHERE table_name = '{table_name}';
        """

        result = await self.conn.fetch(query)
        columns:List[Tuple[str,str]] = [
            (row['column_name'],row['data_type'])
            for row in result
        ]
        columns_dict:Dict = {}

        # ARRAY型の列の要素のデータ型を取得
        for i, (column_name,data_type) in enumerate(columns):
            # 初期の辞書型
            tmp_dict:Dict = {column_name:data_type}
            # 配列の場合
            if data_type.startswith('ARRAY'):
                query = f"""
                SELECT column_name, udt_name 
                FROM information_schema.columns 
                WHERE table_name = '{table_name}' AND column_name = '{column_name}'
                """

                result:List[Dict] = await self.conn.fetch(query)
                element_data_type:str = result[0]['udt_name']

                # 配列の場合(先頭に_がある)
                if element_data_type.startswith('_'):
                    element_data_type = element_data_type.replace('_','')
                    element_data_type = f'{element_data_type}[]'
                # 配列用に更新
                tmp_dict = {
                    column_name: element_data_type
                }

            # varcharの場合
            if data_type == 'character varying':
                tmp_dict = {
                    column_name: 'varchar'
                }
            columns_dict.update(tmp_dict)

        return columns_dict

base/guild_permission.py

ユーザが持っている権限を示すクラスです。
__and__で比較も可能です。

guild_permission.py
class Permission:
    def __init__(self) -> None:
        """
        このクラスは、Discordの権限を表すために使用されます。

        各権限は、クラスの属性として定義されています。
        それぞれの属性には、TrueまたはFalseのブール値が割り当てられています。

        administrator:              管理者権限を持っているかどうか
        create_instant_invite:      サーバーに招待リンクを作成できるかどうか
        kick_members:               メンバーをキックできるかどうか
        ban_members:                メンバーをBANできるかどうか
        manage_channels:            チャンネルを管理できるかどうか
        manage_guild:               サーバーを管理できるかどうか
        add_reactions:              メッセージにリアクションを追加できるかどうか
        view_audit_log:             サーバーの監査ログを表示できるかどうか
        priority_speaker:           優先スピーカーになれるかどうか
        stream:                     配信できるかどうか
        view_channel:               チャンネルを表示できるかどうか
        send_messages:              メッセージを送信できるかどうか
        send_tts_messages:          TTSメッセージを送信できるかどうか
        manage_messages:            メッセージを管理できるかどうか
        embed_links:                埋め込みリンクを送信できるかどうか
        attach_files:               ファイルを添付できるかどうか
        read_message_history:       メッセージ履歴を表示できるかどうか
        mention_everyone:           @everyoneや@hereを使用できるかどうか
        use_external_emojis:        外部の絵文字を使用できるかどうか
        view_guild_insights:        サーバーのインサイトを表示できるかどうか
        connect:                    ボイスチャンネルに接続できるかどうか
        speak:                      ボイスチャンネルで発言できるかどうか
        mute_members:               メンバーをミュートできるかどうか
        deafen_members:             メンバーをデフェンできるかどうか
        move_members:               メンバーを移動できるかどうか
        use_vad:                    音声検出を使用できるかどうか
        change_nickname:            ニックネームを変更できるかどうか
        manage_nicknames:           ニックネームを管理できるかどうか
        manage_roles:               ロールを管理できるかどうか
        manage_webhooks:            Webhookを管理できるかどうか
        manage_emojis_and_stickers: 絵文字とステッカーを管理できるかどうか
        use_application_commands:   アプリケーションコマンドを使用できるかどうか
        """
        self.administrator = False
        self.create_instant_invite = False
        self.kick_members = False
        self.ban_members = False
        self.manage_channels = False
        self.manage_guild = False
        self.add_reactions = False
        self.view_audit_log = False
        self.priority_speaker = False
        self.stream = False
        self.view_channel = False
        self.send_messages = False
        self.send_tts_messages = False
        self.manage_messages = False
        self.embed_links = False
        self.attach_files = False
        self.read_message_history = False
        self.mention_everyone = False
        self.use_external_emojis = False
        self.view_guild_insights = False
        self.connect = False
        self.speak = False
        self.mute_members = False
        self.deafen_members = False
        self.move_members = False
        self.use_vad = False
        self.change_nickname = False
        self.manage_nicknames = False
        self.manage_roles = False
        self.manage_webhooks = False
        self.manage_emojis_and_stickers = False
        self.use_application_commands = False
        self.request_to_speak = False
        self.manage_threads = False
        self.use_public_threads = False
        self.use_private_threads = False
        self.use_external_stickers = False

    def __or__(self, other: 'Permission') -> 'Permission':
        """
        2つのPermissionオブジェクトを論理和演算して、新しいPermissionオブジェクトを返すメソッド
        """
        result = Permission()  # 新しいPermissionオブジェクトを生成する
        result.administrator = self.administrator or other.administrator
        result.create_instant_invite = self.create_instant_invite or other.create_instant_invite
        result.kick_members = self.kick_members or other.kick_members
        result.ban_members = self.ban_members or other.ban_members
        result.manage_channels = self.manage_channels or other.manage_channels
        result.manage_guild = self.manage_guild or other.manage_guild
        result.add_reactions = self.add_reactions or other.add_reactions
        result.view_audit_log = self.view_audit_log or other.view_audit_log
        result.priority_speaker = self.priority_speaker or other.priority_speaker
        result.stream = self.stream or other.stream
        result.view_channel = self.view_channel or other.view_channel
        result.send_messages = self.send_messages or other.send_messages
        result.send_tts_messages = self.send_tts_messages or other.send_tts_messages
        result.manage_messages = self.manage_messages or other.manage_messages
        result.embed_links = self.embed_links or other.embed_links
        result.attach_files = self.attach_files or other.attach_files
        result.read_message_history = self.read_message_history or other.read_message_history
        result.mention_everyone = self.mention_everyone or other.mention_everyone
        result.use_external_emojis = self.use_external_emojis or other.use_external_emojis
        result.view_guild_insights = self.view_guild_insights or other.view_guild_insights
        result.connect = self.connect or other.connect
        result.speak = self.speak or other.speak
        result.mute_members = self.mute_members or other.mute_members
        result.deafen_members = self.deafen_members or other.deafen_members
        result.move_members = self.move_members or other.move_members
        result.use_vad = self.use_vad or other.use_vad
        result.change_nickname = self.change_nickname or other.change_nickname
        result.manage_nicknames = self.manage_nicknames or other.manage_nicknames
        result.manage_roles = self.manage_roles or other.manage_roles
        result.manage_webhooks = self.manage_webhooks or other.manage_webhooks
        result.manage_emojis_and_stickers = self.manage_emojis_and_stickers or other.manage_emojis_and_stickers
        result.use_application_commands = self.use_application_commands or other.use_application_commands
        result.request_to_speak = self.request_to_speak or other.request_to_speak
        result.manage_threads = self.manage_threads or other.manage_threads
        result.use_public_threads = self.use_public_threads or other.use_public_threads
        result.use_private_threads = self.use_private_threads or other.use_private_threads
        result.use_external_stickers = self.use_external_stickers or other.use_external_stickers
        
        return result
    
    def __and__(self, other: 'Permission') -> 'Permission':
        """
        2つのPermissionオブジェクトを論理積演算して、新しいPermissionオブジェクトを返すメソッド
        """
        result = Permission()  # 新しいPermissionオブジェクトを生成する
        result.administrator = self.administrator and other.administrator
        result.create_instant_invite = self.create_instant_invite and other.create_instant_invite
        result.kick_members = self.kick_members and other.kick_members
        result.ban_members = self.ban_members and other.ban_members
        result.manage_channels = self.manage_channels and other.manage_channels
        result.manage_guild = self.manage_guild and other.manage_guild
        result.add_reactions = self.add_reactions and other.add_reactions
        result.view_audit_log = self.view_audit_log and other.view_audit_log
        result.priority_speaker = self.priority_speaker and other.priority_speaker
        result.stream = self.stream and other.stream
        result.view_channel = self.view_channel and other.view_channel
        result.send_messages = self.send_messages and other.send_messages
        result.send_tts_messages = self.send_tts_messages and other.send_tts_messages
        result.manage_messages = self.manage_messages and other.manage_messages
        result.embed_links = self.embed_links and other.embed_links
        result.attach_files = self.attach_files and other.attach_files
        result.read_message_history = self.read_message_history and other.read_message_history
        result.mention_everyone = self.mention_everyone and other.mention_everyone
        result.use_external_emojis = self.use_external_emojis and other.use_external_emojis
        result.view_guild_insights = self.view_guild_insights and other.view_guild_insights
        result.connect = self.connect and other.connect
        result.speak = self.speak and other.speak
        result.mute_members = self.mute_members and other.mute_members
        result.deafen_members = self.deafen_members and other.deafen_members
        result.move_members = self.move_members and other.move_members
        result.use_vad = self.use_vad and other.use_vad
        result.change_nickname = self.change_nickname and other.change_nickname
        result.manage_nicknames = self.manage_nicknames and other.manage_nicknames
        result.manage_roles = self.manage_roles and other.manage_roles
        result.manage_webhooks = self.manage_webhooks and other.manage_webhooks
        result.manage_emojis_and_stickers = self.manage_emojis_and_stickers and other.manage_emojis_and_stickers
        result.use_application_commands = self.use_application_commands and other.use_application_commands
        result.request_to_speak = self.request_to_speak and other.request_to_speak
        result.manage_threads = self.manage_threads and other.manage_threads
        result.use_public_threads = self.use_public_threads and other.use_public_threads
        result.use_private_threads = self.use_private_threads and other.use_private_threads
        result.use_external_stickers = self.use_external_stickers and other.use_external_stickers
        
        return result


    async def get_permissions(self,permissions:int) -> None:
        """
        パーミッションを判別し、boolで返す
        param:
        permissions:int
            権限を表すコード
        """
        self.administrator = permissions & (1 << 3) == (1 << 3)
        self.create_instant_invite = permissions & (1 << 0) == (1 << 0)
        self.kick_members = permissions & (1 << 1) == (1 << 1)
        self.ban_members = permissions & (1 << 2) == (1 << 2)
        self.manage_channels = permissions & (1 << 4) == (1 << 4)
        self.manage_guild = permissions & (1 << 5) == (1 << 5)
        self.add_reactions = permissions & (1 << 6) == (1 << 6)
        self.view_audit_log = permissions & (1 << 7) == (1 << 7)
        self.priority_speaker = permissions & (1 << 8) == (1 << 8)
        self.stream = permissions & (1 << 9) == (1 << 9)
        self.view_channel = permissions & (1 << 10) == (1 << 10)
        self.send_messages = permissions & (1 << 11) == (1 << 11)
        self.send_tts_messages = permissions & (1 << 12) == (1 << 12)
        self.manage_messages = permissions & (1 << 13) == (1 << 13)
        self.embed_links = permissions & (1 << 14) == (1 << 14)
        self.attach_files = permissions & (1 << 15) == (1 << 15)
        self.read_message_history = permissions & (1 << 16) == (1 << 16)
        self.mention_everyone = permissions & (1 << 17) == (1 << 17)
        self.use_external_emojis = permissions & (1 << 18) == (1 << 18)
        self.view_guild_insights = permissions & (1 << 19) == (1 << 19)
        self.connect = permissions & (1 << 20) == (1 << 20)
        self.speak = permissions & (1 << 21) == (1 << 21)
        self.mute_members = permissions & (1 << 22) == (1 << 22)
        self.deafen_members = permissions & (1 << 23) == (1 << 23)
        self.move_members = permissions & (1 << 24) == (1 << 24)
        self.use_vad = permissions & (1 << 25) == (1 << 25)
        self.change_nickname = permissions & (1 << 26) == (1 << 26)
        self.manage_nicknames = permissions & (1 << 27) == (1 << 27)
        self.manage_roles = permissions & (1 << 28) == (1 << 28)
        self.manage_webhooks = permissions & (1 << 29) == (1 << 29)
        self.manage_emojis_and_stickers = permissions & (1 << 30) == (1 << 30)
        self.use_application_commands = permissions & (1 << 31) == (1 << 31)
        self.request_to_speak = permissions & (1 << 32) == (1 << 32)
        self.manage_threads = permissions & (1 << 34) == (1 << 34)
        self.use_public_threads = permissions & (1 << 35) == (1 << 35)
        self.use_private_threads = permissions & (1 << 36) == (1 << 36)
        self.manage_emojis_and_stickers = permissions & (1 << 42) == (1 << 42)
        self.use_external_stickers = permissions & (1 << 43) == (1 << 43)

    async def get_permission_code(self) -> int:
        permission_code = 0
        if self.administrator:
            permission_code += 8
        if self.create_instant_invite:
            permission_code += 1
        if self.kick_members:
            permission_code += 2
        if self.ban_members:
            permission_code += 4
        if self.manage_channels:
            permission_code += 16
        if self.manage_guild:
            permission_code += 32
        if self.add_reactions:
            permission_code += 64
        if self.view_audit_log:
            permission_code += 128
        if self.priority_speaker:
            permission_code += 256
        if self.stream:
            permission_code += 512
        if self.view_channel:
            permission_code += 1024
        if self.send_messages:
            permission_code += 2048
        if self.send_tts_messages:
            permission_code += 4096
        if self.manage_messages:
            permission_code += 8192
        if self.embed_links:
            permission_code += 16384
        if self.attach_files:
            permission_code += 32768
        if self.read_message_history:
            permission_code += 65536
        if self.mention_everyone:
            permission_code += 131072
        if self.use_external_emojis:
            permission_code += 262144
        if self.view_guild_insights:
            permission_code += 524288
        if self.connect:
            permission_code += 1048576
        if self.speak:
            permission_code += 2097152
        if self.mute_members:
            permission_code += 4194304
        if self.deafen_members:
            permission_code += 8388608
        if self.move_members:
            permission_code += 16777216
        if self.use_vad:
            permission_code += 33554432
        if self.change_nickname:
            permission_code += 67108864
        if self.manage_nicknames:
            permission_code += 134217728
        if self.manage_roles:
            permission_code += 268435456
        if self.manage_webhooks:
            permission_code += 536870912
        if self.manage_emojis_and_stickers:
            permission_code += 1073741824
        if self.use_application_commands:
            permission_code += 2147483648
        if self.request_to_speak:
            permission_code += 4294967296
        if self.manage_threads:
            permission_code += 8589934592
        if self.use_public_threads:
            permission_code += 34359738368
        if self.use_private_threads:
            permission_code += 68719476736
        if self.use_external_stickers:
            permission_code += 137438953472

        return permission_code

core/start.py

DiscordBotを立ち上げるコードです。

start.py
import discord
from discord import Intents
import os
import json
import traceback
import requests,json


from dotenv import load_dotenv
load_dotenv()

from core.db_pickle import db_pickle_save

class DBot(discord.AutoShardedBot):
    def __init__(self, token:str, intents:Intents) -> None:
        self.token = token
        super().__init__(intents = intents)
        self.load_cogs()

    async def on_ready(self) -> None:
        await self.change_presence(
            status=discord.Status.do_not_disturb,
            activity=discord.Activity(name='起動中...................',type=discord.ActivityType.watching)
        )
        await self.db_get()
        print('起動しました')
        game_name = os.environ.get('GAME_NAME')
        if game_name == None:
            game_name = 'senran kagura'
        await self.change_presence(
            status=discord.Status.online,
            activity=discord.Game(name = game_name)
        )

    def load_cogs(self) -> None:
        for file in os.listdir("./cogs"): 
            if file.endswith(".py"): 
                cog = file[:-3] 
                self.load_extension(f"cogs.{cog}")
                print(cog + "をロードしました")

    async def db_get(self) -> None:
        # データベースへ接続
        await db_pickle_save(guilds=self.guilds)

    

        

    # 起動用の補助関数です
    def run(self) -> None:
        try:
            self.loop.run_until_complete(self.start(self.token))
        except discord.LoginFailure:
            print("Discord Tokenが不正です")
        except KeyboardInterrupt:
            print("終了します")
            self.loop.run_until_complete(self.close())
        except discord.HTTPException as e:
            traceback.print_exc()
            if e.status == 429 and os.environ.get("WEBHOOK") != None:
                main_content = {'content': 'DiscordBot 429エラー\n直ちにDockerファイルを再起動してください'}
                headers      = {'Content-Type': 'application/json'}

                response     = requests.post(
                    url=os.environ.get("WEBHOOK"), 
                    data=json.dumps(main_content), 
                    headers=headers
                )
                
        except Exception as e:
            traceback.print_exc()
            if os.environ.get("WEBHOOK") != None:
                if os.environ.get("ADMIN_ID") != None:
                    admin_id = os.environ.get("ADMIN_ID")
                    text = f"<@{int(admin_id)}> {e}"
                else:
                    text = str(e)
                main_content = {
                    'content':text
                }
                headers = {
                    'Content-Type': 'application/json'
                }
                response = requests.post(
                    url=os.environ.get("WEBHOOK"), 
                    data=json.dumps(main_content), 
                    headers=headers
                )

core/db_pickle.py

データベースからデータを取得し、pickleでキャッシュを作成します。

db_pickle.py
from discord import Guild

from dotenv import load_dotenv
load_dotenv()

from typing import List
import os

from base.database import PostgresDB


from core.pickes_save import (
    vc_columns,
    guild_permissions_columns
)
DISCORD_BASE_URL = "https://discord.com/api"

DISCORD_BOT_TOKEN = os.environ["DISCORD_BOT_TOKEN"]

USER = os.getenv('PGUSER')
PASSWORD = os.getenv('PGPASSWORD')
DATABASE = os.getenv('PGDATABASE')
HOST = os.getenv('PGHOST')
db = PostgresDB(
    user=USER,
    password=PASSWORD,
    database=DATABASE,
    host=HOST
)

async def db_pickle_save(guilds:List[Guild]) -> None:
    """
    データベースのテーブルの中身をキャッシュデータとしてローカルに保存します。

    param:
    guilds  :List[discord.Guild]
    Botが所属しているDiscordサーバのクラス
    """
    # データベースへ接続
    await db.connect()
    # サーバごとにテーブルのキャッシュデータを作成
    for guild in guilds:
        await vc_columns.vc_pickle_save(db=db,guild=guild)
        await guild_permissions_columns.guild_permissions_pickle_save(db=db,guild=guild)

    await db.disconnect()

core/pickle_save/vc_columns.py

ボイスチャンネル入退室管理のテーブルを取得し、キャッシュを保存する。

vc_columns.py
from typing import List,Dict

from discord import Guild

from base.database import PostgresDB
from base.aio_req import (
    pickle_write
)

from core.pickes_save.bin.get_channel import get_discord_channel
from core.pickes_save.bin.check_table import check_table_type

VC_TABLE = 'guilds_vc_signal_'
VC_COLUMNS = {
    'vc_id'             : 'NUMERIC PRIMARY KEY', 
    'guild_id'          : 'NUMERIC', 
    'send_signal'       : 'boolean',
    'send_channel_id'   : 'NUMERIC', 
    'join_bot'          : 'boolean',
    'everyone_mention'  : 'boolean',
    'mention_role_id'   : 'NUMERIC[]'
}
VC_NEW_COLUMNS = {
    'vc_id'             : 0, 
    'guild_id'          : 0, 
    'send_signal'       : True,
    'send_channel_id'   : 0, 
    'join_bot'          : True,
    'everyone_mention'  : True,
    'mention_role_id'   : []
}
# 取得するチャンネルのタイプ
VC_TYPE = [2]

async def vc_pickle_save(
    db:PostgresDB,
    guild:Guild
) -> None:
    """
    ボイスチャンネルの入退室を管理するテーブルの作成、更新
    キャッシュデータの作成

    param:
    db:PostgresDB
        接続するデータベースのインスタンス
    guild:Guild
        Discordのサーバーインスタンス
    """

    # テーブル名を代入
    table_name:str = f"{VC_TABLE}{guild.id}"

    # テーブルをつくるか、カラムを取得するかのフラグ
    create_colum_flag = False
    create_table_flag = False

    # テーブルを削除するかのフラグ
    drop_table_flag = False

    # テーブルの要素を取得
    table_fetch:List[Dict] = await db.select_rows(
        table_name=f"{table_name}",
        columns=[],
        where_clause={}
    )

    # テーブル内のカラム名配列
    channel_colums = [key for key in VC_COLUMNS.keys()]

    # テーブルがなかった場合、作成
    if len(table_fetch) == 1:
        # テーブルが存在しない場合、作成
        if (table_fetch[0] == f"{table_name} does not exist"):
            create_table_flag = True
            table_colums = [key for key in VC_COLUMNS.keys()]
        # テーブルが存在する場合、カラムを格納
        else:
            create_colum_flag = True
            # データベース側のカラムの型を入手
            table_columns_type = await db.get_columns_type(table_name=table_name)
            table_colums = [key for key in table_columns_type.keys()]
    else:
         # データベース側のカラムの型を入手
        table_columns_type = await db.get_columns_type(table_name=table_name)
        table_colums = [key for key in table_columns_type.keys()]

    # テーブルの方が存在する場合
    if bool('table_columns_type' in locals()):
        # テーブル内のカラムの型配列を確認
        unchanged,table_fetch = await check_table_type(
            columns=VC_COLUMNS,
            table_columns=table_columns_type,
            new_columns=VC_NEW_COLUMNS,
            table_fetch=table_fetch
        )
    else:
        unchanged = False

    # テーブルに変更があるかのフラグ
    changed_table_flag = table_colums != channel_colums or unchanged != False

    # テーブルが存在しているが、中身が空
    if len(table_fetch) == 0:
        print(f'テーブル:{table_name}の要素は空です')
        # 要素が変更されていた場合
        if changed_table_flag:
            drop_table_flag = True
        # ローカル側のカラムを格納
        else:
            table_colums = [key for key in VC_COLUMNS.keys()]

        # データベース側のカラムを格納
    if create_colum_flag:
        print(f'テーブル:{table_name}のカラム名一覧を作成します')
        table_colums = [key for key in table_fetch[0].keys()]
        
    # カラムの構成が変更されていた場合、削除し新たに作成する
    if changed_table_flag or drop_table_flag:
        print(f'テーブル:{table_name}を削除します')
        create_table_flag = True
        await db.drop_table(table_name=table_name)

    # テーブルの作成
    if create_table_flag:
        print(f'テーブル:{table_name}を作成します')
        await db.create_table(
            table_name=table_name,
            columns=VC_COLUMNS
        )

    # テーブルに変更があった場合
    if changed_table_flag and len(table_fetch) != 0:
        if "does not exist" not in table_fetch[0]:
            # まとめて作成(バッジ)
            await db.batch_insert_row(
                table_name=table_name,
                row_values=table_fetch
            )

    # 中身が空の場合、テーブルが作成された場合
    if len(table_fetch) == 0 or create_table_flag:

        # Discordのチャンネルを取得
        all_channel = await get_discord_channel(
            guild_id=guild.id,
            get_channel_type=VC_TYPE
        )

        row_values = []

        for channel in all_channel:
            row = {}
            for key,values in VC_NEW_COLUMNS.items():
                # 各要素を更新
                if key == "vc_id":
                    value = channel["id"]
                elif key == "guild_id":
                    value = guild.id
                elif key == "send_channel_id":
                    # システムチャンネルがある場合代入
                    if hasattr(guild.system_channel,'id'):
                        value = guild.system_channel.id
                    else:
                        value = 0
                else:
                    value = values
                row.update({key:value})
            
            row_values.append(row)

            # 一つ一つ作成
            # await db.insert_row(table_name=table_name,row_values=row)

        # まとめて作成(バッジ)
        await db.batch_insert_row(
            table_name=table_name,
            row_values=row_values
        )

        # テーブルの要素を取得
        table_fetch:List[Dict] = await db.select_rows(
            table_name=f"{table_name}",
            columns=[],
            where_clause={}
        )

    dict_row = list()

    # テーブルに中身がある場合
    if len(table_fetch) > 0:
        if table_fetch[0] != f"{table_name} does not exist":
            print(f'{table_name}.pickleの書き込みをはじめます')
            dict_row = [
                dict(zip(record.keys(), record.values())) 
                for record in table_fetch
            ]


    # 書き込み
    # pickleファイルに書き込み
    await pickle_write(
        filename=table_name,
        table_fetch=dict_row
    )

    print(f'{table_name}.pickleの書き込みが終了しました')

core/pickle_save/guild_permissions_columns.py

権限を示すテーブルのキャッシュを保存する。

guild_permissions_columns.py
from discord import Guild

from base.database import PostgresDB
from base.aio_req import (
    pickle_write
)


from core.pickes_save.bin.check_table import check_table_type

GUILD_SET_TABLE = 'guild_set_permissions'
GUILD_SET_COLUMNS = {
    'guild_id': 'NUMERIC PRIMARY KEY', 
    'vc_permission':'NUMERIC',
    'vc_user_id_permission':'NUMERIC[]',
    'vc_role_id_permission':'NUMERIC[]'
}
GUILD_SET_NEW_COLUMNS = {
    'guild_id': 0, 
    'vc_permission':8,
    'vc_user_id_permission':[],
    'vc_role_id_permission':[]
}

async def guild_permissions_pickle_save(
    db:PostgresDB,
    guild:Guild
) -> None:
    """
    サーバーの権限を示すテーブルの作成、更新
    キャッシュデータの作成

    param:
    db:PostgresDB
        接続するデータベースのインスタンス
    guild:Guild
        Discordのサーバーインスタンス
    """
    # guildのテーブル
    table_name = f"{GUILD_SET_TABLE}"
    table_fetch = await db.select_rows(
        table_name=table_name,
        columns=[],
        where_clause={
            'guild_id': guild.id
        }
    )

    # データベース側のカラムの型を入手
    table_columns_type = await db.get_columns_type(table_name=table_name)

    # テーブル内のカラム名配列
    guild_colums = [key for key in GUILD_SET_COLUMNS.keys()]
    table_colums = [key for key in table_columns_type.keys()]

    # テーブル内のカラムの型配列
    unchanged,table_fetch = await check_table_type(
        columns=GUILD_SET_COLUMNS,
        table_columns=table_columns_type,
        new_columns=GUILD_SET_NEW_COLUMNS,
        table_fetch=table_fetch
    )

    # テーブルに変更があるかのフラグ
    changed_table_flag = table_colums != guild_colums or unchanged != False


    # テーブルをつくるか、カラムを取得するかのフラグ
    create_colum_flag = False
    create_table_flag = False

    # テーブルを削除するかのフラグ
    drop_table_flag = False

    if len(table_fetch) > 0:
        # テーブルが存在しない場合、作成
        if (table_fetch[0] == f"{table_name} does not exist"):
            create_table_flag = True
        # テーブルが存在する場合、カラムを格納
        else:
            create_colum_flag = True
    # テーブルが存在している
    elif len(table_fetch) == 0:
        print(f'テーブル:{table_name}の要素は空です')
        # 中身が空かつ、要素が変更されていた場合
        if changed_table_flag:
            drop_table_flag = True
        else:
            table_colums = [key for key in GUILD_SET_COLUMNS.keys()]

    # データベース側のカラムを格納
    if create_colum_flag:
        print(f'テーブル:{table_name}のカラム名一覧を作成します')
        table_colums = [key for key in table_fetch[0].keys()]
        
    # カラムの構成が変更されていた場合、削除し新たに作成する
    if changed_table_flag or drop_table_flag:
        print(f'テーブル:{table_name}を削除します')
        create_table_flag = True
        await db.drop_table(table_name=table_name)

    # テーブルの作成
    if create_table_flag:
        print(f'テーブル:{table_name}を作成します')
        await db.create_table(
            table_name=table_name,
            columns=GUILD_SET_COLUMNS
        )

    # テーブルに変更があった場合
    if changed_table_flag and len(table_fetch) != 0:
        if "does not exist" not in table_fetch[0]:
            # まとめて作成(バッジ)
            await db.batch_insert_row(
                table_name=table_name,
                row_values=table_fetch
            )

    # ない場合は新規で登録
    if len(table_fetch) == 0 or create_table_flag:
        guild_new_colum = GUILD_SET_NEW_COLUMNS
        guild_new_colum.update({
            'guild_id':guild.id
        })
        await db.insert_row(
            table_name=table_name,
            row_values=guild_new_colum
        )

    # テーブルの要素を取得
    table_fetch = await db.select_rows(
        table_name=table_name,
        columns=[],
        where_clause={}
    )
    
    print(f'{table_name}.pickleの書き込みをはじめます')

    # pickleファイルに書き込み
    await pickle_write(
        filename=table_name,
        table_fetch=table_fetch
    )

    print(f'{table_name}.pickleの書き込みが終了しました')

core/pickle_save/bin/check_table.py

check_table.py
from typing import List,Dict,Tuple
import re

async def check_table_type(
    columns:Dict,
    table_columns:Dict,
    new_columns:Dict,
    table_fetch:List[Dict]
) -> Tuple[bool,List[Dict]]:
    """
    テーブルのカラムに変更がないか確認する

    param:
    columns:Dict
        ローカル側のカラムの型

    table_columns:Dict
        データベース側のカラムの型

    new_columns:Dict
        ローカル側のカラムの初期値

    table_fetch:List[Dict]
        データベース側の現時点でのデータ

    return:
    Tuple[bool,List[Dict]]
        変更があった場合True
        変更があった場合、更新をしたあとの列

    """
    set_columns:Dict = {}

    # 新しく行が追加された場合
    create_items = [
        {key:value}
        for key,value in new_columns.items() 
        if key not in table_columns.keys()
    ]

    # 行が削除された場合
    delete_items = [
        {key:value}
        for key,value in table_columns.items() 
        if key not in new_columns.keys()
    ]
    
    for column_name,data_type in columns.items():
        table_data_type:str = table_columns.get(column_name)
        # (数字)が含まれていた場合、取り除く
        data_type:str = re.sub(r'\(\d+\)','',data_type)
        #print(table_data_type,data_type)
        
        # データベース側になかった場合
        # 主キーで、変更があった場合
        if (table_data_type == None or 
            table_data_type not in data_type.lower() and (
            'PRIMARY KEY' in data_type or
            'primary key' in data_type
            )):
            # listの場合tupleに変換(setがlist in listを扱えないため)
            if isinstance(new_columns[column_name],list):
                new_columns[column_name] = tuple(new_columns[column_name])
            set_columns.update(
                {
                    column_name:new_columns.get(column_name)
                }
            )
        # 完全一致(大文字小文字区別せず)あった場合
        # 主キーで、変更がない場合
        elif (table_data_type == data_type.lower() or 
              (table_data_type in data_type.lower() and (
            'PRIMARY KEY' in data_type or
            'primary key' in data_type
            ))):
            set_columns.update(
                {
                    column_name:'Unchanged'
                }
            )

    # 要素の変更がある場合True
    if (len(list(set_columns.values())) == 1 and 
        list(set_columns.values())[0] == "Unchanged"):
        unchanged = True
    else:
        unchanged = False

    # テーブルが存在する場合、要素を更新
    if len(table_fetch) > 0:
        if "does not exist" not in table_fetch[0]:
            for i,table in enumerate(table_fetch):    
                table = dict(table)
                for table_key,table_value in table.items():
                    if set_columns.get(table_key) == "Unchanged":
                        table.update({table_key:table_value})

                # 追加する場合、デフォルトの値を代入 
                for item in create_items:
                    for key,value in item.items():
                        table.update({key:value})

                # 削除された場合、削除
                for item in delete_items:
                    for key,value in item.items():
                        table.pop(key)

                table_fetch[i] = table

    #print(table_fetch)

    return unchanged,table_fetch

core/pickle_save/bin/get_channel.py

get_channel.py
from base.aio_req import (
    aio_get_request
)

from dotenv import load_dotenv
load_dotenv()


import os
from typing import List,Dict,Any,Tuple
DISCORD_BASE_URL = "https://discord.com/api"

DISCORD_BOT_TOKEN = os.environ["DISCORD_BOT_TOKEN"]

async def get_discord_channel(
    guild_id:int,
    get_channel_type:List[int]
) -> List[Dict[str,Any]]:
    """
    Discordサーバ内のチャンネルを取得します。

    param:
    guild_id            :int
    Discordのサーバid

    get_channel_type    :List[int]
    取得するチャンネルtype
    空の場合、すべて取得する
    以下に説明を記す


    0:テキストチャンネル	
        Discordサーバのテキストチャンネル
    1:ダイレクトメッセージ	
        一ユーザへのダイレクトメッセージ
    2:ボイスチャンネル	
        Discordサーバのボイスチャンネル
    3:グループダイレクトメッセージ	
        複数のユーザから構成されるダイレクトメッセージ
    4:カテゴリーチャンネル	
        テキストチャンネルやボイスチャンネルをまとめる親チャンネル
    5:ギルドアナウンスチャンネル(旧ニュースチャンネル)	
        お気に入りにしたサーバの通知や、Discord公式の通知を受け取るチャンネル
    10:アナウンスチャンネル	
        5で作成されるスレッドチャンネル
    11:公開スレッドチャンネル	
        0で作成されるスレッドチャンネル
        公開に設定されている場合はこちら
    12:非公開スレッドチャンネル	
        0で作成されるスレッドチャンネル
        非公開に設定されている場合はこちら
    13:ステージチャンネル	
        ラジオのような聞き専のチャンネル
    14:ギルドディレクトリチャンネル	
        サーバ紹介をするチャンネル
        大規模なコミュニティサーバでのみ使用可能
        以下のリンクに使用例あり
        https://support.discord.com/hc/ja/articles/4406046651927-DIscord%E5%AD%A6%E7%94%9F%E3%83%8F%E3%83%96FAQ#:~:text=Discord%E5%AD%A6%E7%94%9F%E3%83%8F%E3%83%96%E3%81%AF%E3%80%81%E5%AD%A6%E7%94%9F,%E3%81%99%E3%82%8B%E3%81%93%E3%81%A8%E3%81%8C%E3%81%A7%E3%81%8D%E3%81%BE%E3%81%99%E3%80%82
    15:フォーラムチャンネル	
        特定の話題について議論するチャンネル
    """
    # サーバのチャンネル一覧を取得
    all_channel = await aio_get_request(
        url = DISCORD_BASE_URL + f'/guilds/{guild_id}/channels',
        headers = {
            'Authorization': f'Bot {DISCORD_BOT_TOKEN}'
        }
    )

    # 空の場合、すべてのチャンネルを格納
    if len(get_channel_type) == 0:
        all_channel_filter = [
            channel
            for channel in all_channel
        ]
    else:
        # 該当するチャンネルだけ格納
        all_channel_filter = [
            channel
            for channel in all_channel
            if channel['type'] in get_channel_type
        ]

    return all_channel_filter

cogs/vc_count.py

ボイスチャンネルの入退出を通知する。

vc_count.py
import discord
from discord.ext import commands

from cogs.bin import activity
from core.start import DBot

from dotenv import load_dotenv
load_dotenv()

from base.aio_req import pickle_read

from typing import List,Dict

# ボイスチャンネルの入退室を通知
class vc_count(commands.Cog):
    def __init__(self, bot : DBot):
        self.bot = bot      

    @commands.Cog.listener(name='on_voice_state_update')
    async def voice_update(
        self, 
        member:discord.Member, 
        before:discord.VoiceState, 
        after:discord.VoiceState
    ):
        # 参加するボイスチャンネルのid
        if hasattr(after.channel,'id'):
            vc_channel_id = after.channel.id
        else:
            # 退出した場合のボイスチャンネルのid
            vc_channel_id = before.channel.id

        # 使用するデータベースのテーブル名
        TABLE = f'guilds_vc_signal_{member.guild.id}'

        # 読み取り
        vc_fetch:List[Dict] = await pickle_read(filename=TABLE)

        key_vc = [
            g 
            for g in vc_fetch 
            if int(g.get('vc_id')) == vc_channel_id
        ]

        # 通知が拒否されていた場合、終了
        if hasattr(before.channel,'id'):
            if int(key_vc[0].get('vc_id')) == before.channel.id:
                if bool(key_vc[0].get('send_signal')) == False:
                    return

        # 通知が拒否されていた場合、終了
        if hasattr(after.channel,'id'):
            if int(key_vc[0].get('vc_id')) == after.channel.id:
                if bool(key_vc[0].get('send_signal')) == False:
                    return

        # Botの場合終了
        if (bool(key_vc[0].get('join_bot')) == False and
            member.bot == True):
            return
        
        # Discordのシステムチャンネル(welcomeメッセージが送られる場所)を取得
        send_channel_id = int(key_vc[0].get('send_channel_id'))

        # ない場合システムチャンネルのidを代入
        if send_channel_id == None or send_channel_id == 0:
            if hasattr(member.guild.system_channel,'id'):
                send_channel_id = member.guild.system_channel.id
            else:
                return

        client = self.bot.get_channel(send_channel_id)

        # メンションするロールの取り出し
        mentions = [
            f"<@&{int(role_id)}> " 
            for role_id in key_vc[0].get('mention_role_id')
        ]

        # 全体メンションが有効の場合@everyoneを追加
        if (bool(key_vc[0].get('everyone_mention')) == True):
            mentions.insert(0,"@everyone")
        
        # listをstrに変換
        mention_str = " ".join(mentions)

        # ボイスチャンネルを移動したかどうか
        check = before.channel and after.channel and before.channel != after.channel

        # 退出した場合
        if (after.channel is None or check):
            # ボイスチャンネルの残り人数を取得
            await client.send(f"現在{len(before.channel.members)}人 <@{member.id}>が{before.channel.name}から退出しました。")

            # Botがボイスチャンネルに接続していて、それ以外の全員が退出した場合
            if hasattr(before.channel.guild.voice_client,'is_connected'):
                for bot_flag in before.channel.members:
                    # Bot以外のユーザーがいる場合、終了
                    if bot_flag.bot == False:
                        return
                # 退出
                await before.channel.guild.voice_client.disconnect()
                await client.send(f"{mention_str} 通話が終了しました。",embed=discord.Embed(title="通話終了",description=""))
                return

            # 全員が退出した場合
            if len(before.channel.members) == 0:
                isum = 0
                # サーバー全体のボイスチャンネル接続ユーザーを取得
                for channels in member.guild.voice_channels:
                    isum += len(channels.voice_states.keys())
                # 全てのチャンネルで通話しているユーザーがいない場合
                if isum == 0:
                    await client.send(f"{mention_str} 通話が終了しました。",embed=discord.Embed(title="通話終了",description=""))

        # 入室の場合
        if (before.channel is None or check):
            # ボイスチャンネルの残り人数を取得
            
            embed = None
            # 一人目の入室(通話開始)の場合、サーバーアイコンの埋め込みを作成
            if len(after.channel.members) == 1:
                embed = await activity.callemb(after,member)
            await client.send(f"現在{len(after.channel.members)}{mention_str} <@{member.id}>が{after.channel.name} に参加しました。",embed=embed)

        # カメラ配信が始まった場合
        if before.self_video is False and after.self_video is True:
            await client.send(
                f"{mention_str} <@{member.id}> が、{after.channel.name}でカメラ配信を始めました。",
                embed = await activity.stream(after,member,title="カメラ配信")
            )
        elif after.self_video is False and before.self_video is True:
            await client.send(f"<@{member.id}> がカメラ配信を終了しました。")
        # 画面共有が始まった場合
        elif before.self_stream is False and after.self_stream is True:
            # プレイ中のゲームがある場合、ゲーム名を含める
            mst,embed = await activity.activity(after,member,mention_str)
            await client.send(mst,embed=embed)
        elif after.self_stream is False and before.self_stream is True:
            await client.send(f"<@{member.id}> が画面共有を終了しました。")

def setup(bot:DBot):
    return bot.add_cog(vc_count(bot))

cogs/bin/activity.py

画面共有や埋め込みなどを処理する関数。

activity.py
import discord
from discord import Embed
from typing import Tuple

async def activity(
    after:discord.VoiceState,
    member:discord.Member,
    mention_str:str
) -> Tuple[str,Embed]:
    try:
        embed = discord.Embed(
            title='配信タイトル', 
            description=f'{member.activities[0].name}'
        )

        embed.set_author(
            name=member.name,  # ユーザー名
            icon_url=member.display_avatar.url  # アイコンを設定
        )

        # チャンネル名フィールド
        embed.add_field(name="チャンネル", value=after.channel.name)

        for activitie in member.activities:
            print(activitie.__str__())
        
        detail = None
        state = None

        # ステータス名がない場合は記述なし
        if hasattr(member.activities[0],'details'):
            detail = member.activities[0].details
        if hasattr(member.activities[0],'state'):
            state = member.activities[0].state

        # ステータスフィールド
        embed.add_field(name = detail,value = state)

        # ゲーム画像がある場合代入
        if hasattr(member.activities[0],'large_image_url'):
            if member.activities[0].large_image_url != None:
                embed.set_image(url=member.activities[0].large_image_url)
        
        return f"{mention_str} <@{member.id}> が、{after.channel.name}で「{member.activities[0].name}」の配信を始めました。",embed
    # 存在しない場合
    except IndexError:
        return f" {mention_str} <@{member.id}> が、{after.channel.name}で画面共有を始めました。",await stream(after,member,title="画面共有")

# 通話開始時の埋め込み作成
async def callemb(after:discord.VoiceState,member:discord.Member) -> Embed:
    embed=discord.Embed(
        title="通話開始",
        description=f"{member.guild.name}\n<#{after.channel.id}>"
    )
    # サーバーのアイコンが設定されているかどうか
    if hasattr(member.guild.icon,'url'):
        embed.set_image(url=member.guild.icon.url)
    # ない場合はユーザーのアイコンを設定
    elif hasattr(member.display_avatar,'url'):
        embed.set_image(url=member.display_avatar.url)
        
    embed.set_author(
        name=member.name,  # ユーザー名
        icon_url=member.display_avatar.url  # アイコンを設定
    )
    return embed

# 配信開始時の埋め込み作成
async def stream(after:discord.VoiceState,member:discord.Member,title:str) -> Embed:
    embed=discord.Embed(
        title=title,
        description=f"{member.guild.name}\n<#{after.channel.id}>"
    )
    embed.set_author(
        name=member.name,  # ユーザー名
        icon_url=member.display_avatar.url  # アイコンを設定
    )
    return embed

routers/index.py

index.py
from fastapi import APIRouter,Request,Header
from starlette.requests import Request
from fastapi.templating import Jinja2Templates

import os

from base.aio_req import (
    aio_get_request,
    oauth_check
)
from routers.session_base.user_session import DiscordOAuthData,DiscordUser

# new テンプレート関連の設定 (jinja2)
templates = Jinja2Templates(directory="templates")

router = APIRouter()

DISCORD_BASE_URL = "https://discord.com/api"

DISCORD_BOT_TOKEN = os.environ["DISCORD_BOT_TOKEN"]

@router.get("/")
async def index(request: Request):
    # Discordの認証情報が有効かどうか判断
    if request.session.get('discord_oauth_data'):
        oauth_session = DiscordOAuthData(**request.session.get('discord_oauth_data'))
        user_session = DiscordUser(**request.session.get('discord_user'))
        print(f"アクセスしたユーザー:{user_session.username}")
        # トークンの有効期限が切れていた場合、認証情報を削除
        if not await oauth_check(access_token=oauth_session.access_token):
            request.session.pop('discord_oauth_data')
    
    bot_data:dict = await aio_get_request(
        url = f'{DISCORD_BASE_URL}/users/@me', 
        headers = { 
            'Authorization': f'Bot {DISCORD_BOT_TOKEN}' 
        }
    )

    return templates.TemplateResponse(
        'index.html',
        {
            'request': request,
            'bot_data':bot_data,
            'title':'トップページ'
        }
    )

routers/login.py

login.py
from fastapi import APIRouter,Request,Header
from fastapi.responses import RedirectResponse,HTMLResponse
from starlette.requests import Request
from fastapi.templating import Jinja2Templates

from dotenv import load_dotenv
load_dotenv()

from base.aio_req import (
    aio_get_request,
    pickle_read
)


import os
import secrets
from typing import Dict,List

router = APIRouter()

# new テンプレート関連の設定 (jinja2)
templates = Jinja2Templates(directory="templates")

DISCORD_REDIRECT_URL = f"https://discord.com/api/oauth2/authorize?response_type=code&client_id={os.environ.get('DISCORD_CLIENT_ID')}&scope={os.environ.get('DISCORD_SCOPE')}&redirect_uri={os.environ.get('DISCORD_CALLBACK_URL')}&prompt=consent"
DISCORD_BASE_URL = "https://discord.com/api"

DISCORD_BOT_TOKEN = os.environ["DISCORD_BOT_TOKEN"]

@router.get("/discord-login")
async def discord_login(request: Request):
    # ランダムなstate値の生成
    state = secrets.token_urlsafe(16)
    request.session['state'] = state
    try:
        oauth_data:dict = await aio_get_request(
            url = DISCORD_BASE_URL + '/users/@me', 
            headers = { 
                'Authorization': f'Bearer {request.session["discord_oauth_data"]["access_token"]}' 
            }
        )
        if oauth_data.get('message') == '401: Unauthorized':
            return RedirectResponse(url=f"{DISCORD_REDIRECT_URL}&state={state}",status_code=302)
    except KeyError:
        return RedirectResponse(url=f"{DISCORD_REDIRECT_URL}&state={state}",status_code=302)
    return templates.TemplateResponse(
        'register.html',
        {
            'request': request,
        }
    )

routers/callback.py

Discordログインによる認証後の処理。

callback.py
from fastapi import APIRouter, HTTPException
from fastapi.responses import RedirectResponse
from starlette.requests import Request
from fastapi.templating import Jinja2Templates

from dotenv import load_dotenv
load_dotenv()


import os
from typing import Dict,List

from base.aio_req import (
    aio_get_request,
    aio_post_request,
    pickle_read
)

DISCORD_BASE_URL = "https://discord.com/api"
DISCORD_BOT_TOKEN = os.environ["DISCORD_BOT_TOKEN"]


router = APIRouter()

# new テンプレート関連の設定 (jinja2)
templates = Jinja2Templates(directory="templates")

@router.get('/discord-callback/')
async def discord_callback(
    code:str,
    state: str,
    request:Request
):
    # セッションの初期化
    if request.session.get('discord_user') != None:
        request.session.pop("discord_user")
    if request.session.get('discord_connection') != None:
        request.session.pop("discord_connection")
    if request.session.get("discord_oauth_data") != None:
        request.session.pop("discord_oauth_data")

    # stateが一緒しない場合、400で終了
    if request.session.get("state") != state:
        raise HTTPException(status_code=400, detail="認証失敗")
    # stateが一致した場合、削除して続行
    else:
        request.session.pop("state")
        
    authorization_code = code

    request_postdata = {
        'client_id': os.environ.get('DISCORD_CLIENT_ID'), 
        'client_secret': os.environ.get('DISCORD_CLIENT_SECRET'), 
        'grant_type': 'authorization_code', 
        'code': authorization_code, 
        'redirect_uri': os.environ.get('DISCORD_CALLBACK_URL')
    }

    responce_json = await aio_post_request(
        url = DISCORD_BASE_URL + '/oauth2/token',
        data = request_postdata,
        headers = {
            'Content-Type': 'application/x-www-form-urlencoded'
        }
    )

    request.session["discord_oauth_data"] = responce_json

    request.session["discord_user"] = await aio_get_request(
        url = DISCORD_BASE_URL + '/users/@me', 
        headers = { 
            'Authorization': f'Bearer {responce_json["access_token"]}' 
        }
    )

    # ホームページにリダイレクトする
    return RedirectResponse(url="/guilds")

routers/guilds.py

設定できるサーバー一覧を表示する。

guilds.py
from fastapi import APIRouter
from fastapi.responses import RedirectResponse
from starlette.requests import Request
from fastapi.templating import Jinja2Templates

from dotenv import load_dotenv
load_dotenv()

import os

from base.aio_req import (
    aio_get_request,
    search_guild,
    oauth_check
)
from routers.session_base.user_session import DiscordOAuthData,DiscordUser

DISCORD_BASE_URL = "https://discord.com/api"
DISCORD_REDIRECT_URL = f"https://discord.com/api/oauth2/authorize?response_type=code&client_id={os.environ.get('DISCORD_CLIENT_ID')}&scope={os.environ.get('DISCORD_SCOPE')}&redirect_uri={os.environ.get('DISCORD_CALLBACK_URL')}&prompt=consent"


DISCORD_BOT_TOKEN = os.environ["DISCORD_BOT_TOKEN"]

router = APIRouter()

# new テンプレート関連の設定 (jinja2)
templates = Jinja2Templates(directory="templates")

@router.get('/guilds')
async def guilds(request:Request):
    # OAuth2トークンが有効かどうか判断
    if request.session.get('discord_oauth_data'):
        oauth_session = DiscordOAuthData(**request.session.get('discord_oauth_data'))
        user_session = DiscordUser(**request.session.get('discord_user'))
        print(f"アクセスしたユーザー:{user_session.username}")
        # トークンの有効期限が切れていた場合、再ログインする
        if not await oauth_check(access_token=oauth_session.access_token):
            return RedirectResponse(url=DISCORD_REDIRECT_URL,status_code=302)
    else:
        return RedirectResponse(url=DISCORD_REDIRECT_URL,status_code=302)
    # Botが所属しているサーバを取得
    bot_in_guild_get = await aio_get_request(
        url = DISCORD_BASE_URL + '/users/@me/guilds',
        headers = {
            'Authorization': f'Bot {DISCORD_BOT_TOKEN}'
        }
    )

    # ログインユーザが所属しているサーバを取得
    user_in_guild_get = await aio_get_request(
        url = DISCORD_BASE_URL + '/users/@me/guilds',
        headers = {
            'Authorization': f'Bearer {oauth_session.access_token}'
        }
    )

    # ログインユーザとBotが同じ所属を見つける
    match_guild = await search_guild(
        bot_in_guild_get = bot_in_guild_get,
        user_in_guild_get = user_in_guild_get
    )

    return templates.TemplateResponse(
        "guilds.html",
        {
            "request": request, 
            "match_guild":match_guild,
            "title":f"{user_session.username}のサーバ一覧"
        }
    )

routers/logout.py

logout.py
from fastapi import APIRouter,Request
from fastapi.responses import RedirectResponse
from starlette.requests import Request
from fastapi.templating import Jinja2Templates

from dotenv import load_dotenv
load_dotenv()

import os

router = APIRouter()

# new テンプレート関連の設定 (jinja2)
templates = Jinja2Templates(directory="templates")

DISCORD_REDIRECT_URL = f"https://discord.com/api/oauth2/authorize?response_type=code&client_id={os.environ.get('DISCORD_CLIENT_ID')}&scope={os.environ.get('DISCORD_SCOPE')}&redirect_uri={os.environ.get('DISCORD_CALLBACK_URL')}&prompt=consent"

DISCORD_BASE_URL = "https://discord.com/api"

@router.get("/discord-logout")
async def discord_logout(request: Request):
    # セッションの初期化
    if request.session.get('discord_user') != None:
        request.session.pop("discord_user")
    if request.session.get('discord_connection') != None:
        request.session.pop("discord_connection")
    if request.session.get("discord_oauth_data") != None:
        request.session.pop("discord_oauth_data")

    # ホームページにリダイレクトする
    return RedirectResponse(url="/")

routers/guild/guild.py

guild.py
from fastapi import APIRouter
from fastapi.responses import RedirectResponse
from starlette.requests import Request
from fastapi.templating import Jinja2Templates

from dotenv import load_dotenv
load_dotenv()

import os

from base.aio_req import (
    aio_get_request,
    oauth_check,
    return_permission,
    pickle_read
)
from routers.session_base.user_session import DiscordOAuthData,DiscordUser

DISCORD_BASE_URL = "https://discord.com/api"
DISCORD_REDIRECT_URL = f"https://discord.com/api/oauth2/authorize?response_type=code&client_id={os.environ.get('DISCORD_CLIENT_ID')}&scope={os.environ.get('DISCORD_SCOPE')}&redirect_uri={os.environ.get('DISCORD_CALLBACK_URL')}&prompt=consent"

DISCORD_BOT_TOKEN = os.environ["DISCORD_BOT_TOKEN"]

router = APIRouter()

# new テンプレート関連の設定 (jinja2)
templates = Jinja2Templates(directory="templates")

@router.get('/guild/{guild_id}')
async def guild(
    request:Request,
    guild_id:int
):
    # OAuth2トークンが有効かどうか判断
    if request.session.get('discord_oauth_data'):
        oauth_session = DiscordOAuthData(**request.session.get('discord_oauth_data'))
        user_session = DiscordUser(**request.session.get('discord_user'))
        # トークンの有効期限が切れていた場合、再ログインする
        if not await oauth_check(access_token=oauth_session.access_token):
            return RedirectResponse(url=DISCORD_REDIRECT_URL,status_code=302)
    else:
        return RedirectResponse(url=DISCORD_REDIRECT_URL,status_code=302)
    
    # サーバの情報を取得
    guild = await aio_get_request(
        url = DISCORD_BASE_URL + f'/guilds/{guild_id}',
        headers = {
            'Authorization': f'Bot {DISCORD_BOT_TOKEN}'
        }
    )

    # サーバの権限を取得
    permission = await return_permission(
        guild_id=guild_id,
        user_id=user_session.id,
        access_token=oauth_session.access_token
    )

    try:
        tasks = await pickle_read(
            filename=f"task_{guild_id}"
        )
    except FileNotFoundError:
        tasks = list()

    return templates.TemplateResponse(
        "guild/guild.html",
        {
            "request": request, 
            "guild": guild,
            "guild_id": guild_id,
            "tasks":tasks,
            "permission":vars(permission),
            "title":guild['name'] + "の設定項目一覧"
        }
    )

routers/guild/vc_signal/vc_signal.py

ボイスチャンネルの入退出設定の画面。

vc_signal.py
from fastapi import APIRouter
from fastapi.responses import RedirectResponse
from starlette.requests import Request
from fastapi.templating import Jinja2Templates

from dotenv import load_dotenv
load_dotenv()

import os
from typing import List
from itertools import groupby,chain

from typing import List,Dict,Any,Tuple

from base.database import PostgresDB
from base.aio_req import (
    aio_get_request,
    pickle_read,
    return_permission,
    oauth_check
)
from routers.session_base.user_session import DiscordOAuthData,DiscordUser

DISCORD_BASE_URL = "https://discord.com/api"
DISCORD_REDIRECT_URL = f"https://discord.com/api/oauth2/authorize?response_type=code&client_id={os.environ.get('DISCORD_CLIENT_ID')}&scope={os.environ.get('DISCORD_SCOPE')}&redirect_uri={os.environ.get('DISCORD_CALLBACK_URL')}&prompt=consent"



DISCORD_BOT_TOKEN = os.environ["DISCORD_BOT_TOKEN"]

USER = os.getenv('PGUSER')
PASSWORD = os.getenv('PGPASSWORD')
DATABASE = os.getenv('PGDATABASE')
HOST = os.getenv('PGHOST')
db = PostgresDB(
    user=USER,
    password=PASSWORD,
    database=DATABASE,
    host=HOST
)

router = APIRouter()

# new テンプレート関連の設定 (jinja2)
templates = Jinja2Templates(directory="templates")

@router.get('/guild/{guild_id}/vc-signal')
async def line_post(
    request:Request,
    guild_id:int
):
    # OAuth2トークンが有効かどうか判断
    if request.session.get('discord_oauth_data'):
        oauth_session = DiscordOAuthData(**request.session.get('discord_oauth_data'))
        user_session = DiscordUser(**request.session.get('discord_user'))
        # トークンの有効期限が切れていた場合、再ログインする
        if not await oauth_check(access_token=oauth_session.access_token):
            return RedirectResponse(url=DISCORD_REDIRECT_URL,status_code=302)
    else:
        return RedirectResponse(url=DISCORD_REDIRECT_URL,status_code=302)
    # 使用するデータベースのテーブル名
    TABLE = f'guilds_vc_signal_{guild_id}'

    # サーバのチャンネル一覧を取得
    all_channel = await aio_get_request(
        url = DISCORD_BASE_URL + f'/guilds/{guild_id}/channels',
        headers = {
            'Authorization': f'Bot {DISCORD_BOT_TOKEN}'
        }
    )

    # チャンネルのソート
    all_channel_sort,all_channels,vc_channels = await sort_discord_channel(all_channel=all_channel)
    
    vc_cate_sort = [
        tmp 
        for tmp in all_channel_sort
        if tmp['type'] == 2 or tmp['type'] == 4
    ]

    # text_channel = list(chain.from_iterable(all_channels))
    text_channel_sort = [
        tmp 
        for tmp in all_channel_sort
        if tmp['type'] == 0
    ]


    # サーバの情報を取得
    guild = await aio_get_request(
        url = DISCORD_BASE_URL + f'/guilds/{guild_id}',
        headers = {
            'Authorization': f'Bot {DISCORD_BOT_TOKEN}'
        }
    )

    # ログインユーザの情報を取得
    guild_user = await aio_get_request(
        url = DISCORD_BASE_URL + f'/guilds/{guild_id}/members/{user_session.id}',
        headers = {
            'Authorization': f'Bot {DISCORD_BOT_TOKEN}'
        }
    )
    role_list = [g for g in guild_user["roles"]]


    # サーバの権限を取得
    guild_user_permission = await return_permission(
        guild_id=guild_id,
        user_id=user_session.id,
        access_token=oauth_session.access_token
    )

    # パーミッションの番号を取得
    permission_code = await guild_user_permission.get_permission_code()

    # キャッシュ読み取り
    guild_table_fetch:List[Dict[str,Any]] = await pickle_read(filename='guild_set_permissions')
    guild_table = [
        g 
        for g in guild_table_fetch 
        if int(g.get('guild_id')) == guild_id
    ]
    guild_permission_code = 8
    guild_permission_user = list()
    guild_permission_role = list()
    if len(guild_table) > 0:
        guild_permission_code = int(guild_table[0].get('vc_permission'))
        guild_permission_user = [
            user 
            for user in guild_table[0].get('vc_user_id_permission')
        ]
        guild_permission_role = [
            role
            for role in guild_table[0].get('vc_role_id_permission')
        ]

    and_code = guild_permission_code & permission_code
    admin_code = 8 & permission_code

    user_permission:str = 'normal'

    # 許可されている場合、管理者の場合
    if (and_code == permission_code or 
        admin_code == 8 or
        user_session.id in guild_permission_user or
        len(set(guild_permission_role) & set(role_list)) > 0
        ):
        user_permission = 'admin'

    # キャッシュ読み取り
    table_fetch:List[Dict[str,Any]] = await pickle_read(filename=TABLE)

    # データベースへ接続
    await db.connect()

    vc_set = []

    # ボイスチャンネルのみを代入
    app_vc = [int(x['id']) for x in vc_cate_sort if x['type'] == 2]

    # テータベース側のボイスチャンネルを代入
    db_vc = [int(x['vc_id']) for x in table_fetch]
    if set(app_vc) != set(db_vc):
        # データベース側で欠けているチャンネルを取得
        missing_items = [
            item 
            for item in table_fetch 
            if item not in vc_cate_sort
        ]

        # 新しくボイスチャンネルが作成されていた場合
        if len(missing_items) > 0:
            for vc in missing_items:
                if vc['type'] == 2:
                    row_values = {
                        'vc_id': vc['id'], 
                        'guild_id': guild_id, 
                        'send_signal': True,
                        'send_channel_id': guild.get('system_channel_id'), 
                        'join_bot': False,
                        'everyone_mention': True,
                        'mention_role_id':[]
                    }

                    # サーバー用に新たにカラムを作成
                    await db.insert_row(
                        table_name=TABLE,
                        row_values=row_values
                    )
                    vc_set.append(row_values)
        # ボイスチャンネルがいくつか削除されていた場合
        else:
            # 削除されたチャンネルを取得
            missing_items = [
                item 
                for item in all_channels 
                if item not in table_fetch
            ]
            
            # 削除されたチャンネルをテーブルから削除
            for vc in missing_items:
                await db.delete_row(
                    table_name=TABLE,
                    where_clause={
                        'vc_id':vc['vc_id']
                    }
                )

            # 削除後のチャンネルを除き、残りのチャンネルを取得
            vc_set = [
                d for d in table_fetch 
                if not (d.get('vc_id') in [
                    e.get('vc_id') for e in missing_items
                ] )
            ]

    else:
        vc_set = table_fetch

        # データベースの状況を取得
        db_check_fetch = await db.select_rows(
            table_name=TABLE,
            columns=[],
            where_clause={}
        )
        # データベースに登録されたが、削除されずに残っているチャンネルを削除
        check = [int(c['vc_id']) for c in db_check_fetch]
        del_check = set(check) - set(app_vc)

        for chan_id in list(del_check):
            await db.delete_row(
                table_name=TABLE,
                where_clause={
                    'channel_id':chan_id
                }
            )

    await db.disconnect()

    return templates.TemplateResponse(
        "guild/vc_signal/vc_signal.html",
        {
            "request": request, 
            "vc_cate_channel": vc_cate_sort,
            "text_channel": text_channel_sort,
            "guild": guild,
            "guild_id": guild_id,
            'vc_set' : vc_set,
            "user_permission":user_permission,
            "title": "ボイスチャンネルの送信設定/" + guild['name']
        }
    )


async def sort_discord_channel(
    all_channel:List
) -> Tuple[List,List,List]:
    # 親カテゴリー格納用
    position = []
    # ソート後のチャンネル一覧
    all_channel_sort = []

    # レスポンスのJSONからpositionでソートされたリストを作成
    sorted_channels = sorted(all_channel, key=lambda c: c['position'])

    # parent_idごとにチャンネルをまとめた辞書を作成
    channel_dict = {}

    for parent_id, group in groupby(
        sorted_channels, 
        key=lambda c: c['parent_id']
    ):
        if parent_id is None:
            # 親カテゴリーのないチャンネルは、キーがNoneの辞書に追加される
            parent_id = 'None'
    
        # キーがまだない場合、作成(同時に値も代入)
        if channel_dict.get(str(parent_id)) == None:
            channel_dict[str(parent_id)] = list(group)
        # キーがある場合、リストを取り出して結合し代入
        else:
            listtmp:List = channel_dict[str(parent_id)]
            listtmp.extend(list(group))

            # リスト内包記法でボイスチャンネルとカテゴリー以外は除外
            listtmp = [
                tmp 
                for tmp in listtmp 
                #if tmp['type'] == 2 or tmp['type'] == 4
            ]
            channel_dict[str(parent_id)] = listtmp
            # リストを空にする
            listtmp = list()

    # 親カテゴリーがある場合、Noneから取り出す
    for chan in channel_dict['None'][:]:
        if chan['type'] == 4:
            position.append(chan)
            channel_dict['None'].remove(chan)

    # 辞書を表示
    position_index = 0

    # 親カテゴリーの名前をリスト化
    extracted_list = [d["name"] for d in position]
    # カテゴリーに属しないチャンネルが存在する場合
    if len(channel_dict['None']) != 0:
        # 配列の長さをカテゴリー数+1にする
        all_channels = [{}] * (len(extracted_list) + 1)
        vc_channels = [{}] * (len(extracted_list) + 1)
    else:
        all_channels = [{}] * len(extracted_list)
        vc_channels = [{}] * len(extracted_list)

    for parent_id, channel in channel_dict.items():
        # カテゴリー内にチャンネルがある場合
        if len(channel) != 0:
            for d in position:
                # カテゴリーを探索、あった場合positionを代入
                if d['id'] == channel[0]['parent_id']:
                    position_index = d['position']
                    break
        else:
            position_index = len(extracted_list)
    
        if len(channel) != 0:
            # 指定したリストの中身が空でない場合、空のリストを探す
            while len(all_channels[position_index]) != 0:
                if len(extracted_list) == position_index:
                    position_index -= 1
                else:
                    position_index += 1

            # 指定した位置にカテゴリー内のチャンネルを代入
            all_channels[position_index] = channel
            vc_channels[position_index] = channel

            # 先頭がカテゴリーでない場合
            if channel[0]['parent_id'] != None:
                # 先頭にカテゴリーチャンネルを代入
                all_channels[position_index].insert(0,d)
    
    # list(list),[[],[]]を一つのリストにする
    all_channel_sort = list(chain.from_iterable(all_channels))

    return all_channel_sort,all_channels,vc_channels

routers/guild/admin/admin.py

管理者画面。

admin.py
from fastapi import APIRouter
from fastapi.responses import RedirectResponse,HTMLResponse
from starlette.requests import Request
from fastapi.templating import Jinja2Templates

from dotenv import load_dotenv
load_dotenv()

import os
from base.aio_req import (
    aio_get_request,
    pickle_read,
    return_permission,
    oauth_check
)
from typing import List,Dict,Any,Tuple
from routers.session_base.user_session import DiscordOAuthData,DiscordUser

DISCORD_BASE_URL = "https://discord.com/api"
DISCORD_REDIRECT_URL = f"https://discord.com/api/oauth2/authorize?response_type=code&client_id={os.environ.get('DISCORD_CLIENT_ID')}&scope={os.environ.get('DISCORD_SCOPE')}&redirect_uri={os.environ.get('DISCORD_CALLBACK_URL')}&prompt=consent"

DISCORD_BOT_TOKEN = os.environ["DISCORD_BOT_TOKEN"]

router = APIRouter()

# new テンプレート関連の設定 (jinja2)
templates = Jinja2Templates(directory="templates")

@router.get('/guild/{guild_id}/admin')
async def admin(
    request:Request,
    guild_id:int
):
    # OAuth2トークンが有効かどうか判断
    if request.session.get('discord_oauth_data'):
        oauth_session = DiscordOAuthData(**request.session.get('discord_oauth_data'))
        user_session = DiscordUser(**request.session.get('discord_user'))
        # トークンの有効期限が切れていた場合、再ログインする
        if not await oauth_check(access_token=oauth_session.access_token):
            return RedirectResponse(url=DISCORD_REDIRECT_URL,status_code=302)
    else:
        return RedirectResponse(url=DISCORD_REDIRECT_URL,status_code=302)
    
    TABLE_NAME = 'guild_set_permissions'

    # 取得上限を定める
    limit = os.environ.get('USER_LIMIT',default=100)

    # サーバの情報を取得
    guild = await aio_get_request(
        url = DISCORD_BASE_URL + f'/guilds/{guild_id}',
        headers = {
            'Authorization': f'Bot {DISCORD_BOT_TOKEN}'
        }
    )

    # サーバのメンバー一覧を取得
    guild_members = await aio_get_request(
        url = DISCORD_BASE_URL + f'/guilds/{guild_id}/members?limit={limit}',
        headers = {
            'Authorization': f'Bot {DISCORD_BOT_TOKEN}'
        }
    )

    # サーバの権限を取得
    guild_user_permission = await return_permission(
        guild_id=guild_id,
        user_id=user_session.id,
        access_token=oauth_session.access_token
    )

    # キャッシュ読み取り
    guild_table_fetch:List[Dict[str,Any]] = await pickle_read(filename=TABLE_NAME)
    guild_table = [
        g 
        for g in guild_table_fetch 
        if int(g.get('guild_id')) == guild_id
    ]
    
    user_permission:str = 'normal'

    # 管理者の場合
    if (guild_user_permission.administrator):
        user_permission = 'admin'

    # 管理者ではない場合、該当するサーバーidがない場合、終了
    if user_permission != 'admin' or len(guild_table) == 0:
        return HTMLResponse("404")
    
    return templates.TemplateResponse(
        "guild/admin/admin.html",
        {
            "request": request, 
            "guild": guild,
            "guild_members":guild_members,
            "guild_id": guild_id,
            "guild_table":guild_table[0],
            "title":request.session["discord_user"]['username']
        }
    )

routers/api/vc_signal_success.py

vc_signal_success.py
from fastapi import APIRouter
from fastapi.responses import RedirectResponse,JSONResponse
from starlette.requests import Request
from fastapi.templating import Jinja2Templates

from dotenv import load_dotenv
load_dotenv()

import os

from base.database import PostgresDB
from base.aio_req import pickle_write
from routers.api.check.post_user_check import user_checker
from routers.session_base.user_session import DiscordOAuthData,DiscordUser

from core.pickes_save.vc_columns import VC_COLUMNS

DISCORD_REDIRECT_URL = f"https://discord.com/api/oauth2/authorize?response_type=code&client_id={os.environ.get('DISCORD_CLIENT_ID')}&scope={os.environ.get('DISCORD_SCOPE')}&redirect_uri={os.environ.get('DISCORD_CALLBACK_URL')}&prompt=consent"


DISCORD_BASE_URL = "https://discord.com/api"

DISCORD_BOT_TOKEN = os.environ["DISCORD_BOT_TOKEN"]

USER = os.getenv('PGUSER')
PASSWORD = os.getenv('PGPASSWORD')
DATABASE = os.getenv('PGDATABASE')
HOST = os.getenv('PGHOST')
db = PostgresDB(
    user=USER,
    password=PASSWORD,
    database=DATABASE,
    host=HOST
)

router = APIRouter()

# new テンプレート関連の設定 (jinja2)
templates = Jinja2Templates(directory="templates")


@router.post('/api/vc-signal-success')
async def vc_post(
    request:Request
):
    form = await request.form()

    # OAuth2トークンが有効かどうか判断
    check_code = await user_checker(
        request=request,
        oauth_session=DiscordOAuthData(**request.session.get('discord_oauth_data')),
        user_session=DiscordUser(**request.session.get('discord_user'))
    )
    
    if check_code == 302:
        return RedirectResponse(url=DISCORD_REDIRECT_URL,status_code=302)
    elif check_code == 400:
        return JSONResponse(content={"message": "Fuck You. You are an idiot."})

    # 使用するデータベースのテーブル名
    TABLE = f'guilds_vc_signal_{form.get("guild_id")}'

    # "send_channel_id_"で始まるキーのみを抽出し、数字部分を取得する
    numbers = [
        int(key.replace("send_channel_id_", "")) 
        for key in form.keys() 
        if key.startswith("send_channel_id_")
    ]

    row_list = []

    # チャンネルごとに更新をかける
    for vc_id in numbers:
        row_values = {}
        send_signal = False
        join_bot = False
        everyone_mention = False
        role_key = [
            int(form.get(key))
            for key in form.keys()
            if key.startswith(f"role_{vc_id}_")
        ]

        if form.get(f"send_signal_{vc_id}") != None:
            send_signal = True
        if form.get(f"join_bot_{vc_id}") != None:
            join_bot = True
        if form.get(f"everyone_mention_{vc_id}") != None:
            everyone_mention = True
            
        row_values = {
            'send_signal':send_signal,
            'send_channel_id':form.get(f"send_channel_id_{vc_id}"),
            'join_bot':join_bot,
            'everyone_mention':everyone_mention,
            'mention_role_id':role_key
        }

        where_clause = {
            'vc_id': vc_id
        }

        row_list.append({
            'where_clause':where_clause,
            'row_values':row_values
        })

    #print(row_list)

    await db.connect()

    await db.primary_batch_update_rows(
        table_name=TABLE,
        set_values_and_where_columns=row_list,
        table_colum=VC_COLUMNS
    )

    # 更新後のテーブルを取得
    table_fetch = await db.select_rows(
        table_name=TABLE,
        columns=[],
        where_clause={}
    )

    await db.disconnect()

    #print(table_fetch)

    # pickleファイルに書き込み
    await pickle_write(
        filename=TABLE,
        table_fetch=table_fetch
    )

    return templates.TemplateResponse(
        'api/vcsignalsuccess.html',
        {
            'request': request,
            'guild_id': form['guild_id'],
            'title':'成功'
        }
    )

routers/api/admin_success.py

admin_success.py
from fastapi import APIRouter
from fastapi.responses import RedirectResponse,JSONResponse
from starlette.requests import Request
from fastapi.templating import Jinja2Templates

from dotenv import load_dotenv
load_dotenv()

import os

from base.database import PostgresDB
from base.aio_req import pickle_write
from routers.api.check.post_user_check import user_checker
from routers.session_base.user_session import DiscordOAuthData,DiscordUser


DISCORD_REDIRECT_URL = f"https://discord.com/api/oauth2/authorize?response_type=code&client_id={os.environ.get('DISCORD_CLIENT_ID')}&scope={os.environ.get('DISCORD_SCOPE')}&redirect_uri={os.environ.get('DISCORD_CALLBACK_URL')}&prompt=consent"


DISCORD_BASE_URL = "https://discord.com/api"

DISCORD_BOT_TOKEN = os.environ["DISCORD_BOT_TOKEN"]

USER = os.getenv('PGUSER')
PASSWORD = os.getenv('PGPASSWORD')
DATABASE = os.getenv('PGDATABASE')
HOST = os.getenv('PGHOST')
db = PostgresDB(
    user=USER,
    password=PASSWORD,
    database=DATABASE,
    host=HOST
)

router = APIRouter()

# new テンプレート関連の設定 (jinja2)
templates = Jinja2Templates(directory="templates")


@router.post('/api/admin-success')
async def admin_post(
    request:Request
):
    form = await request.form()

    # OAuth2トークンが有効かどうか判断
    check_code = await user_checker(
        request=request,
        oauth_session=DiscordOAuthData(**request.session.get('discord_oauth_data')),
        user_session=DiscordUser(**request.session.get('discord_user'))
    )
    
    if check_code == 302:
        return RedirectResponse(url=DISCORD_REDIRECT_URL,status_code=302)
    elif check_code == 400:
        return JSONResponse(content={"message": "Fuck You. You are an idiot."})

    TABLE = 'guild_set_permissions'

    # 各権限コード
    vc_permission_code = form.get("vc_permission_code")

    # ユーザidの取り出し
    vc_user_id_permission = [
        int(form.get(key))
        for key in form.keys()
        if key.startswith('member_select_vc_')
    ]

    # ロールidの取り出し
    vc_role_id_permission = [
        int(form.get(key))
        for key in form.keys()
        if key.startswith('role_select_vc_')
    ]
    

    row_value = {
        'vc_permission'                 :vc_permission_code,
        'vc_user_id_permission'         :vc_user_id_permission,
        'vc_role_id_permission'         :vc_role_id_permission
    }

    await db.connect()

    await db.update_row(
        table_name=TABLE,
        row_values=row_value,
        where_clause={
            'guild_id':form.get('guild_id')
        }
    )

    # 更新後のテーブルを取得
    table_fetch = await db.select_rows(
        table_name=TABLE,
        columns=[],
        where_clause={}
    )

    await db.disconnect()

    # pickleファイルに書き込み
    await pickle_write(
        filename=TABLE,
        table_fetch=table_fetch
    )

    return templates.TemplateResponse(
        'api/adminsuccess.html',
        {
            'request': request,
            'guild_id': form['guild_id'],
            'title':'成功'
        }
    )

routers/api/check/post_user_check.py

post_user_check.py
from base.aio_req import (
    aio_get_request,
    return_permission,
    oauth_check
)
from starlette.requests import Request

from dotenv import load_dotenv
load_dotenv()

import os
from typing import Dict,List

from routers.session_base.user_session import DiscordOAuthData,DiscordUser
DISCORD_BOT_TOKEN = os.environ["DISCORD_BOT_TOKEN"]
DISCORD_BASE_URL = "https://discord.com/api"

async def user_checker(
    request:Request,
    oauth_session:DiscordOAuthData,
    user_session:DiscordUser
) -> int:
    """
    postリクエストが正しいものか判別する。

    param:

    """
    form = await request.form()
    # OAuth2トークンが有効かどうか判断
    try:
        await return_permission(
            guild_id=form["guild_id"],
            user_id=user_session.id,
            access_token=oauth_session.access_token
        )

        # トークンの有効期限が切れている場合、ログイン画面に遷移
        if (not await oauth_check(
                access_token=oauth_session.access_token
            )):
            return 302
        
        # サーバのメンバー一覧を取得
        guild_member = await aio_get_request(
            url = DISCORD_BASE_URL + f'/guilds/{form.get("guild_id")}/members/{user_session.id}',
            headers = {
                'Authorization': f'Bot {DISCORD_BOT_TOKEN}'
            }
        )
        
        # ログインユーザがサーバーに所属していない場合(あり得ないリクエスト)
        if guild_member.get('message') != None:
            return 400

    except KeyError:
        return 400
    
    return 200

routers/sesion_base/user_session.py

user_session.py

from pydantic import BaseModel,validator
from typing import List,Optional,Union,Any

class DiscordOAuthData(BaseModel):
    """
    DiscordのOAuth2認証のデータ

    param:
    access_token    :str
        ユーザのアクセストークン
    expires_in      :int
        アクセストークンの有効期限(秒)
    refresh_token   :str
        リフレッシュトークン
    scope           :str
        許可されている権限
    token_type      :str
        トークンのタイプ
        Bearer
    """
    access_token    :str
    expires_in      :int
    refresh_token   :str
    scope           :str
    token_type      :str


class DiscordUser(BaseModel):
    """
    ユーザ情報

    param:
    id                  :int
        ユーザid
    username            :str
        ユーザ名
    global_name         :Union[str,None]

    display_name        :Union[str,None]

    avatar              :str
        アバターid
    avatar_decoration   :Union[str,None]
        アバターの説明
    discriminator       :str
        ユーザ名の識別子
    public_flags        :int

    flags               :int

    banner              :Union[str,None]
        プロフィールのバナー
    banner_color        :Union[str,None]
        プロフィールのバナーのカラーコード
    accent_color        :int

    locale              :str

    mfa_enabled         :bool

    premium_type        :int
        Nitroユーザのランク
    """
    id                  :int
    username            :str
    global_name         :Union[str,None]
    display_name        :Union[str,None]
    avatar              :Optional[str]
    avatar_decoration   :Union[str,None]
    discriminator       :str
    public_flags        :int
    flags               :int
    banner              :Union[str,None]
    banner_color        :Union[str,None]
    accent_color        :Optional[int]
    locale              :str
    mfa_enabled         :bool
    premium_type        :int

class DiscordConnection(BaseModel):
    """
    Discordのアカウント連携情報

    param:
    type                :str
        連携しているサービス名(Twitter,Github)
    id                  :str
        サービスでのユーザid
    name                :str
        サービスでのユーザ名
    visibility          :int
    friend_sync         :bool
    show_activity       :bool
    verified            :bool
    two_way_link        :bool
    metadata_visibility :int
    """
    type                :str
    id                  :str
    name                :str
    visibility          :int
    friend_sync         :bool
    show_activity       :bool
    verified            :bool
    two_way_link        :bool
    metadata_visibility :int


class MatchGuild(BaseModel):
    """
    Botとユーザが所属のサーバーのクラス

    id              :int
    name            :str
    icon            :str
    owner           :bool
    permissions     :int
    features        :List
    permissions_new :int
    """
    id              :int
    name            :str
    icon            :str
    owner           :bool
    permissions     :int
    features        :List[str]
    permissions_new :int

templates/index.html

index.html
<!DOCTYPE html>
<html>
  {% extends "layout.html" %}
  {% block content %}
  <body bgcolor="#252525" text="#ffffff">
    <h1>ようこそ!!!!!!!</h1>
    ここは{{bot_data.username}}の設定とかできるサイトです!!!!!
    <br/>
    {% if request.session.discord_oauth_data %}
      <a href="/guilds" class="btn btn-primary">サーバ一覧へ</a>
    {% endif %}
    
    <br/>
    
  </body>
  {% endblock %}
</html>

templates/layout.html

layout.html

<html lang="ja">
    <head>
        <link rel="stylesheet" href="https://stackpath.bootstrapcdn.com/bootstrap/4.3.1/css/bootstrap.min.css" integrity="sha384-ggOyR0iXCbMQv3Xipma34MD+dH/1fQ784/j6cY/iJTQUOhcWr7x9JvoRxT2MZw1T" crossorigin="anonymous">
        <link rel="icon" href="/static/img/discord-icon.jpg">
        <title>{{title}}</title>
        
        <meta charset="UTF-8">

        <!--レスポンシブ対応-->
        <meta name="viewport" content="width=device-width, initial-scale=1, minimum-scale=1, user-scalable=yes">
    </head>
    <body style="background-color: #36393F; color: #FFF;">
        <!-- display: flex: 要素を並列に 
             flex-direction: column:はみ出たコンテナを横に -->
        <div style="display: flex; flex-direction: column; min-height: 100vim;">
            <!--
                container は各レスポンシブブレークポイントで max-width
                container-fluid の場合、すべてのブレークポイントで width: 100% 
            -->
            <div class="container">
                <div class="container-fluid">
                    <div class="row">
                        <!--
                            mdはPC
                            smはタブレット
                            xsはスマホ
                        -->
                        <div class="col-xs-2 col-sm-10 col-md-10" style="width: 50%;">
                            <a href="/" class="btn btn-primary">top</a>
                        </div>
                        <div class="col-xs-2 col-sm-2 col-md-2" style="width: 50%;">
                            <button type="button" id="popover-btn" class="btn btn-primary">
                                {% if request.session.discord_oauth_data or request.session.line_oauth_data %}
                                Logout
                                {% else %}
                                Login
                                {% endif %}
                            </button>
                        
                            <div id="popover-content" style="display: none;position: absolute;background-color: #fff;border: 1px solid #ccc;padding: 10px; margin: 0 0 0 auto;">
                                
                                {% if request.session.discord_oauth_data %}
                                    <a href="/discord-logout" class="btn btn-primary">Discord Logout</a>
                                {% else %}
                                    <a href="/discord-login" class="btn btn-primary">Discord Login</a>
                                {% endif %}
                                
                            </div>
                        </div>
                    </div>
                </div>
                {% block content %}
                <!-- メインコンテンツ -->
                {% endblock %}
            </div>
        </div>
        <br>
        <hr>
        <footer class="bg-secondary" >
            <div align="center">
                <p>Copyright &copy; maguro Inc. All right reserves.</p>
                <p>Powered by FastAPI 0.88.0 &amp; starlette 0.22.0</p>
            </div>
        </footer>
        <script src="/static/js/popover.js" type="text/javascript"></script>
    </body>
</html>

templates/guilds.html

guilds.html
<!DOCTYPE html>
<html>
  {% extends "layout.html" %}
  {% block content %}
  <body bgcolor="#252525" text="#ffffff">
    <h1>編集、閲覧可能なサーバ一覧</h1>
    <ul>
      {% for guild in match_guild %}
        {% if not guild.icon %}
          <a href="/guild/{{guild.id}}">
            <img src="/static/img/discord-icon.jpg" />
          </a>
        {% else %}
          <a href="/guild/{{guild.id}}">
            <img src="https://cdn.discordapp.com/icons/{{guild.id}}/{{guild.icon}}.png"  />
          </a>
        {% endif %}
        <a href="/guild/{{guild.id}}">
          <li>{{ guild.name }}</li>
        </a>
        <br><br>
      {% endfor %}
    </ul>
  </body>
  {% endblock %}
</html>

templates/guild/guild.html

guild.html
<!DOCTYPE html>
<html>
  {% extends "layout.html" %}
  {% block content %}
  <body bgcolor="#252525" text="#ffffff">
    <br/>
    {% if not guild.icon %}
        <a href="/guild/{{guild.id}}">
            <img src="/static/img/discord-icon.jpg" />
        </a>
    {% else %}
        <a href="/guild/{{guild.id}}">
            <img src="https://cdn.discordapp.com/icons/{{guild.id}}/{{guild.icon}}.png"  />
        </a>
    {% endif %}
    <a href="/guild/{{guild.id}}">
        <li>{{ guild.name }}</li>
    </a>
    <br/>
    {% if permission.administrator %}
        管理者です。<br/>
        <a href="/guild/{{guild_id}}/admin" class="btn btn-primary">管理者設定</a>
    {% endif %}
    <h4>設定する項目一覧</h4>
    
    <a href="/guild/{{guild_id}}/vc-signal" class="btn btn-primary">ボイスチャンネルの通知設定</a>
    <br/>
    
    <br/>
    サーバーウィジェットが有効の場合、サーバーの状況が表示されます。<br/>
    <iframe title="discord_5second" style="height: 350px;" src="https://discord.com/widget?id={{guild_id}}&theme=dark/" sandbox="allow-popups allow-popups-to-escape-sandbox allow-same-origin allow-scripts"></iframe>
  </body>
  {% endblock %}
</html>

templates/guild/vc_signal/vc_signal.html

vc_signal.html
<!DOCTYPE html>
<html>
    <script src="/static/js/box_allcheck.js" type="text/javascript"></script>
    {% extends "layout.html" %}
    {% block content %}
    <body bgcolor="#252525" text="#ffffff">
        {% if not guild.icon %}
            <a href="/guild/{{guild.id}}">
                <img src="/static/img/discord-icon.jpg" />
            </a>
        {% else %}
            <a href="/guild/{{guild.id}}">
                <img src="https://cdn.discordapp.com/icons/{{guild.id}}/{{guild.icon}}.png"  />
            </a>
        {% endif %}
        <a href="/guild/{{guild.id}}">
            <li>{{ guild.name }}</li>
        </a>
        <br><br>

        {% set per = ['admin'] %}
    
        <h3>{{request.session.discord_user.username}}さんは
        {% if user_permission in per %}
            編集できます。
        {% else %}
            編集できません。閲覧のみになります。
        {% endif %}
        </h3>
        <br/>
        {% if user_permission not in per %}
        <fieldset disabled>
        {% endif %}

        <form action="/api/vc-signal-success" name="discordForm" method="post">
            <details>
                <summary>
                    <strong>チャンネル一覧</strong>
                </summary>
                {% set i = [1] %}
                {% set chack_flag = 0 %}
                {% for vc_cate in vc_cate_channel %}
                    {% if vc_cate.type == 4 %}
                        {{ vc_cate.name }}
                        <br/>
                    {% else %}
                        <!--ng_channelの場合初めからチェックを付ける-->
                        <details>
                            <summary>
                                <strong>
                                    {% if vc_cate.type == 2 %}
                                        ボイスチャンネル  :
                                    {% endif %}
                                    {{vc_cate.name}}
                                </strong>
                            </summary>
                            通知の送信先チャンネル<br/>
                            <select name="send_channel_id_{{vc_cate.id}}" size="1">
                                {% for chan in text_channel %}
                                    <!--テキストチャンネルの場合-->
                                    {% if chan.type == 0 %}
                                        <option value={{chan.id}} 
                                            {% for vc in vc_set %} 
                                                {% if chan.id|int == vc.send_channel_id|int %}
                                                    selected
                                                {% endif %}
                                            {% endfor %}
                                        >
                                            {% if chan.parent_id %}
                                                {% for cate in vc_cate_channel %}
                                                    {% if cate.type == 4 and cate.id == chan.parent_id %}
                                                        {{ cate.name }}:
                                                    {% endif %}
                                                {% endfor %}
                                            {% endif %}
                                            {{chan.name}}
                                        </option>
                                    {% endif %}
                                {% endfor %}
                            </select>
                            <br/>
                            <!-- ロールの数 -->
                            {% set role_len = 0 %}                        
                            {% for vc in vc_set %}
                                {% if vc_cate.id|int == vc.vc_id|int %}

                                    通知を送信する
                                    {{vc.send_signal}}
                                    {% if vc.send_signal %}
                                        <input type="checkbox" checked="checked" name="send_signal_{{vc_cate.id}}" value="True"/>
                                    {% else %}
                                        <input type="checkbox" name="send_signal_{{vc_cate.id}}" value="True"/>
                                    {% endif %}

                                    Botの通知をする
                                    {% if vc.join_bot %}
                                        <input type="checkbox" checked="checked" name="join_bot_{{vc_cate.id}}" value="True"/>
                                    {% else %}
                                        <input type="checkbox" name="join_bot_{{vc_cate.id}}" value="True"/>
                                    {% endif %}
                                    <br/>
                                    everyoneで通知する
                                    {% if vc.everyone_mention %}
                                        <input type="checkbox" checked="checked" name="everyone_mention_{{vc_cate.id}}" value="True"/>
                                    {% else %}
                                        <input type="checkbox" name="everyone_mention_{{vc_cate.id}}" value="True"/>
                                    {% endif %}
                                    <br/>
                                    通知するロールの追加
                                    <br/>

                                    <!-- 既にロールが設定されていた場合、その数を代入 -->
                                    {% if vc.mention_role_id|length > 0 %}
                                        {% set role_len = vc.mention_role_id|length %}
                                    {% endif %}
                                    
                                    <!-- サーバ内のロール一覧をselectできるようにする -->
                                    <select id="mySelect_{{vc_cate.id}}" onchange="selectRoleAddEvent('{{role_len}}',this)" size="1">
                                        {% for role in guild.roles %}
                                            <option value="{{role.id}}">
                                                {{role.name}}
                                            </option>
                                        {% endfor %}
                                    </select>
                                    <br/>
                                    {% set i = [1] %}
                                    <!-- 
                                        既に設定されている、または設定された場合、{{ロール名}}:x の形式で画面に表示する
                                        xをクリックすることで削除できる
                                     -->
                                    <div id="{{vc_cate.id}}" class="selected-items">
                                        {% for role_mention in vc.mention_role_id %}
                                            <div id="{{role_mention}}" class="selected-item">
                                                {% for role in guild.roles %}
                                                    <!-- すでに通知するロールが設定されていた場合、表示 -->
                                                    {% if role.id|int == role_mention|int %}
                                                        {{role.name}}
                                                        <span class="remove-item" onclick="removeAdd(this)">x</span>
                                                        <input type="hidden" name="role_{{vc_cate.id}}_{{i[0]}}" value="{{role.id}}"/>
                                                        {% set _ = i.append(i[0] + 1) %}
                                                        {% set _ = i.pop(0) %}
                                                    {% endif %}
                                                {% endfor %}
                                            </div>
                                        {% endfor %}
                                    </div>
                                    <br/><br/>
                                {% endif %}
                                
                            {% endfor %}
                        </details>
                    {% endif %}
                {% endfor %}
            <br/>
            <input type="button" value="全選択" onClick="check(this.form,true,'channel_')"/>
            <input type="button" value="全解除" onClick="check(this.form,false,'channel_')"/>
            </details>
            <br/>
            <br/>
            <input type="hidden" name="guild_id" value="{{guild_id}}"/>
            {% if user_permission in per %}
            <input type="submit" value="送信"/>
            {% endif %}
        </form>
        {% if user_permission not in per %}
        </fieldset>
        {% endif %}
        
        <br/>
        <a href="/guild/{{guild_id}}" class="btn btn-primary">前のページに戻る</a>
        <a href="/guilds" class="btn btn-primary">サーバ一覧に戻る</a>
        <br/>
        サーバーウィジェットが有効の場合、サーバーの状況が表示されます。<br/>
        <iframe title="discord_5second" style="height: 350px;" src="https://discord.com/widget?id={{guild_id}}&theme=dark/" sandbox="allow-popups allow-popups-to-escape-sandbox allow-same-origin allow-scripts"></iframe>
        <script src="/static/js/selects_bar.js" type="text/javascript"></script>
    </body>
    {% endblock %}
</html>

templates/admin/admin.html

admin.html
<!DOCTYPE html>
<html>
    <script src="/static/js/box_allcheck.js" type="text/javascript"></script>
    {% extends "layout.html" %}
    {% block content %}
    <body bgcolor="#252525" text="#ffffff">

        <h1>{{guild.name}}の権限設定</h1>
        <h4>各項目にアクセスできる権限を設定できます。</h4>

        

        {% if not guild.icon %}
            <a href="/guild/{{guild.id}}">
                <img src="/static/img/discord-icon.jpg" />
            </a>
        {% else %}
            <a href="/guild/{{guild.id}}">
                <img src="https://cdn.discordapp.com/icons/{{guild.id}}/{{guild.icon}}.png"  />
            </a>
        {% endif %}
        <a href="/guild/{{guild.id}}">
            <li>{{ guild.name }}</li>
        </a>
        <br/>
        <br>
        
        <form action="/api/admin-success" name="discordForm" method="post">

            <details>
                <summary>
                    <strong>ボイスチャンネルの通知設定</strong>
                </summary>
                Permission Code:{{guild_table.vc_permission|int}}

                <h6>編集を許可する権限コード</h6>
                <input type="number" name="vc_permission_code" value="{{guild_table.vc_permission|int}}" min="0" max="1099511627775"/>


                {% set member_len = guild_table.vc_user_id_permission|length %}

                <h6>アクセスを許可するメンバーの選択</h6>
                <!-- サーバ内のロール一覧をselectできるようにする -->
                <select id="memberSelect_vc" onchange="selectAdminAddEvent('{{member_len}}',this,'member_select_vc')" size="1">
                    <option hidden disabled>選択してください</option>
                    {% for member in guild_members %}
                        <option value="{{member.user.id}}">
                            {{member.user.username}}
                        </option>
                    {% endfor %}
                </select>
                <br/>
                
                <!-- 
                    既に設定されている、または設定された場合、{{ロール名}}:x の形式で画面に表示する
                    xをクリックすることで削除できる
                -->
                <div id="member_select_vc" class="selected-items">
                    {% for member_id in guild_table.mention_members %}
                        <div id="{{member_id}}" class="selected-item">
                            {% for member in guild_members %}
                                {% if member_id|int == member.user.id|int %}
                                    {{member.user.username}}
                                    <span class="remove-item" onclick="removeAdd(this)">x</span>
                                    <input type="hidden" name="member_select_vc_{{i[0]}}" value="{{member.user.id}}"/>
                                    {% set _ = i.append(i[0] + 1) %}
                                    {% set _ = i.pop(0) %}
                                {% endif %}
                            {% endfor %}
                        </div>
                    {% endfor %}
                </div>
                <br/><br/>

                {% set role_len = guild_table.vc_role_id_permission|length %}

                <h6>アクセスを許可するロールの選択</h6>
                <!-- サーバ内のロール一覧をselectできるようにする -->
                <select id="roleSelect_vc" onchange="selectAdminAddEvent('{{role_len}}',this,'role_select_vc')" size="1">
                    <option hidden disabled>選択してください</option>
                    {% for role in guild.roles %}
                        <option value="{{role.id}}">
                            {{role.name}}
                        </option>
                    {% endfor %}
                </select>
                <br/>
                
                <!-- 
                    既に設定されている、または設定された場合、{{ロール名}}:x の形式で画面に表示する
                    xをクリックすることで削除できる
                -->
                <div id="role_select_vc" class="selected-items">

                    {% for role_id in guild_table.vc_role_id_permission %}
                        <div id="{{role_id}}" class="selected-item">
                            {% for role in guild.roles %}

                                <!-- すでに通知するロールが設定されていた場合、表示 -->
                                {% if role.id|int == role_id|int %}
                                    {{role.name}}
                                    <span class="remove-item" onclick="removeAdd(this)">x</span>
                                    <input type="hidden" name="role_select_vc_{{i[0]}}" value="{{role.id}}"/>
                                    {% set _ = i.append(i[0] + 1) %}
                                    {% set _ = i.pop(0) %}
                                {% endif %}
                                
                            {% endfor %}
                        </div>
                    {% endfor %}

                </div>
                <br/><br/>
            </details>

            <input type="hidden" name="guild_id" value="{{guild_id}}"/>
            <br/><br/>
            <input type="submit" value="送信"/>
        </form>

        サーバーウィジェットが有効の場合、サーバーの状況が表示されます。<br/>
        <iframe title="discord_5second" style="height: 350px;" src="https://discord.com/widget?id={{guild_id}}&theme=dark/" sandbox="allow-popups allow-popups-to-escape-sandbox allow-same-origin allow-scripts"></iframe>
        <script src="/static/js/select_id.js" type="text/javascript"></script>
    </body>
    {% endblock %}
</html>

routers/api/adminsuccess.html

adminsuccess.html
<!DOCTYPE html>
<html>
  {% extends "layout.html" %}
  {% block content %}
  <body bgcolor="#252525" text="#ffffff">
    <div class="col-md-2">
      <a href="/guild/{{guild_id}}" class="btn btn-primary">戻る</a>
    </div>
    <div class="col-md-2">
      <a href="/guilds" class="btn btn-primary">サーバ一覧へ</a>
    </div>
  </body>
  {% endblock %}
</html>

routers/api/vcsignalsuccess.html

vcsignalsuccess.html
<!DOCTYPE html>
<html>
  {% extends "layout.html" %}
  {% block content %}
  <body bgcolor="#252525" text="#ffffff">
    <div class="col-md-2">
      <a href="/guild/{{guild_id}}" class="btn btn-primary">戻る</a>
    </div>
    <div class="col-md-2">
      <a href="/guilds" class="btn btn-primary">サーバ一覧へ</a>
    </div>
  </body>
  {% endblock %}
</html>

templates/static/js/box_allcheck.js

box_allcheck.js
function check(targetForm,flag,name){
    for (let n = 0;n <= targetForm.length - 1;n++) {
      if (targetForm.elements[n].type == "checkbox" && targetForm.elements[n].name.indexOf(name) >= 0) {
        targetForm.elements[n].checked = flag;
      }
    }
}

function classCheck(targetForm,flag,className){
  for (let i = 0;n < targetForm.length - 1;i++){
    if(targetForm.elements[i].type == "checkbox" && targetForm.elements[i].className.indexOf(className) >= 0){
      targetForm.elements[i].checked = flag;
    }
  }
}

templates/static/js/popover.js

popover.js
const popoverBtn = document.getElementById('popover-btn');
const popoverContent = document.getElementById('popover-content');

popoverBtn.addEventListener('click', function() {
  //console.log(popoverContent.style.display)
  if (popoverContent.style.display === 'block'){
    popoverContent.style.display = 'none';
  }else if (popoverContent.style.display === 'none' || popoverContent.style.display == ''){
    popoverContent.style.display = 'block';
  }
});

templates/static/js/select_id.js

select_id.js
let array = {}

const selectAdminAddEvent = (len,se,selectName) => {
    /*
    len:int
    既に設定されているメンバー、ロールの数

    se:HTMLinner
    select要素
    
    selectName:str
    選択された場合「名前:x」の形式で表示するdiv要素のid名
    */
    // idから要素を取得
    let selectId = document.getElementById(se.id);

    // チャンネルidのみを引き出す
    //const divId = se.id.substring(se.id.indexOf("_") + 1) 

    // すでにデータベース側にロールが設定された場合()
    if (len > 0){
        // チャンネルidをキーとした連想配列を作成
        array[se.id] = []
        for (let i = 0; i < len; i++){
            // ロールの数だけ代入
            array[se.id].push(i)
        }
    }

    function addSelectEvent(e) {
        const selectedItems = document.getElementById(selectName);

        // 二重に登録されないように削除
        selectId.removeEventListener("change",addSelectEvent); // change属性を削除する

        const selectedOption = e.target.selectedOptions[0];
    
        const outerTextIndex = selectedItems.outerText.indexOf(selectedOption.textContent);
        
        // 既に登録されている場合
        if (outerTextIndex !== -1){
            console.log("return")
            return
        }
    
        // 登録されているロールの合計
        const xSum = selectedItems.outerText.split("x").length
    
        // 名前
        const item = document.createElement("div");
        item.className = "selected-item";
        item.textContent = selectedOption.textContent;
        
        // xボタン
        const removeBtn = document.createElement("span");
        removeBtn.className = "remove-item";
        removeBtn.textContent = "x";
    
        //input
        const hiddenItem = document.createElement("input");
        hiddenItem.className = "hidden-item";
        hiddenItem.type = "hidden";

        console.log(array)
        
        // 登録しようとしているチャンネルがある場合
        if (array[se.id]){

            console.log(se.id+" is true")
            // 一番最後の要素がある場合(前の要素が削除されている)
            if (array[se.id].includes(xSum)){

                // 削除された要素の中で一番小さい値を取得(nullが格納されている)
                let arrayNull = array[se.id].indexOf(null)
                console.log("uwagaki="+arrayNull)
                // 削除された要素に代入
                if (arrayNull > -1){
                    //
                    hiddenItem.name = selectedItems.id + "_" +(arrayNull + 1);
                    array[se.id][arrayNull] = (arrayNull + 1)
                }
            // 一番最後に代入
            }else{
                console.log("push="+xSum)
                array[se.id].push(xSum)
                hiddenItem.name = selectedItems.id + "_" + xSum;
            }
        // チャンネルの中身が空
        }else{
            console.log(se.id+" is false")
            array[se.id] = []
            array[se.id].push(xSum)
            hiddenItem.name = selectedItems.id + "_" + xSum;
        }
        hiddenItem.value = selectedOption.value;
        
        // xボタンをクリックしたら削除するように仕込む
        removeBtn.addEventListener("click", function () {
            // 削除する要素の番号
            let roleObjName = hiddenItem.name.substring(
                hiddenItem.name.lastIndexOf("_") + 1
            );
            
            // 1から始まるため-1にnull
            array[se.id][roleObjName - 1] = null
            item.remove();
            hiddenItem.remove();
            selectedOption.selected = false;
        });
        
        item.appendChild(removeBtn);
        selectedItems.appendChild(item);
        selectedItems.appendChild(hiddenItem)
        /*
        <div class="selected-item">
            @everyone
        <span class="remove-item">x</span></div>
        <input class="hidden-item" type="hidden" name="role_{{server_id}}_1" value="{{server_id}}">
        */

    }

    selectId.removeEventListener("change",addSelectEvent); // change属性を削除する

    selectId.addEventListener("change", addSelectEvent);
}

templates/static/js/selects_bar.js

selects_bar.js
let roleArray = {}
let memberArray = {}


const selectRoleAddEvent = (roleLen,se) => {
    // idから要素を取得
    let selectId = document.getElementById(se.id);

    // チャンネルidのみを引き出す
    const divId = se.id.substring(se.id.indexOf("_") + 1) 

    // ロールが設定された場合
    if (roleLen > 0){
        // チャンネルidをキーとした連想配列を作成
        roleArray[se.id] = []
        for (let i = 0; i < roleLen; i++){
            // ロールの数だけ代入
            roleArray[se.id].push(i)
        }
    }

    function addSelectEvent(e) {
        // 親要素(サーバーidがdivid)を取得
        const selectedItems = document.getElementById(divId);

        // 二重に登録されないように削除
        selectId.removeEventListener("change",addSelectEvent); // change属性を削除する

        const selectedOption = e.target.selectedOptions[0];
    
        const outerTextIndex = selectedItems.outerText.indexOf(selectedOption.textContent);
        
        // 既に登録されている場合
        if (outerTextIndex !== -1){
            console.log("return")
            return
        }
    
        // 登録されているロールの合計
        const roleSum = selectedItems.outerText.split("x").length
    
        // 名前
        const item = document.createElement("div");
        item.className = "selected-item";
        item.textContent = selectedOption.textContent;
        
        // xボタン
        const removeBtn = document.createElement("span");
        removeBtn.className = "remove-item";
        removeBtn.textContent = "x";
    
        //input
        const hiddenItem = document.createElement("input");
        hiddenItem.className = "hidden-item";
        hiddenItem.type = "hidden";

        console.log(roleArray)
        
        // 登録しようとしているチャンネルがある場合
        if (roleArray[se.id]){
            console.log(se.id+" is true")
            // 一番最後の要素がある場合(前の要素が削除されている)
            if (roleArray[se.id].includes(roleSum)){
                // 削除された要素の中で一番小さい値を取得(nullが格納されている)
                let arrayNull = roleArray[se.id].indexOf(null)
                console.log("uwagaki="+arrayNull)
                // 削除された要素に代入
                if (arrayNull > -1){
                    hiddenItem.name = "role_" + selectedItems.id + "_" +(arrayNull + 1);
                    roleArray[se.id][arrayNull] = (arrayNull + 1)
                }
            // 一番最後に代入
            }else{
                console.log("push="+roleSum)
                roleArray[se.id].push(roleSum)
                hiddenItem.name = "role_" + selectedItems.id + "_" + roleSum;
            }
        // チャンネルの中身が空
        }else{
            console.log(se.id+" is false")
            roleArray[se.id] = []
            roleArray[se.id].push(roleSum)
            hiddenItem.name = "role_" + selectedItems.id + "_" + roleSum;
        }
        hiddenItem.value = selectedOption.value;
        
        // xボタンをクリックしたら削除するように仕込む
        removeBtn.addEventListener("click", function () {
            // 削除する要素の番号
            let roleObjName = hiddenItem.name.substring(hiddenItem.name.lastIndexOf("_") + 1);
            
            // 1から始まるため-1にnull
            roleArray[se.id][roleObjName - 1] = null
            item.remove();
            hiddenItem.remove();
            selectedOption.selected = false;
        });
        
        item.appendChild(removeBtn);
        selectedItems.appendChild(item);
        selectedItems.appendChild(hiddenItem)
    }

    selectId.removeEventListener("change",addSelectEvent); // change属性を削除する

    selectId.addEventListener("change", addSelectEvent);
}

const selectMemberAddEvent = (memberLen,se) => {
    // idから要素を取得
    let selectId = document.getElementById(se.id);

    // チャンネルidのみを引き出す
    const divId = se.id.substring(se.id.indexOf("_") + 1) 

    // 設定されているロールがあった場合
    if (memberLen > 0){
        // チャンネルidをキーとした連想配列を作成
        memberArray[se.id] = []
        for (let i = 0; i < memberLen; i++){
            // ロールの数だけ代入
            memberArray[se.id].push(i)
        }
    }

    function addSelectEvent(e) {
        const selectedItems = document.getElementById(divId);

        // 二重に登録されないように削除
        selectId.removeEventListener("change",addSelectEvent); // change属性を削除する

        const selectedOption = e.target.selectedOptions[0];
    
        const outerTextIndex = selectedItems.outerText.indexOf(selectedOption.textContent);
        
        if (outerTextIndex !== -1){
            console.log("return")
            return
        }
    
        const memberSum = selectedItems.outerText.split("x").length
    
        // 名前
        const item = document.createElement("div");
        item.className = "selected-item";
        item.textContent = selectedOption.textContent;
        
        // xボタン
        const removeBtn = document.createElement("span");
        removeBtn.className = "remove-item";
        removeBtn.textContent = "x";
    
        //input
        const hiddenItem = document.createElement("input");
        hiddenItem.className = "hidden-item";
        hiddenItem.type = "hidden";
        
        if (memberArray[se.id]){
            console.log(se.id+" is true")
            if (memberArray[se.id].includes(memberSum)){
                let arrayNull = memberArray[se.id].indexOf(null)
                console.log("uwagaki="+arrayNull)
                if (arrayNull > -1){
                    hiddenItem.name = "member_" + selectedItems.id + "_" +(arrayNull + 1);
                    memberArray[se.id][arrayNull] = (arrayNull + 1)
                }
            }else{
                console.log("push="+memberSum)
                memberArray[se.id].push(memberSum)
                hiddenItem.name = "member_" + selectedItems.id + "_" + memberSum;
            }
        // ロールの中身が空
        }else{
            console.log(se.id+" is false")
            memberArray[se.id] = []
            memberArray[se.id].push(memberSum)
            hiddenItem.name = "member_" + selectedItems.id + "_" + memberSum;
        }
        hiddenItem.value = selectedOption.value;
        
        // xボタンをクリックしたら削除するように仕込む
        removeBtn.addEventListener("click", function () {
            // 削除する要素の番号
            let memberObjName = hiddenItem.name.substring(hiddenItem.name.lastIndexOf("_") + 1);
            
            // 1から始まるため-1にnull
            memberArray[se.id][memberObjName - 1] = null
            item.remove();
            hiddenItem.remove();
            selectedOption.selected = false;
        });
        
        item.appendChild(removeBtn);
        selectedItems.appendChild(item);
        selectedItems.appendChild(hiddenItem)
    }

    selectId.removeEventListener("change",addSelectEvent); // change属性を削除する

    selectId.addEventListener("change", addSelectEvent);
}

// 既存要素の削除
const removeAdd = (removeItem) => {
    //.parentNodeで親要素の指定
    removeItem.parentNode.remove()
}

実行

pyhon main.pyで実行します。
image.png
http://localhost:5000 にアクセスしましょう。

image.png

ページに遷移したら、LoginからDiscord Loginをクリックします。

image.png

認証画面に遷移するので、認証。

image.png

設定できるサーバー一覧が表示されるので、招待したサーバーを選択。
image.png

ボイスチャンネルの通知設定をクリック。
image.png

こんな画面に遷移するので、お好きなように設定して送信。
image.png

実際にボイスチャンネルに入室してみましょう。
image.png

反映されていれば成功です。

終わりに

かなり雑に説明しましたが、Discordログインの例として参考になれば幸いです。

4
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
2