やりたかったこと
複数回pd.read_sql
実行する処理をmultiprocessing
で並列処理したかった。
Versions
Name | Version |
---|---|
python | 3.7.3 |
pandas | 0.24.2 |
numpy | 1.16.2 |
psycopg2-binary | 2.8.4 |
PostgreSQL | 11.5 |
成功例(非並列ver.)
コード
import multiprocessing
import pandas as pd
import numpy as np
import psycopg2
def get_connection():
connection = psycopg2.connect(
host='hostname',
user='username',
database='databasename',
password='password')
return connection
def function():
for fuga in hoge:
# fuga, pd.read_sql(sql='SELECT yyy...', con=conn)を含むなんらかの処理
return
if __name__ == '__main__':
with get_connection() as conn:
hoge = np.ravel(pd.read_sql(sql='SELECT xxx...', con=conn).to_numpy())
function
失敗例
上記成功例のコード最下部をmultiprocessing
で書き直し、それに応じて変数を食えるようにfunction()
を修正したところ、エラーが出力された。
コード
def function(i):
fuga = hoge[i]
# fuga, pd.read_sql(sql='SELECT yyy...', con=conn)を含むなんらかの処理
return
if __name__ == '__main__':
with get_connection() as conn:
hoge = np.ravel(pd.read_sql(sql='SELECT xxx...', con=conn).to_numpy())
with multiprocessing.Pool(processes=64) as pool:
for _ in pool.imap_unordered(function, range(len(hoge))):
pass
エラー抜粋
multiprocessing.pool.RemoteTraceback:
# 中略
psycopg2.OperationalError: lost synchronization with server: got message type
# 中略
psycopg2.InterfaceError: connection already closed
# 中略
pandas.io.sql.DatabaseError: Execution failed on sql: SELECT yyy...
lost synchronization with server: got message type
unable to rollback
# 中略
成功例
上記失敗例のコードにおいて、function()
に接続情報を書くことで回避できた。
コード
def function(i):
with get_connection() as conn:
fuga = hoge[i]
# fuga, pd.read_sql(sql='SELECT yyy...', con=conn)を含むなんらかの処理
return
注意
並列数やfunction()
の処理内容に依っては下記エラーが出るが、export OMP_NUM_THREADS=1
とすることで回避できた。
OMP: Error #34: System unable to allocate necessary resources for OMP thread:
OMP: System error #11: Resource temporarily unavailable
OMP: Hint Try decreasing the value of OMP_NUM_THREADS.