psycopg2とasyncpgの書き方の違い
psycopg2で記載されたソースコードを高速と噂のasyncpgに書き換える場合の参考資料です。
asyncioの知識が前提のサンプルコードが多いため、このままで動作するコード例を記載しているため冗長です。
なお、本当に高速なのかはこれから検証予定です。
PostgreSQLに接続する
with構文を使用しない
PostgreSQLの接続~接続解除を意識する場合の基本形で、大きな差異はありません。
import psycopg2
DSN = "postgresql://{ユーザ名}:{パスワード}@{ホスト}:{ポート}/{DB名}"
def main():
conn = psycopg2.connect(DSN)
conn.close()
if __name__ == '__main__':
main()
import asyncio
import asyncpg
DSN = "postgresql://{ユーザ名}:{パスワード}@{ホスト}:{ポート}/{DB名}"
async def main():
conn = await asyncpg.connect(DSN)
await conn.close()
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())
with構文を利用する
with構文を利用する場合も差異はありません。
def main():
with psycopg2.connect(DSN) as conn:
# PostgreSQLへの処理
async def main():
with await asyncpg.connect(DSN) as conn:
# PostgreSQLへの処理
コネクションプールを利用する
PostgreSQLへの接続を作成する処理はオーバヘッドが大きいので、繰り返し利用する場合はコネクションプールが有利です。
with構文を使用しない
psycopg2とasyncpgで関数、パラメタ名は異なりますが、ほぼ機能差はなく利用できます。
なお、psycopg2をスレッド環境で利用する場合は、SimpleConnectionPoolではなくThreadedConnectionPoolに変更します。asyncpgではスレッドを利用する必要がないため該当する機能はありません。
import psycopg2.pool
def main():
pool = psycopg2.pool.SimpleConnectionPool(dsn=DSN, minconn=2, maxconn=4)
# プールからコネクションを取得
conn = pool.getconn()
# PostgreSQLへの処理
# プールにコネクションを返却
pool.putconn(conn)
async def main():
pool = await asyncpg.create_pool(dsn=DSN, min_size=2, max_size=4)
# プールからコネクションを取得
conn = await pool.acquire()
# PostgreSQLへの処理
# プールにコネクションを返却
await pool.release(conn)
with構文を利用する
with構文を利用する場合も差異はありません。
import psycopg2.pool
def main():
pool = psycopg2.pool.SimpleConnectionPool(dsn=DSN, minconn=2, maxconn=4)
# プールからコネクションを取得
with pool.getconn() as conn:
# PostgreSQLへの処理
async def main():
async with asyncpg.create_pool(dsn=DSN, min_size=2, max_size=4) as pool:
async with pool.acquire() as conn:
# PostgreSQLへの処理
SQLを実行する
SQLの実行はどちらもexecute()を使用します。
ただし、psycopg2ではcursorオブジェクトから実行しますが、asyncpgではconnectionオブジェクトから実行する差異があります。
cur = conn.cursor()
cur.execute('CREATE TEMPORARY TABLE test(data INTEGER, message TEXT)')
await conn.execute('CREATE TEMPORARY TABLE test(data INTEGER, message TEXT)')
バインドパラメタを利用する
psycopg2ではSQLのバインド変数は%sを使用し、パラメタをリストで指定します。
asyncpgではSQLのバインド変数を$1, $2のように指定し、パラメタをリストで指定しません。
バルクインサートを実施する場合は、どちらもexecutemany()を利用できますが、こちらのパラメタの指定方法は同じです。
cur.execute('INSERT INTO test VALUES(%s, %s)', [100, 'memo1'])
cur.executemany('INSERT INTO test VALUES(%s, %s)', [[200, 'memo2'], [300, 'memo3']])
await conn.execute('INSERT INTO test VALUES($1, $2)', 100, 'memo')
await conn.executemany('INSERT INTO test VALUES($1, $2)', [[200, 'memo2'], [300, 'memo3']])
1行だけ検索する
psycopg2ではfetchone()、asyncpgではfetchrow()となります。
asyncpgではディクショナリ型で返却されるため、psycopg2でcursorをRealDictCursorを利用している場合と同じ取り扱いとなります。
import psycopg2.extras
cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
cur.execute('SELECT * from test')
row = cur.fetchone()
print(row)
row = await conn.fetchrow('SELECT * from test')
print(row)
複数行を検索する
psycopg2ではfetchall()、asyncpgではfetch()となります。
cur.execute('SELECT * from test')
rows = cur.fetchall()
for row in rows:
print(row)
rows = await conn.fetch('SELECT * from test')
for row in rows:
print(row)
カーソルを利用する
psycopg2ではscroll()、asyncpgではforward()となります。
asyncpgでは明にトランザクションを開始する必要があります。
cur.execute('SELECT * from test')
print(cur.fetchone())
cur.scroll(1)
print(cur.fetchone())
async with conn.transaction():
cur = await conn.cursor('SELECT * from test')
print(await cur.fetchrow())
await cur.forward(1)
print(await cur.fetchrow())
まとめ
import psycopg2
import psycopg2.extras
DSN = "postgresql://{ユーザ名}:{パスワード}@{ホスト}:{ポート}/{DB名}"
def main():
# PostgreSQLに接続する
conn = psycopg2.connect(DSN)
# SQLを実行する
cur = conn.cursor()
cur.execute('CREATE TEMPORARY TABLE test(data INTEGER, message TEXT)')
# バインドパラメタを利用する
cur.execute('INSERT INTO test VALUES(%s, %s)', [100, 'memo1'])
cur.executemany('INSERT INTO test VALUES(%s, %s)', [[200, 'memo2'], [300, 'memo3']])
# 1行だけ検索する
cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
cur.execute('SELECT * from test')
row = cur.fetchone()
print(row)
# 複数行を検索する
cur.execute('SELECT * from test')
rows = cur.fetchall()
for row in rows:
print(row)
# カーソルを利用する
cur.execute('SELECT * from test')
print(cur.fetchone())
cur.scroll(1)
print(cur.fetchone())
# PostgreSQLから切断する
conn.close()
if __name__ == '__main__':
main()
import asyncio
import asyncpg
DSN = "postgresql://{ユーザ名}:{パスワード}@{ホスト}:{ポート}/{DB名}"
async def main():
# PostgreSQLに接続する
conn = await asyncpg.connect(DSN)
# SQLを実行する
await conn.execute('CREATE TEMPORARY TABLE test2(data INTEGER, message TEXT)')
# バインドパラメタを利用する
await conn.execute('INSERT INTO test VALUES($1, $2)', 100, 'memo')
await conn.executemany('INSERT INTO test VALUES($1, $2)', [[200, 'memo2'], [300, 'memo3']])
# 1行だけ検索する
row = await conn.fetchrow('SELECT * from test')
print(row)
# 複数行を検索する
rows = await conn.fetch('SELECT * from test')
for row in rows:
print(row)
# カーソルを利用する
async with conn.transaction():
cur = await conn.cursor('SELECT * from test')
print(await cur.fetchrow())
await cur.forward(1)
print(await cur.fetchrow())
# PostgreSQLから切断する
await conn.close()
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())