1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

ひとりアドベントカレンダー 初めてのチャレンジ編Advent Calendar 2024

Day 5

日記BOTにクイズ機能をつけてみる② SQLまわり作成

Last updated at Posted at 2024-12-04

お疲れさまです、みやもとです。

前回は日記BOTで使うデータを格納するテーブルを準備しました。
今回はテーブルデータを操作するためのクラスを作成します。

dataclassを使ってみる

Pythonでデータベーステーブルに接続する処理を書くのが初めてのため、まずAIにあれこれ聞いて例を挙げてもらってから処理を書くことにしました。
いくつか挙げてもらった処理のうち、dataclassを使うものが個人的な感覚として読みやすかったのでまずそこから作っていきます。

models.py
from dataclasses import dataclass
from datetime import date
from typing import Optional

@dataclass
class UserStatus:
    '''ユーザーステータステーブルのデータクラス'''
    user_id: str
    status: Optional[str]
    current_diary_id: Optional[int]
    current_question_no: Optional[int]
    latest_diary_date: date


@dataclass
class Diary:
    '''日記テーブルのデータクラス'''
    id: int
    user_id: str
    diary_date: date
    original_text: Optional[str]
    english_text: Optional[str]
    japanese_text: Optional[str]
    number_of_correct_answers: int

@dataclass
class Question:
    '''問題テーブルのデータクラス'''
    id: int
    diary_id: int
    question_no: int
    question_text: str
    explanation_text: str
    mistake_flag: Optional[bool]

@dataclass
class Options:
    '''選択肢テーブルのデータクラス'''
    id: int
    question_id: int
    option_no: int
    option_text: str
    correct_flag: bool

ファイル名が「models.py」なのはAIに聞いて作ってもらったときのをそのまま使っているからですね。
entityとかにした方が良かったかなー…とちょっと後悔。
このへんの切り分けの基準とかはまた調べたり考えたりしようかと思います。
dataclass使うと項目と属性の対応がわかりやすいですね。
あとNULLABLEの項目はOptionalで明示できるのだなというのが勉強になりました。

で、データ用のクラスができたので次はSQLを発行するメソッドを準備します。
長いのでちょっと折りたたんで、部分単位で書きますね。

コード全体
querys.py
from google.cloud import bigquery
import os
from logging import getLogger
from models import UserStatus, Diary, Question, Options
from typing import Optional, List

'''
環境変数
'''
# ユーザーステータステーブルID
table_id_user_status = os.environ.get('TABLE_ID_USER_STATUS')
# 日記テーブルID
table_id_diary = os.environ.get('TABLE_ID_DIARY')
# 問題テーブルID
table_id_question = os.environ.get('TABLE_ID_QUESTION')
# 選択肢テーブルID
table_id_options = os.environ.get('TABLE_ID_OPTIONS')

# BigQueryインスタンスの作成
client = bigquery.Client()
# loggerの取得
logger = getLogger(__name__)


'''
ユーザーステータステーブルINSERT
'''
def insert_user_status(userStatus: UserStatus):
    # クエリを生成
    query = f'''INSERT INTO `{table_id_user_status}`
                (user_id, status, current_diary_id, current_question_no, latest_diary_date)
                VALUES
                (@user_id, @status, @current_diary_id, @current_question_no, @latest_diary_date)
            '''
    # データをパラメータに変換
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter('user_id', 'STRING', userStatus.user_id),
            bigquery.ScalarQueryParameter('status', 'STRING', userStatus.status),
            bigquery.ScalarQueryParameter('current_diary_id', 'INTEGER', userStatus.current_diary_id),
            bigquery.ScalarQueryParameter('current_question_no', 'INTEGER', userStatus.current_question_no),
            bigquery.ScalarQueryParameter('latest_diary_date', 'DATE', userStatus.latest_diary_date)
        ]
    )

    try:
        # クエリの実行
        query_job = client.query(query, job_config=job_config)
        query_job.result()
    except Exception as e:
        # 例外が発生した場合、ログにエラーを出力
        logger.error(f'insert_user_statusでエラー発生:{e}')


