Qiita Teams that are logged in
You are not logged in to any team

Log in to Qiita Team
Community
OrganizationEventAdvent CalendarQiitadon (β)
Service
Qiita JobsQiita ZineQiita Blog
5
Help us understand the problem. What are the problem?

More than 5 years have passed since last update.

StreamSetsのDataCollectorを使用してGUIでデータフローを定義する

Cloudera Managerを使ってセットアップしたData CollectorのGUIからデータフローを作成します。
t26.png

StreamSetsのドキュメントにあるチュートリアルに従って作ってみます。今回はBasic Tutorialのパイプラインを作ってみましょう。
https://streamsets.com/documentation/datacollector/latest/help/#Tutorial/Tutorial-title.html

テストデータのダウンロード

チュートリアルに従ってディレクトリを作成し、サンプルデータをダウンロードします。

[cloudera@quickstart ~]$ sudo mkdir /streamsets
[cloudera@quickstart ~]$ mkdir -p /streamsets/tutorial/origin
[cloudera@quickstart ~]$ mkdir -p /streamsets/tutorial/destination
[cloudera@quickstart ~]$ mkdir -p /streamsets/tutorial/error
[cloudera@quickstart ~]$ sudo chmod -R 777 /streamsets
[cloudera@quickstart ~]$ cd /streamsets/tutorial/origin/
[cloudera@quickstart origin]$ wget https://www.streamsets.com/documentation/datacollector/sample_data/tutorial/nyc_taxi_data.csv
--2016-07-20 01:56:40--  https://www.streamsets.com/documentation/datacollector/sample_data/tutorial/nyc_taxi_data.csv
Resolving www.streamsets.com... 162.242.235.205
Connecting to www.streamsets.com|162.242.235.205|:443... connected.
HTTP request sent, awaiting response... 301 Moved Permanently
Location: https://streamsets.com/documentation/datacollector/sample_data/tutorial/nyc_taxi_data.csv [following]
--2016-07-20 01:56:41--  https://streamsets.com/documentation/datacollector/sample_data/tutorial/nyc_taxi_data.csv
Resolving streamsets.com... 162.242.235.205
Connecting to streamsets.com|162.242.235.205|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1054132 (1.0M) [text/csv]
Saving to: “nyc_taxi_data.csv”

100%[======================================>] 1,054,132    828K/s   in 1.2s    

2016-07-20 01:56:43 (828 KB/s) - “nyc_taxi_data.csv” saved [1054132/1054132]

[cloudera@quickstart origin]$ 

基本チュートリアル

チュートリアルに従って進めていきます。

パイプラインの作成とパイプラインのプロパティの定義

まずはGUIからパイプラインを作成してみます。Data Collector の Web UIから「Create New Pipeline」をクリックします。

ss_top2.png

パイプラインの名前は Tutorial1 としてみました。

t1.png

「Save」を押して保存すると、空のキャンバスが表示されます。

t2.png

キャンバスの説明はドキュメントに書かれています。
https://streamsets.com/documentation/datacollector/latest/help/#Pipeline_Configuration/DataCollectorWindow-Config.html

Directory Origin を設定する

パイプラインの「入り口」に相当するのがOriginです。今回はダウンロードしたサンプルのCSVファイルがあるディレクトリをOriginにします。ドロップダウンメニューの「Select Origin」 -> 「Directory」を選択するか、右側のStage LibraryからDirectory Originを選びます。

t3.png

t4.png

キャンバスに Directory Originが配置されました。アイコンをクリックし、下の画面からFilesタブをクリックし、次のパラメータを指定します。

  • Data Format: Delimited
  • Files Directory: /streamsets/tutorial/origin
  • Read Order: Last Modified Timestamp
  • File Name Pattern: taxi.csv
  • First File to Process: 空のまま

Data FormatにDelimitedを指定するとDelimitedタブで詳細設定ができます。今回は以下のパラメータを指定します。

  • File Type: Default CSV
  • Header Line: With Header Line (CSVの先頭行にヘッダ行があるので)
  • Root Field Type: List-Map

データのプレビュー

Originで登録したデータをプレビューしてみましょう。データが正しいかどうかをGUIから「見る」ことができるのは便利ですね。

チュートリアルによれば、この状態でPreviewアイコンをクリックできるようですが、手元の環境ではうまくいきませんでした。左側の❗️をクリックしてエラーを解決します。今回はCREATION_009というエラーが表示されています。

t7.png

エラーデータの扱いを指定しなければならないようなので、まずは無視することにしました。

  • Error Record: Discard (Libary Basic)

今度はPreviewアイコンがクリックできるようになっているので、クリックしてプレビューしましょう。

t9.png

t10.png

出力結果は以下のようになりました。CSVファイルの各列をリスト形式で表示しています。

t11.png

テーブル形式でも表示できます!これ、CSVファイルをエディタなどで見る必要ないですし、ヘッダ行も表示してくれていますし、便利ですねぇ....

t12.png

今回のチュートリアルでは、支払いタイプ(/payment_type)とクレジットカード情報(/credit_card)列を利用するようです。終わったら Close Preview アイコンをクリックします。

t13.png

Stream Selector でデータの経路を作る

「Select Processor to Connect」->「Stream Selector」を選択します。

t14.png

Stream Selector の General タブで、Required Fields テキストボックスを指定します。

t15.png

今回は支払いの種類がないレコードをdiscardしたいので、以下の設定を行います。

  • Required Fields: /payment_type

Conditionタブで条件を指定します。

条件により経路が変わります。今回は条件を満たしたデータ(支払いの種類がクレジットカード)が出力ストリーム(1)に流れていくことになります。満たさなかったデータは(2)に流れます。条件の指定の構文を覚える必要はありますが、自動補完が良くできているので、割とサクサク入力できる印象です。

  • ${record:value('/payment_type')=='CRD'}

