1. 実行環境
・PostgreSQL => 13.1
・Python => 3.10.5
・psycopg2 => 2.9.3
2. 実装
今回は逐次処理と並行処理でそれぞれpostgres側にsleepの処理を実行して、処理が終了するまでの時間を比較していきたいと思います。
(実用的ではないのはご了承ください)
single.py
import pandas as pd
import psycopg2
import datetime as dt
def sample_func_1(num):
DSN = '接続情報'
# num秒待つSQL。
sql = """
SELECT pg_sleep(%(num)s) ;
"""
try:
with psycopg2.connect(DSN) as conn:
with conn.cursor() as cur:
cur.execute(sql, {'num':num})
print("done : ", num)
except Exception as e:
print(e)
def main():
start = dt.datetime.now()
sample_values = list(range(10))
for v in sample_values:
sample_func(v)
print(dt.datetime.now() - start)
if __name__ == '__main__':
main()
実行結果
done : 0
done : 1
done : 2
done : 3
done : 4
done : 5
done : 6
done : 7
done : 8
done : 9
0:00:45.949056
multi.py
import pandas as pd
import psycopg2
import threading
import datetime as dt
def sample_func_3(num):
DSN = '接続情報'
# num秒待つSQL。
sql = """
SELECT pg_sleep(%(num)s) ;
"""
try:
with psycopg2.connect(DSN) as conn:
with conn.cursor() as cur:
cur.execute(sql, {'num':num})
print("done : ", num)
except Exception as e:
print(e)
def main():
start = dt.datetime.now()
sample_values = list(range(10))
with ThreadPoolExecutor(max_workers=3) as pool:
pool.map(sample_func_3, sample_values)
print(dt.datetime.now() - start)
if __name__ == '__main__':
main()
実行結果
done : 0
done : 1
done : 2
done : 3
done : 4
done : 5
done : 6
done : 7
done : 8
done : 9
0:00:18.342594
3. 結果
single.pyの結果が約45秒、multi.pyの結果が約18秒で処理時間が1/2以上速くなりました。
きちんと並行処理の恩恵を受けられていると思います。
しかしpostgresサーバに処理を投げているので、複雑な処理を同時に実行しようとするとpostgresサーバに負荷が掛かってしまいます。
なので単にスレッド数を増やせばいいというわけではなくなってくると思います。