Python から pyscopg2 を使って PostgreSQL サーバにアクセスするときによくやる操作をまとめておく。
他にも思いついたら随時追記していく。
[2020.08 追記] asyncpg 版も書いた: asyncpg でよくやる操作まとめ
DB-API について
psycopg2 は PEP 249 -- Python Database API Specification v2.0 で定められているインタフェースを満たす API を提供している。
import psycopg2
print(psycopg2.apilevel) #=> '2.0'
これにより、MySQL などの他のデータソースと同じようにコネクションやカーソルを操作してデータベースを触ることができる。
PostgreSQL サーバに接続する
psycopg2 は接続情報を文字列で指定するだけでよしなにパースして接続してくれる。
from os import getenv
import psycopg2
from psycopg2.extensions import connection
def get_connection() -> connection:
return psycopg2.connect(getenv("DATABASE_URL"))
環境変数 DATABASE_URL
には postgresql://{username}:{password}@{hostname}:{port}/{database}
というフォーマットでデータベースの接続情報を指定する。
以下、この get_connection
関数を使ってコネクションオブジェクトを取得するものとする。
クエリを実行する
コネクションオブジェクトからカーソルオブジェクトを取得して、クエリを実行する。
conn = get_connection()
cur = conn.cursor()
cur.execute('SELECT * FROM users')
cur.close()
conn.close()
with
文を使うと以下のように書ける。
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute('SELECT * FROM users')
例外が発生した場合などに close し忘れる等のミスを防げるので、 with
文を利用した方がいい。
クエリにパラメータを埋め込む
cur.execute
の第二引数にタプル (またはリスト) を渡すことでクエリにパラメータを安全に埋め込むことができる。
name = "' OR 1=1 --" # 悪意のあるパラメータ
cur.execute('SELECT * FROM users WHERE name = %s', (name,))
print(cur.query) #=> "SELECT * FROM users WHERE name = ''' OR 1=1 --'"
クエリの実行結果を取得する
ひとつだけ取得する
cur.execute('SELECT COUNT(1) FROM users')
(count,) = cur.fetchone()
まとめて取得する
cur.execute('SELECT * FROM users')
rows = cur.fetchall()
ひとつずつ取得する
cur.execute('SELECT * FROM users')
for row in cur:
print(row)
実行結果のカラム名を取得する
カーソルオブジェクトの description
に各カラムの情報が含まれている。
cur.execute('SELECT * FROM users')
colnames = [col.name for col in cur.description]
これを使うと、実行結果を Pandas DataFrame として読み込むことも簡単にできる。
import pandas as pd
with conn.cursor() as cur:
cur.execute('SELECT * FROM users')
df = pd.DataFrame(cur.fetchall(), columns=[col.name for col in cur.description])
実行結果を辞書形式で取得する
カーソルオブジェクト取得時の cursor_factory
パラメータにpsycopg2.extras.DictCursor
を指定すると、実行結果がディクショナリとして取得できる。
from psycopg2.extras import DictCursor
with get_connection() as conn:
with conn.cursor(cursor_factory=DictCursor) as cur:
cur.execute('SELECT COUNT(1) AS count FROM users')
row = cur.fetchone()
print(row) #=> { "count": 123 }
レコードを追加する
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute('INSERT INTO users (name) VALUES (%s)', ('foo',))
conn.commit()
psycopg2 ではデフォルトでトランザクションが有効になっているので commit を行わないと反映されない。
コネクションオブジェクトの生成に with
文を利用していると、ブロック内で例外が発生した場合に自動で conn.rollback()
が呼ばれるため、明示的にロールバックを実行する必要はない。
トランザクションを無効にしたい 自動で commit したい場合は autocommit = True
を設定する。
with get_connection() as conn:
conn.autocommit = True
with conn.cursor() as cur:
cur.execute('INSERT INTO users (name) VALUES (%s)', ('foo',))
(2023.09 追記) autocommit = True
はトランザクションを無効にする設定だと思っていたが、 with 句を使用してコネクションを作成した場合は autocommit = True
を設定してもトランザクションそのものは作成されてしまうらしい。1
トランザクションを無効にしたい場合は、with 句を使用せずに書く必要がある。
conn = get_connection()
conn.autocommit = True
try:
with conn.cursor() as cur:
cur.execute('INSERT INTO users (name) VALUES (%s)', ('foo',))
finally:
conn.close()
バイナリデータを追加する
bytea
型のカラムにバイナリデータを INSERT したい場合は、データを psycopg2.Binary()
でラップする。
b = b'some binary data'
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute('INSERT INTO users (name, image) VALUES (%s, %s)', ('foo', psycopg2.Binary(b)))
conn.commit()
クエリのタイムアウトを設定する
psycopg2 にはクエリのタイムアウトを設定するようなパラメータはなさそうなので、自分で statement_timeout
を設定する。
from psycopg2.extensions import QueryCanceledError
try:
with get_connection() as conn:
with conn.cursor() as cur:
cur.execute('SET statement_timeout TO 100') # 0.1秒でタイムアウトさせる
cur.execute('SELECT pg_sleep(1)') # 実行に1秒かかるクエリの例
except QueryCanceledError as err:
print(err) #=> "canceling statement due to statement timeout"
SET
で設定した値はそのセッション内でのみ有効なので、他のクエリに影響することはない。同じセッション内で別のクエリも実行する場合は RESET statement_timeout
すると値をリセットできる。
サーバサイドカーソルを利用する
通常の cur.execute()
では、クエリの実行結果をすべてメモリに載せてからカーソルオブジェクトが参照していく。そのため、メモリに乗り切らないほど膨大なレコードが返ってくるクエリを実行することができない。
このような場合はサーバサイドカーソルを利用する。
サーバサイドカーソルを利用するとデータベース側で DECLARE CURSOR
されるため、実行結果を少しずつ取得して処理することができるようになる。
詳しくは: PythonとDB: DBIのcursorを理解する - Qiita
psycopg2 では、名前付きカーソルを作成するとサーバサイドカーソルになる。
with conn.cursor('query1') as cur:
一度の fetch で取得する行数は cur.itersize
で定められていて、デフォルトでは 2000 行ずつ取得するようになっている。特に問題がなければこのままでいいが、変更することもできる。
cur.itersize = 10000
サーバサイドカーソルを利用する際の注意点
サーバサイドカーソル利用時は、execute した時点ではカーソルが作成されるだけでまだクエリは実行されていない。クエリが実行されるのは最初に fetch したときなので、それ以降でないと cur.description
や cur.rownumber
を利用することができない。
with conn.cursor('query1') as cur:
cur.execute('SELECT * FROM users')
print(cur.description) # None
cur.fetchone()
print(cur.description) # カラム情報が入っている
execute の直後にカラム情報を取得する処理を書いている場合は注意が必要。