Developing a Marketing AI for Detached Houses (2): bulk sql for postgres + python3

Under development

The batch processing in this project uses python, and the database is postgres.
Because large volumes of data have to be inserted, updated, and upserted (update or insert),
I use psycopg2.extras for python3 and prepare bulk sql.

The source is listed in the sections below, so please refer to it for details.

For insert, update, and upsert alike, you pass in the set of records to process.
The records are first split into bulk-processing units (chunks),
and the sql is then executed once per chunk.

Reference url

Bulk processing can also be implemented for mysql,
but for postgres I referred to the following and ported it to python3.

Splitting the records to process (chunking)

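    # Split rows into chunks of tuples for the bulk statements
    def divide_rows(self, org_rows, chunk_size, atri_keys):
        chunk = []
        ret_rows = []
        for org_row in org_rows:
            # Pick the values in atri_keys order so they match the sql column list
            new_tuple = ()
            for atri_key in atri_keys:
                new_tuple += (org_row[atri_key],)
            chunk.append( new_tuple )

            # Close the chunk once it reaches chunk_size
            if len(chunk) >= chunk_size:
                ret_rows.append(chunk)
                chunk = []

        # Keep the final, partially filled chunk
        if len(chunk) > 0:
            ret_rows.append(chunk)

        return ret_rows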
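
As a standalone illustration of the chunking step, the following sketch performs the same split outside the class. `chunk_rows` is a hypothetical helper name and the sample rows are made up for the example.

```python
def chunk_rows(org_rows, chunk_size, atri_keys):
    """Split dict-like rows into lists of tuples, at most chunk_size per list."""
    # Project each row onto atri_keys, in that order, so the values line up
    # with the column list used in the SQL statement.
    tuples = [tuple(row[key] for key in atri_keys) for row in org_rows]
    # Slice the flat list into consecutive chunks.
    return [tuples[i:i + chunk_size] for i in range(0, len(tuples), chunk_size)]


# Made-up sample data: three rows, chunk size 2, two selected columns.
rows = [{"pref": "A", "city": "x", "price": 1},
        {"pref": "A", "city": "y", "price": 2},
        {"pref": "B", "city": "z", "price": 3}]
print(chunk_rows(rows, 2, ["pref", "price"]))
# [[('A', 1), ('A', 2)], [('B', 3)]]
```

The last chunk may be shorter than `chunk_size`, and an empty input yields an empty list, so the caller never executes sql for an empty chunk.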

bulk insert

from psycopg2  import extras # for bulk insert

    def bulk_insert(self, tbl_name, atri_keys, rows):
        
        bulk_insert_size = self.get_conf()["common"]["bulk_insert_size"]
        row_groups = self.divide_rows(rows, bulk_insert_size, atri_keys )
        
        sql = "INSERT INTO %s (%s) VALUES %s" % (tbl_name,
                                                 ",".join(atri_keys),"%s")
        
        db_conn = self.db_connect()
        with self.db_cursor(db_conn) as db_cur:
            for row_group in row_groups:
                try:
                    # bulk insert
                    extras.execute_values(db_cur,sql,row_group)
                except Exception as e:
                    logger.error(e)
                    logger.error(sql)
                    logger.error(row_group)
                    return False
                    
            db_conn.commit()
        return True
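
One detail worth keeping in mind: `extras.execute_values` takes a `page_size` argument (default 100) and itself splits the rows into statements of that many rows, so a chunk larger than 100 rows would be sent as several INSERT statements unless `page_size` is raised. The sketch below shows the sql template on its own and one possible way to pass `page_size`. `build_insert_sql` and `insert_chunk` are hypothetical names, not part of the code above.

```python
def build_insert_sql(tbl_name, atri_keys):
    """Return the INSERT template with one %s placeholder for execute_values."""
    # Table and column names are interpolated as plain strings, so they
    # must come from trusted code, never from user input.
    return "INSERT INTO %s (%s) VALUES %s" % (tbl_name, ",".join(atri_keys), "%s")


def insert_chunk(db_cur, sql, row_group):
    """Send one chunk as a single multi-row INSERT."""
    # Imported here so the SQL builder above can be used without psycopg2.
    from psycopg2 import extras

    if not row_group:
        return
    # page_size defaults to 100; matching it to the chunk length keeps
    # the whole chunk in one statement.
    extras.execute_values(db_cur, sql, row_group, page_size=len(row_group))


# "sales" and its columns are made-up names for the example.
print(build_insert_sql("sales", ["pref", "city", "price"]))
# INSERT INTO sales (pref,city,price) VALUES %s
```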

bulk update

    # bulk update
    def bulk_update(self, tbl_name, pkeys, atri_keys, rows):

        bulk_insert_size = self.get_conf()["common"]["bulk_insert_size"]
        row_groups = self.divide_rows(rows, bulk_insert_size, atri_keys )
        
        sql = """
UPDATE {0}
SET    {1}
FROM ( VALUES {2}) AS data_tbl({3})
WHERE  {4}
"""
        set_key_vals = []
        for atri_key in atri_keys:
            set_key_vals.append("%s=data_tbl.%s" % (atri_key,atri_key) )
            
        where_conds  = []
        for pkey in pkeys:
            where_conds.append("%s.%s=data_tbl.%s" % (tbl_name,pkey,pkey))

        set_key_vals_str = ",".join( set_key_vals )
        atri_key_str     = ",".join( atri_keys )
        where_conds_str  = " AND ".join(where_conds)
        
        sql = sql.format( tbl_name,
                          set_key_vals_str,
                          "%s",
                          atri_key_str,
                          where_conds_str )
        
        db_conn = self.db_connect()
        with self.db_cursor(db_conn) as db_cur:
            for row_group in row_groups:
                try:
                    # bulk update
                    extras.execute_values(db_cur,sql, row_group )
                except Exception as e:
                    logger.error(e)
                    logger.error(sql)
                    logger.error(row_group)
                    return False
                    
        db_conn.commit()
        
        return True
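
To make the generated statement easier to picture, here is a small sketch that renders the UPDATE template for a made-up table. `build_update_sql` is a hypothetical helper. As a small variation on the code above, it leaves the key columns out of the SET clause, since they are already matched in the WHERE clause. Note that `atri_keys` has to include the key columns, because `data_tbl` gets its column names from that list.

```python
def build_update_sql(tbl_name, pkeys, atri_keys):
    """Render UPDATE ... FROM (VALUES ...) with one %s for execute_values."""
    # Only non-key columns are assigned; key columns are used for matching.
    set_part = ",".join(
        "%s=data_tbl.%s" % (key, key) for key in atri_keys if key not in pkeys)
    # Join the target table to the VALUES list on every key column.
    where_part = " AND ".join(
        "%s.%s=data_tbl.%s" % (tbl_name, key, key) for key in pkeys)
    template = "UPDATE {0} SET {1} FROM ( VALUES {2}) AS data_tbl({3}) WHERE {4}"
    return template.format(
        tbl_name, set_part, "%s", ",".join(atri_keys), where_part)


# "sales" and its columns are made-up names for the example.
print(build_update_sql("sales", ["pref", "city"], ["pref", "city", "sold_count"]))
# UPDATE sales SET sold_count=data_tbl.sold_count FROM ( VALUES %s) AS data_tbl(pref,city,sold_count) WHERE sales.pref=data_tbl.pref AND sales.city=data_tbl.city
```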

bulk upsert ( update or insert )

    def bulk_upsert(self, tbl_name, pkeys, all_keys, update_keys, rows):

        bulk_insert_size = self.get_conf()["common"]["bulk_insert_size"]
        row_groups = self.divide_rows(rows, bulk_insert_size, all_keys )

        # Assumption: pkeys are covered by a primary key or unique constraint,
        # which ON CONFLICT requires (PostgreSQL 9.5 or later).
        sql = """
INSERT INTO {0} ({1})
VALUES {2}
ON CONFLICT ({3})
DO UPDATE SET {4}
"""
        # EXCLUDED refers to the row that was proposed for insertion
        set_key_vals = []
        for update_key in update_keys:
            set_key_vals.append("%s=EXCLUDED.%s" % (update_key,update_key) )

        all_key_str      = ",".join( all_keys )
        pkey_str         = ",".join( pkeys )
        set_key_vals_str = ",".join( set_key_vals )

        sql = sql.format( tbl_name,
                          all_key_str,
                          "%s",
                          pkey_str,
                          set_key_vals_str )

        db_conn = self.db_connect()
        with self.db_cursor(db_conn) as db_cur:
            for row_group in row_groups:
                try:
                    # bulk upsert
                    extras.execute_values(db_cur,sql, row_group )
                except Exception as e:
                    logger.error(e)
                    logger.error(sql)
                    logger.error(row_group)
                    return False

        db_conn.commit()

        return True

Miscellaneous - connecting to the DB (postgres)

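import psycopg2
import psycopg2.extras

# Module-level cache so that every call shares one connection
db_conn = None

    def db_connect(self):
        global db_conn

        # Reuse the connection if it has already been opened
        if db_conn:
            return db_conn

        db_conn = psycopg2.connect(
            database    = conf["db"]["db_name"],
            user        = conf["db"]["db_user"],
            password    = conf["db"]["db_pass"],
            host        = conf["db"]["db_host"],
            port        = conf["db"]["db_port"] )
        return db_conn

    def db_cursor(self,db_conn):
        # DictCursor allows rows to be read by column name as well as by index
        return db_conn.cursor(cursor_factory=psycopg2.extras.DictCursor)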
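
Because `db_connect` hands out one cached connection, a statement that fails leaves that connection in an aborted transaction, and postgres rejects further commands on it until `rollback()` is called. The sketch below shows one possible way to pair commit and rollback. `run_in_transaction` is a hypothetical helper, and `FakeConn` is only a stand-in so the example runs without a database.

```python
def run_in_transaction(db_conn, work):
    """Run work(db_conn); commit on success, roll back on any exception."""
    try:
        work(db_conn)
    except Exception:
        # Without a rollback, the shared connection stays unusable for
        # later statements until the aborted transaction ends.
        db_conn.rollback()
        return False
    db_conn.commit()
    return True


class FakeConn:
    """Stand-in for a psycopg2 connection that records the calls it receives."""

    def __init__(self):
        self.calls = []

    def commit(self):
        self.calls.append("commit")

    def rollback(self):
        self.calls.append("rollback")


def failing_work(conn):
    raise RuntimeError("simulated SQL error")


ok_conn, bad_conn = FakeConn(), FakeConn()
print(run_in_transaction(ok_conn, lambda conn: None), ok_conn.calls)
# True ['commit']
print(run_in_transaction(bad_conn, failing_work), bad_conn.calls)
# False ['rollback']
```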

Usage - bulk upsert

        util_db = Db()
        util_db.bulk_upsert(
            self.tbl_name_header()+"_sales_count_by_city_price",
            ["pref","city","price","calc_date"],
            ["pref","city","price","calc_date",
             "discuss_count", "discuss_days",
             "onsale_count",  "onsale_days",
             "sold_count"],
            ["discuss_count", "discuss_days",
             "onsale_count",  "onsale_days",
             "sold_count"],
            ret_datas )
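
The call above passes the conflict keys, the full column list, and the columns to overwrite. One way such arguments could map onto a single statement is postgres's `INSERT ... ON CONFLICT ... DO UPDATE` (available from 9.5, and requiring a unique constraint on the key columns). The sketch below renders that statement for a shortened column list. `build_upsert_sql` and the `demo` table prefix are hypothetical.

```python
def build_upsert_sql(tbl_name, pkeys, all_keys, update_keys):
    """Render INSERT ... ON CONFLICT DO UPDATE with one %s for execute_values."""
    # EXCLUDED refers to the row that was proposed for insertion.
    set_part = ",".join("%s=EXCLUDED.%s" % (key, key) for key in update_keys)
    template = "INSERT INTO {0} ({1}) VALUES {2} ON CONFLICT ({3}) DO UPDATE SET {4}"
    return template.format(
        tbl_name, ",".join(all_keys), "%s", ",".join(pkeys), set_part)


# Shortened version of the column lists used in the call above.
print(build_upsert_sql("demo_sales_count_by_city_price",
                       ["pref", "city"],
                       ["pref", "city", "sold_count"],
                       ["sold_count"]))
# INSERT INTO demo_sales_count_by_city_price (pref,city,sold_count) VALUES %s ON CONFLICT (pref,city) DO UPDATE SET sold_count=EXCLUDED.sold_count
```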