Pycopgを用いて、PythonからPostgreSQLを操作する基本です
実行環境
Python 3.11.9
PostgreSQL 15.6
Psycopg 3.2.3
Psycopgとは
Psycopgは、Python からPostgreSQLに接続するためのデータベース アダプターです。簡単に、PostgreSQLへの接続やクエリの実行を行うことができます。
psycopg3公式ドキュメントはこちら
使用するには、pipインストールを行う必要があります。
Psycopgのpipインストール
コマンドプロンプトで下記のコマンドを実行することでインストールが可能です。
pip install "psycopg[binary]"
インストール後、下記のコマンドを実行し、Psycopgのversion情報などが表示されたらインストール成功です。
pip show psycopg
PostgreSQLに接続する
import psycopg
try:
# データベースへの接続
conn = psycopg.connect("dbname=sample_db user=test password=test host=localhost port=5432")
print(type(conn))
# データベース操作を実行するためのカーソルを開く
cur = conn.cursor()
print(type(cur))
# クエリの実行
cur.execute("SELECT version()")
# 結果の取得
version = cur.fetchone()
print(f"PostgreSQL version: {version}")
except Exception as ex:
print(f"エラー発生:{ex}")
finally:
# 接続のを閉じる
if cur:
cur.close()
if conn:
conn.close()
接続の流れ
-
importを行う
import psycopg
を記述することで、psycopgライブラリを使用できるようにします。 -
データベースへの接続
psycopg.connect()
メソッドに引数として、 「dbname,user,password,host,port」を設定し、データベースに接続します。戻り値は、'psycopg.Connection'オブジェクト。 -
データベース操作を実行するためのカーソルを開く
生成した、psycopg.Connectionオブジェクトのcursor()メソッドで、DBにコマンドとクエリを送信するための新しいカーソルを生成します。戻り値は、'psycopg.Cursor'オブジェクト。 -
クエリの実行
psycopg.Cursorオブジェクトのexecute()メソッドで、 データベースに対してクエリまたはコマンドを実行します。第一引数に実行したいSQL文を文字列形式で渡します。 -
結果の取得
クエリ結果の取得は、fetchone()やfetchall(),fetchmany()を使用します。 -
接続のを閉じる
最後に、カーソルとデータベース接続を明示的に閉じる必要があります。
close()
処理を行わない場合、DBへの接続が閉じれることなく、リソースの浪費、パフォーマンスの低下、トランザクションの整合性問題などが、発生する可能性がある為、必ず実行する必要があります。
withを使用したデータベース接続
接続の閉じ忘れを防ぐためにも、withを使用したデータベース接続は非常に便利です。
host = "localhost"
dbname = "sample_db"
user = "test"
password = "test"
host = "localhost"
port = "5432"
# データベースへの接続
try:
with psycopg.connect(dbname=dbname, user=user, password=password, host=host, port=port) as conn:
# データベース操作を実行するためのカーソルを開く
with conn.cursor() as cur:
# SQLクエリの実行
cur.execute("SELECT version()")
version = cur.fetchone()
print(f"PostgreSQL version: {version}")
except Exception as ex:
print(f"エラー発生:{ex}")
withを使用したDB接続の場合、明示的に接続を閉じる必要がなくなります。
withブロックの終了時に、接続が閉じられます。
またwithを使用した場合は、最初のデータベース操作(cur.execute
を実行)でトランザクションが開始され、withブロック終了後に、自動的にCOMMITされます。(エラーが発生した場合はROLLBACKが実行される)
Psycopg公式ドキュメント
withを使用しない接続の場合は、下記のように明示的なCOMMITが必要となります。conn.commit()
を行わない場合、データベースに変更が反映されません。
conn = psycopg.connect(dbname=dbname, user=user, password=password, host=host, port=port)
cur = conn.cursor()
insert_query = """
INSERT
INTO users(username, age, birthplace)
values (%(name)s, %(age)s, %(birthplace)s)
"""
user_data = {"name": "inert_user", "age": 45, "birthplace": "Tokyo"}
cur.execute(insert_query, user_data)
# 明示的なcommitが必要
conn.commit()
cur.close()
conn.close()
COMMITの動作については、後ほど補足します。
以降、解説用のテーブルを用いてPscopgの基本操作をまとめます。
下記のCREATE文とINSERT文で、テーブル作成ができます。
CREATE TABLE users (
id SERIAL PRIMARY KEY,
username VARCHAR(100),
age INT,
birthplace VARCHAR(100)
);
INSERT INTO users (username, age, birthplace) VALUES ('Emma', 20, 'Tokyo');
INSERT INTO users (username, age, birthplace) VALUES ('Liam', 25, 'New York');
INSERT INTO users (username, age, birthplace) VALUES ('Olivia', 30, 'London');
INSERT INTO users (username, age, birthplace) VALUES ('Noah', 35, 'Berlin');
INSERT INTO users (username, age, birthplace) VALUES ('Oliver', 40, 'Paris');
データベース基本操作
Pythonから、PostgreSQLを操作する方法をまとめます。
データベースにクエリまたはコマンドを実行するには、Cursorクラスのexecuteメソッドを使用します。第一引数に文字列形式でSQL文を渡し、第二引数にはパラメータを渡すこともできます。
SELECT文とデータ取得方法
SELECT文を実行しデータ取得するには、いくつかの方法がありますが、今回はfetchall
とfetchone
を解説します。
fetchall
try:
with psycopg.connect(dbname=dbname, user=user, password=password, host=host, port=port) as conn:
with conn.cursor() as cur:
# SQLクエリの実行 Cursorクラスのexecuteメソッドを用います。 第一引数に文字列形式でSQL文を渡します。第二引数にパラメータを渡すこともできる
cur.execute("""
SELECT
id
, username
, age
, birthplace
FROM
users
ORDER BY
id
"""
)
users = cur.fetchall()
print(f"fetchallの結果: {users}")
print(f"fetchallの戻り値の型: {type(users)}")
for user in users:
print(f"1行ずつの出力: {user}")
print(f"usersの中身の型: {type(user)}")
except Exception as ex:
print(f"エラー発生:{ex}")
cur.fetchall()
では、クエリの結果をすべて取得し、タプル型のリストとして返します。
for文を回し、1行分のレコードを取得することもできます。
fatchone
try:
with psycopg.connect(dbname=dbname, user=user, password=password, host=host, port=port) as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT
id
, username
, age
, birthplace
FROM
users
ORDER BY
id
"""
)
# usersテーブルに5件データをINSERTした後に、実行すると6回目のcur.fetchone()実行でNoneがかる
# cur.fetchoneの実行回数記録
fatch_count = 1
user = cur.fetchone()
print(f"{fatch_count}回目の実行: {user}")
print(f"fatchone()の戻り値の型: {type(user)}")
fatch_count += 1
user = cur.fetchone()
print(f"{fatch_count}回目の実行: {user}")
fatch_count += 1
user = cur.fetchone()
print(f"{fatch_count}回目の実行: {user}")
fatch_count += 1
user = cur.fetchone()
print(f"{fatch_count}回目の実行: {user}")
fatch_count += 1
user = cur.fetchone()
print(f"{fatch_count}回目の実行: {user}")
fatch_count += 1
user = cur.fetchone()
print(f"{fatch_count}回目の実行: {user}")
fatch_count += 1
user = cur.fetchone()
except Exception as ex:
print(f"エラー発生:{ex}")
cur.fetchone()
では、現在のレコードセットから次のレコードをタプル型で返します。(取得結果の1行分を返す)
全てのレコードを取得し終えたら、Noneを返します。
テーブルの主キーに対する検索の際、使用すると効果的です。
レコードをタプル型でなく、辞書型で取得する方法
Pythonの処理の中で取得レコードを使用するため、タプル型でなく、辞書型で取得することもできます。
from psycopg.rows import dict_row
with psycopg.connect(dbname=dbname, user=user, password=password, host=host, port=port, row_factory=dict_row) as conn:
# row_factory = dict_rowを引数に渡す
with conn.cursor(row_factory=dict_row) as cur:
cur.execute("""
SELECT
id
, username
, age
, birthplace
FROM
users
ORDER BY
id
"""
)
user = cur.fetchone()
print(f"fatchone()の戻り値の型: {type(user)}")
from psycopg.rows import dict_row
を行い、 with conn.cursor(row_factory=dict_row) as cur:
のように、引数でrow_factory=dict_row
とします。
レコード取得後の処理によって、タプル型で取得するか辞書型で取得するか使い分けると便利です。
SQLのWHERE句などの条件を動的に設定する方法
CursorクラスのexecuteメソッドにSQL文を渡す際、WHERE句などの条件を動的に設定したい場合は、**プレースホルダー ** を使用することで可能です。記述方法は%s, %(arg)sの2種類があります
try:
with psycopg.connect(dbname=dbname, user=user, password=password, host=host, port=port) as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT
id
, username
, age
, birthplace
FROM
users
WHERE
id = %s OR age = %s
""",
(4, 20),
)
users = cur.fetchall()
print(users)
cur.execute("""
SELECT
id
, username
, age
, birthplace
FROM
users
WHERE
id = %(id)s OR username = %(name)s
""",
{"id": 2, "name": "Olivia"},
)
users = cur.fetchall()
print(users)
except Exception as ex:
print(f"エラー発生:{ex}")
executeメソッドの第一引数のSQL文の動的に設定したい箇所に「%s(プレースホルダー)」を用い、第二引数に設定したい値をタプル形式(設定したい順番で)渡します。
名前付き引数をクエリ内の、プレースホルダ―にマッピングして使用することも可能です。
SQLのIN句を実行する方法
in句を使用した、クエリを実行する場合は注意が必要です。
下記のquery1変数を使用した場合は、エラーとなってしまいます。
try:
with psycopg.connect(dbname=dbname, user=user, password=password, host=host, port=port) as conn:
with conn.cursor() as cur:
# 検索する年齢のリスト
values = [10, 20, 30, 40]
# query1ではエラーが発生してしまう
query1 = """
SELECT
id,
username,
age
FROM
users
WHERE
age IN %s
"""
# query2のようにプレースホルダーを用意し、実行する
query2 = """
SELECT
id,
username,
age
FROM
users
WHERE
age IN (%s,%s,%s,%s)
"""
# クエリを実行
cur.execute(query2, values)
users = cur.fetchall()
print(users)
except Exception as ex:
print(f"エラー発生: {ex}")
query1の場合
query1を実行するとエラーが発生します。これは、IN %s
という構文が正しくないためです。この場合、1つのプレースホルダーに対して4つの値(values = [10, 20, 30, 40])をセットしようとしているため、正しく解釈されません。
query2の場合
query2では、IN句に対して必要な数のプレースホルダー(この場合は4つ)を用意しています。これにより、各値が適切にバインドされ、クエリが正しく実行されます。ただし、この方法では、パラメータの要素数が変わるたびにSQL文自体を変更する必要があります。
query1・query2の問題点を回避する方法として、ANY句を使用する方法があります。
SQLのANY句を実行する方法
try:
with psycopg.connect(dbname=dbname, user=user, password=password, host=host, port=port) as conn:
with conn.cursor() as cur:
# 検索する年齢のリスト
values_list = [10, 20, 30, 40]
query = """
SELECT
id,
username,
age
FROM
users
WHERE
age = any(%s)
"""
# クエリを実行 値をリストとして渡す必要がある
cur.execute(query, [values_list])
users = cur.fetchall()
print(users)
except Exception as ex:
print(f"エラー発生: {ex}")
ANY句を使用することで、SQLクエリの条件を一つのプレースホルダーで指定できるため、複数の値を簡潔に扱うことができます。この場合、executeメソッドの第二引数には、検索したい値のリストをリスト型として渡す必要があります。リスト全体(values_list)を一つの要素として渡す形になります。
INSERT・UPDATE・DELETE文の実行方法
INSERT・UPDATE・DELETEは下記のように、SQL文をexecuteメソッドの第一引数に渡し実行します。
try:
with psycopg.connect(dbname=dbname, user=user, password=password, host=host, port=port) as conn:
with conn.cursor() as cur:
# INSERT文
insert_query = """
INSERT
INTO users(username, age, birthplace)
values (%(name)s, %(age)s, %(birthplace)s)
"""
user_data = {"name": "inert_user", "age": 45, "birthplace": "Tokyo"}
cur.execute(insert_query, user_data)
# UPDATE文
update_query = """
UPDATE users
SET
age = %(age)s
WHERE
username = %(name)s
"""
user_data = {"age": 25, "name": "inert_user"}
cur.execute(update_query, user_data)
# DELETE文
delete_query = """
DELETE
FROM
users
WHERE
username = %(name)s
"""
user_data = {"name": "inert_user"}
cur.execute(delete_query, user_data)
except Exception as ex:
print(f"エラー発生:{ex}")
Psycopg3におけるCOMMITの動作
COMMITについては、withの説明の際少し触れましたが、ここで少し捕捉します。
上記の「INSERT・UPDATE・DELETE.py」の例では、withを使用した接続の為、明示的なコミットを行わなくてもwithブロック終了で、自動的にCOMMITされます。また、最初のcur.execute(insert_query, user_data)
のタイミングでトランザクションが開始され、INSERT・UPDATE・DELETEは全て同一のトランザクション内でのデータベース操作となり、3つの操作が一括してコミットされます。
autocommitでの自動コミット
Connectionの生成時に、psycopg.connect(autocommit=True)
のようにautocommitを設定することで、データベース操作を行うたびにCOMMITを行うようにできます。「INSERT・UPDATE・DELETE.py」の例では、各cur.execute()
を実行するたびにデータベースが更新され、その結果が即座にコミットされます。このモードでは、各SQL文が独立して処理されるため、1つ1つの操作が即座にデータベースに反映されます。