'''
日記テーブルINSERT
'''
def insert_diary(diaryEntry: Diary) -> Optional[int]:
    # ID採番
    id = get_id(table_id_diary)
    # クエリを生成
    query = f'''INSERT INTO `{table_id_diary}`
                (id, user_id, diary_date, original_text, english_text, 
                 japanese_text, number_of_correct_answers)
                VALUES
                (@id, @user_id, @diary_date, @original_text, @english_text,
                 @japanese_text, @number_of_correct_answers)
            '''
    # データをパラメータに変換
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter('id', 'INTEGER', id),
            bigquery.ScalarQueryParameter('user_id', 'STRING', diaryEntry.user_id),
            bigquery.ScalarQueryParameter('diary_date', 'DATE', diaryEntry.diary_date),
            bigquery.ScalarQueryParameter('original_text', 'STRING', diaryEntry.original_text),
            bigquery.ScalarQueryParameter('english_text', 'STRING', diaryEntry.english_text),
            bigquery.ScalarQueryParameter('japanese_text', 'STRING', diaryEntry.japanese_text),
            bigquery.ScalarQueryParameter('number_of_correct_answers', 'INTEGER', diaryEntry.number_of_correct_answers)
        ]
    )

    try:
        # クエリの実行
        query_job = client.query(query, job_config=job_config)
        query_job.result()
        return id
    except Exception as e:
        # 例外が発生した場合、ログにエラーを出力
        logger.error(f'insert_diaryでエラー発生:{e}')
        return None


'''
問題テーブルINSERT
'''
def insert_question(questionEntry: Question) -> Optional[int]:
    # ID採番
    id = get_id(table_id_question)
    # クエリを生成
    query = f'''INSERT INTO `{table_id_question}`
                (id, diary_id, question_no, question_text, explanation_text)
                VALUES
                (@id, @diary_id, @question_no, @question_text, @explanation_text)
            '''
    # データをパラメータに変換
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter('id', 'INTEGER', id),
            bigquery.ScalarQueryParameter('diary_id', 'INTEGER', questionEntry.diary_id),
            bigquery.ScalarQueryParameter('question_no', 'INTEGER', questionEntry.question_no),
            bigquery.ScalarQueryParameter('question_text', 'STRING', questionEntry.question_text),
            bigquery.ScalarQueryParameter('explanation_text', 'STRING', questionEntry.explanation_text)
        ]
    )

    try:
        # クエリの実行
        query_job = client.query(query, job_config=job_config)
        query_job.result()
        return id
    except Exception as e:
        # 例外が発生した場合、ログにエラーを出力
        logger.error(f'insert_questionでエラー発生:{e}')
        return None


'''
選択肢テーブルINSERT
'''
def insert_option(optionEntry: Options):
    # ID採番
    id = get_id(table_id_options)
    # クエリを生成
    query = f'''INSERT INTO `{table_id_options}`
                (id, question_id, option_no, option_text, correct_flag)
                VALUES
                (@id, @question_id, @option_no, @option_text, @correct_flag)
            '''
    # データをパラメータに変換
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter(
                'id', 'INTEGER', id),
            bigquery.ScalarQueryParameter(
                'question_id', 'INTEGER', optionEntry.question_id),
            bigquery.ScalarQueryParameter(
                'option_no', 'INTEGER', optionEntry.option_no),
            bigquery.ScalarQueryParameter(
                'option_text', 'STRING', optionEntry.option_text),
            bigquery.ScalarQueryParameter(
                'correct_flag', 'BOOL', optionEntry.correct_flag)
        ]
    )

    try:
        # クエリの実行
        query_job = client.query(query, job_config=job_config)
        query_job.result()
    except Exception as e:
        # 例外が発生した場合、ログにエラーを出力
        logger.error(f'insert_optionでエラー発生:{e}')


'''
ユーザーステータステーブルSELECT(ユーザーIDから取得)
'''
def select_user_status(user_id: str) -> Optional[UserStatus]:
    # クエリを生成
    query = f'''SELECT * FROM `{table_id_user_status}`
                WHERE user_id = @user_id
            '''
    # データをパラメータに変換
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter(
                'user_id', 'STRING', user_id)
        ]
    )

    try:
        # クエリの実行
        query_job = client.query(query, job_config=job_config)
        result = list(query_job.result())
        if not result:
            # データを取得できなかった場合はNoneを返す
            return None
        # 1件目のデータを返す
        row = result[0]
        return UserStatus(
            user_id=row.user_id,
            status=row.status,
            current_diary_id=row.current_diary_id,
            current_question_no=row.current_question_no,
            latest_diary_date=row.latest_diary_date
        )
    except Exception as e:
        # 例外が発生した場合、ログにエラーを出力
        logger.error(f'select_user_statusでエラー発生:{e}')
        return None


