ちょっとGCPを触る機会があったので、そのメモです。
結構よくあるケースだから記事がなかったのか、
自分のスキルが雑魚なのか。
あまり日本語の記事もなかったので、きっといつか誰かの役に立つはず。
クエリとかは実際からはシンプルに修正してます。
やったこと
- GCSからデータを取得してちょっと加工する
- 加工したデータをBigQueryにアップロードする
- GCSのデータをアーカイブする
- BigQueryからSQLでデータを取得
- BigQueryの別のテーブルにロードする
実行環境
PC:MacBook Pro
OS:Mac OS High Sierra
言語:Python2.7
フレームワーク:Apache Beam SDK
環境セットアップ
セットアップとか、サンプルの実行は
こちらを参考にしました。
メイン処理と引数
parserで引数を設定します。
PipelineOptionsを生成して、
argv = sys.argv
parser = argparse.ArgumentParser()
parser.add_argument('--input',
dest='input',
default=default_input,
help='Input file to process.')
known_args, pipeline_args = parser.parse_known_args(argv)
# PipelineOptionを設定
options = PipelineOptions(pipeline_args)
# googleで実行するためのオプションを追加で設定
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = projectid
google_cloud_options.staging_location = staging_location
google_cloud_options.temp_location = temp_location
# GoogleDataflowで実行する場合はこのように指定します。
options.view_as(StandardOptions).runner = 'DataflowRunner'
1.GCSからデータを取得してちょっと加工する→2.加工したデータをBigQueryにアップロードする
サンプルではCSVデータを加工していないのですが、
メソッドcsvParseを修正したら、加工できます。加工した値をreturnしてくださいませ。
今回は、変数rowsからBQのみに書き出していますが、
ここでGCSにも出力
みたいなことも可能です。そうすると、dataflow上でフローチャートが分岐するイメージです。
def pipeline1(options=None, known_args=None):
"""
GCSからCSVを読み込み、BQに投げ込むパイプライン
:param options:
:param known_args:
:return:
"""
# CSVデータのパース
def csvParse(elem):
import csv
text = elem.encode('utf-8')
reader = csv.reader(text.splitlines(), delimiter=',')
row = next(reader)
return {
'id': int(row[0]),
'name': row[1],
'age': row[2]
}
# パイプラインを生成
p = beam.Pipeline(options=options)
# GCSから読み込んで、CSVをパースする
# skip_header_linesで、ヘッダーをスキップ可能
# 読み込んだCSVはrows変数に格納される
rows = p | 'Read From GCS' >> beam.io.ReadFromText(known_args.input, skip_header_lines=1) \
| 'parse data' >> beam.Map(csvParse)
# rowsをBQに格納する
# schemaで、bqのテーブルのスキーマを指定する。先に作ってある場合は、
# bq show --format=prettyjson projectname:datasetname.tablename | jq '.schema.fields.name' > schema.txt
# で確認する
# create_dispositionはCREATE_NEVER、CREATE_IF_NEEDED
# から選択。write_dispositionは、WRITE_TRUNCATE、WRITE_APPEND 、WRITE_EMPTY
# から選択
rows | 'Insert to BQ.master' >> beam.io.WriteToBigQuery(project=projectid, dataset=datasetname, table=tablename,
schema='id:INTEGER,name:STRING,age:INTEGER',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
## 実行する
p.run().wait_until_finish()
3.GCSのデータをアーカイブする
BQに取り込んだら別のバケットにアーカイブするのを想定しています。
「BQへ書き込んだあと」という表現はできないので、
パイプラインが完了したあとにアーカイブの処理を記述します。
ちなみにここに記述した処理は、GoogleCloudDataflowには表示されません。
def gcs_move(src, dest):
gcsio.GcsIO().copy(src, dest)
def gcs_delete(fileName):
gcsio.GcsIO().delete(fileName)
# csvファイル一覧を取得
fileList = gcsio.GcsIO().glob(known_args.input)
if len(fileList) == 0:
exit()
# GCSから読み込んだファイルのアーカイブ
timestamp = datetime.now()
for file in fileList:
filename = file.encode('utf-8')
bucket, name = gcsio.parse_gcs_path(filename)
newFileName = 'gs://{}/{}/{}_{}'.format(
archiveBucket,
timestamp.strftime("%Y/%m/%d"),
timestamp.strftime("%H%M%S"), name)
gcs_move(filename, newFileName)
gcs_delete(filename)
4.BigQueryからSQLでデータを取得→5.BigQueryの別のテーブルにロードする
BigQuerySourceを使って、クエリを実行します。
取得したデータは変数linesに入るので、
今度は、それをBigQuerySinkで別のテーブルにロードします。
def pipeline2(options=None, known_args=None):
"""
BQにクエリを投げ、別のテーブルに出力するパイプライン
:param options:
:param known_args:
:return:
"""
query = """
SELECT
id,
name,
age
from {}:{}.{}
""".format(projectid, datasetname, tablename)
## 新たなパイプラインを生成
q = beam.Pipeline(options=options)
# クエリを実行
lines = q | 'ReadFromBigQuery' >> beam.io.Read(beam.io.BigQuerySource(query=query, use_standard_sql=True))
# 別のテーブルに書き込む
lines | 'WriteToBQ' >> beam.io.Write(
beam.io.BigQuerySink(project=projectid, dataset=datasetname, table=tablename2,
schema='id:INTEGER,name:STRING,age:INTEGER',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
# 実行する
q.run().wait_until_finish()
全体
上記合わせて、こんな感じになりました。
import argparse
import sys
from datetime import datetime
from apache_beam.io.gcp import gcsio
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions
projectid = 'myproject'
datasetname = 'mydataset'
tablename = 'mytable'
tablename2 = 'mytable2'
default_input = 'gs://gcs_input_bucket/*.csv'
archiveBucket = 'gs://gcs_archive-bucket'
staging_location = 'gs://gcs_dataflow_bucket/code/'
temp_location = 'gs://gcs_dataflow_bucket/tmp/'
def pipeline1(options=None, known_args=None):
"""
GCSからCSVを読み込み、BQに投げ込むパイプライン
:param options:
:param known_args:
:return:
"""
# CSVデータのパース
def csvParse(elem):
import csv
text = elem.encode('utf-8')
reader = csv.reader(text.splitlines(), delimiter=',')
row = next(reader)
return {
'id': int(row[0]),
'name': row[1],
'age': row[2]
}
# パイプラインを生成
p = beam.Pipeline(options=options)
# GCSから読み込んで、CSVをパースする
# skip_header_linesで、ヘッダーをスキップ可能
# 読み込んだCSVはrows変数に格納される
rows = p | 'Read From GCS' >> beam.io.ReadFromText(known_args.input, skip_header_lines=1) \
| 'parse data' >> beam.Map(csvParse)
# rowsをBQに格納する
# schemaで、bqのテーブルのスキーマを指定する。先に作ってある場合は、
# bq show --format=prettyjson projectname:datasetname.tablename | jq '.schema.fields.name' > schema.txt
# で確認する
# create_dispositionはCREATE_NEVER、CREATE_IF_NEEDED
# から選択。write_dispositionは、WRITE_TRUNCATE、WRITE_APPEND 、WRITE_EMPTY
# から選択
rows | 'Insert to BQ.master' >> beam.io.WriteToBigQuery(project=projectid, dataset=datasetname, table=tablename,
schema='id:INTEGER,name:STRING,age:INTEGER',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
## 実行する
p.run().wait_until_finish()
def pipeline2(options=None, known_args=None):
"""
BQにクエリを投げ、別のテーブルに出力するパイプライン
:param options:
:param known_args:
:return:
"""
query = """
SELECT
id,
name,
age
from {}:{}.{}
""".format(projectid, datasetname, tablename)
## 新たなパイプラインを生成
q = beam.Pipeline(options=options)
# クエリを実行
lines = q | 'ReadFromBigQuery' >> beam.io.Read(beam.io.BigQuerySource(query=query, use_standard_sql=True))
# 別のテーブルに書き込む
lines | 'WriteToBQ' >> beam.io.Write(
beam.io.BigQuerySink(project=projectid, dataset=datasetname, table=tablename2,
schema='id:INTEGER,name:STRING,age:INTEGER',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
# 実行する
q.run().wait_until_finish()
def gcs_move(src, dest):
gcsio.GcsIO().copy(src, dest)
def gcs_delete(fileName):
gcsio.GcsIO().delete(fileName)
if __name__ == '__main__':
argv = sys.argv
parser = argparse.ArgumentParser()
parser.add_argument('--input',
dest='input',
default=default_input,
help='Input file to process.')
known_args, pipeline_args = parser.parse_known_args(argv)
# PipelineOptionを設定
options = PipelineOptions(pipeline_args)
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = projectid
google_cloud_options.staging_location = staging_location
google_cloud_options.temp_location = temp_location
options.view_as(StandardOptions).runner = 'DataflowRunner'
# csvファイル一覧を取得
fileList = gcsio.GcsIO().glob(known_args.input)
if len(fileList) == 0:
exit()
# CSVデータの取り込み
pipeline1(options, known_args)
# GCSから読み込んだファイルのアーカイブ
timestamp = datetime.now()
for file in fileList:
filename = file.encode('utf-8')
bucket, name = gcsio.parse_gcs_path(filename)
newFileName = '{}/{}/{}_{}'.format(
archiveBucket,
timestamp.strftime("%Y/%m/%d"),
timestamp.strftime("%H%M%S"), name)
gcs_move(filename, newFileName)
gcs_delete(filename)
# BQからBQ
pipeline2(options, known_args)
実行してみます。
実行時にはGOOGLE_APPLICATION_CREDENTIALS
をちゃんと指定しましょう。
実行結果
dataflowの確認
pipeline1の実行結果が、dataflowで表示されています。
pipeline2の実行結果も。
パイプラインを2つ生成しているので、jobが2本実行されています。
CloudStorageの確認
gcsからも、実行前に存在していたデータが
inputから消えていて。
指定しているバケットに移動されています。
BigQueryの確認
BQにもテーブルが2つ存在していて
table1にデータがロードされています。
table2も。
まとめ
上記のソースでDataflowでCloudStorageからのデータ読み込み,バケットの移動、BQへのロードができました。
ただ、今回のソースだと、厳密には
取得したGCSのオブジェクトのリスト=ApacheBeamで読み込んだファイルのリスト
ではないので、
データ到着と、処理実行の時間が明確なら、まぁ問題ないかもしれませんが考慮が必要ですね。