LoginSignup
0
0

More than 3 years have passed since last update.

multiprocessing + psycopg2 でハマったところ

Posted at

やりたかったこと

複数回pd.read_sql実行する処理をmultiprocessingで並列処理したかった。

Versions

Name Version
python 3.7.3
pandas 0.24.2
numpy 1.16.2
psycopg2-binary 2.8.4
PostgreSQL 11.5

成功例(非並列ver.)

コード

import multiprocessing
import pandas as pd
import numpy as np
import psycopg2

def get_connection():
    connection = psycopg2.connect(
                        host='hostname',
                        user='username',
                        database='databasename',
                        password='password')
    return connection

def function():
    for fuga in hoge:

        # fuga, pd.read_sql(sql='SELECT yyy...', con=conn)を含むなんらかの処理

    return

if __name__ == '__main__':
    with get_connection() as conn:
        hoge = np.ravel(pd.read_sql(sql='SELECT xxx...', con=conn).to_numpy())

        function

失敗例

上記成功例のコード最下部をmultiprocessingで書き直し、それに応じて変数を食えるようにfunction()を修正したところ、エラーが出力された。

コード

def function(i):
    fuga = hoge[i]

    # fuga, pd.read_sql(sql='SELECT yyy...', con=conn)を含むなんらかの処理

    return

if __name__ == '__main__':
    with get_connection() as conn:
        hoge = np.ravel(pd.read_sql(sql='SELECT xxx...', con=conn).to_numpy())

        with multiprocessing.Pool(processes=64) as pool:
            for _ in pool.imap_unordered(function, range(len(hoge))):
                pass

エラー抜粋

multiprocessing.pool.RemoteTraceback:

# 中略

psycopg2.OperationalError: lost synchronization with server: got message type

# 中略

psycopg2.InterfaceError: connection already closed

# 中略

pandas.io.sql.DatabaseError: Execution failed on sql: SELECT yyy...
lost synchronization with server: got message type
unable to rollback

# 中略

成功例

上記失敗例のコードにおいて、function()に接続情報を書くことで回避できた。

コード

def function(i):
    with get_connection() as conn:
        fuga = hoge[i]

        # fuga, pd.read_sql(sql='SELECT yyy...', con=conn)を含むなんらかの処理

    return

注意

並列数やfunction()の処理内容に依っては下記エラーが出るが、export OMP_NUM_THREADS=1とすることで回避できた。

OMP: Error #34: System unable to allocate necessary resources for OMP thread:
OMP: System error #11: Resource temporarily unavailable
OMP: Hint Try decreasing the value of OMP_NUM_THREADS.
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0