4
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

MySQL(MariaDB)の表データからElasticsearchのインデックスにデータをインポートする

Posted at

MariaDBの表データからPythonスクリプトでElasticsearchにインポートする方法を記載します。
RDBのデータをインデックス化する時の参考になればと思います。

MariaDBのインストールとサンプルデータの準備は下記を参照しました。
Ubuntu 18.04 LTS に MariaDB 10.3 をインストール
https://qiita.com/cherubim1111/items/61cbc72673712431d06e
【SQL】MySQL公式サンプルデータベースを使う
https://qiita.com/yukibe/items/fc6016348ecf4f3b10bf
MySQL Serverに外部から接続する
https://qiita.com/tocomi/items/0c009d7299584df49378

上記のサンプルデータベースのcityテーブルを丸ごとインポートすることを目指したいと思います。

0. 事前準備

事前にPython3とpython3-pipが導入されているものとします。
スクリプトで使用するMySQL(MariaDB)とElasticsearchのPythonライブラリをインストールします。
なお、MySQLのPythonライブラリはいくつかありますが、個人的にはpymysqlが最も安定しているように感じました。

pip3 install pymysql
pip3 install elasticsearch

1. cityインデックスのmapping定義

cityテーブルの構造は下記の通りです。

MariaDB [world]> desc city;
+-------------+----------+------+-----+---------+----------------+
| Field       | Type     | Null | Key | Default | Extra          |
+-------------+----------+------+-----+---------+----------------+
| ID          | int(11)  | NO   | PRI | NULL    | auto_increment |
| Name        | char(35) | NO   |     |         |                |
| CountryCode | char(3)  | NO   | MUL |         |                |
| District    | char(20) | NO   |     |         |                |
| Population  | int(11)  | NO   |     | 0       |                |
+-------------+----------+------+-----+---------+----------------+
5 rows in set (0.005 sec)

mapping定義するにあたり、Name、CountryCode、Districtはのtypeはkeywordに、Populationのtypeはintegerとします。
keywordではなくtextと定義することも考えられますが、国名などは文章ではないのでkeywordで十分だと考えます。
なお、IDはドキュメントIDとして使用する想定で、mappingの定義内容には含みません。

cityインデックスのmappingを定義するPythonスクリプトは下記のとおりとなります。

putCityMapping.py
# !/usr/bin/python3
from elasticsearch import Elasticsearch
from pprint import pprint

# Elasticsearch接続設定
es =  Elasticsearch(['XX.XX.XX.XX:9200']) #ElasticsearchサーバーのIPアドレス

# mappingの定義クエリ
city = {}
city["settings"] = {}
city["settings"]["number_of_shards"] = 3
city["settings"]["number_of_replicas"] = 1

city["mappings"] = {}
city["mappings"]["properties"] = {}
city["mappings"]["properties"]["Name"] = {}
city["mappings"]["properties"]["Name"]["type"] = "keyword"
city["mappings"]["properties"]["CountryCode"] = {}
city["mappings"]["properties"]["CountryCode"]["type"] = "keyword"
city["mappings"]["properties"]["District"] = {}
city["mappings"]["properties"]["District"]["type"] = "keyword"
city["mappings"]["properties"]["Population"] = {}
city["mappings"]["properties"]["Population"]["type"] = "integer"

# クエリの内容を表示
pprint("クエリの内容を表示")
pprint(city)
print()

# クエリの実行
response = es.indices.create(index='city',body=city)

# クエリのレスポンスの表示
pprint("レスポンスの内容を表示")
pprint(response)

このスクリプトを実行すると下記のような結果が得られます。

$ ./putCityMapping.py 
'クエリの内容を表示'
{'mappings': {'properties': {'CountryCode': {'type': 'keyword'},
                             'District': {'type': 'keyword'},
                             'Name': {'type': 'keyword'},
                             'Population': {'type': 'integer'}}},
 'settings': {'number_of_replicas': 1, 'number_of_shards': 3}}

'レスポンスの内容を表示'
{'acknowledged': True, 'index': 'city', 'shards_acknowledged': True}

2. cityインデックスのmappingの定義内容の取得

続いて、作成したmappingの定義内容を表示するスクリプトを作成します。
内容は下記の通りです。

getCityMapping.py
# !/usr/bin/python3
from elasticsearch import Elasticsearch
from pprint import pprint

# Elasticsearch接続設定
es =  Elasticsearch(['XX.XX.XX.XX:9200']) #ElasticsearchサーバーのIPアドレス

