概要
みんな大好きJupyter notebook(python)上で、Pyspark/Cythonを使っていろんなことをやる。とかいう記事を書こうと思ったけど、1記事に詰め込みすぎても醜いし、時間かかって書きかけで放置してしまうので、分割して初歩的なことからはじめようとおもった。
ということで、今回は、Jupyter起動して、sparkSession作るだけにしてみる。
使用バージョン
- Python == 3.5.1
- Spark == 2.0系最新(branch-2.0をビルドしたもの)
- notebook == 4.2.1
Sparkの最新安定バージョンは、2016-07-01現在1.6.2なんだけど、もうgithubには2.0.0-rc1出てたりする。しかもrc1出て以降も、バグフィックスとかcommitされているので、結局今使っているのは、branch-2.0をビルドしたもの。
ちなみに、2.0で結構APIが変わっています。
Jupyter起動の前にやること
Jupyter起動前に、いろいろ環境変数をセットしておく。Jupyterの設定ファイルに書いといてもいいけど、書き方よくわかっていないし、毎回設定変えたりするので、環境変数でやってしまう。
$ export SPARK_HOME=/opt/local/spark
$ export PYSPARK_PYTHON=/opt/local/python-3.5.1/bin/python3
$ export PYSPARK_DRIVER_PYTHON=/opt/local/python-3.5.1/bin/python3
export PYTHONPATH=$(ls -a ${SPARK_HOME}/python/lib/py4j-*-src.zip):${SPARK_HOME}/python:$PYTHONPATH
$ export PYSPARK_SUBMIT_ARGS="
--packages com.amazonaws:aws-java-sdk-pom:1.11.8,org.apache.hadoop:hadoop-aws:2.7.2
--conf 'spark.local.dir=/mnt/ephemeral/tmp/spark'
--driver-java-options '-XX:+UseG1GC -XX:G1HeapRegionSize=32m -XX:+ParallelRefProcEnabled -XX:MaxGCPauseMillis=300 -XX:InitiatingHeapOccupancyPercent=35'
--driver-library-path '/opt/local/hadoop/lib/native'
--conf 'spark.driver.memory=2g'
--conf 'spark.driver.maxResultSize=2g'
--conf 'spark.executor.memory=45g'
--conf 'spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:G1HeapRegionSize=32m -XX:+ParallelRefProcEnabled -XX:MaxGCPauseMillis=300 -XX:InitiatingHeapOccupancyPercent=35'
--conf 'spark.executor.extraLibraryPath=/opt/local/hadoop/lib/native'
--conf 'spark.executorEnv.LD_PRELOAD=/usr/lib/libjemalloc.so'
--conf 'spark.network.timeout=600s'
--conf 'spark.io.compression.codec=lz4'
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
--conf 'spark.kryo.referenceTracking=false'
--conf 'spark.shuffle.io.numConnectionsPerPeer=4'
--conf 'spark.sql.inMemoryColumnarStorage.batchSize=20000'
--conf 'spark.sql.autoBroadcastJoinThreshold=104857600'
--conf 'spark.sql.shuffle.partitions=800'
pyspark-shell
"
環境変数説明
Sparkドキュメント見ればわかるけど一応。インストールパスとかは、自分の環境に合わせてね。これ以外にも、必要に応じてHADOOP_HOMEとかも。
環境変数名 | 適用 |
---|---|
SPARK_HOME | ま、これはそのまんま。spark-env.shとかで設定してもいい。 |
PYSPARK_PYTHON | Workerが使うPython executable。指定しなければOSデフォルトのpython |
PYSPARK_DRIVER_PYTHON | Driverが使うPython executable。指定しなければOSデフォルトのpython |
PYTHONPATH | Jupyter上でimport pyspark できるように。py4jのバージョン変わったりするのでわざわざlsしてます |
PYSPARK_SUBMIT_ARGS | pysparkの起動オプション。aws関連のパッケージを読んだりしている。好きなように変えてください。メモリをたくさん使う設定にしているので、このまま張り付けたりしても、メモリ足りないと動きません。最後のpyspark-shell は必要。 |
複数notebook使う時、メモリなどの設定をnotebookごとに変えたい場合は、notebook上でsparkSessionを作る前に、os.environ
を使ってPYSPARK_SUBMIT_ARGS
を上書きしてもいいよ。
Jupyter起動
上記環境変数とともに、こんな感じで。
$ /opt/local/python-3.5/bin/jupyter notebook --ip=0.0.0.0 --no-browser
これ以降は、Jupyter上で作業。以下は、Jupyterでつくったnotebookをmarkdown変換して張り付けただけ。
必要なモジュールのimport
import os
import pyspark
from pyspark import StorageLevel
from pyspark.sql import (
SparkSession,
functions as sql_funcs,
)
from pyspark.sql.types import *
SparkSession作成
2.0.0からは、pyspark.sql.SparkSessionがこういう時のフロントAPIになっているみたいなので、それに従う。
SparkSession使用時に、SparkContextのAPIにアクセスしたい場合は、spark_session.sparkContextでSparkContextを取得できる。
try:
spark_session.stop()
except NameError:
pass
spark_session = SparkSession.builder.appName(
name='spark-tips-1',
).master(
master=os.environ.get('X_SPARK_MASTER', 'local[*]'),
).enableHiveSupport().getOrCreate()
DataFrame作成
pythonの欠点は遅いところ。pysparkのソース見ればわかるけど、特にrddのAPIは、「処理を速くしよう」という意思を微塵も感じさせないコードになってたりする。
なので、DataFrame(将来的にはDataSet?)で完結できる処理は、極力DataFrameでやろう。
今回は、最初の一歩なので、お手軽にプロセス内のlistからDataFrame作成。
local_list = (
('2016-07-01 00:00:00', 'jiba-nyan', 1,),
('2016-07-01 00:01:00', 'bushi-nyan', 1,),
('2016-07-01 00:02:00', 'koma-san', 1,),
('2016-07-01 00:03:00', 'komajiro', 1,),
)
schema = StructType(
fields=(
# 日時
StructField(
name='dt',
dataType=StringType(),
nullable=False,
),
# user_name
StructField(
name='user_name',
dataType=StringType(),
nullable=False,
),
# rate
StructField(
name='rate',
dataType=LongType(),
nullable=True,
),
),
)
df = spark_session.createDataFrame(
data=local_list,
schema=schema,
).persist(
storageLevel=StorageLevel.MEMORY_ONLY_SER,
)
printSchema()でスキーマ確認
df.printSchema()
root
|-- dt: string (nullable = false)
|-- user_name: string (nullable = false)
|-- rate: long (nullable = true)
show()で中身を確認
df.show()
+-------------------+----------+----+
| dt| user_name|rate|
+-------------------+----------+----+
|2016-07-01 00:00:00| jiba-nyan| 1|
|2016-07-01 00:01:00|bushi-nyan| 1|
|2016-07-01 00:02:00| koma-san| 1|
|2016-07-01 00:03:00| komajiro| 1|
+-------------------+----------+----+
take()で中身を確認
df.take(4)
[Row(dt='2016-07-01 00:00:00', user_name='jiba-nyan', rate=1),
Row(dt='2016-07-01 00:01:00', user_name='bushi-nyan', rate=1),
Row(dt='2016-07-01 00:02:00', user_name='koma-san', rate=1),
Row(dt='2016-07-01 00:03:00', user_name='komajiro', rate=1)]
日付のカラムが文字列になってるので、Timestamp型に変換
この場合は、うまい具合に日時フォーマットになってるので、cast(TimestampType())
するだけ。
フォーマットが違う場合も、文字列操作などのSQL関数で、(python使わずに)大体何とかなります。
df_parsed = df.withColumn(
'dt',
df['dt'].cast(TimestampType()),
).persist(
storageLevel=StorageLevel.MEMORY_ONLY_SER,
)
df_parsed.printSchema()
root
|-- dt: timestamp (nullable = true)
|-- user_name: string (nullable = false)
|-- rate: long (nullable = true)
df_parsed.show()
+--------------------+----------+----+
| dt| user_name|rate|
+--------------------+----------+----+
|2016-07-01 00:00:...| jiba-nyan| 1|
|2016-07-01 00:01:...|bushi-nyan| 1|
|2016-07-01 00:02:...| koma-san| 1|
|2016-07-01 00:03:...| komajiro| 1|
+--------------------+----------+----+
df_parsed.take(4)
[Row(dt=datetime.datetime(2016, 7, 1, 0, 0), user_name='jiba-nyan', rate=1),
Row(dt=datetime.datetime(2016, 7, 1, 0, 1), user_name='bushi-nyan', rate=1),
Row(dt=datetime.datetime(2016, 7, 1, 0, 2), user_name='koma-san', rate=1),
Row(dt=datetime.datetime(2016, 7, 1, 0, 3), user_name='komajiro', rate=1)]
まとめ
今回は最初の一歩なので、簡単だよねん。