1
1

More than 5 years have passed since last update.

[Elasticsearch] NEST 5.0 リリース記念 Ingest Nodeを使うPipelineの書き方サンプルなど

Posted at

概要

Elasticsearchに.NETからつなげるクライアントライブラリの5.0がリリースされた記念ということで、Elasticsearch5.0から入ったIngest Nodeをこのライブラリを使ったらどう使えるのよ、というところを試した記録です。

5.0のリリースの案内はこちら
NEST 5.0 released

試した環境は以下の通りです。

製品 version 備考
elasticsearch 5.1.1 elastic.co配布のDockerイメージ
kibana 5.1.1 elastic.co配布のDockerイメージ
NEST 5.0.0

実践

dependenciesに追加

dotnet new -t console

の直後に生成されたproject.jsonに対して追加した例がこちら。"Nest": "5.0.0"の箇所です。

{
  "version": "1.0.0-*",
  "buildOptions": {
    "debugType": "portable",
    "emitEntryPoint": true
  },
  "dependencies": {
    "Nest": "5.0.0"
  },
  "frameworks": {
    "netcoreapp1.1": {
      "dependencies": {
        "Microsoft.NETCore.App": {
          "type": "platform",
          "version": "1.1.0-preview1-001100-00"
        }
      },
      "imports": "dnxcore50"
    }
  }
}

X-Pack導入環境に対しての接続

X-Packを導入して認証を求められる環境に対して接続する場合は、以下のような方法がある。

  • Uriの前にユーザ名、パスワードを書く
var settings = new ConnectionSettings(new Uri("http://elastic:changeme@192.168.xxx.xxx:9200"))
            .DisableDirectStreaming(); //これはデバッグ用
var client = new ElasticClient(settings);
  • ConnectionSettingsのBasicAuthenticationで指定して接続
var settings = new ConnectionSettings(new Uri("http://192.168.xxx.xxx:9200"))
            .DisableDirectStreaming()   //これはデバッグ用
            .BasicAuthentication("elastic", "changeme");
var client = new ElasticClient(settings);

Ingest Node

Tweeterのtweetを例として、Ingest Nodeを使用例を説明したIngest Node: A Client's Perspectiveを参考にします。

Pipeline、Processorの内容や大まかな流れとしては、こういう理解で良いと思います。

  1. 加工処理の中身(Processor)を決めて、順序を定めてパイプラインを作成する
  2. パイプラインをelasticsearchに登録する
  3. Index生成時に使用するパイプラインを指定する
  4. パイプラインで処理が行われて、指定された処理が終わったあとのデータがIndexとして登録される

Apache Solrで言うところのUpdateRequestProcessorや、古くはFAST ESPで言うところのDocument Proccessorに相当するもんだと思います。

使えるProcessorについては、公式ドキュメントを確認されたい。
https://www.elastic.co/guide/en/elasticsearch/reference/5.1/ingest-processors.html
experimentalとかついているのもあるので、そこは注意が必要。

Ingest Nodeの使い方例

var res = client.PutPipeline("tweet-pipeline", p => p
                .Processors(ps => ps
                    .Convert<Tweet>(c => c
                        .Field(t => t.Retweets)
                        .Type(ConvertProcessorType.Integer))
                    .Script(s => s
                        .Lang("painless")
                        .Inline("ctx.retweets_calc = ctx.retweets * params.val")
                        .Params(param => param
                            .Add("val", 10))
                        )
                    .Grok<Tweet>(g => g
                        .Field(t => t.Message)
                        .Patterns(new string[]{
                            "%{WORD:word1} %{WORD:word2}"
                        }))
                    .Set<Tweet>(s => s
                        .Field("fixedField")
                        .Value("fixedValue1, fixedvalue2"))
                    .Split<Tweet>(s => s
                        .Field("fixedField")
                        .Separator(",[\\s]*"))
                    .Uppercase<Tweet>(u => u
                        .Field(t => t.Lang))))
                    ;

以下、今回使ってたProcessorごとにコメントしていく。

Convert

型を変換するくらいなら、最初からその型でフィールドを宣言してたらいいがな、ということもあろうかと思います。
または、マッピングをちゃんと書いておけばいいじゃない、と。

しかしながら、そうはイカのふんどし、のっぴきならない事情に対応する1つのアプローチとして、Convertが使えるかもしれません。

  .Convert<Tweet>(c => c
           .Field(t => t.Retweets)
          .Type(ConvertProcessorType.Integer))

この例は、Tweet.Retweetsがstring型だけども、入っている値はIntegerなのでintegerに変換する場合の例です。
Fieldに変換元の値が入っているフィールド、Typeに変換先の型を書きます。

ConvertProcessorTypeでは、次の項目が設定できます。

  • Integer
  • Float
  • String
  • Boolean
  • Auto

Script

何らかのスクリプトを実行して、その結果を格納したいんや!というときに使えそうです。

