はじめに
Elasticsearchにはbulk insertというデータを一括でindexに登録する仕組みがありますが,登録するためのjsonフォーマットが少々複雑で用意するのがちょっと面倒です.
そこで調べた方法を3つほど共有します.
- jq
- yield
- list
元となるデータ形式や,CLIで使用するのかなどによって使い分けて頂ければと思います.
対象者
この記事は下記のような項目について知りたい人を対象にしています。
- Elasticsearchのbulk-insertの手法について
- pythonアプリケーションからのbulk-insertについて
- pythonアプリケーションとElasticsearchの連携について
1. jq を使用した方法
datasetがjsonファイルであること,cliからファイルを生成したい という場合はこちら
cat dataset.json | jq -c '.[] | ({"index":{"_index": "{index_name}", "_id": {id_data}}}, .)' > bulk_dataset.json
{index_name} などは入力したい形式に合わせて変更してください.
以下の記事を参考にさせていただきました.
2. yield を使用した方法
アプリケーション内で という場合はこちら.
txt,csv,tsvファイル => 読み込んでiterate という流れなのでデータファイルに依存せず使えます.
# bulk_insertのためのフォーマット変換
def yield_bulk_insert_dataset(data: list):
for idx, text in enumerate(data):
yield {
"_op_type": "create",
"_index": "{作成したindex名称}",
"_source": {"id": idx, "texts": text},
}
# txtファイルからの読み込み
def load_txt(filepath: str) -> list:
with open(filepath, "r", encoding="utf-8") as f:
lines = f.read().splitlines()
return lines
# load
data = load_txt("data/filenameXXX.txt")
# bulk_insert実行
helpers.bulk(es, yield_bulk_insert_dataset(data))
今回はtxtファイルを1行ずつ読み込んで,バルクインサートするというcodeになってます.
tsv, csvファイルからの読み込みの場合はDataFrameで読み込んでから整形というのがよさそうです.
def csv2dict(filepath: str) -> list:
# csvからdataframeで読み込み
df = pd.read_csv(filepath, header=None)
# 使用するカラム以外をdrop
usecols = ["col1", "col2"]
df = df[usecols]
# dict型に変換
col1 = df.col1.values.tolist()
col2 = df.col2.values.tolist()
data =[{"col1": c1, "col2": c2 } for c1, c2 in zip(col1, col2)]
return data
後で思いついたのですが,dict変換はdf.to_dict(orient='records')
と書くのが良かったみたいです.
何はともあれ,テキストファイルの時と同様に
data = csv2dict("data/filenameXXX.csv")
helpers.bulk(es, yield_bulk_insert_dataset(data))
のように使える形式になった(はず)です.
以下の記事の バルクインサートについての項目を参考にさせていただきました.
3. list
[{
"_op_type": "create",
"_index": "{作成したindex名称}",
"_source": {"id": idx1, "texts": text1},
},
{
"_op_type": "create",
"_index": "{作成したindex名称}",
"_source": {"id": idx2, "texts": text2},
},
...
]
入力するdict形式のlistを作成する手法です.
- 大量のデータをlistとして保持するのがあんまり筋が良くない
- 多分これなら yield でいい
ということで小さめのデータなら使用することもあるかもという印象です.
その他
pythonアプリケーションとElasticsearchをdockerで構築
今回は上の記事を参考に若干versionを上げて実験してみました.
- elasticsearch 7.4.2 => 7.16.2
# Version 7.16.2 を選択してイメージを取得する
FROM docker.elastic.co/elasticsearch/elasticsearch:7.16.2
# 日本語を扱う場合はplugin「analysis-kuromoji」をインストールする
RUN elasticsearch-plugin install analysis-kuromoji
- python 3.8 => 3.9
FROM python:3.9
...
# clientである「elasticsearch」をインストール
RUN pip install numpy \
matplotlib \
pandas \
elasticsearch == 8.1.0 \
elasticsearch client は古めのものをversion指定してあげると上手くいきやすいかもしれません
まとめ
- jsonファイルからすぐにbulk_insertを試してみたい => jq
- jsonファイル以外 && アプリケーションに組み込む => yield
ということになるかなと思います.
誤りなどあればご指摘,編集リクエストなどいただけると大変助かります.