import concurrent.futures
import datetime
import traceback
from sqlite_utils import Database
# 取得条件
CONDITIONS = [
{'db': '1', 'min': '1', 'max': '1000000'},
{'db': '2', 'min': '1000001', 'max': '1500000'},
{'db': '3', 'min': '1500001', 'max': '3000000'},
{'db': '4', 'min': '3000001', 'max': '4800000'},
{'db': '5', 'min': '4800001', 'max': '5000000'}
]
# 取得条件要素数
ELEMENT_COUNT = len(CONDITIONS)
# DBパス
DB_PATH = '/Users/user/Documents/SQL/SQLite/3/'
# DBファイル名
DB_NAME = 'test_db.sqlite3'
# DBファイルパス
DB_STR = DB_PATH + DB_NAME
# テーブル名
TABLE_NAME1 = 'large_data'
TABLE_NAME2 = 'large_data2'
TABLE_NAME3 = 'large_data3'
def create_separete_table_by_condition(arg):
'''
データベースから条件に基づきリストを取得する
'''
# パラメータ取得
cond_db = CONDITIONS[arg].get('db')
cond_min = CONDITIONS[arg].get('min')
cond_max = CONDITIONS[arg].get('max')
# journal_mode変更
query_mode = '''
PRAGMA journal_mode=MEMORY
'''
# DROPクエリ
query_drop = f'''
DROP TABLE IF EXISTS db_{cond_db}.table_{cond_db}
'''
# CREATEクエリ
query_create = f'''
CREATE TABLE db_{cond_db}.table_{cond_db} AS
SELECT
T1.id
, T1.col1
, T1.col2
, T1.col3
, T1.col4
, T1.col5
, T2.col6
, T2.col7
, T2.col8
, T2.col9
, T2.col10
, T3.col11
, T3.col12
, T3.col13
, T3.col14
, T3.col15
, datetime(CURRENT_TIMESTAMP, '+9 hours') AS create_timestamp
FROM
{TABLE_NAME1} T0
LEFT OUTER JOIN {TABLE_NAME1} T1
ON T1.id = T0.col1
AND T1.col5 = T0.col5
LEFT OUTER JOIN {TABLE_NAME2} T2
ON T2.id = T0.col1
AND T2.col10 = T0.col10
LEFT OUTER JOIN {TABLE_NAME3} T3
ON T3.id = T0.col1
AND T3.col15 = T0.col15
WHERE
T1.id BETWEEN {cond_min} AND {cond_max}
'''
# DBに接続
db_from = Database(DB_STR)
# ジャーナルモード変更
db_from.execute(query_mode)
# DBを追加
db_from.attach(f'db_{cond_db}', f'{DB_PATH}db_{cond_db}.sqlite3')
print(datetime.datetime.now(), 'db接続', cond_db, ':開始')
print(datetime.datetime.now(), 'db接続', cond_db,
f':{TABLE_NAME1} 処理行数', cond_min, 'から', cond_max)
print(datetime.datetime.now(), 'db接続', cond_db,
f':db_{cond_db}.table_{cond_db} 削除開始')
# テーブル削除
db_from.execute(query_drop)
print(datetime.datetime.now(), 'db接続', cond_db,
f':db_{cond_db}.table_{cond_db} 削除終了')
print(datetime.datetime.now(), 'db接続', cond_db,
f':db_{cond_db}.table_{cond_db} 作成開始')
# 条件に基づきリストを取得しテーブルを作成する
db_from.execute(query_create)
print(datetime.datetime.now(), 'db接続', cond_db,
f':db_{cond_db}.table_{cond_db} 作成終了')
db_from.close()
print(datetime.datetime.now(), 'db接続', cond_db, ':終了')
return True
def main():
'''
主処理
'''
print('処理開始:', datetime.datetime.now())
print('---------------')
print('マルチプロセス開始:', datetime.datetime.now())
print(ELEMENT_COUNT)
# マルチプロセスで処理実行
with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
futrues = {executor.submit(
create_separete_table_by_condition, arg): arg for arg in range(0, ELEMENT_COUNT, 1)}
for future in concurrent.futures.as_completed(futrues):
print(future.result(), future._state)
print('マルチプロセス終了:', datetime.datetime.now())
print('---------------')
print('処理終了:', datetime.datetime.now())
try:
if __name__ == "__main__":
main()
except Exception:
print('処理終了:', datetime.datetime.now())
traceback.print_exc()