はじめに
PythonでElasticsearchを使う機会があったため情報を収集していましたが、サイトで使われているElasticsearchのバージョンが古かったり、そもそも、情報が少なかったりしたので、今回、メモとして簡単な例と共に基本的な使い方をまとめました。
この記事を読むことで、低レベルクライアント(elasticsearch-py)を使ったインデックスおよびドキュメントのCRUD操作ができるようになります。
環境
前提として、Elasticsearchはすでに立ち上がっていることとします。
また、実行環境は以下の通りです。特殊なライブラリを使っていないので、どの環境でも基本的には問題ないと思います。
- 利用環境
Ubuntu 20.04 LTS
- Elasticsearch環境
7.7.0
- Python環境
$ python -V
Python 3.8.4
準備
- pipでElasticsearchクライアントのインストール
$ python -m pip install elasticsearch
基本操作
ここでは、基本的なPythonからElasticsearchを操作する方法を紹介します。
接続
インストールしたelasticsearchパッケージを利用してElasticsearchが起動しているホスト(localhost)へ接続する方法は以下の通りです。
基本的な接続方法
特にElasticsearch側で認証を設定していなければ、この方法で接続できます。
from elasticsearch import Elasticsearch
# Elasticsearchクライアント作成
es = Elasticsearch("http://localhost:9200")
ポートや、httpまたはhttpsを指定することもできます。
# Elasticsearchインスタンスを作成
es = Elasticsearch(
["localhost", "otherhost"],
scheme="http",
port=9200
)
HTTP認証を利用した接続
ElasticsearchにIDやパスワードを設定している場合は、この方法で接続できます。
es = Elasticsearch(
"http://localhost:9200",
http_auth=("user_id", "password")
)
user_idやpasswordは設定されたID、パスワードを表しています。
接続の解除
上記で確立した内部接続を**close()**で閉じることができます。
# Elasticsearchインスタンスを作成
es = Elasticsearch("http://localhost:9200")
# 内部接続を閉じる
es.close()
余談ですが、close()しないと、インスタンスがガーベージコレクションされた際に、例外が発生します。これを防ぐためにも、明示的に書いたほうが良いでしょう。
This warning is created by aiohttp when an open HTTP connection is garbage collected. You’ll typically run into this when closing your application. To resolve the issue ensure that close() is called before the AsyncElasticsearch instance is garbage collected.
https://elasticsearch-py.readthedocs.io/en/master/async.html?highlight=close#receiving-unclosed-client-session-connector-warning
インデックスの基本操作
インデックスを扱うための基本的な操作について紹介します。
インデックスの操作には、indicesという属性を使います。
インデックスの作成
studentsというインデックスを作成します。
タイプやドキュメントが入っていない空のインデックスです。
es.indices.create(index='students')
マッピングを使った方法
データタイプやインデックスの構造を指定して作成できます。
mapping = {
"mappings": {
"properties": {
"name": {"type": "text"},
"age": {"type": "long"},
"email": {"type": "text"}
}
}
}
es.indices.create(index="students", body=mapping)
インデックス情報の収集
ここでは、インデックスの情報を取得する方法を紹介します。
インデックス一覧の取得
接続しているElasticsearchでは、どのようなインデックスがあるのか確認したい場合は、cat属性の**indices(index="*", h="index")**を利用します。
indicesはインデックス情報を返すメソッドです。
今回は、全インデックスを一覧で取得したいので、引数のindexにはワイルドカードを指定します。
また、引数hには、列名を指定することで列情報が改行区切りで返ってきます。
# インデックス一覧の取得
indices = es.cat.indices(index='*', h='index').splitlines()
# インデックスの表示
for index in indices:
print(index)
実行結果は、以下の通りです。
.apm-custom-link
.kibana_task_manager_1
.apm-agent-configuration
students
.kibana_1
作成したstudentsだけではなく、デフォルトで入っているインデックスも表示されました。
インデックスのマッピングの確認
特定のインデックスのマッピングを確認する場合は、**get_mapping(index="インデックス名")**を利用します。
print(es.indices.get_mapping(index="students"))
実行結果は以下の通りです。
{'students': {'mappings': {'properties': {'age': {'type': 'long'}, 'email': {'type': 'text', 'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}}}, 'name': {'type': 'text', 'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}}}}}}}
また、引数にindexを指定しない場合、全インデックスのマッピングを取得することができます。
print(es.indices.get_mapping())
実行結果は割愛します。
インデックスの更新
マッピングを変更してインデックスの構造を更新する場合は、**put_mapping(index="インデックス名", body="変更分のマッピング")**を利用します。
例えば、studentsに新たに学籍番号を追加する場合は以下のようになります。
mapping = {
"properties": {
"student_number": {"type": "long"}
}
}
es.indices.put_mapping(index="students", body=mapping)
指定するマッピングは全て渡す必要はなくて、差分だけで良いです。
また、インデックス作成の際にはmappings配下にネストしましたが、更新の場合は、そうではないことに注意しましょう。
現在のマッピングは、以下のようになっています。
{
"students": {
"mappings": {
"properties": {
"age": {
"type": "long"
},
"email": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"student_number": {
"type": "long"
}
}
}
}
}
見ずらいですが、最後に新たに追加したstudent_numberが表示されています
インデックスの削除
特定のインデックスを削除したい場合は、**delete(index="インデックス名")**を利用します。
今回は、今まで操作してきたstudentsを消してみます。
es.indices.delete(index="students")
インデックスの存在確認
エラーハンドリングを追加するにあたって、インデックスが存在するか確認したい。
そのような場合は、**exists(index="インデックス名")**を利用します。
print(es.indices.exists(index="students"))
実行結果は、以下のようになります
False
先程、インデックスを消したのでFalseが返ってきました。
存在する場合は、Trueが返ってきます。
ドキュメントの基本操作
次は、ドキュメントを扱うための基本的な操作について紹介します。
ドキュメントの作成
ドキュメントを新しく登録するには、**create(index="インデックス名", id="ドキュメントID", body="新規ドキュメント")**を利用します。
※ インデックスが無い場合は、登録するドキュメントから自動的に型を判断して作成されます。
# 登録したいドキュメント
student = {
"name": "Taro",
"age": 36,
"email": "taro@example.com"
}
# ドキュメントの登録
es.create(index='students', id=1, body=student)
登録に成功すると、以下のような結果が出力されます。
{'took': 1, 'timed_out': False, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 0, 'relation': 'eq'}, 'max_score': None, 'hits': []}}
バルクインサート
createを複数回呼び出すことで、複数ドキュメントを登録できますが、**bulk(インスタンス, データ)**を使うことで、一度に登録できます。
引数で渡すデータの構造は、以下の通りで、少々複雑です。
{
"_op_type": "createやdelete、updateといったアクションを指定"
"_index": "インデックス名"
"_id": "ドキュメントのIDを指定(createの場合はなくても良い)"
"_source": "登録したいドキュメント"
}
上記のデータを配列に格納して、bulkに渡すことで複数ドキュメントを操作することができます。
今回は、配列ではなくyield
を使った方法をサンプルとして紹介します。
from elasticsearch import Elasticsearch, helpers # bulkを使うために追加
# Elasticsearchインスタンスを作成
es = Elasticsearch("http://localhost:9200")
def gendata():
# 登録したいドキュメント
students = [
{
"name": "Jiro",
"age": 25,
"email": "jiro@example.com"
},
{
"name": "Saburo",
"age": 20,
"email": "saburo@example.com"
}
]
# bulkで扱えるデータ構造に変換します
for student in students:
yield {
"_op_type": "create",
"_index": "students",
"_source": student
}
# 複数ドキュメント登録
helpers.bulk(es, gendata())
※ 100MBを超える大量のドキュメントをバルクインサートを実行する場合、エラーが発生します。このようなときは、記事下部に記載した非同期で大量のドキュメントをバルクインサートを参照ください。
ドキュメントの検索
ドキュメントを検索する方法を2つ紹介します。
クエリを使って検索
登録したドキュメントを検索するには、**search(index="インデックス名", body="検索クエリ", size=検索数)**を利用します。
bodyやsizeは指定しない場合は、全件表示されます。
# ageの値が20より大きいドキュメントを検索するためのクエリ
query = {
"query": {
"range": {
"age": {
"gt": 20
}
}
}
}
# ドキュメントを検索
result = es.search(index="students", body=query, size=3)
# 検索結果からドキュメントの内容のみ表示
for document in result["hits"]["hits"]:
print(document["_source"])
実行結果は以下のようになります。
{'name': 'Taro', 'age': 36, 'email': 'taro@example.com'}
{'name': 'Jiro', 'age': 25, 'email': 'jiro@example.com'}
Saburoの歳は20歳なので、検索から外れ表示されていません。
IDを使って検索
ドキュメントIDを指定して直接検索する場合は、**get_source(index="インデックス名", id="ドキュメントID")**を利用します。
サンプルでは、idが1のドキュメントを検索しています。
print(es.get_source(index="students", id=1))
実行結果は以下の通りです。
{'name': 'Taro', 'age': 36, 'email': 'taro@example.com'}
idが1のTaro
が表示されました。
###ドキュメント数の取得
インデックスの中にドキュメント数がいくつあるのか確認したい場合は、**count(index="インデックス名")**を利用します。
print(es.count(index="students"))
実行結果は以下の通りです。
{'count': 3, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}}
countの値が3となっていいるので、インデックスstudentsには現在3つのドキュメントが存在していることがわかります。
ドキュメントの更新
ドキュメント更新には**update(index="インデックス名", id="ドキュメントID", body="変更内容")**を利用します。
# doc配下に変更したいパラメータを記述
student = {
"doc": {
"age": 40
}
}
# ドキュメントIDを指定して更新
es.update(index="students", id=1, body=student)
# 更新されているか確認
print(es.get_source(index="students", id=1))
get_source
を使って更新されているか確認しました。
{'name': 'Taro', 'age': 40, 'email': 'taro@example.com'}
ageが40に変更されています。
ドキュメントの削除
特定のドキュメントを削除する場合は、**delete(index="インデックス名", id="ドキュメントID")**を利用します。
サンプルでは、idが1のドキュメントを削除します。
es.delete(index="students", id=1)
このドキュメントが本当に削除されているかの確認は次で行います。
ドキュメントの存在確認
ドキュメントの存在を確認するためには、**exists(index="インデックス名", id="ドキュメントID")**を利用します。
print(es.exists(index="students", id=1))
False
先程、deleteでidが1のドキュメントを消しているので、Falseが返却され存在しないと表示されました。
存在する場合は、Trueなので、存在と表示されるはずです。
※ 余談ですが、インデックスには存在しているものの、_source
があるのか調べるにはexists_sourceを利用します。
アドバンス
ここでは、通常あまり使わないようなメソッドや小ネタの紹介をします。
非同期で検索
ドキュメント数が何万とある場合、検索に時間がかかってしまいます。
このような場合、v7.8.0
からサポートされたAsyncElasticsearchを使うことで非同期でリソースを効率的に検索することが出来ます。
準備
機能を使うためにはasyncioをインストールする必要がありあす。
$ python -m pip install elasticsearch [ async ] > = 7 .8.0
非同期の検索サンプル
非同期で検索するためのサンプルを用意しました。
asyncやawaitを付け加えただけで、基本的にはsearchとやることは変わりません。
import asyncio
from elasticsearch import AsyncElasticsearch
# 非同期対応したElasticsearchインスタンスを作成
es = AsyncElasticsearch("http://localhost:9200")
async def main():
# 非同期検索
result = await es.search(
index="students",
body={"query": {"match_all": {}}},
size=20
)
# 検索結果の表示
for student in result['hits']['hits']:
print(student['_source'])
# セッションをクローズ
await es.close()
# イベントループを取得
loop = asyncio.get_event_loop()
# 並列に実行して終るまで待つ
loop.run_until_complete(main())
動きとしては、
-
asyncio.get_event_loop()
でイベントループを取得 - 並列で動かしたい関数
main()
にasync
をつけて定義 - 時間がかかる処理
search
はawait
をつけて宣言 - イベントループの
run_until_complete
で並列的に実行しつつ終るまで待つ
このような感じです。
非同期でバルクインサート
次は、非同期でバルクインサートを実行する方法を紹介します。
コードはbulkとほとんど変わらず、asyncやawaitおよび非同期に対応したasync_bulkを利用するだけです。
import asyncio
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_bulk
# 非同期対応したElasticsearchインスタンスを作成
es = AsyncElasticsearch("http://localhost:9200")
async def gendata():
# 登録したいドキュメント
students = [
{
"name": "Siro",
"age": 19,
"email": "siro@example.com"
},
{
"name": "Goro",
"age": 13,
"email": "goro@example.com"
}
]
# bulkで扱えるデータ構造に変換します
for student in students:
yield {
"_op_type": "create",
"_index": "students",
"_source": student
}
async def main():
# 非同期でバルクインサートを実行
await async_bulk(es, gendata())
# セッションをクローズ
await es.close()
# イベントループを取得
loop = asyncio.get_event_loop()
# 並列に実行して終るまで待つ
loop.run_until_complete(main())
実行後、登録されたドキュメントを確認すると
{'name': 'Jiro', 'age': 25, 'email': 'jiro@example.com'}
{'name': 'Saburo', 'age': 20, 'email': 'saburo@example.com'}
{'name': 'Siro', 'age': 19, 'email': 'siro@example.com'}
{'name': 'Goro', 'age': 13, 'email': 'goro@example.com'}
SaburoとGoroが追加されていることがわかります。
非同期で大量のドキュメントをバルクインサート
100MBを超えるドキュメントを一度にバルクインサートすることは出来ません。
これは、Elasticsearch側で上限が設定されているためです。
http.max_content_length
The max content of an HTTP request. Defaults to 100MB.
https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-http.html
このような大量のデータを扱う場合、**async_streaming_bulk(client, actions, その他パラメータ)**を利用して、非同期でドキュメントを複数(チャンク)に分割して登録を行います。
パラメータ
指定できるパラメータについて紹介します。
-
chunk_size
- 型:整数
- Elasticsearchへ一度に送信するドキュメント数(デフォルト:500)
-
max_chunk_bytes
- 型:整数
- リクエストの最大バイトサイズ(デフォルト:100MB)
-
raise_on_error
- 型:Bool
- exceptで
BulkIndexError
発生時に、エラーリストを取得できるようになる(デフォルト: True)
-
raise_on_exception
- 型:Bool
-
False
にすると、bulk失敗時に例外を発生させず、最後に失敗したアイテムの報告のみ行う(デフォルト:True)
-
max_retries
- 型:整数
- ステータスエラー
429
発生時に再試行を行う回数(デフォルト:0)
-
initial_backoff
- 型:整数
- 再試行まで待機する秒数、2回以降は
待機秒数 x 2
となる(デフォルト:2)
-
max_backoff
- 型:整数
- 再試行が待機する最大秒数(デフォルト:600)
-
yield_ok
- 型:Bool
-
False
にすると、出力結果からbulkに成功したドキュメントが表示されなくなる(デフォルト:True)
サンプル
以下に簡単なサンプルを紹介します。
import asyncio
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_streaming_bulk, BulkIndexError
# 非同期対応したElasticsearchインスタンスを作成
es = AsyncElasticsearch("http://localhost:9200")
async def gendata():
# 登録したいドキュメント
students = [
{
"name": "Siro",
"age": 19,
"email": "siro@example.com"
},
{
"name": "Goro",
"age": 13,
"email": "goro@example.com"
}
]
# bulkで扱えるデータ構造に変換します
for student in students:
yield {
"_op_type": "create",
"_index": "students",
"_source": student
}
async def main():
try:
# ドキュメントを複数(チャンク)に分けてバルクインサート
async for ok, result in async_streaming_bulk(client=es,
actions=gendata(),
chunk_size=50, # 一度に扱うドキュメント数
max_chunk_bytes=52428800 # 一度に扱うバイト数
):
# 各チャンクごとの実行結果を取得
action, result = result.popitem()
# バルクインサートに失敗した場合
if not ok:
print(f"failed to {result} document {action}")
# 例外処理
except BulkIndexError as bulk_error:
# エラーはリスト形式
print(bulk_error.errors)
# セッションのクローズ
await es.close()
# イベントループを取得
loop = asyncio.get_event_loop()
# 並列に実行して終るまで待つ
loop.run_until_complete(main())
今回指定した、チャンクに関するパラメータは、
- チャンク数:50
- チャンクの最大バイト数:50MB
こんな感じです。
これらは、登録したいデータに応じてチューニングすると良いと思います。
特定のステータスによるエラーを無視
引数ignoreに無視したい特定のステータスコードを指定することでエラーを無視することができます。
例えば、存在しないインデックスを削除しようとした場合、404エラーが発生します。
elasticsearch.exceptions.NotFoundError: NotFoundError(404, 'index_not_found_exception', 'no such index [test-index]', test-index, index_or_alias)
サンプルではこれを無視してみます。
# 404と400で発生するエラーを無視
es.indices.delete(index='test-index', ignore=[400, 404])
タイムアウト
es側ではデフォルトでタイムアウトが設定されていますが、自分で設定することも出来ます。
方法は簡単で、引数に**request_timeout=秒数(浮動小数点)**を渡すだけです。
あえて短い秒数を指定して、タイムアウトさせるサンプルを用意しました。
print(es.cluster.health(wait_for_status='yellow', request_timeout=0.001))
実行すると、指定した0.001秒以内に処理が終了しなかったため、以下のようなタイムアウトのエラーが表示されました。
elasticsearch.exceptions.ConnectionError: ConnectionError((<urllib3.connection.HTTPConnection object at 0x7f11297c5520>, 'Connection to localhost timed out. (connect timeout=0.001)')) caused by: ConnectTimeoutError((<urllib3.connection.HTTPConnection object at 0x7f11297c5520>, 'Connection to localhost timed out. (connect timeout=0.001)'))
レスポンスの整形
Elasticsearchからのレスポンスをそのまま表示すると
{'students': {'mappings': {'properties': {'age': {'type': 'long'}, 'email': {'type': 'text', 'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}}}, 'name': {'type': 'text', 'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}}}}}}}
このようにとても見づらくなってしまいます。
そこで、jsonパッケージを使って整形することで見やすく表示できます。
import json
from elasticsearch import Elasticsearch
# Elasticsearchインスタンスを作成
es = Elasticsearch("http://locahost:9200")
# マッピング情報の取得
response = es.indices.get_mapping(index="students")
# レスポンスの整形
print(json.dumps(response, indent=2))
動きとしては、
json.dumpsに整形したいデータと、インデントのスペース数を指定するだけです。
今回はレスポンスのネストが深くなることが予想されたためインデントは2を指定しました。
{
"students": {
"mappings": {
"properties": {
"age": {
"type": "long"
},
"email": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
}
}
}
見やすく整形されました。
おわりに
Python Elasticserachのクライアントの基本的な使い方を紹介しました。
直接APIを叩くよりも、用意された便利なメソッドを使うことで、直感的に操作できるのでとても便利に感じました。
使い方が間違っていたり、その他便利な方法があればコメントお願いします!
参考サイト
- Python Elasticsearch公式
https://elasticsearch-py.readthedocs.io/en/master/api.html - PythonでElasticsearchの操作
https://blog.imind.jp/entry/2019/03/08/185935 - 【Python】asyncio(非同期I/O)のイベントループをこねくり回す
https://qiita.com/ynakaDream/items/b63fab24bb30dea6ddb1