MariaDBの表データからPythonスクリプトでElasticsearchにインポートする方法を記載します。
RDBのデータをインデックス化する時の参考になればと思います。
MariaDBのインストールとサンプルデータの準備は下記を参照しました。
Ubuntu 18.04 LTS に MariaDB 10.3 をインストール
https://qiita.com/cherubim1111/items/61cbc72673712431d06e
【SQL】MySQL公式サンプルデータベースを使う
https://qiita.com/yukibe/items/fc6016348ecf4f3b10bf
MySQL Serverに外部から接続する
https://qiita.com/tocomi/items/0c009d7299584df49378
上記のサンプルデータベースのcityテーブルを丸ごとインポートすることを目指したいと思います。
0. 事前準備
事前にPython3とpython3-pipが導入されているものとします。
スクリプトで使用するMySQL(MariaDB)とElasticsearchのPythonライブラリをインストールします。
なお、MySQLのPythonライブラリはいくつかありますが、個人的にはpymysqlが最も安定しているように感じました。
pip3 install pymysql
pip3 install elasticsearch
1. cityインデックスのmapping定義
cityテーブルの構造は下記の通りです。
MariaDB [world]> desc city;
+-------------+----------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+-------------+----------+------+-----+---------+----------------+
| ID | int(11) | NO | PRI | NULL | auto_increment |
| Name | char(35) | NO | | | |
| CountryCode | char(3) | NO | MUL | | |
| District | char(20) | NO | | | |
| Population | int(11) | NO | | 0 | |
+-------------+----------+------+-----+---------+----------------+
5 rows in set (0.005 sec)
mapping定義するにあたり、Name、CountryCode、Districtはのtypeはkeywordに、Populationのtypeはintegerとします。
keywordではなくtextと定義することも考えられますが、国名などは文章ではないのでkeywordで十分だと考えます。
なお、IDはドキュメントIDとして使用する想定で、mappingの定義内容には含みません。
cityインデックスのmappingを定義するPythonスクリプトは下記のとおりとなります。
# !/usr/bin/python3
from elasticsearch import Elasticsearch
from pprint import pprint
# Elasticsearch接続設定
es = Elasticsearch(['XX.XX.XX.XX:9200']) #ElasticsearchサーバーのIPアドレス
# mappingの定義クエリ
city = {}
city["settings"] = {}
city["settings"]["number_of_shards"] = 3
city["settings"]["number_of_replicas"] = 1
city["mappings"] = {}
city["mappings"]["properties"] = {}
city["mappings"]["properties"]["Name"] = {}
city["mappings"]["properties"]["Name"]["type"] = "keyword"
city["mappings"]["properties"]["CountryCode"] = {}
city["mappings"]["properties"]["CountryCode"]["type"] = "keyword"
city["mappings"]["properties"]["District"] = {}
city["mappings"]["properties"]["District"]["type"] = "keyword"
city["mappings"]["properties"]["Population"] = {}
city["mappings"]["properties"]["Population"]["type"] = "integer"
# クエリの内容を表示
pprint("クエリの内容を表示")
pprint(city)
print()
# クエリの実行
response = es.indices.create(index='city',body=city)
# クエリのレスポンスの表示
pprint("レスポンスの内容を表示")
pprint(response)
このスクリプトを実行すると下記のような結果が得られます。
$ ./putCityMapping.py
'クエリの内容を表示'
{'mappings': {'properties': {'CountryCode': {'type': 'keyword'},
'District': {'type': 'keyword'},
'Name': {'type': 'keyword'},
'Population': {'type': 'integer'}}},
'settings': {'number_of_replicas': 1, 'number_of_shards': 3}}
'レスポンスの内容を表示'
{'acknowledged': True, 'index': 'city', 'shards_acknowledged': True}
2. cityインデックスのmappingの定義内容の取得
続いて、作成したmappingの定義内容を表示するスクリプトを作成します。
内容は下記の通りです。
# !/usr/bin/python3
from elasticsearch import Elasticsearch
from pprint import pprint
# Elasticsearch接続設定
es = Elasticsearch(['XX.XX.XX.XX:9200']) #ElasticsearchサーバーのIPアドレス
response = es.indices.get_mapping(index='city')
pprint(response)
このスクリプトを実行すると下記のような結果が得られます。
$ ./getCityMapping.py
{'city': {'mappings': {'properties': {'CountryCode': {'type': 'keyword'},
'District': {'type': 'keyword'},
'Name': {'type': 'keyword'},
'Population': {'type': 'integer'}}}}}
3. cityドキュメントの作成(1つだけ)
作成したcityインデックスにドキュメントを1つだけ作成します。
いきなりCityテーブルから全データをインポートするのではなく、SQLで表示したデータを使用することにします。
ドキュメント化するデータは下記の通りです。
MariaDB [world]> select * from city where id=1;
+----+-------+-------------+----------+------------+
| ID | Name | CountryCode | District | Population |
+----+-------+-------------+----------+------------+
| 1 | Kabul | AFG | Kabol | 1780000 |
+----+-------+-------------+----------+------------+
1 row in set (0.004 sec)
このデータのドキュメントを作成するスクリプトは下記の通りです。
# !/usr/bin/python3
from elasticsearch import Elasticsearch
from pprint import pprint
# Elasticsearch接続設定
es = Elasticsearch(['XX.XX.XX.XX:9200']) #ElasticsearchサーバーのIPアドレス
index_id = "1"
kabul_doc = {}
kabul_doc["Name"] = "Kabul"
kabul_doc["CountryCode"] = "AFG"
kabul_doc["District"] = "Kabol"
kabul_doc["Population"] = 1780000
pprint(kabul_doc)
print()
response = es.index(index="city", doc_type="_doc", id=index_id,body=kabul_doc)
pprint(response)
このスクリプトを実行すると下記のような結果が得られます。
$ ./putKabulDoc.py
{'CountryCode': 'AFG',
'District': 'Kabol',
'Name': 'Kabul',
'Population': 1780000}
{'_id': '1',
'_index': 'city',
'_primary_term': 1,
'_seq_no': 0,
'_shards': {'failed': 0, 'successful': 1, 'total': 2},
'_type': '_doc',
'_version': 1,
'result': 'created'}
4. ドキュメントの表示
KabulドキュメントをElasticsearchから取得するスクリプトを作成します。
ここでは検索クエリではなく、id=1のドキュメントを取得するクエリとします。
# !/usr/bin/python3
from elasticsearch import Elasticsearch
from pprint import pprint
# Elasticsearch接続設定
es = Elasticsearch(['XX.XX.XX.XX:9200']) #ElasticsearchサーバーのIPアドレス
response = es.get(index='city',id='1')
pprint(response)
このスクリプトを実行すると下記のような結果が得られます。
$ ./getId1Doc.py
{'_id': '1',
'_index': 'city',
'_primary_term': 1,
'_seq_no': 0,
'_source': {'CountryCode': 'AFG',
'District': 'Kabol',
'Name': 'Kabul',
'Population': 1780000},
'_type': '_doc',
'_version': 1,
'found': True}
5.cityテーブルからPythonでデータを取得する
cityテーブルから全データをインポートする前に、テーブルからデータを取得する方法を確認したいと思います。
# !/usr/bin/python3
import pymysql.cursors
from pprint import pprint
# MySQL(MariaDB)接続設定
db = pymysql.connect(host="XX.XX.XX.XX", user="ユーザー名", password="パスワード")
cursor=db.cursor(pymysql.cursors.DictCursor)
cursor.execute("USE world")
db.commit()
sql = 'SELECT * FROM city'
cursor.execute(sql)
city = cursor.fetchall()
pprint(city)
このスクリプトを実行すると下記のような結果が得られます。
$ ./getCityTable.py
[{'CountryCode': 'AFG',
'District': 'Kabol',
'ID': 1,
'Name': 'Kabul',
'Population': 1780000},
{'CountryCode': 'AFG',
'District': 'Qandahar',
'ID': 2,
'Name': 'Qandahar',
'Population': 237500},
{'CountryCode': 'AFG',
'District': 'Herat',
'ID': 3,
'Name': 'Herat',
'Population': 186800},
(省略)
{'CountryCode': 'PSE',
'District': 'Rafah',
'ID': 4079,
'Name': 'Rafah',
'Population': 92020}]
6.全データのインポート
ようやくですが、cityテーブルの全データをスクリプトでインポートしたいと思います。
スクリプトの内容は下記の通りです。
# !/usr/bin/python3
import pymysql.cursors
from elasticsearch import Elasticsearch
from pprint import pprint
# MySQL接続設定
db = pymysql.connect(host="XX.XX.XX.XX", user="ユーザーID", password="パスワード")
cursor=db.cursor(pymysql.cursors.DictCursor)
cursor.execute("USE world")
db.commit()
# Elasticsearch接続設定
es = Elasticsearch(['XX.XX.XX.XX:9200'])
# MySQLからcityデータをまるごと取得
sql = 'SELECT * FROM city'
cursor.execute(sql)
city_table = cursor.fetchall()
# cityデータをレコード単位でループ
for city_record in city_table:
index_id = str(city_record["ID"])
city_doc = {}
city_doc["Name"] = city_record["Name"]
city_doc["CountryCode"] = city_record["CountryCode"]
city_doc["District"] = city_record["District"]
city_doc["Population"] = city_record["Population"]
response = es.index(index="city", doc_type="_doc", id=index_id, body=city_doc)
print(response)
このスクリプトを実行すると下記のような結果が得られます。
# ./importFromCityTable.py
{'_index': 'city', '_type': '_doc', '_id': '1', '_version': 2, 'result': 'updated', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 1, '_primary_term': 1}
{'_index': 'city', '_type': '_doc', '_id': '2', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 0, '_primary_term': 1}
{'_index': 'city', '_type': '_doc', '_id': '3', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 1, '_primary_term': 1}
(省略)
{'_index': 'city', '_type': '_doc', '_id': '4078', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 1350, '_primary_term': 1}
{'_index': 'city', '_type': '_doc', '_id': '4079', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 1379, '_primary_term': 1}
インポートしたデータの確認はここでは省略したいと思います。
getId1Doc.pyを編集するか、Kibanaコンソールから確認して下さい。
7. Bulk APIを使ったインポート
cityテーブル程度のサイズでしたらimportFromCityTable.pyのように1つのドキュメントごとにPUTしてもそこまで時間はかかりませんが、都度通信が発生するためレコード数や1レコードのサイズが大きくなると時間がかかってしまいます。
そのような時は、Bulk APIを使用してドキュメントを配列形式でまとめてPUTした方がインポートの時間が短くて済みます。
Bulk APIを使用したスクリプトは下記のようになります。
# !/usr/bin/python3
import pymysql.cursors
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from pprint import pprint
# MySQL接続設定
db = pymysql.connect(host="XX.XX.XX.XX", user="ユーザーID", password="パスワード")
cursor=db.cursor(pymysql.cursors.DictCursor)
cursor.execute("USE world")
db.commit()
# Elasticsearch接続設定
es = Elasticsearch(['XX.XX.XX.XX:9200'])
# MySQLからcityデータをまるごと取得
sql = 'SELECT * FROM city'
cursor.execute(sql)
city_table = cursor.fetchall()
# Bulk送信用のドキュメントの配列
bulk_doc = []
# bulkで送信するドキュメント数とカウンター
bulk_number = 1000
n = 1
# cityデータをレコード単位でループ
for city_record in city_table:
index_id = str(city_record["ID"])
city_doc = {}
city_doc["Name"] = city_record["Name"]
city_doc["CountryCode"] = city_record["CountryCode"]
city_doc["District"] = city_record["District"]
city_doc["Population"] = city_record["Population"]
# bulk配列に追加
bulk_doc.append({"_index":"city", "_type":"_doc", "_id":index_id, "_source":city_doc})
# 格納数がbulk_numberに達したら送信、未達なら次のcity_doc生成とbulk配列へのappendに進む
if n < bulk_number:
n = n + 1
else :
response = helpers.bulk(es, bulk_doc)
pprint(response)
n = 1
bulk_index = []
# bulk_numberに達しなうでループが終わってしまった分を送信
response = helpers.bulk(es, bulk_doc)
pprint(response)
このスクリプトを実行すると下記のような結果が得られます。
$ ./bulkImportFromCityTable.py
(1000, [])
(2000, [])
(3000, [])
(4000, [])
(4079, [])
実行環境にもよりますが、こちらの方が速くインポートが完了すると思います。