0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Apache NiFiでHttp経由でCSVファイルをアップロードしDBへ投入する

Posted at

はじめに

Apache NiFiの勉強のため、ユースケースを考えながらいろいろ弄っている。

サンプルユースケース

  • Http経由でCSVファイルをアップロードする
  • フォーマット変換がOKだったら、200ステイタスのレスポンス、そうでなければ400のレスポンス
  • CSVファイルからデータベースに投入(Httpレスポンスとは同期させない)

下準備

MySQL

DockerHubのMySQL8を利用。

version: '3'
services:
  mysql:
    image: "mysql:latest"
    container_name: "mysql"
    enviroment:
      MYSQL_ROOT_PASSWORD: password
      MYSQL_DATABSE: mydb
      MYSQL_USER: user
      MYSQL_PASSWORD: password
    commnad: ["--character-set-server=utf8mb4","--default-authentication-plugin=mysql_native_password"]

MySQLを起動

# MySQLサーバー起動
$ sudo docker-compose up -d mysql
# MySQLクライアント起動
$ sudo docker exec -it mysql mysql -u user -p mydb

あらかじめテーブルを作成しておく。

# テーブル作成
$ mysql> create table mytable (
  mysql>   _id INT AUTO_INCREMENT PRIMARY KEY,
  mysql>   name TEXT,
  mysql>   age  INT);

Apache NiFi

NiFiは以前の記事参照。貧弱なマシンでメモリがやばいので、クラスタ構成ではなく1台構成で。
MySQLのJDBCドライバをあらかじめダウンロードしておき、コンテナへコピーする。(本来はちゃんとDockerfileで最初からJDBCドライバをインストールすべきです)

# NiFiとZookeeperを起動
sudo docker-compose up -d nifi1 zookeeper

# JDBCドライバをコンテナ内へコピーしておく。(本来はDockerイメージに組み入れるべきだが、手抜き)
sudo docker cp ./mysql-connector-java-8.0.22.jar nifi1:/home/nifi/mysql-connector-java-8.0.22.jar

Apache Nifi のフロー

flow.png

プロセッサ

配置するプロセッサと主なプロパティは以下

HandleHttpRequest

  • CSVファイルをHTTPリクエストとして受け付ける。
  • ポート番号を指定する(ポート番号は適当な番号)。
プロパティ 設定値
Listening Port 8090
HTTP Context Map HTTP Context Map

CovertRecord

  • CSVをArvoレコード形式に変換を行う。
  • ReaderにCSVReader、WriterにAvroRecordSetWriterを指定。
プロパティ 設定値
Record Reader CSVReader
Record Writer AvroRecordSetWriter

ReplaceText

  • 変換成功時にHTTPレスポンスボディを設定する(成功時は空文字)
  • Replacement Textプロパティに空文字(Empty String)を設定。
プロパティ 設定値
Replacement Value Empty string set

ReplaceText

  • 変換失敗時にHTTPレスポンスボディを設定する(失敗時は{"error":true}というJSONを返すことにする。)
  • Replacement Textプロパティに{"error":true}と設定。
プロパティ 設定値
Replacement Value {"error":true}

PutDatabaseRecord

  • 変換成功時にMySQLにレコードをINSERTする。
  • Record ReaderプロパティをAvroReaderに設定
  • Statement TypeプロパティをINSERTに設定
  • Table Nameプロパティに挿入するテーブル名を設定
プロパティ 設定値
Record Reader AvroReader
Statement Type INSERT
Database Connection Pooling Service DBCPConnectionPool
Table Name mytable

HandleHttpResponse

  • 成功時にレスポンスを返す。
  • ステータスコードを200に設定。
プロパティ 設定値
HTTP Status Code 200
HTTP Context Map StandardHttpContextMap

HandleHttpResponse

  • 失敗時にレスポンスを返す。
  • ステータスコードを400に設定
プロパティ 設定値
HTTP Status Code 400
HTTP Context Map StandardHttpContextMap

コントローラサービス

配置するコントローラサービスと主なプロパティは以下

StandardHttpContextMap

  • HTTPリクエスト・レスポンスを行う。HandleHttpRequestHandleHttpResponseプロセッサにアタッチする。

CSVReader

  • CSV形式を読み込む。ConvertRecordプロセッサにアタッチする。
プロパティ 設定値
Schema Access Strategy Use 'Schema Name' Property
Schema Registry AvroSchemaRegistry
Schema Name person
Treat First Line as Header true

AvroRecordSetWriter

  • Avro形式を書き込む。ConvertRecordプロセッサにアタッチする
プロパティ 設定値
Schema Write Strategy Embed Avro Schema

AvroReader

  • Avro形式を読み込む。PutDatabaseRecordプロセッサにアタッチする。
プロパティ 設定値
Schema Access Strategy Use 'Schema Name' Property
Schema Registry AvroSchemaRegistry
Schema Name person

AvroSchemaRegistory

  • レコードのデータスキーマを定義しておく。AvroReaderコントローラ CSVReaderコントローラにアタッチする。
  • レコードのスキーマのために新しいプロパティ「person」を追加し、Avroスキーマ形式でスキーマを記述しておく。
プロパティ 設定値
person {
  "type":"record",
  "name":"person",
  "namespace":"nifi",
  "fields":[
    {"name":"name","type":"string"},
    {"name":"age","type":"int"}
  ]
}

DBCPConnectionPool

  • データベース接続のコネクションプール。PutDatabaseRecordプロセッサにアタッチする。
  • コネクションURL、JDBCドライバ名、ドライバのJAR、DBユーザとパスをそれぞれ設定。
プロパティ 設定値
Database Connection URL jdbc:mysql://mysql:3306/mydb
Database Driver Class Name com.mysql.jdbc.Driver
Database Driver Location(s) file:///home/nifi/mysql-connector-java-8.0.22.jar
Database User user
Password password

実行

NiFiのフローを実行状態にして、curlで投げてみる。

# こんなCSVファイルがあったとして
$ cat data.csv
name,age
Yamada,31
Tanaka,21
Yoshida,19

# NiFiのリクエストハンドラにファイルアップロード
$ curl -X POST -F "file=@data.csv" localhost:8090

DBに登録されていれば成功。

$ sudo docker exec -it mysql mysql -u user -p mydb

$ mysql> select * from mytable;

| _id | name  | age |
|-----|-------|-----|
| 0   |Yamada | 31  |
| 1   |Tanaka | 21  |
| 2   |Yoshida| 19  |
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?