0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

PostgreSQL psycopg2 で更新クエリーを一括実行するには?

Posted at

過去の投稿で PostgreSQLデータベースにおいて python の psycopg2 ライブラリの execute_values 関数を使って一括登録する pythonスクリプトを紹介いたしました。

今回は execute_values 関数を使って更新クエリーを一括実行する pythonスクリプトを紹介します。

動作環境

  • Ubuntu 22.04 Desktop
  • データベース: PostgreSQL 16 on docker
  • python
    • version: 3.10.12
      プロジェクト専用のpython仮想環境を作成しpythonスクリプトを実行
    • pythonライブラリ
      $ pip install psycopg2-binary

参考URL

Fast execution helpers のコード例

>>> execute_values(cur,
... """UPDATE test SET v1 = data.v1 FROM (VALUES %s) AS data (id, v1)
... WHERE test.id = data.id""",
... [(1, 20), (4, 50)])

>>> cur.execute("select * from test order by id")
>>> cur.fetchall()
[(1, 20, 3), (4, 50, 6), (7, 8, 9)])

1.テーブル定義

過去の投稿で使用したものを以下に再掲します。

-- Qiita投稿用に作成したテスト用スキーマとテーブル定義
CREATE SCHEMA mainte2;

-- 不正アクセスIPマスタテーブル
CREATE TABLE mainte2.unauth_ip_addr(
   id INTEGER NOT NULL,
   ip_addr VARCHAR(15) NOT NULL,
   reg_date DATE NOT NULL,
   country_code CHAR(2)
);
CREATE SEQUENCE mainte2.ip_addr_id OWNED BY mainte2.unauth_ip_addr.id;
ALTER TABLE mainte2.unauth_ip_addr ALTER id SET DEFAULT nextval('mainte2.ip_addr_id');
ALTER TABLE mainte2.unauth_ip_addr ADD CONSTRAINT pk_unauth_ip_addr PRIMARY KEY (id);
-- IPアドレスはユニーク
CREATE UNIQUE INDEX idx_ip_addr ON mainte2.unauth_ip_addr(ip_addr);

-- 不正アクセスカウンターテープル
CREATE TABLE mainte2.ssh_auth_error(
   log_date date NOT NULL,
   ip_id INTEGER,
   appear_count INTEGER NOT NULL
);
ALTER TABLE mainte2.ssh_auth_error ADD CONSTRAINT pk_ssh_auth_error
   PRIMARY KEY (log_date, ip_id);
-- 参照整合性制約: 不正アクセスIPマスタテーブルのID(PRYMARY KEY)
ALTER TABLE mainte2.ssh_auth_error ADD CONSTRAINT fk_ssh_auth_error
   FOREIGN KEY (ip_id) REFERENCES mainte2.unauth_ip_addr (id);

各テーブルへのデータの登録イメージも再掲します。

SshAuthErrorTables_overview.png

2. 動作確認スクリプト

今回は複数レコードを一括更新するpythonスクリプトを紹介することが主題なので、一括登録スクリプトについては割愛します。最初に紹介した投稿記事をご覧ください。

2-1.手入力でレコードを登録

稼働中の dockerコンテナの PostgreSQLの psqlコマンドを使って登録します。

$ docker exec -it postgres-qiita bash

サンプルデータ (CSVファイルイメージ)

"log_date","ip_addr","appear_count"
"2024-06-10","20.244.134.31",41
"2024-06-11","14.215.120.90",41
"2024-06-12","14.50.30.17",33

(1) 不正アクセスIPマスタテーブルに3件登録

af08a238d21f:/# echo "INSERT INTO mainte2.unauth_ip_addr(ip_addr,reg_date) VALUES 
 ('20.244.134.31','2024-06-10'),
 ('14.215.120.90','2024-06-11'),
 ('14.50.30.17','2024-06-12') RETURNING id, ip_addr;" \
> | psql -Udeveloper qiita_exampledb
 id |    ip_addr    
