はじめに
これまで以下の部品は完成したため、
自動的に比較/再取り込みするシェルを作成してみた。
- HDFSへのsyslog取り込み機構
- 解析データへの自動変換/ローテーション機構
- HDFS同士のデータコピー機構
仕様
以下の動きを行うシェルを作成する。
前提としてMirrorMakerでストリーミングレプリケーションを
実施しているが予防措置として別途本シェルを使用します
シェルは両方のHDFS/Hiveにアクセスできるホストで実施します。
yyyymmddは実施前日を指定すること
1. 2つのHiveに対して以下のSQLを打ち込み、戻ってきたcnt数を比較する
set mapreduce.map.memory.mb=512;
set mapreduce.reduce.memory.mb=512;
set yarn.app.mapreduce.am.resource.mb=512;
SELECT count(*) FROM logs.syslog_curated WHERE dt='yyyy-mm-dd';
あっていれば終了、違っていれば次の動作を実施する。
この際比較元のデータが0というパターンでは異常終了とする。
2. HDFS間コピーの実施
以下のコマンドを実施
sudo -u hadoop hadoop distcp \
-Dmapreduce.job.queuename=default \
-Dyarn.app.mapreduce.am.resource.mb=384 \
-Dyarn.app.mapreduce.am.command-opts='-Xmx256m' \
-Dmapreduce.am.resource.mb=384 \
-Dmapreduce.am.command-opts='-Xmx256m' \
-Dmapreduce.map.memory.mb=384 \
-Dmapreduce.map.java.opts='-Xmx256m' \
-Dmapreduce.map.cpu.vcores=1 \
-m 2 \
-p -update -delete \
hdfs://cluster1/data/kafka/syslog/yyyy/mm/dd \
hdfs://hdptest:9000/data/kafka/syslog/yyyy/mm/dd
3. 再取り込みシェルの実施
移行先をターゲットにしたデータローテーションシェルを実施。
データ取り込みのみで、既存データの削除は実施しない。
sudo DRY_RUN=1 /usr/local/sbin/syslog_curate_rotate.sh
4. データ再比較を実施
成功すれば終了、失敗したら失敗した旨を出して終了する。
シェル設置
sudo tee /usr/local/sbin/check_and_resync_syslog.sh <<'EOF'
#!/usr/bin/env bash
set -euo pipefail
# ============================================================
# 設定(環境に合わせて変更)
# ============================================================
# JDBC URL だけを変数にする
HIVE1_JDBC_URL="${HIVE1_JDBC_URL:-jdbc:hive2://master1:2181,master2:2181,master3:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2}"
HIVE2_JDBC_URL="${HIVE2_JDBC_URL:-jdbc:hive2://hdptest:10000/default}"
# beeline コマンドを組み立て(URLは必ず引用符付きで渡す)
HIVE1_CMD="${HIVE1_CMD:-beeline -u \"${HIVE1_JDBC_URL}\" -n hive}"
HIVE2_CMD="${HIVE2_CMD:-beeline -u \"${HIVE2_JDBC_URL}\" -n hive}"
# distcp の SRC/DST
SRC_NN="${SRC_NN:-hdfs://cluster1}"
DST_NN="${DST_NN:-hdfs://hdptest:9000}"
# 対象ディレクトリのベース
SRC_BASE="${SRC_BASE:-/data/kafka/syslog}"
DST_BASE="${DST_BASE:-/data/kafka/syslog}"
# 再取り込みシェル
CURATE_SH="${CURATE_SH:-/usr/local/sbin/syslog_curate_rotate.sh}"
# ============================================================
# 日付(前日固定 / JST)
# ============================================================
YYYYMMDD="$(TZ=Asia/Tokyo date -d "yesterday" +"%Y%m%d")"
YYYY="${YYYYMMDD:0:4}"
MM="${YYYYMMDD:4:2}"
DD="${YYYYMMDD:6:2}"
DT="${YYYY}-${MM}-${DD}"
SRC_PATH="${SRC_NN}${SRC_BASE}/${YYYY}/${MM}/${DD}"
DST_PATH="${DST_NN}${DST_BASE}/${YYYY}/${MM}/${DD}"
# ============================================================
# 関数
# ============================================================
log() { echo "[$(TZ=Asia/Tokyo date '+%F %T')] $*"; }
run_hive_cnt() {
local hive_cmd="$1"
# ★要望の形式:SET → SELECT
# beelineは -e で複数文OK(セミコロン区切り)
local sql="
set mapreduce.map.memory.mb=512;
set mapreduce.reduce.memory.mb=512;
set yarn.app.mapreduce.am.resource.mb=512;
SELECT count(*) FROM logs.syslog_curated WHERE dt='${DT}';
"
# 出力を安定させる(結果だけ欲しい)
local out cnt
out=$(eval ${hive_cmd} --silent=true --outputformat=tsv2 --showHeader=false -e "\"${sql}\"")
# tsv2 だと結果が「1行の数字」になる想定。
# 念のため、数字だけの行を抽出して最後の1件を採用(SETの出力に数字が混ざっても回避)
cnt="$(echo "$out" | tr -d '\r' | awk '/^[0-9]+$/ {v=$0} END {print v}')"
[[ "$cnt" =~ ^[0-9]+$ ]] || {
log "ERROR: cnt parse failed. raw_out=[$out]"
return 1
}
echo "$cnt"
}
compare_cnt() {
local c1 c2
c1=$(run_hive_cnt "${HIVE1_CMD}") || { log "ERROR: Hive1 cnt取得失敗"; return 1; }
c2=$(run_hive_cnt "${HIVE2_CMD}") || { log "ERROR: Hive2 cnt取得失敗"; return 1; }
log "Compare dt=${DT}: hive1_cnt=${c1}, hive2_cnt=${c2}"
# ★追加:c1 が 0 の場合は「元データ不整合」として異常終了扱い
if [[ "${c1}" == "0" ]]; then
log "ERROR: hive1_cnt=0 は想定外。元データ不整合として終了します。"
return 20
fi
[[ "${c1}" == "${c2}" ]]
}
run_distcp() {
log "Run distcp: ${SRC_PATH} -> ${DST_PATH}"
sudo -u hadoop hadoop distcp \
-Dmapreduce.job.queuename=default \
-Dyarn.app.mapreduce.am.resource.mb=384 \
-Dyarn.app.mapreduce.am.command-opts='-Xmx256m' \
-Dmapreduce.am.resource.mb=384 \
-Dmapreduce.am.command-opts='-Xmx256m' \
-Dmapreduce.map.memory.mb=384 \
-Dmapreduce.map.java.opts='-Xmx256m' \
-Dmapreduce.map.cpu.vcores=1 \
-m 2 \
-p -update -delete \
"${SRC_PATH}" \
"${DST_PATH}"
}
run_curate() {
log "Run curate script: ${CURATE_SH} (JDBC_URL from HIVE2_JDBC_URL)"
# HIVE2_JDBC_URL を JDBC_URL として curate 側に渡す
sudo env JDBC_URL="${HIVE2_JDBC_URL}" DRY_RUN=1 "${CURATE_SH}"
}
# ============================================================
# メイン
# ============================================================
log "Start: yesterday_yyyymmdd=${YYYYMMDD} (dt=${DT})"
# 1) 初回比較
if compare_cnt; then
log "OK: cnt一致。終了します。"
exit 0
else
rc=$?
if [[ $rc -eq 20 ]]; then
log "FAIL: 元データ不整合(hive1_cnt=0)。終了します。"
exit 20
fi
fi
log "NG: cnt不一致。distcp → curate → 再比較を実施します。"
# 2) HDFS間コピー
run_distcp
# 3) 再取り込み
run_curate
# 4) 再比較
if compare_cnt; then
log "OK: 再比較で一致。終了します。"
exit 0
else
log "FAIL: 再比較でも不一致。要調査として終了します。"
exit 10
fi
EOF
シェル権限設定
sudo chmod 0755 /usr/local/sbin/check_and_resync_syslog.sh
cron実行(毎日 04:00 JST)
自動データ取り込みシェル(syslog_curate_rotate.sh)実施後に行ってください
sudo tee /etc/cron.d/check_and_resync_syslog >/dev/null <<'EOF'
SHELL=/bin/bash
PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
TZ=Asia/Tokyo
00 3 * * * root /usr/local/sbin/check_and_resync_syslog.sh >> /var/log/check_and_resync_syslog.log 2>&1
EOF
手動実行
自動的に前日分の比較、一致しない場合はデータの整合を実施します。
sudo /usr/local/sbin/check_and_resync_syslog.sh
確認
それぞれのHiveで使用しているZeppelinに入りsyslogレコード数を比較してください。