17
13

More than 5 years have passed since last update.

Digdagでruby/python間で変数を渡す

Last updated at Posted at 2016-06-16

TreasureDataの新しいOSSのDigdagが先日パブリックリポジトリになりました。

今度、ゲームサーバ勉強会で初心者向けのデモとかをしようと思ったので、簡単なサンプルを紹介します。

DigdagはOperatorを利用することで様々な処理を簡単に呼べるようになっています。
http://www.digdag.io/operators.html

まだ、TreasureData向けのものがまだ多いのですが、このOperatorが増えると、分析エンジンとの連携が容易になるかと思います。作り方はそのうちドキュメント化されるのではないかと思います。

さて、今回の紹介では、CSVファイルをEmbulkでPostgreSQLにロードし、Rubyで集計し、その結果をPythonに渡して、Slack通知するというワークフローです。

ワークフローの内容

下記がワークフロー全体です。

_exportではワークフロー全体に対するパラメータを設定します。Rubyノスクリプトを使う場合にはここでrequireする必要があるみたいです。

+~の~の部分は任意の名前を付与できます。その下にオペレータとして、embulkやrubyやpythonを利用しています。

mydag.dig
timezone: UTC

_export:
  rb:
    require: 'tasks/myworkflow'

+dataload:
  embulk>: demo/config.yml

+pg_calc:
  rb>: MyWorkflow.pg_calc

+slack:
  py>: tasks.MyWorkflow.slack

RubyではスクリプトではMyWorkflowというクラスにpg_calcというメソッドを定義しています。
またそこではローカルのPostgreSQLに対してクエリを投げて、その結果をDigdag.env.storeに対して値を代入しています。
Digdag.env.storeを利用することでDigdagのワークフロー全体に対して変数を受け渡すことができます。

myworkflow.rb
class MyWorkflow
  require "pg"

  def pg_calc
    conn = PGconn.connect('localhost',5432,'','','postgres','takahashi','')
    q    = "select count(1) as cnt from test"
    begin
      result  = conn.exec(q)
      Digdag.env.store(query_result: result[0]['cnt'])
    ensure
      conn.finish
    end
  end
end

Pythonのスクリプトでは、slackというメソッドの引数にRubyで利用したquery_resultを引数として受け渡しています。
これにより、RubyからPythonに対して変数が渡されて、SlackにCOUNTした値が通知されます。

__init__.py
class MyWorkflow(object):
    def __init__(self):
        pass

    def slack(self, session_time = None, query_result='0'):
        import requests
        import json
        requests.post('https://hooks.slack.com/services/XXXx/XXXX/XXXXX', data = json.dumps({
          'text': "レコード件数は{result}件です".format(result=query_result),
          'username': u'digdag',
          'icon_emoji': u':ghost:',
          'link_names': 1,
        }))

ワークフローの実行

設定したワークフローを元に下記をを実行すると処理が開始され、最終的にはSlackに通知が行われます。

$digdag run mydag.dig

また、Digdagでは一度成功したタスクについては、digdag runを行っても処理はスキップされます(session_timeを設定している場合は異なる)。
そのため、再実行するときは、digdag run mydag.dig --rerunを行います。

簡単ですが、変数の受け渡し方法について説明しました。
詳細は下記を参照してみてください。
http://www.digdag.io/ruby_api.html

17
13
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
17
13