Edited at

DigdagでembulkとBigQueryの黄金コンビがさらに輝く

More than 1 year has passed since last update.


はじめに

CYBIRDエンジニア Advent Calendar 2016、今年も16日目担当の@yuichi_komatsuです。

データ分析のエンジニアをしています。

一緒に精進できる仲間も絶賛募集中です!

興味がある方はこちら!!

昨日は@sakamoto_kojiさんの「サブスクリプションのサーバサイド開発で得た知見」でした。

まさに現場で奮闘してきたからこその実用的で貴重なTipsですね!

すばらしい!!

では本題です。


今回のネタ

去年は「embulkとBigQueryが黄金コンビすぎる話」を書きましたが(こちらはコメントを記載する際に誤って削除してしまいました・・・すみません)、

そこに新戦力である

Digdagでワークフローを組むことによってembulk、BigQueryコンビが一段とパワーアップする!

という話です。


Digdagとは?

穴掘りゲームではありません。

世界のTreasure DataさんのOSSでワークフローエンジンです。

Jenkinsは弊社の多くの部署で使用していますが、それとは異なりGUIはなく(開発中?)digというファイルをYAML的な記述で作成しJOBを実行します。

同様のプロダクトとしてLuigi、AirFlowなどがあり、Luigiは部内でも一時利用していたこともありましたが、それに比べると非常に直感的で迷うことがなく、柔軟性もあるように感じています(個人的にはですが)。

luigiのようにPython力も必要ないですし。。

Digdagのインストール含めたドキュメントはこちらを参照してもらえればと思います。


使い方(モード)

・ローカルモード

・サーバモード

・クライアントモード

がありますが、弊社では今のところローカルモードで十分に要件を満たしているためサーバ1台で実行しています。

今回は弊社分析チーム内での使い方(一部)をご紹介したいと思います。

※基本的な使い方などはこちらのドキュメントを参照して下さい。

ということで、いきなりですが設定例です。


設定例(親digファイル:main.dig)

timezone: Asia/Tokyo

schedule:
daily>: 1:00:00

+main:
_export:
host: 'XXX.XXX.XXX.XXX'
user: 'hoge'
password: 'hoge_password'
database: 'testdb'
project_id: 'sample_project'
dataset: 'hoge_dataset'

+date:
py>: date.SetDate.set_date

+all_load:
_parallel: true

+load_log:
!include : 'hoge/log.dig'
+load_user:
!include : 'hoge/user.dig'
+load_master:
!include : 'hoge/master.dig'

これは弊社のあるゲームのDB(MySQL)ログをembulkでBigQueryにロードする際に使用しているdigファイルで、共通部分を定義する親のdigファイルとなります。

予め./digdag scheduler &でスケジューラーをバックグラウンドで実行しておき、上記のようにschedule:を設定することによってJOBがスケジューリングされます。

+mainタスク以下ではまず_export:でそれ以降に使用する変数を定義しています。

ここではembulkのInputで使用するMySQLのアクセス情報やOutputで使用するBigQueryのproject_id、datasetなどを定義しています。

+datepy>:ではPythonでターゲットとする日付を取得しています。

ゲームによってDBに格納している日付データがunixtime、datetimeと異なるため、どちらでも指定できるようにしています。

参考までにこちらのPythonスクリプトも記載しておきます。


__init__.py

# -*- coding: utf-8 -*-

import digdag
import time
from datetime import datetime,timedelta
from pytz import timezone

class SetDate(object):
def set_date(self, target_date = ''):
# target_dateの引数がある場合
if target_date:
# 開始条件
start_datetime = datetime.strptime(target_date, '%Y-%m-%d')
# 終了条件
end_datetime = datetime.strptime(target_date, '%Y-%m-%d') + timedelta(days=1)
# target_dateの引数がない場合
else:
# 現在時刻
utc_now = datetime.now(timezone('UTC'))
jst_now = datetime.now(timezone('Asia/Tokyo'))
# 該当日(1日前)
target_date = (jst_now - timedelta(days=1)).strftime('%Y-%m-%d')
# 開始条件
start_datetime = datetime.strptime((jst_now - timedelta(days=1)).strftime('%Y-%m-%d'), '%Y-%m-%d')
# 終了条件
end_datetime = datetime.strptime(jst_now.strftime('%Y-%m-%d'), '%Y-%m-%d')

# unixtimeに変換
start_unixtime = int(time.mktime(start_datetime.timetuple()))
end_unixtime = int(time.mktime(end_datetime.timetuple()))

# str変換
start_datetime = str(start_datetime)
end_datetime = str(end_datetime)

