1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Vector が普通に強すぎる。Kafka 無しでもココまでできた!大量パケットのリアルタイム解析アーキテクチャ(実践編)

Last updated at Posted at 2025-11-28

実践編: 現場で “本当に困るポイント” と対峙する

前回の概要編では、なぜ Vector がログフォワーダーの枠を超え、軽量なイベント処理エンジンとして成立するのかについて整理しました。

しかし、実際の現場ではドキュメントに書いてあることだけではうまくいかず、Vector・Lua・Cloud Storage・BigQuery の “境界” で発生する厄介な問題が数多く存在します。

この実践編では、まさにその“境界領域”でぶつかった課題と、大規模パケット処理を Vector だけで完結させるために必要だった、リアルな知見やノウハウについてまとめています。
本編は概要編で扱ったアーキテクチャの“内部で起きていること”を深掘りする内容であり、特に以下のようなポイントにフォーカスして得られた知見を共有します:

  • Lua によるロジックの適用範囲の見極め
  • 時刻処理(Lua × Vector × BigQuery)で起きる落とし穴と対処
  • BigQuery 外部テーブルの 1000 万ファイル制限と分割設計
  • JSON → CSV へ変更した理由とメリット・注意点
  • xpcall を使った堅牢なエラーハンドリング
  • Vector Sink を使った本番データの安全なサンプリング

実践的なノウハウと注意点

Lua Transform でどこまでできる?: 適用範囲と限界の見極め

Vector の Lua Transform は、単一イベント内で完結する処理であれば非常に強力です。本案件では、Java や Go などのアプリケーションを用意せず、Lua スクリプトだけで以下のような処理を実現しました:

  • 数値の型変換、フラグ判定、種別コードによる分岐などの 軽量ロジック
  • オフセットを自前で管理しながら、ペイロードを順次読み進める バイト列のパース処理
  • UDP ヘッダーやペイロードからの 固定長フィールド抽出
  • 長さフィールドを読み取りながらの 可変長フィールド抽出
  • 制御文字を含むバイト列をデバッグ用に出力する時のログ破損を避けるための 16 進ダンプ出力
  • Lua がタイムゾーンを扱えないため、自前で行う JST→UTC 変換ロジック

これらは Java であれば数百行のクラス設計を行い、ユーティリティやテストコードも必要になるような内容ですが、Lua では スクリプトとして数十から数百行で完結できるため、Vector のパイプラインに自然に組み込める点が非常に大きなメリットでした。

一方で、以下のような処理は Vector の "イベント単位モデル" と相性が悪く、Lua Transform だけでは実現困難です:

  • 複数イベントをまとめて 1 つのイベントとして出力する集約処理
  • イベントストリームを跨いだ 相関・クロス集計
  • 外部 API を呼ぶなど、ストリーム処理の境界を超える処理
  • 過去状態を保持し続ける必要がある ステートフルなロジック

こうした処理は、専用アプリケーション(Java/Go)や Kafka Streams などの別の基盤に委ねるほうが適しています。今回の案件は「1 パケット内で完結する解析」が中心だったため、Vector + Lua だけで十分実運用に耐えることができました。

時刻処理の注意点(Lua × Vector × BigQuery)

Vector + Lua + BigQuery の組み合わせでは、時刻処理が最も実務的な落とし穴になります。
本案件では、受信したパケットの内部に含まれる時刻フィールドを解析してログ化する必要がありましたが、下記の問題が注意すべき落とし穴を生んでいました:

Lua の日時関数がタイムゾーンを扱えない問題

  • Lua の日時関数には タイムゾーンの概念がないos.date 関数などはローカルタイムを前提とするが、TZ 反映が一部曖昧)
  • このため、Vector に TZ 環境変数を設定していても Lua に渡す時点で タイムゾーン情報が消える
  • BigQuery はタイムゾーンなし timestamp は UTC として解釈する

