395
376

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

psycopg2 でよくやる操作まとめ

Last updated at Posted at 2017-10-25

Python から pyscopg2 を使って PostgreSQL サーバにアクセスするときによくやる操作をまとめておく。

他にも思いついたら随時追記していく。

[2020.08 追記] asyncpg 版も書いた: asyncpg でよくやる操作まとめ

DB-API について

psycopg2 は PEP 249 -- Python Database API Specification v2.0 で定められているインタフェースを満たす API を提供している。

import psycopg2

print(psycopg2.apilevel)  #=> '2.0'

これにより、MySQL などの他のデータソースと同じようにコネクションやカーソルを操作してデータベースを触ることができる。

PostgreSQL サーバに接続する

psycopg2 は接続情報を文字列で指定するだけでよしなにパースして接続してくれる。

from os import getenv
import psycopg2
from psycopg2.extensions import connection


def get_connection() -> connection:
    return psycopg2.connect(getenv("DATABASE_URL"))

環境変数 DATABASE_URL には postgresql://{username}:{password}@{hostname}:{port}/{database} というフォーマットでデータベースの接続情報を指定する。

以下、この get_connection 関数を使ってコネクションオブジェクトを取得するものとする。

クエリを実行する

コネクションオブジェクトからカーソルオブジェクトを取得して、クエリを実行する。

conn = get_connection()
cur = conn.cursor()
cur.execute('SELECT * FROM users')
cur.close()
conn.close()

with 文を使うと以下のように書ける。

with get_connection() as conn:
    with conn.cursor() as cur:
        cur.execute('SELECT * FROM users')

例外が発生した場合などに close し忘れる等のミスを防げるので、 with 文を利用した方がいい。

クエリにパラメータを埋め込む

cur.execute の第二引数にタプル (またはリスト) を渡すことでクエリにパラメータを安全に埋め込むことができる。

name = "' OR 1=1 --"  # 悪意のあるパラメータ
cur.execute('SELECT * FROM users WHERE name = %s', (name,))

print(cur.query)  #=> "SELECT * FROM users WHERE name = ''' OR 1=1 --'"

クエリの実行結果を取得する

ひとつだけ取得する

cur.execute('SELECT COUNT(1) FROM users')
(count,) = cur.fetchone()

まとめて取得する

cur.execute('SELECT * FROM users')
rows = cur.fetchall()

ひとつずつ取得する

cur.execute('SELECT * FROM users')
for row in cur:
    print(row)

実行結果のカラム名を取得する

カーソルオブジェクトの description に各カラムの情報が含まれている。

cur.execute('SELECT * FROM users')
colnames = [col.name for col in cur.description]

これを使うと、実行結果を Pandas DataFrame として読み込むことも簡単にできる。

import pandas as pd

with conn.cursor() as cur:
    cur.execute('SELECT * FROM users')
    df = pd.DataFrame(cur.fetchall(), columns=[col.name for col in cur.description])

実行結果を辞書形式で取得する

カーソルオブジェクト取得時の cursor_factory パラメータにpsycopg2.extras.DictCursor を指定すると、実行結果がディクショナリとして取得できる。

from psycopg2.extras import DictCursor

with get_connection() as conn:
    with conn.cursor(cursor_factory=DictCursor) as cur:
        cur.execute('SELECT COUNT(1) AS count FROM users')
        row = cur.fetchone()
        print(row)  #=> { "count": 123 }

レコードを追加する

with get_connection() as conn:
    with conn.cursor() as cur:
        cur.execute('INSERT INTO users (name) VALUES (%s)', ('foo',))
    conn.commit()

psycopg2 ではデフォルトでトランザクションが有効になっているので commit を行わないと反映されない。

コネクションオブジェクトの生成に with 文を利用していると、ブロック内で例外が発生した場合に自動で conn.rollback() が呼ばれるため、明示的にロールバックを実行する必要はない。

トランザクションを無効にしたい 自動で commit したい場合は autocommit = True を設定する。

with get_connection() as conn:
    conn.autocommit = True
    with conn.cursor() as cur:
        cur.execute('INSERT INTO users (name) VALUES (%s)', ('foo',))

(2023.09 追記) autocommit = True はトランザクションを無効にする設定だと思っていたが、 with 句を使用してコネクションを作成した場合は autocommit = True を設定してもトランザクションそのものは作成されてしまうらしい。1
トランザクションを無効にしたい場合は、with 句を使用せずに書く必要がある。

conn = get_connection()
conn.autocommit = True
try:
    with conn.cursor() as cur:
        cur.execute('INSERT INTO users (name) VALUES (%s)', ('foo',))
finally:
    conn.close()

バイナリデータを追加する

bytea 型のカラムにバイナリデータを INSERT したい場合は、データを psycopg2.Binary() でラップする。

b = b'some binary data'

with get_connection() as conn:
    with conn.cursor() as cur:
        cur.execute('INSERT INTO users (name, image) VALUES (%s, %s)', ('foo', psycopg2.Binary(b)))
    conn.commit()

クエリのタイムアウトを設定する

psycopg2 にはクエリのタイムアウトを設定するようなパラメータはなさそうなので、自分で statement_timeout を設定する。

from psycopg2.extensions import QueryCanceledError

try:
    with get_connection() as conn:
        with conn.cursor() as cur:
            cur.execute('SET statement_timeout TO 100')  # 0.1秒でタイムアウトさせる
            cur.execute('SELECT pg_sleep(1)')  # 実行に1秒かかるクエリの例
except QueryCanceledError as err:
    print(err)  #=> "canceling statement due to statement timeout"

SET で設定した値はそのセッション内でのみ有効なので、他のクエリに影響することはない。同じセッション内で別のクエリも実行する場合は RESET statement_timeout すると値をリセットできる。

サーバサイドカーソルを利用する

通常の cur.execute() では、クエリの実行結果をすべてメモリに載せてからカーソルオブジェクトが参照していく。そのため、メモリに乗り切らないほど膨大なレコードが返ってくるクエリを実行することができない。

このような場合はサーバサイドカーソルを利用する。
サーバサイドカーソルを利用するとデータベース側で DECLARE CURSOR されるため、実行結果を少しずつ取得して処理することができるようになる。

詳しくは: PythonとDB: DBIのcursorを理解する - Qiita

psycopg2 では、名前付きカーソルを作成するとサーバサイドカーソルになる。

with conn.cursor('query1') as cur:

一度の fetch で取得する行数は cur.itersize で定められていて、デフォルトでは 2000 行ずつ取得するようになっている。特に問題がなければこのままでいいが、変更することもできる。

cur.itersize = 10000

サーバサイドカーソルを利用する際の注意点

サーバサイドカーソル利用時は、execute した時点ではカーソルが作成されるだけでまだクエリは実行されていない。クエリが実行されるのは最初に fetch したときなので、それ以降でないと cur.descriptioncur.rownumber を利用することができない。

with conn.cursor('query1') as cur:
    cur.execute('SELECT * FROM users')
    print(cur.description)  # None
    cur.fetchone()
    print(cur.description)  # カラム情報が入っている

execute の直後にカラム情報を取得する処理を書いている場合は注意が必要。

  1. AutoCommit flag broken since 2.9 release · Issue #1406 · psycopg/psycopg2

395
376
4

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
395
376

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?