1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

オブザーバビリティデータパイプライン Vector の使い方

Posted at

オブザーバビリティデータパイプライン Vector の使い方

はじめに

本記事では、Vectorの技術仕様、設定方法、およびVector Remap Language(VRL)の書き方を中心とした技術者向け実装ガイドを提供します。実際にVectorを導入・運用する際に必要な知識を、実践的な例とともに解説していきます。

Vector の開発経緯や位置づけについて調べた内容は以下の記事にまとめました。

動作環境と実行方法

Vectorは Rust で記述されたシングルバイナリであり、依存関係(JVMやRubyなど)を必要としません。

動作環境

  • OS: Linux (x86_64, ARM64), macOS, Windows, FreeBSD
  • プラットフォーム: Docker, Kubernetes (Sidecar または DaemonSet), Systemd

インストールと実行

最も一般的なLinux/Docker環境での例です。

Linux (パッケージマネージャー)

# RPM/Debian系など公式スクリプト
curl --proto '=https' --tlsv1.2 -sSf https://sh.vector.dev | sh
# 実行 (デフォルト設定: /etc/vector/vector.toml)
vector --config /etc/vector/vector.toml

# 設定ファイルの検証
vector validate --config /etc/vector/vector.toml

# デバッグモードで実行
vector --config /etc/vector/vector.toml --log-level debug

Docker

docker run -d \
  -v $(pwd)/vector.toml:/etc/vector/vector.toml:ro \
  timberio/vector:latest-alpine

Kubernetes (DaemonSet の例)

Kubernetes環境では、DaemonSetとしてデプロイすることで各ノードでVectorを実行できます。設定ファイルはConfigMapとして管理し、各Podにマウントします。

アーキテクチャと設定ファイル (vector.toml)

Vectorの設定は主に TOML (YAML/JSONも可) で記述します。

Source (入力) → Transform (変換) → Sink (出力) のパイプラインを構築します。各コンポーネントは inputs フィールドで前のコンポーネントの ID を指定して連結します。

設定ファイルの構造

# グローバルオプション
data_dir = "/var/lib/vector"

# [Sources]: データの入り口
[sources.my_source_id]
  type = "..."

# [Transforms]: 加工・集計 (Optional)
[transforms.my_transform_id]
  type = "..."
  inputs = ["my_source_id"] # 直前のコンポーネントIDを指定

# [Sinks]: データの出口
[sinks.my_sink_id]
  type = "..."
  inputs = ["my_transform_id"] # 直前のコンポーネントIDを指定

実践的な TOML サンプル

例1: 基本的なログ処理パイプライン

「ログファイルを読み込み、JSONとしてパースし、特定のフィールドを削除してコンソールに出力する」例です。

# 1. ソース: ローカルのログファイルをTail
[sources.app_logs]
  type = "file"
  include = ["/var/log/app/*.log"]
  ignore_older_secs = 600

# 2. トランスフォーム: JSONパースとフィールド操作 (VRL使用)
[transforms.process_logs]
  type = "remap"
  inputs = ["app_logs"]
  source = '''
    # 受信したメッセージ(JSON文字列)をパース
    . = parse_json!(.message)
    
    # タイムスタンプの正規化
    .timestamp = parse_timestamp!(.timestamp, "%Y-%m-%dT%H:%M:%S%z")
    
    # 不要なフィールドの削除
    del(.debug_info)
    
    # 環境タグの付与
    .env = "production"
  '''

# 3. シンク: コンソールへ出力 (デバッグ用)
[sinks.console_out]
  type = "console"
  inputs = ["process_logs"]
  encoding.codec = "json"

例2: 条件分岐と複数シンクへのルーティング

エラーログと通常ログを分けて、異なるシンクに送信する例です。

[sources.app_logs]
  type = "file"
  include = ["/var/log/app/*.log"]

# エラーログと通常ログを分岐
[transforms.route_logs]
  type = "route"
  inputs = ["app_logs"]
  route.error = '.level == "error"'
  route.info = '.level == "info"'

# エラーログの処理
[transforms.process_errors]
  type = "remap"
  inputs = ["route_logs.error"]
  source = '''
    . = parse_json!(.message)
    .alert = true
    .severity = "high"
  '''