'''
日記テーブルSELECT(IDから取得)
'''
def select_diary(id: int) -> Optional[Diary]:
    # クエリを生成
    query = f'''SELECT * FROM `{table_id_diary}` WHERE id = @id
            '''
    # データをパラメータに変換
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter(
                'id', 'INTEGER', id)
        ]
    )

    try:
        # クエリの実行
        query_job = client.query(query, job_config=job_config)
        result = list(query_job.result())
        if not result:
            # データを取得できなかった場合はNoneを返す
            return None
        # 取得したデータの1件目を返す
        row = result[0]
        return Diary(
            id=row.id,
            user_id=row.user_id,
            diary_date=row.diary_date,
            original_text=row.original_text,
            english_text=row.english_text,
            japanese_text=row.japanese_text,
            number_of_correct_answers=row.number_of_correct_answers
        )
    except Exception as e:
        # 例外が発生した場合、ログにエラーを出力
        logger.error(f'select_diaryでエラー発生:{e}')
        return None


'''
質問テーブルSELECT(日記IDと問題番号から取得)
'''
def select_question(diary_id: int, question_no: int) -> Optional[Question]:
    # クエリを生成
    query = f'''SELECT * FROM `{table_id_question}`
                WHERE diary_id = @diary_id AND question_no = @question_no
            '''
    # データをパラメータに変換
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter(
                'diary_id', 'INTEGER', diary_id),
            bigquery.ScalarQueryParameter(
                'question_no', 'INTEGER', question_no)
        ]
    )
    
    try:
        # クエリ実行
        query_job = client.query(query, job_config=job_config)
        result = list(query_job.result())
        if not result:
            # データを取得できなかった場合はNoneを返す
            return None
        # 取得したデータの1件目を返す
        row = result[0]
        return Question(
            id=row.id,
            diary_id=row.diary_id,
            question_no=row.question_no,
            question_text=row.question_text,
            explanation_text=row.explanation_text,
            mistake_flag=row.mistake_flag
        )
    except Exception as e:
        # 例外が発生した場合、ログにエラーを出力
        logger.error(f'select_questionでエラー発生:{e}')
        return None


'''
選択肢テーブルSELECT(問題IDから取得)
'''
def select_option(question_id: int) -> List[Options]:
    # クエリを生成
    query = f'''SELECT * FROM `{table_id_options}`
                WHERE question_id = @question_id
                ORDER BY option_no
            '''
    # データをパラメータに変換
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter(
                'question_id', 'INTEGER', question_id)
        ]
    )

    try:
        # クエリ実行
        query_job = client.query(query, job_config=job_config)
        result = list(query_job.result())
        if not result:
            # データを取得できなかった場合は空のリストを返す
            return []
        # 取得したデータをリストに変換
        rows = []
        for row in result:
            rows.append(Options(
                id=row.id,
                question_id=row.question_id,
                option_no=row.option_no,
                option_text=row.option_text,
                correct_flag=row.correct_flag))
        return rows
    except Exception as e:
        # 例外が発生した場合、ログにエラーを出力
        logger.error(f'select_optionでエラー発生:{e}')
        return []


'''
正解判定(選択肢テーブルを問題番号と選択肢番号、正解フラグで取得してデータがあればtrueを返す)
'''
def is_correct(question_id: int,option_no: int) -> bool:
    # クエリを生成
    query = f'''SELECT correct_flag FROM `{table_id_options}`
                WHERE question_id = @question_id AND option_no = @option_no AND correct_flag = true
            '''
    # データをパラメータに変換
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter(
                'question_id', 'INTEGER', question_id),
            bigquery.ScalarQueryParameter(
                'option_no', 'INTEGER', option_no)
        ]
    )

    try:
        # クエリ実行
        query_job = client.query(query, job_config=job_config)
        result = list(query_job.result())
        if not result:
            # データを取得できなかった場合はFalseを返す
            return False
        # データを取得している場合はTrueを返す
        row = result[0]
        return row.correct_flag
    except Exception as e:
        # 例外が発生した場合、ログにエラーを出力
        logger.error(f'is_correctでエラー発生:{e}')
        return False


