0
0

Python、SQLite3でのマルチプロセステーブル生成

Last updated at Posted at 2024-07-21

Python、SQLite3でのマルチプロセステーブル生成検証

1.検証内容

  • 1つのDBのテーブルを参照する。参照結果をもとに複数のDBに各々テーブルを作成する
  • sqlite-utilsモジュールを使用して、1SQLクエリで複数のDBのテーブルを使用する
  • マルチプロセスでテーブルを参照、作成する

2.参照するテーブルのレイアウト

  • test_db.sqlite3.large_data
  • test_db.sqlite3.large_data2
  • test_db.sqlite3.large_data3
列名 データ・タイプ
id INT
col1 INT
col2 TEXT
col15 TEXT

3.作成するテーブルのレイアウト

  • db_1.sqlite3.table_1
  • db_2.sqlite3.table_2
  • db_3.sqlite3.table_3
  • db_4.sqlite3.table_4
  • db_5.sqlite3.table_5
列名 データ・タイプ
id INT
col1 INT
col2 TEXT
col15 TEXT
create_timestamp N/A

4.検証コード

  • あらかじめ sqlite-utils をインストールしておくこと
pip install sqlite-utils
sqlite3_multi_process_no_trace.py
import concurrent.futures
import datetime
import traceback
from sqlite_utils import Database

# 取得条件
CONDITIONS = [
    {'db': '1', 'min': '1', 'max': '1000000'},
    {'db': '2', 'min': '1000001', 'max': '1500000'},
    {'db': '3', 'min': '1500001', 'max': '3000000'},
    {'db': '4', 'min': '3000001', 'max': '4800000'},
    {'db': '5', 'min': '4800001', 'max': '5000000'}
]
# 取得条件要素数
ELEMENT_COUNT = len(CONDITIONS)
# DBパス
DB_PATH = '/Users/user/Documents/SQL/SQLite/3/'
# DBファイル名
DB_NAME = 'test_db.sqlite3'
# DBファイルパス
DB_STR = DB_PATH + DB_NAME
# テーブル名
TABLE_NAME1 = 'large_data'
TABLE_NAME2 = 'large_data2'
TABLE_NAME3 = 'large_data3'


def create_separete_table_by_condition(arg):
    '''
    データベースから条件に基づきリストを取得する
    '''
    # パラメータ取得
    cond_db = CONDITIONS[arg].get('db')
    cond_min = CONDITIONS[arg].get('min')
    cond_max = CONDITIONS[arg].get('max')

    # journal_mode変更
    query_mode = '''
    PRAGMA journal_mode=MEMORY
    '''

    # DROPクエリ
    query_drop = f'''
    DROP TABLE IF EXISTS db_{cond_db}.table_{cond_db} 
    '''

    # CREATEクエリ
    query_create = f'''
    CREATE TABLE db_{cond_db}.table_{cond_db} AS
    SELECT
        T1.id
        , T1.col1
        , T1.col2
        , T1.col3
        , T1.col4
        , T1.col5
        , T2.col6
        , T2.col7
        , T2.col8
        , T2.col9
        , T2.col10
        , T3.col11
        , T3.col12
        , T3.col13
        , T3.col14
        , T3.col15
        , datetime(CURRENT_TIMESTAMP, '+9 hours') AS create_timestamp
    FROM
        {TABLE_NAME1} T0
    LEFT OUTER JOIN {TABLE_NAME1} T1
        ON  T1.id   = T0.col1
        AND T1.col5 = T0.col5
    LEFT OUTER JOIN {TABLE_NAME2} T2
        ON  T2.id    = T0.col1
        AND T2.col10 = T0.col10
    LEFT OUTER JOIN {TABLE_NAME3} T3
        ON  T3.id    = T0.col1
        AND T3.col15 = T0.col15
    WHERE
        T1.id BETWEEN {cond_min} AND {cond_max}
    '''

    # DBに接続
    db_from = Database(DB_STR)
    # ジャーナルモード変更
    db_from.execute(query_mode)
    # DBを追加
    db_from.attach(f'db_{cond_db}', f'{DB_PATH}db_{cond_db}.sqlite3')

    print(datetime.datetime.now(), 'db接続', cond_db, ':開始')
    print(datetime.datetime.now(), 'db接続', cond_db,
          f'{TABLE_NAME1} 処理行数', cond_min, 'から', cond_max)

    print(datetime.datetime.now(), 'db接続', cond_db,
          f':db_{cond_db}.table_{cond_db} 削除開始')

    # テーブル削除
    db_from.execute(query_drop)

    print(datetime.datetime.now(), 'db接続', cond_db,
          f':db_{cond_db}.table_{cond_db} 削除終了')

    print(datetime.datetime.now(), 'db接続', cond_db,
          f':db_{cond_db}.table_{cond_db} 作成開始')

    # 条件に基づきリストを取得しテーブルを作成する
    db_from.execute(query_create)

    print(datetime.datetime.now(), 'db接続', cond_db,
          f':db_{cond_db}.table_{cond_db} 作成終了')
    db_from.close()

    print(datetime.datetime.now(), 'db接続', cond_db, ':終了')

    return True


def main():
    '''
    主処理
    '''
    print('処理開始:', datetime.datetime.now())
    print('---------------')
    print('マルチプロセス開始:', datetime.datetime.now())
    print(ELEMENT_COUNT)

    # マルチプロセスで処理実行
    with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
        futrues = {executor.submit(
            create_separete_table_by_condition, arg): arg for arg in range(0, ELEMENT_COUNT, 1)}

        for future in concurrent.futures.as_completed(futrues):
            print(future.result(), future._state)

    print('マルチプロセス終了:', datetime.datetime.now())
    print('---------------')
    print('処理終了:', datetime.datetime.now())


try:
    if __name__ == "__main__":
        main()

except Exception:
    print('処理終了:', datetime.datetime.now())
    traceback.print_exc()
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0