hadoop
Hdfs
DataPipeline
streamsets

StreamSetsのDataCollectorを使用してGUIでデータフローを定義する

More than 1 year has passed since last update.

Cloudera Managerを使ってセットアップしたData CollectorのGUIからデータフローを作成します。
t26.png

StreamSetsのドキュメントにあるチュートリアルに従って作ってみます。今回はBasic Tutorialのパイプラインを作ってみましょう。
https://streamsets.com/documentation/datacollector/latest/help/#Tutorial/Tutorial-title.html

テストデータのダウンロード

チュートリアルに従ってディレクトリを作成し、サンプルデータをダウンロードします。

[cloudera@quickstart ~]$ sudo mkdir /streamsets
[cloudera@quickstart ~]$ mkdir -p /streamsets/tutorial/origin
[cloudera@quickstart ~]$ mkdir -p /streamsets/tutorial/destination
[cloudera@quickstart ~]$ mkdir -p /streamsets/tutorial/error
[cloudera@quickstart ~]$ sudo chmod -R 777 /streamsets
[cloudera@quickstart ~]$ cd /streamsets/tutorial/origin/
[cloudera@quickstart origin]$ wget https://www.streamsets.com/documentation/datacollector/sample_data/tutorial/nyc_taxi_data.csv
--2016-07-20 01:56:40--  https://www.streamsets.com/documentation/datacollector/sample_data/tutorial/nyc_taxi_data.csv
Resolving www.streamsets.com... 162.242.235.205
Connecting to www.streamsets.com|162.242.235.205|:443... connected.
HTTP request sent, awaiting response... 301 Moved Permanently
Location: https://streamsets.com/documentation/datacollector/sample_data/tutorial/nyc_taxi_data.csv [following]
--2016-07-20 01:56:41--  https://streamsets.com/documentation/datacollector/sample_data/tutorial/nyc_taxi_data.csv
Resolving streamsets.com... 162.242.235.205
Connecting to streamsets.com|162.242.235.205|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1054132 (1.0M) [text/csv]
Saving to: “nyc_taxi_data.csv”

100%[======================================>] 1,054,132    828K/s   in 1.2s    

2016-07-20 01:56:43 (828 KB/s) - “nyc_taxi_data.csv” saved [1054132/1054132]

[cloudera@quickstart origin]$ 

基本チュートリアル

チュートリアルに従って進めていきます。

パイプラインの作成とパイプラインのプロパティの定義

まずはGUIからパイプラインを作成してみます。Data Collector の Web UIから「Create New Pipeline」をクリックします。

ss_top2.png

パイプラインの名前は Tutorial1 としてみました。

t1.png

「Save」を押して保存すると、空のキャンバスが表示されます。

t2.png

キャンバスの説明はドキュメントに書かれています。
https://streamsets.com/documentation/datacollector/latest/help/#Pipeline_Configuration/DataCollectorWindow-Config.html

Directory Origin を設定する

パイプラインの「入り口」に相当するのがOriginです。今回はダウンロードしたサンプルのCSVファイルがあるディレクトリをOriginにします。ドロップダウンメニューの「Select Origin」 -> 「Directory」を選択するか、右側のStage LibraryからDirectory Originを選びます。

t3.png

t4.png

キャンバスに Directory Originが配置されました。アイコンをクリックし、下の画面からFilesタブをクリックし、次のパラメータを指定します。

  • Data Format: Delimited
  • Files Directory: /streamsets/tutorial/origin
  • Read Order: Last Modified Timestamp
  • File Name Pattern: taxi.csv
  • First File to Process: 空のまま

Data FormatにDelimitedを指定するとDelimitedタブで詳細設定ができます。今回は以下のパラメータを指定します。

  • File Type: Default CSV
  • Header Line: With Header Line (CSVの先頭行にヘッダ行があるので)
  • Root Field Type: List-Map

データのプレビュー

Originで登録したデータをプレビューしてみましょう。データが正しいかどうかをGUIから「見る」ことができるのは便利ですね。

チュートリアルによれば、この状態でPreviewアイコンをクリックできるようですが、手元の環境ではうまくいきませんでした。左側の❗️をクリックしてエラーを解決します。今回はCREATION_009というエラーが表示されています。

t7.png

エラーデータの扱いを指定しなければならないようなので、まずは無視することにしました。

  • Error Record: Discard (Libary Basic)

今度はPreviewアイコンがクリックできるようになっているので、クリックしてプレビューしましょう。

t9.png

t10.png

出力結果は以下のようになりました。CSVファイルの各列をリスト形式で表示しています。

t11.png

テーブル形式でも表示できます!これ、CSVファイルをエディタなどで見る必要ないですし、ヘッダ行も表示してくれていますし、便利ですねぇ....

t12.png

今回のチュートリアルでは、支払いタイプ(/payment_type)とクレジットカード情報(/credit_card)列を利用するようです。終わったら Close Preview アイコンをクリックします。

t13.png

Stream Selector でデータの経路を作る

「Select Processor to Connect」->「Stream Selector」を選択します。

t14.png

Stream Selector の General タブで、Required Fields テキストボックスを指定します。

t15.png

今回は支払いの種類がないレコードをdiscardしたいので、以下の設定を行います。

  • Required Fields: /payment_type

Conditionタブで条件を指定します。

条件により経路が変わります。今回は条件を満たしたデータ(支払いの種類がクレジットカード)が出力ストリーム(1)に流れていくことになります。満たさなかったデータは(2)に流れます。条件の指定の構文を覚える必要はありますが、自動補完が良くできているので、割とサクサク入力できる印象です。

  • ${record:value('/payment_type')=='CRD'}