response = es.indices.get_mapping(index='city')

pprint(response)

このスクリプトを実行すると下記のような結果が得られます。

$ ./getCityMapping.py
{'city': {'mappings': {'properties': {'CountryCode': {'type': 'keyword'},
                                      'District': {'type': 'keyword'},
                                      'Name': {'type': 'keyword'},
                                      'Population': {'type': 'integer'}}}}}

3. cityドキュメントの作成(1つだけ)

作成したcityインデックスにドキュメントを1つだけ作成します。
いきなりCityテーブルから全データをインポートするのではなく、SQLで表示したデータを使用することにします。

ドキュメント化するデータは下記の通りです。

MariaDB [world]> select  * from city where id=1;
+----+-------+-------------+----------+------------+
| ID | Name  | CountryCode | District | Population |
+----+-------+-------------+----------+------------+
|  1 | Kabul | AFG         | Kabol    |    1780000 |
+----+-------+-------------+----------+------------+
1 row in set (0.004 sec)

このデータのドキュメントを作成するスクリプトは下記の通りです。

putKabulDoc.py
# !/usr/bin/python3
from elasticsearch import Elasticsearch
from pprint import pprint

# Elasticsearch接続設定
es =  Elasticsearch(['XX.XX.XX.XX:9200']) #ElasticsearchサーバーのIPアドレス

index_id = "1"
kabul_doc = {}
kabul_doc["Name"] = "Kabul"
kabul_doc["CountryCode"] = "AFG"
kabul_doc["District"] = "Kabol"
kabul_doc["Population"] = 1780000
pprint(kabul_doc)
print()

response = es.index(index="city", doc_type="_doc", id=index_id,body=kabul_doc)
pprint(response)

このスクリプトを実行すると下記のような結果が得られます。

$ ./putKabulDoc.py 
{'CountryCode': 'AFG',
 'District': 'Kabol',
 'Name': 'Kabul',
 'Population': 1780000}

{'_id': '1',
 '_index': 'city',
 '_primary_term': 1,
 '_seq_no': 0,
 '_shards': {'failed': 0, 'successful': 1, 'total': 2},
 '_type': '_doc',
 '_version': 1,
 'result': 'created'}

4. ドキュメントの表示

KabulドキュメントをElasticsearchから取得するスクリプトを作成します。
ここでは検索クエリではなく、id=1のドキュメントを取得するクエリとします。

getId1Doc.py
# !/usr/bin/python3
from elasticsearch import Elasticsearch
from pprint import pprint

# Elasticsearch接続設定
es =  Elasticsearch(['XX.XX.XX.XX:9200']) #ElasticsearchサーバーのIPアドレス

response = es.get(index='city',id='1')
pprint(response)

このスクリプトを実行すると下記のような結果が得られます。

$ ./getId1Doc.py 
{'_id': '1',
 '_index': 'city',
 '_primary_term': 1,
 '_seq_no': 0,
 '_source': {'CountryCode': 'AFG',
             'District': 'Kabol',
             'Name': 'Kabul',
             'Population': 1780000},
 '_type': '_doc',
 '_version': 1,
 'found': True}

5.cityテーブルからPythonでデータを取得する

cityテーブルから全データをインポートする前に、テーブルからデータを取得する方法を確認したいと思います。

getCityTable.py
# !/usr/bin/python3
import pymysql.cursors
from pprint import pprint

# MySQL(MariaDB)接続設定
db = pymysql.connect(host="XX.XX.XX.XX", user="ユーザー名", password="パスワード")
cursor=db.cursor(pymysql.cursors.DictCursor)

cursor.execute("USE world")
db.commit()
 
sql = 'SELECT * FROM city'
cursor.execute(sql)

city = cursor.fetchall()

pprint(city)

このスクリプトを実行すると下記のような結果が得られます。

$ ./getCityTable.py
[{'CountryCode': 'AFG',
  'District': 'Kabol',
  'ID': 1,
  'Name': 'Kabul',
  'Population': 1780000},
 {'CountryCode': 'AFG',
  'District': 'Qandahar',
  'ID': 2,
  'Name': 'Qandahar',
  'Population': 237500},
 {'CountryCode': 'AFG',
  'District': 'Herat',
  'ID': 3,
  'Name': 'Herat',
  'Population': 186800},

(省略)

 {'CountryCode': 'PSE',
  'District': 'Rafah',
  'ID': 4079,
  'Name': 'Rafah',
  'Population': 92020}]