'''
ユーザーステータステーブル更新(ステータス、処理中の日記ID、処理中の問題ID、最新の日記日付を更新する)
'''
def update_user_status(userStatus: UserStatus):
    # クエリを生成
    query = f'''UPDATE `{table_id_user_status}`
                SET status = @status, current_diary_id = @current_diary_id, current_question_no = @current_question_no, latest_diary_date = @latest_diary_date
                WHERE user_id = @user_id
            '''
    # データをパラメータに変換
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter(
                'user_id', 'STRING', userStatus.user_id),
            bigquery.ScalarQueryParameter(
                'status', 'STRING', userStatus.status),
            bigquery.ScalarQueryParameter(
                'current_diary_id', 'INTEGER', userStatus.current_diary_id),
            bigquery.ScalarQueryParameter(
                'current_question_no', 'INTEGER', userStatus.current_question_no),
            bigquery.ScalarQueryParameter(
                'latest_diary_date', 'DATE', userStatus.latest_diary_date)
        ]
    )

    try:
        # クエリ実行
        query_job = client.query(query, job_config=job_config)
        query_job.result()
    except Exception as e:
        # 例外が発生した場合、ログにエラーを出力
        logger.error(f'update_user_statusでエラー発生:{e}')



'''
日記テーブル更新(正解数を更新する)
'''
def update_diary(diaryEntry: Diary):
    # クエリを生成
    query = f'''UPDATE `{table_id_diary}`
                SET number_of_correct_answers = @number_of_correct_answers 
                WHERE id = @id
            '''
    # データをパラメータに変換
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter(
                'id', 'INTEGER', diaryEntry.id),
            bigquery.ScalarQueryParameter(
                'number_of_correct_answers', 'INTEGER', diaryEntry.number_of_correct_answers)
        ]
    )

    try:
        # クエリ実行
        query_job = client.query(query, job_config=job_config)
        query_job.result()
    except Exception as e:
        # 例外が発生した場合、ログにエラーを出力
        logger.error(f'update_diaryでエラー発生:{e}')


'''
問題テーブル更新(誤答フラグを更新する)
'''
def update_question(question_id: int):
    # クエリを生成
    query = f'''UPDATE `{table_id_question}`
                SET mistake_flag = true
                WHERE id = @id
            '''
    # データをパラメータに変換
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter(
                'id', 'INTEGER', question_id)
        ]
    )

    try:
        # クエリ実行
        query_job = client.query(query, job_config=job_config)
        query_job.result()
    except Exception as e:
        # 例外が発生した場合、ログにエラーを出力
        logger.error(f'update_questionでエラー発生:{e}')


'''
ID採番
    指定したテーブル上の最大IDを取得して+1した値を返す
    テーブル上にデータが無い場合は1を返す
'''
def get_id(table_id) -> Optional[int]:
    # テーブルの最大IDを取得
    query = f'''SELECT COALESCE(MAX(id), 0) + 1 as next_id 
            FROM `{table_id}`
            ''' 
    query_job = client.query(query)
    result = list(query_job.result())

    if not result:
        # 結果が取得できなかった場合は1を返す
        return 1
    
    return result[0].next_id

INSERT(例:日記)

querys.py
'''
日記テーブルINSERT
'''
def insert_diary(diaryEntry: Diary) -> Optional[int]:
    # ID採番
    id = get_id(table_id_diary)
    # クエリを生成
    query = f'''INSERT INTO `{table_id_diary}`
                (id, user_id, diary_date, original_text, english_text, 
                 japanese_text, number_of_correct_answers)
                VALUES
                (@id, @user_id, @diary_date, @original_text, @english_text,
                 @japanese_text, @number_of_correct_answers)
            '''
    # データをパラメータに変換
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter('id', 'INTEGER', id),
            bigquery.ScalarQueryParameter('user_id', 'STRING', diaryEntry.user_id),
            bigquery.ScalarQueryParameter('diary_date', 'DATE', diaryEntry.diary_date),
            bigquery.ScalarQueryParameter('original_text', 'STRING', diaryEntry.original_text),
            bigquery.ScalarQueryParameter('english_text', 'STRING', diaryEntry.english_text),
            bigquery.ScalarQueryParameter('japanese_text', 'STRING', diaryEntry.japanese_text),
            bigquery.ScalarQueryParameter('number_of_correct_answers', 'INTEGER', diaryEntry.number_of_correct_answers)
        ]
    )

    try:
        # クエリの実行
        query_job = client.query(query, job_config=job_config)
        query_job.result()
        return id
    except Exception as e:
        # 例外が発生した場合、ログにエラーを出力
        logger.error(f'insert_diaryでエラー発生:{e}')
        return None

