LoginSignup
11
12

More than 5 years have passed since last update.

spark 2.0系Tips #1 Jupyterでpyspark

Last updated at Posted at 2016-06-30

概要

みんな大好きJupyter notebook(python)上で、Pyspark/Cythonを使っていろんなことをやる。とかいう記事を書こうと思ったけど、1記事に詰め込みすぎても醜いし、時間かかって書きかけで放置してしまうので、分割して初歩的なことからはじめようとおもった。

ということで、今回は、Jupyter起動して、sparkSession作るだけにしてみる。

使用バージョン

  • Python == 3.5.1
  • Spark == 2.0系最新(branch-2.0をビルドしたもの)
  • notebook == 4.2.1

Sparkの最新安定バージョンは、2016-07-01現在1.6.2なんだけど、もうgithubには2.0.0-rc1出てたりする。しかもrc1出て以降も、バグフィックスとかcommitされているので、結局今使っているのは、branch-2.0をビルドしたもの。
ちなみに、2.0で結構APIが変わっています。

Jupyter起動の前にやること

Jupyter起動前に、いろいろ環境変数をセットしておく。Jupyterの設定ファイルに書いといてもいいけど、書き方よくわかっていないし、毎回設定変えたりするので、環境変数でやってしまう。

$ export SPARK_HOME=/opt/local/spark
$ export PYSPARK_PYTHON=/opt/local/python-3.5.1/bin/python3
$ export PYSPARK_DRIVER_PYTHON=/opt/local/python-3.5.1/bin/python3
export PYTHONPATH=$(ls -a ${SPARK_HOME}/python/lib/py4j-*-src.zip):${SPARK_HOME}/python:$PYTHONPATH
$ export PYSPARK_SUBMIT_ARGS="
--packages com.amazonaws:aws-java-sdk-pom:1.11.8,org.apache.hadoop:hadoop-aws:2.7.2
 --conf 'spark.local.dir=/mnt/ephemeral/tmp/spark'
 --driver-java-options '-XX:+UseG1GC -XX:G1HeapRegionSize=32m -XX:+ParallelRefProcEnabled -XX:MaxGCPauseMillis=300 -XX:InitiatingHeapOccupancyPercent=35'
 --driver-library-path '/opt/local/hadoop/lib/native'
 --conf 'spark.driver.memory=2g'
 --conf 'spark.driver.maxResultSize=2g'
 --conf 'spark.executor.memory=45g'
 --conf 'spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:G1HeapRegionSize=32m -XX:+ParallelRefProcEnabled -XX:MaxGCPauseMillis=300 -XX:InitiatingHeapOccupancyPercent=35'
 --conf 'spark.executor.extraLibraryPath=/opt/local/hadoop/lib/native'
 --conf 'spark.executorEnv.LD_PRELOAD=/usr/lib/libjemalloc.so'
 --conf 'spark.network.timeout=600s'
 --conf 'spark.io.compression.codec=lz4'
 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
 --conf 'spark.kryo.referenceTracking=false'
 --conf 'spark.shuffle.io.numConnectionsPerPeer=4'
 --conf 'spark.sql.inMemoryColumnarStorage.batchSize=20000'
 --conf 'spark.sql.autoBroadcastJoinThreshold=104857600'
 --conf 'spark.sql.shuffle.partitions=800'
 pyspark-shell
"

環境変数説明

Sparkドキュメント見ればわかるけど一応。インストールパスとかは、自分の環境に合わせてね。これ以外にも、必要に応じてHADOOP_HOMEとかも。

環境変数名 適用
SPARK_HOME ま、これはそのまんま。spark-env.shとかで設定してもいい。
PYSPARK_PYTHON Workerが使うPython executable。指定しなければOSデフォルトのpython
PYSPARK_DRIVER_PYTHON Driverが使うPython executable。指定しなければOSデフォルトのpython
PYTHONPATH Jupyter上でimport pysparkできるように。py4jのバージョン変わったりするのでわざわざlsしてます
PYSPARK_SUBMIT_ARGS pysparkの起動オプション。aws関連のパッケージを読んだりしている。好きなように変えてください。メモリをたくさん使う設定にしているので、このまま張り付けたりしても、メモリ足りないと動きません。最後のpyspark-shellは必要。

複数notebook使う時、メモリなどの設定をnotebookごとに変えたい場合は、notebook上でsparkSessionを作る前に、os.environを使ってPYSPARK_SUBMIT_ARGSを上書きしてもいいよ。

Jupyter起動

上記環境変数とともに、こんな感じで。

$ /opt/local/python-3.5/bin/jupyter notebook --ip=0.0.0.0 --no-browser

