(2020/8/6) 7.複数レコード挿入でハマった問題と対応を追記
Cloud Pak for Data(以下CP4D)上で、分析プロジェクトのNotebookでPythonを使ってDb2のデータを読み書きする方法です。と言っても、CP4D固有の部分は最初のDB接続情報を取得するproject_libの部分だけで、後は一般的なpythonでのDb2の読み書きと同じです。
検証したCP4Dバージョン: v3.0.1
CP4D上で使えるDb2接続用pythonライブラリ
代表的には3つの方法があるようです。いろいろ比較してみたところ、Read(Select)ならデータをPandasデータフレーム化してくれるpd.read_sqlが便利で、Write(Insert/Update)やその他DB操作はibm_dbを使うのが良さそうです。
pandas | ibm_db | ibmdbpy | |
---|---|---|---|
概要 | select結果をpandasデータフレーム化(read_sql) pandasデータフレームのデータを書き込み(to_sql) |
SQLベタ書きなのでほぼ何でもできる CP4Dに導入済 |
PythonからDb2を操作するライブラリの一種。CP4Dに導入済 参考リンク |
レコード取得 | ○(read_sql) | ○ (SELECT) (*1) |
○ (SELECT) |
テーブル洗い替え(全件消去&挿入) | ○ (to_sql) (*2) |
○ (TRUNCATE→INSERT) |
○ (TRUNCATE→INSERT) |
レコード挿入 | ○ (to_sql) | ○ (INSERT) (*3) |
○ (INSERT) |
レコード更新&挿入 | × | ○ (MERGE) | × (*4) |
注記:
- (*1) 取得したデータをpandasデータフレーム化するにはフェッチする一工夫が必要。(参考リンク)
- (*2) to_sqlの
if_exists = 'replace'
オプションで可能だが、テーブル名やスキーマ名を小文字で指定する必要があるなどクセがある。 - (*3) 複数レコードを挿入するときは、pandasデータフレームのデータをtuple化し、パラメーターマーカー化したSQL文を使う方法が便利。本文内でも紹介。(参考リンク)
- (*4) 2020/7/6時点で、mergeに相当するオプションが無さそうです。
Db2データを読み書きしてみる
1.DB接続情報を取得
ここがCP4D固有です。project_libを使って、予めデータ資産に登録したDB接続データ(テーブル毎に作成するもの)から接続情報を取得しています。TBL1がデータ資産名です。
project = Project.access()
DB_credentials = project.get_connected_data(name="TBL1")
2. DB接続 (jaydebeapi)
pandasのread_sqlで使うDB接続を作成します。ここで1の接続情報を使います。パスワードなどをベタ書きする必要がないので安心です。
import jaydebeapi
DB_connection_J = jaydebeapi.connect(
'com.ibm.db2.jcc.DB2Driver',
'{}://{}:{}/{}:user={};password={};'.format('jdbc:db2',
DB_credentials['host'],
DB_credentials['port'],
DB_credentials['database'],
DB_credentials['username'],
DB_credentials['password'])
)
3. レコード取得 (pd.read_sql)
read_sqlでレコード取得すると、直ちにpandasデータフレーム化されるので便利です。
import pandas as pd
sql = "SELECT * FROM TT_SCHEMA.TBL1"
df_tbl1 = pd.read_sql(sql, DB_connection_J)
df_tbl1.head()
結果:
TBL1は4列のDb2テーブルで、3行のレコードをサンプルとして含めています。
主キーはIDで、DEL_FLGはよくある論理削除用フラグをイメージしています。参考としてDDLを添付します。
CREATE TABLE TT_SCHEMA.TBL1 (
ID INT NOT NULL,
VALUE_INT INT,
VALUE_CHAR VARCHAR(16),
DEL_FLG INT NOT NULL default 0
)
ALTER TABLE TT_SCHEMA.TBL1 PRIMARY KEY (ID)
4. DB接続 (ibm_db)
ibm_dbでDB接続を作成します。ここでも1のDB接続情報を流用しています。
import ibm_db
ssldsn = "DATABASE=" + DB_credentials['database'] + \
";HOSTNAME=" + DB_credentials['host'] + \
";PORT=" + str(DB_credentials['port']) + \
";PROTOCOL=TCPIP;UID=" + DB_credentials['username'] + ";PWD=" + DB_credentials['password'] + ";"
DB_connection_I = ibm_db.connect(ssldsn,'','')
5. レコード取得 (ibm_db)
ibm_dbでSELECT文でレコードを取得します。取得したデータをpandasデータフレームに入れるには、行ごとにfetchする必要がありちょっとめんどくさいです。pd.read_sqlの方がシンプル。 (参考情報)
sql = "SELECT * FROM TT_SCHEMA.TBL1;"
result = ibm_db.exec_immediate(DB_connection_I, sql)
result_dict = ibm_db.fetch_assoc(result)
fields = list(result_dict.keys())
values = []
while result_dict != False:
values.append(list(result_dict.values()))
result_dict = ibm_db.fetch_assoc(result)
display(fields)
display(values)
df = pd.DataFrame(data = values, columns = fields)
display(df)
6. 1レコード挿入 (ibm_db)
ibm_dbでTBL1に1件挿入してみます。挿入前後のテーブルをpd.read_sqlで確認しています。
sql = "SELECT * FROM TT_SCHEMA.TBL1"
pd.read_sql(sql, DB_connection_J)
sql = "INSERT INTO TT_SCHEMA.TBL1 (ID, VALUE_INT, VALUE_CHAR, DEL_FLG) VALUES (4, 40, 'ddd', 0)"
result = ibm_db.exec_immediate(DB_connection_I, sql)
sql = "SELECT * FROM TT_SCHEMA.TBL1"
pd.read_sql(sql, DB_connection_J)
7. 複数レコード挿入 (ibm_db)
複数レコードを挿入する場合は、こちらのStackoverflow記事にあるように、pandasデータフレームのデータをtuple化し、パラメーターマーカー化したSQL文を使う方法が便利です。
sql = "SELECT * FROM TT_SCHEMA.TBL1"
pd.read_sql(sql, DB_connection_J)
# data to be inserted
df_insert = pd.DataFrame(data={'ID': [5,6,7], 'VALUE_INT': [50,60,70], 'VALUE_CHAR': ['eee','fff','ggg']})
# transform to tuple of tuples
tuple_of_tuples = tuple([tuple(x) for x in df_insert.values])
# insert multiple records
sql = "INSERT INTO TT_SCHEMA.TBL1 (ID,VALUE_INT,VALUE_CHAR) VALUES(?,?,?)"
stmt = ibm_db.prepare(DB_connection_I, sql)
result = ibm_db.execute_many(stmt, tuple_of_tuples)
sql = "SELECT * FROM TT_SCHEMA.TBL1"
pd.read_sql(sql, DB_connection_J)
(2020/8/6追記)
ibm_db.execute_manyで実行時に、Value parameters array XX is not homogeneous with previous parameters array
というエラーにハマりました。
こちらのStackoverflow記事には、NaNを置換すれば行けたよと書いてありましたが、それもNG。
結局、投入するpandas dataframeを作る前の元の値の一部が文字列(object)ではなく数値(int)だったことが原因でした。df.dtypesでよく確かめれば見つかる、データの型に起因する問題でした。
8. レコード更新&挿入 (ibm_db)
TBL1の主キーはIDです。ID列が同じであればUpdate、なければInsertするという例です。
複数レコードの例を示しますが、1レコードでも使えます。
sql = "SELECT * FROM TT_SCHEMA.TBL1"
pd.read_sql(sql, DB_connection_J)
ID=2をUpdate、5,6をInsertします。
# data to be updated and inserted
df_insert = pd.DataFrame({'ID': [2,5,6], 'VALUE_INT': [200,50,60],'VALUE_CHAR':['aaa','bbb','ccc'],'DEL_FLG':[0,0,0]})
# transform to tuple of tuples
tuple_of_tuples = tuple([tuple(x) for x in df_insert.values])
# update & insert
sql="MERGE INTO TT_SCHEMA.TBL1 AS T1 USING (VALUES (?,?,?,?)) AS T2(ID,VALUE_INT,VALUE_CHAR,DEL_FLG) \
ON T1.ID=T2.ID \
WHEN MATCHED THEN UPDATE SET T1.VALUE_INT=T2.VALUE_INT,T1.VALUE_CHAR=T2.VALUE_CHAR, T1.DEL_FLG=T2.DEL_FLG \
WHEN NOT MATCHED THEN INSERT (ID,VALUE_INT,VALUE_CHAR,DEL_FLG) VALUES (T2.ID,T2.VALUE_INT,T2.VALUE_CHAR, T2.DEL_FLG);"
stmt=ibm_db.prepare(DB_connection_I, sql)
result = ibm_db.execute_many(stmt,tuple_of_tuples)
sql = "SELECT * FROM TT_SCHEMA.TBL1"
pd.read_sql(sql, DB_connection_J)
最後にDisconnect
忘れずにDB接続を切断します。
# disconnect jadebeapi connection
DB_connection_J.close()
# disconnect ibm_db connection
ibm_db.close(DB_connection_I)
以上です。
今後、気が向いたら他のパターンを試して追記します。