0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

ログ分析基盤に Elasticsearch / Kibana を追加してみる(構築編)

0
Last updated at Posted at 2026-05-30

ログ分析基盤に Elasticsearch / Kibana を追加してみる(構築編)

はじめに

前回の記事では、Iceberg を正本として Elasticsearch / Kibana を追加する設計について解説しました。

本記事では実際に Elasticsearch と Kibana を構築し、Iceberg のデータを日次バッチで投入するところまで実施します。

完成後の構成は以下です。


シリーズ構成

  1. 設計編
  2. 構築編(本記事)
  3. 実践編
  4. 運用編

前提条件

既存環境

Kafka
HDFS
Hive
Iceberg
Spark

対象テーブル

hive_prod.logs.syslog_iceberg
hive_prod.logs.authlog_iceberg

追加サーバ

elastic1

バッチ実行サーバ

ope1

elastic1構築

手順ではIP:192.168.11.20を使用しますが、必要に応じ変更してください。
OSはAlmaLinux9を使用してください。

スペック

項目
CPU 2 Core
Memory 4GB
Disk 30GB

Elasticsearch事前設定

sudo tee /etc/sysctl.d/99-elasticsearch.conf <<'EOF'
vm.max_map_count=1048576
EOF

sudo sysctl --system

確認

sysctl vm.max_map_count

期待値

vm.max_map_count = 1048576

Docker導入

# Dockerインストール
sudo dnf install -y docker docker-compose-plugin

# Docker起動・自動起動
sudo systemctl enable --now docker

# サービス状態確認
sudo systemctl status docker

# 動作確認
sudo docker run --rm hello-world

# 一般ユーザーでdocker実行可能にする
sudo usermod -aG docker $USER
newgrp docker

# バージョン確認
docker version
docker compose version

Elasticsearch / Kibana起動

作業ディレクトリ

mkdir -p ~/elastic-kibana
cd ~/elastic-kibana

docker-compose.yml

services:

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.15.3

    container_name: elasticsearch

    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
      - ES_JAVA_OPTS=-Xms4g -Xmx4g

    ports:
      - "9200:9200"

    volumes:
      - es-data:/usr/share/elasticsearch/data

    restart: unless-stopped

  kibana:
    image: docker.elastic.co/kibana/kibana:8.15.3

    container_name: kibana

    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
      - I18N_LOCALE=ja-JP

    ports:
      - "5601:5601"

    depends_on:
      - elasticsearch

    restart: unless-stopped

volumes:
  es-data:

起動

docker compose up -d

確認

curl http://elastic1:9200

Kibana

http://elastic1:5601

Index Template作成

登録

curl -X PUT "http://elastic1:9200/_index_template/logs_custom_template" \
  -H "Content-Type: application/json" \
  -d '{
    "index_patterns": ["logs-*"],
    "priority": 1000,
    "data_stream": {},
    "template": {
      "mappings": {
        "properties": {
          "@timestamp": { "type": "date" },
          "dt": {
            "type": "date",
            "format": "strict_date||yyyy-MM-dd"
          },
          "hr": { "type": "integer" },
          "host": { "type": "keyword" },
          "program": { "type": "keyword" },
          "severity": { "type": "integer" },
          "msg": { "type": "text" }
        }
      }
    }
  }'

確認

curl -s "http://elastic1:9200/_index_template/logs_custom_template?pretty"

ope1準備

Connector配置

sudo mkdir -p /opt/spark/jars-extra

cd /opt/spark/jars-extra

sudo wget \
https://repo1.maven.org/maven2/org/elasticsearch/elasticsearch-spark-30_2.12/8.15.3/elasticsearch-spark-30_2.12-8.15.3.jar

ディレクトリ作成

sudo mkdir -p /opt/elastic/bin

syslog投入ジョブ

ファイル

/opt/elastic/bin/export_syslog_to_es.py
sudo tee /opt/elastic/bin/export_syslog_to_es.py <<'EOF'
import sys
from datetime import datetime
from pyspark.sql import SparkSession

if len(sys.argv) != 2:
    print("Usage: export_syslog_to_es.py YYYY-MM-DD", file=sys.stderr)
    sys.exit(1)

target_dt = sys.argv[1]

try:
    datetime.strptime(target_dt, "%Y-%m-%d")
except ValueError:
    print(f"Invalid date format: {target_dt}. expected YYYY-MM-DD", file=sys.stderr)
    sys.exit(1)

index_name = "logs-syslog-" + target_dt.replace("-", ".")

spark = (
    SparkSession.builder
    .appName(f"syslog-to-es-{target_dt}")
    .enableHiveSupport()
    .getOrCreate()
)

df = spark.sql(f'''
SELECT
  host,
  ts AS `@timestamp`,
  severity,
  program,
  msg,
  CAST(dt AS STRING) AS dt,
  hr
FROM hive_prod.logs.syslog_iceberg
WHERE dt = DATE '{target_dt}'
''')