これらの仕様が組み合わさるため、パケット側の時刻が JST で渡ってくる場合、時刻を正しく BigQuery に取り込ませるには、Lua 側で必ず自前で JST→UTC(-9h)変換を行ってからタイムスタンプを出力するか、2025-01-01T00:00:00+09:00 のような明示的なタイムゾーン付き文字列にして出力する必要がありました。これを怠ると、BigQuery 上で 9 時間ずれたログが並ぶという致命的なズレが発生します。

Cloud Storage Sink の UTC パーティション問題

前述の問題にも関係しますが、Vector を使って Cloud Storage へログやイベントを保存する際、ハマりやすい落とし穴のひとつが「Cloud Storage Sink による UTC パーティション評価」です。

Vector の Cloud Storage Sink では、key_prefix 設定で year, month, day などの日時を用いて Cloud Storage 上のパスへ動的に振り分けてログファイルを格納することができます。これにより、Cloud Storage バケットを BigQuery の外部テーブルとする場合に、日付などで検索範囲を指定した BigQuery のパーティション化クエリに対応することができます。

sinks:
  gcs_sink:
    type: gcp_cloud_storage
    inputs:
      - my_lua_transform
    bucket: my-log-bucket
    key_prefix: "year=%Y/month=%m/day=%d/"
    encoding:
      codec: "csv"
      csv:
        fields:
          - "timestamp"
          - "field1"
          - "field2"
          - (以下、割愛)

しかし、このパス振り分けに使用される時刻は 必ず UTC で評価され、TZ=Asia/Tokyo などの Vector 側でタイムゾーンを設定していても評価されず、一切変更できません。(BigQuery 側の仕様に合わせているため、と思われます)

この仕様により、JST で見ると「1 月 1 日の朝 8 時」に出力されたログファイルが、UTC では「12 月 31 日 23:00」であるため、前日のフォルダに振り分けられるという現象が起きます。

このため、日次集計などでは、前日フォルダも読み込まないと完全なデータにならない、という厄介なことになり、実務的には日次バッチや時間帯集計で大きな問題につながります。

しかし、この UTC 評価による問題は BigQuery で集計する時に注意すべき一般的なトピックになるため、下記のように、BigQuery 側のクエリ側で対処するための様々な方法が存在します。Qiita 上にも各種記事がありますので、参考にされるとよいでしょう。

  • 日次集計クエリでは、JST と UTC の間で時刻変換するなどして、検索範囲に前日分のパーティションも含まれるように WHERE 条件を指定する
  • クエリ結果についても、JST と UTC の間で時刻変換日時関数を用いて JST に変換してから表示させる

Cloud Storage Sink の UTC 固定仕様は一見シンプルですが、タイムゾーンを持たない Lua、UTC 解釈が前提の BigQuery と組み合わせることで、保存日付と実際のログ時刻がズレる深刻な問題につながりやすいので注意しましょう。

時刻処理の注意点まとめ

上記の問題をまとめると、

  • タイムゾーンを持たない Lua は、受け取った JST 時刻をそのまま出力してしまうとタイムゾーン表記なしになるため、それをそのまま BigQuery に渡すと UTC と誤解される。このため、Lua 側で BigQuery の取り込み仕様に配慮した時刻変換が必要。
  • Cloud Storage Sink は常に UTC でフォルダ振り分けをする。このため、早朝のログが「前日フォルダ」に入るため、BigQuery の集計クエリ側で工夫しないと日次集計が破綻する

となります。これらを考えると、保存時刻をすべて UTC に正規化するのが最も安全で、後続の分析処理も一貫して取り扱える方法かと思います。
大規模なパケット処理パイプラインでは、時刻ズレが原因の不具合は発見が難しく、影響範囲も大きいため、設計段階でこのポイントを明確に理解しておくことが極めて重要です。

BigQuery 外部テーブル 1000 万ファイル制限と分割設計

本案件では、Cloud Storage に格納・保持しておく量は非常に多くなりますが、BigQuery による検索頻度は高くない案件でしたので、集計に時間がかかっても外部テーブル方式がコスト的に有利でした。しかし、Cloud Storage に非常に大量のファイルが出力される場合、BigQuery 外部テーブルは 1 テーブルあたり最大 1000 万ファイルまで という制限があるため、“すぐに上限へ到達する問題” が起きます。