----+---------------
  1 | 20.244.134.31
  2 | 14.215.120.90
  3 | 14.50.30.17
(3 rows)


INSERT 0 3

# 登録データ確認
af08a238d21f:/# echo "SELECT * FROM mainte2.unauth_ip_addr;" \
> | psql -Udeveloper qiita_exampledb
 id |    ip_addr    |  reg_date  | country_code 
----+---------------+------------+--------------
  1 | 20.244.134.31 | 2024-06-10 | 
  2 | 14.215.120.90 | 2024-06-11 | 
  3 | 14.50.30.17   | 2024-06-12 | 
(3 rows)

(2) 不正アクセスカウンターテープルに3件登録

af08a238d21f:/# echo "INSERT INTO mainte2.ssh_auth_error(log_date,ip_id,appear_count) VALUES 
 ('2024-06-10',1,41), ('2024-06-11',2,41), ('2024-06-12',3,33);" \
> | psql -Udeveloper qiita_exampledb
INSERT 0 3

# 登録データ確認
af08a238d21f:/# echo "SELECT * FROM mainte2.ssh_auth_error;" \
> | psql -Udeveloper qiita_exampledb
  log_date  | ip_id | appear_count 
------------+-------+--------------
 2024-06-10 |     1 |           41
 2024-06-11 |     2 |           41
 2024-06-12 |     3 |           33
(3 rows)

2-2. pythonスクリプト

過去投稿でIPアドレスから国コードとネットワークアドレスを取得する方法を紹介いたしました。

上記記事で紹介した python スクリプトを使って取得した国コードは以下の通りです。

IPアドレス 国コード 国名※参考
20.244.134.31 US アメリカ合衆国
14.215.120.90 CN 中華人民共和国
14.50.30.17 KR 大韓民国

2-2-1. 一括更新用SQL定義

国コードを更新するクエリーを (2)のサンプルを参考に作成

参考URL (1) 互換性
このコマンドは標準SQLに準拠しています。 ただしFROM句およびRETURNING句はPostgreSQLの拡張です。

UPDATE mainte2.unauth_ip_addr 
  SET country_code = data.cc
FROM
  (VALUES %s) AS data (ip, cc)
WHERE
  ip_addr=data.ip RETURNING id, ip_addr, country_code

FROM句の data のカラム名についての注意
(1) WHERE条件 ip_addr(列名) を data に定義すると下記のように曖昧だとエラーになります。
(2) RETURNING 句に列挙する列名も同様です。

(1) でエラーになるケース

UPDATE mainte2.unauth_ip_addr 
  SET country_code = data.country_code
FROM
  (VALUES %s) AS data (ip_addr, country_code)
WHERE
  ip_addr=data.ip_addr RETURNING id, ip_addr, country_code
ERROR column reference "ip_addr" is ambiguous
LINE 7:   ip_addr=data.ip_addr RETURNING id, ip_addr, country_code
          ^

(2) でエラーになるケース

UPDATE mainte2.unauth_ip_addr 
  SET country_code = data.country_code
FROM
  (VALUES %s) AS data (ip, country_code)
WHERE
  ip_addr=data.ip RETURNING id, ip_addr, country_code
ERROR column reference "country_code" is ambiguous
LINE 7:   ip_addr=data.ip RETURNING id, ip, country_code
                                            ^

(3) エラーにならないクエリー※RETURNING なし

UPDATE mainte2.unauth_ip_addr 
  SET country_code = data.country_code
FROM
  (VALUES %s) AS data (ip, country_code)
WHERE
  ip_addr=data.ip

2-2-2. 一括更新関数

更新関数に必要なインポート

import logging
from typing import Any, List, Optional, Tuple

import psycopg2
from psycopg2.extensions import connection, cursor
from psycopg2.extras import execute_values
2-2-2 (1) 更新結果(戻り値)を RETURNING で取得

execute_values() 関数が生成したクエリーをDEBUGで出力します