(
    df.write
      .format("org.elasticsearch.spark.sql")
      .mode("append")
      .option("es.nodes", "192.168.11.20")
      .option("es.port", "9200")
      .option("es.resource", index_name)
      .option("es.nodes.wan.only", "true")
      .option("es.write.operation", "create")
      .save()
)

spark.stop()
EOF

authlog投入ジョブ

ファイル

/opt/elastic/bin/export_authlog_to_es.py
sudo tee /opt/elastic/bin/export_authlog_to_es.py <<'EOF'
import sys
from datetime import datetime
from pyspark.sql import SparkSession

if len(sys.argv) != 2:
    print("Usage: export_syslog_to_es.py YYYY-MM-DD", file=sys.stderr)
    sys.exit(1)

target_dt = sys.argv[1]

try:
    datetime.strptime(target_dt, "%Y-%m-%d")
except ValueError:
    print(f"Invalid date format: {target_dt}. expected YYYY-MM-DD", file=sys.stderr)
    sys.exit(1)

index_name = "logs-authlog-" + target_dt.replace("-", ".")

spark = (
    SparkSession.builder
    .appName(f"syslog-to-es-{target_dt}")
    .enableHiveSupport()
    .getOrCreate()
)

df = spark.sql(f'''
SELECT
  host,
  ts AS `@timestamp`,
  severity,
  program,
  msg,
  CAST(dt AS STRING) AS dt,
  hr
FROM hive_prod.logs.authlog_iceberg
WHERE dt = DATE '{target_dt}'
''')

(
    df.write
      .format("org.elasticsearch.spark.sql")
      .mode("append")
      .option("es.nodes", "192.168.11.20")
      .option("es.port", "9200")
      .option("es.resource", index_name)
      .option("es.nodes.wan.only", "true")
      .option("es.write.operation", "create")
      .save()
)

spark.stop()
EOF

実行ラッパ

export_syslog_to_es.sh

sudo tee /opt/elastic/bin/export_syslog_to_es.sh <<'EOF'
#!/usr/bin/env bash
set -euo pipefail

ES_URL="http://elastic1:9200"

# 引数があればその日付、なければ yesterday
target_dt="${1:-$(date -d 'yesterday' +%F)}"

# YYYY-MM-DD 形式チェック
if ! [[ "${target_dt}" =~ ^[0-9]{4}-[0-9]{2}-[0-9]{2}$ ]]; then
  echo "[ERROR] target_dt must be YYYY-MM-DD. input=${target_dt}"
  exit 1
fi

index_name="logs-syslog-${target_dt//-/.}"

echo "[INFO] target_dt=${target_dt}"
echo "[INFO] index_name=${index_name}"

tmp_file="/tmp/delete_${index_name}.json"

http_code=$(curl -s -o "${tmp_file}" -w "%{http_code}" \
  -X DELETE "${ES_URL}/_data_stream/${index_name}" || true)

case "${http_code}" in
  200)
    echo "[INFO] deleted ${index_name}"
    ;;
  404)
    echo "[INFO] ${index_name} does not exist. continue."
    ;;
  *)
    echo "[ERROR] failed to delete ${index_name}. http_code=${http_code}"
    cat "${tmp_file}" || true
    exit 1
    ;;
esac

export SPARK_HOME=/opt/spark/current
export SPARK_CONF_DIR=/etc/spark/conf
export HADOOP_CONF_DIR=/etc/hadoop/conf
export HIVE_CONF_DIR=/etc/hive/conf

${SPARK_HOME}/bin/spark-submit \
  --master spark://iceberg1:7077 \
  --deploy-mode client \
  --jars /opt/spark/jars-extra/elasticsearch-spark-30_2.12-8.15.3.jar \
  /opt/elastic/bin/export_syslog_to_es.py "${target_dt}"

echo "[INFO] count after import"
curl -s "${ES_URL}/${index_name}/_count?pretty"
EOF

sudo chmod +x /opt/elastic/bin/export_syslog_to_es.sh

export_authlog_to_es.sh

sudo tee /opt/elastic/bin/export_authlog_to_es.sh <<'EOF'
#!/usr/bin/env bash
set -euo pipefail

ES_URL="http://elastic1:9200"

# 引数があればその日付、なければ yesterday
target_dt="${1:-$(date -d 'yesterday' +%F)}"

# YYYY-MM-DD 形式チェック
if ! [[ "${target_dt}" =~ ^[0-9]{4}-[0-9]{2}-[0-9]{2}$ ]]; then
  echo "[ERROR] target_dt must be YYYY-MM-DD. input=${target_dt}"
  exit 1
fi

index_name="logs-authlog-${target_dt//-/.}"

echo "[INFO] target_dt=${target_dt}"
echo "[INFO] index_name=${index_name}"

tmp_file="/tmp/delete_${index_name}.json"

http_code=$(curl -s -o "${tmp_file}" -w "%{http_code}" \
  -X DELETE "${ES_URL}/_data_stream/${index_name}" || true)

