0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

HDFSデータ比較・再取り込みシェルを作成してみた

Posted at

はじめに

これまで以下の部品は完成したため、
自動的に比較/再取り込みするシェルを作成してみた。

  • HDFSへのsyslog取り込み機構

  • 解析データへの自動変換/ローテーション機構

  • HDFS同士のデータコピー機構

仕様

以下の動きを行うシェルを作成する。

前提としてMirrorMakerでストリーミングレプリケーションを
実施しているが予防措置として別途本シェルを使用します

シェルは両方のHDFS/Hiveにアクセスできるホストで実施します。

yyyymmddは実施前日を指定すること

1. 2つのHiveに対して以下のSQLを打ち込み、戻ってきたcnt数を比較する

set mapreduce.map.memory.mb=512;
set mapreduce.reduce.memory.mb=512;
set yarn.app.mapreduce.am.resource.mb=512;

SELECT count(*) FROM logs.syslog_curated WHERE dt='yyyy-mm-dd';

あっていれば終了、違っていれば次の動作を実施する。

この際比較元のデータが0というパターンでは異常終了とする。

2. HDFS間コピーの実施

以下のコマンドを実施

sudo -u hadoop hadoop distcp \
  -Dmapreduce.job.queuename=default \
  -Dyarn.app.mapreduce.am.resource.mb=384 \
  -Dyarn.app.mapreduce.am.command-opts='-Xmx256m' \
  -Dmapreduce.am.resource.mb=384 \
  -Dmapreduce.am.command-opts='-Xmx256m' \
  -Dmapreduce.map.memory.mb=384 \
  -Dmapreduce.map.java.opts='-Xmx256m' \
  -Dmapreduce.map.cpu.vcores=1 \
  -m 2 \
  -p -update -delete \
  hdfs://cluster1/data/kafka/syslog/yyyy/mm/dd \
  hdfs://hdptest:9000/data/kafka/syslog/yyyy/mm/dd

3. 再取り込みシェルの実施

移行先をターゲットにしたデータローテーションシェルを実施。
データ取り込みのみで、既存データの削除は実施しない。

sudo DRY_RUN=1 /usr/local/sbin/syslog_curate_rotate.sh

4. データ再比較を実施

成功すれば終了、失敗したら失敗した旨を出して終了する。

シェル設置

sudo tee /usr/local/sbin/check_and_resync_syslog.sh <<'EOF'
#!/usr/bin/env bash
set -euo pipefail

# ============================================================
# 設定(環境に合わせて変更)
# ============================================================

# JDBC URL だけを変数にする
HIVE1_JDBC_URL="${HIVE1_JDBC_URL:-jdbc:hive2://master1:2181,master2:2181,master3:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2}"
HIVE2_JDBC_URL="${HIVE2_JDBC_URL:-jdbc:hive2://hdptest:10000/default}"

# beeline コマンドを組み立て(URLは必ず引用符付きで渡す)
HIVE1_CMD="${HIVE1_CMD:-beeline -u \"${HIVE1_JDBC_URL}\" -n hive}"
HIVE2_CMD="${HIVE2_CMD:-beeline -u \"${HIVE2_JDBC_URL}\" -n hive}"

# distcp の SRC/DST
SRC_NN="${SRC_NN:-hdfs://cluster1}"
DST_NN="${DST_NN:-hdfs://hdptest:9000}"

# 対象ディレクトリのベース
SRC_BASE="${SRC_BASE:-/data/kafka/syslog}"
DST_BASE="${DST_BASE:-/data/kafka/syslog}"

# 再取り込みシェル
CURATE_SH="${CURATE_SH:-/usr/local/sbin/syslog_curate_rotate.sh}"

# ============================================================
# 日付(前日固定 / JST)
# ============================================================
YYYYMMDD="$(TZ=Asia/Tokyo date -d "yesterday" +"%Y%m%d")"
YYYY="${YYYYMMDD:0:4}"
MM="${YYYYMMDD:4:2}"
DD="${YYYYMMDD:6:2}"
DT="${YYYY}-${MM}-${DD}"

SRC_PATH="${SRC_NN}${SRC_BASE}/${YYYY}/${MM}/${DD}"
DST_PATH="${DST_NN}${DST_BASE}/${YYYY}/${MM}/${DD}"

# ============================================================
# 関数
# ============================================================
log() { echo "[$(TZ=Asia/Tokyo date '+%F %T')] $*"; }

