Qiita Teams that are logged in
You are not logged in to any team

Log in to Qiita Team
Community
OrganizationAdvent CalendarQiitadon (β)
Service
Qiita JobsQiita ZineQiita Blog
Help us understand the problem. What is going on with this article?

MiNiFiでセンサーデータを取得し、NiFiに転送してHDFS、Hiveに書き込

More than 1 year has passed since last update.

「センサーをラズパイに接続し、MiNiFiでセンサーデータをTailする。そしてNiFiに転送して、NiFiでHDFS, Hiveに書き込む」の手順をご紹介します。

実現したいこと

  • MiNiFiでセンサーデータを取得し、NiFiに転送する
  • NiFiでセンサーデータをRawデータとしてKafka経由でHDFSに保存する
  • NiFiでセンサーデータをHiveテーブルに保存する
  • NiFiでデータを加工し、温度が閾値を超えたら、Slackアラート通知する

環境情報

  • センサーをラズパイに接続
  • ラズパイでセンサーデータを取るPythonスクリプト
  • HDP3.1(Hadoop 3.1.1, Hive 3.1.0) & HDF 3.3.1(NiFi 1.8.0, Kafka 2.0.0)、クラスタは同一Ambariで管理

構成

構成は以下の様な感じになります。パース処理のところは一つしか書いてないですが、複数のパース処理の集合だと思ってください。
enter image description here

全体データフロー

  • RemoteMiNiFiというInput portとProcessDataというPrecess Groupで構成されています。 enter image description here
  • ProcessDataグループ内の詳細フロー enter image description here

やってみよう

1、MiNiFiのセットアップ

ラズパイ自体のOSインストールや、センサーとの接続ができた状態(ここでは割愛します)でMiNiFiのインストールをやります。
MiNiFiのTarファイルはHortonworksサイトにあります。
https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.3.1/release-notes/content/hdf_repository_locations.html

sudo su -
cd /home/pi

#HortonworksのサイトからMiNiFiのtarファイルをダウンロードする
curl -O http://public-repo-1.hortonworks.com/HDF/3.3.1.0/minifi-0.6.0.3.3.1.0-10-bin.tar.gz
curl -O http://public-repo-1.hortonworks.com/HDF/3.3.1.0/minifi-toolkit-0.6.0.3.3.1.0-10-bin.tar.gz

#ファイル解答し、シンボリックリンクを作成する
tar -zxvf minifi-0.6.0.3.3.1.0-10-bin.tar.gz
tar -zxvf minifi-toolkit-0.6.0.3.3.1.0-10-bin.tar.gz
ln -s minifi-0.6.0.3.3.1.0-10 minifi
ln -s minifi-toolkit-0.6.0.3.3.1.0-10 tool_minifi

#MiNiFiをインストールする
./minifi/bin/minifi.sh install
#必要に応じてリモートNiFiホストを/etc/hostsに追加する

2、センサーデータをPythonスクリプトで取得

Pythonスクリプトは/home/pi/bme280-dataフォルダに2019-06-04.csvのようなファイルが生成されます。

中身は:「センサーID, 日付, 時間, 気圧, 温度, 湿度」のカンマ区切りのデータになります。
1,2019-06-04,23:59:51,1011.54,24.86,50.67

#coding: utf-8
import bme280_custom
import datetime
import os

