pandas.read_sqlでタイムアウト例外を実装する方法をメモします。
pandas.read_sqlにはタイムアウトに関するパラメータが存在しないので、他の方法でタイムアウト例外処理を実装する必要があります。
今回はfunc_timeoutライブラリを使うことでタイムアウト例外処理を実装できました。
環境
- Python: 3.6.5
 - PostgreSQL: 10.4
 - psycopg2: 2.7.5 (dt dec pq3 ext lo64)
 - pandas: 0.24.2
 - func_timeout: 4.3.5
 
定数定義
DB接続情報とタイムアウト上限値を設定します。
# PostgreSQLの接続情報dict作成
param = {
    'port': '5432',
    'user': '*****',
    'password': '*****',
    'host': 'localhost',
    'dbname': '*****'
}
# timeoutを設定
timeout = 600 # pandas.read_sqlでのTIMEOUT上限(s)
メソッド定義
pandas.read_sqlにタイムアウト例外処理を実装した、exec_queryを定義します。
def exec_query(param: dict, query: str, timeout: int) -> pd.DataFrame:
    """
    Redshiftにクエリを発行し、実行結果を取得するメソッド。
    Parameters
    ----------
    param : dict
        接続情報のdict。
    query : string
        クエリ文字列。
    timeout : int
        タイムアウト上限(s)。
    Returns
    ----------
    result : pd.DataFrame
        実行結果のDataFrame。
    """
    with psycopg2.connect(**param) as conn:
        
        try:
            result = func_timeout(timeout, pd.read_sql, args=(query, conn))
            print('read_sql could complete.')
        # タイムアウト例外処理
        except FunctionTimedOut:
            print(f'read_sql could not complete within {timeout} seconds, hence terminated.')
            result = None
        # 上記以外の例外処理
        except Exception as e:
            print(e)
            result = None
    
    conn.close()
    print('connection object status is :', conn)
    return result
クエリ発行
DBに発行したいクエリ文字列を先ほど定義したexec_queryの引数として実行します。
# クエリ文字列作成
query = '''
select 1 from hogefuga
'''
# クエリ発行、実行結果を取得
result = exec_query(param, query, timeout)
最後に
タイムアウトくらい実装しておいてくれよなぁと思う今日この頃です。
参考