LoginSignup
2
3

More than 5 years have passed since last update.

ワークフローエンジン考察

Last updated at Posted at 2017-11-21

最近、

ワークフローエンジンを触る機会があって思うところがあったので書いてみた話。

DSLつらい

digdag のスクリプトは、ordereddict的な序列付きのyaml拡張を採用してて、
kuroko2 は自前でスクリプトパースしているのを見て、
なんかいい方法ないかなー、とか考えてたら、html(というか雑なxml)が使えそうな気がした。

htmlってなにげに有能で、

  • 階層構造に制限がない
  • 子の要素と独立して親にも属性持たせられる
  • 階層内にちゃんと序列がある(xpath便利)
  • たいていの言語で扱える

とここまで書いて、javaはxmlを採用して大顰蹙だったのを思い出した…
ので、hamlとかで書いてhtmlに変換してしまえばいいんじゃないか、と。

コード

というわけで、構想1日書いたの1日くらいでこんな感じに。
タスクのID代わりにxpathがそのまま使える。便利。

再開機能どうしようか悩んだけど、embulk みたいにレジュームファイル残すアプローチにしようかと思った。
けど、どうせhtmlだし、xpathをキーにして実行時の環境変数丸ごとjsonにしてrootの属性に入れてしまうか、みたいな。

