はじめに
これまでAWSの様々なログをS3に溜めて、Logstashの**S3 input plugin**でログを取り込み
ログを加工後、Amazon Elasticsearch Service (以降、Amazon ES)に格納していました。
S3 input pluginは、Logstashのインスタンス性能が足りなくなった場合に
スケールアウト構成が取れないという課題を抱えていました。
同じ設定のLogstashインスタンスを並列化しても同じログをS3から取得してしまい
Elasticsearchに無駄な負荷をかけてしまうという結果になってしまいました。
(どこまでログ取得したかを記録したoffset値のsincedbを各ノード内で保持してしまうため)
そこで**logstash-input-s3-sns-sqs**を試してみました!
このPluginは、SNSを使ってS3バケットにログが置かれたことをトリガーにイベントをSQSに通知させて
そのS3のオブジェクトの置かれたパスをメッセージキューとして、Logstashに定期的に取りに行かせることで
Logstashを並列分散させる仕組みになります。
(どこまでログ取得したかをSQSキューで管理することで実現しています)
利用環境
product | version |
---|---|
logstash | 7.5.2 |
Java | 11.0.6 |
OS(EC2) | Amazon Linux2 (t3.small) |
AMI ID | ami-0caec5f705dc473eb |
Amazon ES | 7.1 (latest) |
Region | us-west-2 |
前提条件
- 今回の検証ではプラグインの単体テストのみで複数のLogstashでのテストまではしていません。
- サンプルログとして、VPC FlowLogs (カスタムフィールド全部出し)をS3バケットに出力しています。
- EC2構築やVPC環境構築(デフォルトVPC利用、VPCエンドポイント)の説明は割愛しています。
- Amazon ESをパブリックアクセスとしているため、認証としてCognitoを挟んでます。
- コスト観点からオレゴンリージョン (us-west-2)を利用しています。
S3 via SNS/SQS pluginとは
S3バケットにログが置かれたことをトリガーにSNSイベント通知がSQSキューに入り
そのSQSキューメッセージ内に含まれるS3のオブジェクトパスをLogstashのInputとするプラグインになります。
※S3 via SNS/SQS plugin固有の設定項目は以下の通りです。
設定項目 | デフォルト値 | 説明 |
---|---|---|
region | - (任意) | SQSを利用しているAWSリージョン名を指定します。 |
queue | - (必須) | メッセージの取得対象のSQSキュー名を指定します。 |
queue_owner_aws_account_id | - (任意) | SQSを利用しているAWSアカウントを指定します。 |
s3_options_by_bucket | - (任意) | s3バケットごとに認証情報やコーデックなどを指定します。 |
s3_default_options | - (任意) | ruby aws s3::Clientのデフォルトオプションを指定します。 |
s3_role_session_name | logstash (任意) | S3::Clientsセッション名を指定します。 |
delete_on_success | false (任意) | 処理成功時にS3からログを削除するかどうかを指定します。 |
include_object_properties | :last_modified、:content_type、:metadata (任意) | 取得したいS3ファイルのオブジェクトプロパティを指定します。 |
from_sns | true (任意) | SNSトピック経由かどうかを指定します。 |
sqs_skip_delete | false (任意) | SQSメッセージ処理成功時にメッセージ削除をしないかどうかを指定します。 |
sqs_delete_on_failure | false (任意) | SQSメッセージ処理失敗時のメッセージ削除しないかどうかを指定します。 |
temporary_directory | /tmp (任意) | S3から取得したログの一時的なダウンロード領域を指定します。 |
consumer_threads | 1 (任意) | 起動するSQSリーダーのワーカー数を指定します。 |
visibility_timeout | 5分 (任意) | SQSメッセージのデフォルトの可視性タイムアウト値を指定します。 |
max_processing_time | 8000秒 (任意) | SQS/S3のメッセージブロックの処理時間のハードリミットを指定します。 |
※VPCエンドポイント経由でS3アクセスする場合は、s3_default_options => { "endpoint_discovery" => true } とします。 |
||
※SNSトピック経由でSQSキューにメッセージ入れる場合は、メッセージ形式が異なるため、from_sns => true とします。 |
【参考】
・logstash-input-s3_sns_sqs plugin
実施内容
- S3バケット作成 (前準備)
- SNSトピック作成 (前準備)
- SQSキュー作成 (前準備)
- VPC FlowLogs設定 (前準備)
- S3イベント設定 (前準備)
- Cognito ユーザプール作成 (前準備)
- Cognito IDプール作成 (前準備)
- Amazon ESドメイン作成 (前準備)
- IAMロール作成 (前準備)
- Logstash設定
- Kibanaでの各設定
1. S3バケット作成 (前準備)
- AWSコンソール画面より
ストレージ
>S3
を選択します。 -
+ バケットを作成する
をクリックし、下記内容でS3のバケットを作成します。
項目 | 値 |
---|---|
バケット名 | loganalyticsdev-bucket |
リージョン | 米国西部(オレゴン) |
既存のバケットから設定のコピー | 空白 (デフォルトのまま) |
バージョ二ング | 無効 (デフォルトのまま) |
サーバアクセスのログ記録 | 無効 (デフォルトのまま) |
タグ | 0 タグ (デフォルトのまま) |
オブジェクトレベルのログ記録 | 無効 (デフォルトのまま) |
デフォルト暗号化 | なし (デフォルトのまま) |
オブジェクトのロック | 無効 (デフォルトのまま) |
CloudWatch リクエストメトリクス | 無効 (デフォルトのまま) |
パブリックアクセスをすべてブロック | オン (デフォルトのまま) |
システムのアクセス許可 | 無効 (デフォルトのまま) |
2. SNSトピック作成 (前準備)
- AWSコンソール画面より
アプリケーション統合
>Simple Notification Service
を選択します。 -
トピックの作成
から、下記内容でSNSのトピックを作成します。
項目 | 値 |
---|---|
トピック名 | loganalyticsdev-topic |
表示名-オプション | 空白 (デフォルトのまま) |
暗号化-オプション | 暗号化の無効化 (デフォルトのまま) |
アクセスポリシー-オプション | 高度な (JSONエディタは下記に記載) |
配信再試行ポリシー(HTTP/S)-オプション | デフォルトの配信再試行ポリシーの使用 (デフォルトのまま) |
配信ステータスのログ記録-オプション | (デフォルトのまま) |
タグ | 0 タグ (デフォルトのまま) |
※サブスクリプションの設定は、SQS側から実施します。 |
- アクセスポリシーには、S3からのSNSパブリッシュを許可する必要があります。
{
"Version": "2008-10-17",
"Id": "__default_policy_ID",
"Statement": [
{
"Sid": "__default_statement_ID",
"Effect": "Allow",
"Principal": {
"AWS": "*"
},
"Action": [
"SNS:GetTopicAttributes",
"SNS:SetTopicAttributes",
"SNS:AddPermission",
"SNS:RemovePermission",
"SNS:DeleteTopic",
"SNS:Subscribe",
"SNS:ListSubscriptionsByTopic",
"SNS:Publish",
"SNS:Receive"
],
"Resource": "arn:aws:sns:us-west-2:<AWSアカウント>:loganalyticsdev-topic",
"Condition": {
"StringEquals": {
"AWS:SourceOwner": "<AWSアカウント>"
}
}
},
{
"Sid": "AWSS3AccessAllow",
"Effect": "Allow",
"Principal": {
"Service": "s3.amazonaws.com"
},
"Action": "SNS:Publish",
"Resource": "arn:aws:sns:us-west-2:<AWSアカウント>:loganalyticsdev-topic",
"Condition": {
"ArnLike": {
"aws:SourceArn": "arn:aws:s3:*:*:loganalyticsdev-bucket"
}
}
}
]
}
3. SQSキュー作成 (前準備)
- AWSコンソール画面より
アプリケーション統合
>Simple Queue Service
を選択します。 -
今すぐ始める
から、下記内容でSQSのキューを作成します。
項目 | 値 |
---|---|
キュー名 | loganalyticsdev-queue |
リージョン | 米国西部(オレゴン) |
キュータイプ | 標準キュー (デフォルトのまま) |
デフォルトの可視性タイムアウト | 30秒 (デフォルトのまま) |
メッセージ保持期間 | 4日 (デフォルトのまま) |
最大メッセージサイズ | 256KB (デフォルトのまま) |
配信遅延 | 0秒 (デフォルトのまま) |
メッセージ受信待機時間 | 0秒 (デフォルトのまま) |
再処理ポリシーの使用 | 無効 (デフォルトのまま) |
SSEの使用 | 無効 (デフォルトのまま) |
※可視性タイムアウト値はLogstashで処理に掛かる時間を考えてセットしてください。
項目 | 値 |
---|---|
トピックリージョン | 米国西部(オレゴン) |
トピックの選択 | loganalyticsdev-topic |
トピックARN | arn:aws:sns:us-west-2:xxxxxxxxxxxx:loganalyticsdev-topic |
4. VPC FlowLogs設定 (前準備)
- AWSコンソール画面より
ネットワーキングとコンテンツ配信
>VPC
を選択します。 -
VPC
で該当VPCを選択し、フローログ
タブ >フローログの作成
をクリックします。
項目 | 値 |
---|---|
フィルタ | すべて |
Maximum aggregation interval | 1 minutes |
送信先 | S3バケットへの送信 |
S3バケットARN | arn:aws:s3:::loganalyticsdev-bucket |
Format | Custom format |
Format preview | ※下記に記載 |
Log line format | ※21項目すべてを選択 |
${account-id} ${action} ${bytes} ${dstaddr} ${dstport} ${end} ${instance-id} ${interface-id} ${log-status} ${packets} ${pkt-dstaddr} ${pkt-srcaddr} ${protocol} ${srcaddr} ${srcport} ${start} ${subnet-id} ${tcp-flags} ${type} ${version} ${vpc-id}
5. S3イベント設定 (前準備)
- AWSコンソール画面より
ストレージ
>S3
を選択します。 -
loganalyticsdev-bucket
を選択し、プロパティ
>イベント
をクリックします。 -
+通知の追加
から下記内容でS3バケットのイベントを作成します。
項目 | 値 |
---|---|
名前 | loganalyticsdev-event |
イベント | ☑ すべてのオブジェクト作成イベント |
プレフィックス | AWSLogs/[AWSアカウント]/vpcflowlogs/ |
サフィックス | .log.gz |
送信先 | SNSトピック |
SNS | loganalyticsdev-topic |
6. Cognito ユーザプール作成 (前準備)
- AWSコンソール画面より
セキュリティ、ID、およびコンプライアンス
>Cognito
を選択します。 -
ユーザープールの管理
>ユーザプールを作成する
から、下記内容でユーザプールを作成します。
項目 | 値 |
---|---|
プール名 | loganalyticsdev_userpool |
ユーザプールをどのように作成しますか? | デフォルトを確認する |
-
ドメイン名
をクリックし、任意のCognitoドメインのプレフィックスを入力します。
7. Cognito IDプール作成 (前準備)
- AWSコンソール画面より
セキュリティ、ID、およびコンプライアンス
>Cognito
を選択します。 -
IPプールの管理
>新しいIDプールの作成
から、下記内容でIDプールを作成します。
項目 | 値 |
---|---|
ID プール名 | loganalyticsdev_idpool |
認証されていない ID に対してアクセスを有効にする | 有効 |
基本 (クラシック) フローを許可する | 無効 (デフォルトのまま) |
IAMロール(認証成功) | 新しいIAMロールの作成 (デフォルトのまま) |
ロール名(認証成功) | Congnito_loganalyticsdev_idpoolAuth_Role (デフォルトのまま) |
IAMロール(認証失敗) | 新しいIAMロールの作成 (デフォルトのまま) |
ロール名(認証失敗) | Congnito_loganalyticsdev_idpoolUnauth_Role (デフォルトのまま) |
8. Amazon ESドメイン作成 (前準備)
- AWSコンソール画面より
分析
>Elasticsearch Service
を選択します。 -
新しいドメインの作成
から、下記内容でAmazon ESのドメインを作成します。
項目 | 値 |
---|---|
リージョン | us-west-2 |
デプロイタイプ | 開発およびテスト |
Elasticsearchのバージョン | 7.1 (latest) |
Elasticsearchのドメイン | loganalyticsdev-es |
インスタンスタイプ | t2.small.elasticsearch |
ノードの数 | 1 |
データノードのストレージタイプ | EBS |
EBS ボリュームタイプ | 汎用(SSD) |
EBS ボリュームサイズ | 30 GiB |
専用マスターノード | 無効 (デフォルトのまま) |
自動スナップショットの開始時間 | 00:00 UTC (デフォルト) |
ネットワークアクセス | パブリックアクセス |
細かいアクセスコントロールを有効化 | 無効 (デフォルトのまま) |
Amazon Cognito認証を有効化 | 有効 |
リージョン | 米国西部(オレゴン) |
Cognito ユーザプール | loganalyticsdev_userpool |
Cognito IDプール | loganalyticsdev_idpool |
IAMロール名 | CognitoAccessForAmazonES |
ロールポリシー | AmazonESCognitoAccess |
ドメインアクセスポリシー | カスタムアクセスポリシー (IPv4アドレス) |
ドメインへのすべてのトラフィックにHTTPSを要求 | 有効 (デフォルトのまま) |
ノード間の暗号化 | 無効 (デフォルトのまま) |
保管時のデータの暗号化の有効化 | 無効 (デフォルトのまま) |
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::<AWSアカウント>:role/Cognito_loganalyticsdev_idpoolAuth_Role"
},
"Action": "es:*",
"Resource": "arn:aws:es:us-west-2:<AWSアカウント>:domain/loganalyticsdev-es/*"
},
{
"Effect": "Allow",
"Principal": {
"AWS": "*"
},
"Action": "es:*",
"Resource": "arn:aws:es:us-west-2:<AWSアカウント>:domain/loganalyticsdev-es/*",
"Condition": {
"IpAddress": {
"aws:SourceIp": "<LogstashのElasitcIP>/32"
}
}
}
]
}
9. IAMロール作成 (前準備)
- AWSコンソール画面より
セキュリティ、ID、およびコンプライアンス
>IAM
を選択します。 -
ロール
>ロールの作成
から、下記内容でLogstash用のEC2に割り当てるIAMロールを作成します。
項目 | 値 |
---|---|
信頼されたエンティティの種類を選択 | AWSサービス |
ユースケースの選択 | EC2 |
Attachアクセス権限ポリシー | AmazonSQSFullAccess、AmazonS3ReadOnlyAccess |
アクセス権限の境界の設定 | アクセス権限の境界を設定せずにroleを作成する (デフォルトのまま) |
ロール名 | logstash-input-role |
ロールの説明 | Allows EC2 instances to call AWS services on your behalf. |
10. Logstash設定
- ここまでひたすら前準備でしたが、LogtashをEC2 Amazon Linux2 (t3.small)で構築します。
- logstashの追加プラグインをインストールします。(
logstash-input-s3-sns-sqs
を入れます)
$ sudo /usr/share/logstash/bin/logstash-plugin install logstash-input-s3-sns-sqs
Validating logstash-input-s3-sns-sqs
Installing logstash-input-s3-sns-sqs
Installation successful
- 下記のパイプライン構成ファイル(
/etc/logstash/conf.d/logstash.conf
)の設定を行います。
(このタイミングではLogstashは起動しません。)
input {
### ログ取得元のS3バケット情報を持つSQSキューを指定
s3snssqs {
region => "us-west-2"
queue => "loganalyticsdev-queue"
s3_default_options => { "endpoint_discovery" => true }
consumer_threads => 2
type => "vpcflowlogs"
tags => "vpcflowlogs"
}
}
filter {
if "OK" in [message] {
### dissect filterでフィールドを抽出
dissect {
mapping => {
"message" => "%{account-id} %{action} %{bytes} %{dstaddr} %{dstport} %{end} %{instance-id} %{interface-id} %{log-status} %{packets} %{pkt-dstaddr} %{pkt-srcaddr} %{protocol} %{srcaddr} %{srcport} %{start} %{subnet-id} %{tcp-flags} %{type2} %{version} %{vpc-id}"
}
}
### dateフィールドから@timestampを抽出
date {
match => [ "start","UNIX" ]
target => "@timestamp"
}
date {
match => [ "start","UNIX" ]
target => "start_time"
}
date {
match => [ "end","UNIX" ]
target => "end_time"
}
### @timstampから日本時間を抽出
ruby {
code => "event.set('[@metadata][local_time]',event.get('[@timestamp]').time.localtime.strftime('%Y-%m-%d'))"
}
### グローバルIPから国などをマッピングするために指定
geoip {
source => "pkt-srcaddr"
target => "src_geoip"
tag_on_failure => "src_geoip_lookup_failure"
}
geoip {
source => "pkt-dstaddr"
target => "dst_geoip"
tag_on_failure => "dst_geoip_lookup_failure"
}
### document_idに利用する一意のIDを作成
fingerprint {
source => "message"
target => "[@metadata][fingerprint]"
method => "MURMUR3"
}
mutate {
### 不要なフィールドを削除
remove_field => [ "start", "end" ]
}
}
}
output {
if [log-status] == "OK" {
### 出力先のAmazonESのIndexを指定
elasticsearch {
hosts => [ "https://search-loganalyticsdev-es-xxxxxxxxxxxxxxxxxxx.us-west-2.es.amazonaws.com:443" ]
index => "%{type}-%{[@metadata][local_time]}"
document_id => "%{[@metadata][fingerprint]}"
ilm_enabled => false
}
}
}
【2020/4/21追記】
※ ilm_enabled => false
は、Amazon ESのバージョン7以降を利用する場合は、記載しないとエラーになるので注意してください。
※ Logstash 7.0.0でILM(Index Lifecycle Management)の自動検出機能を追加され、これが悪さをするようです。
【参考】
・ S3 via SNS/SQS input
・ dissect filter
・ date filter
・ ruby filter
・ geoip filter
・ fingerprint filter
・ mutate filter
・ elasticsearch output
11. Kibanaでの各設定
-
上記で張り付けたIndex Templateは以下の通りです。
PUT _template/vpcflowlogs
{
"index_patterns": ["vpcflowlogs-*"],
"settings": {
"number_of_shards": 1,
"number_of_replicas" : 1
},
"mappings": {
"dynamic_templates" : [
{
"message_field" : {
"path_match" : "message",
"mapping" : {
"norms" : false,
"type" : "text"
},
"match_mapping_type" : "string"
}
},
{
"string_fields" : {
"mapping" : {
"norms" : false,
"type" : "text",
"fields" : {
"keyword" : {
"ignore_above" : 256,
"type" : "keyword"
}
}
},
"match_mapping_type" : "string",
"match" : "*"
}
}
],
"properties": {
"@timestamp": {
"type": "date"
},
"@version" : {
"type" : "keyword"
},
"account-id" : {
"type" : "keyword"
},
"action" : {
"type" : "keyword"
},
"bytes" : {
"type" : "float"
},
"dst_geoip" : {
"properties" : {
"city_name" : {
"type" : "keyword"
},
"continent_code" : {
"type" : "keyword"
},
"country_code2" : {
"type" : "keyword"
},
"country_code3" : {
"type" : "keyword"
},
"country_name" : {
"type" : "keyword"
},
"ip" : {
"type" : "ip"
},
"latitude" : {
"type" : "float"
},
"location" : {
"properties" : {
"lat" : {
"type" : "float"
},
"lon" : {
"type" : "float"
}
}
},
"longitude" : {
"type" : "float"
},
"postal_code" : {
"type" : "keyword"
},
"region_code" : {
"type" : "keyword"
},
"region_name" : {
"type" : "keyword"
},
"timezone" : {
"type" : "keyword"
}
}
},
"dstaddr" : {
"type" : "ip"
},
"dstport" : {
"type" : "keyword"
},
"end_time" : {
"type" : "date"
},
"interface-id" : {
"type" : "keyword"
},
"log-status" : {
"type" : "keyword"
},
"message" : {
"type" : "text"
},
"packets" : {
"type" : "float"
},
"protocol" : {
"type" : "keyword"
},
"src_geoip" : {
"properties" : {
"city_name" : {
"type" : "keyword"
},
"continent_code" : {
"type" : "keyword"
},
"country_code2" : {
"type" : "keyword"
},
"country_code3" : {
"type" : "keyword"
},
"country_name" : {
"type" : "keyword"
},
"dma_code" : {
"type" : "long"
},
"ip" : {
"type" : "ip"
},
"latitude" : {
"type" : "float"
},
"location" : {
"properties" : {
"lat" : {
"type" : "float"
},
"lon" : {
"type" : "float"
}
}
},
"longitude" : {
"type" : "float"
},
"postal_code" : {
"type" : "keyword"
},
"region_code" : {
"type" : "keyword"
},
"region_name" : {
"type" : "keyword"
},
"timezone" : {
"type" : "keyword"
}
}
},
"srcaddr" : {
"type" : "ip"
},
"srcport" : {
"type" : "keyword"
},
"start_time" : {
"type" : "date"
},
"instance-id" : {
"type" : "keyword"
},
"pkt-dstaddr" : {
"type" : "ip"
},
"pkt-srcaddr" : {
"type" : "ip"
},
"subnet-id" : {
"type" : "keyword"
},
"tcp-flags" : {
"type" : "keyword"
},
"type2" : {
"type" : "keyword"
},
"vpc-id" : {
"type" : "keyword"
},
"tags" : {
"type" : "keyword"
},
"version" : {
"type" : "keyword"
}
}
}
}
- Logstashを起動します。
$ sudo systemctl start logstash
$ sudo systemctl status logstash
● logstash.service - logstash
Loaded: loaded (/etc/systemd/system/logstash.service; disabled; vendor preset: disabled)
Active: active (running) since Sun 2020-03-08 23:35:48 JST; 5h 12min ago
Main PID: 4642 (java)
CGroup: /system.slice/logstash.service
└─4642 /bin/java -Xms1g -Xmx1g -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -Djava.awt.headless=true -Dfile.encoding=UT...
Mar 09 04:41:14 ip-172-31-25-250.us-west-2.compute.internal logstash[4642]: [2020-03-09T04:41:14,118][DEBUG][logstash.inputs.s3snssqs ][main] We found a record {:record=>{"ev...JYJUKFIO
Mar 09 04:41:14 ip-172-31-25-250.us-west-2.compute.internal logstash[4642]: [2020-03-09T04:41:14,118][DEBUG][logstash.inputs.s3snssqs ][main] record is valid
Mar 09 04:46:14 ip-172-31-25-250.us-west-2.compute.internal logstash[4642]: [2020-03-09T04:46:14,026][DEBUG][logstash.inputs.s3snssqs ][main] Inside Preprocess: Start {:event...cvs5pltB
Mar 09 04:46:14 ip-172-31-25-250.us-west-2.compute.internal logstash[4642]: [2020-03-09T04:46:14,028][DEBUG][logstash.inputs.s3snssqs ][main] Payload in Preprocess: {:payloa...ncipalId
Mar 09 04:46:14 ip-172-31-25-250.us-west-2.compute.internal logstash[4642]: [2020-03-09T04:46:14,028][DEBUG][logstash.inputs.s3snssqs ][main] We found a record {:record=>{"ev...JYJUKFIO
Mar 09 04:46:14 ip-172-31-25-250.us-west-2.compute.internal logstash[4642]: [2020-03-09T04:46:14,029][DEBUG][logstash.inputs.s3snssqs ][main] record is valid
Mar 09 04:46:14 ip-172-31-25-250.us-west-2.compute.internal logstash[4642]: [2020-03-09T04:46:14,918][DEBUG][logstash.inputs.s3snssqs ][main] Inside Preprocess: Start {:event...iMh/qxG/
Mar 09 04:46:14 ip-172-31-25-250.us-west-2.compute.internal logstash[4642]: [2020-03-09T04:46:14,922][DEBUG][logstash.inputs.s3snssqs ][main] Payload in Preprocess: {:payloa...ncipalId
Mar 09 04:46:14 ip-172-31-25-250.us-west-2.compute.internal logstash[4642]: [2020-03-09T04:46:14,926][DEBUG][logstash.inputs.s3snssqs ][main] We found a record {:record=>{"ev...JYJUKFIO
Mar 09 04:46:14 ip-172-31-25-250.us-west-2.compute.internal logstash[4642]: [2020-03-09T04:46:14,928][DEBUG][logstash.inputs.s3snssqs ][main] record is valid
Hint: Some lines were ellipsized, use -l to show in full.
-
Kibanaの [Management] > [Index Patterns]で
Create index pattern
をクリックします。 -
[Discover]を開き、Index Patternに
vpcflowlogs-*
を指定します。 -
取り込んだログは以下のような感じです。
{
"_index": "vpcflowlogs-2020-03-09",
"_type": "_doc",
"_id": "530514308",
"_version": 1,
"_score": null,
"_source": {
"start_time": "2020-03-08T15:06:54.000Z",
"type2": "IPv4",
"tags": [
"vpcflowlogs",
"src_geoip_lookup_failure"
],
"version": "3",
"src_geoip": {},
"pkt-dstaddr": "54.187.220.203",
"subnet-id": "subnet-8e89b8ea",
"@timestamp": "2020-03-08T15:06:54.000Z",
"dstport": "443",
"instance-id": "i-0b9ea91a926b337a1",
"bytes": "385388",
"vpc-id": "vpc-46131d22\n",
"@version": "1",
"dst_geoip": {
"city_name": "Boardman",
"ip": "54.187.220.203",
"region_name": "Oregon",
"latitude": 45.8491,
"longitude": -119.7143,
"country_name": "United States",
"country_code3": "US",
"dma_code": 810,
"location": {
"lon": -119.7143,
"lat": 45.8491
},
"postal_code": "97818",
"region_code": "OR",
"continent_code": "NA",
"country_code2": "US",
"timezone": "America/Los_Angeles"
},
"account-id": "<AWSアカウント>",
"dstaddr": "54.187.220.203",
"packets": "279",
"protocol": "6",
"type": "vpcflowlogs",
"srcport": "51236",
"log-status": "OK",
"end_time": "2020-03-08T15:07:06.000Z",
"tcp-flags": "0",
"srcaddr": "172.31.25.250",
"interface-id": "eni-07d6dcb15ec1cb2b9",
"pkt-srcaddr": "172.31.25.250",
"message": "<AWSアカウント> ACCEPT 385388 54.187.220.203 443 1583680026 i-0b9ea91a926b337a1 eni-07d6dcb15ec1cb2b9 OK 279 54.187.220.203 172.31.25.250 6 172.31.25.250 51236 1583680014 subnet-8e89b8ea 0 IPv4 3 vpc-46131d22\n",
"action": "ACCEPT"
},
"fields": {
"end_time": [
"2020-03-08T15:07:06.000Z"
],
"start_time": [
"2020-03-08T15:06:54.000Z"
],
"@timestamp": [
"2020-03-08T15:06:54.000Z"
]
},
"sort": [
1583680014000
]
}
まとめ
いかがでしたでしょうか。
長文になってしまい申し訳ありませんm(_ _)m
ポイントは、10. Logstash設定
のconfの内容です。
この設定では、LogstashのETL処理が完了するとSQSキューにあるメッセージは削除されます。
S3バケットに蓄積されているVPCFlowLogsのログは削除されずにそのままとなるようにしています。
SNSトピック経由でのトリガーは1回しか通知されませんので、S3はそのままで大丈夫です。
再取り込みが必要になった場合は再取り込み用のLogstashを作り、直接S3からログを取得すればOKです。
あとは、LogstashをEC2ではなくFargateで展開すれば、サーバレスですね^^