はじめに
LogstashはElastic社が出しているデータ収集、変換、および送信を行うデータ処理パイプラインツールです。本記事ではLogstashに対するオブザーバビリティとしての監視方法についての最新情報をまとめました。情報は、バージョン8.14時点でのElastic社が出している公式な監視方法に限定して紹介します。
どの監視方法を選ぶか?
Logstashの開発元のElasticが提供している監視方法はいくつかあります。監視エージェントとして何を使うか、監視機能(ダッシュボード)としてどのようなものがあるか、が選択のポイントとなります。
a. Elastic Agentをエージェントとして使用する方法(おすすめ)
こちらの場合、データがKibanaの新しいLogstash監視ダッシュボードで表示されます。また、Stack Monitoringに表示することもできます。Stack MonitoringよりもLogstash監視ダッシュボードの方がカスタマイズもできるので、今後はこちらの方が主流になると思います。
Monitoring Logstash with Elastic Agent
https://www.elastic.co/guide/en/logstash/current/monitoring-with-ea.html
b. Metricbeatをエージェントとして使用する方法
以下の2つは、MetricbeatでLogstashのテレメトリーを収集し、KibanaのStack Monitoring画面にデータが表示されます。
-
Monitoring Logstash (legacy)
https://www.elastic.co/guide/en/logstash/current/configuring-logstash.html -
Collect Logstash monitoring data with Metricbeat
https://www.elastic.co/guide/en/logstash/current/monitoring-with-metricbeat.html
c. エージェント使わない方法(Logstashから直接Elasticsearchにテレメトリーを送る)
このタイプは以下のものとなりますが、こちらv7.9.0でDeprecatedとなっているので、基本的には使わないでください。データはKibanaのStack Monitoring画面にデータが表示されます。
Collect Logstash monitoring data using legacy collectors
https://www.elastic.co/guide/en/logstash/current/monitoring-internal-collection-legacy.html
d. APIでLogstashからテレメトリーを取得する方法
Kibana以外で監視データを表示/分析したい場合、APIでデータをLogstashから取得できます。
Monitoring Logstash with APIs
https://www.elastic.co/guide/en/logstash/current/monitoring-logstash.html
おすすめのa. Elastic Agentを使った監視方法についての手順
本記事ではこのElastic Agentの方法の手順のみ案内します。
Elastic Agentのインストール方法
KibanaからIntegrationsメニューを開き、LogstashのIntegrationを見つけます。
今回はMetricsだけを収集します。URLはElastic AgentからLogstashにアクセスするURLです。今回はdocker-compose.ymlに定義されたlogstashというサービス名が使用するホスト名になります。
ここでAdd Elastic Agent to you hostsを選び、
下の赤枠の中の--url=と--enrollment-token=の値をコピーして、下で作成する.envのFLEET_URLとFLEET_ENROLLMENT_TOKENに入れます。
こちらの.envファイルを作成し、docker-compose.ymlで使う変数を外で定義します。
STACK_VERSION=8.13.2
PROJECT=logstash-monitoring
FLEET_URL=
FLEET_ENROLLMENT_TOKEN=
以下のようなdocker composeの定義を作成します。監視用のElastic AgentとLogstashを定義しています。
version: "3"
services:
monitoring-elastic-agent:
image: docker.elastic.co/beats/elastic-agent:${STACK_VERSION}
hostname: ${PROJECT}-monitoring-elastic-agent
restart: always
environment:
- FLEET_ENROLL=1
- FLEET_URL=${FLEET_URL}
- FLEET_ENROLLMENT_TOKEN=${FLEET_ENROLLMENT_TOKEN}
logstash:
image: docker.elastic.co/logstash/logstash:${STACK_VERSION}
hostname: ${PROJECT}-logstash
user: root
volumes:
- ./config/logstash.yml:/usr/share/logstash/config/logstash.yml:ro,Z
- ./pipeline:/usr/share/logstash/pipeline:ro,Z
ports:
- "5044:5044"
- "9600:9600"
environment:
LS_JAVA_OPTS: -Xms256m -Xmx256m
Logstashの設定ファイルとして、以下のファイルを作成します。
http.host: "0.0.0.0"
monitoring.enabled: false
Logstashのパイプラインファイルとして、以下のファイルを作成します。/tmp/test.logに書き込まれたログをinputで受け取り、それをoutputとしてLogstashの標準出力に出すというシンプルな定義で最初にテストしてみます。
input {
file {
path => "/tmp/test.log"
id => "/tmp/test.log"
}
}
output {
stdout {
codec => rubydebug
id => "stdout"
}
}
最終的には、今回は以下のようなファイル構成になります。
.
├── config
│ └── logstash.yml
├── docker-compose.yml
└── pipeline
└── logstash.conf
3 directories, 3 files
docker-compose up
で起動しましょう。
以下のところが緑になれば、Elastic Agentからの疎通ができたことがわかります。
今回のLogstash integrationで用意されているダッシュボードは、IntegrationsのAssetsタブで確認できます。
[Metrics Logstash] Logstash Overview
というダッシュボードを開きます。まだ何もデータ処理をしていないので、イベントの送受信が0件です。
では、さきほどDockerで起動したLogstashのターミナルにアタッチして、(docker-compose exec -it logstash bashなどでアタッチ)、/tmp/test.logに対して3行の書き込みを行ってみます。
% docker-compose exec -it logstash bash
root@logstash-monitoring-logstash:/usr/share/logstash# ls
bin data jdk logstash-core pipeline x-pack
config Gemfile JDK_VERSION logstash-core-plugin-api tools
CONTRIBUTORS Gemfile.lock lib modules vendor
root@logstash-monitoring-logstash:/usr/share/logstash# date >> /tmp/test.log
root@logstash-monitoring-logstash:/usr/share/logstash# date >> /tmp/test.log
root@logstash-monitoring-logstash:/usr/share/logstash# date >> /tmp/test.log
Events Recieved と Events Emittedにて、正しく送受信の数をとらえていることが確認できました。
次に、pipeline/logstash.confを変更し、ログを30秒間バッファして一つのJSONログにまとめるようなfilterを追加します。以下の内容に更新し、docker-compose restart logstash
で再起動して反映させます。
input {
file {
path => "/tmp/test.log"
id => "/tmp/test.log"
}
}
filter {
# Add a unique identifier to all events for aggregation
mutate {
add_field => { "task_id" => "aggregation_task" }
id => "add_field_task_id"
}
# Aggregate filter to combine logs
aggregate {
id => "aggregate_per30sec"
task_id => "%{task_id}" # Replace with a unique identifier for the task
code => "
map['total'] ||= 0
map['total'] += 1
map['events'] ||= []
map['events'] << event.to_hash
"
push_map_as_event_on_timeout => true
timeout => 30 # Adjust the timeout as needed
timeout_tags => ['aggregated']
timeout_code => "event.set('aggregate', map)"
}
if "aggregated" not in [tags] {
drop {
id => "drop not aggregated data"
}
}
}
output {
stdout {
codec => rubydebug
id => "stdout"
}
}
そして、/tmp/test.logにまた3行追加した結果、今度は期待通り1つのデータとして集約されたので、Events emittedが1でした。
Pipeline Details Overviewを開くと、パイプラインごとのデータの処理件数が確認できます。
Input: 3件
↓
Filter: aggregateは、Inputからの3件をrecieveし、集約した1件を追加して4件をemit。集約データ以外の3件はdropに処理が行きました。
↓
Output: 集約されたデータ1件が出力されました。
より実践的なテスト
input {
file {
path => "/tmp/test.log"
id => "/tmp/test.log"
}
}
output {
elasticsearch {
hosts => ["xxx"]
api_key => "xxx"
index => "logstash-logs-%{+YYYY.MM.dd}"
}
}
3件のログがElasticsearchのIndexに入ったことが確認できました。
次、Elasticsearchへの通信を遮断しました。どう見えるでしょうか?
通信が遮断されている間、Emittedが増えていないのが確認できますね。
通信を復活させてしばらくすると、Logstashの自動リトライにより、3件がElasticsearchに追加され、合計6件のEmitカウントとなりました。
メトリックとしては、このように出力(emit)の遅延が捉えられています。
おわり
Elastic Agentを使ってLogstashを監視するとLogstashを流れるデータも入力、出力の数値もわかり、稼働状況の可視化が可能で、Logstashの運用にも安心感が出ると思います。ぜひお試しください!