7
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Pycopgを用いて、PythonからPostgreSQLを操作する基本です

実行環境

Python 3.11.9
PostgreSQL 15.6
Psycopg 3.2.3

Psycopgとは

Psycopgは、Python からPostgreSQLに接続するためのデータベース アダプターです。簡単に、PostgreSQLへの接続やクエリの実行を行うことができます。
psycopg3公式ドキュメントはこちら

使用するには、pipインストールを行う必要があります。

Psycopgのpipインストール

コマンドプロンプトで下記のコマンドを実行することでインストールが可能です。

pip install "psycopg[binary]"

インストール後、下記のコマンドを実行し、Psycopgのversion情報などが表示されたらインストール成功です。

pip show psycopg

PostgreSQLに接続する

connect.py
import psycopg

try:
    # データベースへの接続 
    conn = psycopg.connect("dbname=sample_db user=test password=test host=localhost port=5432")
    print(type(conn))

    # データベース操作を実行するためのカーソルを開く
    cur = conn.cursor()
    print(type(cur))

    # クエリの実行
    cur.execute("SELECT version()")

    # 結果の取得
    version = cur.fetchone()
    print(f"PostgreSQL version: {version}")

except Exception as ex:
    print(f"エラー発生:{ex}")
finally:
    # 接続のを閉じる
    if cur:
        cur.close()
    if conn:
        conn.close()

接続の流れ

  1. importを行う
     import psycopgを記述することで、psycopgライブラリを使用できるようにします。
  2. データベースへの接続
     psycopg.connect() メソッドに引数として、 「dbname,user,password,host,port」を設定し、データベースに接続します。戻り値は、'psycopg.Connection'オブジェクト。
  3. データベース操作を実行するためのカーソルを開く
     生成した、psycopg.Connectionオブジェクトのcursor()メソッドで、DBにコマンドとクエリを送信するための新しいカーソルを生成します。戻り値は、'psycopg.Cursor'オブジェクト。
  4. クエリの実行
     psycopg.Cursorオブジェクトのexecute()メソッドで、 データベースに対してクエリまたはコマンドを実行します。第一引数に実行したいSQL文を文字列形式で渡します。
  5. 結果の取得
     クエリ結果の取得は、fetchone()やfetchall(),fetchmany()を使用します。
  6. 接続のを閉じる
     最後に、カーソルとデータベース接続を明示的に閉じる必要があります。
    close()処理を行わない場合、DBへの接続が閉じれることなく、リソースの浪費パフォーマンスの低下トランザクションの整合性問題などが、発生する可能性がある為、必ず実行する必要があります。

withを使用したデータベース接続

接続の閉じ忘れを防ぐためにも、withを使用したデータベース接続は非常に便利です。

withconnect.py
host = "localhost"
dbname = "sample_db"
user = "test"
password = "test"
host = "localhost"
port = "5432"

# データベースへの接続
try:
    with psycopg.connect(dbname=dbname, user=user, password=password, host=host, port=port) as conn:
        # データベース操作を実行するためのカーソルを開く
        with conn.cursor() as cur:
            # SQLクエリの実行
            cur.execute("SELECT version()")
            version = cur.fetchone()
            print(f"PostgreSQL version: {version}")
except Exception as ex:
    print(f"エラー発生:{ex}")

withを使用したDB接続の場合、明示的に接続を閉じる必要がなくなります。
withブロックの終了時に、接続が閉じられます。
またwithを使用した場合は、最初のデータベース操作(cur.executeを実行)でトランザクションが開始され、withブロック終了後に、自動的にCOMMITされます。(エラーが発生した場合はROLLBACKが実行される)
Psycopg公式ドキュメント

withを使用しない接続の場合は、下記のように明示的なCOMMITが必要となります。conn.commit()を行わない場合、データベースに変更が反映されません。

commit.py
conn = psycopg.connect(dbname=dbname, user=user, password=password, host=host, port=port)

cur = conn.cursor()

insert_query = """
            INSERT
            INTO users(username, age, birthplace)
             values (%(name)s, %(age)s, %(birthplace)s)
            """

user_data = {"name": "inert_user", "age": 45, "birthplace": "Tokyo"}
cur.execute(insert_query, user_data)

# 明示的なcommitが必要
conn.commit()
cur.close()
conn.close()

COMMITの動作については、後ほど補足します。


以降、解説用のテーブルを用いてPscopgの基本操作をまとめます。
下記のCREATE文とINSERT文で、テーブル作成ができます。

CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    username VARCHAR(100),
    age INT,
    birthplace VARCHAR(100)
);

INSERT INTO users (username, age, birthplace) VALUES ('Emma', 20, 'Tokyo');
INSERT INTO users (username, age, birthplace) VALUES ('Liam', 25, 'New York');
INSERT INTO users (username, age, birthplace) VALUES ('Olivia', 30, 'London');
INSERT INTO users (username, age, birthplace) VALUES ('Noah', 35, 'Berlin');
INSERT INTO users (username, age, birthplace) VALUES ('Oliver', 40, 'Paris'); 

データベース基本操作