日記クラスをパラメータにして、追加した日記レコードのIDを返すようにしています。
日記IDは問題テーブルに必要なので、追加メソッド内でID採番を行ってその結果を返す形ですね。
ID採番の処理はテーブルIDをパラメータにして共通化しています。

querys.py
'''
ID採番
    指定したテーブル上の最大IDを取得して+1した値を返す
    テーブル上にデータが無い場合は1を返す
'''
def get_id(table_id) -> Optional[int]:
    # テーブルの最大IDを取得
    query = f'''SELECT COALESCE(MAX(id), 0) + 1 as next_id 
            FROM `{table_id}`
            ''' 
    query_job = client.query(query)
    result = list(query_job.result())

    if not result:
        # 結果が取得できなかった場合は1を返す
        return 1
    
    return result[0].next_id

SELECT(例:質問)

query.py
'''
質問テーブルSELECT(日記IDと問題番号から取得)
'''
def select_question(diary_id: int, question_no: int) -> Optional[Question]:
    # クエリを生成
    query = f'''SELECT * FROM `{table_id_question}`
                WHERE diary_id = @diary_id AND question_no = @question_no
            '''
    # データをパラメータに変換
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter(
                'diary_id', 'INTEGER', diary_id),
            bigquery.ScalarQueryParameter(
                'question_no', 'INTEGER', question_no)
        ]
    )
    
    try:
        # クエリ実行
        query_job = client.query(query, job_config=job_config)
        result = list(query_job.result())
        if not result:
            # データを取得できなかった場合はNoneを返す
            return None
        # 取得したデータの1件目を返す
        row = result[0]
        return Question(
            id=row.id,
            diary_id=row.diary_id,
            question_no=row.question_no,
            question_text=row.question_text,
            explanation_text=row.explanation_text,
            mistake_flag=row.mistake_flag
        )
    except Exception as e:
        # 例外が発生した場合、ログにエラーを出力
        logger.error(f'select_questionでエラー発生:{e}')
        return None

SELECT文を発行して取得データを返す形です。
選択肢テーブル以外は1件のみ処理する想定のため、リストのまま返さず最初の1件のみ返しています。

UPDATE(例:ユーザーステータス)

querys.py
'''
ユーザーステータステーブル更新(ステータス、処理中の日記ID、処理中の問題ID、最新の日記日付を更新する)
'''
def update_user_status(userStatus: UserStatus):
    # クエリを生成
    query = f'''UPDATE `{table_id_user_status}`
                SET status = @status, current_diary_id = @current_diary_id, current_question_no = @current_question_no, latest_diary_date = @latest_diary_date
                WHERE user_id = @user_id
            '''
    # データをパラメータに変換
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter(
                'user_id', 'STRING', userStatus.user_id),
            bigquery.ScalarQueryParameter(
                'status', 'STRING', userStatus.status),
            bigquery.ScalarQueryParameter(
                'current_diary_id', 'INTEGER', userStatus.current_diary_id),
            bigquery.ScalarQueryParameter(
                'current_question_no', 'INTEGER', userStatus.current_question_no),
            bigquery.ScalarQueryParameter(
                'latest_diary_date', 'DATE', userStatus.latest_diary_date)
        ]
    )

    try:
        # クエリ実行
        query_job = client.query(query, job_config=job_config)
        query_job.result()
    except Exception as e:
        # 例外が発生した場合、ログにエラーを出力
        logger.error(f'update_user_statusでエラー発生:{e}')

更新については基本的に返す値がないので、処理を実行したらそれっきりになりますね。

SQLを発行する

お気づきかもしれませんが、今回の処理ではbigqueryが持つinsertAll等のメソッドではなくSQL文字列を作って実行しています。
Cloud SQLとBigQueryを比較した際に「BigQueryのストリーミングインサートは課金枠が別だから気を付けてね」みたいな応答があったので過剰に警戒した結果こうなりました。
多分応答速度に影響あるんだろうな、とは思いつつこのまましばらく使うつもりです。

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?