t16.png

2系統の出力があることがわかりますね。

Jythonを使用してクレジットカードの種類を取得する

ストリーム中のクレジットカード情報のデータを扱うため、Jythonのスクリプトを使用します。「Select processor to connect」からJython Evaluator processorを追加します。

t17.png

続いでJythonタブのスクリプトテキストボックスに、チュートリアルのJythonスクリプトを貼り付けます。

t18.png

クレジットカード番号をマスクする

セキュリティ上、クレジットカード番号をそのまま利用するのは避けたいものです。Data Collectorにはマスクの機能もあるのでデータフローでカード番号をマスクします。先ほどと同様の手順で、キャンバスにField Maskerを追加します。以下のプロパティを指定し、正規表現を使ってマスクします。

  • Mask Type: Regular Expression
  • Regular Expression: (.*)([0-9]{4})
  • Groups to Show: 2

t19.png

Destination に書き込む

パイプラインの結果の出力先を指定します。チュートリアルではローカルファイルシステムになっていますが、せっかくのCloudera Manager管理しているHDFSがあるわけですから今回はHDFSを指定しようかと思ったのですが、Kerberos化していたの後日試すことにしました。「Select Destination to Connect」から「Hadoop FS - CDH5.7.1」を選択します。

t20.png

Hadoop FS Propertyは、暫定的に書き込み先を file:///にします。

  • Hadoop FS URI: file:///
  • HDFS user: cloudera

t21.png

Output Filesタブで以下のプロパティを指定します。

  • Data Format: Delimited
  • File Prefix: _out
  • Directory Template: /streamsets/tutorial/destination
  • Data Timezone: JST

Delimitedタブで以下のプロパティを設定します。

  • Header Line: With Header Line

Expression Evaluatorで対応するフィールドを追加する

先ほどのJython Evaluatorではクレジットカード情報のデータを処理し、クレジットカードの種類を追加しています。ここでデータをプレビューしてみましょう。先ほどと同じように「Preview」をクリックし、Jython Evaluatorをクリックします。
入力データと出力データが表示されます。期待通り(?)、出力にはcredit_card_typeのデータが含まれていますね!

t22.png

クレジットカード支払いでなかった場合も考慮する必要があるので、Stream Selectorのdefaultの出力に「Expression Evaluator」を追加します。

t23.png

Expressionタブのプロパティを以下のように指定します。

  • Output Field: n/a
  • Header attributeは削除

終わったら、Expression EvaluatorとHadoop FSをつなぎます。

t24.png

データルールとアラートの作成

さて、いよいよ最後です。Stream SelectorからJython Evaluatorの間の線をクリックしてルールを追加します。今回はクレジットカードデータがない(エラー)の場合のルールを追加をしています。

Data Ruleのプロパティ
- Label: Missing Card Numbers
- Condition: ${recod:value("/credit_card")==""}
- Sampling Percentage: 35
- Alert text: At least 10 missing credit card numbers!
- Threshold Value: 10

パイプラインを実行する

いよいよ実行です。▶︎のアイコンをクリックして実行します。

t26.png

クレジットカードのエラーは40件のようです。

t27.png

出力されたファイルも見てみましょう。クレジットカード番号はマスクされ、最後の列にカードの種類が出力されています。

[cloudera@quickstart origin]$ head /streamsets/tutorial/destination/_tmp__out
F6F7D02179BE915B23EF2DB57836442D,088879B44B80CC9ED43724776C539370,VTS,CRD,12,0.5,0.5,1.75,0,14.75,1,2013-01-13 04:36asdf,2013-01-13 04:46asdf,5,600,3.12,-73.996933,40.720055,-73.993546,40.693043,xxxxxxxxxxxx2922,Visa
BE386D8524FCD16B3727DCF0A32D9B25,4EB96EC9F3A42794DEE233EC8A2616CE,VTS,CRD,12,0.5,0.5,3.12,0,16.12,1,2013-01-13 04:37:00,2013-01-13 04:48:00,2,660,3.39,-74.000313,40.730068,-73.987373,40.768406,xxxxxxxxxxxx0902,MasterCard
E9FF471F36A91031FE5B6D6228674089,72E0B04464AD6513F6A613AABB04E701,VTS,CRD,5.5,0.5,0.5,1.2,0,7.7,1,2013-01-13 04:41:00,2013-01-13 04:45:00,1,240,1.16,-73.997292,40.720982,-74.000443,40.732376,xxxxxxxxxxxx9608,Visa
A5D125F5550BE7822FC6EE156E37733A,08DB3F9FCF01530D6F7E70EB88C3AE5B,VTS,CRD,11,0.5,0.5,2,0,14,1,2013-01-13 04:37:00,2013-01-13 04:47:00,5,600,2.91,-73.966843,40.756741,-73.987885,40.722713,xxxxxxxxxxxx7252,Visa
EE1513D432B07F7E0B5E2ED1EF629086,F31D261881520931062C011366E56A04,VTS,CRD,18,0.5,0.5,3.7,0,22.7,1,2013-01-13 04:30:00,2013-01-13 04:44:00,6,840,5.21,-74.005455,40.740772,-73.967354,40.798096,xxxxxxxxxxx7913,AMEX

まとめ

正直に言ってかなり使い易いと思います。HDFS、HBase、Kafka等、様々なエコシステムへのパイプラインを作成できるので、一度試してみてはいかがでしょうか?

おまけ

Cloudera ManagerでData Collectorのモニタリング
t28.png