はじめに
こんにちは。マグロです。
今回はDiscordログインの勉強がてら、簡易的なBot作成をします。
背景
僕の代表作であるDiscordとLINEの連携Botなのですが、実は今年の3月ごろにWebUIでカスタマイズできる機能を追加していました。
その際にDiscordログインを使用してユーザーを識別させていたので、メモ程度にBotを作ろうというのがきっかけです。
環境
Python 3.10.7
Windows11
本稿ではWindowsで仮想環境を立てて実行しますが、デプロイ用にDockerfileも書きます。
使うもの
Docker
PostgreSQL
FastAPI
Pycord
asyncpg
Postgresはローカルのものでなく、supabaseのpostgresに接続します。
設計
フロント、サーバー共にFastAPIで実装します。
並列実行でBotを動かします。
OAuth設定とBot作成
Discord Developer Portalにアクセスします。
右上の「New Application(新しいアプリケーション)」をクリックします。
名前を入力したら、「Create(作成)」ボタンをクリックします。
作成したら、アプリ一覧から作成したアプリの「Bot」に移行し、「Add Bot」をクリックします。
Nameとiconを好きなように設定しましょう。
Intentsは公開Botにするわけではないのですべて有効にしてもいいでしょう。
「OAuth2 General」に移動し、「Redirects(リダイレクト)」に、リダイレクト先のURLを入力します。
ローカルで実行するため「 http://localhost:5000/discord-callback/ 」とURLを指定します。
また、CLIENT ID
とCLIENT SECRET
はここに表示されます。
後あと使うので控えておきましょう。
「OAuth2 URL Generator」に移動します。
「Scopes(スコープ)」セクションでは、アプリケーションが要求する権限を選択します。
まずはBotをサーバに追加するため、「bot」のみを選択します。
下の権限では「administrator(管理者)」を選択します。
必要に応じて権限を変えてもいいです。
下に表示されるURLに遷移し、参加させるサーバを選択して招待しましょう。
招待したら、一度スコープをリセットして、「identify,guilds,guilds.members.read」スコープを選択します。
SELECT REDIRECT URLに先ほど設定したcallback urlを指定し、その下に表示されているOAuth URLをコピーして控えておいてください。
データベースの用意
今回はPostgreSQLを使用します。
クラウド上のものを使用します。
herokuやrailwayなどが候補として挙げられますが、無料で使えるsupabaseを使用します。
登録がまだの人は登録しておきましょう。
GitHubログインをお勧めします。
ログインしたら/dashboard/projects
に遷移し、New Project
から新しいプロジェクトを作ります。
Name,Database,Password,Regionを好きなように設定してください。
作成したら、SettingのDatabaseに遷移して
- Host
- Database Name
- Port
- User
- Password
を控えておいてください。
またPassword
はプロジェクト作成時に設定したものと同様になります。
ディレクトリ構造
$ tree
.
├── base
│ ├── aio_req.py # 非同期での操作
│ ├── database.py # データベース操作
│ └── guild_permission.py
├── cogs
│ ├── bin
│ │ └── activity.py
│ └── vc_count.py # 入退室管理
├── core
│ ├── db_pickle.py # データベースのキャッシュ
│ └── start.py # DiscordBot起動用
├── routers
│ ├── api
│ │ ├── check
│ │ │ └── post_user_check.py
│ │ ├── admin_success.py
│ │ └── vc_signal_success.py
│ ├── guild
│ │ ├── admin # 管理者ページ
│ │ │ └── admin.py
│ │ ├── vc_signal # 入退室管理ページ
│ │ │ └── vc_signal.py
│ │ └── guild.py
│ ├── session_base
│ │ └── user_session.py
│ ├── callback.py # callbackの処理
│ ├── guilds.py
│ ├── index.py # indexページの処理
│ ├── login.py # 非同期での操作
│ └── logout.py
├── templates
│ ├── api
│ │ ├── admin_success.html
│ │ └── vc_signal_success.html
│ ├── guild
│ │ ├── admin
│ │ │ └── admin.html
│ │ ├── vc_signal
│ │ │ └── vc_signal.html
│ │ └── guild.py
│ ├── static
│ │ ├── img
│ │ │ └── discord_icon.jpg
│ │ └── js
│ │ ├── box_allcheck.js
│ │ ├── popover.js
│ │ ├── select_id.js
│ │ └── selects_bar.js
│ ├── guilds.html
│ ├── index.html # indexページ
│ └── layout.html
├── .env
├── .gitignore
├── main.py
├── server_router.py # fastapiのルータ
├── Dockerfile
└── requirements.txt
ライブラリインストール
requirements.txtに以下のように書き込みます。
aiohttp==3.7.4.post0
aiofiles==22.1.0
anyio==3.5.0
asyncpg==0.27.0
discord.py==1.7.3
fastapi==0.88.0
gunicorn==20.1.0
itsdangerous==2.1.0
Jinja2==3.0.3
python-dotenv==0.21.0
python-multipart==0.0.5
pydantic==1.9.1
requests==2.26.0
starlette==0.22.0
uvicorn==0.20.0
仮想環境作成
仮想環境を作成します。以下のコマンドを打ち込んでください。
python -m venv venv
作成したら、仮想環境に遷移します。
.\venv\Scripts\activate
requirements.txtに書き込んだライブラリをインストールしましょう。
pip install -r requirements.txt
加えて今回はdiscord.pyの派生ライブラリのpycord
を使用します。
pip install git+https://github.com/Pycord-Development/pycord
も実行しておいてください。
Dockerfile
Windowsの仮想環境を想定しているので必要ありませんが、デプロイを想定して作成します。
FROM python:3.10.7
USER root
RUN apt-get -y update && apt-get -y install locales && apt-get -y upgrade && \
localedef -f UTF-8 -i ja_JP ja_JP.UTF-8
ENV LANG ja_JP.UTF-8
ENV LANGUAGE ja_JP:ja
ENV LC_ALL ja_JP.UTF-8
ENV TZ JST-9
ENV TERM xterm
# ./root/src ディレクトリを作成 ホームのファイルをコピーして、移動
RUN mkdir -p /root/src
COPY . /root/src
WORKDIR /root/src
# Docker内で扱うffmpegをインストール
RUN apt-get install -y ffmpeg
# pipのアップグレード、requirements.txtから必要なライブラリをインストール
RUN pip install --upgrade pip
RUN pip install --upgrade setuptools
RUN pip install -r requirements.txt
# discord.pyをpycordにアップグレード
RUN pip install git+https://github.com/Pycord-Development/pycord
.gitignore
GitHubにデプロイする際に、デプロイしないファイルやフォルダを設定します。
.env
venv
__pycache__/
*.pickle
環境変数
環境変数を構成します。
DISCORD_BOT_TOKEN=
DISCORD_CALLBACK_URL=http://localhost:5000/discord-callback/
DISCORD_CLIENT_ID=
DISCORD_CLIENT_SECRET=
DISCORD_SCOPE=identify%20guilds%20guilds.members.read
PGDATABASE=
PGHOST=
PGPASSWORD=
PGUSER=
PORTS=5000
MIDDLE_KEY=
-
DISCORD_BOT_TOKEN
DiscordBotのトークン。 -
DISCORD_CALLBACK_URL
Discordログインの認証時に遷移するURL、Developerサイトに登録したものと同じ。
(本項ではhttp://localhost:5000/discord-callback/
) -
DISCORD_CLIENT_ID
DiscordアプリのID、BotのユーザIDと同じ。 -
DISCORD_CLIENT_SECRET
Discordのシークレットキー、認証時に使用する。Developerサイトから発行したもの。 -
DISCORD_SCOPE
Discordアプリに許可する権限一覧。.envの権限通りにすること。 -
PGDATABASE
PostgreSQLのデータベース名。 -
PGHOST
データベースのホスト名。 -
PGPASSWORD
データベースのパスワード。 -
PGPORT
データベースのポート番号。 -
PGUSER
データベースのユーザー名。 -
PORTS
サーバ立ち上げ時に使用するポート番号。
本項では5000で起動する。 -
MIDDLE_KEY
セッションに認証情報を保存する際の暗号キー。
好きなように設定する。(some-randam-code
など)
コーデイング
main.py
from core.start import DBot
import discord
import os
from dotenv import load_dotenv
load_dotenv()
from server_router import keep_alive
# サーバー立ち上げ
keep_alive()
Token = os.environ['DISCORD_BOT_TOKEN']
# Bot立ち上げ
DBot(
token=Token,
intents=discord.Intents.all()
).run()
server_router.py
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from starlette.middleware.sessions import SessionMiddleware
from fastapi.middleware.cors import CORSMiddleware
from threading import Thread
import uvicorn
import os
from dotenv import load_dotenv
load_dotenv()
from routers import (
index,
login,
callback,
guilds,
logout
)
from routers.guild import guild
from routers.guild.vc_signal import vc_signal
from routers.guild.admin import admin
from routers.api import (
vc_signal_success,
admin_success
)
app = FastAPI(
docs_url=None,
redoc_url=None,
openapi_url=None,
title='FastAPIを利用したDiscordログイン',
description='OAuth2を利用してユーザー情報を取得するトークンを発行します。',
version='0.9 beta'
)
callback_url = os.environ.get('DISCORD_CALLBACK_URL').replace('/callback/','')
origins = [
callback_url
]
# new テンプレート関連の設定 (jinja2)
templates = Jinja2Templates(directory="templates")
jinja_env = templates.env #Jinja2.Environment : filterやglobalの設定用
# templates/static以下のファイルを静的に扱えるようにする
app.mount("/static", StaticFiles(directory="templates/static"), name="static")
# session使用
app.add_middleware(SessionMiddleware, secret_key=os.environ.get('MIDDLE_KEY'))
# オリジン間のリソースを共有
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 各パス
app.include_router(index.router)
app.include_router(login.router)
app.include_router(callback.router)
app.include_router(guild.router)
app.include_router(guilds.router)
app.include_router(logout.router)
app.include_router(vc_signal.router)
app.include_router(vc_signal_success.router)
app.include_router(admin.router)
app.include_router(admin_success.router)
# ローカル実行
def local_run():
uvicorn.run(
app,
host='localhost',
port=int(os.getenv("PORTS", default=5000)),
log_level="info"
)
# 本番環境
def run():
uvicorn.run(
"server_router:app",
host="0.0.0.0",
port=int(os.getenv("PORT", default=8080)),
log_level="info"
)
# DiscordBotと並列で立ち上げる
def keep_alive():
if os.environ.get("PORTS") != None:
t = Thread(target=local_run)
else:
t = Thread(target=run)
t.setDaemon(True)
t.start()
いくつか解説
# templates/static以下のファイルを静的に扱えるようにする
app.mount("/static", StaticFiles(directory="templates/static"), name="static")
こうすることでtemplates/static
内のファイルを静的に扱えるようになります。
ここにcssやjsを置くと、/static/css/stlye.css
とhtml内で扱えるようになります。
# session使用
app.add_middleware(SessionMiddleware, secret_key=os.environ.get('MIDDLE_KEY'))
セッションに変数を保存できるようになります。
request.session['user'] = user
のように代入できて、他のrouterでも参照できます。
base/aio_req.py
from base.guild_permission import Permission
import aiohttp
import aiofiles
from typing import List,Any,Dict
import os
import io
import pickle
DISCORD_BASE_URL = "https://discord.com/api"
DISCORD_BOT_TOKEN = os.environ["DISCORD_BOT_TOKEN"]
# getリクエストを行う
async def aio_get_request(url: str, headers: dict) -> Dict:
async with aiohttp.ClientSession() as session:
async with session.get(
url = url,
headers = headers
) as resp:
return await resp.json()
# postリクエストを行う
async def aio_post_request(url: str, headers: dict, data: dict) -> Dict:
async with aiohttp.ClientSession() as session:
async with session.post(
url = url,
headers = headers,
data = data
) as resp:
return await resp.json()
async def pickle_read(filename:str) -> Any:
"""
pickleファイルの読み込み
param:
filename:str
pickleファイルの名前
return:
pickleファイルの中身
"""
# 読み取り
async with aiofiles.open(
file=f'{filename}.pickle',
mode='rb'
) as f:
pickled_bytes = await f.read()
with io.BytesIO() as f:
f.write(pickled_bytes)
f.seek(0)
fetch = pickle.load(f)
return fetch
async def pickle_write(
filename:str,
table_fetch:List[Dict]
) -> None:
"""
pickleファイルの書き込み
param:
filename :str
pickleファイルの名前
table_fetch :List[Dict]
SQLから取り出したデータ
"""
# 取り出して書き込み
dict_row = [
dict(zip(record.keys(), record.values()))
for record in table_fetch
]
# 書き込み
async with aiofiles.open(
file=f'{filename}.pickle',
mode='wb'
) as f:
await f.write(pickle.dumps(obj=dict_row))
async def search_guild(
bot_in_guild_get:List[dict],
user_in_guild_get:List[dict]
) -> List:
"""
Botとログインしたユーザーが所属しているサーバーを調べ、同じものを返す
param:
bot_in_guild_get :List[dict]
Botが所属しているサーバー一覧
user_in_guild_get :List[dict]
ユーザーが所属しているサーバー一覧
return:
List
所属が同じサーバー一覧
"""
bot_guild_id = []
user_guild_id = []
match_guild = []
bot_guild_id = [
bot_guild.get('id')
for bot_guild in bot_in_guild_get
]
user_guild_id = [
user_guild.get('id')
for user_guild in user_in_guild_get
]
# for探索短縮のため、総数が少ない方をforinする
if len(bot_guild_id) < len(user_guild_id):
match_guild = [
guild
for guild_id,guild in zip(bot_guild_id,bot_in_guild_get)
if guild_id in user_guild_id
]
else:
match_guild = [
guild
for guild_id,guild in zip(user_guild_id,user_in_guild_get)
if guild_id in bot_guild_id
]
return match_guild
async def search_role(
guild_role_get:List[dict],
user_role_get:List[dict]
) -> List[dict]:
"""
ユーザがサーバ内で持っているロールの詳細を取得する
param:
guild_role_get :List[dict]
サーバにある全てのロール
user_role_get :List[dict]
ユーザ情報
return:
List
ユーザーが持っているロール一覧
"""
guild_role_id = []
user_role_id = []
match_role = []
for guild_role in guild_role_get:
guild_role_id.append(guild_role['id'])
for user_guild in user_role_get["roles"]:
user_role_id.append(user_guild)
for role_id,role in zip(guild_role_id,guild_role_get):
if role_id in user_role_id:
match_role.append(role)
return match_role
async def return_permission(
guild_id:int,
user_id:int,
access_token:str
) -> Permission:
"""
指定されたユーザの権限を返す(ロールの権限も含む)
guild_id :int
サーバのid
user_id :int
ユーザのid
access_token :str
ユーザのアクセストークン
"""
# ログインユーザの情報を取得
guild_user = await aio_get_request(
url = DISCORD_BASE_URL + f'/guilds/{guild_id}/members/{user_id}',
headers = {
'Authorization': f'Bot {DISCORD_BOT_TOKEN}'
}
)
# サーバのロールを取得
guild_role = await aio_get_request(
url = DISCORD_BASE_URL + f'/guilds/{guild_id}/roles',
headers = {
'Authorization': f'Bot {DISCORD_BOT_TOKEN}'
}
)
# ログインユーザのロールの詳細を取得
match_role = await search_role(
guild_role_get = guild_role,
user_role_get = guild_user
)
# ログインユーザの所属しているサーバを取得
guild_info = await aio_get_request(
url = DISCORD_BASE_URL + f'/users/@me/guilds',
headers = {
'Authorization': f'Bearer {access_token}'
}
)
permission = 0
user_permission = Permission()
# サーバでの権限を取得
for info in guild_info:
if str(guild_id) == info["id"]:
permission = info["permissions"]
break
await user_permission.get_permissions(permissions=permission)
for role in match_role:
# サーバー管理者であるかどうかを調べる
role_permission = Permission()
await role_permission.get_permissions(permissions=int(role["permissions"]))
user_permission = user_permission | role_permission
return user_permission
async def oauth_check(
access_token:str
) -> bool:
"""
OAuth2のトークンが有効か判断する
param:
access_token:str
OAuth2のトークン
return:
bool
トークンが有効な場合、True
無効の場合、Falseが返される
"""
oauth_data:dict = await aio_get_request(
url = DISCORD_BASE_URL + '/users/@me',
headers = {
'Authorization': f'Bearer {access_token}'
}
)
if oauth_data.get('message') == '401: Unauthorized':
return False
else:
return True
いくら非同期で接続するとはいえ、データベースへの接続には時間がかかります。
データの参照時間を軽減するため、pickle
を使いデータベースのキャッシュを作ります。
return_permission
では通常の権限だけでなく、ユーザが持っているロールからも権限を読み取ります。
base/database.py
PostgreSQLの操作をします。
asyncpgで非同期での操作を行います。
import asyncpg
from asyncpg.connection import Connection
from asyncpg.exceptions import DuplicateTableError
from typing import List,Dict,Any,Union,Tuple
from dotenv import load_dotenv
load_dotenv()
class DataBaseNotConnect(Warning):...
class PostgresDB:
def __init__(
self,
user:str,
password:str,
database:str,
host:str
):
"""
PostgreSQLのクラス
user :str
Postgresのユーザー名
password:str
パスワード
database:str
データベースの名前
host :str
ホスト番号
conn :Connection
データベースの接続情報
"""
self.user = user
self.password = password
self.database = database
self.host = host
self.conn:Connection = None
async def connect(self):
"""
PostgreSQLへ接続
"""
self.conn = await asyncpg.connect(
user=self.user,
password=self.password,
database=self.database,
host=self.host
)
async def disconnect(self):
"""
PostgreSQLの切断
"""
if self.conn == None:
raise DataBaseNotConnect
await self.conn.close()
async def create_table(self, table_name:str, columns:dict) -> str:
"""
テーブルの作成
table_name :str
作成するテーブル名
colums :dict
テーブル内の名前と型
"""
if self.conn == None:
raise DataBaseNotConnect
columns_str = ', '.join(
[
f"{column_name} {data_type}" for column_name, data_type in columns.items()
]
)
sql = f"CREATE TABLE IF NOT EXISTS {table_name} ({columns_str});"
try:
await self.conn.execute(sql)
return "ok"
except DuplicateTableError:
return "DuplicateTableError"
async def drop_table(self, table_name:str) -> None:
"""
テーブルの削除
table_name:str
削除するテーブルの名前
"""
sql = f"DROP TABLE IF EXISTS {table_name};"
await self.conn.execute(sql)
async def select_rows(
self,
table_name:str,
columns:List[str]=None,
where_clause:dict=None
) -> List:
"""
テーブルの参照
table_name :str
参照するテーブルの名前
columns :List[str]
参照する列、指定がない場合すべてを参照
where_clause:dict
条件、指定しない場合はすべて取得
return:
list :List[Any]
"""
if self.conn == None:
raise DataBaseNotConnect
if columns is None or len(columns) == 0:
columns_str = '*'
else:
columns_str = ', '.join(columns)
if where_clause is None:
sql = f"SELECT {columns_str} FROM {table_name};"
else:
where_clause_str = ' AND '.join(
[
f"{column}=${i+1}" for i, column in enumerate(
where_clause.keys()
)
]
)
where_clause_values = list(where_clause.values())
sql = f"SELECT {columns_str} FROM {table_name} "
if where_clause_str:
sql += f"WHERE {where_clause_str};"
else:
sql += ";"
try:
return await self.conn.fetch(sql, *where_clause_values)
except asyncpg.exceptions.UndefinedTableError:
return [f"{table_name} does not exist"]
async def insert_row(
self,
table_name:str,
row_values:dict
) -> bool:
"""
行の追加
table_name:str
対象のテーブルの名前
row_values:dict
追加する行の内容
"""
if self.conn == None:
raise DataBaseNotConnect
columns_str = ', '.join(row_values.keys())
values_str = ', '.join(
[
f"${i+1}" for i in range(len(row_values))
]
)
sql = f"INSERT INTO {table_name} ({columns_str}) VALUES ({values_str});"
try:
await self.conn.execute(sql, *row_values.values())
return True
except asyncpg.exceptions.UniqueViolationError:
return False
async def batch_insert_row(
self,
table_name: str,
row_values: List[Dict[str, Any]]
) -> None:
"""
行を一気に作成
"""
if self.conn == None:
raise DataBaseNotConnect
columns = row_values[0].keys()
values = [
tuple(
row[col]
for col in columns
)
for row in row_values
]
await self.conn.copy_records_to_table(
table_name=table_name,
records=values
)
async def update_row(
self,
table_name:str,
row_values:dict,
where_clause:dict
) -> None:
"""
行の更新
table_name :str
テーブルの名前
row_values :dict
更新の内容
where_clause:dict
条件
"""
if self.conn == None:
raise DataBaseNotConnect
set_clause_str = ', '.join(
[
f"{column}=${i+1}" for i, column in enumerate(
row_values.keys()
)
]
)
where_clause_str = ' AND '.join(
[
f"{column}=${i+len(row_values)+1}" for i, column in enumerate(
where_clause.keys()
)
]
)
values = list(row_values.values()) + list(where_clause.values())
sql = f"UPDATE {table_name} SET {set_clause_str} "
if where_clause_str:
sql += f"WHERE {where_clause_str};"
else:
sql += ";"
await self.conn.execute(sql, *values)
async def primary_batch_update_rows(
self,
table_name: str,
set_values_and_where_columns: List[Dict],
table_colum:Dict
) -> None:
"""
updateを複数行う
param:
tabel_name : str
更新するテーブル名
set_values_and_where_columns: List[Dict],
更新する値と条件の辞書型配列
それぞれに更新する行と、更新する内容を記述する
必ず以下のような構造にすること
また、where_clauseは主キーを指定し、重複させないこと
set_values_and_where_columns = [
{
'where_clause': {'channel_id': 0},
'row_values': {'カラム名': '値'},
},
{
'where_clause': {'channel_id': 1},
'row_values': {'カラム名': '値'},
},
{
'where_clause': {'channel_id': 2},
'row_values': {'カラム名': '値', 'カラム名': '値'},
},
]
table_colum : Dict
テーブルのカラム一覧
Postgresでcreateしたときのものを辞書型で表現すること
table_colum = {
'channel_id': 'NUMERIC PRIMARY KEY',
'guild_id': 'NUMERIC',
'line_ng_channel': 'boolean',
'ng_message_type': 'VARCHAR(50)[]',
'message_bot': 'boolean',
'ng_users':'NUMERIC[]'
}
"""
# テーブル名と列名のリストを取得
columns = table_colum.keys()
# 主キーを取り出す
primary_key = [
key
for key,value in table_colum.items()
if 'PRIMARY KEY' in value
]
updates = set_values_and_where_columns
# SET 句の文字列を構築
set_clauses = []
# はじめに条件となる主キーを代入
values = [w['where_clause'][primary_key[0]] for w in updates]
values_len = [w['where_clause'][primary_key[0]] for w in updates]
# 主キーの数でパラメータの初期値を決める
param_count = len(values_len) + 1
for i, column in enumerate(columns):
set_clause = f"{column} = CASE "
# case文に入るカラムがあるかのフラグ
set_clause_flag = False
for j, update in enumerate(updates):
# 更新するカラムがあった場合
if update['row_values'].get(column) is not None:
# フラグを挙げる
set_clause_flag = True
# $param_count $param_count + 1
set_clause += f"WHEN {primary_key[0]} = ${param_count} THEN ${param_count + 1} "
# 上記の数だけ2足す
param_count += 2
values.append(update['where_clause'][primary_key[0]])
values.append(update['row_values'][column])
#print(update['where_clause'][primary_key[0]],column,update['row_values'][column])
set_clause += f"ELSE {column} END"
# case文が書かれていた場合、配列に追加
if set_clause_flag:
set_clauses.append(set_clause)
# WHERE 句の文字列を構築
where_clause = f"{primary_key[0]} IN (" + ", ".join(
f"${i + 1}" for i in range(len(values_len))
) + ")"
# SQL 文の構築
sql = f"UPDATE {table_name} SET \n{', '.join(set_clauses)} WHERE {where_clause}"
await self.conn.execute(sql, *values)
async def delete_row(
self,
table_name:str,
where_clause:dict
) -> None:
"""
行の削除
param:
table_name :str
テーブルの名前
where_clause:dict
条件
"""
if self.conn == None:
raise DataBaseNotConnect
where_clause_str = ' AND '.join(
[
f"{column}=${i+1}" for i, column in enumerate(
where_clause.keys()
)
]
)
where_clause_values = list(where_clause.values())
sql = f"DELETE FROM {table_name} "
if where_clause_str:
sql += f"WHERE {where_clause_str};"
else:
sql += ";"
await self.conn.execute(sql, *where_clause_values)
async def free_sql(self,sql_syntax:str)-> List:
"""
PostgreSQLの構文を文字列にしてそのまま実行する
param:
sql_syntax:str
sqlの構文
return:
List
selectは結果が帰ってくる
"""
if self.conn == None:
raise DataBaseNotConnect
return await self.conn.fetch(sql_syntax)
async def get_columns_type(
self,
table_name:str
) -> Dict:
"""
指定されたテーブルの列の型を返す
param:
table_name :str
テーブルの名前
return:
List[Tuple[str,str]]
行名と型の配列
"""
query = f"""
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_name = '{table_name}';
"""
result = await self.conn.fetch(query)
columns:List[Tuple[str,str]] = [
(row['column_name'],row['data_type'])
for row in result
]
columns_dict:Dict = {}
# ARRAY型の列の要素のデータ型を取得
for i, (column_name,data_type) in enumerate(columns):
# 初期の辞書型
tmp_dict:Dict = {column_name:data_type}
# 配列の場合
if data_type.startswith('ARRAY'):
query = f"""
SELECT column_name, udt_name
FROM information_schema.columns
WHERE table_name = '{table_name}' AND column_name = '{column_name}'
"""
result:List[Dict] = await self.conn.fetch(query)
element_data_type:str = result[0]['udt_name']
# 配列の場合(先頭に_がある)
if element_data_type.startswith('_'):
element_data_type = element_data_type.replace('_','')
element_data_type = f'{element_data_type}[]'
# 配列用に更新
tmp_dict = {
column_name: element_data_type
}
# varcharの場合
if data_type == 'character varying':
tmp_dict = {
column_name: 'varchar'
}
columns_dict.update(tmp_dict)
return columns_dict
base/guild_permission.py
ユーザが持っている権限を示すクラスです。
__and__で比較も可能です。
class Permission:
def __init__(self) -> None:
"""
このクラスは、Discordの権限を表すために使用されます。
各権限は、クラスの属性として定義されています。
それぞれの属性には、TrueまたはFalseのブール値が割り当てられています。
administrator: 管理者権限を持っているかどうか
create_instant_invite: サーバーに招待リンクを作成できるかどうか
kick_members: メンバーをキックできるかどうか
ban_members: メンバーをBANできるかどうか
manage_channels: チャンネルを管理できるかどうか
manage_guild: サーバーを管理できるかどうか
add_reactions: メッセージにリアクションを追加できるかどうか
view_audit_log: サーバーの監査ログを表示できるかどうか
priority_speaker: 優先スピーカーになれるかどうか
stream: 配信できるかどうか
view_channel: チャンネルを表示できるかどうか
send_messages: メッセージを送信できるかどうか
send_tts_messages: TTSメッセージを送信できるかどうか
manage_messages: メッセージを管理できるかどうか
embed_links: 埋め込みリンクを送信できるかどうか
attach_files: ファイルを添付できるかどうか
read_message_history: メッセージ履歴を表示できるかどうか
mention_everyone: @everyoneや@hereを使用できるかどうか
use_external_emojis: 外部の絵文字を使用できるかどうか
view_guild_insights: サーバーのインサイトを表示できるかどうか
connect: ボイスチャンネルに接続できるかどうか
speak: ボイスチャンネルで発言できるかどうか
mute_members: メンバーをミュートできるかどうか
deafen_members: メンバーをデフェンできるかどうか
move_members: メンバーを移動できるかどうか
use_vad: 音声検出を使用できるかどうか
change_nickname: ニックネームを変更できるかどうか
manage_nicknames: ニックネームを管理できるかどうか
manage_roles: ロールを管理できるかどうか
manage_webhooks: Webhookを管理できるかどうか
manage_emojis_and_stickers: 絵文字とステッカーを管理できるかどうか
use_application_commands: アプリケーションコマンドを使用できるかどうか
"""
self.administrator = False
self.create_instant_invite = False
self.kick_members = False
self.ban_members = False
self.manage_channels = False
self.manage_guild = False
self.add_reactions = False
self.view_audit_log = False
self.priority_speaker = False
self.stream = False
self.view_channel = False
self.send_messages = False
self.send_tts_messages = False
self.manage_messages = False
self.embed_links = False
self.attach_files = False
self.read_message_history = False
self.mention_everyone = False
self.use_external_emojis = False
self.view_guild_insights = False
self.connect = False
self.speak = False
self.mute_members = False
self.deafen_members = False
self.move_members = False
self.use_vad = False
self.change_nickname = False
self.manage_nicknames = False
self.manage_roles = False
self.manage_webhooks = False
self.manage_emojis_and_stickers = False
self.use_application_commands = False
self.request_to_speak = False
self.manage_threads = False
self.use_public_threads = False
self.use_private_threads = False
self.use_external_stickers = False
def __or__(self, other: 'Permission') -> 'Permission':
"""
2つのPermissionオブジェクトを論理和演算して、新しいPermissionオブジェクトを返すメソッド
"""
result = Permission() # 新しいPermissionオブジェクトを生成する
result.administrator = self.administrator or other.administrator
result.create_instant_invite = self.create_instant_invite or other.create_instant_invite
result.kick_members = self.kick_members or other.kick_members
result.ban_members = self.ban_members or other.ban_members
result.manage_channels = self.manage_channels or other.manage_channels
result.manage_guild = self.manage_guild or other.manage_guild
result.add_reactions = self.add_reactions or other.add_reactions
result.view_audit_log = self.view_audit_log or other.view_audit_log
result.priority_speaker = self.priority_speaker or other.priority_speaker
result.stream = self.stream or other.stream
result.view_channel = self.view_channel or other.view_channel
result.send_messages = self.send_messages or other.send_messages
result.send_tts_messages = self.send_tts_messages or other.send_tts_messages
result.manage_messages = self.manage_messages or other.manage_messages
result.embed_links = self.embed_links or other.embed_links
result.attach_files = self.attach_files or other.attach_files
result.read_message_history = self.read_message_history or other.read_message_history
result.mention_everyone = self.mention_everyone or other.mention_everyone
result.use_external_emojis = self.use_external_emojis or other.use_external_emojis
result.view_guild_insights = self.view_guild_insights or other.view_guild_insights
result.connect = self.connect or other.connect
result.speak = self.speak or other.speak
result.mute_members = self.mute_members or other.mute_members
result.deafen_members = self.deafen_members or other.deafen_members
result.move_members = self.move_members or other.move_members
result.use_vad = self.use_vad or other.use_vad
result.change_nickname = self.change_nickname or other.change_nickname
result.manage_nicknames = self.manage_nicknames or other.manage_nicknames
result.manage_roles = self.manage_roles or other.manage_roles
result.manage_webhooks = self.manage_webhooks or other.manage_webhooks
result.manage_emojis_and_stickers = self.manage_emojis_and_stickers or other.manage_emojis_and_stickers
result.use_application_commands = self.use_application_commands or other.use_application_commands
result.request_to_speak = self.request_to_speak or other.request_to_speak
result.manage_threads = self.manage_threads or other.manage_threads
result.use_public_threads = self.use_public_threads or other.use_public_threads
result.use_private_threads = self.use_private_threads or other.use_private_threads
result.use_external_stickers = self.use_external_stickers or other.use_external_stickers
return result
def __and__(self, other: 'Permission') -> 'Permission':
"""
2つのPermissionオブジェクトを論理積演算して、新しいPermissionオブジェクトを返すメソッド
"""
result = Permission() # 新しいPermissionオブジェクトを生成する
result.administrator = self.administrator and other.administrator
result.create_instant_invite = self.create_instant_invite and other.create_instant_invite
result.kick_members = self.kick_members and other.kick_members
result.ban_members = self.ban_members and other.ban_members
result.manage_channels = self.manage_channels and other.manage_channels
result.manage_guild = self.manage_guild and other.manage_guild
result.add_reactions = self.add_reactions and other.add_reactions
result.view_audit_log = self.view_audit_log and other.view_audit_log
result.priority_speaker = self.priority_speaker and other.priority_speaker
result.stream = self.stream and other.stream
result.view_channel = self.view_channel and other.view_channel
result.send_messages = self.send_messages and other.send_messages
result.send_tts_messages = self.send_tts_messages and other.send_tts_messages
result.manage_messages = self.manage_messages and other.manage_messages
result.embed_links = self.embed_links and other.embed_links
result.attach_files = self.attach_files and other.attach_files
result.read_message_history = self.read_message_history and other.read_message_history
result.mention_everyone = self.mention_everyone and other.mention_everyone
result.use_external_emojis = self.use_external_emojis and other.use_external_emojis
result.view_guild_insights = self.view_guild_insights and other.view_guild_insights
result.connect = self.connect and other.connect
result.speak = self.speak and other.speak
result.mute_members = self.mute_members and other.mute_members
result.deafen_members = self.deafen_members and other.deafen_members
result.move_members = self.move_members and other.move_members
result.use_vad = self.use_vad and other.use_vad
result.change_nickname = self.change_nickname and other.change_nickname
result.manage_nicknames = self.manage_nicknames and other.manage_nicknames
result.manage_roles = self.manage_roles and other.manage_roles
result.manage_webhooks = self.manage_webhooks and other.manage_webhooks
result.manage_emojis_and_stickers = self.manage_emojis_and_stickers and other.manage_emojis_and_stickers
result.use_application_commands = self.use_application_commands and other.use_application_commands
result.request_to_speak = self.request_to_speak and other.request_to_speak
result.manage_threads = self.manage_threads and other.manage_threads
result.use_public_threads = self.use_public_threads and other.use_public_threads
result.use_private_threads = self.use_private_threads and other.use_private_threads
result.use_external_stickers = self.use_external_stickers and other.use_external_stickers
return result
async def get_permissions(self,permissions:int) -> None:
"""
パーミッションを判別し、boolで返す
param:
permissions:int
権限を表すコード
"""
self.administrator = permissions & (1 << 3) == (1 << 3)
self.create_instant_invite = permissions & (1 << 0) == (1 << 0)
self.kick_members = permissions & (1 << 1) == (1 << 1)
self.ban_members = permissions & (1 << 2) == (1 << 2)
self.manage_channels = permissions & (1 << 4) == (1 << 4)
self.manage_guild = permissions & (1 << 5) == (1 << 5)
self.add_reactions = permissions & (1 << 6) == (1 << 6)
self.view_audit_log = permissions & (1 << 7) == (1 << 7)
self.priority_speaker = permissions & (1 << 8) == (1 << 8)
self.stream = permissions & (1 << 9) == (1 << 9)
self.view_channel = permissions & (1 << 10) == (1 << 10)
self.send_messages = permissions & (1 << 11) == (1 << 11)
self.send_tts_messages = permissions & (1 << 12) == (1 << 12)
self.manage_messages = permissions & (1 << 13) == (1 << 13)
self.embed_links = permissions & (1 << 14) == (1 << 14)
self.attach_files = permissions & (1 << 15) == (1 << 15)
self.read_message_history = permissions & (1 << 16) == (1 << 16)
self.mention_everyone = permissions & (1 << 17) == (1 << 17)
self.use_external_emojis = permissions & (1 << 18) == (1 << 18)
self.view_guild_insights = permissions & (1 << 19) == (1 << 19)
self.connect = permissions & (1 << 20) == (1 << 20)
self.speak = permissions & (1 << 21) == (1 << 21)
self.mute_members = permissions & (1 << 22) == (1 << 22)
self.deafen_members = permissions & (1 << 23) == (1 << 23)
self.move_members = permissions & (1 << 24) == (1 << 24)
self.use_vad = permissions & (1 << 25) == (1 << 25)
self.change_nickname = permissions & (1 << 26) == (1 << 26)
self.manage_nicknames = permissions & (1 << 27) == (1 << 27)
self.manage_roles = permissions & (1 << 28) == (1 << 28)
self.manage_webhooks = permissions & (1 << 29) == (1 << 29)
self.manage_emojis_and_stickers = permissions & (1 << 30) == (1 << 30)
self.use_application_commands = permissions & (1 << 31) == (1 << 31)
self.request_to_speak = permissions & (1 << 32) == (1 << 32)
self.manage_threads = permissions & (1 << 34) == (1 << 34)
self.use_public_threads = permissions & (1 << 35) == (1 << 35)
self.use_private_threads = permissions & (1 << 36) == (1 << 36)
self.manage_emojis_and_stickers = permissions & (1 << 42) == (1 << 42)
self.use_external_stickers = permissions & (1 << 43) == (1 << 43)
async def get_permission_code(self) -> int:
permission_code = 0
if self.administrator:
permission_code += 8
if self.create_instant_invite:
permission_code += 1
if self.kick_members:
permission_code += 2
if self.ban_members:
permission_code += 4
if self.manage_channels:
permission_code += 16
if self.manage_guild:
permission_code += 32
if self.add_reactions:
permission_code += 64
if self.view_audit_log:
permission_code += 128
if self.priority_speaker:
permission_code += 256
if self.stream:
permission_code += 512
if self.view_channel:
permission_code += 1024
if self.send_messages:
permission_code += 2048
if self.send_tts_messages:
permission_code += 4096
if self.manage_messages:
permission_code += 8192
if self.embed_links:
permission_code += 16384
if self.attach_files:
permission_code += 32768
if self.read_message_history:
permission_code += 65536
if self.mention_everyone:
permission_code += 131072
if self.use_external_emojis:
permission_code += 262144
if self.view_guild_insights:
permission_code += 524288
if self.connect:
permission_code += 1048576
if self.speak:
permission_code += 2097152
if self.mute_members:
permission_code += 4194304
if self.deafen_members:
permission_code += 8388608
if self.move_members:
permission_code += 16777216
if self.use_vad:
permission_code += 33554432
if self.change_nickname:
permission_code += 67108864
if self.manage_nicknames:
permission_code += 134217728
if self.manage_roles:
permission_code += 268435456
if self.manage_webhooks:
permission_code += 536870912
if self.manage_emojis_and_stickers:
permission_code += 1073741824
if self.use_application_commands:
permission_code += 2147483648
if self.request_to_speak:
permission_code += 4294967296
if self.manage_threads:
permission_code += 8589934592
if self.use_public_threads:
permission_code += 34359738368
if self.use_private_threads:
permission_code += 68719476736
if self.use_external_stickers:
permission_code += 137438953472
return permission_code
core/start.py
DiscordBotを立ち上げるコードです。
import discord
from discord import Intents
import os
import json
import traceback
import requests,json
from dotenv import load_dotenv
load_dotenv()
from core.db_pickle import db_pickle_save
class DBot(discord.AutoShardedBot):
def __init__(self, token:str, intents:Intents) -> None:
self.token = token
super().__init__(intents = intents)
self.load_cogs()
async def on_ready(self) -> None:
await self.change_presence(
status=discord.Status.do_not_disturb,
activity=discord.Activity(name='起動中...................',type=discord.ActivityType.watching)
)
await self.db_get()
print('起動しました')
game_name = os.environ.get('GAME_NAME')
if game_name == None:
game_name = 'senran kagura'
await self.change_presence(
status=discord.Status.online,
activity=discord.Game(name = game_name)
)
def load_cogs(self) -> None:
for file in os.listdir("./cogs"):
if file.endswith(".py"):
cog = file[:-3]
self.load_extension(f"cogs.{cog}")
print(cog + "をロードしました")
async def db_get(self) -> None:
# データベースへ接続
await db_pickle_save(guilds=self.guilds)
# 起動用の補助関数です
def run(self) -> None:
try:
self.loop.run_until_complete(self.start(self.token))
except discord.LoginFailure:
print("Discord Tokenが不正です")
except KeyboardInterrupt:
print("終了します")
self.loop.run_until_complete(self.close())
except discord.HTTPException as e:
traceback.print_exc()
if e.status == 429 and os.environ.get("WEBHOOK") != None:
main_content = {'content': 'DiscordBot 429エラー\n直ちにDockerファイルを再起動してください。'}
headers = {'Content-Type': 'application/json'}
response = requests.post(
url=os.environ.get("WEBHOOK"),
data=json.dumps(main_content),
headers=headers
)
except Exception as e:
traceback.print_exc()
if os.environ.get("WEBHOOK") != None:
if os.environ.get("ADMIN_ID") != None:
admin_id = os.environ.get("ADMIN_ID")
text = f"<@{int(admin_id)}> {e}"
else:
text = str(e)
main_content = {
'content':text
}
headers = {
'Content-Type': 'application/json'
}
response = requests.post(
url=os.environ.get("WEBHOOK"),
data=json.dumps(main_content),
headers=headers
)
core/db_pickle.py
データベースからデータを取得し、pickle
でキャッシュを作成します。
from discord import Guild
from dotenv import load_dotenv
load_dotenv()
from typing import List
import os
from base.database import PostgresDB
from core.pickes_save import (
vc_columns,
guild_permissions_columns
)
DISCORD_BASE_URL = "https://discord.com/api"
DISCORD_BOT_TOKEN = os.environ["DISCORD_BOT_TOKEN"]
USER = os.getenv('PGUSER')
PASSWORD = os.getenv('PGPASSWORD')
DATABASE = os.getenv('PGDATABASE')
HOST = os.getenv('PGHOST')
db = PostgresDB(
user=USER,
password=PASSWORD,
database=DATABASE,
host=HOST
)
async def db_pickle_save(guilds:List[Guild]) -> None:
"""
データベースのテーブルの中身をキャッシュデータとしてローカルに保存します。
param:
guilds :List[discord.Guild]
Botが所属しているDiscordサーバのクラス
"""
# データベースへ接続
await db.connect()
# サーバごとにテーブルのキャッシュデータを作成
for guild in guilds:
await vc_columns.vc_pickle_save(db=db,guild=guild)
await guild_permissions_columns.guild_permissions_pickle_save(db=db,guild=guild)
await db.disconnect()
core/pickle_save/vc_columns.py
ボイスチャンネル入退室管理のテーブルを取得し、キャッシュを保存する。
from typing import List,Dict
from discord import Guild
from base.database import PostgresDB
from base.aio_req import (
pickle_write
)
from core.pickes_save.bin.get_channel import get_discord_channel
from core.pickes_save.bin.check_table import check_table_type
VC_TABLE = 'guilds_vc_signal_'
VC_COLUMNS = {
'vc_id' : 'NUMERIC PRIMARY KEY',
'guild_id' : 'NUMERIC',
'send_signal' : 'boolean',
'send_channel_id' : 'NUMERIC',
'join_bot' : 'boolean',
'everyone_mention' : 'boolean',
'mention_role_id' : 'NUMERIC[]'
}
VC_NEW_COLUMNS = {
'vc_id' : 0,
'guild_id' : 0,
'send_signal' : True,
'send_channel_id' : 0,
'join_bot' : True,
'everyone_mention' : True,
'mention_role_id' : []
}
# 取得するチャンネルのタイプ
VC_TYPE = [2]
async def vc_pickle_save(
db:PostgresDB,
guild:Guild
) -> None:
"""
ボイスチャンネルの入退室を管理するテーブルの作成、更新
キャッシュデータの作成
param:
db:PostgresDB
接続するデータベースのインスタンス
guild:Guild
Discordのサーバーインスタンス
"""
# テーブル名を代入
table_name:str = f"{VC_TABLE}{guild.id}"
# テーブルをつくるか、カラムを取得するかのフラグ
create_colum_flag = False
create_table_flag = False
# テーブルを削除するかのフラグ
drop_table_flag = False
# テーブルの要素を取得
table_fetch:List[Dict] = await db.select_rows(
table_name=f"{table_name}",
columns=[],
where_clause={}
)
# テーブル内のカラム名配列
channel_colums = [key for key in VC_COLUMNS.keys()]
# テーブルがなかった場合、作成
if len(table_fetch) == 1:
# テーブルが存在しない場合、作成
if (table_fetch[0] == f"{table_name} does not exist"):
create_table_flag = True
table_colums = [key for key in VC_COLUMNS.keys()]
# テーブルが存在する場合、カラムを格納
else:
create_colum_flag = True
# データベース側のカラムの型を入手
table_columns_type = await db.get_columns_type(table_name=table_name)
table_colums = [key for key in table_columns_type.keys()]
else:
# データベース側のカラムの型を入手
table_columns_type = await db.get_columns_type(table_name=table_name)
table_colums = [key for key in table_columns_type.keys()]
# テーブルの方が存在する場合
if bool('table_columns_type' in locals()):
# テーブル内のカラムの型配列を確認
unchanged,table_fetch = await check_table_type(
columns=VC_COLUMNS,
table_columns=table_columns_type,
new_columns=VC_NEW_COLUMNS,
table_fetch=table_fetch
)
else:
unchanged = False
# テーブルに変更があるかのフラグ
changed_table_flag = table_colums != channel_colums or unchanged != False
# テーブルが存在しているが、中身が空
if len(table_fetch) == 0:
print(f'テーブル:{table_name}の要素は空です')
# 要素が変更されていた場合
if changed_table_flag:
drop_table_flag = True
# ローカル側のカラムを格納
else:
table_colums = [key for key in VC_COLUMNS.keys()]
# データベース側のカラムを格納
if create_colum_flag:
print(f'テーブル:{table_name}のカラム名一覧を作成します')
table_colums = [key for key in table_fetch[0].keys()]
# カラムの構成が変更されていた場合、削除し新たに作成する
if changed_table_flag or drop_table_flag:
print(f'テーブル:{table_name}を削除します')
create_table_flag = True
await db.drop_table(table_name=table_name)
# テーブルの作成
if create_table_flag:
print(f'テーブル:{table_name}を作成します')
await db.create_table(
table_name=table_name,
columns=VC_COLUMNS
)
# テーブルに変更があった場合
if changed_table_flag and len(table_fetch) != 0:
if "does not exist" not in table_fetch[0]:
# まとめて作成(バッジ)
await db.batch_insert_row(
table_name=table_name,
row_values=table_fetch
)
# 中身が空の場合、テーブルが作成された場合
if len(table_fetch) == 0 or create_table_flag:
# Discordのチャンネルを取得
all_channel = await get_discord_channel(
guild_id=guild.id,
get_channel_type=VC_TYPE
)
row_values = []
for channel in all_channel:
row = {}
for key,values in VC_NEW_COLUMNS.items():
# 各要素を更新
if key == "vc_id":
value = channel["id"]
elif key == "guild_id":
value = guild.id
elif key == "send_channel_id":
# システムチャンネルがある場合代入
if hasattr(guild.system_channel,'id'):
value = guild.system_channel.id
else:
value = 0
else:
value = values
row.update({key:value})
row_values.append(row)
# 一つ一つ作成
# await db.insert_row(table_name=table_name,row_values=row)
# まとめて作成(バッジ)
await db.batch_insert_row(
table_name=table_name,
row_values=row_values
)
# テーブルの要素を取得
table_fetch:List[Dict] = await db.select_rows(
table_name=f"{table_name}",
columns=[],
where_clause={}
)
dict_row = list()
# テーブルに中身がある場合
if len(table_fetch) > 0:
if table_fetch[0] != f"{table_name} does not exist":
print(f'{table_name}.pickleの書き込みをはじめます')
dict_row = [
dict(zip(record.keys(), record.values()))
for record in table_fetch
]
# 書き込み
# pickleファイルに書き込み
await pickle_write(
filename=table_name,
table_fetch=dict_row
)
print(f'{table_name}.pickleの書き込みが終了しました')
core/pickle_save/guild_permissions_columns.py
権限を示すテーブルのキャッシュを保存する。
from discord import Guild
from base.database import PostgresDB
from base.aio_req import (
pickle_write
)
from core.pickes_save.bin.check_table import check_table_type
GUILD_SET_TABLE = 'guild_set_permissions'
GUILD_SET_COLUMNS = {
'guild_id': 'NUMERIC PRIMARY KEY',
'vc_permission':'NUMERIC',
'vc_user_id_permission':'NUMERIC[]',
'vc_role_id_permission':'NUMERIC[]'
}
GUILD_SET_NEW_COLUMNS = {
'guild_id': 0,
'vc_permission':8,
'vc_user_id_permission':[],
'vc_role_id_permission':[]
}
async def guild_permissions_pickle_save(
db:PostgresDB,
guild:Guild
) -> None:
"""
サーバーの権限を示すテーブルの作成、更新
キャッシュデータの作成
param:
db:PostgresDB
接続するデータベースのインスタンス
guild:Guild
Discordのサーバーインスタンス
"""
# guildのテーブル
table_name = f"{GUILD_SET_TABLE}"
table_fetch = await db.select_rows(
table_name=table_name,
columns=[],
where_clause={
'guild_id': guild.id
}
)
# データベース側のカラムの型を入手
table_columns_type = await db.get_columns_type(table_name=table_name)
# テーブル内のカラム名配列
guild_colums = [key for key in GUILD_SET_COLUMNS.keys()]
table_colums = [key for key in table_columns_type.keys()]
# テーブル内のカラムの型配列
unchanged,table_fetch = await check_table_type(
columns=GUILD_SET_COLUMNS,
table_columns=table_columns_type,
new_columns=GUILD_SET_NEW_COLUMNS,
table_fetch=table_fetch
)
# テーブルに変更があるかのフラグ
changed_table_flag = table_colums != guild_colums or unchanged != False
# テーブルをつくるか、カラムを取得するかのフラグ
create_colum_flag = False
create_table_flag = False
# テーブルを削除するかのフラグ
drop_table_flag = False
if len(table_fetch) > 0:
# テーブルが存在しない場合、作成
if (table_fetch[0] == f"{table_name} does not exist"):
create_table_flag = True
# テーブルが存在する場合、カラムを格納
else:
create_colum_flag = True
# テーブルが存在している
elif len(table_fetch) == 0:
print(f'テーブル:{table_name}の要素は空です')
# 中身が空かつ、要素が変更されていた場合
if changed_table_flag:
drop_table_flag = True
else:
table_colums = [key for key in GUILD_SET_COLUMNS.keys()]
# データベース側のカラムを格納
if create_colum_flag:
print(f'テーブル:{table_name}のカラム名一覧を作成します')
table_colums = [key for key in table_fetch[0].keys()]
# カラムの構成が変更されていた場合、削除し新たに作成する
if changed_table_flag or drop_table_flag:
print(f'テーブル:{table_name}を削除します')
create_table_flag = True
await db.drop_table(table_name=table_name)
# テーブルの作成
if create_table_flag:
print(f'テーブル:{table_name}を作成します')
await db.create_table(
table_name=table_name,
columns=GUILD_SET_COLUMNS
)
# テーブルに変更があった場合
if changed_table_flag and len(table_fetch) != 0:
if "does not exist" not in table_fetch[0]:
# まとめて作成(バッジ)
await db.batch_insert_row(
table_name=table_name,
row_values=table_fetch
)
# ない場合は新規で登録
if len(table_fetch) == 0 or create_table_flag:
guild_new_colum = GUILD_SET_NEW_COLUMNS
guild_new_colum.update({
'guild_id':guild.id
})
await db.insert_row(
table_name=table_name,
row_values=guild_new_colum
)
# テーブルの要素を取得
table_fetch = await db.select_rows(
table_name=table_name,
columns=[],
where_clause={}
)
print(f'{table_name}.pickleの書き込みをはじめます')
# pickleファイルに書き込み
await pickle_write(
filename=table_name,
table_fetch=table_fetch
)
print(f'{table_name}.pickleの書き込みが終了しました')
core/pickle_save/bin/check_table.py
from typing import List,Dict,Tuple
import re
async def check_table_type(
columns:Dict,
table_columns:Dict,
new_columns:Dict,
table_fetch:List[Dict]
) -> Tuple[bool,List[Dict]]:
"""
テーブルのカラムに変更がないか確認する
param:
columns:Dict
ローカル側のカラムの型
table_columns:Dict
データベース側のカラムの型
new_columns:Dict
ローカル側のカラムの初期値
table_fetch:List[Dict]
データベース側の現時点でのデータ
return:
Tuple[bool,List[Dict]]
変更があった場合True
変更があった場合、更新をしたあとの列
"""
set_columns:Dict = {}
# 新しく行が追加された場合
create_items = [
{key:value}
for key,value in new_columns.items()
if key not in table_columns.keys()
]
# 行が削除された場合
delete_items = [
{key:value}
for key,value in table_columns.items()
if key not in new_columns.keys()
]
for column_name,data_type in columns.items():
table_data_type:str = table_columns.get(column_name)
# (数字)が含まれていた場合、取り除く
data_type:str = re.sub(r'\(\d+\)','',data_type)
#print(table_data_type,data_type)
# データベース側になかった場合
# 主キーで、変更があった場合
if (table_data_type == None or
table_data_type not in data_type.lower() and (
'PRIMARY KEY' in data_type or
'primary key' in data_type
)):
# listの場合tupleに変換(setがlist in listを扱えないため)
if isinstance(new_columns[column_name],list):
new_columns[column_name] = tuple(new_columns[column_name])
set_columns.update(
{
column_name:new_columns.get(column_name)
}
)
# 完全一致(大文字小文字区別せず)あった場合
# 主キーで、変更がない場合
elif (table_data_type == data_type.lower() or
(table_data_type in data_type.lower() and (
'PRIMARY KEY' in data_type or
'primary key' in data_type
))):
set_columns.update(
{
column_name:'Unchanged'
}
)
# 要素の変更がある場合True
if (len(list(set_columns.values())) == 1 and
list(set_columns.values())[0] == "Unchanged"):
unchanged = True
else:
unchanged = False
# テーブルが存在する場合、要素を更新
if len(table_fetch) > 0:
if "does not exist" not in table_fetch[0]:
for i,table in enumerate(table_fetch):
table = dict(table)
for table_key,table_value in table.items():
if set_columns.get(table_key) == "Unchanged":
table.update({table_key:table_value})
# 追加する場合、デフォルトの値を代入
for item in create_items:
for key,value in item.items():
table.update({key:value})
# 削除された場合、削除
for item in delete_items:
for key,value in item.items():
table.pop(key)
table_fetch[i] = table
#print(table_fetch)
return unchanged,table_fetch
core/pickle_save/bin/get_channel.py
from base.aio_req import (
aio_get_request
)
from dotenv import load_dotenv
load_dotenv()
import os
from typing import List,Dict,Any,Tuple
DISCORD_BASE_URL = "https://discord.com/api"
DISCORD_BOT_TOKEN = os.environ["DISCORD_BOT_TOKEN"]
async def get_discord_channel(
guild_id:int,
get_channel_type:List[int]
) -> List[Dict[str,Any]]:
"""
Discordサーバ内のチャンネルを取得します。
param:
guild_id :int
Discordのサーバid
get_channel_type :List[int]
取得するチャンネルtype
空の場合、すべて取得する
以下に説明を記す
0:テキストチャンネル
Discordサーバのテキストチャンネル
1:ダイレクトメッセージ
一ユーザへのダイレクトメッセージ
2:ボイスチャンネル
Discordサーバのボイスチャンネル
3:グループダイレクトメッセージ
複数のユーザから構成されるダイレクトメッセージ
4:カテゴリーチャンネル
テキストチャンネルやボイスチャンネルをまとめる親チャンネル
5:ギルドアナウンスチャンネル(旧ニュースチャンネル)
お気に入りにしたサーバの通知や、Discord公式の通知を受け取るチャンネル
10:アナウンスチャンネル
5で作成されるスレッドチャンネル
11:公開スレッドチャンネル
0で作成されるスレッドチャンネル
公開に設定されている場合はこちら
12:非公開スレッドチャンネル
0で作成されるスレッドチャンネル
非公開に設定されている場合はこちら
13:ステージチャンネル
ラジオのような聞き専のチャンネル
14:ギルドディレクトリチャンネル
サーバ紹介をするチャンネル
大規模なコミュニティサーバでのみ使用可能
以下のリンクに使用例あり
https://support.discord.com/hc/ja/articles/4406046651927-DIscord%E5%AD%A6%E7%94%9F%E3%83%8F%E3%83%96FAQ#:~:text=Discord%E5%AD%A6%E7%94%9F%E3%83%8F%E3%83%96%E3%81%AF%E3%80%81%E5%AD%A6%E7%94%9F,%E3%81%99%E3%82%8B%E3%81%93%E3%81%A8%E3%81%8C%E3%81%A7%E3%81%8D%E3%81%BE%E3%81%99%E3%80%82
15:フォーラムチャンネル
特定の話題について議論するチャンネル
"""
# サーバのチャンネル一覧を取得
all_channel = await aio_get_request(
url = DISCORD_BASE_URL + f'/guilds/{guild_id}/channels',
headers = {
'Authorization': f'Bot {DISCORD_BOT_TOKEN}'
}
)
# 空の場合、すべてのチャンネルを格納
if len(get_channel_type) == 0:
all_channel_filter = [
channel
for channel in all_channel
]
else:
# 該当するチャンネルだけ格納
all_channel_filter = [
channel
for channel in all_channel
if channel['type'] in get_channel_type
]
return all_channel_filter
cogs/vc_count.py
ボイスチャンネルの入退出を通知する。
import discord
from discord.ext import commands
from cogs.bin import activity
from core.start import DBot
from dotenv import load_dotenv
load_dotenv()
from base.aio_req import pickle_read
from typing import List,Dict
# ボイスチャンネルの入退室を通知
class vc_count(commands.Cog):
def __init__(self, bot : DBot):
self.bot = bot
@commands.Cog.listener(name='on_voice_state_update')
async def voice_update(
self,
member:discord.Member,
before:discord.VoiceState,
after:discord.VoiceState
):
# 参加するボイスチャンネルのid
if hasattr(after.channel,'id'):
vc_channel_id = after.channel.id
else:
# 退出した場合のボイスチャンネルのid
vc_channel_id = before.channel.id
# 使用するデータベースのテーブル名
TABLE = f'guilds_vc_signal_{member.guild.id}'
# 読み取り
vc_fetch:List[Dict] = await pickle_read(filename=TABLE)
key_vc = [
g
for g in vc_fetch
if int(g.get('vc_id')) == vc_channel_id
]
# 通知が拒否されていた場合、終了
if hasattr(before.channel,'id'):
if int(key_vc[0].get('vc_id')) == before.channel.id:
if bool(key_vc[0].get('send_signal')) == False:
return
# 通知が拒否されていた場合、終了
if hasattr(after.channel,'id'):
if int(key_vc[0].get('vc_id')) == after.channel.id:
if bool(key_vc[0].get('send_signal')) == False:
return
# Botの場合終了
if (bool(key_vc[0].get('join_bot')) == False and
member.bot == True):
return
# Discordのシステムチャンネル(welcomeメッセージが送られる場所)を取得
send_channel_id = int(key_vc[0].get('send_channel_id'))
# ない場合システムチャンネルのidを代入
if send_channel_id == None or send_channel_id == 0:
if hasattr(member.guild.system_channel,'id'):
send_channel_id = member.guild.system_channel.id
else:
return
client = self.bot.get_channel(send_channel_id)
# メンションするロールの取り出し
mentions = [
f"<@&{int(role_id)}> "
for role_id in key_vc[0].get('mention_role_id')
]
# 全体メンションが有効の場合@everyoneを追加
if (bool(key_vc[0].get('everyone_mention')) == True):
mentions.insert(0,"@everyone")
# listをstrに変換
mention_str = " ".join(mentions)
# ボイスチャンネルを移動したかどうか
check = before.channel and after.channel and before.channel != after.channel
# 退出した場合
if (after.channel is None or check):
# ボイスチャンネルの残り人数を取得
await client.send(f"現在{len(before.channel.members)}人 <@{member.id}>が{before.channel.name}から退出しました。")
# Botがボイスチャンネルに接続していて、それ以外の全員が退出した場合
if hasattr(before.channel.guild.voice_client,'is_connected'):
for bot_flag in before.channel.members:
# Bot以外のユーザーがいる場合、終了
if bot_flag.bot == False:
return
# 退出
await before.channel.guild.voice_client.disconnect()
await client.send(f"{mention_str} 通話が終了しました。",embed=discord.Embed(title="通話終了",description=""))
return
# 全員が退出した場合
if len(before.channel.members) == 0:
isum = 0
# サーバー全体のボイスチャンネル接続ユーザーを取得
for channels in member.guild.voice_channels:
isum += len(channels.voice_states.keys())
# 全てのチャンネルで通話しているユーザーがいない場合
if isum == 0:
await client.send(f"{mention_str} 通話が終了しました。",embed=discord.Embed(title="通話終了",description=""))
# 入室の場合
if (before.channel is None or check):
# ボイスチャンネルの残り人数を取得
embed = None
# 一人目の入室(通話開始)の場合、サーバーアイコンの埋め込みを作成
if len(after.channel.members) == 1:
embed = await activity.callemb(after,member)
await client.send(f"現在{len(after.channel.members)}人 {mention_str} <@{member.id}>が{after.channel.name} に参加しました。",embed=embed)
# カメラ配信が始まった場合
if before.self_video is False and after.self_video is True:
await client.send(
f"{mention_str} <@{member.id}> が、{after.channel.name}でカメラ配信を始めました。",
embed = await activity.stream(after,member,title="カメラ配信")
)
elif after.self_video is False and before.self_video is True:
await client.send(f"<@{member.id}> がカメラ配信を終了しました。")
# 画面共有が始まった場合
elif before.self_stream is False and after.self_stream is True:
# プレイ中のゲームがある場合、ゲーム名を含める
mst,embed = await activity.activity(after,member,mention_str)
await client.send(mst,embed=embed)
elif after.self_stream is False and before.self_stream is True:
await client.send(f"<@{member.id}> が画面共有を終了しました。")
def setup(bot:DBot):
return bot.add_cog(vc_count(bot))
cogs/bin/activity.py
画面共有や埋め込みなどを処理する関数。
import discord
from discord import Embed
from typing import Tuple
async def activity(
after:discord.VoiceState,
member:discord.Member,
mention_str:str
) -> Tuple[str,Embed]:
try:
embed = discord.Embed(
title='配信タイトル',
description=f'{member.activities[0].name}'
)
embed.set_author(
name=member.name, # ユーザー名
icon_url=member.display_avatar.url # アイコンを設定
)
# チャンネル名フィールド
embed.add_field(name="チャンネル", value=after.channel.name)
for activitie in member.activities:
print(activitie.__str__())
detail = None
state = None
# ステータス名がない場合は記述なし
if hasattr(member.activities[0],'details'):
detail = member.activities[0].details
if hasattr(member.activities[0],'state'):
state = member.activities[0].state
# ステータスフィールド
embed.add_field(name = detail,value = state)
# ゲーム画像がある場合代入
if hasattr(member.activities[0],'large_image_url'):
if member.activities[0].large_image_url != None:
embed.set_image(url=member.activities[0].large_image_url)
return f"{mention_str} <@{member.id}> が、{after.channel.name}で「{member.activities[0].name}」の配信を始めました。",embed
# 存在しない場合
except IndexError:
return f" {mention_str} <@{member.id}> が、{after.channel.name}で画面共有を始めました。",await stream(after,member,title="画面共有")
# 通話開始時の埋め込み作成
async def callemb(after:discord.VoiceState,member:discord.Member) -> Embed:
embed=discord.Embed(
title="通話開始",
description=f"{member.guild.name}\n<#{after.channel.id}>"
)
# サーバーのアイコンが設定されているかどうか
if hasattr(member.guild.icon,'url'):
embed.set_image(url=member.guild.icon.url)
# ない場合はユーザーのアイコンを設定
elif hasattr(member.display_avatar,'url'):
embed.set_image(url=member.display_avatar.url)
embed.set_author(
name=member.name, # ユーザー名
icon_url=member.display_avatar.url # アイコンを設定
)
return embed
# 配信開始時の埋め込み作成
async def stream(after:discord.VoiceState,member:discord.Member,title:str) -> Embed:
embed=discord.Embed(
title=title,
description=f"{member.guild.name}\n<#{after.channel.id}>"
)
embed.set_author(
name=member.name, # ユーザー名
icon_url=member.display_avatar.url # アイコンを設定
)
return embed
routers/index.py
from fastapi import APIRouter,Request,Header
from starlette.requests import Request
from fastapi.templating import Jinja2Templates
import os
from base.aio_req import (
aio_get_request,
oauth_check
)
from routers.session_base.user_session import DiscordOAuthData,DiscordUser
# new テンプレート関連の設定 (jinja2)
templates = Jinja2Templates(directory="templates")
router = APIRouter()
DISCORD_BASE_URL = "https://discord.com/api"
DISCORD_BOT_TOKEN = os.environ["DISCORD_BOT_TOKEN"]
@router.get("/")
async def index(request: Request):
# Discordの認証情報が有効かどうか判断
if request.session.get('discord_oauth_data'):
oauth_session = DiscordOAuthData(**request.session.get('discord_oauth_data'))
user_session = DiscordUser(**request.session.get('discord_user'))
print(f"アクセスしたユーザー:{user_session.username}")
# トークンの有効期限が切れていた場合、認証情報を削除
if not await oauth_check(access_token=oauth_session.access_token):
request.session.pop('discord_oauth_data')
bot_data:dict = await aio_get_request(
url = f'{DISCORD_BASE_URL}/users/@me',
headers = {
'Authorization': f'Bot {DISCORD_BOT_TOKEN}'
}
)
return templates.TemplateResponse(
'index.html',
{
'request': request,
'bot_data':bot_data,
'title':'トップページ'
}
)
routers/login.py
from fastapi import APIRouter,Request,Header
from fastapi.responses import RedirectResponse,HTMLResponse
from starlette.requests import Request
from fastapi.templating import Jinja2Templates
from dotenv import load_dotenv
load_dotenv()
from base.aio_req import (
aio_get_request,
pickle_read
)
import os
import secrets
from typing import Dict,List
router = APIRouter()
# new テンプレート関連の設定 (jinja2)
templates = Jinja2Templates(directory="templates")
DISCORD_REDIRECT_URL = f"https://discord.com/api/oauth2/authorize?response_type=code&client_id={os.environ.get('DISCORD_CLIENT_ID')}&scope={os.environ.get('DISCORD_SCOPE')}&redirect_uri={os.environ.get('DISCORD_CALLBACK_URL')}&prompt=consent"
DISCORD_BASE_URL = "https://discord.com/api"
DISCORD_BOT_TOKEN = os.environ["DISCORD_BOT_TOKEN"]
@router.get("/discord-login")
async def discord_login(request: Request):
# ランダムなstate値の生成
state = secrets.token_urlsafe(16)
request.session['state'] = state
try:
oauth_data:dict = await aio_get_request(
url = DISCORD_BASE_URL + '/users/@me',
headers = {
'Authorization': f'Bearer {request.session["discord_oauth_data"]["access_token"]}'
}
)
if oauth_data.get('message') == '401: Unauthorized':
return RedirectResponse(url=f"{DISCORD_REDIRECT_URL}&state={state}",status_code=302)
except KeyError:
return RedirectResponse(url=f"{DISCORD_REDIRECT_URL}&state={state}",status_code=302)
return templates.TemplateResponse(
'register.html',
{
'request': request,
}
)
routers/callback.py
Discordログインによる認証後の処理。
from fastapi import APIRouter, HTTPException
from fastapi.responses import RedirectResponse
from starlette.requests import Request
from fastapi.templating import Jinja2Templates
from dotenv import load_dotenv
load_dotenv()
import os
from typing import Dict,List
from base.aio_req import (
aio_get_request,
aio_post_request,
pickle_read
)
DISCORD_BASE_URL = "https://discord.com/api"
DISCORD_BOT_TOKEN = os.environ["DISCORD_BOT_TOKEN"]
router = APIRouter()
# new テンプレート関連の設定 (jinja2)
templates = Jinja2Templates(directory="templates")
@router.get('/discord-callback/')
async def discord_callback(
code:str,
state: str,
request:Request
):
# セッションの初期化
if request.session.get('discord_user') != None:
request.session.pop("discord_user")
if request.session.get('discord_connection') != None:
request.session.pop("discord_connection")
if request.session.get("discord_oauth_data") != None:
request.session.pop("discord_oauth_data")
# stateが一緒しない場合、400で終了
if request.session.get("state") != state:
raise HTTPException(status_code=400, detail="認証失敗")
# stateが一致した場合、削除して続行
else:
request.session.pop("state")
authorization_code = code
request_postdata = {
'client_id': os.environ.get('DISCORD_CLIENT_ID'),
'client_secret': os.environ.get('DISCORD_CLIENT_SECRET'),
'grant_type': 'authorization_code',
'code': authorization_code,
'redirect_uri': os.environ.get('DISCORD_CALLBACK_URL')
}
responce_json = await aio_post_request(
url = DISCORD_BASE_URL + '/oauth2/token',
data = request_postdata,
headers = {
'Content-Type': 'application/x-www-form-urlencoded'
}
)
request.session["discord_oauth_data"] = responce_json
request.session["discord_user"] = await aio_get_request(
url = DISCORD_BASE_URL + '/users/@me',
headers = {
'Authorization': f'Bearer {responce_json["access_token"]}'
}
)
# ホームページにリダイレクトする
return RedirectResponse(url="/guilds")
routers/guilds.py
設定できるサーバー一覧を表示する。
from fastapi import APIRouter
from fastapi.responses import RedirectResponse
from starlette.requests import Request
from fastapi.templating import Jinja2Templates
from dotenv import load_dotenv
load_dotenv()
import os
from base.aio_req import (
aio_get_request,
search_guild,
oauth_check
)
from routers.session_base.user_session import DiscordOAuthData,DiscordUser
DISCORD_BASE_URL = "https://discord.com/api"
DISCORD_REDIRECT_URL = f"https://discord.com/api/oauth2/authorize?response_type=code&client_id={os.environ.get('DISCORD_CLIENT_ID')}&scope={os.environ.get('DISCORD_SCOPE')}&redirect_uri={os.environ.get('DISCORD_CALLBACK_URL')}&prompt=consent"
DISCORD_BOT_TOKEN = os.environ["DISCORD_BOT_TOKEN"]
router = APIRouter()
# new テンプレート関連の設定 (jinja2)
templates = Jinja2Templates(directory="templates")
@router.get('/guilds')
async def guilds(request:Request):
# OAuth2トークンが有効かどうか判断
if request.session.get('discord_oauth_data'):
oauth_session = DiscordOAuthData(**request.session.get('discord_oauth_data'))
user_session = DiscordUser(**request.session.get('discord_user'))
print(f"アクセスしたユーザー:{user_session.username}")
# トークンの有効期限が切れていた場合、再ログインする
if not await oauth_check(access_token=oauth_session.access_token):
return RedirectResponse(url=DISCORD_REDIRECT_URL,status_code=302)
else:
return RedirectResponse(url=DISCORD_REDIRECT_URL,status_code=302)
# Botが所属しているサーバを取得
bot_in_guild_get = await aio_get_request(
url = DISCORD_BASE_URL + '/users/@me/guilds',
headers = {
'Authorization': f'Bot {DISCORD_BOT_TOKEN}'
}
)
# ログインユーザが所属しているサーバを取得
user_in_guild_get = await aio_get_request(
url = DISCORD_BASE_URL + '/users/@me/guilds',
headers = {
'Authorization': f'Bearer {oauth_session.access_token}'
}
)
# ログインユーザとBotが同じ所属を見つける
match_guild = await search_guild(
bot_in_guild_get = bot_in_guild_get,
user_in_guild_get = user_in_guild_get
)
return templates.TemplateResponse(
"guilds.html",
{
"request": request,
"match_guild":match_guild,
"title":f"{user_session.username}のサーバ一覧"
}
)
routers/logout.py
from fastapi import APIRouter,Request
from fastapi.responses import RedirectResponse
from starlette.requests import Request
from fastapi.templating import Jinja2Templates
from dotenv import load_dotenv
load_dotenv()
import os
router = APIRouter()
# new テンプレート関連の設定 (jinja2)
templates = Jinja2Templates(directory="templates")
DISCORD_REDIRECT_URL = f"https://discord.com/api/oauth2/authorize?response_type=code&client_id={os.environ.get('DISCORD_CLIENT_ID')}&scope={os.environ.get('DISCORD_SCOPE')}&redirect_uri={os.environ.get('DISCORD_CALLBACK_URL')}&prompt=consent"
DISCORD_BASE_URL = "https://discord.com/api"
@router.get("/discord-logout")
async def discord_logout(request: Request):
# セッションの初期化
if request.session.get('discord_user') != None:
request.session.pop("discord_user")
if request.session.get('discord_connection') != None:
request.session.pop("discord_connection")
if request.session.get("discord_oauth_data") != None:
request.session.pop("discord_oauth_data")
# ホームページにリダイレクトする
return RedirectResponse(url="/")
routers/guild/guild.py
from fastapi import APIRouter
from fastapi.responses import RedirectResponse
from starlette.requests import Request
from fastapi.templating import Jinja2Templates
from dotenv import load_dotenv
load_dotenv()
import os
from base.aio_req import (
aio_get_request,
oauth_check,
return_permission,
pickle_read
)
from routers.session_base.user_session import DiscordOAuthData,DiscordUser
DISCORD_BASE_URL = "https://discord.com/api"
DISCORD_REDIRECT_URL = f"https://discord.com/api/oauth2/authorize?response_type=code&client_id={os.environ.get('DISCORD_CLIENT_ID')}&scope={os.environ.get('DISCORD_SCOPE')}&redirect_uri={os.environ.get('DISCORD_CALLBACK_URL')}&prompt=consent"
DISCORD_BOT_TOKEN = os.environ["DISCORD_BOT_TOKEN"]
router = APIRouter()
# new テンプレート関連の設定 (jinja2)
templates = Jinja2Templates(directory="templates")
@router.get('/guild/{guild_id}')
async def guild(
request:Request,
guild_id:int
):
# OAuth2トークンが有効かどうか判断
if request.session.get('discord_oauth_data'):
oauth_session = DiscordOAuthData(**request.session.get('discord_oauth_data'))
user_session = DiscordUser(**request.session.get('discord_user'))
# トークンの有効期限が切れていた場合、再ログインする
if not await oauth_check(access_token=oauth_session.access_token):
return RedirectResponse(url=DISCORD_REDIRECT_URL,status_code=302)
else:
return RedirectResponse(url=DISCORD_REDIRECT_URL,status_code=302)
# サーバの情報を取得
guild = await aio_get_request(
url = DISCORD_BASE_URL + f'/guilds/{guild_id}',
headers = {
'Authorization': f'Bot {DISCORD_BOT_TOKEN}'
}
)
# サーバの権限を取得
permission = await return_permission(
guild_id=guild_id,
user_id=user_session.id,
access_token=oauth_session.access_token
)
try:
tasks = await pickle_read(
filename=f"task_{guild_id}"
)
except FileNotFoundError:
tasks = list()
return templates.TemplateResponse(
"guild/guild.html",
{
"request": request,
"guild": guild,
"guild_id": guild_id,
"tasks":tasks,
"permission":vars(permission),
"title":guild['name'] + "の設定項目一覧"
}
)
routers/guild/vc_signal/vc_signal.py
ボイスチャンネルの入退出設定の画面。
from fastapi import APIRouter
from fastapi.responses import RedirectResponse
from starlette.requests import Request
from fastapi.templating import Jinja2Templates
from dotenv import load_dotenv
load_dotenv()
import os
from typing import List
from itertools import groupby,chain
from typing import List,Dict,Any,Tuple
from base.database import PostgresDB
from base.aio_req import (
aio_get_request,
pickle_read,
return_permission,
oauth_check
)
from routers.session_base.user_session import DiscordOAuthData,DiscordUser
DISCORD_BASE_URL = "https://discord.com/api"
DISCORD_REDIRECT_URL = f"https://discord.com/api/oauth2/authorize?response_type=code&client_id={os.environ.get('DISCORD_CLIENT_ID')}&scope={os.environ.get('DISCORD_SCOPE')}&redirect_uri={os.environ.get('DISCORD_CALLBACK_URL')}&prompt=consent"
DISCORD_BOT_TOKEN = os.environ["DISCORD_BOT_TOKEN"]
USER = os.getenv('PGUSER')
PASSWORD = os.getenv('PGPASSWORD')
DATABASE = os.getenv('PGDATABASE')
HOST = os.getenv('PGHOST')
db = PostgresDB(
user=USER,
password=PASSWORD,
database=DATABASE,
host=HOST
)
router = APIRouter()
# new テンプレート関連の設定 (jinja2)
templates = Jinja2Templates(directory="templates")
@router.get('/guild/{guild_id}/vc-signal')
async def line_post(
request:Request,
guild_id:int
):
# OAuth2トークンが有効かどうか判断
if request.session.get('discord_oauth_data'):
oauth_session = DiscordOAuthData(**request.session.get('discord_oauth_data'))
user_session = DiscordUser(**request.session.get('discord_user'))
# トークンの有効期限が切れていた場合、再ログインする
if not await oauth_check(access_token=oauth_session.access_token):
return RedirectResponse(url=DISCORD_REDIRECT_URL,status_code=302)
else:
return RedirectResponse(url=DISCORD_REDIRECT_URL,status_code=302)
# 使用するデータベースのテーブル名
TABLE = f'guilds_vc_signal_{guild_id}'
# サーバのチャンネル一覧を取得
all_channel = await aio_get_request(
url = DISCORD_BASE_URL + f'/guilds/{guild_id}/channels',
headers = {
'Authorization': f'Bot {DISCORD_BOT_TOKEN}'
}
)
# チャンネルのソート
all_channel_sort,all_channels,vc_channels = await sort_discord_channel(all_channel=all_channel)
vc_cate_sort = [
tmp
for tmp in all_channel_sort
if tmp['type'] == 2 or tmp['type'] == 4
]
# text_channel = list(chain.from_iterable(all_channels))
text_channel_sort = [
tmp
for tmp in all_channel_sort
if tmp['type'] == 0
]
# サーバの情報を取得
guild = await aio_get_request(
url = DISCORD_BASE_URL + f'/guilds/{guild_id}',
headers = {
'Authorization': f'Bot {DISCORD_BOT_TOKEN}'
}
)
# ログインユーザの情報を取得
guild_user = await aio_get_request(
url = DISCORD_BASE_URL + f'/guilds/{guild_id}/members/{user_session.id}',
headers = {
'Authorization': f'Bot {DISCORD_BOT_TOKEN}'
}
)
role_list = [g for g in guild_user["roles"]]
# サーバの権限を取得
guild_user_permission = await return_permission(
guild_id=guild_id,
user_id=user_session.id,
access_token=oauth_session.access_token
)
# パーミッションの番号を取得
permission_code = await guild_user_permission.get_permission_code()
# キャッシュ読み取り
guild_table_fetch:List[Dict[str,Any]] = await pickle_read(filename='guild_set_permissions')
guild_table = [
g
for g in guild_table_fetch
if int(g.get('guild_id')) == guild_id
]
guild_permission_code = 8
guild_permission_user = list()
guild_permission_role = list()
if len(guild_table) > 0:
guild_permission_code = int(guild_table[0].get('vc_permission'))
guild_permission_user = [
user
for user in guild_table[0].get('vc_user_id_permission')
]
guild_permission_role = [
role
for role in guild_table[0].get('vc_role_id_permission')
]
and_code = guild_permission_code & permission_code
admin_code = 8 & permission_code
user_permission:str = 'normal'
# 許可されている場合、管理者の場合
if (and_code == permission_code or
admin_code == 8 or
user_session.id in guild_permission_user or
len(set(guild_permission_role) & set(role_list)) > 0
):
user_permission = 'admin'
# キャッシュ読み取り
table_fetch:List[Dict[str,Any]] = await pickle_read(filename=TABLE)
# データベースへ接続
await db.connect()
vc_set = []
# ボイスチャンネルのみを代入
app_vc = [int(x['id']) for x in vc_cate_sort if x['type'] == 2]
# テータベース側のボイスチャンネルを代入
db_vc = [int(x['vc_id']) for x in table_fetch]
if set(app_vc) != set(db_vc):
# データベース側で欠けているチャンネルを取得
missing_items = [
item
for item in table_fetch
if item not in vc_cate_sort
]
# 新しくボイスチャンネルが作成されていた場合
if len(missing_items) > 0:
for vc in missing_items:
if vc['type'] == 2:
row_values = {
'vc_id': vc['id'],
'guild_id': guild_id,
'send_signal': True,
'send_channel_id': guild.get('system_channel_id'),
'join_bot': False,
'everyone_mention': True,
'mention_role_id':[]
}
# サーバー用に新たにカラムを作成
await db.insert_row(
table_name=TABLE,
row_values=row_values
)
vc_set.append(row_values)
# ボイスチャンネルがいくつか削除されていた場合
else:
# 削除されたチャンネルを取得
missing_items = [
item
for item in all_channels
if item not in table_fetch
]
# 削除されたチャンネルをテーブルから削除
for vc in missing_items:
await db.delete_row(
table_name=TABLE,
where_clause={
'vc_id':vc['vc_id']
}
)
# 削除後のチャンネルを除き、残りのチャンネルを取得
vc_set = [
d for d in table_fetch
if not (d.get('vc_id') in [
e.get('vc_id') for e in missing_items
] )
]
else:
vc_set = table_fetch
# データベースの状況を取得
db_check_fetch = await db.select_rows(
table_name=TABLE,
columns=[],
where_clause={}
)
# データベースに登録されたが、削除されずに残っているチャンネルを削除
check = [int(c['vc_id']) for c in db_check_fetch]
del_check = set(check) - set(app_vc)
for chan_id in list(del_check):
await db.delete_row(
table_name=TABLE,
where_clause={
'channel_id':chan_id
}
)
await db.disconnect()
return templates.TemplateResponse(
"guild/vc_signal/vc_signal.html",
{
"request": request,
"vc_cate_channel": vc_cate_sort,
"text_channel": text_channel_sort,
"guild": guild,
"guild_id": guild_id,
'vc_set' : vc_set,
"user_permission":user_permission,
"title": "ボイスチャンネルの送信設定/" + guild['name']
}
)
async def sort_discord_channel(
all_channel:List
) -> Tuple[List,List,List]:
# 親カテゴリー格納用
position = []
# ソート後のチャンネル一覧
all_channel_sort = []
# レスポンスのJSONからpositionでソートされたリストを作成
sorted_channels = sorted(all_channel, key=lambda c: c['position'])
# parent_idごとにチャンネルをまとめた辞書を作成
channel_dict = {}
for parent_id, group in groupby(
sorted_channels,
key=lambda c: c['parent_id']
):
if parent_id is None:
# 親カテゴリーのないチャンネルは、キーがNoneの辞書に追加される
parent_id = 'None'
# キーがまだない場合、作成(同時に値も代入)
if channel_dict.get(str(parent_id)) == None:
channel_dict[str(parent_id)] = list(group)
# キーがある場合、リストを取り出して結合し代入
else:
listtmp:List = channel_dict[str(parent_id)]
listtmp.extend(list(group))
# リスト内包記法でボイスチャンネルとカテゴリー以外は除外
listtmp = [
tmp
for tmp in listtmp
#if tmp['type'] == 2 or tmp['type'] == 4
]
channel_dict[str(parent_id)] = listtmp
# リストを空にする
listtmp = list()
# 親カテゴリーがある場合、Noneから取り出す
for chan in channel_dict['None'][:]:
if chan['type'] == 4:
position.append(chan)
channel_dict['None'].remove(chan)
# 辞書を表示
position_index = 0
# 親カテゴリーの名前をリスト化
extracted_list = [d["name"] for d in position]
# カテゴリーに属しないチャンネルが存在する場合
if len(channel_dict['None']) != 0:
# 配列の長さをカテゴリー数+1にする
all_channels = [{}] * (len(extracted_list) + 1)
vc_channels = [{}] * (len(extracted_list) + 1)
else:
all_channels = [{}] * len(extracted_list)
vc_channels = [{}] * len(extracted_list)
for parent_id, channel in channel_dict.items():
# カテゴリー内にチャンネルがある場合
if len(channel) != 0:
for d in position:
# カテゴリーを探索、あった場合positionを代入
if d['id'] == channel[0]['parent_id']:
position_index = d['position']
break
else:
position_index = len(extracted_list)
if len(channel) != 0:
# 指定したリストの中身が空でない場合、空のリストを探す
while len(all_channels[position_index]) != 0:
if len(extracted_list) == position_index:
position_index -= 1
else:
position_index += 1
# 指定した位置にカテゴリー内のチャンネルを代入
all_channels[position_index] = channel
vc_channels[position_index] = channel
# 先頭がカテゴリーでない場合
if channel[0]['parent_id'] != None:
# 先頭にカテゴリーチャンネルを代入
all_channels[position_index].insert(0,d)
# list(list),[[],[]]を一つのリストにする
all_channel_sort = list(chain.from_iterable(all_channels))
return all_channel_sort,all_channels,vc_channels
routers/guild/admin/admin.py
管理者画面。
from fastapi import APIRouter
from fastapi.responses import RedirectResponse,HTMLResponse
from starlette.requests import Request
from fastapi.templating import Jinja2Templates
from dotenv import load_dotenv
load_dotenv()
import os
from base.aio_req import (
aio_get_request,
pickle_read,
return_permission,
oauth_check
)
from typing import List,Dict,Any,Tuple
from routers.session_base.user_session import DiscordOAuthData,DiscordUser
DISCORD_BASE_URL = "https://discord.com/api"
DISCORD_REDIRECT_URL = f"https://discord.com/api/oauth2/authorize?response_type=code&client_id={os.environ.get('DISCORD_CLIENT_ID')}&scope={os.environ.get('DISCORD_SCOPE')}&redirect_uri={os.environ.get('DISCORD_CALLBACK_URL')}&prompt=consent"
DISCORD_BOT_TOKEN = os.environ["DISCORD_BOT_TOKEN"]
router = APIRouter()
# new テンプレート関連の設定 (jinja2)
templates = Jinja2Templates(directory="templates")
@router.get('/guild/{guild_id}/admin')
async def admin(
request:Request,
guild_id:int
):
# OAuth2トークンが有効かどうか判断
if request.session.get('discord_oauth_data'):
oauth_session = DiscordOAuthData(**request.session.get('discord_oauth_data'))
user_session = DiscordUser(**request.session.get('discord_user'))
# トークンの有効期限が切れていた場合、再ログインする
if not await oauth_check(access_token=oauth_session.access_token):
return RedirectResponse(url=DISCORD_REDIRECT_URL,status_code=302)
else:
return RedirectResponse(url=DISCORD_REDIRECT_URL,status_code=302)
TABLE_NAME = 'guild_set_permissions'
# 取得上限を定める
limit = os.environ.get('USER_LIMIT',default=100)
# サーバの情報を取得
guild = await aio_get_request(
url = DISCORD_BASE_URL + f'/guilds/{guild_id}',
headers = {
'Authorization': f'Bot {DISCORD_BOT_TOKEN}'
}
)
# サーバのメンバー一覧を取得
guild_members = await aio_get_request(
url = DISCORD_BASE_URL + f'/guilds/{guild_id}/members?limit={limit}',
headers = {
'Authorization': f'Bot {DISCORD_BOT_TOKEN}'
}
)
# サーバの権限を取得
guild_user_permission = await return_permission(
guild_id=guild_id,
user_id=user_session.id,
access_token=oauth_session.access_token
)
# キャッシュ読み取り
guild_table_fetch:List[Dict[str,Any]] = await pickle_read(filename=TABLE_NAME)
guild_table = [
g
for g in guild_table_fetch
if int(g.get('guild_id')) == guild_id
]
user_permission:str = 'normal'
# 管理者の場合
if (guild_user_permission.administrator):
user_permission = 'admin'
# 管理者ではない場合、該当するサーバーidがない場合、終了
if user_permission != 'admin' or len(guild_table) == 0:
return HTMLResponse("404")
return templates.TemplateResponse(
"guild/admin/admin.html",
{
"request": request,
"guild": guild,
"guild_members":guild_members,
"guild_id": guild_id,
"guild_table":guild_table[0],
"title":request.session["discord_user"]['username']
}
)
routers/api/vc_signal_success.py
from fastapi import APIRouter
from fastapi.responses import RedirectResponse,JSONResponse
from starlette.requests import Request
from fastapi.templating import Jinja2Templates
from dotenv import load_dotenv
load_dotenv()
import os
from base.database import PostgresDB
from base.aio_req import pickle_write
from routers.api.check.post_user_check import user_checker
from routers.session_base.user_session import DiscordOAuthData,DiscordUser
from core.pickes_save.vc_columns import VC_COLUMNS
DISCORD_REDIRECT_URL = f"https://discord.com/api/oauth2/authorize?response_type=code&client_id={os.environ.get('DISCORD_CLIENT_ID')}&scope={os.environ.get('DISCORD_SCOPE')}&redirect_uri={os.environ.get('DISCORD_CALLBACK_URL')}&prompt=consent"
DISCORD_BASE_URL = "https://discord.com/api"
DISCORD_BOT_TOKEN = os.environ["DISCORD_BOT_TOKEN"]
USER = os.getenv('PGUSER')
PASSWORD = os.getenv('PGPASSWORD')
DATABASE = os.getenv('PGDATABASE')
HOST = os.getenv('PGHOST')
db = PostgresDB(
user=USER,
password=PASSWORD,
database=DATABASE,
host=HOST
)
router = APIRouter()
# new テンプレート関連の設定 (jinja2)
templates = Jinja2Templates(directory="templates")
@router.post('/api/vc-signal-success')
async def vc_post(
request:Request
):
form = await request.form()
# OAuth2トークンが有効かどうか判断
check_code = await user_checker(
request=request,
oauth_session=DiscordOAuthData(**request.session.get('discord_oauth_data')),
user_session=DiscordUser(**request.session.get('discord_user'))
)
if check_code == 302:
return RedirectResponse(url=DISCORD_REDIRECT_URL,status_code=302)
elif check_code == 400:
return JSONResponse(content={"message": "Fuck You. You are an idiot."})
# 使用するデータベースのテーブル名
TABLE = f'guilds_vc_signal_{form.get("guild_id")}'
# "send_channel_id_"で始まるキーのみを抽出し、数字部分を取得する
numbers = [
int(key.replace("send_channel_id_", ""))
for key in form.keys()
if key.startswith("send_channel_id_")
]
row_list = []
# チャンネルごとに更新をかける
for vc_id in numbers:
row_values = {}
send_signal = False
join_bot = False
everyone_mention = False
role_key = [
int(form.get(key))
for key in form.keys()
if key.startswith(f"role_{vc_id}_")
]
if form.get(f"send_signal_{vc_id}") != None:
send_signal = True
if form.get(f"join_bot_{vc_id}") != None:
join_bot = True
if form.get(f"everyone_mention_{vc_id}") != None:
everyone_mention = True
row_values = {
'send_signal':send_signal,
'send_channel_id':form.get(f"send_channel_id_{vc_id}"),
'join_bot':join_bot,
'everyone_mention':everyone_mention,
'mention_role_id':role_key
}
where_clause = {
'vc_id': vc_id
}
row_list.append({
'where_clause':where_clause,
'row_values':row_values
})
#print(row_list)
await db.connect()
await db.primary_batch_update_rows(
table_name=TABLE,
set_values_and_where_columns=row_list,
table_colum=VC_COLUMNS
)
# 更新後のテーブルを取得
table_fetch = await db.select_rows(
table_name=TABLE,
columns=[],
where_clause={}
)
await db.disconnect()
#print(table_fetch)
# pickleファイルに書き込み
await pickle_write(
filename=TABLE,
table_fetch=table_fetch
)
return templates.TemplateResponse(
'api/vcsignalsuccess.html',
{
'request': request,
'guild_id': form['guild_id'],
'title':'成功'
}
)
routers/api/admin_success.py
from fastapi import APIRouter
from fastapi.responses import RedirectResponse,JSONResponse
from starlette.requests import Request
from fastapi.templating import Jinja2Templates
from dotenv import load_dotenv
load_dotenv()
import os
from base.database import PostgresDB
from base.aio_req import pickle_write
from routers.api.check.post_user_check import user_checker
from routers.session_base.user_session import DiscordOAuthData,DiscordUser
DISCORD_REDIRECT_URL = f"https://discord.com/api/oauth2/authorize?response_type=code&client_id={os.environ.get('DISCORD_CLIENT_ID')}&scope={os.environ.get('DISCORD_SCOPE')}&redirect_uri={os.environ.get('DISCORD_CALLBACK_URL')}&prompt=consent"
DISCORD_BASE_URL = "https://discord.com/api"
DISCORD_BOT_TOKEN = os.environ["DISCORD_BOT_TOKEN"]
USER = os.getenv('PGUSER')
PASSWORD = os.getenv('PGPASSWORD')
DATABASE = os.getenv('PGDATABASE')
HOST = os.getenv('PGHOST')
db = PostgresDB(
user=USER,
password=PASSWORD,
database=DATABASE,
host=HOST
)
router = APIRouter()
# new テンプレート関連の設定 (jinja2)
templates = Jinja2Templates(directory="templates")
@router.post('/api/admin-success')
async def admin_post(
request:Request
):
form = await request.form()
# OAuth2トークンが有効かどうか判断
check_code = await user_checker(
request=request,
oauth_session=DiscordOAuthData(**request.session.get('discord_oauth_data')),
user_session=DiscordUser(**request.session.get('discord_user'))
)
if check_code == 302:
return RedirectResponse(url=DISCORD_REDIRECT_URL,status_code=302)
elif check_code == 400:
return JSONResponse(content={"message": "Fuck You. You are an idiot."})
TABLE = 'guild_set_permissions'
# 各権限コード
vc_permission_code = form.get("vc_permission_code")
# ユーザidの取り出し
vc_user_id_permission = [
int(form.get(key))
for key in form.keys()
if key.startswith('member_select_vc_')
]
# ロールidの取り出し
vc_role_id_permission = [
int(form.get(key))
for key in form.keys()
if key.startswith('role_select_vc_')
]
row_value = {
'vc_permission' :vc_permission_code,
'vc_user_id_permission' :vc_user_id_permission,
'vc_role_id_permission' :vc_role_id_permission
}
await db.connect()
await db.update_row(
table_name=TABLE,
row_values=row_value,
where_clause={
'guild_id':form.get('guild_id')
}
)
# 更新後のテーブルを取得
table_fetch = await db.select_rows(
table_name=TABLE,
columns=[],
where_clause={}
)
await db.disconnect()
# pickleファイルに書き込み
await pickle_write(
filename=TABLE,
table_fetch=table_fetch
)
return templates.TemplateResponse(
'api/adminsuccess.html',
{
'request': request,
'guild_id': form['guild_id'],
'title':'成功'
}
)
routers/api/check/post_user_check.py
from base.aio_req import (
aio_get_request,
return_permission,
oauth_check
)
from starlette.requests import Request
from dotenv import load_dotenv
load_dotenv()
import os
from typing import Dict,List
from routers.session_base.user_session import DiscordOAuthData,DiscordUser
DISCORD_BOT_TOKEN = os.environ["DISCORD_BOT_TOKEN"]
DISCORD_BASE_URL = "https://discord.com/api"
async def user_checker(
request:Request,
oauth_session:DiscordOAuthData,
user_session:DiscordUser
) -> int:
"""
postリクエストが正しいものか判別する。
param:
"""
form = await request.form()
# OAuth2トークンが有効かどうか判断
try:
await return_permission(
guild_id=form["guild_id"],
user_id=user_session.id,
access_token=oauth_session.access_token
)
# トークンの有効期限が切れている場合、ログイン画面に遷移
if (not await oauth_check(
access_token=oauth_session.access_token
)):
return 302
# サーバのメンバー一覧を取得
guild_member = await aio_get_request(
url = DISCORD_BASE_URL + f'/guilds/{form.get("guild_id")}/members/{user_session.id}',
headers = {
'Authorization': f'Bot {DISCORD_BOT_TOKEN}'
}
)
# ログインユーザがサーバーに所属していない場合(あり得ないリクエスト)
if guild_member.get('message') != None:
return 400
except KeyError:
return 400
return 200
routers/sesion_base/user_session.py
from pydantic import BaseModel,validator
from typing import List,Optional,Union,Any
class DiscordOAuthData(BaseModel):
"""
DiscordのOAuth2認証のデータ
param:
access_token :str
ユーザのアクセストークン
expires_in :int
アクセストークンの有効期限(秒)
refresh_token :str
リフレッシュトークン
scope :str
許可されている権限
token_type :str
トークンのタイプ
Bearer
"""
access_token :str
expires_in :int
refresh_token :str
scope :str
token_type :str
class DiscordUser(BaseModel):
"""
ユーザ情報
param:
id :int
ユーザid
username :str
ユーザ名
global_name :Union[str,None]
display_name :Union[str,None]
avatar :str
アバターid
avatar_decoration :Union[str,None]
アバターの説明
discriminator :str
ユーザ名の識別子
public_flags :int
flags :int
banner :Union[str,None]
プロフィールのバナー
banner_color :Union[str,None]
プロフィールのバナーのカラーコード
accent_color :int
locale :str
mfa_enabled :bool
premium_type :int
Nitroユーザのランク
"""
id :int
username :str
global_name :Union[str,None]
display_name :Union[str,None]
avatar :Optional[str]
avatar_decoration :Union[str,None]
discriminator :str
public_flags :int
flags :int
banner :Union[str,None]
banner_color :Union[str,None]
accent_color :Optional[int]
locale :str
mfa_enabled :bool
premium_type :int
class DiscordConnection(BaseModel):
"""
Discordのアカウント連携情報
param:
type :str
連携しているサービス名(Twitter,Github)
id :str
サービスでのユーザid
name :str
サービスでのユーザ名
visibility :int
friend_sync :bool
show_activity :bool
verified :bool
two_way_link :bool
metadata_visibility :int
"""
type :str
id :str
name :str
visibility :int
friend_sync :bool
show_activity :bool
verified :bool
two_way_link :bool
metadata_visibility :int
class MatchGuild(BaseModel):
"""
Botとユーザが所属のサーバーのクラス
id :int
name :str
icon :str
owner :bool
permissions :int
features :List
permissions_new :int
"""
id :int
name :str
icon :str
owner :bool
permissions :int
features :List[str]
permissions_new :int
templates/index.html
<!DOCTYPE html>
<html>
{% extends "layout.html" %}
{% block content %}
<body bgcolor="#252525" text="#ffffff">
<h1>ようこそ!!!!!!!</h1>
ここは{{bot_data.username}}の設定とかできるサイトです!!!!!
<br/>
{% if request.session.discord_oauth_data %}
<a href="/guilds" class="btn btn-primary">サーバ一覧へ</a>
{% endif %}
<br/>
</body>
{% endblock %}
</html>
templates/layout.html
<html lang="ja">
<head>
<link rel="stylesheet" href="https://stackpath.bootstrapcdn.com/bootstrap/4.3.1/css/bootstrap.min.css" integrity="sha384-ggOyR0iXCbMQv3Xipma34MD+dH/1fQ784/j6cY/iJTQUOhcWr7x9JvoRxT2MZw1T" crossorigin="anonymous">
<link rel="icon" href="/static/img/discord-icon.jpg">
<title>{{title}}</title>
<meta charset="UTF-8">
<!--レスポンシブ対応-->
<meta name="viewport" content="width=device-width, initial-scale=1, minimum-scale=1, user-scalable=yes">
</head>
<body style="background-color: #36393F; color: #FFF;">
<!-- display: flex: 要素を並列に
flex-direction: column:はみ出たコンテナを横に -->
<div style="display: flex; flex-direction: column; min-height: 100vim;">
<!--
container は各レスポンシブブレークポイントで max-width
container-fluid の場合、すべてのブレークポイントで width: 100%
-->
<div class="container">
<div class="container-fluid">
<div class="row">
<!--
mdはPC
smはタブレット
xsはスマホ
-->
<div class="col-xs-2 col-sm-10 col-md-10" style="width: 50%;">
<a href="/" class="btn btn-primary">top</a>
</div>
<div class="col-xs-2 col-sm-2 col-md-2" style="width: 50%;">
<button type="button" id="popover-btn" class="btn btn-primary">
{% if request.session.discord_oauth_data or request.session.line_oauth_data %}
Logout
{% else %}
Login
{% endif %}
</button>
<div id="popover-content" style="display: none;position: absolute;background-color: #fff;border: 1px solid #ccc;padding: 10px; margin: 0 0 0 auto;">
{% if request.session.discord_oauth_data %}
<a href="/discord-logout" class="btn btn-primary">Discord Logout</a>
{% else %}
<a href="/discord-login" class="btn btn-primary">Discord Login</a>
{% endif %}
</div>
</div>
</div>
</div>
{% block content %}
<!-- メインコンテンツ -->
{% endblock %}
</div>
</div>
<br>
<hr>
<footer class="bg-secondary" >
<div align="center">
<p>Copyright © maguro Inc. All right reserves.</p>
<p>Powered by FastAPI 0.88.0 & starlette 0.22.0</p>
</div>
</footer>
<script src="/static/js/popover.js" type="text/javascript"></script>
</body>
</html>
templates/guilds.html
<!DOCTYPE html>
<html>
{% extends "layout.html" %}
{% block content %}
<body bgcolor="#252525" text="#ffffff">
<h1>編集、閲覧可能なサーバ一覧</h1>
<ul>
{% for guild in match_guild %}
{% if not guild.icon %}
<a href="/guild/{{guild.id}}">
<img src="/static/img/discord-icon.jpg" />
</a>
{% else %}
<a href="/guild/{{guild.id}}">
<img src="https://cdn.discordapp.com/icons/{{guild.id}}/{{guild.icon}}.png" />
</a>
{% endif %}
<a href="/guild/{{guild.id}}">
<li>{{ guild.name }}</li>
</a>
<br><br>
{% endfor %}
</ul>
</body>
{% endblock %}
</html>
templates/guild/guild.html
<!DOCTYPE html>
<html>
{% extends "layout.html" %}
{% block content %}
<body bgcolor="#252525" text="#ffffff">
<br/>
{% if not guild.icon %}
<a href="/guild/{{guild.id}}">
<img src="/static/img/discord-icon.jpg" />
</a>
{% else %}
<a href="/guild/{{guild.id}}">
<img src="https://cdn.discordapp.com/icons/{{guild.id}}/{{guild.icon}}.png" />
</a>
{% endif %}
<a href="/guild/{{guild.id}}">
<li>{{ guild.name }}</li>
</a>
<br/>
{% if permission.administrator %}
管理者です。<br/>
<a href="/guild/{{guild_id}}/admin" class="btn btn-primary">管理者設定</a>
{% endif %}
<h4>設定する項目一覧</h4>
<a href="/guild/{{guild_id}}/vc-signal" class="btn btn-primary">ボイスチャンネルの通知設定</a>
<br/>
<br/>
サーバーウィジェットが有効の場合、サーバーの状況が表示されます。<br/>
<iframe title="discord_5second" style="height: 350px;" src="https://discord.com/widget?id={{guild_id}}&theme=dark/" sandbox="allow-popups allow-popups-to-escape-sandbox allow-same-origin allow-scripts"></iframe>
</body>
{% endblock %}
</html>
templates/guild/vc_signal/vc_signal.html
<!DOCTYPE html>
<html>
<script src="/static/js/box_allcheck.js" type="text/javascript"></script>
{% extends "layout.html" %}
{% block content %}
<body bgcolor="#252525" text="#ffffff">
{% if not guild.icon %}
<a href="/guild/{{guild.id}}">
<img src="/static/img/discord-icon.jpg" />
</a>
{% else %}
<a href="/guild/{{guild.id}}">
<img src="https://cdn.discordapp.com/icons/{{guild.id}}/{{guild.icon}}.png" />
</a>
{% endif %}
<a href="/guild/{{guild.id}}">
<li>{{ guild.name }}</li>
</a>
<br><br>
{% set per = ['admin'] %}
<h3>{{request.session.discord_user.username}}さんは
{% if user_permission in per %}
編集できます。
{% else %}
編集できません。閲覧のみになります。
{% endif %}
</h3>
<br/>
{% if user_permission not in per %}
<fieldset disabled>
{% endif %}
<form action="/api/vc-signal-success" name="discordForm" method="post">
<details>
<summary>
<strong>チャンネル一覧</strong>
</summary>
{% set i = [1] %}
{% set chack_flag = 0 %}
{% for vc_cate in vc_cate_channel %}
{% if vc_cate.type == 4 %}
{{ vc_cate.name }}
<br/>
{% else %}
<!--ng_channelの場合初めからチェックを付ける-->
<details>
<summary>
<strong>
{% if vc_cate.type == 2 %}
ボイスチャンネル :
{% endif %}
{{vc_cate.name}}
</strong>
</summary>
通知の送信先チャンネル<br/>
<select name="send_channel_id_{{vc_cate.id}}" size="1">
{% for chan in text_channel %}
<!--テキストチャンネルの場合-->
{% if chan.type == 0 %}
<option value={{chan.id}}
{% for vc in vc_set %}
{% if chan.id|int == vc.send_channel_id|int %}
selected
{% endif %}
{% endfor %}
>
{% if chan.parent_id %}
{% for cate in vc_cate_channel %}
{% if cate.type == 4 and cate.id == chan.parent_id %}
{{ cate.name }}:
{% endif %}
{% endfor %}
{% endif %}
{{chan.name}}
</option>
{% endif %}
{% endfor %}
</select>
<br/>
<!-- ロールの数 -->
{% set role_len = 0 %}
{% for vc in vc_set %}
{% if vc_cate.id|int == vc.vc_id|int %}
通知を送信する
{{vc.send_signal}}
{% if vc.send_signal %}
<input type="checkbox" checked="checked" name="send_signal_{{vc_cate.id}}" value="True"/>
{% else %}
<input type="checkbox" name="send_signal_{{vc_cate.id}}" value="True"/>
{% endif %}
Botの通知をする
{% if vc.join_bot %}
<input type="checkbox" checked="checked" name="join_bot_{{vc_cate.id}}" value="True"/>
{% else %}
<input type="checkbox" name="join_bot_{{vc_cate.id}}" value="True"/>
{% endif %}
<br/>
everyoneで通知する
{% if vc.everyone_mention %}
<input type="checkbox" checked="checked" name="everyone_mention_{{vc_cate.id}}" value="True"/>
{% else %}
<input type="checkbox" name="everyone_mention_{{vc_cate.id}}" value="True"/>
{% endif %}
<br/>
通知するロールの追加
<br/>
<!-- 既にロールが設定されていた場合、その数を代入 -->
{% if vc.mention_role_id|length > 0 %}
{% set role_len = vc.mention_role_id|length %}
{% endif %}
<!-- サーバ内のロール一覧をselectできるようにする -->
<select id="mySelect_{{vc_cate.id}}" onchange="selectRoleAddEvent('{{role_len}}',this)" size="1">
{% for role in guild.roles %}
<option value="{{role.id}}">
{{role.name}}
</option>
{% endfor %}
</select>
<br/>
{% set i = [1] %}
<!--
既に設定されている、または設定された場合、{{ロール名}}:x の形式で画面に表示する
xをクリックすることで削除できる
-->
<div id="{{vc_cate.id}}" class="selected-items">
{% for role_mention in vc.mention_role_id %}
<div id="{{role_mention}}" class="selected-item">
{% for role in guild.roles %}
<!-- すでに通知するロールが設定されていた場合、表示 -->
{% if role.id|int == role_mention|int %}
{{role.name}}
<span class="remove-item" onclick="removeAdd(this)">x</span>
<input type="hidden" name="role_{{vc_cate.id}}_{{i[0]}}" value="{{role.id}}"/>
{% set _ = i.append(i[0] + 1) %}
{% set _ = i.pop(0) %}
{% endif %}
{% endfor %}
</div>
{% endfor %}
</div>
<br/><br/>
{% endif %}
{% endfor %}
</details>
{% endif %}
{% endfor %}
<br/>
<input type="button" value="全選択" onClick="check(this.form,true,'channel_')"/>
<input type="button" value="全解除" onClick="check(this.form,false,'channel_')"/>
</details>
<br/>
<br/>
<input type="hidden" name="guild_id" value="{{guild_id}}"/>
{% if user_permission in per %}
<input type="submit" value="送信"/>
{% endif %}
</form>
{% if user_permission not in per %}
</fieldset>
{% endif %}
<br/>
<a href="/guild/{{guild_id}}" class="btn btn-primary">前のページに戻る</a>
<a href="/guilds" class="btn btn-primary">サーバ一覧に戻る</a>
<br/>
サーバーウィジェットが有効の場合、サーバーの状況が表示されます。<br/>
<iframe title="discord_5second" style="height: 350px;" src="https://discord.com/widget?id={{guild_id}}&theme=dark/" sandbox="allow-popups allow-popups-to-escape-sandbox allow-same-origin allow-scripts"></iframe>
<script src="/static/js/selects_bar.js" type="text/javascript"></script>
</body>
{% endblock %}
</html>
templates/admin/admin.html
<!DOCTYPE html>
<html>
<script src="/static/js/box_allcheck.js" type="text/javascript"></script>
{% extends "layout.html" %}
{% block content %}
<body bgcolor="#252525" text="#ffffff">
<h1>{{guild.name}}の権限設定</h1>
<h4>各項目にアクセスできる権限を設定できます。</h4>
{% if not guild.icon %}
<a href="/guild/{{guild.id}}">
<img src="/static/img/discord-icon.jpg" />
</a>
{% else %}
<a href="/guild/{{guild.id}}">
<img src="https://cdn.discordapp.com/icons/{{guild.id}}/{{guild.icon}}.png" />
</a>
{% endif %}
<a href="/guild/{{guild.id}}">
<li>{{ guild.name }}</li>
</a>
<br/>
<br>
<form action="/api/admin-success" name="discordForm" method="post">
<details>
<summary>
<strong>ボイスチャンネルの通知設定</strong>
</summary>
Permission Code:{{guild_table.vc_permission|int}}
<h6>編集を許可する権限コード</h6>
<input type="number" name="vc_permission_code" value="{{guild_table.vc_permission|int}}" min="0" max="1099511627775"/>
{% set member_len = guild_table.vc_user_id_permission|length %}
<h6>アクセスを許可するメンバーの選択</h6>
<!-- サーバ内のロール一覧をselectできるようにする -->
<select id="memberSelect_vc" onchange="selectAdminAddEvent('{{member_len}}',this,'member_select_vc')" size="1">
<option hidden disabled>選択してください</option>
{% for member in guild_members %}
<option value="{{member.user.id}}">
{{member.user.username}}
</option>
{% endfor %}
</select>
<br/>
<!--
既に設定されている、または設定された場合、{{ロール名}}:x の形式で画面に表示する
xをクリックすることで削除できる
-->
<div id="member_select_vc" class="selected-items">
{% for member_id in guild_table.mention_members %}
<div id="{{member_id}}" class="selected-item">
{% for member in guild_members %}
{% if member_id|int == member.user.id|int %}
{{member.user.username}}
<span class="remove-item" onclick="removeAdd(this)">x</span>
<input type="hidden" name="member_select_vc_{{i[0]}}" value="{{member.user.id}}"/>
{% set _ = i.append(i[0] + 1) %}
{% set _ = i.pop(0) %}
{% endif %}
{% endfor %}
</div>
{% endfor %}
</div>
<br/><br/>
{% set role_len = guild_table.vc_role_id_permission|length %}
<h6>アクセスを許可するロールの選択</h6>
<!-- サーバ内のロール一覧をselectできるようにする -->
<select id="roleSelect_vc" onchange="selectAdminAddEvent('{{role_len}}',this,'role_select_vc')" size="1">
<option hidden disabled>選択してください</option>
{% for role in guild.roles %}
<option value="{{role.id}}">
{{role.name}}
</option>
{% endfor %}
</select>
<br/>
<!--
既に設定されている、または設定された場合、{{ロール名}}:x の形式で画面に表示する
xをクリックすることで削除できる
-->
<div id="role_select_vc" class="selected-items">
{% for role_id in guild_table.vc_role_id_permission %}
<div id="{{role_id}}" class="selected-item">
{% for role in guild.roles %}
<!-- すでに通知するロールが設定されていた場合、表示 -->
{% if role.id|int == role_id|int %}
{{role.name}}
<span class="remove-item" onclick="removeAdd(this)">x</span>
<input type="hidden" name="role_select_vc_{{i[0]}}" value="{{role.id}}"/>
{% set _ = i.append(i[0] + 1) %}
{% set _ = i.pop(0) %}
{% endif %}
{% endfor %}
</div>
{% endfor %}
</div>
<br/><br/>
</details>
<input type="hidden" name="guild_id" value="{{guild_id}}"/>
<br/><br/>
<input type="submit" value="送信"/>
</form>
サーバーウィジェットが有効の場合、サーバーの状況が表示されます。<br/>
<iframe title="discord_5second" style="height: 350px;" src="https://discord.com/widget?id={{guild_id}}&theme=dark/" sandbox="allow-popups allow-popups-to-escape-sandbox allow-same-origin allow-scripts"></iframe>
<script src="/static/js/select_id.js" type="text/javascript"></script>
</body>
{% endblock %}
</html>
routers/api/adminsuccess.html
<!DOCTYPE html>
<html>
{% extends "layout.html" %}
{% block content %}
<body bgcolor="#252525" text="#ffffff">
<div class="col-md-2">
<a href="/guild/{{guild_id}}" class="btn btn-primary">戻る</a>
</div>
<div class="col-md-2">
<a href="/guilds" class="btn btn-primary">サーバ一覧へ</a>
</div>
</body>
{% endblock %}
</html>
routers/api/vcsignalsuccess.html
<!DOCTYPE html>
<html>
{% extends "layout.html" %}
{% block content %}
<body bgcolor="#252525" text="#ffffff">
<div class="col-md-2">
<a href="/guild/{{guild_id}}" class="btn btn-primary">戻る</a>
</div>
<div class="col-md-2">
<a href="/guilds" class="btn btn-primary">サーバ一覧へ</a>
</div>
</body>
{% endblock %}
</html>
templates/static/js/box_allcheck.js
function check(targetForm,flag,name){
for (let n = 0;n <= targetForm.length - 1;n++) {
if (targetForm.elements[n].type == "checkbox" && targetForm.elements[n].name.indexOf(name) >= 0) {
targetForm.elements[n].checked = flag;
}
}
}
function classCheck(targetForm,flag,className){
for (let i = 0;n < targetForm.length - 1;i++){
if(targetForm.elements[i].type == "checkbox" && targetForm.elements[i].className.indexOf(className) >= 0){
targetForm.elements[i].checked = flag;
}
}
}
templates/static/js/popover.js
const popoverBtn = document.getElementById('popover-btn');
const popoverContent = document.getElementById('popover-content');
popoverBtn.addEventListener('click', function() {
//console.log(popoverContent.style.display)
if (popoverContent.style.display === 'block'){
popoverContent.style.display = 'none';
}else if (popoverContent.style.display === 'none' || popoverContent.style.display == ''){
popoverContent.style.display = 'block';
}
});
templates/static/js/select_id.js
let array = {}
const selectAdminAddEvent = (len,se,selectName) => {
/*
len:int
既に設定されているメンバー、ロールの数
se:HTMLinner
select要素
selectName:str
選択された場合「名前:x」の形式で表示するdiv要素のid名
*/
// idから要素を取得
let selectId = document.getElementById(se.id);
// チャンネルidのみを引き出す
//const divId = se.id.substring(se.id.indexOf("_") + 1)
// すでにデータベース側にロールが設定された場合()
if (len > 0){
// チャンネルidをキーとした連想配列を作成
array[se.id] = []
for (let i = 0; i < len; i++){
// ロールの数だけ代入
array[se.id].push(i)
}
}
function addSelectEvent(e) {
const selectedItems = document.getElementById(selectName);
// 二重に登録されないように削除
selectId.removeEventListener("change",addSelectEvent); // change属性を削除する
const selectedOption = e.target.selectedOptions[0];
const outerTextIndex = selectedItems.outerText.indexOf(selectedOption.textContent);
// 既に登録されている場合
if (outerTextIndex !== -1){
console.log("return")
return
}
// 登録されているロールの合計
const xSum = selectedItems.outerText.split("x").length
// 名前
const item = document.createElement("div");
item.className = "selected-item";
item.textContent = selectedOption.textContent;
// xボタン
const removeBtn = document.createElement("span");
removeBtn.className = "remove-item";
removeBtn.textContent = "x";
//input
const hiddenItem = document.createElement("input");
hiddenItem.className = "hidden-item";
hiddenItem.type = "hidden";
console.log(array)
// 登録しようとしているチャンネルがある場合
if (array[se.id]){
console.log(se.id+" is true")
// 一番最後の要素がある場合(前の要素が削除されている)
if (array[se.id].includes(xSum)){
// 削除された要素の中で一番小さい値を取得(nullが格納されている)
let arrayNull = array[se.id].indexOf(null)
console.log("uwagaki="+arrayNull)
// 削除された要素に代入
if (arrayNull > -1){
//
hiddenItem.name = selectedItems.id + "_" +(arrayNull + 1);
array[se.id][arrayNull] = (arrayNull + 1)
}
// 一番最後に代入
}else{
console.log("push="+xSum)
array[se.id].push(xSum)
hiddenItem.name = selectedItems.id + "_" + xSum;
}
// チャンネルの中身が空
}else{
console.log(se.id+" is false")
array[se.id] = []
array[se.id].push(xSum)
hiddenItem.name = selectedItems.id + "_" + xSum;
}
hiddenItem.value = selectedOption.value;
// xボタンをクリックしたら削除するように仕込む
removeBtn.addEventListener("click", function () {
// 削除する要素の番号
let roleObjName = hiddenItem.name.substring(
hiddenItem.name.lastIndexOf("_") + 1
);
// 1から始まるため-1にnull
array[se.id][roleObjName - 1] = null
item.remove();
hiddenItem.remove();
selectedOption.selected = false;
});
item.appendChild(removeBtn);
selectedItems.appendChild(item);
selectedItems.appendChild(hiddenItem)
/*
<div class="selected-item">
@everyone
<span class="remove-item">x</span></div>
<input class="hidden-item" type="hidden" name="role_{{server_id}}_1" value="{{server_id}}">
*/
}
selectId.removeEventListener("change",addSelectEvent); // change属性を削除する
selectId.addEventListener("change", addSelectEvent);
}
templates/static/js/selects_bar.js
let roleArray = {}
let memberArray = {}
const selectRoleAddEvent = (roleLen,se) => {
// idから要素を取得
let selectId = document.getElementById(se.id);
// チャンネルidのみを引き出す
const divId = se.id.substring(se.id.indexOf("_") + 1)
// ロールが設定された場合
if (roleLen > 0){
// チャンネルidをキーとした連想配列を作成
roleArray[se.id] = []
for (let i = 0; i < roleLen; i++){
// ロールの数だけ代入
roleArray[se.id].push(i)
}
}
function addSelectEvent(e) {
// 親要素(サーバーidがdivid)を取得
const selectedItems = document.getElementById(divId);
// 二重に登録されないように削除
selectId.removeEventListener("change",addSelectEvent); // change属性を削除する
const selectedOption = e.target.selectedOptions[0];
const outerTextIndex = selectedItems.outerText.indexOf(selectedOption.textContent);
// 既に登録されている場合
if (outerTextIndex !== -1){
console.log("return")
return
}
// 登録されているロールの合計
const roleSum = selectedItems.outerText.split("x").length
// 名前
const item = document.createElement("div");
item.className = "selected-item";
item.textContent = selectedOption.textContent;
// xボタン
const removeBtn = document.createElement("span");
removeBtn.className = "remove-item";
removeBtn.textContent = "x";
//input
const hiddenItem = document.createElement("input");
hiddenItem.className = "hidden-item";
hiddenItem.type = "hidden";
console.log(roleArray)
// 登録しようとしているチャンネルがある場合
if (roleArray[se.id]){
console.log(se.id+" is true")
// 一番最後の要素がある場合(前の要素が削除されている)
if (roleArray[se.id].includes(roleSum)){
// 削除された要素の中で一番小さい値を取得(nullが格納されている)
let arrayNull = roleArray[se.id].indexOf(null)
console.log("uwagaki="+arrayNull)
// 削除された要素に代入
if (arrayNull > -1){
hiddenItem.name = "role_" + selectedItems.id + "_" +(arrayNull + 1);
roleArray[se.id][arrayNull] = (arrayNull + 1)
}
// 一番最後に代入
}else{
console.log("push="+roleSum)
roleArray[se.id].push(roleSum)
hiddenItem.name = "role_" + selectedItems.id + "_" + roleSum;
}
// チャンネルの中身が空
}else{
console.log(se.id+" is false")
roleArray[se.id] = []
roleArray[se.id].push(roleSum)
hiddenItem.name = "role_" + selectedItems.id + "_" + roleSum;
}
hiddenItem.value = selectedOption.value;
// xボタンをクリックしたら削除するように仕込む
removeBtn.addEventListener("click", function () {
// 削除する要素の番号
let roleObjName = hiddenItem.name.substring(hiddenItem.name.lastIndexOf("_") + 1);
// 1から始まるため-1にnull
roleArray[se.id][roleObjName - 1] = null
item.remove();
hiddenItem.remove();
selectedOption.selected = false;
});
item.appendChild(removeBtn);
selectedItems.appendChild(item);
selectedItems.appendChild(hiddenItem)
}
selectId.removeEventListener("change",addSelectEvent); // change属性を削除する
selectId.addEventListener("change", addSelectEvent);
}
const selectMemberAddEvent = (memberLen,se) => {
// idから要素を取得
let selectId = document.getElementById(se.id);
// チャンネルidのみを引き出す
const divId = se.id.substring(se.id.indexOf("_") + 1)
// 設定されているロールがあった場合
if (memberLen > 0){
// チャンネルidをキーとした連想配列を作成
memberArray[se.id] = []
for (let i = 0; i < memberLen; i++){
// ロールの数だけ代入
memberArray[se.id].push(i)
}
}
function addSelectEvent(e) {
const selectedItems = document.getElementById(divId);
// 二重に登録されないように削除
selectId.removeEventListener("change",addSelectEvent); // change属性を削除する
const selectedOption = e.target.selectedOptions[0];
const outerTextIndex = selectedItems.outerText.indexOf(selectedOption.textContent);
if (outerTextIndex !== -1){
console.log("return")
return
}
const memberSum = selectedItems.outerText.split("x").length
// 名前
const item = document.createElement("div");
item.className = "selected-item";
item.textContent = selectedOption.textContent;
// xボタン
const removeBtn = document.createElement("span");
removeBtn.className = "remove-item";
removeBtn.textContent = "x";
//input
const hiddenItem = document.createElement("input");
hiddenItem.className = "hidden-item";
hiddenItem.type = "hidden";
if (memberArray[se.id]){
console.log(se.id+" is true")
if (memberArray[se.id].includes(memberSum)){
let arrayNull = memberArray[se.id].indexOf(null)
console.log("uwagaki="+arrayNull)
if (arrayNull > -1){
hiddenItem.name = "member_" + selectedItems.id + "_" +(arrayNull + 1);
memberArray[se.id][arrayNull] = (arrayNull + 1)
}
}else{
console.log("push="+memberSum)
memberArray[se.id].push(memberSum)
hiddenItem.name = "member_" + selectedItems.id + "_" + memberSum;
}
// ロールの中身が空
}else{
console.log(se.id+" is false")
memberArray[se.id] = []
memberArray[se.id].push(memberSum)
hiddenItem.name = "member_" + selectedItems.id + "_" + memberSum;
}
hiddenItem.value = selectedOption.value;
// xボタンをクリックしたら削除するように仕込む
removeBtn.addEventListener("click", function () {
// 削除する要素の番号
let memberObjName = hiddenItem.name.substring(hiddenItem.name.lastIndexOf("_") + 1);
// 1から始まるため-1にnull
memberArray[se.id][memberObjName - 1] = null
item.remove();
hiddenItem.remove();
selectedOption.selected = false;
});
item.appendChild(removeBtn);
selectedItems.appendChild(item);
selectedItems.appendChild(hiddenItem)
}
selectId.removeEventListener("change",addSelectEvent); // change属性を削除する
selectId.addEventListener("change", addSelectEvent);
}
// 既存要素の削除
const removeAdd = (removeItem) => {
//.parentNodeで親要素の指定
removeItem.parentNode.remove()
}
実行
pyhon main.py
で実行します。
http://localhost:5000 にアクセスしましょう。
ページに遷移したら、LoginからDiscord Loginをクリックします。
認証画面に遷移するので、認証。
設定できるサーバー一覧が表示されるので、招待したサーバーを選択。
反映されていれば成功です。
終わりに
かなり雑に説明しましたが、Discordログインの例として参考になれば幸いです。