LoginSignup
9
2

More than 1 year has passed since last update.

Logstashを並列分散させる仕組み

Last updated at Posted at 2020-03-09

はじめに

これまでAWSの様々なログをS3に溜めて、LogstashのS3 input pluginでログを取り込み
ログを加工後、Amazon Elasticsearch Service (以降、Amazon ES)に格納していました。

S3 input pluginは、Logstashのインスタンス性能が足りなくなった場合に
スケールアウト構成が取れないという課題を抱えていました。

同じ設定のLogstashインスタンスを並列化しても同じログをS3から取得してしまい
Elasticsearchに無駄な負荷をかけてしまうという結果になってしまいました。
(どこまでログ取得したかを記録したoffset値のsincedbを各ノード内で保持してしまうため)

【S3 input pluginのアーキテクチャ】
image.png

そこでlogstash-input-s3-sns-sqsを試してみました!

このPluginは、SNSを使ってS3バケットにログが置かれたことをトリガーにイベントをSQSに通知させて
そのS3のオブジェクトの置かれたパスをメッセージキューとして、Logstashに定期的に取りに行かせることで
Logstashを並列分散させる仕組みになります。
(どこまでログ取得したかをSQSキューで管理することで実現しています)

利用環境

product version
logstash 7.5.2
Java 11.0.6
OS(EC2) Amazon Linux2 (t3.small)
AMI ID ami-0caec5f705dc473eb
Amazon ES 7.1 (latest)
Region us-west-2

【構成図】
image.png

前提条件

  • 今回の検証ではプラグインの単体テストのみで複数のLogstashでのテストまではしていません。
  • サンプルログとして、VPC FlowLogs (カスタムフィールド全部出し)をS3バケットに出力しています。
  • EC2構築やVPC環境構築(デフォルトVPC利用、VPCエンドポイント)の説明は割愛しています。
  • Amazon ESをパブリックアクセスとしているため、認証としてCognitoを挟んでます。
  • コスト観点からオレゴンリージョン (us-west-2)を利用しています。

S3 via SNS/SQS pluginとは

S3バケットにログが置かれたことをトリガーにSNSイベント通知がSQSキューに入り
そのSQSキューメッセージ内に含まれるS3のオブジェクトパスをLogstashのInputとするプラグインになります。

※S3 via SNS/SQS plugin固有の設定項目は以下の通りです。

設定項目 デフォルト値 説明
region - (任意) SQSを利用しているAWSリージョン名を指定します。
queue - (必須) メッセージの取得対象のSQSキュー名を指定します。
queue_owner_aws_account_id - (任意) SQSを利用しているAWSアカウントを指定します。
s3_options_by_bucket - (任意) s3バケットごとに認証情報やコーデックなどを指定します。
s3_default_options - (任意) ruby aws s3::Clientのデフォルトオプションを指定します。
s3_role_session_name logstash (任意) S3::Clientsセッション名を指定します。
delete_on_success false (任意) 処理成功時にS3からログを削除するかどうかを指定します。
include_object_properties :last_modified、:content_type、:metadata (任意) 取得したいS3ファイルのオブジェクトプロパティを指定します。
from_sns true (任意) SNSトピック経由かどうかを指定します。
sqs_skip_delete false (任意) SQSメッセージ処理成功時にメッセージ削除をしないかどうかを指定します。
sqs_delete_on_failure false (任意) SQSメッセージ処理失敗時のメッセージ削除しないかどうかを指定します。
temporary_directory /tmp (任意) S3から取得したログの一時的なダウンロード領域を指定します。
consumer_threads 1 (任意) 起動するSQSリーダーのワーカー数を指定します。
visibility_timeout 5分 (任意) SQSメッセージのデフォルトの可視性タイムアウト値を指定します。
max_processing_time 8000秒 (任意) SQS/S3のメッセージブロックの処理時間のハードリミットを指定します。

※VPCエンドポイント経由でS3アクセスする場合は、s3_default_options => { "endpoint_discovery" => true }とします。
※SNSトピック経由でSQSキューにメッセージ入れる場合は、メッセージ形式が異なるため、from_sns => trueとします。

【参考】
logstash-input-s3_sns_sqs plugin