これ以降は、Jupyter上で作業。以下は、Jupyterでつくったnotebookをmarkdown変換して張り付けただけ。

必要なモジュールのimport

import os

import pyspark
from pyspark import StorageLevel
from pyspark.sql import (
    SparkSession,
    functions as sql_funcs,
)
from pyspark.sql.types import *

SparkSession作成

2.0.0からは、pyspark.sql.SparkSessionがこういう時のフロントAPIになっているみたいなので、それに従う。

SparkSession使用時に、SparkContextのAPIにアクセスしたい場合は、spark_session.sparkContextでSparkContextを取得できる。

try:
    spark_session.stop()
except NameError:
    pass

spark_session = SparkSession.builder.appName(
    name='spark-tips-1',
).master(
    master=os.environ.get('X_SPARK_MASTER', 'local[*]'),
).enableHiveSupport().getOrCreate()

DataFrame作成

pythonの欠点は遅いところ。pysparkのソース見ればわかるけど、特にrddのAPIは、「処理を速くしよう」という意思を微塵も感じさせないコードになってたりする。
なので、DataFrame(将来的にはDataSet?)で完結できる処理は、極力DataFrameでやろう。

今回は、最初の一歩なので、お手軽にプロセス内のlistからDataFrame作成。

local_list = (
    ('2016-07-01 00:00:00', 'jiba-nyan', 1,),
    ('2016-07-01 00:01:00', 'bushi-nyan', 1,),
    ('2016-07-01 00:02:00', 'koma-san', 1,),
    ('2016-07-01 00:03:00', 'komajiro', 1,),
)

schema = StructType(
    fields=(
        # 日時
        StructField(
            name='dt',
            dataType=StringType(),
            nullable=False,
        ),
        # user_name
        StructField(
            name='user_name',
            dataType=StringType(),
            nullable=False,
        ),
        # rate
        StructField(
            name='rate',
            dataType=LongType(),
            nullable=True,
        ),
    ),
)


df = spark_session.createDataFrame(
    data=local_list,
    schema=schema,
).persist(
    storageLevel=StorageLevel.MEMORY_ONLY_SER,
)

printSchema()でスキーマ確認

df.printSchema()
root
 |-- dt: string (nullable = false)
 |-- user_name: string (nullable = false)
 |-- rate: long (nullable = true)

show()で中身を確認

df.show()
+-------------------+----------+----+
|                 dt| user_name|rate|
+-------------------+----------+----+
|2016-07-01 00:00:00| jiba-nyan|   1|
|2016-07-01 00:01:00|bushi-nyan|   1|
|2016-07-01 00:02:00|  koma-san|   1|
|2016-07-01 00:03:00|  komajiro|   1|
+-------------------+----------+----+

take()で中身を確認

df.take(4)
[Row(dt='2016-07-01 00:00:00', user_name='jiba-nyan', rate=1),
 Row(dt='2016-07-01 00:01:00', user_name='bushi-nyan', rate=1),
 Row(dt='2016-07-01 00:02:00', user_name='koma-san', rate=1),
 Row(dt='2016-07-01 00:03:00', user_name='komajiro', rate=1)]

日付のカラムが文字列になってるので、Timestamp型に変換

この場合は、うまい具合に日時フォーマットになってるので、cast(TimestampType())するだけ。
フォーマットが違う場合も、文字列操作などのSQL関数で、(python使わずに)大体何とかなります。

df_parsed = df.withColumn(
    'dt',
    df['dt'].cast(TimestampType()),
).persist(
    storageLevel=StorageLevel.MEMORY_ONLY_SER,
)
df_parsed.printSchema()
root
 |-- dt: timestamp (nullable = true)
 |-- user_name: string (nullable = false)
 |-- rate: long (nullable = true)
df_parsed.show()
+--------------------+----------+----+
|                  dt| user_name|rate|
+--------------------+----------+----+
|2016-07-01 00:00:...| jiba-nyan|   1|
|2016-07-01 00:01:...|bushi-nyan|   1|
|2016-07-01 00:02:...|  koma-san|   1|
|2016-07-01 00:03:...|  komajiro|   1|
+--------------------+----------+----+
df_parsed.take(4)
[Row(dt=datetime.datetime(2016, 7, 1, 0, 0), user_name='jiba-nyan', rate=1),
 Row(dt=datetime.datetime(2016, 7, 1, 0, 1), user_name='bushi-nyan', rate=1),
 Row(dt=datetime.datetime(2016, 7, 1, 0, 2), user_name='koma-san', rate=1),
 Row(dt=datetime.datetime(2016, 7, 1, 0, 3), user_name='komajiro', rate=1)]

まとめ

今回は最初の一歩なので、簡単だよねん。

11
12
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
11
12