.Script(s => s
       .Lang("painless")
       .Inline("ctx.retweets_calc = ctx.retweets * params.val")
       .Params(param => param
            .Add("val", 10)
       )

Langでは、デフォルトがpainlessというelasticさんのオリジナル言語です。
他にはgroovyやpythonも使えるそうですし、得意なのを選べばいいと思います。

上の例では、retweetsに格納されている値に対して、外から与えられたパラメータ params.valの値を乗じる、ということをやっています。
もちろん、if文やfor文なども使えるので、ちょっとした加工であれば、独自のIngestNodeプラグインを書くまでもなく、これで出来そうです。

ctx.XXXXXXXと、ctxとフィールド名でパイプラインで処理されている中身のデータに対してアクセスできるようです。
また引数はparamsで指定し、key, valueのように設定しておくことで、スクリプトではparamsでアクセスできるようです。

ctxがContextに見えれば、何も説明なくてもApacheのcommons-chainのようなもんか、と想像できて納得できるのではと思います。

Grok

みんな大好きGrok!Logstashでログファイルを読み込んで加工する、といった場合において、もっともお世話になったのではないでしょうか。

.Grok<Tweet>(g => g
            .Field(t => t.Message)
            .Patterns(new string[]{"%{WORD:word1} %{WORD:word2}"}))

上の例では、Messageフィールドに入っている値をPatternsで設定した条件にあてはめ、結果を格納します。

もし、Messageが「Hello World!」という文字列だったらば、word1フィールドに"Hello"、word2フィールドに"World!"と展開されてはいることになります。

Set

固定値を入れてやりたいとき、ありますよね。そういうときに使えそうです。

.Set<Tweet>(s => s
           .Field("fixedField")
           .Value("fixedValue1, fixedvalue2"))

上の例では、fixedFieldというところに"fixedValue1, fixedValue2"という値を設定します。

.Fieldの書き方ですが、.Field(t => t.XXXXXX) というような書き方ができますが、.Field("フィールド名")のように書くと、
Tweetクラスに存在しないフィールドを指定することもできるようです。

もちろん、そんなことをすると検索した結果をDeserializeしてオブジェクトとしてとってきたときに、値が取れないということになるわけですが、Pipelineを通っている間だけ使う仮の値を格納するとか、そういう用途では使うことがあるかもしれません。

Split

カンマ区切りやタブ区切り、改行コードで区切られた文字列があって、これをSplitして各要素として使いたいとき、特にAggregationしたいフィールドでは、こういうことが多いように思います。
keywordやtag情報をDBに格納するときに、指定文字で連結して入れる、とかそんなシステムがあるかもしれません。

.Split<Tweet>(s => s
             .Field("fixedField")
             .Separator(","))

上の例では、fixedFieldに入っている文字列をSeparatorで指定した"," で区切って、結果をfixedFieldに格納します。
今回は、Set ProcessorでfixedFieldには、"fixedValue1, fixeValue2" と入っていましたが(この時点では、fixedFieldはただのstring)、ここでSplitされることで、fixedFieldは、["fixedValue1", " fixedValue2"]とarrayになることが確認できます。

単純にカンマで区切ってしまうと、今回の例ですとfixedValue2の前に半角空白が入ってしまいます。
Separatorをこうしてやると、変な空白はなくなります。

.Split<Tweet>(s => s
             .Field("fixedField")
             .Separator(",[\\s]*"))

メモ) Trim ProcessorはArrayのフィールドには適用できない

Uppercase

大文字に変換して格納しなおす、というだけのもの。わかりやすいので説明を割愛します。

参考

Tweetクラス

public class Tweet
{
    public string Message { get; set; }
    public string Lang { get; set; }
    public string Retweets { get; set; }
}

Index作成

client.Index(new Tweet { Retweets = "123", Message = "Hallo Twitter Co,.Ltd!", Lang = "nl" }, i => i
                .Index("tweets")
                .Pipeline("tweet-pipeline")
            );

これをIndexすると、先のPipelineで処理された結果、こうなる。
Consoleより検索を実行して確認

GET tweets/_search
{
  "query" : {
    "match_all": {}
  }
}
{
  "took": 3,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "failed": 0
  },
  "hits": {
    "total": 1,
    "max_score": 1,
    "hits": [
      {
        "_index": "tweets",
        "_type": "tweet",
        "_id": "AVj4sLYcvxqDpVrCocq3",
        "_score": 1,
        "_source": {
          "fixedField": [
            "fixedValue1",
            "fixedvalue2"
          ],
          "word1": "Hallo",
          "word2": "Twitter",
          "message": "Hallo Twitter Co,.Ltd!",
          "lang": "NL",
          "retweets": 123,
          "retweets_calc": 1230
        }
      }
    ]
  }
}

おわりに

Ingest Nodeは、Apache Solrで独自のProcessorを作ってUpdate Request Processorsに追加していた人、FAST ESPをやっていた人にはすごく馴染みやすいと思う。
もちろん、どこまでIngest Nodeでやって、どこまでを投げる側が事前にやっておくか、といった線引きはあるだろう。

NESTは、書き方ってこういう感じなのね、と1度確認できれば、学習コストがあまり高くない、分かりやすい作りになっているように思う。
kibana/Consoleで出来ることと、似たような感じで実装されているので、メソッドも探しやすい。
.NETCoreもサポートされているところが個人的にかなりプラス評価。

まだ開発は活発に行われているようなので、バグに泣くかもしれないけども、そこはご愛敬。

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1