実施内容

  1. S3バケット作成 (前準備)
  2. SNSトピック作成 (前準備)
  3. SQSキュー作成 (前準備)
  4. VPC FlowLogs設定 (前準備)
  5. S3イベント設定 (前準備)
  6. Cognito ユーザプール作成 (前準備)
  7. Cognito IDプール作成 (前準備)
  8. Amazon ESドメイン作成 (前準備)
  9. IAMロール作成 (前準備)
  10. Logstash設定
  11. Kibanaでの各設定

1. S3バケット作成 (前準備)

  • AWSコンソール画面よりストレージ > S3を選択します。
  • + バケットを作成するをクリックし、下記内容でS3のバケットを作成します。
項目
バケット名 loganalyticsdev-bucket
リージョン 米国西部(オレゴン)
既存のバケットから設定のコピー 空白 (デフォルトのまま)
バージョ二ング 無効 (デフォルトのまま)
サーバアクセスのログ記録 無効 (デフォルトのまま)
タグ 0 タグ (デフォルトのまま)
オブジェクトレベルのログ記録 無効 (デフォルトのまま)
デフォルト暗号化 なし (デフォルトのまま)
オブジェクトのロック 無効 (デフォルトのまま)
CloudWatch リクエストメトリクス 無効 (デフォルトのまま)
パブリックアクセスをすべてブロック オン (デフォルトのまま)
システムのアクセス許可 無効 (デフォルトのまま)

2. SNSトピック作成 (前準備)

  • AWSコンソール画面よりアプリケーション統合 > Simple Notification Serviceを選択します。
  • トピックの作成から、下記内容でSNSのトピックを作成します。
項目
トピック名 loganalyticsdev-topic
表示名-オプション 空白 (デフォルトのまま)
暗号化-オプション 暗号化の無効化 (デフォルトのまま)
アクセスポリシー-オプション 高度な (JSONエディタは下記に記載)
配信再試行ポリシー(HTTP/S)-オプション デフォルトの配信再試行ポリシーの使用 (デフォルトのまま)
配信ステータスのログ記録-オプション (デフォルトのまま)
タグ 0 タグ (デフォルトのまま)

※サブスクリプションの設定は、SQS側から実施します。

  • アクセスポリシーには、S3からのSNSパブリッシュを許可する必要があります。
JSONエディタの内容
{
  "Version": "2008-10-17",
  "Id": "__default_policy_ID",
  "Statement": [
    {
      "Sid": "__default_statement_ID",
      "Effect": "Allow",
      "Principal": {
        "AWS": "*"
      },
      "Action": [
        "SNS:GetTopicAttributes",
        "SNS:SetTopicAttributes",
        "SNS:AddPermission",
        "SNS:RemovePermission",
        "SNS:DeleteTopic",
        "SNS:Subscribe",
        "SNS:ListSubscriptionsByTopic",
        "SNS:Publish",
        "SNS:Receive"
      ],
      "Resource": "arn:aws:sns:us-west-2:<AWSアカウント>:loganalyticsdev-topic",
      "Condition": {
        "StringEquals": {
          "AWS:SourceOwner": "<AWSアカウント>"
        }
      }
    },
    {
      "Sid": "AWSS3AccessAllow",
      "Effect": "Allow",
      "Principal": {
        "Service": "s3.amazonaws.com"
      },
      "Action": "SNS:Publish",
      "Resource": "arn:aws:sns:us-west-2:<AWSアカウント>:loganalyticsdev-topic",
      "Condition": {
        "ArnLike": {
          "aws:SourceArn": "arn:aws:s3:*:*:loganalyticsdev-bucket"
        }
      }
    }
  ]
}

3. SQSキュー作成 (前準備)

  • AWSコンソール画面よりアプリケーション統合 > Simple Queue Serviceを選択します。
  • 今すぐ始めるから、下記内容でSQSのキューを作成します。
項目
キュー名 loganalyticsdev-queue
リージョン 米国西部(オレゴン)
キュータイプ 標準キュー (デフォルトのまま)
デフォルトの可視性タイムアウト 30秒 (デフォルトのまま)
メッセージ保持期間 4日 (デフォルトのまま)
最大メッセージサイズ 256KB (デフォルトのまま)
配信遅延 0秒 (デフォルトのまま)
メッセージ受信待機時間 0秒 (デフォルトのまま)
再処理ポリシーの使用 無効 (デフォルトのまま)
SSEの使用 無効 (デフォルトのまま)

