動機
仕事でAWS Glueを使っている。、GlueはSparkが内部で動いており、PySparkで処理を記述することができる。
まだコードが満足に書けない中、実行に何分もかかると辛いのでローカルで動かないかなという感じ。
やったこと
PySparkの実行
ローカルにいろいろ入れたくないのでDockerから実行。
$ docker pull jupyter/pyspark-notebook
Status: Downloaded newer image for jupyter/pyspark-notebook:latest
$ docker run -p 8888:8888 -v /tmp/pyspark-notebook:/home/jovyan/work jupyter/pyspark-notebook start-notebook.sh
Copy/paste this URL into your browser when you connect for the first time,
to login with a token:
http://(xxxxxxxxxxx or 127.0.0.1):8888/?token=34602879dbd346bbb1febcce60fb6558141fe707e639a1d7
Webブラウザで先ほど表示された http://localhost:8888/?token=34602879dbd346bbb1febcce60fb6558141fe707e639a1d7 (Tokenは毎回異なる)にアクセスする。
PySparkの実行
New->Terminal
で新しいノートを開く。
https://yubessy.hatenablog.com/entry/2016/12/11/095915
によると
- RDD
- DataFrame
- Dataset
の3つのAPIがあるらしい。DatasetはDataFrameの型安全版のため、PythonではDataFrameがメインみたい。
RDD
MapReduce?
from pyspark.context import SparkContext
sc = SparkContext.getOrCreate()
rdd = sc.textFile("README.md").flatMap(lambda line:line.split(" "))
wordCount = (rdd.map(lambda word: (word, 1))
.reduceByKey(lambda v1, v2: v1 + v2)
.sortBy(lambda x: x[1], ascending=False))
wordCount.take(5)
[('', 9),
(' environment:', 3),
(' ports:', 3),
(' depends_on:', 2),
(' - spark-master', 2)]
DataFrame
Pandasみたい。
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1,0), (0,3),(3,2),(4,8),(5,9),(3,3), (4,5),(4,4),],['col1', 'col2'])
df.filter(df.col1>0).groupBy(df.col1).count().show()
+----+-----+
|col1|count|
+----+-----+
| 5| 1|
| 1| 1|
| 3| 2|
| 4| 3|
+----+-----+
参考