Productionでtoyama0919/fluent-plugin-exec_cronというプラグイン作って動かしています。
Fluentdのoutput pluginにはTimeslicedOutputとBufferedOutputとがあって、TimeslicedOutputで「ある程度の時刻指定」によるflushが可能です。
ただinput pluginについてはintervalを指定したflushはできますが、時刻指定は出来ません。
Inputの処理が実行される時刻は、再起動した時刻に依存します。
そこでCronによる時刻指定が可能なInput pluginを作ってみました。
似たような話だと、AWSのLambdaもCronによる時刻指定がサポートされました。((レポート) CMP407: Cron としての Lambda: AWS Lambda での呼び出しのスケジュール #reinvent | Developers.IO)
使い方
<source>
type exec_cron
tag exec_cron.example
command echo '{"a":"a"}'
format json
cron * * * * *
#cron */5 * * * * 5分おき
#cron 0 * * * * 1時間おき
</source>
<match exec_cron.example>
type stdout
</match>
実行結果はこんな感じ
2015-05-27 13:07:00 +0900 exec_cron.example: {"a":"a"}
2015-05-27 13:08:00 +0900 exec_cron.example: {"a":"a"}
2015-05-27 13:09:00 +0900 exec_cron.example: {"a":"a"}
2015-05-27 13:10:00 +0900 exec_cron.example: {"a":"a"}
2015-05-27 13:11:00 +0900 exec_cron.example: {"a":"a"}
2015-05-27 13:12:00 +0900 exec_cron.example: {"a":"a"}
時刻がピッタリ分刻みで出力されています。
もちろん、再起動しても時間はずれません。
実装的には
- in_execと処理は大体同じです。
- in_execには現在、次の処理が終わるまでshutdownを待ってしまう問題があるようです。
- intervalによる待ち時間をcron syntaxから動的に計算しています。
自分で作りたい場合
siebertm/parse-cronを使うと簡単に出来ます。
in_exec_cron.rb
def initialize
...
require 'parse-cron'
...
end
def configure(conf)
...
@cron_parser = CronParser.new(@cron)
...
end
def start
@finished = false
@thread = Thread.new(&method(:run_periodic))
end
def run_periodic
until @finished
begin
secs = @cron_parser.next(Time.now) - Time.now
sleep secs
# 処理
rescue
log.error "exec failed to run or shutdown child process", :error => $!.to_s, :error_class => $!.class.to_s
log.warn_backtrace $!.backtrace
end
end
end
まとめ
- ある程度時刻を決め打ちたい場合にいかがでしょうか
- FluentdのInput Pluginに過ぎないので、Fluentdが動いていなければ当然処理は動きません^^
- また、bufferが詰まってInputに戻ってこれない場合も同様です。