やりたかったこと
複数回pd.read_sql実行する処理をmultiprocessingで並列処理したかった。
Versions
| Name | Version |
|---|---|
| python | 3.7.3 |
| pandas | 0.24.2 |
| numpy | 1.16.2 |
| psycopg2-binary | 2.8.4 |
| PostgreSQL | 11.5 |
成功例(非並列ver.)
コード
import multiprocessing
import pandas as pd
import numpy as np
import psycopg2
def get_connection():
connection = psycopg2.connect(
host='hostname',
user='username',
database='databasename',
password='password')
return connection
def function():
for fuga in hoge:
# fuga, pd.read_sql(sql='SELECT yyy...', con=conn)を含むなんらかの処理
return
if __name__ == '__main__':
with get_connection() as conn:
hoge = np.ravel(pd.read_sql(sql='SELECT xxx...', con=conn).to_numpy())
function
失敗例
上記成功例のコード最下部をmultiprocessingで書き直し、それに応じて変数を食えるようにfunction()を修正したところ、エラーが出力された。
コード
def function(i):
fuga = hoge[i]
# fuga, pd.read_sql(sql='SELECT yyy...', con=conn)を含むなんらかの処理
return
if __name__ == '__main__':
with get_connection() as conn:
hoge = np.ravel(pd.read_sql(sql='SELECT xxx...', con=conn).to_numpy())
with multiprocessing.Pool(processes=64) as pool:
for _ in pool.imap_unordered(function, range(len(hoge))):
pass
エラー抜粋
multiprocessing.pool.RemoteTraceback:
# 中略
psycopg2.OperationalError: lost synchronization with server: got message type
# 中略
psycopg2.InterfaceError: connection already closed
# 中略
pandas.io.sql.DatabaseError: Execution failed on sql: SELECT yyy...
lost synchronization with server: got message type
unable to rollback
# 中略
成功例
上記失敗例のコードにおいて、function()に接続情報を書くことで回避できた。
コード
def function(i):
with get_connection() as conn:
fuga = hoge[i]
# fuga, pd.read_sql(sql='SELECT yyy...', con=conn)を含むなんらかの処理
return
注意
並列数やfunction()の処理内容に依っては下記エラーが出るが、export OMP_NUM_THREADS=1とすることで回避できた。
OMP: Error #34: System unable to allocate necessary resources for OMP thread:
OMP: System error #11: Resource temporarily unavailable
OMP: Hint Try decreasing the value of OMP_NUM_THREADS.