はじめに
最近AWSを用いた製造現場のDX化の技術検証に携わっております。
そこで検証した内容をアウトプットのため、他にも同じようなことをしている方のため
簡単にですがまとめたいなーと思いました。
今回はKinesis版です。オンプレのサーバにデータを貯めているのをどうにかしたい場合ってありますよね。
やること
オンプレのサーバにずっと格納し続けていると容量の問題や、データ分析がしづらいとのことで、一旦S3に格納してみようということです。将来的にOpen Searchを利用したリアルタイム分析やLambda、Glueを利用したETL処理してからデータ格納等もできれば良いかと。
オンプレ⇒Kinesisのデータ転送方法
オンプレのファイルをKinesis経由で転送する方法には以下2パターンあるかと思います
- Kinesis APIの利用
- Kinesis Agentの利用
これらどちらも手順等を説明しているサイトが少ないので、今回まとめてみた次第です。
DataStreams⇒Firehose⇒S3の構築手順は簡単なので飛ばします。
1. Kinesis API利用
今回は検証なのでCloud 9上にコーディングしていきます。
Kinesisとの通信が発生するので以下2つのポリシーを付与したロールを適用しています。
Json転送コード
ネットで転がっている情報を基に以下コードを記載しました。
"stream_name"には作成したDataStreamsの名前を入力してください。
id:1
x:1~100
event_time:今の時間
をJson形式でKinesisに転送します。
import datetime
import json
import time
import boto3
import pandas as pd
def main():
ex_id = 1
x=1
steps = 100
send=[]
stream_name = "stream_name"
kinesis_client = boto3.client('kinesis',region_name='ap-northeast-1')
print("Start")
for i in range(steps):
partition_key = str(ex_id)
data = {
"id": ex_id,
"x": x,
"event_time": datetime.datetime.now().isoformat()
}
kinesis_client.put_record(
StreamName=stream_name,
Data=json.dumps(data),
PartitionKey=partition_key)
send.append(data)
time.sleep(0.1)
x += 1
send_data = pd.DataFrame(send)
send_data.to_csv("./send.csv", index=False)
print("finished.")
if __name__ == '__main__':
main()
Kinesis Viewer確認
S3確認
Kinesis Agent利用
Windows Serverが良いって言われたのでWindows ServerにKinesis Agentを入れて検証していきます。
Kinesis Agentのインストール
以下のサイトを確認して、どれかの方法でKinesis Agentをインストールしてください。
https://docs.aws.amazon.com/ja_jp/kinesis-agent-windows/latest/userguide/getting-started.html
ちなみに自分はPower Shellでやりました。
appsettingファイルの設定
Kinesis Agentを利用すると、PCのログやらメトリクスやら色々取得できるらしいですね。
ただ、今回はある特定のフォルダにCSVファイルが格納されたら転送するって形です。
appsettingファイルは「Program Files/Amazon/AWSKinesisTap」配下にあります。
で、今回のコードはこちら
{特定フォルダ}に.csvファイルが格納されたらDataStreamsに転送って感じですね。
stream_nameにはDataStreamsの名前を入れましょう。簡単だ。
{
"Sources": [
{
"Id": "LogSource",
"SourceType": "DirectorySource",
"RecordParser": "SingleLine",
"Directory": "{特定フォルダ}",
"FileNameFilter": "*.csv"
}
],
"Sinks": [
{
"Id": "KinesisStreamSink",
"SinkType": "KinesisStream",
"StreamName": "stream_name",
"Region": "ap-northeast-1"
}
],
"Pipes": [
{
"Id": "LogSourceToKinesisStreamSink",
"SourceRef": "LogSource",
"SinkRef": "KinesisStreamSink"
}
]
}
Kinesis Agentの開始
以下コマンドを入力!
Runningになっていたら動いています。
$ Start-Service -Name AWSKinesisTap
$ get-service -name AWSKinesistap
Status Name DisplayName
------ ---- -----------
Running AWSKinesistap Amazon Kinesis Agent for Microsoft ...
CSVファイルの格納
適当な値を入れたCSVファイルを特定フォルダに格納してみましょう!!
自動でDataStreamsに転送されます。
(値がてきとうだ...)
おわり
情報が少なくて構築には時間がかかったけど、まとめてみたら短くなってしまった。笑笑
Kinesisって資格ベースで知識はあるけど、実際触ってみたらわからないこと多くて困りました。
後は要件によりますが、データ転送頻度とかサーバ側のデータ転送後の削除等、色々決めないといけないことはありますね。
次はGreengrassを用いたデータ収集・加工でお会いしましょう。多分