はじめに
Elasticsearchの特定のインデックスにサンプルデータを投入したい。でも手元にあるのはキー項目は共通な別々のCSVファイル。えーこれ全部手でマージすんの? それかスクリプト書く? めんどくさ。。 そんな経験みなさんにもありますよね?
そんな時、(結局Pythonスクリプトは書きますが)Elandを使うとPandasのData FrameをそのままElasticsearchとやりとりできるので便利です。というかPandasが便利です。
依存ライブラリ
今回、Pythonのpandas, elasticsearch, elandの各ライブラリを使いますので、インストールされていない場合は以下のコマンドでインストールします。
$ pip install pandas elasticsearch eland
Elandの詳細についてはドキュメントを確認してください。
CSVデータをData Frameに読み込んで結合する
手元に次のようなCSVがあるとします。
dau.csv
date,daily_active_users
2023/08/01,1000
2023/08/02,1002
2023/08/03,980
newusers.csv
date,new_users
2023/08/01,23
2023/08/02,31
2023/08/03,7
この状態から日付ごとのdaily_active_usersとnew_usersをフィールドとしてもつElasticsearchのインデックスを作ることを考えます。
まずはこれらのCSVをData Frameとして読み込みます。
import pandas as pd
dau_df = pd.read_csv("dau.csv", encoding='utf-8', index_col=0, header=0, parse_dates=True)
newusers_df = pd.read_csv("newusers.csv", encoding='utf-8', index_col=0, header=0, parse_dates=True)
Pandasのread_csvは色々なオプションがあり、それだけですごい長い記事を書かれている方もいるようですのでここで詳しく説明はできませんが、使っているものだけかいつまんで説明します。
- header: 数値をいじることで、例えば最初の行に余計な説明文とかが入っている場合に読み飛ばしたりできるので便利です。
- index_col: どのカラムをData Frameのインデックス(ID)にするかを指定します。ここでは日付ごとにまとめるので最初のカラムを指定しています。
- parse_dates: 日付を自動的にパースしてくれるオプションです。便利。
さて、読み込んだData Frameはconcat関数を使えば結合できます。今回はカラムを横に追加するので、axis=1を指定します。
all = pd.concat([dau_df, newusers_df], axis=1)
daily_active_users | new_users | |
---|---|---|
date | ||
2023-08-01 | 1000 | 23 |
2023-08-02 | 1002 | 31 |
2023-08-03 | 980 | 7 |
おお、結合できました。Pandas便利ですね。
Elasticsearchに投入する
それではこのData FrameをElasticsearchのIndexに投入してみましょう。PandasのData FrameをElasticsearchに投入するにはElasticが公開しているライブラリのElandを使います。
ElasticsearchにData Frameを投入する関数はed.pandas_to_elandです。
import eland as ed
from elasticsearch import Elasticsearch
es = Elasticsearch(
'http://localhost:9200',
basic_auth=('username', 'password')
)
ed.pandas_to_eland(
all,
es,
'daily_user_index',
es_if_exists="replace",
es_refresh=True,
use_pandas_index_for_es_ids=True
)
Elasticsearch側で投入されたか調べてみます。KibanaのDev Toolsで以下のリクエストを実行します。
GET daily_user_index/_search
Response
{
"took": 0,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 3,
"relation": "eq"
},
"max_score": 1,
"hits": [
{
"_index": "daily_user_index",
"_id": "2023-08-01 00:00:00",
"_score": 1,
"_source": {
"daily_active_users": 1000,
"new_users": 23,
"date": "2023-08-01T00:00:00"
}
},
{
"_index": "daily_user_index",
"_id": "2023-08-02 00:00:00",
"_score": 1,
"_source": {
"daily_active_users": 1002,
"new_users": 31,
"date": "2023-08-02T00:00:00"
}
},
{
"_index": "daily_user_index",
"_id": "2023-08-03 00:00:00",
"_score": 1,
"_source": {
"daily_active_users": 980,
"new_users": 7,
"date": "2023-08-03T00:00:00"
}
}
]
}
}
投入できています。ただここで一つ注意なのが、Data Frameのindexになっていた日付情報が、ドキュメントIDとして格納されている点です。use_pandas_index_for_es_ids=True
の設定(これがデフォルト)が効いているわけですが、これをFalseにすると、indexの項目(日付)が完全に失われてしまいます。
そこで通常のElasticsearchでのデータの持ち方に近くするのであれば、以下のように日付を通常のフィールドに変換して投入する方が良いかもしれません。
all['date'] = all.index
ed.pandas_to_eland(
all,
es,
'daily_user_index',
es_if_exists="replace",
es_refresh=True,
use_pandas_index_for_es_ids=False
)
この状態で検索すると、Elasticsearch上では以下のように登録されています。
{
"took": 0,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 3,
"relation": "eq"
},
"max_score": 1,
"hits": [
{
"_index": "daily_user_index",
"_id": "I2Cp3IsBCU3flpJPM4Go",
"_score": 1,
"_source": {
"daily_active_users": 1000,
"new_users": 23,
"date": "2023-08-01T00:00:00"
}
},
{
"_index": "daily_user_index",
"_id": "JGCp3IsBCU3flpJPM4Go",
"_score": 1,
"_source": {
"daily_active_users": 1002,
"new_users": 31,
"date": "2023-08-02T00:00:00"
}
},
{
"_index": "daily_user_index",
"_id": "JWCp3IsBCU3flpJPM4Go",
"_score": 1,
"_source": {
"daily_active_users": 980,
"new_users": 7,
"date": "2023-08-03T00:00:00"
}
}
]
}
}
Elasticsearch的にはこの方が自然ですね。
ElasticsearchのIndexをData Frameとしてロードする
ElandにはElasticsearchのインデックスをData Frameとして読み出す関数が用意されています。今登録したデータを再度読み出してみましょう。ed.DataFrame
関数を使います。
ed_df = ed.DataFrame(es, 'daily_user_index')
ed_df
daily_active_users | date | new_users | |
---|---|---|---|
I2Cp3IsBCU3flpJPM4Go | 1000 | 2023-08-01 | 23 |
JGCp3IsBCU3flpJPM4Go | 1002 | 2023-08-02 | 31 |
JWCp3IsBCU3flpJPM4Go | 980 | 2023-08-03 | 7 |
3 rows × 3 columns
おお、ナチュラルにインデックスのデータをData Frameに変換できました。便利ですね。
この例ではインデックスの全件を読み込んでいますが、条件を絞り込んで読み込むなどの操作も可能なので一度ドキュメントを確認してください。
ただこのed_df
はeland.dataframe.DataFrame
クラスのオブジェクトになっています。場合によっては都合が悪いので、PandasのData Frameに変換してみましょう。eland_to_pandasを使います。
pd_df = ed.eland_to_pandas(ed_df)
こうするとこのpd_df
はpandas.core.frame.DataFrame
クラスのオブジェクトになります。
おわりに
Elandを使うとPandasのData FrameとElasticsearchのIndexを簡単に相互変換できることがわかりました。これで今回のように手元のCSVをPandasで読み込んで操作してからElasticsearchに流し込んだり、Elasticsearchに溜まっているデータをPandasに読み込んで分析するなど、さまざまな利用方法が考えられますね。
さらに進んだ使い方として、ElasticsearchのデータをPandasで読み込んだ後にScikit LearnのXGBoostなどで学習させて、出来上がった予測モデルをElasticsearchにアップロードして利用する、などの使い方もできます。機械学習が得意な方はぜひ試してみてください。
またElasticsearch上でデータフレーム分析を実装することも可能です。そちらについては別の記事を書きましたので興味がある方は参照してください。