t16.png

2系統の出力があることがわかりますね。

Jythonを使用してクレジットカードの種類を取得する

ストリーム中のクレジットカード情報のデータを扱うため、Jythonのスクリプトを使用します。「Select processor to connect」からJython Evaluator processorを追加します。

t17.png

続いでJythonタブのスクリプトテキストボックスに、チュートリアルのJythonスクリプトを貼り付けます。

t18.png

クレジットカード番号をマスクする

セキュリティ上、クレジットカード番号をそのまま利用するのは避けたいものです。Data Collectorにはマスクの機能もあるのでデータフローでカード番号をマスクします。先ほどと同様の手順で、キャンバスにField Maskerを追加します。以下のプロパティを指定し、正規表現を使ってマスクします。

  • Mask Type: Regular Expression
  • Regular Expression: (.*)([0-9]{4})
  • Groups to Show: 2

t19.png

Destination に書き込む

パイプラインの結果の出力先を指定します。チュートリアルではローカルファイルシステムになっていますが、せっかくのCloudera Manager管理しているHDFSがあるわけですから今回はHDFSを指定しようかと思ったのですが、Kerberos化していたの後日試すことにしました。「Select Destination to Connect」から「Hadoop FS - CDH5.7.1」を選択します。

t20.png

Hadoop FS Propertyは、暫定的に書き込み先を file:///にします。

  • Hadoop FS URI: file:///
  • HDFS user: cloudera

t21.png

Output Filesタブで以下のプロパティを指定します。

  • Data Format: Delimited
  • File Prefix: _out
  • Directory Template: /streamsets/tutorial/destination
  • Data Timezone: JST

Delimitedタブで以下のプロパティを設定します。

  • Header Line: With Header Line

Expression Evaluatorで対応するフィールドを追加する

先ほどのJython Evaluatorではクレジットカード情報のデータを処理し、クレジットカードの種類を追加しています。ここでデータをプレビューしてみましょう。先ほどと同じように「Preview」をクリックし、Jython Evaluatorをクリックします。
入力データと出力データが表示されます。期待通り(?)、出力にはcredit_card_typeのデータが含まれていますね!

t22.png

クレジットカード支払いでなかった場合も考慮する必要があるので、Stream Selectorのdefaultの出力に「Expression Evaluator」を追加します。

t23.png

Expressionタブのプロパティを以下のように指定します。

  • Output Field: n/a
  • Header attributeは削除

終わったら、Expression EvaluatorとHadoop FSをつなぎます。

t24.png

データルールとアラートの作成

さて、いよいよ最後です。Stream SelectorからJython Evaluatorの間の線をクリックしてルールを追加します。今回はクレジットカードデータがない(エラー)の場合のルールを追加をしています。

Data Ruleのプロパティ
- Label: Missing Card Numbers
- Condition: ${recod:value("/credit_card")==""}
- Sampling Percentage: 35
- Alert text: At least 10 missing credit card numbers!
- Threshold Value: 10

パイプラインを実行する

いよいよ実行です。▶︎のアイコンをクリックして実行します。

t26.png

クレジットカードのエラーは40件のようです。

t27.png

出力されたファイルも見てみましょう。クレジットカード番号はマスクされ、最後の列にカードの種類が出力されています。

[cloudera@quickstart origin]$ head /streamsets/tutorial/destination/_tmp__out
F6F7D02179BE915B23EF2DB57836442D,088879B44B80CC9ED43724776C539370,VTS,CRD,12,0.5,0.5,1.75,0,14.75,1,2013-01-13 04:36asdf,2013-01-13 04:46asdf,5,600,3.12,-73.996933,40.720055,-73.993546,40.693043,xxxxxxxxxxxx2922,Visa
BE386D8524FCD16B3727DCF0A32D9B25,4EB96EC9F3A42794DEE233EC8A2616CE,VTS,CRD,12,0.5,0.5,3.12,0,16.12,1,2013-01-13 04:37:00,2013-01-13 04:48:00,2,660,3.39,-74.000313,40.730068,-73.987373,40.768406,xxxxxxxxxxxx0902,MasterCard
E9FF471F36A91031FE5B6D6228674089,72E0B04464AD6513F6A613AABB04E701,VTS,CRD,5.5,0.5,0.5,1.2,0,7.7,1,2013-01-13 04:41:00,2013-01-13 04:45:00,1,240,1.16,-73.997292,40.720982,-74.000443,40.732376,xxxxxxxxxxxx9608,Visa
A5D125F5550BE7822FC6EE156E37733A,08DB3F9FCF01530D6F7E70EB88C3AE5B,VTS,CRD,11,0.5,0.5,2,0,14,1,2013-01-13 04:37:00,2013-01-13 04:47:00,5,600,2.91,-73.966843,40.756741,-73.987885,40.722713,xxxxxxxxxxxx7252,Visa
EE1513D432B07F7E0B5E2ED1EF629086,F31D261881520931062C011366E56A04,VTS,CRD,18,0.5,0.5,3.7,0,22.7,1,2013-01-13 04:30:00,2013-01-13 04:44:00,6,840,5.21,-74.005455,40.740772,-73.967354,40.798096,xxxxxxxxxxx7913,AMEX

まとめ

正直に言ってかなり使い易いと思います。HDFS、HBase、Kafka等、様々なエコシステムへのパイプラインを作成できるので、一度試してみてはいかがでしょうか?

おまけ

Cloudera ManagerでData Collectorのモニタリング
t28.png

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
5
Help us understand the problem. What are the problem?