※可視性タイムアウト値はLogstashで処理に掛かる時間を考えてセットしてください。
image.png

  • 作成したキューを指定した状態でキュー操作 > SNSトピックへのキューのサブスクライブをクリックします。
    image.png

  • 下記内容でSNSトピックへのサブスクライブを作成します。

項目
トピックリージョン 米国西部(オレゴン)
トピックの選択 loganalyticsdev-topic
トピックARN arn:aws:sns:us-west-2:xxxxxxxxxxxx:loganalyticsdev-topic

4. VPC FlowLogs設定 (前準備)

  • AWSコンソール画面よりネットワーキングとコンテンツ配信 > VPCを選択します。
  • VPCで該当VPCを選択し、フローログタブ > フローログの作成をクリックします。
項目
フィルタ すべて
Maximum aggregation interval 1 minutes
送信先 S3バケットへの送信
S3バケットARN arn:aws:s3:::loganalyticsdev-bucket
Format Custom format
Format preview ※下記に記載
Log line format ※21項目すべてを選択
Format_previewの内容
${account-id} ${action} ${bytes} ${dstaddr} ${dstport} ${end} ${instance-id} ${interface-id} ${log-status} ${packets} ${pkt-dstaddr} ${pkt-srcaddr} ${protocol} ${srcaddr} ${srcport} ${start} ${subnet-id} ${tcp-flags} ${type} ${version} ${vpc-id}

5. S3イベント設定 (前準備)

  • AWSコンソール画面よりストレージ > S3を選択します。
  • loganalyticsdev-bucketを選択し、プロパティ > イベントをクリックします。
  • +通知の追加から下記内容でS3バケットのイベントを作成します。
項目
名前 loganalyticsdev-event
イベント ☑ すべてのオブジェクト作成イベント
プレフィックス AWSLogs/[AWSアカウント]/vpcflowlogs/
サフィックス .log.gz
送信先 SNSトピック
SNS loganalyticsdev-topic

6. Cognito ユーザプール作成 (前準備)

  • AWSコンソール画面よりセキュリティ、ID、およびコンプライアンス > Cognitoを選択します。
  • ユーザープールの管理 > ユーザプールを作成するから、下記内容でユーザプールを作成します。
項目
プール名 loganalyticsdev_userpool
ユーザプールをどのように作成しますか? デフォルトを確認する
  • ドメイン名をクリックし、任意のCognitoドメインのプレフィックスを入力します。
  • 使用可能かチェックをクリックし、利用可能となれば変更の保存をクリックします。
    image.png

  • Kibanaログイン用のユーザを作成します。
    image.png

  • ユーザ名と仮パスワードを設定して、ユーザの作成をクリックします。
    image.png

7. Cognito IDプール作成 (前準備)

  • AWSコンソール画面よりセキュリティ、ID、およびコンプライアンス > Cognitoを選択します。
  • IPプールの管理 > 新しいIDプールの作成から、下記内容でIDプールを作成します。
項目
ID プール名 loganalyticsdev_idpool
認証されていない ID に対してアクセスを有効にする 有効
基本 (クラシック) フローを許可する 無効 (デフォルトのまま)
IAMロール(認証成功) 新しいIAMロールの作成 (デフォルトのまま)
ロール名(認証成功) Congnito_loganalyticsdev_idpoolAuth_Role (デフォルトのまま)
IAMロール(認証失敗) 新しいIAMロールの作成 (デフォルトのまま)
ロール名(認証失敗) Congnito_loganalyticsdev_idpoolUnauth_Role (デフォルトのまま)

8. Amazon ESドメイン作成 (前準備)

  • AWSコンソール画面より分析 > Elasticsearch Serviceを選択します。
  • 新しいドメインの作成から、下記内容でAmazon ESのドメインを作成します。
