BigQueryを利用するにあたりRedshiftのデータをBigQueryに移行する必要がありました。
その際の手法や手順などを記載します。
前提
- RailsからRedshiftのクエリを実行できるようにしておくこと
- bqコマンドを実行できるようにしておくこと
- AWS SDKがインストールされていること
処理内容
大きくは以下のような流れで作成しました。
- RedshiftのデータをS3にアンロード
- 対象テーブルのスキーマファイル作成
- bq loadコマンドでデータをインポート
アンロード
query = sprintf("
UNLOAD ('SELECT * FROM %<schema>s.%<table>s')
TO 's3://redshift-unload-bucket/unload/%<schema>s/%<table>s/'
IAM_ROLE 'arn:aws:iam::************:role/redshift-unload-role'
MANIFEST
DELIMITER AS ','
GZIP
ADDQUOTES
ALLOWOVERWRITE
PARALLEL OFF
ESCAPE
MAXFILESIZE AS 50MB
", sql:sql, schema: schema, table: table)
ActiveRecord::Base.connection.execute query
アンロードするためのIAMロール作成
ロール名:redshift-unload-role
ポリシー:AmazonS3FullAccess、AmazonRedshiftFullAccess
特記事項
- BigQueryのロードに対応しているCSV形式(
DELIMITER AS ','
)での出力にする - BigQueryにロードする際にCSV構造が崩れないように
ADDQUOTES
とESCAPE
オプションをつけておく -
PARALLEL ON
にするとなぜかレコードが欠損する事象が発生したのでOFF
に - データ量が多すぎるとBigQueryのロードがうまくいかない問題が発生したので
MAXFILESIZE
で50MB
ずつ分割
スキーマファイル作成
Redshiftのpg_catalog.pg_table_defからスキーマ情報が取得できます。
columns = []
sql = sprintf("
SELECT * FROM pg_catalog.pg_table_def
WHERE schemaname = '%{schema}' AND tablename = '%{table}'
", schema: schema, table: table)
result = ActiveRecord::Base.connection.execute sql
result.each{|record|
column = {mode:get_mode(record["notnull"]), name:record["column"], type:get_type(record["type"])}
columns.push(column)
}
スキーマファイル出力
file = "./schema/%{schema}/%{table}.schema" % {schema:schema, table:table}
File.open(file, 'w') do |f|
f.puts columns.to_json
end
補足
RedshiftとBigQueryの型の違いを補完するためのメソッドを以下のようにつくってます
def self.get_mode(notnull)
if notnull == 'y' then
return 'REQUIRED'
end
return 'NULLABLE'
end
def self.get_type(type)
if type.index('character') then
return 'STRING'
elsif type.index('bigint') then
return 'INTEGER'
elsif type.index('integer') then
return 'INTEGER'
elsif type.index('numeric') then
return 'NUMERIC'
elsif type.index('boolean') then
return 'BOOL'
elsif type.index('timestamp') then
return 'TIMESTAMP'
elsif type.index('date') then
return 'DATE'
end
return 'STRING'
end
bq loadコマンドでBigQueryにインポート
S3上のアンロードしたオブジェクトを取得し、ループ内で
S3からGSへのアップロードしbq load
コマンドを実行しています。
prefix = "unload/%{schema}/%{table}/" % {schema:schema, table:table}
object_list = get_objects(prefix)
for object in object_list do
if object.index('.gz') then
s3_to_gs(object)
gs_path = "gs://%{bucket}/%{object}" % {bucket:$gs_bucket, object:object}
schema_path = "./schema/%{schema}/%{table}.schema" % {schema:schema, table:table}
command = "bq load %{dataset}.%{table} %{gs_path} %{schema_path}" % {
dataset:schema, table:table, gs_path:gs_path, schema_path:schema_path}
system(command)
end
end
補足
アンロードしたS3上のファイルを取得するメソッドは以下のようにつくってます
def self.s3_to_gs(object)
s3_path = "s3://%{bucket}/%{object}" % {bucket:$s3_bucket, object:object}
gs_path = "gs://%{bucket}/%{object}" % {bucket:$gs_bucket, object:object}
command = "gsutil cp -r %{s3_path} %{gs_path}" % {s3_path:s3_path, gs_path:gs_path}
system(command + $command_output)
end
アンロードしたS3上のファイルを取得するメソッドは以下のようにつくってます
def self.get_objects(prefix)
object_list = []
next_token = ''
while true do
if next_token == '' then
response = $s3.list_objects_v2(bucket: $s3_bucket, prefix: prefix)
else
response = $s3.list_objects_v2(bucket: $s3_bucket, prefix: prefix, continuation_token: next_token)
end
if response.contents.length == 0 then
return []
end
for content in response.contents do
object_list.push(content.key)
end
if response.next_continuation_token != nil then
next_token = response.next_continuation_token
else
return object_list
end
end
end
最後に
- ここではスキーマファイルを作成しBigQueryにロードさせましたが、
bq load
の--autodetect
オプションを利用すればスキーマ情報を自動検出することも可能です。 - 自動検出ではデータの値に応じて文字列、数値、時間は高い精度で検出してくれました。簡単にロードしたい場合はかなりつけると思います。
- データのサイズが大きくなると今回の`bq load'のやり方ではかなりの時間がかかることが想定されます。dataflowを使うことでBigQueryへのデータロードを大幅に削減することもできたのでまたの機会に記事にできればと思います。