AWS
Elasticsearch
kibana
Logstash
Kinesis

Logstashのkinesis input pluginとkinesis output plugin


はじめに

Elastic社の提供するETLツールLogstashを利用して、Amazon Kinesis Data Streamsにログデータを出し入れしてみました。

【参考】

Amazon Kinesis Data Streamsとは

AWS kinesisまとめ

※かなり雑に言うとApache KafkaのAWSマネージドサービスです。


利用環境と構成図

product
version

Filebeat
6.6.0

Logstash
6.6.0

Elasticsearch
6.6.0

Kibana
6.6.0

OS(EC2)
Amazon Linux2

※Elastic Stackは現行最新版の6.6.0を利用しています。

※AWSリージョンはバージニア(us-east-1)を利用しています。

敢えて「Logstash → Kinesis Data Streams」と「Kinesis Data Streams → Logstash」の両方向を実施しています。

image.png


  • LogstashからKinesis Data Streamsにデータを置くときはkinesis output pluginを利用します。

  • LogstashでKinesis Data Streamsからデータを取り出すときはkinesis input pluginを利用します。


前提

Logstash_put-kinesisマシンのJava、Logstash、Filebeatのインストールは省略しています。

またLogstash_get-kinesisマシンのJava、Logstash、Elasticsearch、Kibanaのインストールも省略しています。

上記環境が揃っていることを前提としています。


Logstash kinesis output pluginとは

Logstashの取り込んだログデータの出力先として、Amazon Kinesis Data Streamsを指定することが出来ます。

上記機能を実現するのがkinesis output pluginになります。

このPluginはElastic社公式のPluginではなく、Community Pluginになります。

設定項目
デフォルト値
説明

stream_name
- (必須)
kinesisストリーム名を指定します。

region
us-east-1 (任意)
Kinesis Data StreamsのAWSリージョンを指定します。

metrics_level
cloudwatch (任意)
メトリック統計情報のCloudWatchへの送信有無を指定します。

access_key
- (任意)
Kinesisに書き込み権限のあるIAMアカウントで作成したアクセスキーを指定します。

secret_key
- (任意)
アクセスキーを利用する場合の対になるシークレットキーを指定します。

metrics_access_key
- (任意)
CloudWatchに書き込み権限のあるIAMアカウントで作成したアクセスキーを指定します。

metrics_secret_key
- (任意)
CloudWatchのアクセスキーを利用する場合の対になるシークレットキーを指定します。

role_arn
- (任意)
AssumeRoleを利用する場合のARNを指定します。

metrics_role_arn
- (任意)
CloudWatchへの書き込みでAssumeRoleを利用する場合のARNを指定します。

event_partition_keys
- (任意)
各レコードに挿入するパーティションキーを指定します。

randomized_partition_key
false (任意)
データ送信順序を無視したランダムなパーティションキーの利用有無を指定します。

aggregation_enabled
true (任意)
Kinesisに投入するレコードを集約して送るか指定します。

max_pending_records
1000 (任意)
Kinesis高負荷時のLogstash側でのバッファリングするレコード数を指定します。


Logstash kinesis input pluginとは

Logstashがログデータの取得先として、Amazon Kinesis Data Streamsを指定することが出来ます。

上記機能を実現するのがkinesis input pluginになります。

