1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Qiita×Findy記事投稿キャンペーン 「自分のエンジニアとしてのキャリアを振り返ろう!」

Kinesis Agent, APIを用いたオンプレ⇒AWSのデータ転送方法比較

Last updated at Posted at 2024-03-08

はじめに

最近AWSを用いた製造現場のDX化の技術検証に携わっております。
そこで検証した内容をアウトプットのため、他にも同じようなことをしている方のため
簡単にですがまとめたいなーと思いました。

今回はKinesis版です。オンプレのサーバにデータを貯めているのをどうにかしたい場合ってありますよね。

やること

image.png

オンプレのサーバにずっと格納し続けていると容量の問題や、データ分析がしづらいとのことで、一旦S3に格納してみようということです。将来的にOpen Searchを利用したリアルタイム分析やLambda、Glueを利用したETL処理してからデータ格納等もできれば良いかと。

オンプレ⇒Kinesisのデータ転送方法

オンプレのファイルをKinesis経由で転送する方法には以下2パターンあるかと思います

  1. Kinesis APIの利用
  2. Kinesis Agentの利用

これらどちらも手順等を説明しているサイトが少ないので、今回まとめてみた次第です。
DataStreams⇒Firehose⇒S3の構築手順は簡単なので飛ばします。

1. Kinesis API利用

今回は検証なのでCloud 9上にコーディングしていきます。
Kinesisとの通信が発生するので以下2つのポリシーを付与したロールを適用しています。
1.role.PNG

Json転送コード

ネットで転がっている情報を基に以下コードを記載しました。
"stream_name"には作成したDataStreamsの名前を入力してください。

id:1
x:1~100
event_time:今の時間
をJson形式でKinesisに転送します。

import datetime
import json
import time

import boto3
import pandas as pd

def main():
    ex_id = 1
    x=1
    steps = 100
    send=[]

    stream_name = "stream_name"
    kinesis_client = boto3.client('kinesis',region_name='ap-northeast-1')
    
    print("Start")
  
    for i in range(steps):

        partition_key = str(ex_id)
        
        data = {
            "id": ex_id,
            "x": x,
            "event_time": datetime.datetime.now().isoformat()
        }

        kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(data),
            PartitionKey=partition_key)

        send.append(data)
        time.sleep(0.1)
        
        x += 1
        
    send_data = pd.DataFrame(send)
    send_data.to_csv("./send.csv", index=False)

    print("finished.")


if __name__ == '__main__':
    main()

Kinesis Viewer確認

こんな感じでDataStreamsにデータが転送されます。
6. Streams Viewer.PNG

S3確認

S3にはこんな感じで格納されます。
7.S3.PNG

で、開いたらこんな感じ
8.中身.PNG

Kinesis Agent利用

Windows Serverが良いって言われたのでWindows ServerにKinesis Agentを入れて検証していきます。

Kinesis Agentのインストール

以下のサイトを確認して、どれかの方法でKinesis Agentをインストールしてください。
https://docs.aws.amazon.com/ja_jp/kinesis-agent-windows/latest/userguide/getting-started.html

ちなみに自分はPower Shellでやりました。

appsettingファイルの設定

Kinesis Agentを利用すると、PCのログやらメトリクスやら色々取得できるらしいですね。
ただ、今回はある特定のフォルダにCSVファイルが格納されたら転送するって形です。

appsettingファイルは「Program Files/Amazon/AWSKinesisTap」配下にあります。
4.appsetting.PNG

で、今回のコードはこちら
{特定フォルダ}に.csvファイルが格納されたらDataStreamsに転送って感じですね。
stream_nameにはDataStreamsの名前を入れましょう。簡単だ。

{
  "Sources": [
    {
      "Id": "LogSource",
      "SourceType": "DirectorySource",
      "RecordParser": "SingleLine",
      "Directory": "{特定フォルダ}",
      "FileNameFilter": "*.csv"
    }
  ],
  "Sinks": [
    {
      "Id": "KinesisStreamSink",
      "SinkType": "KinesisStream",
      "StreamName": "stream_name",
      "Region": "ap-northeast-1"
    }
  ],
  "Pipes": [
    {
      "Id": "LogSourceToKinesisStreamSink",
      "SourceRef": "LogSource",
      "SinkRef": "KinesisStreamSink"
    }
  ]
}        

Kinesis Agentの開始

以下コマンドを入力!
Runningになっていたら動いています。

$ Start-Service -Name AWSKinesisTap
$ get-service -name AWSKinesistap

Status   Name               DisplayName
------   ----               -----------
Running  AWSKinesistap      Amazon Kinesis Agent for Microsoft ...

CSVファイルの格納

適当な値を入れたCSVファイルを特定フォルダに格納してみましょう!!
自動でDataStreamsに転送されます。
8.Streams Viewer.PNG
(値がてきとうだ...)

おわり

情報が少なくて構築には時間がかかったけど、まとめてみたら短くなってしまった。笑笑
Kinesisって資格ベースで知識はあるけど、実際触ってみたらわからないこと多くて困りました。

後は要件によりますが、データ転送頻度とかサーバ側のデータ転送後の削除等、色々決めないといけないことはありますね。

次はGreengrassを用いたデータ収集・加工でお会いしましょう。多分

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?