項目
リージョン us-west-2
デプロイタイプ 開発およびテスト
Elasticsearchのバージョン 7.1 (latest)
Elasticsearchのドメイン loganalyticsdev-es
インスタンスタイプ t2.small.elasticsearch
ノードの数 1
データノードのストレージタイプ EBS
EBS ボリュームタイプ 汎用(SSD)
EBS ボリュームサイズ 30 GiB
専用マスターノード 無効 (デフォルトのまま)
自動スナップショットの開始時間 00:00 UTC (デフォルト)
ネットワークアクセス パブリックアクセス
細かいアクセスコントロールを有効化 無効 (デフォルトのまま)
Amazon Cognito認証を有効化 有効
リージョン 米国西部(オレゴン)
Cognito ユーザプール loganalyticsdev_userpool
Cognito IDプール loganalyticsdev_idpool
IAMロール名 CognitoAccessForAmazonES
ロールポリシー AmazonESCognitoAccess
ドメインアクセスポリシー カスタムアクセスポリシー (IPv4アドレス)
ドメインへのすべてのトラフィックにHTTPSを要求 有効 (デフォルトのまま)
ノード間の暗号化 無効 (デフォルトのまま)
保管時のデータの暗号化の有効化 無効 (デフォルトのまま)
ドメインアクセスポリシー
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::<AWSアカウント>:role/Cognito_loganalyticsdev_idpoolAuth_Role"
      },
      "Action": "es:*",
      "Resource": "arn:aws:es:us-west-2:<AWSアカウント>:domain/loganalyticsdev-es/*"
    },
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "*"
      },
      "Action": "es:*",
      "Resource": "arn:aws:es:us-west-2:<AWSアカウント>:domain/loganalyticsdev-es/*",
      "Condition": {
        "IpAddress": {
          "aws:SourceIp": "<LogstashのElasitcIP>/32"
        }
      }
    }
  ]
}

9. IAMロール作成 (前準備)

  • AWSコンソール画面よりセキュリティ、ID、およびコンプライアンス > IAMを選択します。
  • ロール > ロールの作成から、下記内容でLogstash用のEC2に割り当てるIAMロールを作成します。
項目
信頼されたエンティティの種類を選択 AWSサービス
ユースケースの選択 EC2
Attachアクセス権限ポリシー AmazonSQSFullAccess、AmazonS3ReadOnlyAccess
アクセス権限の境界の設定 アクセス権限の境界を設定せずにroleを作成する (デフォルトのまま)
ロール名 logstash-input-role
ロールの説明 Allows EC2 instances to call AWS services on your behalf.

10. Logstash設定

  • ここまでひたすら前準備でしたが、LogtashをEC2 Amazon Linux2 (t3.small)で構築します。
  • logstashの追加プラグインをインストールします。(logstash-input-s3-sns-sqsを入れます)
plugin_install
$ sudo /usr/share/logstash/bin/logstash-plugin install logstash-input-s3-sns-sqs
Validating logstash-input-s3-sns-sqs
Installing logstash-input-s3-sns-sqs
Installation successful
  • 下記のパイプライン構成ファイル(/etc/logstash/conf.d/logstash.conf)の設定を行います。
    (このタイミングではLogstashは起動しません。)
logstash.conf
input {
  ### ログ取得元のS3バケット情報を持つSQSキューを指定
  s3snssqs {
    region => "us-west-2"
    queue => "loganalyticsdev-queue"
    s3_default_options => { "endpoint_discovery" => true }
    consumer_threads => 2
    type => "vpcflowlogs"
    tags => "vpcflowlogs"
  }
}

filter {
  if "OK" in [message] {
    ### dissect filterでフィールドを抽出
    dissect {
      mapping => {
        "message" => "%{account-id} %{action} %{bytes} %{dstaddr} %{dstport} %{end} %{instance-id} %{interface-id} %{log-status} %{packets} %{pkt-dstaddr} %{pkt-srcaddr} %{protocol} %{srcaddr} %{srcport} %{start} %{subnet-id} %{tcp-flags} %{type2} %{version} %{vpc-id}"
      }
    }
    ### dateフィールドから@timestampを抽出
    date {
      match => [ "start","UNIX" ]
      target => "@timestamp"
    }
    date {
      match => [ "start","UNIX" ]
      target => "start_time"
    }
    date {
      match => [ "end","UNIX" ]
      target => "end_time"
    }
    ### @timstampから日本時間を抽出
    ruby {
      code => "event.set('[@metadata][local_time]',event.get('[@timestamp]').time.localtime.strftime('%Y-%m-%d'))"
    }
    ### グローバルIPから国などをマッピングするために指定
    geoip {
      source => "pkt-srcaddr"
      target => "src_geoip"
      tag_on_failure => "src_geoip_lookup_failure"
    }
    geoip {
      source => "pkt-dstaddr"
      target => "dst_geoip"
      tag_on_failure => "dst_geoip_lookup_failure"
    }
    ### document_idに利用する一意のIDを作成
    fingerprint {
      source => "message"
      target => "[@metadata][fingerprint]"
      method => "MURMUR3"
    }
    mutate {
      ### 不要なフィールドを削除
      remove_field => [ "start", "end" ]
    }
  }
}

