Edited at

Digdagでruby/python間で変数を渡す

More than 3 years have passed since last update.

TreasureDataの新しいOSSのDigdagが先日パブリックリポジトリになりました。

今度、ゲームサーバ勉強会で初心者向けのデモとかをしようと思ったので、簡単なサンプルを紹介します。

DigdagはOperatorを利用することで様々な処理を簡単に呼べるようになっています。

http://www.digdag.io/operators.html

まだ、TreasureData向けのものがまだ多いのですが、このOperatorが増えると、分析エンジンとの連携が容易になるかと思います。作り方はそのうちドキュメント化されるのではないかと思います。

さて、今回の紹介では、CSVファイルをEmbulkでPostgreSQLにロードし、Rubyで集計し、その結果をPythonに渡して、Slack通知するというワークフローです。


ワークフローの内容

下記がワークフロー全体です。

_exportではワークフロー全体に対するパラメータを設定します。Rubyノスクリプトを使う場合にはここでrequireする必要があるみたいです。

+~の~の部分は任意の名前を付与できます。その下にオペレータとして、embulkやrubyやpythonを利用しています。


mydag.dig

timezone: UTC

_export:
rb:
require: 'tasks/myworkflow'

+dataload:
embulk>: demo/config.yml

+pg_calc:
rb>: MyWorkflow.pg_calc

+slack:
py>: tasks.MyWorkflow.slack


RubyではスクリプトではMyWorkflowというクラスにpg_calcというメソッドを定義しています。

またそこではローカルのPostgreSQLに対してクエリを投げて、その結果をDigdag.env.storeに対して値を代入しています。

Digdag.env.storeを利用することでDigdagのワークフロー全体に対して変数を受け渡すことができます。


myworkflow.rb

class MyWorkflow

require "pg"

def pg_calc
conn = PGconn.connect('localhost',5432,'','','postgres','takahashi','')
q = "select count(1) as cnt from test"
begin
result = conn.exec(q)
Digdag.env.store(query_result: result[0]['cnt'])
ensure
conn.finish
end
end
end


Pythonのスクリプトでは、slackというメソッドの引数にRubyで利用したquery_resultを引数として受け渡しています。

これにより、RubyからPythonに対して変数が渡されて、SlackにCOUNTした値が通知されます。


__init__.py

class MyWorkflow(object):

def __init__(self):
pass

def slack(self, session_time = None, query_result='0'):
import requests
import json
requests.post('https://hooks.slack.com/services/XXXx/XXXX/XXXXX', data = json.dumps({
'text': "レコード件数は{result}件です".format(result=query_result),
'username': u'digdag',
'icon_emoji': u':ghost:',
'link_names': 1,
}))



ワークフローの実行

設定したワークフローを元に下記をを実行すると処理が開始され、最終的にはSlackに通知が行われます。

$digdag run mydag.dig

また、Digdagでは一度成功したタスクについては、digdag runを行っても処理はスキップされます(session_timeを設定している場合は異なる)。

そのため、再実行するときは、digdag run mydag.dig --rerunを行います。

簡単ですが、変数の受け渡し方法について説明しました。

詳細は下記を参照してみてください。

http://www.digdag.io/ruby_api.html