6.全データのインポート

ようやくですが、cityテーブルの全データをスクリプトでインポートしたいと思います。
スクリプトの内容は下記の通りです。

importFromCityTable.py
# !/usr/bin/python3
import pymysql.cursors
from elasticsearch import Elasticsearch
from pprint import pprint

# MySQL接続設定
db = pymysql.connect(host="XX.XX.XX.XX", user="ユーザーID", password="パスワード")
cursor=db.cursor(pymysql.cursors.DictCursor)

cursor.execute("USE world")
db.commit()
 
# Elasticsearch接続設定
es =  Elasticsearch(['XX.XX.XX.XX:9200'])

# MySQLからcityデータをまるごと取得
sql = 'SELECT * FROM city'
cursor.execute(sql)
city_table = cursor.fetchall()

# cityデータをレコード単位でループ
for city_record in city_table:
    index_id = str(city_record["ID"])

    city_doc = {}
    city_doc["Name"] = city_record["Name"]
    city_doc["CountryCode"] = city_record["CountryCode"]
    city_doc["District"] = city_record["District"]
    city_doc["Population"] = city_record["Population"]
    response = es.index(index="city", doc_type="_doc", id=index_id, body=city_doc)
    print(response)

このスクリプトを実行すると下記のような結果が得られます。

# ./importFromCityTable.py 
{'_index': 'city', '_type': '_doc', '_id': '1', '_version': 2, 'result': 'updated', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 1, '_primary_term': 1}
{'_index': 'city', '_type': '_doc', '_id': '2', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 0, '_primary_term': 1}
{'_index': 'city', '_type': '_doc', '_id': '3', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 1, '_primary_term': 1}

(省略)

{'_index': 'city', '_type': '_doc', '_id': '4078', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 1350, '_primary_term': 1}
{'_index': 'city', '_type': '_doc', '_id': '4079', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 1379, '_primary_term': 1}

インポートしたデータの確認はここでは省略したいと思います。
getId1Doc.pyを編集するか、Kibanaコンソールから確認して下さい。

7. Bulk APIを使ったインポート

cityテーブル程度のサイズでしたらimportFromCityTable.pyのように1つのドキュメントごとにPUTしてもそこまで時間はかかりませんが、都度通信が発生するためレコード数や1レコードのサイズが大きくなると時間がかかってしまいます。
そのような時は、Bulk APIを使用してドキュメントを配列形式でまとめてPUTした方がインポートの時間が短くて済みます。
Bulk APIを使用したスクリプトは下記のようになります。

bulkImportFromCityTable.py
# !/usr/bin/python3
import pymysql.cursors
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from pprint import pprint

# MySQL接続設定
db = pymysql.connect(host="XX.XX.XX.XX", user="ユーザーID", password="パスワード")
cursor=db.cursor(pymysql.cursors.DictCursor)

cursor.execute("USE world")
db.commit()
 
# Elasticsearch接続設定
es =  Elasticsearch(['XX.XX.XX.XX:9200'])

# MySQLからcityデータをまるごと取得
sql = 'SELECT * FROM city'
cursor.execute(sql)
city_table = cursor.fetchall()

# Bulk送信用のドキュメントの配列
bulk_doc = []

# bulkで送信するドキュメント数とカウンター
bulk_number = 1000
n = 1

# cityデータをレコード単位でループ
for city_record in city_table:
    index_id = str(city_record["ID"])

    city_doc = {}
    city_doc["Name"] = city_record["Name"]
    city_doc["CountryCode"] = city_record["CountryCode"]
    city_doc["District"] = city_record["District"]
    city_doc["Population"] = city_record["Population"]

    # bulk配列に追加
    bulk_doc.append({"_index":"city", "_type":"_doc", "_id":index_id, "_source":city_doc})
    
    # 格納数がbulk_numberに達したら送信、未達なら次のcity_doc生成とbulk配列へのappendに進む
    if n < bulk_number:
        n = n + 1
    else :
        response = helpers.bulk(es, bulk_doc)
        pprint(response)
        n = 1
        bulk_index = []

# bulk_numberに達しなうでループが終わってしまった分を送信
response = helpers.bulk(es, bulk_doc)
pprint(response)

このスクリプトを実行すると下記のような結果が得られます。

$ ./bulkImportFromCityTable.py 
(1000, [])
(2000, [])
(3000, [])
(4000, [])
(4079, [])

実行環境にもよりますが、こちらの方が速くインポートが完了すると思います。

4
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?