output {
  if [log-status] == "OK" {
    ### 出力先のAmazonESのIndexを指定
    elasticsearch {
      hosts => [ "https://search-loganalyticsdev-es-xxxxxxxxxxxxxxxxxxx.us-west-2.es.amazonaws.com:443" ]
      index => "%{type}-%{[@metadata][local_time]}"
      document_id => "%{[@metadata][fingerprint]}"
      ilm_enabled => false
    }
  }
}

【2020/4/21追記】
ilm_enabled => falseは、Amazon ESのバージョン7以降を利用する場合は、記載しないとエラーになるので注意してください。
※ Logstash 7.0.0でILM(Index Lifecycle Management)の自動検出機能を追加され、これが悪さをするようです。

【参考】
S3 via SNS/SQS input
dissect filter
date filter
ruby filter
geoip filter
fingerprint filter
mutate filter
elasticsearch output

11. Kibanaでの各設定

  • Amazon ESのKibanaのURLをクリックします。
    image.png

  • 作成したCognitoのユーザでログインします。
    image.png

  • Dev ToolsのConsoleからVPCFlowLogsのIndex Templateを追加します。
    image.png

  • 上記で張り付けたIndex Templateは以下の通りです。

index_templete_vpcflowlogs
PUT _template/vpcflowlogs
{
  "index_patterns": ["vpcflowlogs-*"],
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas" : 1
  },
  "mappings": {
    "dynamic_templates" : [
      {
        "message_field" : {
          "path_match" : "message",
          "mapping" : {
            "norms" : false,
            "type" : "text"
          },
          "match_mapping_type" : "string"
        }
      },
      {
        "string_fields" : {
          "mapping" : {
            "norms" : false,
            "type" : "text",
            "fields" : {
              "keyword" : {
                "ignore_above" : 256,
                "type" : "keyword"
              }
            }
          },
          "match_mapping_type" : "string",
          "match" : "*"
        }
      }
    ],
    "properties": {
      "@timestamp": {
        "type": "date"
      },
      "@version" : {
        "type" : "keyword"
      },
      "account-id" : {
        "type" : "keyword"
      },
      "action" : {
        "type" : "keyword"
      },
      "bytes" : {
        "type" : "float"
      },
      "dst_geoip" : {
        "properties" : {
          "city_name" : {
            "type" : "keyword"
          },
          "continent_code" : {
            "type" : "keyword"
          },
          "country_code2" : {
            "type" : "keyword"
          },
          "country_code3" : {
            "type" : "keyword"
          },
          "country_name" : {
            "type" : "keyword"
          },
          "ip" : {
            "type" : "ip"
          },
          "latitude" : {
            "type" : "float"
          },
          "location" : {
            "properties" : {
              "lat" : {
                "type" : "float"
              },
              "lon" : {
                "type" : "float"
              }
            }
          },
          "longitude" : {
            "type" : "float"
          },
          "postal_code" : {
            "type" : "keyword"
          },
          "region_code" : {
            "type" : "keyword"
          },
          "region_name" : {
            "type" : "keyword"
          },
          "timezone" : {
            "type" : "keyword"
          }
        }
      },
      "dstaddr" : {
        "type" : "ip"
      },
      "dstport" : {
        "type" : "keyword"
      },
      "end_time" : {
        "type" : "date"
      },
      "interface-id" : {
        "type" : "keyword"
      },
      "log-status" : {
        "type" : "keyword"
      },
      "message" : {
        "type" : "text"
      },
      "packets" : {
        "type" : "float"
      },
      "protocol" : {
        "type" : "keyword"
      },
      "src_geoip" : {
        "properties" : {
          "city_name" : {
            "type" : "keyword"
          },
          "continent_code" : {
            "type" : "keyword"
          },
          "country_code2" : {
            "type" : "keyword"
          },
          "country_code3" : {
            "type" : "keyword"
          },
          "country_name" : {
            "type" : "keyword"
          },
          "dma_code" : {
            "type" : "long"
          },
          "ip" : {
            "type" : "ip"
          },
          "latitude" : {
            "type" : "float"
          },
          "location" : {
            "properties" : {
              "lat" : {
                "type" : "float"
              },
              "lon" : {
                "type" : "float"
              }
            }
          },
          "longitude" : {
            "type" : "float"
          },
          "postal_code" : {
            "type" : "keyword"
          },
          "region_code" : {
            "type" : "keyword"
          },
          "region_name" : {
            "type" : "keyword"
          },
          "timezone" : {
            "type" : "keyword"
          }
        }
      },
      "srcaddr" : {
        "type" : "ip"
      },
      "srcport" : {
        "type" : "keyword"
      },
      "start_time" : {
        "type" : "date"
      },
      "instance-id" : {
        "type" : "keyword"
      },
      "pkt-dstaddr" : {
        "type" : "ip"
      },
      "pkt-srcaddr" : {
        "type" : "ip"
      },
      "subnet-id" : {
        "type" : "keyword"
      },
      "tcp-flags" : {
        "type" : "keyword"
      },
      "type2" : {
        "type" : "keyword"
      },
      "vpc-id" : {
        "type" : "keyword"
      },      
      "tags" : {
        "type" : "keyword"
      },
      "version" : {
        "type" : "keyword"
      }
    }
  }
}
  • Logstashを起動します。