# 通常ログの処理
[transforms.process_info]
  type = "remap"
  inputs = ["route_logs.info"]
  source = '''
    . = parse_json!(.message)
    .alert = false
  '''

# エラーログをElasticsearchへ
[sinks.error_to_es]
  type = "elasticsearch"
  inputs = ["process_errors"]
  endpoints = ["http://elasticsearch:9200"]
  index = "error-logs-%Y.%m.%d"

# 通常ログをS3へ
[sinks.info_to_s3]
  type = "aws_s3"
  inputs = ["process_info"]
  bucket = "my-logs-bucket"
  key_prefix = "info-logs/"
  compression = "gzip"

主要コンポーネント一覧

主要 Sources (入力)

Type 用途 備考
file ログファイルのTail logrotate対応、チェックポイント機能あり
docker_logs Dockerコンテナログ Docker Daemonから直接収集
kubernetes_logs K8s Podログ K8sメタデータの自動付与機能あり
syslog Syslog受信 UDP/TCP/Unix Socket対応
fluent Fluentd/Fluent Bit互換 Fluentdからの移行時に使用
statsd / prometheus メトリクス収集 Push/Pull 両対応

主要 Transforms (変換)

Type 用途 備考
remap 汎用データ操作 VRLを使用。現在推奨される主要な変換方法
filter 条件による破棄 remap でも代用可能だが、フィルタ専用として高速
reduce 複数行ログの結合 スタックトレースの結合などに使用
route 条件分岐 データを複数のパイプラインへ振り分ける
sample サンプリング ログ量の削減に使用

主要 Sinks (出力)

Type 用途 備考
aws_s3 S3への保存 バッチ処理、圧縮、パーティショニング対応
elasticsearch ES / OpenSearch バルクインサート対応
datadog_logs Datadog Logs Datadog API利用
prometheus_remote_write Prometheus メトリクス書き込み
kafka Apache Kafka ストリーミングプラットフォームへ連携
console 標準出力 デバッグ用

Vector Remap Language (VRL)

VRL は、remap トランスフォーム内で使用する、Rustライクなドメイン固有言語です。高速かつ安全(パニックしない)にデータを操作できます。

基本構文

  • パス: . で現在のイベント(JSONオブジェクト)を参照します。

  • .message: messageフィールド

  • .tags.host: tagsオブジェクト内のhostフィールド

  • 代入: .new_field = "value"

主要な関数と書き方

1. パース (Parsing)
! が付く関数は失敗時にエラーを投げ(VRL全体が失敗)、付かない関数は null を返します。

# JSONをパースし、ルートオブジェクトに展開 (Merge)
. = parse_json!(.message)

# ApacheログなどをRegexでパース
structured = parse_regex!(.message, r'^(?P<ip>\d+\.\d+\.\d+\.\d+) ...')
.ip_address = structured.ip

2. データの操作

# フィールドの削除
del(.password)

# 条件分岐
if .status >= 500 {
  .level = "error"
} else {
  .level = "info"
}

# 文字列操作
.upper_msg = upcase(.message)

3. メタデータの扱い
Vectorはログ本体とは別にメタデータ(収集元ファイルパスなど)を持ちます。メタデータは % プレフィックスで参照します。

# ログ本体にファイルパスを含める
.source_file = %log.file_path

# タイムスタンプ(メタデータ)を取得
.ingest_time = %log.timestamp

4. エラーハンドリング

VRLでは、エラーが発生した場合の処理を明示的に記述できます。

# 安全なパース(エラー時はnullを返す)
parsed = parse_json(.message)
if parsed != null {
  .parsed_data = parsed
} else {
  .parse_error = "Failed to parse JSON"
}

# エラーをログに記録して続行
result = parse_timestamp(.timestamp, "%Y-%m-%dT%H:%M:%S%z")
if result == null {
  log("Invalid timestamp format: " + string(.timestamp), level: "warn")
  .timestamp = now()
}

