LoginSignup
5
11

More than 5 years have passed since last update.

ElasticsearchにPDI (Spoon)を使って一括登録を行う方法

Last updated at Posted at 2016-05-11

はじめに

ここでは、PDI(Pentaho Data Integration)をつかって、CSVデータを一括登録する方法を紹介する。
データソースをDBに変更することにより、既存のデータベースからデータを取得し、登録する事も可能となる。

PENTAHOとは?

BIツールやETLツールをまとめたツール群。無償で利用できるCommunity版と有償版がある。ここでは、Community版を利用する。
このツールを利用することで、CSV→DB、DB→DB、CSV→RESTサーバ等、データの移行が(プログラムを書くのに比べて)簡単にできるようになる。

下記から公式HPにアクセスできる。
http://community.pentaho.com/

環境準備

PDIダウンロード

下記からプログラムをダウンロードする。(600MB程度)
https://sourceforge.net/projects/pentaho/files/Data%20Integration/6.1/pdi-ce-6.1.0.1-196.zip/download

サンプルCSVファイルダウンロード

GithubからサンプルCSVデータを取得する。
https://github.com/nagase2/iotworkshop

※同レポジトリに、PDIのスクリプトも置いてあります。

起動

インストーラ等は存在しないため、ダウンロードしたZipファイルを解凍し、その中にあるspoon.batを実行する。(Linux/Macの場合はspoon.sh)

実行計画作成

CSVファイルの読み込み

image

データソースの作成

ドラッグ&ドロップしたアイコンをダブルクリックすると、下記のウインドウが表示される為、下記の手順でCSVファイルを読み込む

日付には下記のテキストを記述する。
yyyy/MM/dd'T'HH:flag_mm:ssZ

rows_as_groups.png
すべてのフィールドに値が表示されていれば成功。

rows_as_groups.png

CSV→RESTコマンド変換スクリプトの作成

rows_as_groups.png

ドロップしたアイコンをダブルクリックし、開いたウインドウに下記のスクリプトを貼り付ける

bulk文字列作成
import java.util.*;

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws Exception 
{
    //RESTに渡す文字列
    String body="";
    Object[] r = null;
    //データを纏める件数をここで調整
    for(int i=0;i<5;i++){

        //一行すすめる
         r=getRow();

         if (r==null)
        {
            break;
        }
        String firstLine ="{ \"create\" : { \"_retry_on_conflict\" : 2} }\n";
        String secondLine = "{"
        +"\"title_str\":\""+ get(Fields.In, "タイトル").getString(r) +"\","
        +"\"body\":\""+ get(Fields.In, "本文").getString(r) +"\","
        +"\"created_date\":\""+  get(Fields.In, "作成日").getString(r) +"\","
        +"\"update_count\":\""+ get(Fields.In, "更新カウント").getString(r) +"\","
        +"\"view_count\":\""+ get(Fields.In, "ビューカウント").getString(r) +"\","
        +"\"posted_location\": ["+ get(Fields.In, "緯度").getString(r)+"," +get(Fields.In, "経度").getString(r)+"]"
        +"}\n";

        //取得したデータをbody(戻り値)に詰める
        body+=firstLine+secondLine;

    }
   Object[] outputRowData = RowDataUtil.resizeArray(r, data.outputRowMeta.size());
    int outputIndex = getInputRowMeta().size();

    //OutPutの3番めに値を追加(0スタート)
   outputRowData[outputIndex++] =body;

  //行出力
   putRow(data.outputRowMeta, outputRowData);

   if (r==null)
   {
    //処理を終了して次の処理へ進む
    setOutputDone();
    return false;
   }
   //継続
   return true;
}

UPSERTコマンド例

UPSEARTコマンドを使った場合の例は下記の通り。

updateサンプル
POST /index7/testdoc/_bulk
{ "update" : {"_id" : "999", "_retry_on_conflict" : 2}}
{ "doc" :{"会社コード":"1234","勘定科目":"D1130","グループコード":"CD001","取引先コード":"VR2334","伝票番号":"1223456"}

登録先情報の作成(ElasticSearchサーバ:)

RESTClientを右にドラッグ&ドロップする
image

作成したアイコンをダブルクリックし、下記の通り修正する。
image

ログの出力設定

左のDesignウインドウから"Write to log"をドラッグし、前に作成したRESTClientからLogに矢印をつなぐ
image

実行

画面左上にある下記のボタンをクリックするとウインドウが表示されるので、画面下に有るLaunchをクリックする。
image

結果確認

画面下のLoggingタブをクリックし、出力されているログを確認する。errors:falseとなっていれば問題なく登録されている(はず)。Errors:trueとなっている場合は、どこかに問題が有るのでエラーメッセージを確認して対象箇所を修正する。
image

注意点

大量データ登録の際のOutofMemorryエラー

一度に大量のデータを更新する場合、環境によってはOutOfMemoryエラーが発生する場合がある。その場合は作成したスクリプトをPanを使って実行することでこの問題を回避できる場合が有る。

Panでの実行コマンドは下記の通り。
(created_script.ktrはPDIを使って作成したスクリプトと仮定する)

/PDIinstalled/path/data-integration/pan.sh   /file:/path/to/script/created_script.ktr -param:paramname=param_value

まとめ

ここではPDIを使ってデータを登録する方法を紹介した。
この方法を応用することで、最初に紹介したとおり、データソースを変更することによりデータの取得もとをDBに変更することもできるし、スクリプトを記述することでデータの変更や追加なども可能になるなど、いろいろと応用ができるため、実際に試してみてマスターしておくことをおすすめする。

Kibanaを使ってグラフ&ダッシュボード作成する場合はこちら
http://qiita.com/windows222/private/098ded70fb6a6b3c49d2

5
11
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
11