概要
EC2インスタンスからFluentdを使ってKinesis Data Streamsにログを送信するアーキテクチャを構築します。
fluent-plugin-kinesisでKinesis Streamsにログを送信する
こちらの記事を参考にしました。
基本的な流れは同じですが、簡単な説明だったり情報が古かったりして少し戸惑ったのでまとめました。
今回は一から作成していますが、必要に応じて作成済みのものを利用してください。
環境
作業 OS:macOS Mojave
EC2 OS:Amazon Linux 2 AMI (HVM)
fluentd:1.4.2
td-agent:1.3.3
fluent-plugin-kinesis:3.0.0
ruby:2.6.2
目次
- EC2の作成
- Streamの作成
- EC2にロール付与
(1) StreamのARNを指定して、Kinesisのポリシーを作成
(2) ポリシーを指定して、ロールを作成
(3) EC2にロールを付与する
- EC2にSSH接続してログイン
- Apacheのインストール
- rbenv/Rubyのインストール
- Fluentdのインストール
- プラグインのインストール
- Fluentd設定ファイルを修正する
- 設定ファイルのシンタックスチェックを行う
- Fluentdの起動
- Streamにデータが送信されていることを確認
(1) シャードイテレータを確認
(2) シャードイテレータを指定してレコード取得
(3) デコードして中身を確認
1. EC2の作成
まず、Fluentdを入れるためのAWS EC2を作成します。
AWSマネジメントコンソール > EC2 > インスタンス を開きます。
インスタンス作成ボタンから、任意のOSを選択して作成します。
今回はAmazon Linuxを利用し、設定等はデフォルトで作成しました。
SHH接続の際に使用するキーペアが生成されるので、保存してください。
2. Streamの作成
次に、ログの送信先であるKinesis Data Streamを作成します。
AWSマネジメントコンソール > Kinesis > Data Streamを開きます。
Kibesisストリームの作成ボタンから作成します。
今回はストリーム名をtest、シャード数を1に設定して作成しました。
※シャード…ストリーム内の一意に識別されたデータレコードのシーケンス。ストリームの総容量はシャードの容量の合計。
その後、作成したストリームをクリックし、詳細タブのストリーム ARNの項目を確認しておきます。
これは次のポリシー作成の際に使用します。
3. EC2にロール付与
先ほど作成したストリームのリソースに対するKinesisの権限をEC2に付与します。
(1) 作成したStreamのARNを指定して、Kinesisのポリシーを作成
AWSマネジメントコンソール > IAM > ポリシー を開きます。
ポリシーの作成ボタンから作成します。
ビジュアルエディタの設定で、以下のように設定します。(その他の設定は任意です)
- サービス:Kinesis
- アクション:すべてのKinesisアクション (kinesis:*)
- リソース:stream > ARNの追加 > ARNを手動でリスト > ストリームARNを貼り付ける
ここではポリシーの名前をtest-kinesis-policyとします。
(2) 作成したポリシーを指定して、ロールを作成
AWSマネジメントコンソール > IAM > ロール を開きます。
ポリシーの作成ボタンから作成します。
設定は以下のようにします。(その他の設定は任意です。)
- このロールを使用するサービスを選択:EC2
- Attach アクセス権限ポリシー:test-kinesis-policy
ここではロールの名前をtest-kinesis-roleとします。
(3) EC2に作成したロールを付与する
AWSマネジメントコンソール > EC2 > インスタンス を開きます。
作成したEC2を選択 > アクション > インスタンスの設定 > IAMロールの割り当て/置換 を開きます。
以下を指定して、適用します。
- IAMロール:test-kinesis-role
4. EC2にSSH接続してログイン
ターミナルを開いて、EC2にSSH接続します。
$ ssh -i <key-pair>.pem <ユーザ名>@<パブリックIP or パブリックDNS名>
5. Apacheのインストール
EC2にApatchをインストールして起動します。
$ sudo yum install -y httpd
$ sudo service httpd start
6. rbenv/Rubyのインストール
EC2にrbevnとRubyをインストールします。
Rubyのバージョンは今回は安定版の2.6.2(2019/4/3現在)をインストールしました。
※Fluentdの公式ではRubyのversionは2.1以上をサポートしているとのことです。
参考:Amazon Linuxに rbenv をインストールして、Rubyをバージョンアップ(Bundlerも)
7. Fluentdのインストール
EC2にFluentdをgemでインストールします。
$ gem install fluentd --no-document
参考:
Installing Fluentd Using Ruby Gem | Fluentd
※Fluentdの公式に書かれている--no-ri --no-rdoc
オプションは廃止されました。
以下のコマンドで、エラーなく動くことを確認します。
$ fluentd --setup ./fluent
$ fluentd -c ./fluent/fluent.conf -vv &
$ echo '{"json":"message"}' | fluent-cat debug.test
最後のコマンドは、Fluentdに「debug.test」タグのメッセージ「{"json": "message"}」を送信します。
インストールが成功していた場合、Fluentdは以下のメッセージを出力します。
2019-04-04 xx:xx:xx +xxxx debug.test:{"json":"message"}
8. プラグインのインストール
EC2にfluent-plugin-kinesisをインストールします。
$ sudo td-agent-gem install fluent-plugin-kinesis
OSによってはtd-agent-gemコマンドのパスが通っていない場合があるので、通して実行してください。
参考:Plugin Management | Fluentd
9. Fluentd設定ファイルを修正する
Fluentdの設定を行います。
$ sudo vim /etc/td-agent/td-agent.conf
以下をファイルに追記します。
データソースの定義が <source>タグ、データ処理の定義が <match>タグです。
使い方などはaws-fluent-plugin-kinesis#Getting startedを確認してください。
<source>
@type tail
@id output_log
format apache2
path /var/log/httpd/access_log
pos_file /var/log/td-agent/httpd-access.pos
tag log.httpd.access
</source>
<match log.httpd.*>
@type kinesis_streams
@id input_httpd
stream_name test
region ap-northeast-1
random_partition_key true
use_yajl true
</match>
10. 設定ファイルのシンタックスチェックを行う
$ td-agent --dry-run -c /etc/td-agent/td-agent.conf
エラーがおきなければOKです。
11. Fluentdの起動
Fluentdを起動します。
まずはフォアグラウンドで実行できることを確認して、その後バックグラウンドで実行します。
フォアグラウンド実行
$ sudo td-agent -c /etc/td-agent/td-agent.conf
バックグラウンド実行
$ sudo service td-agent start
Apacheにリクエストを投げて、Kinesis Streamに送信するログを生成します。
$ curl localhost
12. Streamにデータが送信されていることを確認
(1) シャードイテレータを確認
$ STREAM_NAME=test
$ aws kinesis get-shard-iterator --shard-id shardId-000000000000 --stream-name $STREAM_NAME --shard-iterator-type TRIM_HORIZON
{
"ShardIterator": "AAAAAAAAAAEYTwTh..."
}
(2) シャードイテレータを指定してレコード取得
$ aws kinesis get-records --shard-iterator "AAAAAAAAAAEYTwTh..."
{
"Records": [
{
"Data": "eyJob3N0IjoiMTI3LjAuMC4xIiwidXNlciI6bnVsbCwibWV0aG9kIjoiR0VUIiwicGF0aCI6Ii8iLCJjb2RlIjo0MDMsInNpemUiOjM2MzAsInJlZmVyZXIiOm51bGwsImFnZW50IjoiY3VybC83LjYxLjEifQo=",
"PartitionKey": "8c3e250cc103933ec37f63cec968054c",
"ApproximateArrivalTimestamp": 1554283487.951,
"SequenceNumber": "49594439765182586642277267931515081312153319763399933954"
},
{
"Data": "eyJob3N0IjoiMTI3LjAuMC4xIiwidXNlciI6bnVsbCwibWV0aG9kIjoiR0VUIiwicGF0aCI6Ii8iLCJjb2RlIjo0MDMsInNpemUiOjM2MzAsInJlZmVyZXIiOm51bGwsImFnZW50IjoiY3VybC83LjYxLjEifQo=",
"PartitionKey": "10e21500031d1d5f5d6beadb2fbd5fe3",
"ApproximateArrivalTimestamp": 1554283679.042,
"SequenceNumber": "49594439765182586642277267931516290237972947586714173442"
}
],
"NextShardIterator": "AAAAAAAAAAEVvuiUpuBdo5LXOID3x5ksssSyqNfkxhNFkG7jkkhiqaX7knHtVolTj2JgOIgYT0vJVsYgIQA9BT950RdbxFfSIgyv2y1t75br6/UQlZXfVN3TWHLKJPyu9Xzx0fr+TN3avc0UyzOwg++NQCaPEBV2dTaDhXY03Q0dpXPhROAF7Dnkm8oxRjNYD3tIN+mkkPHu1YZs7mAOwVTuew85iAmF",
"MillisBehindLatest": 0
}
(3) デコードして中身を確認
$ DATA="eyJob3N0IjoiMTI3LjAuMC4xIiwidXNlciI6bnVsbCwibWV0aG9kIjoiR0VUIiwicGF0aCI6Ii8iLCJjb2RlIjo0MDMsInNpemUiOjM2MzAsInJlZmVyZXIiOm51bGwsImFnZW50IjoiY3VybC83LjYxLjEifQo="
$ echo -n $DATA | base64 -d
{"host":"127.0.0.1","user":null,"method":"GET","path":"/","code":403,"size":3630,"referer":null,"agent":"curl/7.61.1"}
Apatchから投げたリクエストを確認できました。
最後に
記事に間違いや不明点があれば遠慮なくご指摘ください。