注意すべきは、Hive パーティションで分割していても、“ひとつの外部テーブル全体”で 1000 万個が上限であることです。さらに、Cloud Storage を外部テーブルとして使う場合、バケット全体で 6000 万個が上限、という制約もあります。

今回のように秒間数十万〜百万パケットを処理する基盤では、ログファイルの数は膨大になります。そのため、次のような分割戦略が必要になります:

  • 時間軸でバケットを分ける(例:年別バケットと月別フォルダ)
  • データの属性(例:拠点 ID など)に応じて、水平軸でもバケットやフォルダを分割
  • これらに合わせて 外部テーブル自体も複数に分割

この設計を怠ると、数ヶ月で 1000 万ファイル上限に達し、クエリ実行不能になるリスクがあります。本案件では、時系列 × 水平軸の二軸で Cloud Storage を分割し、安全な拡張性を確保しました。また、BigQuery 側は、クエリ側でビューや UNION などを用いて工夫することで、上記の分割設計に対応することができます。

コスト面を考慮した出力形式の選択(JSON → CSV 変更の理由)

本案件では当初、パケット解析結果を JSON 形式で Cloud Storage に出力することにしていました。JSON は柔軟で、ネスト構造を自然に表現できるため、データモデルが複雑な場合やフィールド数が将来的に変わる可能性がある場合にはとても便利です。

しかし、大量にパケットを処理する必要があるケースでは、JSON は以下の理由で非効率になります:

  • JSON はキー名を毎レコードに含むため、ファイルサイズが大きくなりやすく、ストレージコストが跳ね上がる
  • BigQuery 外部テーブルの読み込みも重い

このため、JSON 出力を CSV 出力に切り替える方が、

  • ストレージコストが削減され、
  • BigQuery の外部テーブルとしての読み込みも高速化され、
  • パイプライン全体のコスト効率が向上します

ただし、CSV ではデータのネスト構造を表現できないため、必要なフィールドをすべてフラットに展開する設計が必要になります。

また、ファイルへの格納効率だけを重視すれば、Vector Sink にはバイナリ形式である Avro での出力機能がありますが、Vector Sink の Avro エンコーダは、筆者が試した時点では残念ながら BigQuery 読み込み時にエラーとされてしまい、おそらくスキーマ参照にまつわる Avro エンコーダーの仕様と推測される原因で、BigQuery 外部テーブルで直接読み込む用途とは非互換 のようでした。

このため、筆者の執筆時点で Vector Sink のエンコーダーが対応する出力形式で BigQuery 向けの出力形式としては、データのネスト構造が不要または代替可能な場合、コストと読み込み性能の面で JSON よりも CSV がよい選択肢の一つとなります。

Lua スクリプトの xpcall によるエラーハンドリング

Lua Transform は柔軟ですが、Lua スクリプト内のエラーはスタックトレースが見えにくく、原因箇所が特定しづらいという課題があります。

Lua Transform 内でエラーが起きると、エラー内容は表示されるものの:

  • どのフィールドの処理中だったか
  • どの可変長レコードの何番目で失敗したのか
  • オフセットがどの位置で止まっていたのか

といった情報が得られず、実際のパケットデータとの突き合わせが極めて難しいという課題がありました。このため、本案件では Lua スクリプトにおける適切なエラーハンドリングが不可欠でした。

そこで採用したのが xpcall を使った構造化エラーハンドリングです。Lua の xpcall(func, error_handler) は、エラー発生時にカスタムのエラーハンドリング関数へ制御を渡し、詳細なログを出力させることができます。

  • 各処理ステップを複数の関数に分割
  • それぞれを xpcall(func, error_handler) でラップして安全に呼び出す
  • どの段階でどのデータによりエラーが発生したかを、パケットダンプとともにログに記録

これにより、Lua スクリプトでも「どの段階の処理で失敗したか」が明確になり、Vector + Lua でも“本格的なパケット解析器”として運用できるレベルの堅牢で保守しやすい解析スクリプトを実現できました。