def bulk_update_cc(
        conn: connection,
        params: List[Tuple[str, str]],
        logger: Optional[logging.Logger] = None) -> List[Tuple[Any, ...]]:
    if logger is not None:
        logger.debug(f"params: {params}")
    result: List[Tuple[Any, ...]]
    try:
        cur: cursor
        with conn.cursor() as cur:
            result = execute_values(cur, """
UPDATE mainte2.unauth_ip_addr 
  SET country_code = data.cc
FROM
  (VALUES %s) AS data (ip, cc)
WHERE
  ip_addr=data.ip RETURNING id, ip, country_code""",
                                    params,
                                    fetch=True
                                    )
            # 実行されたSQLを出力
            if logger is not None:
                if cur.query is not None:
                    logger.debug(f"{cur.query.decode('utf-8')}")

            return result
    except (Exception, psycopg2.DatabaseError) as err:
        raise err
2-2-2 更新結果(戻り値)を更新後発行したクエリーから取得

UPDATE で RETURNING をサポートしない PostgreSQLの使用する場合
※バージョン 8.2 以前 (もうこのバージョンは使っていないとおもれれますが)

def bulk_update_cc_with_fetch_query(
        conn: connection,
        params: List[Tuple[str, str]],
        logger: Optional[logging.Logger] = None) -> List[Tuple[Any, ...]]:
    if logger is not None:
        logger.debug(f"params: {params}")
    result: List[Tuple[Any, ...]]
    try:
        cur: cursor
        with conn.cursor() as cur:
            execute_values(cur, """
UPDATE mainte2.unauth_ip_addr 
  SET country_code = data.country_code
FROM
  (VALUES %s) AS data (ip, country_code)
WHERE
  ip_addr=data.ip""",
                           params
                           )
            # 実行されたSQLを出力
            if logger is not None:
                if cur.query is not None:
                    logger.debug(f"{cur.query.decode('utf-8')}")

            # 更新結果取得
            #  クエリーパラメータからIPアドレス(先頭)のみ取り出す
            ip_list: List[str] = [param[0] for param in params]
            in_clause: Tuple[str, ...] = tuple(ip_list)
            cur.execute("""
SELECT
   id, ip_addr, country_code
FROM
   mainte2.unauth_ip_addr
WHERE
   ip_addr IN %s ORDER BY id""", (in_clause,)
                        )
            result = cur.fetchall()
            return result
    except (Exception, psycopg2.DatabaseError) as err:
        raise err

2-2-3. テスト用クエリーパラメータ

qry_params: List[Tuple[str, str]] = [
    ('14.50.30.17', 'KR'),
    ('14.215.120.90', 'CN'),
    ('20.244.134.31', 'US'),
]

2-3. pythonバッチスクリプト

ソース全体を下記に示します。

TestBulkUpdate_unauth_ip_addr.py
import argparse
import logging
import socket
from typing import Any, Dict, List, Optional, Tuple

import psycopg2
from psycopg2.extensions import connection, cursor
from psycopg2.extras import execute_values

"""
[Qiita投稿用スクリプト]
不正アクセスIPマスタ一括更新
[テーブル]
不正アクセスIPアドレスマスタ: mainte2.unath_ip_addr
"""

# テストデータ
qry_params: List[Tuple[str, str]] = [
    ('14.50.30.17', 'KR'),
    ('14.215.120.90', 'CN'),
    ('20.244.134.31', 'US'),
]

# データベース接続情報
DB_CONF: Dict = {
    "host": "{hostname}",
    "port": "5432",
    "database": "qiita_exampledb",
    "user": "developer",
    "password": "developerPASSSWORD"
}


