はじめに
Apache NiFiの勉強のため、ユースケースを考えながらいろいろ弄っている。
サンプルユースケース
- Http経由でCSVファイルをアップロードする
- フォーマット変換がOKだったら、200ステイタスのレスポンス、そうでなければ400のレスポンス
- CSVファイルからデータベースに投入(Httpレスポンスとは同期させない)
下準備
MySQL
DockerHubのMySQL8を利用。
version: '3'
services:
mysql:
image: "mysql:latest"
container_name: "mysql"
enviroment:
MYSQL_ROOT_PASSWORD: password
MYSQL_DATABSE: mydb
MYSQL_USER: user
MYSQL_PASSWORD: password
commnad: ["--character-set-server=utf8mb4","--default-authentication-plugin=mysql_native_password"]
MySQLを起動
# MySQLサーバー起動
$ sudo docker-compose up -d mysql
# MySQLクライアント起動
$ sudo docker exec -it mysql mysql -u user -p mydb
あらかじめテーブルを作成しておく。
# テーブル作成
$ mysql> create table mytable (
mysql> _id INT AUTO_INCREMENT PRIMARY KEY,
mysql> name TEXT,
mysql> age INT);
Apache NiFi
NiFiは以前の記事参照。貧弱なマシンでメモリがやばいので、クラスタ構成ではなく1台構成で。
MySQLのJDBCドライバをあらかじめダウンロードしておき、コンテナへコピーする。(本来はちゃんとDockerfileで最初からJDBCドライバをインストールすべきです)
# NiFiとZookeeperを起動
sudo docker-compose up -d nifi1 zookeeper
# JDBCドライバをコンテナ内へコピーしておく。(本来はDockerイメージに組み入れるべきだが、手抜き)
sudo docker cp ./mysql-connector-java-8.0.22.jar nifi1:/home/nifi/mysql-connector-java-8.0.22.jar
Apache Nifi のフロー
プロセッサ
配置するプロセッサと主なプロパティは以下
HandleHttpRequest
- CSVファイルをHTTPリクエストとして受け付ける。
- ポート番号を指定する(ポート番号は適当な番号)。
プロパティ | 設定値 |
---|---|
Listening Port | 8090 |
HTTP Context Map | HTTP Context Map |
CovertRecord
- CSVをArvoレコード形式に変換を行う。
- Readerに
CSVReader
、WriterにAvroRecordSetWriter
を指定。
プロパティ | 設定値 |
---|---|
Record Reader | CSVReader |
Record Writer | AvroRecordSetWriter |
ReplaceText
- 変換成功時にHTTPレスポンスボディを設定する(成功時は空文字)
-
Replacement Text
プロパティに空文字(Empty String)を設定。
プロパティ | 設定値 |
---|---|
Replacement Value | Empty string set |
ReplaceText
- 変換失敗時にHTTPレスポンスボディを設定する(失敗時は
{"error":true}
というJSONを返すことにする。) -
Replacement Text
プロパティに{"error":true}
と設定。
プロパティ | 設定値 |
---|---|
Replacement Value | {"error":true} |
PutDatabaseRecord
- 変換成功時にMySQLにレコードをINSERTする。
- Record ReaderプロパティをAvroReaderに設定
- Statement TypeプロパティをINSERTに設定
- Table Nameプロパティに挿入するテーブル名を設定
プロパティ | 設定値 |
---|---|
Record Reader | AvroReader |
Statement Type | INSERT |
Database Connection Pooling Service | DBCPConnectionPool |
Table Name | mytable |
HandleHttpResponse
- 成功時にレスポンスを返す。
- ステータスコードを200に設定。
プロパティ | 設定値 |
---|---|
HTTP Status Code | 200 |
HTTP Context Map | StandardHttpContextMap |
HandleHttpResponse
- 失敗時にレスポンスを返す。
- ステータスコードを400に設定
プロパティ | 設定値 |
---|---|
HTTP Status Code | 400 |
HTTP Context Map | StandardHttpContextMap |
コントローラサービス
配置するコントローラサービスと主なプロパティは以下
StandardHttpContextMap
- HTTPリクエスト・レスポンスを行う。
HandleHttpRequest
とHandleHttpResponse
プロセッサにアタッチする。
CSVReader
- CSV形式を読み込む。
ConvertRecord
プロセッサにアタッチする。
プロパティ | 設定値 |
---|---|
Schema Access Strategy | Use 'Schema Name' Property |
Schema Registry | AvroSchemaRegistry |
Schema Name | person |
Treat First Line as Header | true |
AvroRecordSetWriter
- Avro形式を書き込む。
ConvertRecord
プロセッサにアタッチする
プロパティ | 設定値 |
---|---|
Schema Write Strategy | Embed Avro Schema |
AvroReader
- Avro形式を読み込む。
PutDatabaseRecord
プロセッサにアタッチする。
プロパティ | 設定値 |
---|---|
Schema Access Strategy | Use 'Schema Name' Property |
Schema Registry | AvroSchemaRegistry |
Schema Name | person |
AvroSchemaRegistory
- レコードのデータスキーマを定義しておく。
AvroReader
コントローラCSVReader
コントローラにアタッチする。 - レコードのスキーマのために新しいプロパティ「person」を追加し、Avroスキーマ形式でスキーマを記述しておく。
プロパティ | 設定値 |
---|---|
person | { "type":"record", "name":"person", "namespace":"nifi", "fields":[ {"name":"name","type":"string"}, {"name":"age","type":"int"} ] } |
DBCPConnectionPool
- データベース接続のコネクションプール。
PutDatabaseRecord
プロセッサにアタッチする。 - コネクションURL、JDBCドライバ名、ドライバのJAR、DBユーザとパスをそれぞれ設定。
プロパティ | 設定値 |
---|---|
Database Connection URL | jdbc:mysql://mysql:3306/mydb |
Database Driver Class Name | com.mysql.jdbc.Driver |
Database Driver Location(s) | file:///home/nifi/mysql-connector-java-8.0.22.jar |
Database User | user |
Password | password |
実行
NiFiのフローを実行状態にして、curlで投げてみる。
# こんなCSVファイルがあったとして
$ cat data.csv
name,age
Yamada,31
Tanaka,21
Yoshida,19
# NiFiのリクエストハンドラにファイルアップロード
$ curl -X POST -F "file=@data.csv" localhost:8090
DBに登録されていれば成功。
$ sudo docker exec -it mysql mysql -u user -p mydb
$ mysql> select * from mytable;
| _id | name | age |
|-----|-------|-----|
| 0 |Yamada | 31 |
| 1 |Tanaka | 21 |
| 2 |Yoshida| 19 |