LoginSignup
1
1

More than 5 years have passed since last update.

Luigiで毎時BigQueryのテーブルに出力するときの注意点

Last updated at Posted at 2016-10-14

処理 -> Local -> Local to GCS -> GCS to BigQueryというジョブを作成した

Luigi逆引きリファレンス
こちらの投稿を参考にさせていただき、
毎時処理結果をローカル -> GCS -> BigQueryというジョブを作りました。

import luigi
import luigi_bigquery
import pandas as pd
from luigi.contrib.gcs import GCSClient, GCSTarget
from lib.gcp_client import GCPClient, BigqueryLoadTaskEx
from luigi.contrib.bigquery import BigqueryTarget, CreateDisposition, WriteDisposition, SourceFormat

class LoadToGcs( luigi_bigquery.Query ):

    time      = luigi.DateHourParameter()
    lconfig   = luigi.configuration.get_config()

    def requires( self ):
        # 前のジョブ。ローカルにTSV形式でファイルを出力している
        return Calculate( time = self.time)

    def output( self ):
        path = 'gs://test_xxx/output' + self.time.strftime( '/%Y/%m/%d/%H' ) + '/output.txt'
        client = GCSClient( oauth_credentials = GCPClient( self.lconfig ).get_credentials() )
        return GCSTarget( path, client = client )

    def run( self ):
        with self.input().open('r') as input:
            results = pd.read_csv( input,  sep='\t' )

        with self.output().open('w') as output:
            results.to_csv( output, index=False, encoding='utf8' )


class LoadToTable( BigqueryLoadTaskEx ):

    time               = luigi.DateHourParameter()

    source_format      = SourceFormat.CSV
    # 毎時テーブルにappendしていってほしい
    write_disposition  = WriteDisposition.WRITE_APPEND
    create_disposition = CreateDisposition.CREATE_IF_NEEDED
    max_bad_records    = 0
    skip_leading_rows  = 1

    def requires( self ):
        # GCSからファイルを読む
        return LoadToGcs( time = self.time, )

    @property
    def schema_json( self ):
        return 'schemas/xxxx.json'

    def source_uris(self):
        return [ self.input().path ]

    def output(self):

        return BigqueryTarget(
                project_id = 'test_project',
                dataset_id = 'test_dataset',
                # テーブル名のイメージ: test_table_20161013
                table_id   = self.time.strftime( 'test_table' + '_%Y%m%d' ),
                client     = self.get_client()
        )

Luigiの BigqueryLoadTask を使うとコードがすごいシンプルになり感動しました。

つまずいた点

1日のテーブルに0時台しかアップロードされない

なぜか

TargetがBigQueryだから。

BigQueryの仕様上、テーブル名に日付を入れるときは末尾に _%Y%m%d とすることが推奨される
理由はドロップダウンにまとめてくれるため、テーブル数が大量になる場合にUI的にとても見やすくなるため。
参考:http://tech.vasily.jp/entry/bigquery_data_platform

1日1テーブルに対し、24回追加の処理が走ることになる。
しかしoutputが BigqueryTarget であるため、 1時以降のappendする場合(テーブルが存在する場合)、実行済みとみなしロードすることなくジョブが終了してしまう。

write_disposition  = WriteDisposition.WRITE_APPEND

これを書いておけばappendされると思っていたけど、Targetをまず見にいくというのはLuigiの仕様上絶対。(当たり前か)

table_id   = self.time.strftime( 'test_table' + '_%Y%m%d%H' )

のように、時間までテーブル名に入れてしてしまうのが一番手っ取り早い解決策だけど、1日24個づつテーブルが増え続ける上に
ドロップダウンにならないのでBigQuery上がかなり汚くなるのでやりたくないですね。

BigqueryClientを使ってBigQueryにロードする方法

TargetをローカルやGCSにして空ファイルを作成するようにする


# 普通のluigi.Taskを使う
class LoadToTable( luigi.Task ):

    time      = luigi.DateHourParameter()
    lconfig   = luigi.configuration.get_config()

    @property
    def schema_json( self ):
        return 'schemas/xxxx.json'

    def requires( self ):
        return LoadToGcs( time = self.time, )

    def run( self ):
        # BigqueryClientを使う
        bq_client = BigqueryClient(oauth_credentials=GCPClient(self.lconfig).get_credentials())
        with open( self.schema_json, 'r' ) as f:
            schema = json.load( f )

        project_id = 'test_project'
        dataset_id = 'test_dataset'
        table_id = 'test_table'

        job = {
            'configuration': {
                'load': {
                    'sourceUris': [
                        self.input().path
                    ],
                    'schema': {
                        'fields': schema
                    },
                    'destinationTable': {
                        'projectId': project_id,
                        'datasetId': dataset_id,
                        'tableId': table_id
                    },
                    'sourceFormat': SourceFormat.CSV,
                    'writeDisposition': WriteDisposition.WRITE_APPEND,
                    # 元データの1行目がカラム名なのでつける
                    'skipLeadingRows': 1,
                    'allowQuotedNewlines': 'true'
                }
            }
        }

        # BigQueryにロードする
        bq_client.run_job(project_id, job, dataset=BQDataset(project_id=project_id, dataset_id=dataset_id))

        # 空ファイル作成
        if not self.dry:
            self.output().open('w').close()

    def output( self ):
        output_path = os.path.join(
                '/tmp/work', # 作業ディレクトリ
                self.time.strftime( '%Y-%m-%d' ),        # 日付
                self.time.strftime( '%H' ),              # 時間
                str( self )                              # タスク名
        )
        return luigi.LocalTarget( output_path )

Targetはローカルにジョブ実行の証拠としての空ファイルをおくだけ。
BigqueryLoadTask に頼らない
これでできました。

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1