# PostgreSQLデータベース接続管理クラス
class PgDatabase(object):
    def __init__(self, hostname: Optional[str] = None,
                 logger: Optional[logging.Logger] = None):
        self.logger = logger
        if hostname is None:
            hostname = socket.gethostname()
            DB_CONF["host"] = DB_CONF["host"].format(hostname=hostname)
        # default connection is itarable curosr
        self.conn = psycopg2.connect(**DB_CONF)
        if self.logger is not None:
            self.logger.debug(self.conn)

    def get_connection(self) -> connection:
        return self.conn

    def rollback(self) -> None:
        if self.conn is not None:
            self.conn.rollback()

    def commit(self) -> None:
        if self.conn is not None:
            self.conn.commit()

    def close(self) -> None:
        if self.conn is not None:
            if self.logger is not None:
                self.logger.debug(f"Close {self.conn}")
            self.conn.close()


def before_records(
        conn: connection) -> List[Tuple[Any, ...]]:
    result: List[Tuple[Any, ...]]
    try:
        cur: cursor
        with conn.cursor() as cur:
            cur.execute("""
SELECT id, ip_addr, country_code FROM mainte2.unauth_ip_addr"""
                        )
            result = cur.fetchall()
            return result
    except (Exception, psycopg2.DatabaseError) as err:
        raise err


# 一括更新: RETURNING で列挙した戻り値取得
def bulk_update_cc(
        conn: connection,
        params: List[Tuple[str, str]],
        logger: Optional[logging.Logger] = None) -> List[Tuple[Any, ...]]:
    if logger is not None:
        logger.debug(f"params: {params}")
    result: List[Tuple[Any, ...]]
    try:
        cur: cursor
        with conn.cursor() as cur:
            result = execute_values(cur, """
UPDATE mainte2.unauth_ip_addr 
  SET country_code = data.cc
FROM
  (VALUES %s) AS data (ip, cc)
WHERE
  ip_addr=data.ip RETURNING id, ip, country_code""",
                                    params,
                                    fetch=True
                                    )
            # 実行されたSQLを出力
            if logger is not None:
                if cur.query is not None:
                    logger.debug(f"{cur.query.decode('utf-8')}")

            return result
    except (Exception, psycopg2.DatabaseError) as err:
        raise err


def bulk_update_cc_with_fetch_query(
        conn: connection,
        params: List[Tuple[str, str]],
        logger: Optional[logging.Logger] = None) -> List[Tuple[Any, ...]]:
    if logger is not None:
        logger.debug(f"params: {params}")
    result: List[Tuple[Any, ...]]
    try:
        cur: cursor
        with conn.cursor() as cur:
            execute_values(cur, """
UPDATE mainte2.unauth_ip_addr 
  SET country_code = data.country_code
FROM
  (VALUES %s) AS data (ip, country_code)
WHERE
  ip_addr=data.ip""",
                           params
                           )
            # 実行されたSQLを出力
            if logger is not None:
                if cur.query is not None:
                    logger.debug(f"{cur.query.decode('utf-8')}")

            # 更新結果取得
            #  クエリーパラメータからIPアドレス(先頭)のみ取り出す
            ip_list: List[str] = [param[0] for param in params]
            in_clause: Tuple[str, ...] = tuple(ip_list)
            cur.execute("""
SELECT
   id, ip_addr, country_code
FROM
   mainte2.unauth_ip_addr
WHERE
   ip_addr IN %s ORDER BY id""", (in_clause,)
                        )
            result = cur.fetchall()
            return result
    except (Exception, psycopg2.DatabaseError) as err:
        raise err


def batch_main():
    logging.basicConfig(format='%(levelname)s %(message)s')
    app_logger = logging.getLogger(__name__)
    app_logger.setLevel(level=logging.DEBUG)

    parser = argparse.ArgumentParser()
    parser.add_argument("--values-returning", action="store_true",
                        help="Use exeecute_values with retruning.")
    args: argparse.Namespace = parser.parse_args()
    values_returning: bool = args.values_returning

    db: Optional[PgDatabase] = None
    try:
        db = PgDatabase(logger=None)
        conn: connection = db.get_connection()
        # 更新前レコード
        app_logger.info("--更新前レコード--")
        org_records: List[Tuple[Any, ...]] = before_records(conn)
        for org_rec in org_records:
            app_logger.info(org_rec)

        # 更新後のレコード
        upd_records: List[Tuple[Any, ...]]
        if values_returning:
            upd_records = bulk_update_cc(conn, qry_params, app_logger)
        else:
            upd_records = bulk_update_cc_with_fetch_query(conn, qry_params, app_logger)
        conn.commit()
        app_logger.info("--更新後レコード--")
        for upd_rec in upd_records:
            app_logger.info(upd_rec)
    except psycopg2.Error as db_err:
        app_logger.error(db_err)
        exit(1)
    except Exception as err:
        app_logger.error(err)
        exit(1)
    finally:
        if db is not None:
            db.close()