# 環境変数に設定
digdag.env.store({"target_date": target_date, "start_unixtime": start_unixtime, "end_unixtime": end_unixtime, "start_datetime": start_datetime, "end_datetime": end_datetime})


import digdagをし、digdag.env.storeをすることによって設定した値を環境変数として使えるようになります。

ここではembulkのyml、Chatwork連携のスクリプト内で使用する日付データを取得しています。

スクリプトはdigdag実行ディレクトリの配下に__init__.pyで配置します。

例ではdate/__init__.pyで配置しています。

親digファイルに戻ると、

+all_loadでは_parallel:でtrueを設定することによって以下の子タスクを並列に実行します。

!include :で他のdigファイルをロードするといったことも可能です。

ここではlog.diguser.digmaster.digを並列で動作させています。

下記はlog.digのサンプルです。


設定例(子digファイル:log.dig)

+log:

_export:
#----------------#
# Config by TYPE #
#----------------#
process: 'log'

+sample1_log:
_export:
table: 'sample1_log'
_error:
_export:
status: 'ERROR'
py>: info.ChatWork.post
embulk>: hoge/yml/log/sample1_log.yml

+sample2_log:
_export:
table: 'sample2_log'
_error:
_export:
status: 'ERROR'
py>: info.ChatWork.post
embulk>: hoge/yml/log/sample2_log.yml

#(中略)

+post:
# SUCCESS info to Chatwork
_export:
job: 'ALL'
status: 'SUCCESS'
py>: info.ChatWork.post

+sample1_log+sample2_log_export:table変数を設定し、embulkを実行しています。

設定した変数はembulkのyml内で使用しています。

またそこでエラーが発生した場合はpy>: info.ChatWork.postでChatWorkにポストし、どのタスクでエラーが発生したか判別できるようにしています。

JOB自体もエラーが発生すれば終了となります。

digdagはセッションを管理してくれていて、同じセッションで実行する場合はそのままdigdag run main.digでエラー箇所までスキップしてくれます。

セッションを無視して最初から実行する場合はdigdag run main.dig -aとなります。

この辺の仕様もドキュメントを参照してもらえればと思います。

なお、例ではtarget_dateを引数に設定できるようにしているので、digdag run main.dig -p target_date=2016-12-10といった指定も可能です。

embulkのymlサンプル(input:MySQL、output:BigQuery)は下記となります。

in:

type: mysql
host: ${host}
user: ${user}
password: "${password}"
database: ${database}
table: ${table}
select: "id,action,zip_url,zip_hash,zip_created_at,zip_size,summary_flg ,image_quality,created_at,updated_at"
where: created_at >= ${start_unixtime} AND created_at < ${end_unixtime}
out:
type: bigquery
mode: append
auth_method: json_key
json_keyfile: /home/hoge/embulk/keyfile/json_keyfile.json
path_prefix: /home/hoge/embulk/tmp/${dataset}/${table}
source_format: CSV
compression: GZIP
project: ${project_id}
dataset: ${dataset}
auto_create_table: true
table: ${table}
schema_file: /home/hoge/embulk/schema/${dataset}/${table}.json
delete_from_local_when_job_end: true

変数は${変数名}で参照できます。

ここではSELECTでカラムを指定しているためテーブル毎にymlファイルを参照する形にしていますが、全カラムを選択するような場合は一つのテンプレートで賄うことができるのでよりシンプルな構成にできると思います。

BigQueryのデータセットやテーブルのパーティションなども必要に応じて動的に変えることもできます。


その他

弊社分析チームでは使ってはいないのですが、Ver 0.8.18以降ではbq>bq_load>gcs_wait>といったオペレータが使えるようになったのでBigQueryにロードする際の選択の幅が広がったように思います。

まあオペレータは自作も可能なようなのでその意味では何でもできると言えますが。。


まとめ

Digdagは親子関係や依存関係がシンプルかつ直感的に定義でき、当然ながらembulkとの相性もバッチリで、動的に変数を取得して設定することにより簡潔で柔軟なワークフロー処理を行うことが出来ます!

キャプテン翼で例えると、Digdagは言わば周囲との連携をそつなくこなす三杉くんみたいな存在でしょうか。


最後に

明日のCYBIRDエンジニア Advent Calendar 2016、17日目は@cy-nana-obataさんです。

新卒ならではの若さと希望に満ちたネタを披露してくれることでしょう!?楽しみです!!!

また、弊社で提供しているサッカー育成ゲーム「BFB Champions」では現在『キャプテン翼』とタイアップしており、翼くん、岬くんの本家黄金コンビをイレブンに加えてプレイすることができるので、まだプレイしていない方は是非やってみて下さい!

三杉くんもいますよ!!