PythonのマルチプロセスでSQLite3からデータを取得する検証
マルチプロセスでSQLite3からデータを取得する
1.参照するテーブルのレイアウト
- test_db.sqlite3.large_data
列名 | データ・タイプ |
---|---|
id | INT |
col1 | INT |
col2 | TEXT |
〜 | |
col15 | TEXT |
2.検証用コード
- あらかじめ dataset をインストールしておくこと
pip install dataset
sqlite3_multi_process.py
import concurrent.futures
import datetime
import traceback
import dataset
# 取得条件
CONDITIONS = [
{'db': '1', 'min': '1', 'max': '1000000'},
{'db': '2', 'min': '1000001', 'max': '1500000'},
{'db': '3', 'min': '1500001', 'max': '3000000'},
{'db': '4', 'min': '3000001', 'max': '4000000'},
{'db': '5', 'min': '4000001', 'max': '5000000'}
]
# 取得条件要素数
ELEMENT_COUNT = len(CONDITIONS)
# DBパス
DB_PATH = '/Users/user/Documents/SQL/SQLite/3/'
# DBファイル名
DB_NAME = 'test_db.sqlite3'
# DBファイルパス
DB_STR = DB_PATH + DB_NAME
# テーブル名
TABLE_NAME = 'large_data'
def find_by_condition(arg):
'''
データベースから条件に基づきリストを取得する
'''
# DBに接続
db = dataset.connect(f'sqlite:///{DB_STR}')
# テーブル接続
tbl = db[TABLE_NAME]
print(datetime.datetime.now(), 'DB名', CONDITIONS[arg].get('db'), ':開始')
print(datetime.datetime.now(), '処理行数', CONDITIONS[arg].get(
'min'), 'から', CONDITIONS[arg].get('max'))
# テーブルから条件に基づきリスト取得
large_data_list = list(tbl.find(
id = {'between': (CONDITIONS[arg].get('min'), CONDITIONS[arg].get('max'))}))
db.close()
print(datetime.datetime.now(), 'DB名', CONDITIONS[arg].get(
'db'), '処理データ件数:', len(large_data_list))
print(datetime.datetime.now(), 'DB名',
CONDITIONS[arg].get('db'), ':終了')
def main():
'''
主処理
'''
print('処理開始:', datetime.datetime.now())
print('---------------')
print('マルチプロセス開始:', datetime.datetime.now())
print(ELEMENT_COUNT)
# マルチプロセスで処理実行
with concurrent.futures.ProcessPoolExecutor(max_workers = 5) as executor:
{executor.submit(find_by_condition, arg): arg for arg in range(0, ELEMENT_COUNT, 1)}
print('マルチプロセス終了:', datetime.datetime.now())
print('---------------')
print('処理終了:', datetime.datetime.now())
try:
if __name__ == "__main__":
main()
except Exception:
print('処理終了:', datetime.datetime.now())
traceback.print_exc()```
### シングルプロセスでSQLite3からデータを取得する
```sqlite3_single.py
import datetime
import traceback
import dataset
CONDITIONS = [
{'db': '1', 'min': '1', 'max': '1000000'},
{'db': '2', 'min': '1000001', 'max': '1500000'},
{'db': '3', 'min': '1500001', 'max': '3000000'},
{'db': '4', 'min': '3000001', 'max': '4000000'},
{'db': '5', 'min': '4000001', 'max': '5000000'}
]
ARGS = len(CONDITIONS)
def find_by_condition(conds, arg):
'''
データベースからリストを取得する。
'''
DB_PATH = '/Users/fumi/Documents/SQL/SQLite/3/'
DB_NAME = 'test_db.sqlite3'
TABLE_NAME = 'large_data'
DB = dataset.connect(f'sqlite:///{DB_PATH}{DB_NAME}')
LARGE_DATA = DB[TABLE_NAME]
print(datetime.datetime.now(), 'DB名', conds[arg].get('db'), ':開始')
print(datetime.datetime.now(), '処理行数', conds[arg].get(
'min'), 'から', conds[arg].get('max'))
large_data_list = list(LARGE_DATA.find(
id={'between': (conds[arg].get('min'), conds[arg].get('max'))}))
DB.close()
print(datetime.datetime.now(), 'DB名', conds[arg].get(
'db'), '処理データ件数:', len(large_data_list))
print(datetime.datetime.now(), 'DB名',
conds[arg].get('db'), ':終了')
def main(conds, args):
'''
主処理実行
'''
print('処理開始:', datetime.datetime.now())
print('---------------')
print('シングルプロセス開始:', datetime.datetime.now())
print(ARGS)
for arg in range(0, args, 1):
find_by_condition(conds, arg)
print('シングルプロセス終了:', datetime.datetime.now())
print('---------------')
print('処理終了:', datetime.datetime.now())
try:
if __name__ == "__main__":
main(CONDITIONS, ARGS)
except Exception:
print('処理終了:', datetime.datetime.now())
traceback.print_exc()