if __name__ == '__main__':
    batch_main()

3. スクリプト実行

python 仮想環境 (~py_venv/py_psycopg2) に入る

$ . ~/py_venv/py_psycopg2/bin/activate
(py_psycopg2) $ 

3-1. 通常の関数実行 (別途SELECTクエリー発行)

(py_psycopg2) $ python TestBulkUpdate_unauth_ip_addr.py 
INFO --更新前レコード--
INFO (3, '14.50.30.17', None)
INFO (2, '14.215.120.90', None)
INFO (1, '20.244.134.31', None)
DEBUG params: [('14.50.30.17', 'KR'), ('14.215.120.90', 'CN'), ('20.244.134.31', 'US')]
DEBUG 
UPDATE mainte2.unauth_ip_addr 
  SET country_code = data.cc
FROM
  (VALUES ('14.50.30.17','KR'),('14.215.120.90','CN'),('20.244.134.31','US')) AS data (ip, cc)
WHERE
  ip_addr=data.ip
INFO --更新後レコード--
INFO (1, '20.244.134.31', 'US')
INFO (2, '14.215.120.90', 'CN')
INFO (3, '14.50.30.17', 'KR')

3-2. RETURNINGの戻り値利用した関数実行

(py_psycopg2) $ python TestBulkUpdate_unauth_ip_addr.py --values-returning
INFO --更新前レコード--
INFO (3, '14.50.30.17', None)
INFO (2, '14.215.120.90', None)
INFO (1, '20.244.134.31', None)
DEBUG params: [('14.50.30.17', 'KR'), ('14.215.120.90', 'CN'), ('20.244.134.31', 'US')]
DEBUG 
UPDATE mainte2.unauth_ip_addr 
  SET country_code = data.cc
FROM
  (VALUES ('14.50.30.17','KR'),('14.215.120.90','CN'),('20.244.134.31','US')) AS data (ip, cc)
WHERE
  ip_addr=data.ip RETURNING id, ip_addr, country_code
INFO --更新後レコード--
INFO (3, '14.50.30.17', 'KR')
INFO (2, '14.215.120.90', 'CN')
INFO (1, '20.244.134.31', 'US')

最後に

試しにログ出力したクエリーを dockerコンテナ内の psqlコマンドで実行してみます。

国コードをアルファベット列の値に変更

IPアドレス 国コード アルファベット
20.244.134.31 US EF
14.215.120.90 CN CD
14.50.30.17 KR AB
af08a238d21f:/# echo "UPDATE mainte2.unauth_ip_addr 
  SET country_code = data.cc
FROM
  (VALUES ('14.50.30.17','AB'),('14.215.120.90','CD'),('20.244.134.31','EF')) AS data (ip, cc)
WHERE
  ip_addr=data.ip RETURNING id, ip_addr, country_code;" \
> | psql -Udeveloper qiita_exampledb
 id |    ip_addr    | country_code 
----+---------------+--------------
  3 | 14.50.30.17   | AB
  2 | 14.215.120.90 | CD
  1 | 20.244.134.31 | EF
(3 rows)

このクエリーで一括更新できるなんてこのスクリプトを実行して初めて知りました。

参考URL (1) のドキュメントのサンプルをざっと読んでみましたがこのクエリーは思いつかないですね。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?