Vector Sink を使った本番データのサンプリング転送

新機能のリリース前に、本番環境で流れる実データを使って安全にテストを行える仕組みは、データパイプラインの運用では非常に重要です。しかし、多くの場合「本番のデータをそのまま開発環境に流す」ことは仕組み的にもガバナンス的にも流量的にも難しいことが多いです。

ところが、Vector はこの問題に対して きわめてシンプルで現実的な解決策 を提供できます。

Vector → Vector へのイベント転送が簡単にできる

これは、fluentbit などの他のフォワーダーにも似た機能がありますが、Vector は Sink として 別の Vector にイベントを転送する機能を持っています。さらに Transform と組み合わせることで転送前の柔軟なイベント選別も可能になります。

つまり、本番処理用 Vector のパイプラインから:

  • 全量ではなく
  • 特定の割合だけサンプリングし
  • 別の Vector(リリース前テスト用)に安全に転送する

といった構成が、下記のように Transform → Sink の組み合わせで極めて簡単に定義できます。

transform:
  sample_log:
    type: sample
    inputs:
      - my_lua_transform
    rate: 100000  # 10 万件に 1 件に絞って転送

sinks:
  test_vector:
    type: vector
    inputs:
      - sample_log
    address: "test-vector.test.svc.cluster.local:8080"  # リリース前テスト用 Vector

本番データで“安全に”テスト・検証ができるメリット

このため、本案件では、次のようなリリースフローを組むことができました:

  1. 本番 Vector のパイプラインで、パケット解析前のデータを一部だけサンプリング
  2. サンプリングされたデータを「リリース前テスト用 Vector」に転送
  3. リリース前テスト用 Vector 上で新しい Lua スクリプトや設定を安全に検証

これにより、本番と同じデータ構造を用いながら、本番に影響を与えずにリリース前に新機能の検証ができるようになりました。

似たような機能として、Google Cloud には、Packet Mirroring という、パケットの複製・転送ができる機能がありますが、Packet Mirroring では「数百万パケットのうち N 個だけランダム抽出」といったサンプリング動作は実現できないため、Vector Sink をサンプリングと組み合わせた方式は非常に有用でした。

まとめ

本実践編では、Vector と Lua を組み合わせて大量パケットを解析・変換し、Cloud Storage ~ BigQuery まで安全・高速に届けるために必要だった、”複数の技術を組み合わせる際の落とし穴”などについて、通常のドキュメントやリファレンスでは得にくい、実際の案件で得られた実践的な知見について体系的に紹介しました。

  • Lua Transform の適用範囲の見極め観点(1 イベントで完結する処理にする)
  • Lua × Vector × BigQuery の時刻・タイムゾーン問題と、その解決法である UTC への統一
  • Cloud Storage Sink の UTC 固定仕様に伴うフォルダ振り分けの罠
  • BigQuery 外部テーブルの 1000 万ファイル制限と“二軸分割設計”
  • JSON→CSV への形式変更により得た軽量化・低コスト運用
  • xpcall による段階的エラーハンドリングでのデバッグ性向上
  • Vector Sink を活用した本番データによる安全なリリース前テスト

最後にまとめとして、Vector は:

  • シンプルに構築・運用できる(Kubernetes + Helm で容易)
  • 高速・堅牢に動作する(Rust ベースの安全性)
  • Lua によって柔軟な解析ロジックが書ける
  • Source/Transform/Sink の組み合わせで多彩なイベント処理パイプラインが組める

という特長を最大限に発揮し、今回の案件に“ちょうどよい”解法となりました。
これらはすべて、「Kafka ほどの大規模分散基盤を導入せずに、Vector の高速性・堅牢性・柔軟性を活かして処理を成立させる」という要件の中で得られた知見やノウハウです。

本記事が、同じように「大量データをリアルタイム処理しつつ、運用コストも抑えたい」という現場のエンジニアにとって、実務のヒントや設計の指針となれば幸いです。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?