TreasureDataの新しいOSSのDigdagが先日パブリックリポジトリになりました。
今度、ゲームサーバ勉強会で初心者向けのデモとかをしようと思ったので、簡単なサンプルを紹介します。
DigdagはOperatorを利用することで様々な処理を簡単に呼べるようになっています。
http://www.digdag.io/operators.html
まだ、TreasureData向けのものがまだ多いのですが、このOperatorが増えると、分析エンジンとの連携が容易になるかと思います。作り方はそのうちドキュメント化されるのではないかと思います。
さて、今回の紹介では、CSVファイルをEmbulkでPostgreSQLにロードし、Rubyで集計し、その結果をPythonに渡して、Slack通知するというワークフローです。
ワークフローの内容
下記がワークフロー全体です。
_export
ではワークフロー全体に対するパラメータを設定します。Rubyノスクリプトを使う場合にはここでrequireする必要があるみたいです。
+~
の~の部分は任意の名前を付与できます。その下にオペレータとして、embulkやrubyやpythonを利用しています。
timezone: UTC
_export:
rb:
require: 'tasks/myworkflow'
+dataload:
embulk>: demo/config.yml
+pg_calc:
rb>: MyWorkflow.pg_calc
+slack:
py>: tasks.MyWorkflow.slack
RubyではスクリプトではMyWorkflowというクラスにpg_calcというメソッドを定義しています。
またそこではローカルのPostgreSQLに対してクエリを投げて、その結果をDigdag.env.store
に対して値を代入しています。
Digdag.env.store
を利用することでDigdagのワークフロー全体に対して変数を受け渡すことができます。
class MyWorkflow
require "pg"
def pg_calc
conn = PGconn.connect('localhost',5432,'','','postgres','takahashi','')
q = "select count(1) as cnt from test"
begin
result = conn.exec(q)
Digdag.env.store(query_result: result[0]['cnt'])
ensure
conn.finish
end
end
end
Pythonのスクリプトでは、slackというメソッドの引数にRubyで利用したquery_result
を引数として受け渡しています。
これにより、RubyからPythonに対して変数が渡されて、SlackにCOUNTした値が通知されます。
class MyWorkflow(object):
def __init__(self):
pass
def slack(self, session_time = None, query_result='0'):
import requests
import json
requests.post('https://hooks.slack.com/services/XXXx/XXXX/XXXXX', data = json.dumps({
'text': "レコード件数は{result}件です".format(result=query_result),
'username': u'digdag',
'icon_emoji': u':ghost:',
'link_names': 1,
}))
ワークフローの実行
設定したワークフローを元に下記をを実行すると処理が開始され、最終的にはSlackに通知が行われます。
$digdag run mydag.dig
また、Digdagでは一度成功したタスクについては、digdag runを行っても処理はスキップされます(session_timeを設定している場合は異なる)。
そのため、再実行するときは、digdag run mydag.dig --rerun
を行います。
簡単ですが、変数の受け渡し方法について説明しました。
詳細は下記を参照してみてください。
http://www.digdag.io/ruby_api.html