制約と注意点

  1. Disk Buffer の設計:
    Sinks への送信が詰まった場合、メモリまたはディスクにバッファリングします。ディスクバッファ使用時は、ディスク容量の上限設定 (max_size) が必須です。ディスク容量不足になるとデータが失われる可能性があるため、適切なサイズ設定と監視が必要です。
  2. At-least-once (少なくとも一回の到達保証):
    Vector は信頼性を重視するため、送信失敗時の再送によりデータが重複する可能性があります。厳密な Exactly-once は保証されません。重複を許容できない場合は、シンク側で重複排除の仕組みを実装する必要があります。
  3. VRL の型安全性:
    VRL は型に厳格です。文字列を数値として扱いたい場合は明示的に to_int などでキャストする必要があります。型の不一致は実行時エラーとなるため、VRL プレイグラウンドで事前に検証することを推奨します。
  4. ステートフル変換のメモリ:
    reduce (複数行結合) や aggregate などのステートフルな処理はメモリを消費します。高トラフィック時は設定(タイムアウトやバッファサイズ)に注意が必要です。メモリ不足を防ぐため、適切なタイムアウト設定とリソース制限を設けましょう。

モニタリングと運用

Vector自身のメトリクス

Vectorは内部メトリクスをPrometheus形式で公開します。デフォルトでは http://localhost:8686/metrics で取得できます。

主要なメトリクス:

  • vector_events_processed_total: 処理されたイベント数
  • vector_events_dropped_total: ドロップされたイベント数
  • vector_component_errors_total: コンポーネントエラー数
  • vector_component_received_events_total: コンポーネントが受信したイベント数

ヘルスチェック

Vectorはヘルスチェックエンドポイントを提供します。

# ヘルスチェック
curl http://localhost:8686/health

# メトリクス取得
curl http://localhost:8686/metrics

ログの確認

Vector自身のログは、設定ファイルの log_level で制御できます。本番環境では info または warn に設定し、必要に応じて debug に切り替えてトラブルシューティングを行います。

7. パフォーマンスチューニングとベストプラクティス

リソース設定

Vectorのパフォーマンスを最適化するため、以下の設定を検討してください。

  • 並列処理:
    num_workers パラメータで並列度を調整できます。CPUコア数に応じて適切な値を設定しましょう。
  • バッファサイズ:
    メモリバッファとディスクバッファのサイズを、データ量と可用性要件に応じて調整します。
  • バッチサイズ:
    シンクへの送信時にバッチサイズを調整することで、スループットとレイテンシのバランスを取れます。

設定の検証

設定ファイルの構文チェックは以下のコマンドで実行できます。

vector validate --config /etc/vector/vector.toml

ログレベルの調整

デバッグ時は --log-level debug を指定することで、詳細なログを確認できます。本番環境では info または warn に設定することを推奨します。

トラブルシューティング

よくある問題と対処法

  1. データが送信されない:

    • 設定ファイルの構文エラーを確認(vector validate を使用)
    • ソースが正しくデータを生成しているか確認
    • シンクの接続先が正しいか、認証情報が有効か確認
  2. メモリ使用量が高い:

    • ステートフルなトランスフォーム(reduceaggregate)の使用を確認
    • バッファサイズを適切に設定
    • 不要なデータの早期フィルタリングを検討
  3. VRL のエラー:

    • VRL プレイグラウンドでコードを事前検証
    • 型の不一致がないか確認(to_intto_string などのキャストを適切に使用)
    • エラーメッセージの行番号を確認して該当箇所を修正

公式リファレンス

最新の仕様や全関数リストは公式ドキュメントを参照してください。

まとめ

本記事では、Vectorの実装と運用に必要な知識を解説しました。主要なポイントは以下の通りです。

  • Vectorはシングルバイナリで、依存関係が少なく導入が容易
  • Source → Transform → Sink のパイプライン構造で柔軟なデータ処理が可能
  • VRL による高速かつ安全なデータ変換
  • 適切な設定と監視により、高パフォーマンスな運用が可能

前編と合わせて、Vectorの全体像から実装までを理解していただけたと思います。実際の導入時は、公式ドキュメントやVRLプレイグラウンドを活用しながら、段階的にパイプラインを構築していくことをお勧めします。

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?