ログ分析基盤に Elasticsearch / Kibana を追加してみる(構築編)
はじめに
前回の記事では、Iceberg を正本として Elasticsearch / Kibana を追加する設計について解説しました。
本記事では実際に Elasticsearch と Kibana を構築し、Iceberg のデータを日次バッチで投入するところまで実施します。
完成後の構成は以下です。
シリーズ構成
- 設計編
- 構築編(本記事)
- 実践編
- 運用編
前提条件
既存環境
Kafka
HDFS
Hive
Iceberg
Spark
対象テーブル
hive_prod.logs.syslog_iceberg
hive_prod.logs.authlog_iceberg
追加サーバ
elastic1
バッチ実行サーバ
ope1
elastic1構築
手順ではIP:192.168.11.20を使用しますが、必要に応じ変更してください。
OSはAlmaLinux9を使用してください。
スペック
| 項目 | 値 |
|---|---|
| CPU | 2 Core |
| Memory | 4GB |
| Disk | 30GB |
Elasticsearch事前設定
sudo tee /etc/sysctl.d/99-elasticsearch.conf <<'EOF'
vm.max_map_count=1048576
EOF
sudo sysctl --system
確認
sysctl vm.max_map_count
期待値
vm.max_map_count = 1048576
Docker導入
# Dockerインストール
sudo dnf install -y docker docker-compose-plugin
# Docker起動・自動起動
sudo systemctl enable --now docker
# サービス状態確認
sudo systemctl status docker
# 動作確認
sudo docker run --rm hello-world
# 一般ユーザーでdocker実行可能にする
sudo usermod -aG docker $USER
newgrp docker
# バージョン確認
docker version
docker compose version
Elasticsearch / Kibana起動
作業ディレクトリ
mkdir -p ~/elastic-kibana
cd ~/elastic-kibana
docker-compose.yml
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.15.3
container_name: elasticsearch
environment:
- discovery.type=single-node
- xpack.security.enabled=false
- ES_JAVA_OPTS=-Xms4g -Xmx4g
ports:
- "9200:9200"
volumes:
- es-data:/usr/share/elasticsearch/data
restart: unless-stopped
kibana:
image: docker.elastic.co/kibana/kibana:8.15.3
container_name: kibana
environment:
- ELASTICSEARCH_HOSTS=http://elasticsearch:9200
- I18N_LOCALE=ja-JP
ports:
- "5601:5601"
depends_on:
- elasticsearch
restart: unless-stopped
volumes:
es-data:
起動
docker compose up -d
確認
curl http://elastic1:9200
Kibana
http://elastic1:5601
Index Template作成
登録
curl -X PUT "http://elastic1:9200/_index_template/logs_custom_template" \
-H "Content-Type: application/json" \
-d '{
"index_patterns": ["logs-*"],
"priority": 1000,
"data_stream": {},
"template": {
"mappings": {
"properties": {
"@timestamp": { "type": "date" },
"dt": {
"type": "date",
"format": "strict_date||yyyy-MM-dd"
},
"hr": { "type": "integer" },
"host": { "type": "keyword" },
"program": { "type": "keyword" },
"severity": { "type": "integer" },
"msg": { "type": "text" }
}
}
}
}'
確認
curl -s "http://elastic1:9200/_index_template/logs_custom_template?pretty"
ope1準備
Connector配置
sudo mkdir -p /opt/spark/jars-extra
cd /opt/spark/jars-extra
sudo wget \
https://repo1.maven.org/maven2/org/elasticsearch/elasticsearch-spark-30_2.12/8.15.3/elasticsearch-spark-30_2.12-8.15.3.jar
ディレクトリ作成
sudo mkdir -p /opt/elastic/bin
syslog投入ジョブ
ファイル
/opt/elastic/bin/export_syslog_to_es.py
sudo tee /opt/elastic/bin/export_syslog_to_es.py <<'EOF'
import sys
from datetime import datetime
from pyspark.sql import SparkSession
if len(sys.argv) != 2:
print("Usage: export_syslog_to_es.py YYYY-MM-DD", file=sys.stderr)
sys.exit(1)
target_dt = sys.argv[1]
try:
datetime.strptime(target_dt, "%Y-%m-%d")
except ValueError:
print(f"Invalid date format: {target_dt}. expected YYYY-MM-DD", file=sys.stderr)
sys.exit(1)
index_name = "logs-syslog-" + target_dt.replace("-", ".")
spark = (
SparkSession.builder
.appName(f"syslog-to-es-{target_dt}")
.enableHiveSupport()
.getOrCreate()
)
df = spark.sql(f'''
SELECT
host,
ts AS `@timestamp`,
severity,
program,
msg,
CAST(dt AS STRING) AS dt,
hr
FROM hive_prod.logs.syslog_iceberg
WHERE dt = DATE '{target_dt}'
''')
(
df.write
.format("org.elasticsearch.spark.sql")
.mode("append")
.option("es.nodes", "192.168.11.20")
.option("es.port", "9200")
.option("es.resource", index_name)
.option("es.nodes.wan.only", "true")
.option("es.write.operation", "create")
.save()
)
spark.stop()
EOF
authlog投入ジョブ
ファイル
/opt/elastic/bin/export_authlog_to_es.py
sudo tee /opt/elastic/bin/export_authlog_to_es.py <<'EOF'
import sys
from datetime import datetime
from pyspark.sql import SparkSession
if len(sys.argv) != 2:
print("Usage: export_syslog_to_es.py YYYY-MM-DD", file=sys.stderr)
sys.exit(1)
target_dt = sys.argv[1]
try:
datetime.strptime(target_dt, "%Y-%m-%d")
except ValueError:
print(f"Invalid date format: {target_dt}. expected YYYY-MM-DD", file=sys.stderr)
sys.exit(1)
index_name = "logs-authlog-" + target_dt.replace("-", ".")
spark = (
SparkSession.builder
.appName(f"syslog-to-es-{target_dt}")
.enableHiveSupport()
.getOrCreate()
)
df = spark.sql(f'''
SELECT
host,
ts AS `@timestamp`,
severity,
program,
msg,
CAST(dt AS STRING) AS dt,
hr
FROM hive_prod.logs.authlog_iceberg
WHERE dt = DATE '{target_dt}'
''')
(
df.write
.format("org.elasticsearch.spark.sql")
.mode("append")
.option("es.nodes", "192.168.11.20")
.option("es.port", "9200")
.option("es.resource", index_name)
.option("es.nodes.wan.only", "true")
.option("es.write.operation", "create")
.save()
)
spark.stop()
EOF
実行ラッパ
export_syslog_to_es.sh
sudo tee /opt/elastic/bin/export_syslog_to_es.sh <<'EOF'
#!/usr/bin/env bash
set -euo pipefail
ES_URL="http://elastic1:9200"
# 引数があればその日付、なければ yesterday
target_dt="${1:-$(date -d 'yesterday' +%F)}"
# YYYY-MM-DD 形式チェック
if ! [[ "${target_dt}" =~ ^[0-9]{4}-[0-9]{2}-[0-9]{2}$ ]]; then
echo "[ERROR] target_dt must be YYYY-MM-DD. input=${target_dt}"
exit 1
fi
index_name="logs-syslog-${target_dt//-/.}"
echo "[INFO] target_dt=${target_dt}"
echo "[INFO] index_name=${index_name}"
tmp_file="/tmp/delete_${index_name}.json"
http_code=$(curl -s -o "${tmp_file}" -w "%{http_code}" \
-X DELETE "${ES_URL}/_data_stream/${index_name}" || true)
case "${http_code}" in
200)
echo "[INFO] deleted ${index_name}"
;;
404)
echo "[INFO] ${index_name} does not exist. continue."
;;
*)
echo "[ERROR] failed to delete ${index_name}. http_code=${http_code}"
cat "${tmp_file}" || true
exit 1
;;
esac
export SPARK_HOME=/opt/spark/current
export SPARK_CONF_DIR=/etc/spark/conf
export HADOOP_CONF_DIR=/etc/hadoop/conf
export HIVE_CONF_DIR=/etc/hive/conf
${SPARK_HOME}/bin/spark-submit \
--master spark://iceberg1:7077 \
--deploy-mode client \
--jars /opt/spark/jars-extra/elasticsearch-spark-30_2.12-8.15.3.jar \
/opt/elastic/bin/export_syslog_to_es.py "${target_dt}"
echo "[INFO] count after import"
curl -s "${ES_URL}/${index_name}/_count?pretty"
EOF
sudo chmod +x /opt/elastic/bin/export_syslog_to_es.sh
export_authlog_to_es.sh
sudo tee /opt/elastic/bin/export_authlog_to_es.sh <<'EOF'
#!/usr/bin/env bash
set -euo pipefail
ES_URL="http://elastic1:9200"
# 引数があればその日付、なければ yesterday
target_dt="${1:-$(date -d 'yesterday' +%F)}"
# YYYY-MM-DD 形式チェック
if ! [[ "${target_dt}" =~ ^[0-9]{4}-[0-9]{2}-[0-9]{2}$ ]]; then
echo "[ERROR] target_dt must be YYYY-MM-DD. input=${target_dt}"
exit 1
fi
index_name="logs-authlog-${target_dt//-/.}"
echo "[INFO] target_dt=${target_dt}"
echo "[INFO] index_name=${index_name}"
tmp_file="/tmp/delete_${index_name}.json"
http_code=$(curl -s -o "${tmp_file}" -w "%{http_code}" \
-X DELETE "${ES_URL}/_data_stream/${index_name}" || true)
case "${http_code}" in
200)
echo "[INFO] deleted ${index_name}"
;;
404)
echo "[INFO] ${index_name} does not exist. continue."
;;
*)
echo "[ERROR] failed to delete ${index_name}. http_code=${http_code}"
cat "${tmp_file}" || true
exit 1
;;
esac
export SPARK_HOME=/opt/spark/current
export SPARK_CONF_DIR=/etc/spark/conf
export HADOOP_CONF_DIR=/etc/hadoop/conf
export HIVE_CONF_DIR=/etc/hive/conf
${SPARK_HOME}/bin/spark-submit \
--master spark://iceberg1:7077 \
--deploy-mode client \
--jars /opt/spark/jars-extra/elasticsearch-spark-30_2.12-8.15.3.jar \
/opt/elastic/bin/export_authlog_to_es.py "${target_dt}"
echo "[INFO] count after import"
curl -s "${ES_URL}/${index_name}/_count?pretty"
EOF
sudo chmod +x /opt/elastic/bin/export_authlog_to_es.sh
手動投入
前日のデータを投入する場合
/opt/elastic/bin/export_syslog_to_es.sh
/opt/elastic/bin/export_authlog_to_es.sh
指定日のデータを投入する場合
/opt/elastic/bin/export_syslog_to_es.sh yyyy-mm-dd
/opt/elastic/bin/export_authlog_to_es.sh yyyy-mm-dd
確認
curl -s http://elastic1:9200/_data_stream | jq -r '.data_streams[].name'
logs-authlog(syslog)-yyyy.mm.ddのリストが出てくること。
Kibana設定
アクセス
http://elastic1:5601
Data View作成
左側のメニューからManagement-Stack Managementをクリックする。
左側のメニューからKibane-データビューをクリックし、以下のデータビューを作成する。
syslog
Index Pattern
logs-syslog-*
authlog
Index Pattern
logs-authlog-*
Timestamp
@timestamp
Dashboard例
作成例
- ホスト別ログ件数
- severity別件数
- program別件数
- 時間帯別件数
14日ローテーション
delete_old_es_indices.sh
sudo tee /opt/elastic/bin/delete_old_es_indices.sh <<'EOF'
#!/usr/bin/env bash
set -euo pipefail
ES_HOST="http://elastic1:9200"
# 引数があればその指定日分ローテーション、なければ14日分ローテーション
RETENTION_DAYS="${1:-14}"
CUTOFF=$(date -d "${RETENTION_DAYS} days ago" +%s)
for prefix in logs-syslog logs-authlog
do
curl -s "${ES_HOST}/_data_stream/${prefix}-*" \
| jq -r '.data_streams[].name' \
| while read -r ds
do
[ -z "${ds}" ] && continue
idx_date=${ds#${prefix}-}
if ! [[ "${idx_date}" =~ ^[0-9]{4}\.[0-9]{2}\.[0-9]{2}$ ]]; then
echo "[WARN] skip invalid data stream name: ${ds}"
continue
fi
epoch=$(date -d "${idx_date//./-}" +%s)
if [ "${epoch}" -lt "${CUTOFF}" ]; then
echo "[INFO] delete data stream: ${ds}"
curl -s -X DELETE "${ES_HOST}/_data_stream/${ds}"
echo
else
echo "[INFO] keep data stream: ${ds}"
fi
done
done
EOF
sudo chmod +x /opt/elastic/bin/delete_old_es_indices.sh
手動投入
14日より前のデータを削除する場合
/opt/elastic/bin/delete_old_es_indices.sh
指定日のデータを投入する場合(7日より前のデータを削除する場合)
/opt/elastic/bin/delete_old_es_indices.sh 7
確認
curl -s http://elastic1:9200/_data_stream | jq -r '.data_streams[].name'
logs-authlog(syslog)-yyyy.mm.ddのリストを参照し、指定の日数以前のデータが出てこないこと。
cron登録
0 1 * * * root /opt/elastic/bin/export_syslog_to_es.sh
10 1 * * * root /opt/elastic/bin/export_authlog_to_es.sh
0 2 * * * root /opt/elastic/bin/delete_old_es_indices.sh
動作確認
Index一覧
curl http://elastic1:9200/_cat/indices?v
件数確認
curl http://elastic1:9200/logs-syslog-yyyy.mm.dd/_count
Discover確認
Analytics
↓
Discover
トラブルシュート
Indexが作成されない
確認
curl http://elastic1:9200/_cat/indices?v
Elasticsearch接続失敗
curl http://elastic1:9200
Kibanaが空
Data View確認
logs-syslog-*
logs-authlog-*
まとめ
本記事では Iceberg を正本として Elasticsearch / Kibana を追加しました。
この構成により、既存の Kafka / Hive / Iceberg 基盤へ影響を与えることなく、Kibana による全文検索や運用分析機能を利用できるようになります。
次回はKibanaでのデータ解析実践方法を解説します。