Pythonから、PostgreSQLを操作する方法をまとめます。
データベースにクエリまたはコマンドを実行するには、Cursorクラスのexecuteメソッドを使用します。第一引数に文字列形式でSQL文を渡し、第二引数にはパラメータを渡すこともできます。

SELECT文とデータ取得方法

SELECT文を実行しデータ取得するには、いくつかの方法がありますが、今回はfetchallfetchoneを解説します。

fetchall

fetchall.py
try:
    with psycopg.connect(dbname=dbname, user=user, password=password, host=host, port=port) as conn:
        with conn.cursor() as cur:

            # SQLクエリの実行 Cursorクラスのexecuteメソッドを用います。 第一引数に文字列形式でSQL文を渡します。第二引数にパラメータを渡すこともできる
            cur.execute("""
                    SELECT
                        id
                        , username
                        , age
                        , birthplace
                    FROM
                        users
                    ORDER BY
                        id
                    """
            )

            users = cur.fetchall()
            print(f"fetchallの結果: {users}")
            print(f"fetchallの戻り値の型: {type(users)}")
            for user in users:
                print(f"1行ずつの出力: {user}")
                print(f"usersの中身の型: {type(user)}")

except Exception as ex:
    print(f"エラー発生:{ex}")

cur.fetchall()では、クエリの結果をすべて取得し、タプル型のリストとして返します。
for文を回し、1行分のレコードを取得することもできます。

fatchone

fetchone.py
try:
    with psycopg.connect(dbname=dbname, user=user, password=password, host=host, port=port) as conn:
        with conn.cursor() as cur:
            cur.execute("""
                    SELECT
                        id
                        , username
                        , age
                        , birthplace
                    FROM
                        users
                    ORDER BY
                        id
                    """
            )
            
            # usersテーブルに5件データをINSERTした後に、実行すると6回目のcur.fetchone()実行でNoneがかる
            # cur.fetchoneの実行回数記録
            fatch_count = 1
            user = cur.fetchone()
            print(f"{fatch_count}回目の実行: {user}")
            print(f"fatchone()の戻り値の型: {type(user)}")
            fatch_count += 1
            user = cur.fetchone()
            print(f"{fatch_count}回目の実行: {user}")
            fatch_count += 1
            user = cur.fetchone()
            print(f"{fatch_count}回目の実行: {user}")
            fatch_count += 1
            user = cur.fetchone()
            print(f"{fatch_count}回目の実行: {user}")
            fatch_count += 1
            user = cur.fetchone()
            print(f"{fatch_count}回目の実行: {user}")
            fatch_count += 1
            user = cur.fetchone()
            print(f"{fatch_count}回目の実行: {user}")
            fatch_count += 1
            user = cur.fetchone()

except Exception as ex:
    print(f"エラー発生:{ex}")

cur.fetchone()では、現在のレコードセットから次のレコードをタプル型で返します。(取得結果の1行分を返す)
全てのレコードを取得し終えたら、Noneを返します。
テーブルの主キーに対する検索の際、使用すると効果的です。

レコードをタプル型でなく、辞書型で取得する方法

Pythonの処理の中で取得レコードを使用するため、タプル型でなく、辞書型で取得することもできます。

row_factory.py
from psycopg.rows import dict_row

with psycopg.connect(dbname=dbname, user=user, password=password, host=host, port=port, row_factory=dict_row) as conn:
    # row_factory = dict_rowを引数に渡す
    with conn.cursor(row_factory=dict_row) as cur:
        cur.execute("""
                    SELECT
                        id
                        , username
                        , age
                        , birthplace
                    FROM
                        users
                    ORDER BY
                        id
                    """
        )

        user = cur.fetchone()
        print(f"fatchone()の戻り値の型: {type(user)}")

from psycopg.rows import dict_rowを行い、 with conn.cursor(row_factory=dict_row) as cur:のように、引数でrow_factory=dict_rowとします。
レコード取得後の処理によって、タプル型で取得するか辞書型で取得するか使い分けると便利です。

SQLのWHERE句などの条件を動的に設定する方法

CursorクラスのexecuteメソッドにSQL文を渡す際、WHERE句などの条件を動的に設定したい場合は、**プレースホルダー ** を使用することで可能です。記述方法は%s, %(arg)sの2種類があります

placeholder.py
try:
    with psycopg.connect(dbname=dbname, user=user, password=password, host=host, port=port) as conn:
        with conn.cursor() as cur:
        
            cur.execute("""
                    SELECT
                        id
                        , username
                        , age
                        , birthplace
                    FROM
                        users
                    WHERE
                        id = %s OR age = %s
                    """,
                (4, 20),
            )

            users = cur.fetchall()
            print(users)

            cur.execute("""
                    SELECT
                        id
                        , username
                        , age
                        , birthplace
                    FROM
                        users
                    WHERE
                        id = %(id)s OR username = %(name)s
                    """,
                {"id": 2, "name": "Olivia"},
            )

            users = cur.fetchall()
            print(users)

except Exception as ex:
    print(f"エラー発生:{ex}")