logstash_start
$ sudo systemctl start logstash
$ sudo systemctl status logstash
● logstash.service - logstash
   Loaded: loaded (/etc/systemd/system/logstash.service; disabled; vendor preset: disabled)
   Active: active (running) since Sun 2020-03-08 23:35:48 JST; 5h 12min ago
 Main PID: 4642 (java)
   CGroup: /system.slice/logstash.service
           └─4642 /bin/java -Xms1g -Xmx1g -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -Djava.awt.headless=true -Dfile.encoding=UT...

Mar 09 04:41:14 ip-172-31-25-250.us-west-2.compute.internal logstash[4642]: [2020-03-09T04:41:14,118][DEBUG][logstash.inputs.s3snssqs ][main] We found a record {:record=>{"ev...JYJUKFIO
Mar 09 04:41:14 ip-172-31-25-250.us-west-2.compute.internal logstash[4642]: [2020-03-09T04:41:14,118][DEBUG][logstash.inputs.s3snssqs ][main] record is valid
Mar 09 04:46:14 ip-172-31-25-250.us-west-2.compute.internal logstash[4642]: [2020-03-09T04:46:14,026][DEBUG][logstash.inputs.s3snssqs ][main] Inside Preprocess: Start {:event...cvs5pltB
Mar 09 04:46:14 ip-172-31-25-250.us-west-2.compute.internal logstash[4642]: [2020-03-09T04:46:14,028][DEBUG][logstash.inputs.s3snssqs ][main] Payload in Preprocess:  {:payloa...ncipalId
Mar 09 04:46:14 ip-172-31-25-250.us-west-2.compute.internal logstash[4642]: [2020-03-09T04:46:14,028][DEBUG][logstash.inputs.s3snssqs ][main] We found a record {:record=>{"ev...JYJUKFIO
Mar 09 04:46:14 ip-172-31-25-250.us-west-2.compute.internal logstash[4642]: [2020-03-09T04:46:14,029][DEBUG][logstash.inputs.s3snssqs ][main] record is valid
Mar 09 04:46:14 ip-172-31-25-250.us-west-2.compute.internal logstash[4642]: [2020-03-09T04:46:14,918][DEBUG][logstash.inputs.s3snssqs ][main] Inside Preprocess: Start {:event...iMh/qxG/
Mar 09 04:46:14 ip-172-31-25-250.us-west-2.compute.internal logstash[4642]: [2020-03-09T04:46:14,922][DEBUG][logstash.inputs.s3snssqs ][main] Payload in Preprocess:  {:payloa...ncipalId
Mar 09 04:46:14 ip-172-31-25-250.us-west-2.compute.internal logstash[4642]: [2020-03-09T04:46:14,926][DEBUG][logstash.inputs.s3snssqs ][main] We found a record {:record=>{"ev...JYJUKFIO
Mar 09 04:46:14 ip-172-31-25-250.us-west-2.compute.internal logstash[4642]: [2020-03-09T04:46:14,928][DEBUG][logstash.inputs.s3snssqs ][main] record is valid
Hint: Some lines were ellipsized, use -l to show in full.
  • Kibanaの [Management] > [Index Patterns]でCreate index patternをクリックします。

  • vpcflowlogs-*という名前でIndex Patternを作成します。
    image.png

  • @timestampを指定します。
    image.png

  • [Discover]を開き、Index Patternにvpcflowlogs-*を指定します。

  • デフォルトでは直近15分間のログが表示されます。表示されていればOKです。
    image.png

  • 取り込んだログは以下のような感じです。

vpcflowlogs_log
{
  "_index": "vpcflowlogs-2020-03-09",
  "_type": "_doc",
  "_id": "530514308",
  "_version": 1,
  "_score": null,
  "_source": {
    "start_time": "2020-03-08T15:06:54.000Z",
    "type2": "IPv4",
    "tags": [
      "vpcflowlogs",
      "src_geoip_lookup_failure"
    ],
    "version": "3",
    "src_geoip": {},
    "pkt-dstaddr": "54.187.220.203",
    "subnet-id": "subnet-8e89b8ea",
    "@timestamp": "2020-03-08T15:06:54.000Z",
    "dstport": "443",
    "instance-id": "i-0b9ea91a926b337a1",
    "bytes": "385388",
    "vpc-id": "vpc-46131d22\n",
    "@version": "1",
    "dst_geoip": {
      "city_name": "Boardman",
      "ip": "54.187.220.203",
      "region_name": "Oregon",
      "latitude": 45.8491,
      "longitude": -119.7143,
      "country_name": "United States",
      "country_code3": "US",
      "dma_code": 810,
      "location": {
        "lon": -119.7143,
        "lat": 45.8491
      },
      "postal_code": "97818",
      "region_code": "OR",
      "continent_code": "NA",
      "country_code2": "US",
      "timezone": "America/Los_Angeles"
    },
    "account-id": "<AWSアカウント>",
    "dstaddr": "54.187.220.203",
    "packets": "279",
    "protocol": "6",
    "type": "vpcflowlogs",
    "srcport": "51236",
    "log-status": "OK",
    "end_time": "2020-03-08T15:07:06.000Z",
    "tcp-flags": "0",
    "srcaddr": "172.31.25.250",
    "interface-id": "eni-07d6dcb15ec1cb2b9",
    "pkt-srcaddr": "172.31.25.250",
    "message": "<AWSアカウント> ACCEPT 385388 54.187.220.203 443 1583680026 i-0b9ea91a926b337a1 eni-07d6dcb15ec1cb2b9 OK 279 54.187.220.203 172.31.25.250 6 172.31.25.250 51236 1583680014 subnet-8e89b8ea 0 IPv4 3 vpc-46131d22\n",
    "action": "ACCEPT"
  },
  "fields": {
    "end_time": [
      "2020-03-08T15:07:06.000Z"
    ],
    "start_time": [
      "2020-03-08T15:06:54.000Z"
    ],
    "@timestamp": [
      "2020-03-08T15:06:54.000Z"
    ]
  },
  "sort": [
    1583680014000
  ]
}

まとめ

いかがでしたでしょうか。
長文になってしまい申し訳ありませんm(_ _)m
ポイントは、10. Logstash設定のconfの内容です。

この設定では、LogstashのETL処理が完了するとSQSキューにあるメッセージは削除されます。
S3バケットに蓄積されているVPCFlowLogsのログは削除されずにそのままとなるようにしています。
SNSトピック経由でのトリガーは1回しか通知されませんので、S3はそのままで大丈夫です。
再取り込みが必要になった場合は再取り込み用のLogstashを作り、直接S3からログを取得すればOKです。

あとは、LogstashをEC2ではなくFargateで展開すれば、サーバレスですね^^

掛かったコストは、日次で0.85UDSでした。
image.png

9
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
2