The jdbc input sets max_work_threads for the Rufus scheduler to one. If there is no available worker thread, trigger_queue does nothing, so that instance of the job is never run; it waits until the next time the queue should be triggered.
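The skipping behavior quoted above can be illustrated with a small deterministic simulation. This is a hypothetical sketch, not rufus-scheduler's actual code: it assumes a single worker (as with max_work_threads set to one), a trigger every `interval` time units, and a job that occupies the worker for `duration` units. Any trigger that fires while the worker is still busy is dropped rather than queued. The names `simulate_single_worker`, `interval`, `duration`, and `horizon` are made up for illustration.

```ruby
# Hypothetical model, not rufus-scheduler's implementation: one worker thread
# (like max_work_threads = 1). A trigger that fires while the worker is busy
# is dropped; that occurrence never runs and the next trigger is awaited.
def simulate_single_worker(interval:, duration:, horizon:)
  busy_until = 0
  ran = []
  skipped = []
  (0...horizon).step(interval) do |t|
    if t >= busy_until
      ran << t                  # worker is free: run this occurrence
      busy_until = t + duration # worker stays occupied until the job ends
    else
      skipped << t              # no free worker: this occurrence is lost
    end
  end
  { ran: ran, skipped: skipped }
end

# Job triggered every 5 units but taking 7 units to finish.
slow = simulate_single_worker(interval: 5, duration: 7, horizon: 30)
puts "ran: #{slow[:ran].inspect}, skipped: #{slow[:skipped].inspect}"

# Same schedule with a job that finishes within the interval.
fast = simulate_single_worker(interval: 5, duration: 3, horizon: 30)
puts "ran: #{fast[:ran].inspect}, skipped: #{fast[:skipped].inspect}"
```

With the slow job, every other trigger (5, 15, 25) is dropped; with the fast job, nothing is skipped.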
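I came across the statement above that, with the JDBC input plugin, such a run is skipped. That means if you rely on the schedule and configure something like "query from 6 minutes ago until now, every 5 minutes", then whenever the pipeline has not finished processing by the next scheduled run (for example because the filter stage is slow), that run is skipped and the data it would have fetched is lost. (For the JDBC input plugin specifically, the recommended approach is to use :sql_last_value.)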
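To see why a fixed look-back window loses data when a run is skipped, while a checkpoint such as :sql_last_value does not, here is a toy comparison. It is only a sketch under simplifying assumptions: records are reduced to integer timestamps in minutes, each run happens exactly at its scheduled minute, and `fetch_fixed_window` / `fetch_with_checkpoint` are hypothetical helpers, not part of the JDBC input plugin. The checkpoint simply advances to the largest value fetched, loosely mirroring a tracking column.

```ruby
# Toy data: each record is just its timestamp in minutes (hypothetical).
RECORDS = (1..15).to_a

# Fixed window: a run at minute t fetches records in (t - lookback, t].
def fetch_fixed_window(records, run_times, lookback)
  run_times.flat_map { |t| records.select { |r| r > t - lookback && r <= t } }.uniq
end

# Checkpoint (in the spirit of :sql_last_value): each run fetches records
# newer than the last value seen so far, then advances the checkpoint.
def fetch_with_checkpoint(records, run_times)
  last_value = -Float::INFINITY
  fetched = []
  run_times.each do |t|
    batch = records.select { |r| r > last_value && r <= t }
    fetched.concat(batch)
    last_value = batch.max unless batch.empty?
  end
  fetched
end

all_runs  = [5, 10, 15] # "from 6 minutes ago until now, every 5 minutes"
with_skip = [5, 15]     # the run at minute 10 was skipped

# Records that were never fetched under each strategy.
p RECORDS - fetch_fixed_window(RECORDS, all_runs, 6)
p RECORDS - fetch_fixed_window(RECORDS, with_skip, 6)
p RECORDS - fetch_with_checkpoint(RECORDS, with_skip)
```

With every run present the 6-minute window covers everything (one minute of overlap), but once the run at minute 10 is skipped, minutes 6 to 9 are never queried. The checkpoint version picks them up on the next run.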
I tried it out to see what happens with the Elasticsearch input plugin.
Environment:
i5-7500K
Windows Pro
pipeline.workers: 4 (logstash.yml)
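Pipeline configuration:

```
input {
  elasticsearch {
    hosts => ""
    query => '{"sort": [ "_doc" ] }'
    index => ""
    user => ""
    password => ""
    target => "message"
    # Six-field cron (seconds first): run every 5 seconds
    schedule => "*/5 * * * * *"
  }
}
filter {
  # Drop the fetched document; only the timing fields matter here
  mutate {
    remove_field => "message"
  }
  # Simulate a slow filter: sleep 10 seconds per event (longer than the
  # 5-second schedule), then record the local time the filter finished
  ruby {
    code => "sleep 10
      event.set('running_time', Time.now.strftime('%Y-%m-%d %H:%M:%S'))
    "
  }
}
output {
  stdout {
    codec => rubydebug { metadata => true }
  }
}
```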
Output:

```
{
"@timestamp" => 2022-02-10T11:58:15.225Z,
"@version" => "1",
"running_time" => "2022-02-10 20:58:25"
}
{
"@timestamp" => 2022-02-10T11:58:20.186Z,
"@version" => "1",
"running_time" => "2022-02-10 20:58:30"
}
{
"@timestamp" => 2022-02-10T11:58:25.160Z,
"@version" => "1",
"running_time" => "2022-02-10 20:58:35"
}
{
"@timestamp" => 2022-02-10T11:58:30.132Z,
"@version" => "1",
"running_time" => "2022-02-10 20:58:40"
}
{
"@timestamp" => 2022-02-10T11:58:35.062Z,
"@version" => "1",
"running_time" => "2022-02-10 20:58:45"
}
{
"@timestamp" => 2022-02-10T11:58:40.028Z,
"@version" => "1",
"running_time" => "2022-02-10 20:58:50"
}
{
"@timestamp" => 2022-02-10T11:58:45.273Z,
"@version" => "1",
"running_time" => "2022-02-10 20:58:55"
}
```
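It does run every 5 seconds as scheduled. Even though the ruby filter sleeps for 10 seconds per event, which is longer than the 5-second interval, consecutive @timestamp values are about 5 seconds apart and each running_time is about 10 seconds later (running_time is local time, UTC+9, while @timestamp is UTC). As long as the queue does not fill up, it looks like every scheduled run is executed without any being dropped.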
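The timing can be checked numerically against the output. The snippet below copies the seven @timestamp / running_time pairs from that output and computes the interval between consecutive runs and the delay added by the filter. It assumes running_time is local time at UTC+9 (inferred from the 9-hour offset), so `+09:00` is appended before parsing; the constant and variable names are illustrative.

```ruby
require "time"

# [@timestamp (UTC), running_time (local, assumed UTC+9)] from the output.
OBSERVED = [
  ["2022-02-10T11:58:15.225Z", "2022-02-10 20:58:25"],
  ["2022-02-10T11:58:20.186Z", "2022-02-10 20:58:30"],
  ["2022-02-10T11:58:25.160Z", "2022-02-10 20:58:35"],
  ["2022-02-10T11:58:30.132Z", "2022-02-10 20:58:40"],
  ["2022-02-10T11:58:35.062Z", "2022-02-10 20:58:45"],
  ["2022-02-10T11:58:40.028Z", "2022-02-10 20:58:50"],
  ["2022-02-10T11:58:45.273Z", "2022-02-10 20:58:55"],
].freeze

starts = OBSERVED.map { |ts, _| Time.iso8601(ts) }
ends   = OBSERVED.map { |_, rt| Time.parse("#{rt} +09:00") }

# Seconds between consecutive scheduled runs.
intervals = starts.each_cons(2).map { |a, b| (b - a).round(3) }

# Seconds from event creation in the input until the ruby filter finished.
# running_time is truncated to whole seconds, so this reads slightly under 10.
delays = starts.zip(ends).map { |s, e| (e - s).round(3) }

puts "intervals: #{intervals.inspect}"
puts "delays:    #{delays.inspect}"
```

Every interval falls between roughly 4.9 and 5.3 seconds, and every delay is just under 10 seconds, so in this test the 10-second filter did not push back the next scheduled run.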