executeメソッドの第一引数のSQL文の動的に設定したい箇所に「%s(プレースホルダー)」を用い、第二引数に設定したい値をタプル形式(設定したい順番で)渡します。
名前付き引数をクエリ内の、プレースホルダ―にマッピングして使用することも可能です。

SQLのIN句を実行する方法

in句を使用した、クエリを実行する場合は注意が必要です。
下記のquery1変数を使用した場合は、エラーとなってしまいます。

in.py
try:
    with psycopg.connect(dbname=dbname, user=user, password=password, host=host, port=port) as conn:
        with conn.cursor() as cur:

            # 検索する年齢のリスト
            values = [10, 20, 30, 40]

            # query1ではエラーが発生してしまう
            query1 = """
                    SELECT
                        id,
                        username,
                        age
                    FROM
                        users
                    WHERE
                        age IN %s
                    """
            # query2のようにプレースホルダーを用意し、実行する
            query2 = """
                    SELECT
                        id,
                        username,
                        age
                    FROM
                        users
                    WHERE
                        age IN (%s,%s,%s,%s)
                    """

            # クエリを実行
            cur.execute(query2, values)

            users = cur.fetchall()
            print(users)

except Exception as ex:
    print(f"エラー発生: {ex}")

query1の場合

query1を実行するとエラーが発生します。これは、IN %sという構文が正しくないためです。この場合、1つのプレースホルダーに対して4つの値(values = [10, 20, 30, 40])をセットしようとしているため、正しく解釈されません。

query2の場合

query2では、IN句に対して必要な数のプレースホルダー(この場合は4つ)を用意しています。これにより、各値が適切にバインドされ、クエリが正しく実行されます。ただし、この方法では、パラメータの要素数が変わるたびにSQL文自体を変更する必要があります。

query1・query2の問題点を回避する方法として、ANY句を使用する方法があります。

SQLのANY句を実行する方法

any.py
try:
    with psycopg.connect(dbname=dbname, user=user, password=password, host=host, port=port) as conn:
        with conn.cursor() as cur:

            # 検索する年齢のリスト
            values_list = [10, 20, 30, 40]

            query = """
                    SELECT
                        id,
                        username,
                        age
                    FROM
                        users
                    WHERE
                        age = any(%s)
                    """

            # クエリを実行 値をリストとして渡す必要がある
            cur.execute(query, [values_list])

            users = cur.fetchall()
            print(users)

except Exception as ex:
    print(f"エラー発生: {ex}")

ANY句を使用することで、SQLクエリの条件を一つのプレースホルダーで指定できるため、複数の値を簡潔に扱うことができます。この場合、executeメソッドの第二引数には、検索したい値のリストをリスト型として渡す必要があります。リスト全体(values_list)を一つの要素として渡す形になります。

INSERT・UPDATE・DELETE文の実行方法

INSERT・UPDATE・DELETEは下記のように、SQL文をexecuteメソッドの第一引数に渡し実行します。

INSERT・UPDATE・DELETE.py
try:
    with psycopg.connect(dbname=dbname, user=user, password=password, host=host, port=port) as conn:
        with conn.cursor() as cur:

            # INSERT文
            insert_query = """
                            INSERT
                            INTO users(username, age, birthplace)
                            values (%(name)s, %(age)s, %(birthplace)s)
                            """

            user_data = {"name": "inert_user", "age": 45, "birthplace": "Tokyo"}
            cur.execute(insert_query, user_data)
            
            # UPDATE文
            update_query = """
                            UPDATE users
                            SET
                                age = %(age)s
                            WHERE
                                username = %(name)s
                            """

            user_data = {"age": 25, "name": "inert_user"}
            cur.execute(update_query, user_data)

            # DELETE文
            delete_query = """
                            DELETE
                            FROM
                                users
                            WHERE
                                username = %(name)s
                            """

            user_data = {"name": "inert_user"}
            cur.execute(delete_query, user_data)

except Exception as ex:
    print(f"エラー発生:{ex}")

Psycopg3におけるCOMMITの動作

COMMITについては、withの説明の際少し触れましたが、ここで少し捕捉します。
上記の「INSERT・UPDATE・DELETE.py」の例では、withを使用した接続の為、明示的なコミットを行わなくてもwithブロック終了で、自動的にCOMMITされます。また、最初のcur.execute(insert_query, user_data)のタイミングでトランザクションが開始され、INSERT・UPDATE・DELETEは全て同一のトランザクション内でのデータベース操作となり、3つの操作が一括してコミットされます。

autocommitでの自動コミット

Connectionの生成時に、psycopg.connect(autocommit=True)のようにautocommitを設定することで、データベース操作を行うたびにCOMMITを行うようにできます。「INSERT・UPDATE・DELETE.py」の例では、各cur.execute()を実行するたびにデータベースが更新され、その結果が即座にコミットされます。このモードでは、各SQL文が独立して処理されるため、1つ1つの操作が即座にデータベースに反映されます。

参考

psycopg3公式ドキュメント
PythonとPostgreSQLの完全ガイド:基本操作から高度なテクニックまで

7
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?