run_hive_cnt() {
  local hive_cmd="$1"

  # ★要望の形式:SET → SELECT
  # beelineは -e で複数文OK(セミコロン区切り)
  local sql="
set mapreduce.map.memory.mb=512;
set mapreduce.reduce.memory.mb=512;
set yarn.app.mapreduce.am.resource.mb=512;

SELECT count(*) FROM logs.syslog_curated WHERE dt='${DT}';
"

  # 出力を安定させる(結果だけ欲しい)
  local out cnt
  out=$(eval ${hive_cmd} --silent=true --outputformat=tsv2 --showHeader=false -e "\"${sql}\"")

  # tsv2 だと結果が「1行の数字」になる想定。
  # 念のため、数字だけの行を抽出して最後の1件を採用(SETの出力に数字が混ざっても回避)
  cnt="$(echo "$out" | tr -d '\r' | awk '/^[0-9]+$/ {v=$0} END {print v}')"

  [[ "$cnt" =~ ^[0-9]+$ ]] || {
    log "ERROR: cnt parse failed. raw_out=[$out]"
    return 1
  }

  echo "$cnt"
}

compare_cnt() {
  local c1 c2
  c1=$(run_hive_cnt "${HIVE1_CMD}") || { log "ERROR: Hive1 cnt取得失敗"; return 1; }
  c2=$(run_hive_cnt "${HIVE2_CMD}") || { log "ERROR: Hive2 cnt取得失敗"; return 1; }

  log "Compare dt=${DT}: hive1_cnt=${c1}, hive2_cnt=${c2}"

  # ★追加:c1 が 0 の場合は「元データ不整合」として異常終了扱い
  if [[ "${c1}" == "0" ]]; then
    log "ERROR: hive1_cnt=0 は想定外。元データ不整合として終了します。"
    return 20
  fi

  [[ "${c1}" == "${c2}" ]]
}

run_distcp() {
  log "Run distcp: ${SRC_PATH} -> ${DST_PATH}"
  sudo -u hadoop hadoop distcp \
    -Dmapreduce.job.queuename=default \
    -Dyarn.app.mapreduce.am.resource.mb=384 \
    -Dyarn.app.mapreduce.am.command-opts='-Xmx256m' \
    -Dmapreduce.am.resource.mb=384 \
    -Dmapreduce.am.command-opts='-Xmx256m' \
    -Dmapreduce.map.memory.mb=384 \
    -Dmapreduce.map.java.opts='-Xmx256m' \
    -Dmapreduce.map.cpu.vcores=1 \
    -m 2 \
    -p -update -delete \
    "${SRC_PATH}" \
    "${DST_PATH}"
}

run_curate() {
  log "Run curate script: ${CURATE_SH} (JDBC_URL from HIVE2_JDBC_URL)"

  # HIVE2_JDBC_URL を JDBC_URL として curate 側に渡す
  sudo env JDBC_URL="${HIVE2_JDBC_URL}" DRY_RUN=1 "${CURATE_SH}"
}

# ============================================================
# メイン
# ============================================================
log "Start: yesterday_yyyymmdd=${YYYYMMDD} (dt=${DT})"

# 1) 初回比較
if compare_cnt; then
  log "OK: cnt一致。終了します。"
  exit 0
else
  rc=$?
  if [[ $rc -eq 20 ]]; then
    log "FAIL: 元データ不整合(hive1_cnt=0)。終了します。"
    exit 20
  fi
fi

log "NG: cnt不一致。distcp → curate → 再比較を実施します。"

# 2) HDFS間コピー
run_distcp

# 3) 再取り込み
run_curate

# 4) 再比較
if compare_cnt; then
  log "OK: 再比較で一致。終了します。"
  exit 0
else
  log "FAIL: 再比較でも不一致。要調査として終了します。"
  exit 10
fi
EOF

シェル権限設定

sudo chmod 0755 /usr/local/sbin/check_and_resync_syslog.sh

cron実行(毎日 04:00 JST)

自動データ取り込みシェル(syslog_curate_rotate.sh)実施後に行ってください

sudo tee /etc/cron.d/check_and_resync_syslog >/dev/null <<'EOF'
SHELL=/bin/bash
PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
TZ=Asia/Tokyo

00 3 * * * root /usr/local/sbin/check_and_resync_syslog.sh >> /var/log/check_and_resync_syslog.log 2>&1
EOF

手動実行

自動的に前日分の比較、一致しない場合はデータの整合を実施します。

sudo /usr/local/sbin/check_and_resync_syslog.sh

確認

それぞれのHiveで使用しているZeppelinに入りsyslogレコード数を比較してください。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?