Code sample for loading a TSV (CSV) file into Elasticsearch
from elasticsearch import Elasticsearch
from elasticsearch import helpers
import pandas as pd
import json
# Elasticsearch
es = Elasticsearch("{ES_IP}")
INDEX = "{ES_Index_Name}"
fname="{FileName}"
reader = pd.read_csv(fname, chunksize=1000, sep='\t',low_memory = False)
df_all = reader.get_chunk()  # read the first chunk (1,000 rows) into a DataFrame
# convert to newline-delimited JSON
df_lines = df_all.to_json(force_ascii=False, orient='records', lines=True)
# Bulk insert
actions = []
for line in df_lines.split("\n"):
    v_json = json.loads(line)
    actions.append({
        "_index": INDEX,
        "_type": "{ES_Type}",  # mapping types are only needed on Elasticsearch 6.x and earlier
        "_source": v_json
    })
helpers.bulk(es, actions)
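Note that get_chunk() above only pulls the first 1,000-row chunk, so the rest of the file is never indexed. A minimal sketch for streaming every chunk, assuming the same {ES_IP}, {ES_Index_Name}, and {FileName} placeholders as above and an index that does not need a custom _type (gen_actions is a hypothetical helper name):

from elasticsearch import Elasticsearch, helpers
import pandas as pd

es = Elasticsearch("{ES_IP}")
INDEX = "{ES_Index_Name}"
fname = "{FileName}"

def gen_actions(df):
    # turn each row of a chunk into one bulk action
    for record in df.to_dict(orient="records"):
        yield {"_index": INDEX, "_source": record}

# iterate over every 1,000-row chunk of the TSV instead of only the first one
for chunk in pd.read_csv(fname, chunksize=1000, sep="\t", low_memory=False):
    helpers.bulk(es, gen_actions(chunk))

Using to_dict(orient="records") also skips the to_json / json.loads round trip in the original script.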
- ES Mapping Conf
"format": "yyyy-MM-dd HHss||yyyy-MM-dd||yyyyMMdd||epoch_millis"
Chaining formats with double pipes (||) lets you specify multiple date formats, so dates with different shapes can still be picked up from Kibana.
PUT hoge
{
  "mappings": {
    "books": {
      "properties": {
        "hoge1": { "type": "integer" },
        "hoge2": { "type": "text" },
        "hoge3": { "type": "text" },
        "hoge4": { "type": "text" },
        "hoge5": { "type": "integer" },
        "hoge6": { "type": "text" },
        "hoge7": { "type": "integer" },
        "hoge8": { "type": "text" },
        "hoge9": { "type": "text" },
        "create_date": {
          "type": "date",
          "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||yyyyMMdd||epoch_millis"
        }
      }
    }
  }
}
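As a quick check that the multi-format create_date works, documents whose dates arrive in any of the four listed shapes should all index cleanly under this mapping. A minimal sketch, reusing the es client from the script above against the hoge index; the field values are made up, and doc_type is only needed on a 6.x cluster/client that still uses the books type:

# each create_date below matches one of the formats declared in the mapping
samples = [
    {"hoge1": 1, "create_date": "2020-01-23 12:34:56"},  # yyyy-MM-dd HH:mm:ss
    {"hoge1": 2, "create_date": "2020-01-23"},           # yyyy-MM-dd
    {"hoge1": 3, "create_date": "20200123"},             # yyyyMMdd
    {"hoge1": 4, "create_date": 1579782896000},          # epoch_millis
]
for doc in samples:
    es.index(index="hoge", doc_type="books", body=doc)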