こちらのPluginはElastic社公式のPluginになりますが、サポート対象外です(´;ω;`)

設定項目
デフォルト値
説明

application_name
logstash (任意)
dynamodb調整テーブルに使用されるアプリケーション名を指定します。

checkpoint_interval_seconds
60 (任意)
dynamodbをチェックにしに行くインターバル(秒)を指定します。

kinesis_stream_name
- (必須)
kinesisストリーム名を指定します。

metrics
nil (任意)
メトリック統計情報のCloudWatchへの送信有無を指定します。

profile
- (任意)
AWS認証情報を参照する場合のファイルパスを指定します。

region
us-east-1 (任意)
Kinesis Data StreamsのAWSリージョンを指定します。

role_arn
- (任意)
AssumeRoleを利用する場合のARNを指定します。

role_session_name
- (任意)
IAMロールを引き受けるときに使用するセッション名を指定します。


設定手順


  1. Kinesis Streamsの作成

  2. IAM Roleの作成

  3. Pluginインストール

  4. logstash.confの設定


1. Kinesis Streamsの作成


  • AWSマネージメントコンソールからAmazon Kinesisを選択します。[今すぐ始める]をクリックします。

    image.png


  • Kinesis Data Streamsを選択して、[データストリームの作成]をクリックします。

    image.png


  • [Kinesisストリームの名称]はkinesis_pocとします。


  • シャード数はサイジング要件に応じてセットすれば良いですが、今回は1とします。


  • 下部の[Kinesisストリームの作成]をクリックします。

    image.png


※Kinesis Data Streamのサイジングや動きに関して、以下の記事が良くまとまっています。

【参考】

AWS Kinesis Stream を大規模データで検証してわかったことの事例紹介


2. IAMロールの作成


  • Logstash_put-kinesisのEC2インスタンスに割り当てるIAM Roleに必要なIAM Policy


kinesis_put_policy

{

"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": [
"kinesis:PutRecord",
"kinesis:PutRecords",
"kinesis:DescribeStream"
],
"Resource": "arn:aws:kinesis:us-east-1:<AWSアカウント>:stream/kinesis_poc"
},
{
"Sid": "VisualEditor1",
"Effect": "Allow",
"Action": [
"cloudwatch:PutMetricData"
],
"Resource": "*"
}
]
}


  • Logstash_get-kinesisのEC2インスタンスに割り当てるIAM Roleに必要なIAM Policy


kinesis_get_policy

{

"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": [
"kinesis:GetShardIterator",
"kinesis:GetRecords",
"kinesis:DescribeStream",
"kinesis:ListShards"
],
"Resource": "arn:aws:kinesis:us-east-1:<AWSアカウント>:stream/kinesis_poc"
},
{
"Sid": "VisualEditor1",
"Effect": "Allow",
"Action": [
"dynamodb:CreateTable",
"dynamodb:DescribeTable",
"dynamodb:Scan",
"dynamodb:PutItem",
"dynamodb:UpdateItem",
"dynamodb:GetItem",
"dynamodb:DeleteItem",
"cloudwatch:PutMetricData"
],
"Resource": "*"
}
]
}

IAM ポリシーとユーザーの作成に必要なIAM Policyが記載されていますが、これでは足りませんでした。

権限が不足している場合は、/var/log/messagesに以下のようなエラーが出力されますので、見逃さないように!!

※以下の例はdynamodb:DescribeTableが不足しています。


/var/log/messages

Feb 13 16:01:39 ip-172-31-1-190 logstash: Caused by: com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException: User: arn:aws:sts::<AWSアカウント>:assumed-role/kinesis_get_role/i-03c32b05117b3235a is not authorized to perform: dynamodb:DescribeTable on resource: arn:aws:dynamodb:us-east-1:<AWSアカウント>:table/logstash (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: AccessDeniedException; Request ID: BQRBG0RSE187CLGTP802ONS5BFVV4KQNSO5AEMVJF66Q9ASUAAJG)



3. Pluginインストール

標準でインストールされているPluginは/usr/share/logstash/Gemfileを参照することで把握することが出来ます。

※ちなみに今回対象の2つのPluginはいずれも標準では未インストールです。追加でインストールしましょう。

[root@ip-172-31-1-190 ~]# cat /usr/share/logstash/Gemfile

# This is a Logstash generated Gemfile.
# If you modify this file manually all comments and formatting will be lost.

source "https://rubygems.org"
gem "logstash-core", :path => "./logstash-core"
gem "logstash-core-plugin-api", :path => "./logstash-core-plugin-api"
gem "paquet", "~> 0.2.0"
gem "ruby-progressbar", "~> 1.8.1"
gem "builder", "~> 3.2.2"
gem "ci_reporter_rspec", "1.0.0", :group => :development
gem "tins", "1.6", :group => :development
gem "rspec", "~> 3.5", :group => :development
gem "logstash-devutils", "= 1.3.5", :group => :development
gem "benchmark-ips", :group => :development
gem "octokit", "3.8.0", :group => :build
gem "stud", "~> 0.0.22", :group => :build
gem "fpm", "~> 1.3.3", :group => :build
gem "rubyzip", "~> 1.2.1", :group => :build
gem "gems", "~> 0.8.3", :group => :build
gem "rack-test", :require => "rack/test", :group => :development
gem "flores", "~> 0.0.6", :group => :development
gem "term-ansicolor", "~> 1.3.2", :group => :development
gem "json-schema", "~> 2.6", :group => :development
gem "belzebuth", :group => :development
gem "pleaserun", "~>0.0.28"
gem "webrick", "~> 1.3.1"
gem "atomic", "<= 1.1.99"
gem "rake", "~> 12.2.1", :group => :build
gem "logstash-codec-cef"
gem "logstash-codec-collectd"
gem "logstash-codec-dots"
gem "logstash-codec-edn"
gem "logstash-codec-edn_lines"
gem "logstash-codec-es_bulk"
gem "logstash-codec-fluent"
gem "logstash-codec-graphite"
gem "logstash-codec-json"
gem "logstash-codec-json_lines"
gem "logstash-codec-line"
gem "logstash-codec-msgpack"
gem "logstash-codec-multiline"
gem "logstash-codec-netflow", ">=3.14.1", "<4.0.0"
gem "logstash-codec-plain"
gem "logstash-codec-rubydebug"
gem "logstash-filter-aggregate"
gem "logstash-filter-anonymize"
gem "logstash-filter-cidr"
gem "logstash-filter-clone"
gem "logstash-filter-csv"
gem "logstash-filter-date"
gem "logstash-filter-de_dot"
gem "logstash-filter-dissect"
gem "logstash-filter-dns"
gem "logstash-filter-drop"
gem "logstash-filter-elasticsearch"
gem "logstash-filter-fingerprint"
gem "logstash-filter-geoip"
gem "logstash-filter-grok"
gem "logstash-filter-http"
gem "logstash-filter-jdbc_static"
gem "logstash-filter-jdbc_streaming"
gem "logstash-filter-json"
gem "logstash-filter-kv"
gem "logstash-filter-memcached"
gem "logstash-filter-metrics"
gem "logstash-filter-mutate"
gem "logstash-filter-ruby"
gem "logstash-filter-sleep"
gem "logstash-filter-split"
gem "logstash-filter-syslog_pri"
gem "logstash-filter-throttle"
gem "logstash-filter-translate"
gem "logstash-filter-truncate"
gem "logstash-filter-urldecode"
gem "logstash-filter-useragent"
gem "logstash-filter-xml"
gem "logstash-input-beats"
gem "logstash-input-azure_event_hubs"
gem "logstash-input-dead_letter_queue"
gem "logstash-input-elasticsearch"
gem "logstash-input-exec"
gem "logstash-input-file"
gem "logstash-input-ganglia"
gem "logstash-input-gelf"
gem "logstash-input-generator"
gem "logstash-input-graphite"
gem "logstash-input-heartbeat"
gem "logstash-input-http"
gem "logstash-input-http_poller"
gem "logstash-input-imap"
gem "logstash-input-jdbc"
gem "logstash-input-kafka"
gem "logstash-input-pipe"
gem "logstash-input-rabbitmq"
gem "logstash-input-redis"
gem "logstash-input-s3"
gem "logstash-input-snmp"
gem "logstash-input-snmptrap"
gem "logstash-input-sqs"
gem "logstash-input-stdin"
gem "logstash-input-syslog"
gem "logstash-input-tcp"
gem "logstash-input-twitter"
gem "logstash-input-udp"
gem "logstash-input-unix"
gem "logstash-output-elastic_app_search"
gem "logstash-output-cloudwatch"
gem "logstash-output-csv"
gem "logstash-output-elasticsearch"
gem "logstash-output-email"
gem "logstash-output-file"
gem "logstash-output-graphite"
gem "logstash-output-http"
gem "logstash-output-kafka"
gem "logstash-output-lumberjack"
gem "logstash-output-nagios"
gem "logstash-output-null"
gem "logstash-output-pagerduty"
gem "logstash-output-pipe"
gem "logstash-output-rabbitmq"
gem "logstash-output-redis"
gem "logstash-output-s3", ">=4.0.9", "<5.0.0"
gem "logstash-output-sns"
gem "logstash-output-sqs"
gem "logstash-output-stdout"
gem "logstash-output-tcp"
gem "logstash-output-udp"
gem "logstash-output-webhdfs"


  • Logstash_put-kinesisにはkinesis output pluginをインストールします。

[root@ip-172-31-1-235 ~]# /usr/share/logstash/bin/logstash-plugin install logstash-output-kinesis

Validating logstash-output-kinesis
Installing logstash-output-kinesis
Installation successful


  • Logstash_get-kinesisにはkinesis input pluginをインストールします。

[root@ip-172-31-9-35 ~]# /usr/share/logstash/bin/logstash-plugin install logstash-input-kinesis

Validating logstash-input-kinesis
Installing logstash-input-kinesis
Installation successful


4. logstash.confの設定


  • Logstash_put-kinesisのlogstash.confを作成します。


logstash.conf

input {

beats {
port => 5044
}
}
output {
kinesis {
stream_name => "kinesis_poc"
max_pending_records => 10000
randomized_partition_key => true
region => "us-east-1"
}
}

※Filebeatからログデータを受信したものをKinesisに流します。

※複数のシャードに分散する時はrandomized_partition_keyを有効化します。


  • Logstash_get-kinesisのlogstash.confを作成します。


logstash

input {

kinesis {
kinesis_stream_name => "kinesis_poc"
application_name => "kinesis_poc"
checkpoint_interval_seconds => 10
metrics => "cloudwatch"
region => "us-east-1"
codec => json {}
}
}
output {
elasticsearch {
hosts => [ "localhost:9200" ]
}
}

※今回は同じOS内にインストール済のElasticsearchにoutputして、Kibanaで可視化出来るようにしています。

codecを指定しないと以下のように全てmessageにvalueとして入ってしまったので、codec => json {}を記載しています。

image.png


実施結果


  • Kinesisデータストリームの[モニタリング]タブでPUTレコードにデータが流れてくるのが確認できれば、Logstash_put-kinesisは成功です。

  • レコードの取得にデータが流れてくるのが確認できれば、Logstash_get-kinesisも成功です。
    image.png
    image.png

公式サイトには、なぜかオプションとして記載がないのですが、githubにはイテレータの指定が2つのみですが、出来ると書いてあります。


まとめ

いかがでしたでしょうか。

これまではAWS上のDataLakeとしてS3 Bucketを利用することが多かった私ですが

DataLakeから大量のログデータを拾い、分析に必要なストアへリアルタイムに投入するには

スケールアウト出来るアーキテクチャが必要になりましたので、この度試してみました!!

意外とLogstashとKinesis Data Streamsの連携に関する情報が少ないので、何かの役に立てれば幸いです^^