処理 -> Local -> Local to GCS -> GCS to BigQueryというジョブを作成した
Luigi逆引きリファレンス
こちらの投稿を参考にさせていただき、
毎時処理結果をローカル -> GCS -> BigQueryというジョブを作りました。
import luigi
import luigi_bigquery
import pandas as pd
from luigi.contrib.gcs import GCSClient, GCSTarget
from lib.gcp_client import GCPClient, BigqueryLoadTaskEx
from luigi.contrib.bigquery import BigqueryTarget, CreateDisposition, WriteDisposition, SourceFormat
class LoadToGcs( luigi_bigquery.Query ):
time = luigi.DateHourParameter()
lconfig = luigi.configuration.get_config()
def requires( self ):
# 前のジョブ。ローカルにTSV形式でファイルを出力している
return Calculate( time = self.time)
def output( self ):
path = 'gs://test_xxx/output' + self.time.strftime( '/%Y/%m/%d/%H' ) + '/output.txt'
client = GCSClient( oauth_credentials = GCPClient( self.lconfig ).get_credentials() )
return GCSTarget( path, client = client )
def run( self ):
with self.input().open('r') as input:
results = pd.read_csv( input, sep='\t' )
with self.output().open('w') as output:
results.to_csv( output, index=False, encoding='utf8' )
class LoadToTable( BigqueryLoadTaskEx ):
time = luigi.DateHourParameter()
source_format = SourceFormat.CSV
# 毎時テーブルにappendしていってほしい
write_disposition = WriteDisposition.WRITE_APPEND
create_disposition = CreateDisposition.CREATE_IF_NEEDED
max_bad_records = 0
skip_leading_rows = 1
def requires( self ):
# GCSからファイルを読む
return LoadToGcs( time = self.time, )
@property
def schema_json( self ):
return 'schemas/xxxx.json'
def source_uris(self):
return [ self.input().path ]
def output(self):
return BigqueryTarget(
project_id = 'test_project',
dataset_id = 'test_dataset',
# テーブル名のイメージ: test_table_20161013
table_id = self.time.strftime( 'test_table' + '_%Y%m%d' ),
client = self.get_client()
)
Luigiの BigqueryLoadTask
を使うとコードがすごいシンプルになり感動しました。
つまずいた点
1日のテーブルに0時台しかアップロードされない
なぜか
TargetがBigQueryだから。
BigQueryの仕様上、テーブル名に日付を入れるときは末尾に _%Y%m%d
とすることが推奨される
理由はドロップダウンにまとめてくれるため、テーブル数が大量になる場合にUI的にとても見やすくなるため。
参考:http://tech.vasily.jp/entry/bigquery_data_platform
1日1テーブルに対し、24回追加の処理が走ることになる。
しかしoutputが BigqueryTarget
であるため、 1時以降のappendする場合(テーブルが存在する場合)、実行済みとみなしロードすることなくジョブが終了してしまう。
write_disposition = WriteDisposition.WRITE_APPEND
これを書いておけばappendされると思っていたけど、Targetをまず見にいくというのはLuigiの仕様上絶対。(当たり前か)
table_id = self.time.strftime( 'test_table' + '_%Y%m%d%H' )
のように、時間までテーブル名に入れてしてしまうのが一番手っ取り早い解決策だけど、1日24個づつテーブルが増え続ける上に
ドロップダウンにならないのでBigQuery上がかなり汚くなるのでやりたくないですね。
BigqueryClientを使ってBigQueryにロードする方法
TargetをローカルやGCSにして空ファイルを作成するようにする
# 普通のluigi.Taskを使う
class LoadToTable( luigi.Task ):
time = luigi.DateHourParameter()
lconfig = luigi.configuration.get_config()
@property
def schema_json( self ):
return 'schemas/xxxx.json'
def requires( self ):
return LoadToGcs( time = self.time, )
def run( self ):
# BigqueryClientを使う
bq_client = BigqueryClient(oauth_credentials=GCPClient(self.lconfig).get_credentials())
with open( self.schema_json, 'r' ) as f:
schema = json.load( f )
project_id = 'test_project'
dataset_id = 'test_dataset'
table_id = 'test_table'
job = {
'configuration': {
'load': {
'sourceUris': [
self.input().path
],
'schema': {
'fields': schema
},
'destinationTable': {
'projectId': project_id,
'datasetId': dataset_id,
'tableId': table_id
},
'sourceFormat': SourceFormat.CSV,
'writeDisposition': WriteDisposition.WRITE_APPEND,
# 元データの1行目がカラム名なのでつける
'skipLeadingRows': 1,
'allowQuotedNewlines': 'true'
}
}
}
# BigQueryにロードする
bq_client.run_job(project_id, job, dataset=BQDataset(project_id=project_id, dataset_id=dataset_id))
# 空ファイル作成
if not self.dry:
self.output().open('w').close()
def output( self ):
output_path = os.path.join(
'/tmp/work', # 作業ディレクトリ
self.time.strftime( '%Y-%m-%d' ), # 日付
self.time.strftime( '%H' ), # 時間
str( self ) # タスク名
)
return luigi.LocalTarget( output_path )
Targetはローカルにジョブ実行の証拠としての空ファイルをおくだけ。
BigqueryLoadTask
に頼らない
これでできました。