はじめに
こちらの記事の続きです。
前回までで、dmpはRDS for Oracleにインポートされました。
この記事では、Oracleの全テーブルのデータをCSVファイルに出力する部分を記載します。
前提条件
今さらですが、「全テーブルのcsvファイルを出力」についての詳細をここで明確にしておきます。
- スキーマ内の全テーブルのデータをcsvファイルに書き出して出力する
- テーブル数は1,000以上ある
- csvファイルは1テーブルにつき1ファイルとし、テーブル名をファイル名に設定したうえでカラムヘッダーを出力する
方法
私が思いついた方法は以下のとおりでした。
- SQL*PlusでSELECTの結果をSPOOLする
- AWS CLIでSELECTの結果をエクスポートする
- python-oracledbでSELECTの結果をエクスポートする
このうち、汎用性や拡張性の高さを感じたので3のpython-oracledbを使うことにしました。
また、pythonで行なう場合は、さらに実行環境として複数の選択肢があります。たとえば、Lambdaで実行することも考えられるわけですが、今回は単発の実行であることと、すでに前段の手順で(SQL*Plusクライアントとして)EC2を立ち上げていたので、これを利用することにします。
環境設定
Pythonライブラリ
Pythonですが、記事執筆時点では、Amazon Linux 2023 AMIを使ってインスタンスを作成した場合は、最初からPython 3.9がインストールされていました。
しかし、pipが無かったので次の要領でライブラリを揃えました。
$ wget https://bootstrap.pypa.io/get-pip.py
$ python3 get-pip.py #pipのインストール
$ python3 -m pip install oracledb
$ python3 -m pip install boto3
$ python3 -m pip install pandas
Pythonで必要になるライブラリは後出しですが、結論これらのライブラリが今回は必要でした。
S3へのアクセスポリシー
Python実行環境であるEC2とのあいだのファイルのやりとりはS3を経由して行ないます。この場合、EC2に対してS3へのIAMロールをアタッチしておく必要があります。
出力したCSVファイルもS3にアップロードすることにするので、S3への書き込み可能なポリシーでロールを用意して、それをアタッチしてください。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": [
"kms:Decrypt",
"kms:GenerateDataKey"
],
"Resource": "*"
},
{
"Sid": "VisualEditor1",
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:GetObject",
"s3:ListBucketMultipartUploads",
"s3:AbortMultipartUpload",
"s3:ListBucket",
"s3:DeleteObject",
"s3:PutObjectAcl",
"s3:ListMultipartUploadParts"
],
"Resource": [
"arn:aws:s3:::*/*",
"arn:aws:s3:::oracle-dmp"
]
}
]
}
Python
今回は次のスクリプトを書きました。
import os
import oracledb
import csv
import tempfile
import boto3
import pandas as pd
# Oracle RDS接続情報
db_username = "admin"
db_password = "xxx"
db_host = "rds_instance_name.id.ap-northeast-n.rds.amazonaws.com" #RDSエンドポイント名
db_port = "1521"
db_service_name = "ORCL"
# S3バケット情報
s3_bucket_name = "oracle-dmp"
s3_prefix = "OutputCSV" # オプション:S3のプレフィックス
# Oracle RDSに接続
os.environ["NLS_LANG"] = "Japanese_Japan.UTF8" # データベースからのデータをUTF-8で取得
dsn = oracledb.makedsn(db_host, db_port, service_name=db_service_name)
conn = oracledb.connect(user=db_username, password=db_password, dsn=dsn)
# データをエクスポートする一時ディレクトリを作成
with tempfile.TemporaryDirectory() as temp_dir:
cursor = conn.cursor()
# テーブルリストを取得
cursor.execute("SELECT table_name FROM all_tables WHERE owner = 'hogehoge' order by num_rows desc")
tables = [row[0] for row in cursor.fetchall()]
# 各テーブルからデータを取得してCSVファイルにエクスポート
for table in tables:
file_path = os.path.join(temp_dir, f"{table}.csv")
sql = f"SELECT * FROM hogehoge.{table}"
column_names = [desc[0] for desc in cursor.description]
# CSVファイルに書き込み
df = pd.read_sql(sql, con=conn)
df.to_csv(file_path, index=False)
# CSVファイルをS3にアップロード
s3_key = os.path.join(s3_prefix, f"{table}.csv")
s3_client = boto3.client("s3")
s3_client.upload_file(file_path, s3_bucket_name, s3_key)
# Oracle RDS接続をクローズ
conn.close()
かなりシンプルだと思います。
Connectしたあと、まずall_tablesから該当スキーマの全テーブルリストを取得。その後、1行ずつテーブル名を読み出して作成したSELECT文を発行しながら取得された結果セットをpandasのDataFrameにいったん投入し、すぐにCSV出力。出力後、S3へのアップロードを行なったら、テーブルリストの次の行の処理に移ります。
今回の前提条件は「全テーブル」ということだったためall_tablesへのSELECTになっていますが、もちろんビューなどを含めたかったらall_viewsなりall_objectsなりを使えばいいです。
CSV出力時にpandasのDataFrameを経由している理由
最初、pandasを経由せず、次のように書いていました。
<省略>
# データをエクスポートする一時ディレクトリを作成
with tempfile.TemporaryDirectory() as temp_dir:
cursor = conn.cursor()
# テーブルリストを取得
cursor.execute("SELECT table_name FROM all_tables WHERE owner = 'hogehoge' order by num_rows desc")
tables = [row[0] for row in cursor.fetchall()]
# 各テーブルからデータを取得してCSVファイルにエクスポート
for table in tables:
file_path = os.path.join(temp_dir, f"{table}.csv")
cursor.execute(f"SELECT * FROM hogehoge.{table}")
column_names = [desc[0] for desc in cursor.description]
# CSVファイルに書き込み(UTF-8エンコーディング)
with open(file_path, "w", newline="", encoding="utf-8") as csv_file:
csv_writer = csv.writer(csv_file)
csv_writer.writerow(column_names)
csv_writer.writerows(cursor)
# CSVファイルをS3にアップロード
s3_key = os.path.join(s3_prefix, f"{table}.csv")
s3_client = boto3.client("s3")
s3_client.upload_file(file_path, s3_bucket_name, s3_key)
この場合、次の型エラーが発生し、これが解消できませんでした。
TypeError: __str__ returned non-string (type bytes)
Oracle側のBLOBやRAWの列が、oracledbによってPythonのBytesに暗黙変換されることが原因であるということはわかったのですが、
それではとPython側のstrに変換されるように、outputtypehandlerを定義してConnect時に呼び出すことで予めDB型の型変換で対応しようとしたところ、
import os
import oracledb
import csv
import tempfile
import boto3
# Oracledb.BINALY型をstringに型変換(規定ではbyte列に変換される))
def bin2char(cursor, name, defaultType, size, precision, scale):
if defaultType in (oracledb.DB_TYPE_RAW, oracledb.DB_TYPE_BLOB):
return cursor.var(str, arraysize=cursor.arraysize)
# Oracle RDS接続情報
・・・
# S3バケット情報
・・・
# Oracle RDSに接続
os.environ["NLS_LANG"] = "Japanese_Japan.UTF8" # データベースからのデータをUTF-8で取得
dsn = oracledb.makedsn(db_host, db_port, service_name=db_service_name)
conn = oracledb.connect(user=db_username, password=db_password, dsn=dsn)
conn.outputtypehandler = bin2char
・・・
Oracledb側のエラ-が発生してしまいました。
oracledb.exceptions.DatabaseError: DPY-4007: cannot convert from data type DB_TYPE_BLOB to DB_TYPE_VARCHAR
といったところで困っていたところ、pandasを経由してみたらするっとこのあたりが解消し(てしまっ)た次第です。
おわりに
こうして、最終的には望んだとおりのCSVファイルを得ることができました。
後日談というか今回の落ちとしては、私がこのように悪戦苦闘とした結果CSVファイル化できた一方で、並行して作業していたこの道Oracle20年の協力会社のエンジニアさんが、サードパーティのクライアントツールを使ってすぐにCSVを出力していたのが印象的でした。
ツール、便利ですね。