dir_path = '/home/pi/bme280-data’
now = datetime.datetime.now()
filename = now.strftime('%Y-%m-%d’)
label = now.strftime('%H:%M:%S’)
csv = bme280_custom.readData()
if not os.path.exists('/home/pi/bme280-data’):
  os.makedirs('/home/pi/bme280-data’)
f = open('/home/pi/bme280-data/'+filename+'.csv','a’)
f.write('1,'+filename +","+label+","+csv+"\n")
f.close()

このスクリプトをCronとかで定期的に実行するように設定する(例えば10秒ごとに実行)

3、NiFiでデータフローを作成し、MiNiFiに配布

MiNiFiで直接データフローを作成するのが難しいので、NiFiで作成して、テンプレート(.xml)としてエクスポート、MiNiFi toolkitで.xmlを.ymlに変換するのが一般的です。
今回はTailFileプロセッサとRemoteProcessGroupを使います。

  • NiFiのTop画面でInput portを追加する。名前をRemoteMiNiFiに設定する enter image description here
  • TailFileプロセッサでフォルダの.csvファイルを取り込みます。以下のプロパティを設定します。 Tailing mode:  Multiple files File(s) to Tail:  .*.csv Base Directory: /home/pi/bme280-data enter image description here
  • Remote Process Groupを以下のように設定して、RemoteMiNiFiをInput Portとして選択する URLsにNiFiのURL(http://hdp-srv4.demotest.com:9090/nifi)を指定する。複数ある場合は、カンマ区切りで入力する。 Transport ProtocolでHTTPを選択する。 enter image description here
  • 作ったデータフローをテンプレートにエクスポートする TailFileプロセッサとRemoteProcessGroupを繋いで、全部選択して、右クリック、「Create template」でテンプレートを作成する。 作成したら、ダウンロードする enter image description here
  • MiNiFiでテンプレートファイル(.xml)から.ymlファイルに変換する ラズパイにログインして、以下のコマンドでファイルを変換して、MiNiFiを起動する
sudo su -
cd /home/pi

#MiNiFi toolkitでxmlからymlに変換する
./tool_minifi/bin/config.sh transform /home/pi/sensor_minifi4.xml ./sensor_minifi4.yml

#既存のconfig.ymlファイルをバックアップし、新しいymlで上書きする
cp -p minifi/conf/config.yml minifi/conf/config.yml.bk
cp -p sensor_minifi4.yml minifi/conf/config.yml

#MiNiFiプロセスを起動する。他にstop, restartなどオプションがある
./minifi/bin/minifi.sh start

ここまできたら、MiNiFiからセンサーデータをNiFiの方に転送できるようになります。

4、NiFiでセンサーデータをRawデータとしてKafka経由でHDFSに保存する

ここからは、NiFiでセンサーデータをKafka経由でHDFSに保存するデータフローを作成していきます。
全体のデータフローはこんな感じです。
enter image description here

  • NiFi画面でUser1というProcess Groupをドラッグ&ドロップする。すでに作成済みのRemoteMiNiFi input portと繋ぐ
    enter image description here
    User1グループをダブルクリックで入って後続のフロー作成に入ります。

  • FromMinifiというInput portとPublishKafka_2_0プロセッサを作成して、繋ぐ
    enter image description here
    PublishKafka_2_0のPROPERTIESタブで必要なプロパティを設定する。
    Kafka Brokers: hdp-srv1.demotest.com:6667,hdp-srv2.demotest.com:6667,hdp-srv3.demotest.com:6667 を入力
    Topic Name: sensor_data_user1
    Delivery Guarantee: Guarantee Replicated Delivery を選択
    enter image description here
    PublishKafka_2_0のSETTINGSタブでfailureとsuccess両方をチェックする。
    enter image description here

  • ConsumeKafka_2_0プロセッサを追加する
    Kafka Brokers: hdp-srv1.demotest.com:6667,hdp-srv2.demotest.com:6667,hdp-srv3.demotest.com:6667 を入力
    Topic Name: sensor_data_user1
    Group ID: group1_user1
    enter image description here

  • MergeContentプロセッサを追加する
    Minimum Number of Entries: 10 に変更
    enter image description here

  • PutHDFSプロセッサを追加する
    Hadoop Configuration Resources: /etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml
    Directory: /tmp/sensor_data/user1
    enter image description here
    SETTINGSタブでsuccessをチェックする
    enter image description here

  • 最後にLogAttributeプロセッサを追加する
    PROPERTIESタブは既定のままで、SETTINGSタブでsuccessをチェックする
    enter image description here
    MergeContentのoriginal, failure, PutHDFSのfailureと繋ぐ

  • HDFS上のファイルの中身を見てみる
    enter image description here

5、NiFiでセンサーデータをHiveテーブルに保存する

ここでHive Streamingを使ってセンサーデータをリアルタイムにHiveテーブルに追加します。
これを実現するには、Hiveテーブルがいくつか要件を満たす必要があります。
詳細はこちらをご参照くださいStreamingDataIngest-StreamingRequirements

1,ACIDサポートのため、hive-site.xmlに以下3つのパラメータを設定(HDP3.1ではすでに設定ずみ)
hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager 
hive.compactor.initiator.on = true 
hive.compactor.worker.threads > 0

2,テーブル作成時に以下を含むことが必要
STORED AS ORC
tblproperties("transactional"="true")
CLUSTERED BY (cloumn-name) INTO <num> BUCKETS

下記の通りでテーブル(sensor_data_user1)を作成します。

CREATE TABLE sensor_data_user1(id int,time_str string, pressure double, temperature double, humidity double)
PARTITIONED BY(date_str string) 
CLUSTERED BY (id) INTO 5 BUCKETS 
STORED AS ORC 
tblproperties("transactional" = "true");

この部分のデータフローは図の通りになります。
enter image description here

これからプロセッサを説明していきます。

  • UpdateAttributeプロセッサを追加し、FromMinifi input portと接続する enter image description here
  • UpdateAttributeのPROPERTIESタブでschema.name=sensor_data_schema1を追加する enter image description here
  • ConvertRecordプロセッサを追加する。CSVをAvroに変換 Record Reader: CSVReaderを選択 Record Writer: AvroRecordSetWriterを選択 enter image description here CSVReaderの→をクリックする。 Controller ServicesタブでAvroRecordSetWriterとCSVReaderが追加される。 enter image description here ⚙をクリックする。 Schema Access Strategy: Use 'Schema Name' Propertyを選択 Schema RegistryでCreate new service...を選択する enter image description here AvroSchemaRegistry...を選択してCreateをクリックする enter image description here Schema Registryで AvroSchemaRegistryが表示される。→をクリックする。Save changes before going to Controller Service?が表示され、Yesクリックする enter image description here Controller ServicesタブでAvoSchemaRegistryが追加される ⚙をクリックする enter image description here Propertiesタブでプロパティ sensor_data_schema1を追加する image.png

Avroスキーマは下記の通り

    {
         "type": "record",
         "namespace": "sensor_data_schema1",
         "name": "sensor_data_schema1",
         "fields": [
               { "name": "id", "type": "int" },
           { "name": "date_str", "type": "string" },
           { "name": "time_str", "type": "string" },
           { "name": "pressure", "type": "double" },
           { "name": "temperature", "type": "double" },
           { "name": "humidity", "type": "double" }
         ]
    } 

終わったら、CSVReaderがInvalid、AvroSchemaRegistryがDisabledの状態。
赤枠のアイコンをクリックしてAvroSchemaRegistryを有効にする(Enable→Closeをクリックする)。
CSVReaderも有効にする
enter image description here
最後にAvroRecordSetWriterも既定のままで有効にする
enter image description here
ConvertRecordのfailureをLogAttributeに繋ぐ

  • PutHive3Streamingプロセッサを追加する enter image description here PutHive3StreamingのPropertiesタブで以下の値を設定する Record Reader: AvroReaderを選択 Hive Metastore URL: thrift://hdp-srv3.demotest.com:9083を入力 Hive Configuration Resources: /etc/hive/conf/hive-site.xml,/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xmlを入力 Database Name: defaultを入力 Table Name: sensor_data_user1を入力 enter image description here Record Readerの→をクリックし、AvroReaderを有効にする(Enable→Close) enter image description here PutHive3StreamingのSettingsタブでsuccessをチェックする enter image description here
  • PutHive3StreamingのfaiureとretryをLogAttributeに接続する
  • Zeppelinでクエリを実行して確認 時間軸で温度の変化を表しています。 enter image description here

6、NiFiでデータを加工し、温度が閾値を超えたら、Slackアラート通知する

CSV形式のデータ「1,2019-06-04,23:59:51,1011.54,24.86,50.67」から温度を取得して、閾値(ここでは30度)を超えたらSlackにアラート通知します。
全体のデータフロー
enter image description here

  • ExtractTextプロセッサを追加する Propertiesタブでfieldプロパティを追加、値を(.*),(.*),(.*),(.*),(.*),(.*)に設定 image.png

Settingsタブでunmatchedをチェックする
enter image description here

  • RouteOnAttributeプロセッサを追加する Propertiesタブでhighプロパティを追加、値を${field.5:gt(40)} に設定する image.png

Settingsタブでunmatchedをチェックする
enter image description here
* PutSlackプロセッサを追加する
Propertiesタブで以下のプロパティを設定する
Webhook URL: https://hooks.slack.com/services/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
Webhook Text: Temperature is ${field.5}! Too hort!!
image.png

Settingsタブでsuccessをチェックする
enter image description here

  • PutSlackのfailureをLogAttributeに接続する
  • Slackメッセージ確認 enter image description here

最後のメモ: 事前準備

# Kafka topic作成
cd /usr/hdp/current/kafka-broker
./kafka-topics.sh --create --zookeeper hdp-srv1.demotest.com:218,hdp-srv2.demotest.com:2181,hdp-srv3.demotest.com:2181 --replication-factor 3 --partitions 1 --topic sensor_data_user1

# HDFS folder作成
sudo su - hdfs 
hdfs dfs -mkdir -p /tmp/sensor_data/user1
hdfs dfs -chmod -R 777 /tmp/sensor_data/user1

# sensor_data_user1テーブルにNiFiユーザーにrwx権限追加
hdfs dfs -setfacl -m -R user:nifi:rwx /warehouse/tablespace/managed/hive/sensor_data_user1
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away