この記事は、NTTテクノクロスAdvent Calnder 2020の18日目です。
こんにちは、NTTテクノクロスの@yuyhiraka (平川) と申します。
普段は仮想化/コンテナ/クラウド基盤、小規模ネットワークあたりの先進技術のPoCを主に担当しています。
この記事に記載の内容は個人的な取り組みの内容であり、所属する組織とは関係ありません。
はじめに
KoalasとElasticsearchが連携できるかを試してみました。
世の中的にApache Spark TMとElasticsearchの連携を試されている方々がいるのでその応用となる動作検証となります。
- Spark on elasticsearch-hadoop トライアル
- ElasticsearchのデータをApache Sparkで加工する
- Elasticsearchのクラスタを構築してSparkでIndexを作るまでの簡易手順
Elasticsearchとは
Elasticsearchは検索・分析のための検索エンジンおよびデータベースおよびエコシステムです。
Elasticsearchについての参考情報
Apache Spark TMとは
Apache Spark TMは高速なビッグデータ用の分散処理フレームワークです。Pythonにも対応しており特にPySparkと呼びます。
Apache Spark TMについての参考情報
- Apache Spark™ - Apache Spark とは~分散処理入門の方にもわかりやすくご紹介
- Apache Spark™ - Unified Analytics Engine for Big Data
pandasとは
pandasはPython用の強力なデータ分析ライブラリです。
pandasについての参考情報
Koalasとは
KoalasはApache Spark TMでpandasライクのデータ操作が可能になるラッパーライブラリです。 Apache Spark TMにはSpark Dataset/DataFrameと呼ばれるPandasのDataFrameに近い概念が存在しますが各種APIが異なるためpandas ⇔ Spark Dataset/DataFrame間でオブジェクト変換した際に混乱します。それを解決するアプローチがKoalasになります。
Koalasについての参考情報
- Koalas_ pandas API on Apache Spark
- Spark+AI Summit 2019参加レポート at San Francisco — Spark3.0/Koalas/MLflow/Delta Lake
ElasticsearchとApache Spark TMの連携について
同じバージョンが振られていることからElasticsearch-Hadoopプラグイン (elasticsearch-hadoop 7.10) を使えばElasticsearch 7.10とHadoopエコシステム (Apache Spark TM, Koalasを含む) の連携ができそうです。一方で2020年12月現在においてElasticsearch-Hadoopプラグインを用いてElasticsearchとApache Spark TM 3.0.xの連携ができないようです。
そこで今回は以下を満たすApache Spark TM 2.4.7を利用することにします。
- Koalasのサポートバージョン
- Elasticsearch-Hadoopプラグインのサポートバージョン
Elasticsearch-HadoopプラグインとElasticsearchのバージョンについての参考情報
ElasticsearchとApache Spark TM 3.0.xの連携についての参考情報
- Documentation Databricks Workspace guide Data guide Data sources ElasticSearch
- Restructure Spark Project Cross Compilation #1423
- [Feature] Spark3.0 support #1412
KoalasのDependenciesについての参考情報
検証環境の情報について
マシンスペック
- VirtualBox 6.1.10上のUbuntu 20.04 LTS
- vCPU 6コア
- vMem 32GB
- SSD 100GB
- via HTTP/HTTPS Proxy
(※HTTP/HTTPS Proxyの各種設定については手順上省略します。)
Dockerバージョン
# docker version
Client: Docker Engine - Community
Version: 20.10.0
Apache Spark TM 2.4.7のコンテナイメージを作成する
検証のための環境構築稼働を節約するためBuild an Image with a Different Version of Sparkを参考にPySpark2.4.7とJupyterLabがインストール済のコンテナイメージを作成します。
# mkdir ~/pyspark-notebook
# curl -O https://raw.githubusercontent.com/jupyter/docker-stacks/master/pyspark-notebook/Dockerfile
# mv Dockerfile ~/pyspark-notebook
# docker build --rm --force-rm \
-t jupyter/pyspark-notebook:spark-2.4.7 ./pyspark-notebook \
--build-arg spark_version=2.4.7 \
--build-arg hadoop_version=2.7 \
--build-arg spark_checksum=0F5455672045F6110B030CE343C049855B7BA86C0ECB5E39A075FF9D093C7F648DA55DED12E72FFE65D84C32DCD5418A6D764F2D6295A3F894A4286CC80EF478 \
--build-arg openjdk_version=8
上記のベースイメージに対してElasticsearch-HadoopプラグインとKoalasをインストールするためのDockerfileを作成します。しかし、PySpark2.4はそのままだとPython3.8.xで動作しないため対策としてPython3.7.xのconda仮想環境を作っておきます。
※cloudpickle.pyを差し替えたうえで改造するという方法もあるようですが今回は試しません。
# mkdir ~/koalas-spark
# vi ~/koalas-spark/Dockerfile
FROM jupyter/pyspark-notebook:spark-2.4.7
USER root
RUN apt-get update
RUN apt-get install -y curl
USER jovyan
RUN mkdir ~/jars
RUN curl https://repo1.maven.org/maven2/org/elasticsearch/elasticsearch-hadoop/7.10.1/{elasticsearch-hadoop-7.10.1.jar} --output "/home/jovyan/jars/#1"
RUN conda create -n py37 -c conda-forge python=3.7 jupyter pyspark=2.4 koalas=1.5 openjdk=8 -y
作成したDockerfileを使ってコンテナイメージを作成します。
# docker image build --rm --force-rm -t koalas-spark:0.1 ~/koalas-spark/
Elasticsearchのコンテナイメージをローカルに取得
大きめのコンテナイメージなので先に取得しておきます。
# docker pull elasticsearch:7.10.1
Docker Composeでコンテナ起動
必要ディレクトリ作成とdocker-compose.yamlの作成を行います。
# mkdir /opt/es
# mkdir /opt/koalas-spark/
# コンテナからアクセスできるようにパーミッションを緩める (手抜き)
# chmod 777 /opt/es /opt/koalas-spark/
# vi docker-compose.yaml
version: '3'
services:
elasticsearch:
image: elasticsearch:7.10.1
container_name: elasticsearch
environment:
- discovery.type=single-node
- cluster.name=docker-cluster
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
ulimits:
memlock:
soft: -1
hard: -1
ports:
- 9200:9200
volumes:
- /opt/es/:/usr/share/elasticsearch/data
networks:
- devnet
koalas-spark:
build: ./koalas-spark
container_name: koalas-spark
working_dir: '/home/jovyan/work/'
tty: true
volumes:
- /opt/koalas-spark/:/home/jovyan/work/
networks:
- devnet
networks:
devnet:
Docker Composeを用いてKoalasコンテナとElasticsearchコンテナを立ち上げます。また、Elasticsearchコンテナが正常に起動していることを確認します。
# docker-compose build
# docker-compose up -d
# curl -X GET http://localhost:9200
{
"name" : "6700fb19f202",
"cluster_name" : "docker-cluster",
"cluster_uuid" : "P-uVFNu6RZKKxdklnVypbw",
"version" : {
"number" : "7.10.1",
"build_flavor" : "default",
"build_type" : "docker",
"build_hash" : "1c34507e66d7db1211f66f3513706fdf548736aa",
"build_date" : "2020-12-05T01:00:33.671820Z",
"build_snapshot" : false,
"lucene_version" : "8.7.0",
"minimum_wire_compatibility_version" : "6.8.0",
"minimum_index_compatibility_version" : "6.0.0-beta1"
},
"tagline" : "You Know, for Search"
Koalasコンテナの中に入る
起動しているコンテナ一覧の中からKoalasコンテナのコンテナIDを確認します。次にコンテナIDを指定しKoalasコンテナの中に入ります。 別解としてdocker-compose exec
を使う方法もあります。
# docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
e33681a37aea root_koalas-spark "tini -g -- start-no…" 2 minutes ago Up 2 minutes 8888/tcp koalas-spark
fe65e3351bea elasticsearch:7.10.1 "/tini -- /usr/local…" 16 minutes ago Up 16 minutes 0.0.0.0:9200->9200/tcp, 9300/tcp elasticsearch
# docker exec -it e33681a37aea bash
curlコマンドを用いてKoalasコンテナからElasticsearchコンテナへの疎通の確認をします。
$ curl -X GET http://elasticsearch:9200
{
"name" : "6700fb19f202",
"cluster_name" : "docker-cluster",
"cluster_uuid" : "P-uVFNu6RZKKxdklnVypbw",
"version" : {
"number" : "7.10.1",
"build_flavor" : "default",
"build_type" : "docker",
"build_hash" : "1c34507e66d7db1211f66f3513706fdf548736aa",
"build_date" : "2020-12-05T01:00:33.671820Z",
"build_snapshot" : false,
"lucene_version" : "8.7.0",
"minimum_wire_compatibility_version" : "6.8.0",
"minimum_index_compatibility_version" : "6.0.0-beta1"
},
"tagline" : "You Know, for Search"
KoalasからElasticsearchへの書き込み
引き続きKoalasコンテナで作業を進めます。
Python3.7環境に切り替えてPySpark (IPython) を起動しElasticsearchに対してデータの書き込みを行います。
今回はSpark RDDの機能を用いて4行4列のデータを作成しています。それを一度Spark DataFrameに変換し、さらにKoalas DataFrameに変換しています。
$ conda activate py37
$ export PYARROW_IGNORE_TIMEZONE=1
$ pyspark --jars /home/jovyan/jars/elasticsearch-hadoop-7.10.1.jar
import databricks.koalas as ks
import pandas as pd
import json, os, datetime, collections
from pyspark.sql import SparkSession, SQLContext, Row
from pyspark.sql.types import *
esURL = "elasticsearch"
rdd1 = sc.parallelize([
Row(col1=1, col2=1, col3=1, col4=1),
Row(col1=2, col2=2, col3=2, col4=2),
Row(col1=3, col2=3, col3=3, col4=3),
Row(col1=4, col2=4, col3=4, col4=4)
])
df1 = rdd1.toDF()
df1.show()
kdf1 = ks.DataFrame(df1)
print(kdf1)
kdf1.to_spark_io(path="sample/test",
format="org.elasticsearch.spark.sql",
options={"es.nodes.wan.only": "false",
"es.port": 9200,
"es.net.ssl": "false",
"es.nodes": esURL},
mode="Overwrite")
PySpark (IPython) からCtrl + Dキー等で抜けます。そして、Elasticsearchにデータが格納されていることを確認します。
curl -X GET http://elasticsearch:9200/sample/test/_search?pretty
{
"took" : 3,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 4,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "sample",
"_type" : "test",
"_id" : "kaTbZXYBpKFycpUDLgjO",
"_score" : 1.0,
"_source" : {
"col1" : 4,
"col2" : 4,
"col3" : 4,
"col4" : 4
}
},
{
"_index" : "sample",
"_type" : "test",
"_id" : "kKTbZXYBpKFycpUDLgjG",
"_score" : 1.0,
"_source" : {
"col1" : 2,
"col2" : 2,
"col3" : 2,
"col4" : 2
}
},
{
"_index" : "sample",
"_type" : "test",
"_id" : "j6TbZXYBpKFycpUDLgjG",
"_score" : 1.0,
"_source" : {
"col1" : 3,
"col2" : 3,
"col3" : 3,
"col4" : 3
}
},
{
"_index" : "sample",
"_type" : "test",
"_id" : "jqTbZXYBpKFycpUDLgjD",
"_score" : 1.0,
"_source" : {
"col1" : 1,
"col2" : 1,
"col3" : 1,
"col4" : 1
}
}
]
}
}
まとめ
上記の通りElasticsearch-Hadoopプラグインを用いてKoalasからElasticsearchにデータを投入できることを確認することができました。
当初の想定では
PySpark (IPython) ⇒ JupyterLab
Docker Compose ⇒ Kubernetes
で検証する予定だったのですが本質ではないところに時間をかけるわけにもいかなかったので今回は妥協しています。
おそらくJupyterLab/Kubernetesで実施しても同様にKoalasからElasticsearchに問題なくデータ投入することができるはずなので今後試してみたいと思います。また、要望がたくさん挙がってることから近いうちに対応しそうですがElasticsearch-HadoopプラグインがApache Spark TM 3.0.xでも利用できるようになることを強く望みます。
明日は@y-ohnukiによるNTTテクノクロスAdvent Calnder 2020の記事です。お楽しみに!