概要
Elasticsearchに.NETからつなげるクライアントライブラリの5.0がリリースされた記念ということで、Elasticsearch5.0から入ったIngest Nodeをこのライブラリを使ったらどう使えるのよ、というところを試した記録です。
5.0のリリースの案内はこちら
NEST 5.0 released
試した環境は以下の通りです。
製品 | version | 備考 |
---|---|---|
elasticsearch | 5.1.1 | elastic.co配布のDockerイメージ |
kibana | 5.1.1 | elastic.co配布のDockerイメージ |
NEST | 5.0.0 |
実践
dependenciesに追加
dotnet new -t console
の直後に生成されたproject.jsonに対して追加した例がこちら。"Nest": "5.0.0"の箇所です。
{
"version": "1.0.0-*",
"buildOptions": {
"debugType": "portable",
"emitEntryPoint": true
},
"dependencies": {
"Nest": "5.0.0"
},
"frameworks": {
"netcoreapp1.1": {
"dependencies": {
"Microsoft.NETCore.App": {
"type": "platform",
"version": "1.1.0-preview1-001100-00"
}
},
"imports": "dnxcore50"
}
}
}
X-Pack導入環境に対しての接続
X-Packを導入して認証を求められる環境に対して接続する場合は、以下のような方法がある。
- Uriの前にユーザ名、パスワードを書く
var settings = new ConnectionSettings(new Uri("http://elastic:changeme@192.168.xxx.xxx:9200"))
.DisableDirectStreaming(); //これはデバッグ用
var client = new ElasticClient(settings);
- ConnectionSettingsのBasicAuthenticationで指定して接続
var settings = new ConnectionSettings(new Uri("http://192.168.xxx.xxx:9200"))
.DisableDirectStreaming() //これはデバッグ用
.BasicAuthentication("elastic", "changeme");
var client = new ElasticClient(settings);
Ingest Node
Tweeterのtweetを例として、Ingest Nodeを使用例を説明したIngest Node: A Client's Perspectiveを参考にします。
Pipeline、Processorの内容や大まかな流れとしては、こういう理解で良いと思います。
- 加工処理の中身(Processor)を決めて、順序を定めてパイプラインを作成する
- パイプラインをelasticsearchに登録する
- Index生成時に使用するパイプラインを指定する
- パイプラインで処理が行われて、指定された処理が終わったあとのデータがIndexとして登録される
Apache Solrで言うところのUpdateRequestProcessorや、古くはFAST ESPで言うところのDocument Proccessorに相当するもんだと思います。
使えるProcessorについては、公式ドキュメントを確認されたい。
https://www.elastic.co/guide/en/elasticsearch/reference/5.1/ingest-processors.html
experimentalとかついているのもあるので、そこは注意が必要。
Ingest Nodeの使い方例
var res = client.PutPipeline("tweet-pipeline", p => p
.Processors(ps => ps
.Convert<Tweet>(c => c
.Field(t => t.Retweets)
.Type(ConvertProcessorType.Integer))
.Script(s => s
.Lang("painless")
.Inline("ctx.retweets_calc = ctx.retweets * params.val")
.Params(param => param
.Add("val", 10))
)
.Grok<Tweet>(g => g
.Field(t => t.Message)
.Patterns(new string[]{
"%{WORD:word1} %{WORD:word2}"
}))
.Set<Tweet>(s => s
.Field("fixedField")
.Value("fixedValue1, fixedvalue2"))
.Split<Tweet>(s => s
.Field("fixedField")
.Separator(",[\\s]*"))
.Uppercase<Tweet>(u => u
.Field(t => t.Lang))))
;
以下、今回使ってたProcessorごとにコメントしていく。
Convert
型を変換するくらいなら、最初からその型でフィールドを宣言してたらいいがな、ということもあろうかと思います。
または、マッピングをちゃんと書いておけばいいじゃない、と。
しかしながら、そうはイカのふんどし、のっぴきならない事情に対応する1つのアプローチとして、Convertが使えるかもしれません。
.Convert<Tweet>(c => c
.Field(t => t.Retweets)
.Type(ConvertProcessorType.Integer))
この例は、Tweet.Retweetsがstring型だけども、入っている値はIntegerなのでintegerに変換する場合の例です。
Fieldに変換元の値が入っているフィールド、Typeに変換先の型を書きます。
ConvertProcessorTypeでは、次の項目が設定できます。
- Integer
- Float
- String
- Boolean
- Auto
Script
何らかのスクリプトを実行して、その結果を格納したいんや!というときに使えそうです。
.Script(s => s
.Lang("painless")
.Inline("ctx.retweets_calc = ctx.retweets * params.val")
.Params(param => param
.Add("val", 10)
)
Langでは、デフォルトがpainlessというelasticさんのオリジナル言語です。
他にはgroovyやpythonも使えるそうですし、得意なのを選べばいいと思います。
上の例では、retweetsに格納されている値に対して、外から与えられたパラメータ params.valの値を乗じる、ということをやっています。
もちろん、if文やfor文なども使えるので、ちょっとした加工であれば、独自のIngestNodeプラグインを書くまでもなく、これで出来そうです。
ctx.XXXXXXXと、ctxとフィールド名でパイプラインで処理されている中身のデータに対してアクセスできるようです。
また引数はparamsで指定し、key, valueのように設定しておくことで、スクリプトではparamsでアクセスできるようです。
ctxがContextに見えれば、何も説明なくてもApacheのcommons-chainのようなもんか、と想像できて納得できるのではと思います。
Grok
みんな大好きGrok!Logstashでログファイルを読み込んで加工する、といった場合において、もっともお世話になったのではないでしょうか。
.Grok<Tweet>(g => g
.Field(t => t.Message)
.Patterns(new string[]{"%{WORD:word1} %{WORD:word2}"}))
上の例では、Messageフィールドに入っている値をPatternsで設定した条件にあてはめ、結果を格納します。
もし、Messageが「Hello World!」という文字列だったらば、word1フィールドに"Hello"、word2フィールドに"World!"と展開されてはいることになります。
Set
固定値を入れてやりたいとき、ありますよね。そういうときに使えそうです。
.Set<Tweet>(s => s
.Field("fixedField")
.Value("fixedValue1, fixedvalue2"))
上の例では、fixedFieldというところに"fixedValue1, fixedValue2"という値を設定します。
.Fieldの書き方ですが、.Field(t => t.XXXXXX) というような書き方ができますが、.Field("フィールド名")のように書くと、
Tweetクラスに存在しないフィールドを指定することもできるようです。
もちろん、そんなことをすると検索した結果をDeserializeしてオブジェクトとしてとってきたときに、値が取れないということになるわけですが、Pipelineを通っている間だけ使う仮の値を格納するとか、そういう用途では使うことがあるかもしれません。
Split
カンマ区切りやタブ区切り、改行コードで区切られた文字列があって、これをSplitして各要素として使いたいとき、特にAggregationしたいフィールドでは、こういうことが多いように思います。
keywordやtag情報をDBに格納するときに、指定文字で連結して入れる、とかそんなシステムがあるかもしれません。
.Split<Tweet>(s => s
.Field("fixedField")
.Separator(","))
上の例では、fixedFieldに入っている文字列をSeparatorで指定した"," で区切って、結果をfixedFieldに格納します。
今回は、Set ProcessorでfixedFieldには、"fixedValue1, fixeValue2" と入っていましたが(この時点では、fixedFieldはただのstring)、ここでSplitされることで、fixedFieldは、["fixedValue1", " fixedValue2"]とarrayになることが確認できます。
単純にカンマで区切ってしまうと、今回の例ですとfixedValue2の前に半角空白が入ってしまいます。
Separatorをこうしてやると、変な空白はなくなります。
.Split<Tweet>(s => s
.Field("fixedField")
.Separator(",[\\s]*"))
メモ) Trim ProcessorはArrayのフィールドには適用できない
Uppercase
大文字に変換して格納しなおす、というだけのもの。わかりやすいので説明を割愛します。
参考
Tweetクラス
public class Tweet
{
public string Message { get; set; }
public string Lang { get; set; }
public string Retweets { get; set; }
}
Index作成
client.Index(new Tweet { Retweets = "123", Message = "Hallo Twitter Co,.Ltd!", Lang = "nl" }, i => i
.Index("tweets")
.Pipeline("tweet-pipeline")
);
これをIndexすると、先のPipelineで処理された結果、こうなる。
Consoleより検索を実行して確認
GET tweets/_search
{
"query" : {
"match_all": {}
}
}
{
"took": 3,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"failed": 0
},
"hits": {
"total": 1,
"max_score": 1,
"hits": [
{
"_index": "tweets",
"_type": "tweet",
"_id": "AVj4sLYcvxqDpVrCocq3",
"_score": 1,
"_source": {
"fixedField": [
"fixedValue1",
"fixedvalue2"
],
"word1": "Hallo",
"word2": "Twitter",
"message": "Hallo Twitter Co,.Ltd!",
"lang": "NL",
"retweets": 123,
"retweets_calc": 1230
}
}
]
}
}
おわりに
Ingest Nodeは、Apache Solrで独自のProcessorを作ってUpdate Request Processorsに追加していた人、FAST ESPをやっていた人にはすごく馴染みやすいと思う。
もちろん、どこまでIngest Nodeでやって、どこまでを投げる側が事前にやっておくか、といった線引きはあるだろう。
NESTは、書き方ってこういう感じなのね、と1度確認できれば、学習コストがあまり高くない、分かりやすい作りになっているように思う。
kibana/Consoleで出来ることと、似たような感じで実装されているので、メソッドも探しやすい。
.NETCoreもサポートされているところが個人的にかなりプラス評価。
まだ開発は活発に行われているようなので、バグに泣くかもしれないけども、そこはご愛敬。