rexmlが最近の書き方に対応してなくて微妙だったけど(REXML::Elements#eachEnumerable 返さないとか)、
かといってNokogiriも使いたくなかったので、まぁ。
と思ったら html2haml がNokogiri使ってた…

workflow_haml.rb
require 'haml'
require 'html2haml'
require 'json'
require 'rexml/document'
require 'open3'

class WorkflowHaml
  FORK_POOL_SIZE = 4
  OPEN3_DEFAULT_OPTS = { unsetenv_others: true, pgroup: true }

  DEFAULT_PLUGINS = {
    echo: ->(el, env, w) {
      w.puts el.text.strip
    },

    sleep: ->(el, env, w) {
      sec = el["sec"].to_f
      sleep(sec)
    },
  }

  def initialize(reader,
                 pool_size: FORK_POOL_SIZE,
                 system_output: STDOUT,
                 command_output: STDOUT,
                 allow_eval: false
                )
    @pool = SizedQueue.new(pool_size)
    @system_output = system_output
    @command_output = command_output
    @allow_eval = allow_eval

    @plugins = DEFAULT_PLUGINS.dup
    haml = Haml::Engine.new(reader)
    @doc = REXML::Document.new(haml.render)
  end

  def add_plugins(kwargs)
    @plugins.merge!(kwargs)
  end

  def error
    @e
  end

  def error?
    ! @e.nil?
  end

  def to_haml
    Html2haml::HTML.new(@doc.to_s).render
  end

  def perform(rerun: false)
    @r_sys, @w_sys = IO.pipe
    @r_cmd, @w_cmd = IO.pipe
    @q = Queue.new
    @e = nil
    @resumes = if rerun || @doc.root.attribute("resumes").nil?
                {}
              else
                JSON.parse(@doc.root["resumes"])
              end

    th = Thread.start do
      begin
        __perform(@doc.root)
        nil
      rescue => e
        @w_sys.puts e.inspect
        e
      ensure
        @w_sys.close
        @w_cmd.close

        @q.push -> { return false }
      end
    end

    th2 = Thread.start do
      while @q.pop.call do; end
    end

    @system_output.tap do |w|
      w.puts("<system output>")
      @r_sys.each_line do |x|
        w.puts(x)
      end
      w.puts("</system output>")
    end

    @command_output.tap do |w|
      w.puts("<command output>")
      @r_cmd.each_line do |x|
        w.puts(x)
      end
      w.puts("</command output>")
    end

    th2.join
    @e = th.join.value
    @doc.root.add_attribute("resumes", @resumes.to_json)
    unless self.error?
      @doc.root.add_attribute("complete", true)
    end
  end

  private

  def __perform(grp, env: {}, opts: OPEN3_DEFAULT_OPTS.dup)
    if @resumes.key?(grp.xpath)
      env.merge!(@resumes[grp.xpath])
      @w_sys.puts ["skip group", Thread.current, grp.xpath, env].to_json
      return
    end
    @w_sys.puts ["perform group", Thread.current, grp.xpath, env].to_json
    finish_task_grp = -> {
      @resumes[grp.xpath] = env.dup
      return true
    }

    grp.elements.each do |el|
      if @resumes.key?(el.xpath)
        env.merge!(@resumes[el.xpath])
        @w_sys.puts ["skip element", Thread.current, el.xpath, el, env].to_json
        next
      end
      @w_sys.puts ["perform element", Thread.current, el.xpath, el, env].to_json
      finish_task_el = -> {
        @resumes[el.xpath] = env.dup
        return true
      }

      tag = el.name.to_sym
      case tag
      when :group, :root
        __send__(__callee__, el, env: env.dup, opts: opts.dup)
      when :chdir
        opts[:chdir] = el.text.strip
      when :fork
        [].tap do |x|
          el.elements.each do |el2|
            x << Thread.start {
              begin
                @pool.push(:lock)
                __send__(__callee__, el2, env: env.dup, opts: opts.dup)
              ensure
                @pool.pop
              end
            }
          end
        end.each(&:join)
      when :env
        el.attributes.keys.each do |k|
          env[k] = el.attributes[k]
        end
      when :eval
        raise RuntimeError.new("eval not allowed!") unless @allow_eval
        code = el.text
        begin
          $stdout = $stderr = @w_cmd
          eval(code, binding, el.xpath)
        ensure
          $stdout = STDOUT
          $stderr = STDERR
        end
      when :shell
        cmd = el.text
        env2 = env.merge("XPATH" => el.xpath)
        ret = Open3.popen2e(env2, cmd, opts) { |stdin, stdout_and_stderr, thread|
          stdin.close
          @w_cmd.puts stdout_and_stderr.read
          thread.value
        }
        unless ret.exitstatus == 0
          raise RuntimeError.new(ret)
        end
      else
        unless @plugins.key?(tag)
          raise TypeError.new("unknown tag #{tag} '#{el.xpath}'")
        end
        @plugins[tag].call(el, env, @w_sys)
      end

      @q.push finish_task_el
    end

    @q.push finish_task_grp
  end
end


if __FILE__ == $0
  str = File.read('tasks.haml')

  w = WorkflowHaml.new(str)
  w.add_plugins(
    b: ->(el, env, w) {
      w.puts [el, env, "this is b"].to_json
    },
    c: ->(el, env, w) {
      w.puts [el, env, "this is c"].to_json
    },
  )
  w.perform


  STDERR.puts [w.error?, w.error].to_json
  puts w.to_haml
end

タスク

haml はこんな感じ。

tasks.haml
%root
  %env{ hoge: 123, foo: 234}
  %shell
    echo $hoge
    echo $foo
  %fork
    -# %fork 直下は %group で包む
    %group
      %env{ hoge2: 345, foo2: 456}
      %shell
        env
        echo xpath: "$XPATH"
        echo $hoge / $foo
        echo $hoge2 / $foo2
      -# カスタムタグ
      %b{ attr1: "B1", attr2: "B2" }
    %group
      %chdir /tmp
      %c{ attr1: "C1", attr2: "C2" }
  %shell
    env
    echo xpath: "$XPATH"
    echo $hoge / $foo
    echo $hoge2 / $foo2
    exit 2
  %echo finish

resumeファイル

で、読みづらいけど実行後はこう。
最後の %shell タグで exit 2 してるので、そのタスクは完了してない。

tasks.resume.haml
%root{:resumes => "{\"/root/env\":{\"foo\":\"234\",\"hoge\":\"123\"},\"/root/shell[1]\":{\"foo\":\"234\",\"hoge\":\"123\"},\"/root/fork/group[2]/chdir\":{\"foo\":\"234\",\"hoge\":\"123\"},\"/root/fork/group[2]/c\":{\"foo\":\"234\",\"hoge\":\"123\"},\"/root/fork/group[2]\":{\"foo\":\"234\",\"hoge\":\"123\"},\"/root/fork/group[1]/env\":{\"foo\":\"234\",\"hoge\":\"123\",\"foo2\":\"456\",\"hoge2\":\"345\"},\"/root/fork/group[1]/shell\":{\"foo\":\"234\",\"hoge\":\"123\",\"foo2\":\"456\",\"hoge2\":\"345\"},\"/root/fork/group[1]/b\":{\"foo\":\"234\",\"hoge\":\"123\",\"foo2\":\"456\",\"hoge2\":\"345\"},\"/root/fork/group[1]\":{\"foo\":\"234\",\"hoge\":\"123\",\"foo2\":\"456\",\"hoge2\":\"345\"},\"/root/fork\":{\"foo\":\"234\",\"hoge\":\"123\"}}"}
  %env{:foo => "234", :hoge => "123"}
  %shell
    echo $hoge
    echo $foo
  %fork
    %group
      %env{:foo2 => "456", :hoge2 => "345"}
      %shell
        env
        echo xpath: "$XPATH"
        echo $hoge / $foo
        echo $hoge2 / $foo2
      %b{:attr1 => "B1", :attr2 => "B2"}
    %group
      %chdir /tmp
      %c{:attr1 => "C1", :attr2 => "C2"}
  %shell
    env
    echo xpath: "$XPATH"
    echo $hoge / $foo
    echo $hoge2 / $foo2
    exit 2
  %echo finish

最初 golang 1.9 でpluginパッケージが darwin/amd64 にも入るって話だったので、
せっかくだからプラグイン機構的なの書いてみようかと思ったけど、なんかまだ入ってないみたいなので、
モチベーションの半分くらい削られながら結局rubyで書いた。

kuroko2 のコードが大変参考になりました。

htmlのポテンシャルを示すのが主題なので、特にgemにしたりメンテしたりする予定は無いです。

@2017/12/12
ちょっと手直し

2
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
3