case "${http_code}" in
  200)
    echo "[INFO] deleted ${index_name}"
    ;;
  404)
    echo "[INFO] ${index_name} does not exist. continue."
    ;;
  *)
    echo "[ERROR] failed to delete ${index_name}. http_code=${http_code}"
    cat "${tmp_file}" || true
    exit 1
    ;;
esac

export SPARK_HOME=/opt/spark/current
export SPARK_CONF_DIR=/etc/spark/conf
export HADOOP_CONF_DIR=/etc/hadoop/conf
export HIVE_CONF_DIR=/etc/hive/conf

${SPARK_HOME}/bin/spark-submit \
  --master spark://iceberg1:7077 \
  --deploy-mode client \
  --jars /opt/spark/jars-extra/elasticsearch-spark-30_2.12-8.15.3.jar \
  /opt/elastic/bin/export_authlog_to_es.py "${target_dt}"

echo "[INFO] count after import"
curl -s "${ES_URL}/${index_name}/_count?pretty"
EOF

sudo chmod +x /opt/elastic/bin/export_authlog_to_es.sh

手動投入

前日のデータを投入する場合

/opt/elastic/bin/export_syslog_to_es.sh

/opt/elastic/bin/export_authlog_to_es.sh

指定日のデータを投入する場合

/opt/elastic/bin/export_syslog_to_es.sh yyyy-mm-dd

/opt/elastic/bin/export_authlog_to_es.sh yyyy-mm-dd

確認

curl -s http://elastic1:9200/_data_stream | jq -r '.data_streams[].name'

logs-authlog(syslog)-yyyy.mm.ddのリストが出てくること。


Kibana設定

アクセス

http://elastic1:5601

Data View作成

左側のメニューからManagement-Stack Managementをクリックする。
左側のメニューからKibane-データビューをクリックし、以下のデータビューを作成する。

syslog

Index Pattern

logs-syslog-*

authlog

Index Pattern

logs-authlog-*

Timestamp

@timestamp

Dashboard例

作成例

  • ホスト別ログ件数
  • severity別件数
  • program別件数
  • 時間帯別件数

14日ローテーション

delete_old_es_indices.sh

sudo tee /opt/elastic/bin/delete_old_es_indices.sh <<'EOF'
#!/usr/bin/env bash
set -euo pipefail

ES_HOST="http://elastic1:9200"

# 引数があればその指定日分ローテーション、なければ14日分ローテーション
RETENTION_DAYS="${1:-14}"

CUTOFF=$(date -d "${RETENTION_DAYS} days ago" +%s)

for prefix in logs-syslog logs-authlog
do
  curl -s "${ES_HOST}/_data_stream/${prefix}-*" \
  | jq -r '.data_streams[].name' \
  | while read -r ds
    do
      [ -z "${ds}" ] && continue

      idx_date=${ds#${prefix}-}

      if ! [[ "${idx_date}" =~ ^[0-9]{4}\.[0-9]{2}\.[0-9]{2}$ ]]; then
        echo "[WARN] skip invalid data stream name: ${ds}"
        continue
      fi

      epoch=$(date -d "${idx_date//./-}" +%s)

      if [ "${epoch}" -lt "${CUTOFF}" ]; then
        echo "[INFO] delete data stream: ${ds}"
        curl -s -X DELETE "${ES_HOST}/_data_stream/${ds}"
        echo
      else
        echo "[INFO] keep data stream: ${ds}"
      fi
    done
done
EOF

sudo chmod +x /opt/elastic/bin/delete_old_es_indices.sh

手動投入

14日より前のデータを削除する場合

/opt/elastic/bin/delete_old_es_indices.sh

指定日のデータを投入する場合(7日より前のデータを削除する場合)

/opt/elastic/bin/delete_old_es_indices.sh 7

確認

curl -s http://elastic1:9200/_data_stream | jq -r '.data_streams[].name'

logs-authlog(syslog)-yyyy.mm.ddのリストを参照し、指定の日数以前のデータが出てこないこと。


cron登録

0 1 * * * root /opt/elastic/bin/export_syslog_to_es.sh
10 1 * * * root /opt/elastic/bin/export_authlog_to_es.sh
0 2 * * * root /opt/elastic/bin/delete_old_es_indices.sh

動作確認

Index一覧

curl http://elastic1:9200/_cat/indices?v

件数確認

curl http://elastic1:9200/logs-syslog-yyyy.mm.dd/_count

Discover確認

Analytics
↓
Discover

トラブルシュート

Indexが作成されない

確認

curl http://elastic1:9200/_cat/indices?v

Elasticsearch接続失敗

curl http://elastic1:9200

Kibanaが空

Data View確認

logs-syslog-*
logs-authlog-*

まとめ

本記事では Iceberg を正本として Elasticsearch / Kibana を追加しました。

この構成により、既存の Kafka / Hive / Iceberg 基盤へ影響を与えることなく、Kibana による全文検索や運用分析機能を利用できるようになります。

次回はKibanaでのデータ解析実践方法を解説します。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?