この記事はElastic Stack (のうち Elasticsearch, Kibana, Logstash)に関するHow Toの個人的メモ書きです。
Elastic Stack 7.2をRHEL 7にインストールした前提で記載しています。
Elasticsearch
Elasticsearchを外部サーバーからアクセスする
/etc/elasticsearch/elasticsearch.yml に以下を追加。
network.bind_host: 0
discovery.seed_hosts: ["127.0.0.1", "[::1]"]
デフォルトでは、localhostにバインドされます。0は0.0.0.0と同義で、すべてのアドレスにバインドされます。
バインドを変えるとdiscoveryでエラーになるため、seed_hostsの設定も追加します。
データファイルを移設する
Elasticsearchをインストールした状態では、データファイルは /var/lib/elasticsearch
に作成されます。
これを別のディレクトリに移動するには、以下の通り実行します。
-
/var/lib/elasticsearch
にあるファイルをtar
して、移動先に移動します。 -
/etc/elasticsearch/elasticsearch.yml
で、移動先ファイルを参照するようにします。
# ----------------------------------- Paths ------------------------------------
#
# Path to directory where to store the data (separate multiple locations by comma):
#
path.data: /path/to/the/new/location
path.data
にデータファイルのありかが定義されていますので、これを移動先に書き換えます。
Elasticsearch でindexの一覧を表示する
GET /_cat/indices?v
ElasticsearchでN-gramを使う
以下のようにindexを作っておく。
全角半角は同一と見なし、また大文字小文字も同一とします。
ただし、「サイド」は「サイド」で検索することはできないようです。「サイト」ではマッチします。
PUT /my-index
{
"settings": {
"analysis": {
"analyzer": {
"ngram_analyzer": {
"type": "custom",
"tokenizer": "ngram_1to2_tokenizer",
"filter": ["cjk_width", "lowercase"]
}
},
"tokenizer": {
"ngram_1to2_tokenizer": {
"type": "ngram",
"min_gram": 1,
"max_gram": 2,
"token_chars": []
}
}
}
}
}
さらに、文字列のフィールドにAnalyzerを設定しておきます。
PUT /my-index/_mapping
{
"properties": {
"body": {
"type": "text",
"analyzer": "ngram_analyzer"
}
}
}
これで、bodyフィールドをnGramで検索できるようになりました。
例えば、bodyフィールドのみを持つインデックスを作成し、n-gramで検索できるようにするには、以下のように設定します。
PUT /my-index
{
"mappings" : {
"properties" : {
"body" : {
"type" : "text",
"analyzer": "ngram_analyzer"
}
}
},
"settings" : {
"analysis": {
"analyzer": {
"ngram_analyzer": {
"type": "custom",
"tokenizer": "ngram_1to2_tokenizer",
"filter": ["cjk_width", "lowercase"]
}
},
"tokenizer": {
"ngram_1to2_tokenizer": {
"type": "ngram",
"min_gram": 1,
"max_gram": 2,
"token_chars": []
}
}
}
}
}
インデックス作成のパフォーマンス改善
大量のドキュメントを作成する場合で、即時に検索結果に反映する必要がない場合は、refreshの間隔を長くすることでパフォーマンスを改善できます。
デフォルトは1秒ですが、これをrefresh_interval
で長く設定できます。
PUT /my-index
{
"settings" : {
"index" : {
"refresh_interval" : "-1",
}
}
}
この例では-1
にして、自動更新を無効にしています。この場合は、GET /my-index/_refresh
を呼び出して手動で更新できます。
Kibana
Kibanaを外部サーバーからアクセスする
/etc/kibana/kibana.ymlに以下を追加。
server.host: 0.0.0.0
デフォルトではlocalhostにバインドされます。
Logstash
Logstashでファイル読み込み後ファイルが削除されなくする
デフォルトではファイル読み込み後削除されます。これは、file_completed_actionがdeleteのためです。削除されないためにはlogに設定します。
input {
file {
mode => "read"
path => ["/my-file.csv"]
file_completed_action => "log"
file_completed_log_path => "/file-complete-log.log"
LogstashでCSV読み込み時、余計なフィールドが登録されないようにする
CSVを読み込む際、デフォルトでmessage、host、pathがフィールドとして登録されるようです。不要な場合は、remove_fieldで設定します。
filter {
csv {
remove_field => ["message","host","path"]
Python API
PythonからElasticsearchを呼び出したい
公式ページに記載があります。
ガイド
API Reference
基本的には、
pip install elasticsearch
したあとに、
from elasticsearch import Elasticsearch
es = Elasticsearch()
とすればよいです。
localhostにサーバーが動作している前提で呼び出されますので、リモートのサーバーを呼び出す際は、
client = Elasticsearch(hosts='192.168.0.200:9200')
のように指定できます。
XMLからインポート
XMLの大きなデータを一括取り込みをする場合は、以下のようにできます。
XMLからインポートデータを生成するgeneratorを設定し、streaming_bulk
に渡すだけです。
from elasticsearch import Elasticsearch
from elasticsearch.helpers import streaming_bulk
import lxml.etree as ET
def gen_data():
with open('/my/xml/file', mode='rb') as f:
context = ET.iterparse(f, events=('end',), tag='s') # to import <s> tag
for _, elem in context:
yield {'body': elem.text}
elem.clear()
for ancestor in elem.xpath('ancestor-or-self::*'):
while ancestor.getprevious() is not None:
del ancestor.getparent()[0]
del context
es = Elasticsearch(timeout=60)
for ok, action in streaming_bulk(client=es, index='my-data', actions=gen_data()):
if not ok:
print(f'error: {action}')
環境やデータによってはタイムアウトするかもしれませんので、必要に応じてサーバーに接続する際に timeout
を調整してください。
(その他、何かあれば追記します)