背景
Pythonで並列処理が簡単に実現できるのはわかったものの、プロセスごとにデータベースに接続する方法がわからず、調べてたどり着けたので、共有します。
データベース接続なしの単純な並列処理の例
リストの値を渡して表示する、という単純な並列処理は、下記のコードで実現できます。
from multiprocessing import Pool
def doSomething(x):
print(str(x[0])
if __name__ == '__main__':
idAry = [0, 1, 2, 3, 4]
p = Pool(processes=プロセッサ数)
p.map(doSomething,idAry)
p.close()
p.join()
このとき、呼び出す関数doSomethingのなかで、データベース接続して処理をしたい場合に今回ご紹介する方法が役立ちます。
毎回doSomethingのなかでデータベース接続するということでも実現できます。
だけど接続に時間がかかったり、接続上限数に達する可能性もあるので、おすすめできません。
並列処理でMySQLに接続する方法
たとえば会員マスタから会員IDを抽出し、会員ごとの購入回数を集計して表示するというプログラムを書くとします。
(SQLでGROUP BYでできるのは重々承知ですが^^;)
from multiprocessing import Pool
import mysql.connector
def init():
global cnx #接続オブジェクトをグローバル変数で定義する。
cnx = mysql.connector.connect(host="ホスト名", port="ポート番号", user="ユーザ名", password="パスワード", database="データベース名")
def doSomething(x):
curChild = cnx.cursor(buffered=True) #初期処理で作ったグローバル変数のDB接続オブジェクトからカーソルを作る
sel_sql = "SELECT COUNT(*) FROM 購買データのテーブル WHERE 会員ID = '" + str(x[0]) + "'"
curChild.execute(sel_sql)
res = curChild.fetchone()
print(str(x[0]) + " " + str(res[0]))
curChild.close()
if __name__ == '__main__':
db = mysql.connector.connect(host="ホスト名", port="ポート番号", user="ユーザ名", password="パスワード", database="データベース名")
cur = db.cursor(buffered=True)
sel_sql = "SELECT 会員ID FROM 会員マスタ"
cur.execute(sel_sql)
idAry = cur.fetchall()
p = Pool(processes=プロセッサ数, initializer=init) #initializerオプションで、初期処理関数を定義する。
p.map(doSomething, idAry)
p.close()
p.join()
initializer
で各プロセスからMySQLへの接続を呼び出す init
の定義が重要なポイントです。
ここで各プロセスの初期処理でデータベースへの接続を確立します。
その後、各プロセスの doSomething
実行では各プロセス用のデータベース接続からカーソルを作ってSQLを実行します。
接続回数は並列処理のプロセッサ数となるため、いちいち接続することもなくなり、実行速度がより高速になります。