目的
Sparkのよく使うAPIを(主に自分用に)メモしておくことで、久しぶりに開発するときでもサクサク使えるようにしたい。とりあえずPython版をまとめておきます(Scala版も時間があれば加筆するかも)
このチートシートはあくまでチートシートなので(引数が省略してあったりします)、時間がある方はきちんと公式APIドキュメント(Spark Python API Docs)を見て下さい。
Spark API チートシート(Python)
以下では次を前提とする
from pyspark import SparkContext
from pyspark.sql import SQLContext
sc = SparkContext()
sqlContext = SQLContext(sc)
RDD
RDDを作る(データの読み込み)
parallelize
sc.parallelize(collection) リストやタプルからRDDを作る
>>> a = [1, 2, 3, 4, 5]
>>> rdd = sc.parallelize(a)
textFile
sc.textFile(file) ファイルを読み込む。ワイルドカードや正規表現も使える。
>>> rdd = sc.textFile("./words.txt")
wholeTextFiles
sc.wholeTextFiles(dierctory) ディレクトリの各ファイルの内容全体をそれぞれRDDの一つの要素に入力する
# $ ls
# a.json b.json c.json
>>> rdd = sc.textWholeFiles("./")
Action
Actionが実行されると初めてTransformationが順に実行される(遅延実行)
要素を返すもの
collect
collect() 全ての要素を返す
>>> print(rdd.collect())
[1, 2, 3, 4, 5]
take
take(n) 最初n個の要素を返す
>>> print(rdd.take(3))
[1, 2, 3]
first
first() 一番最初の要素を返す
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.first()
1
top
top(n) 大きいものからn個要素を返す
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.top(2)
[3, 2]
(統計)量を返すもの
count
count() 要素数を数えて返す
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.count()
3
mean
mean() 平均を返す
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.mean()
3.0
sum
sum() 合計を返す
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.sum()
6
variance
variance() 分散を返す
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.variance()
0.6666666666666666
stdev
stdev() 標準偏差を返す
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.stdev()
0.816496580927726
保存するもの
saveAsTextFile
saveAsTextFile(file) ファイルを保存する
>>> rdd.saveAsTextFile("./a.txt")
Transformation
Transformationはimmutableな新しいRDDを返す
filter/map/reduce
filter
filter(f) fが真となる要素だけを含むRDDを返す
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.filter(lambda x: x % 2 == 0).collect()
[2]
map
map(f) 全ての要素にfを作用させたRDDを返す
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.map(lambda x: x * 2).collect()
[2, 4, 6]
flatMap
flatMap(f) 全ての要素にfを作用させたあと、要素内のリストを展開したRDDを返す
>>> rdd = sc.parallelize(["This is a pen", "This is an apple"])
>>> rdd.flatMap(lambda x: x.split()).collect()
['This', 'is', 'a', 'pen', 'This', 'is', 'an', 'apple']
Reduce
reduce(f) 二つの要素にfを作用させ続けて一つの返り値を得る
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.reduce(lambda x, y: x + y)
6
ペアRDDに対する操作
ペアRDDをつくる
ペアRDDはPythonでいうTupleを要素に持つRDD。keyとvalueを扱うことができる。
作り方はkeyByを使うかmapで要素数2のタプルを要素に返す。
keyBy(PairRDD)
keyBy(f) 普通のRDDの要素にfを作用させ、その返り値をkeyに、元の要素をそのままvalueにしたRDDを返す
>>> rdd = sc.parallelize(["Ken 27 180 83", "Bob 32 170 65", "Meg 29 165 45"])
>>> rdd.keyBy(lambda x: x.split()[0]).collect()
[('Ken', 'Ken 27 180 83'), ('Bob', 'Bob 32 170 65'), ('Meg', 'Meg 29 165 45')]
keys
keys ペアRDDのkeyだけからなるRDDを返す
>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.keys().collect()
['Ken', 'Bob', 'Taka', 'Ken', 'Bob']
values
values ペアRDDのvlaueだけからなるRDDを返す
>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.values().collect()
[2, 3, 1, 3, 2]
flatMapValues
flatMapValues(f) PairRDDのvalueにflatMapを作用させてkeyを複製して所謂縦持ちにする
>>> rdd = sc.parallelize([("Ken", "Yumi,Yukiko"), ("Bob", "Meg, Tomomi, Akira"), ("Taka", "Yuki")])
>>> rdd.flatMapValues(lambda x: x.split(","))
[('Ken', 'Yumi'),
('Ken', 'Yukiko'),
('Bob', 'Meg'),
('Bob', ' Tomomi'),
('Bob', ' Akira'),
('Taka', 'Yuki')]
reduceByKey
reduceByKey(f) 同じkeyの要素でグループ化してvalueにreduceを作用させる
>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.reduceByKey(lambda x, y: x + y).collect()
[('Taka', 1), ('Bob', 5), ('Ken', 5)]
countByKey
countByKey() 同じkeyの値がいくつあるか数えてdictで返す
>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.countByKey()
defaultdict(<type 'int'>, {'Ken': 2, 'Bob': 2, 'Taka': 1})
sortByKey
sortByKey ペアRDDをkeyでソートします
>>> rdd = sc.parallelize([("cba", 2), ("abc", 3), ("bac", 1), ("bbb",
>>> rdd.sortByKey().collect()
[('aaa', 2), ('abc', 3), ('bac', 1), ('bbb', 3), ('cba', 2)]
Join操作
leftOuterJoin
二つのRDDをleft outer joinして、valueに二つの要素のタプルをもつペアRDDを返す
>>> rdd1 = sc.parallelize([("Ken", 1), ("Bob", 2), ("Meg", 3)])
>>> rdd2 = sc.parallelize([("Ken", 1), ("Kaz", 3)])
>>> rdd1.leftOuterJoin(rdd2).collect()
[('Bob', (2, None)), ('Meg', (3, None)), ('Ken', (1, 1))]
rightOuterJoin
二つのRDDをright outer joinして、valueに二つの要素のタプルをもつペアRDDを返す
>>> rdd1 = sc.parallelize([("Ken", 1), ("Bob", 2), ("Meg", 3)])
>>> rdd2 = sc.parallelize([("Ken", 1), ("Kaz", 3)])
>>> rdd1.rightOuterJoin(rdd2).collect()
[('Ken', (1, 1)), ('Kaz', (3, None))]
fullOuterJoin
二つのRDDをfull outer joinして、valueに二つの要素のタプルをもつペアRDDを返す
>>> rdd1 = sc.parallelize([("Ken", 1), ("Bob", 2), ("Meg", 3)])
>>> rdd2 = sc.parallelize([("Ken", 1), ("Kaz", 3)])
>>> rdd1.fullOuterJoin(rdd2).collect()
[('Bob', (2, None)), ('Meg', (3, None)), ('Ken', (1, 1)), ('Kaz', (None, 3))]
ソート操作
sortBy
sortBy(f) fの返す値によってソートする
>>> rdd = sc.parallelize([("cba", 2), ("abc", 3), ("bac", 1), ("bbb",
>>> rdd.sortBy(lambda (x, y): x).collect() # sortByKeyと同じ
集合操作など
intersection
intersection(rdd) 二つのRDDのintersectionを返す
union
union(rdd) 二つのRDDのunionを返す
zip
zip(rdd) 引数のrddの各要素をvlaueにしたペアRDDを返す
>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.keys().zip(rdd.values())
[('Ken', 2), ('Bob', 3), ('Taka', 1), ('Ken', 3), ('Bob', 2)]
distinct
同じ要素を含まないRDDを返します
サンプリング操作
sample
sample(bool, frac) サンプリングしたRDDを返す。第一引数で同じ要素の重複を許すか決める。
>>> rdd = sc.parallelize([1, 2, 3, 4, 5])
>>> rdd.sample(True, 0.5).collect()
[1, 5, 5]
>>> rdd.sample(False, 0.5).collect()
[1, 3, 5]
takeSample
takeSmaple(bool, size) 固定されたサイズのサンプルをリストで返す。第一引数で同じ要素の重複を許すか決める。
>>> rdd = sc.parallelize([1, 2, 3, 4, 5])
>>> rdd.takeSample(True, 2)
[5, 5]
>>> rdd.takeSample(False, 2)
[3, 5]
デバッグ
toDebugString
toDebugString() 実行計画を返す
print(rdd.flatMap(lambda x: x.split()).map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y).toDebugString())
(1) PythonRDD[190] at RDD at PythonRDD.scala:43 []
| MapPartitionsRDD[189] at mapPartitions at PythonRDD.scala:374 []
| ShuffledRDD[188] at partitionBy at null:-1 []
+-(1) PairwiseRDD[187] at reduceByKey at <ipython-input-114-71f5cb742e13>:1 []
| PythonRDD[186] at reduceByKey at <ipython-input-114-71f5cb742e13>:1 []
| ParallelCollectionRDD[141] at parallelize at PythonRDD.scala:423 []
永続化
persist
persist() RDDをそのまま(デフォルトではメモリに)キャッシュする。メモリだけ、メモリが無理ならディスク、ディスクだけ、などの設定が出来る(StorageLevelで指定)
>>> rdd.persist()
unpersist
unpersist() RDDの永続化を解く。永続化レベルを変える時などに使う。
>>> from pyspark import StorageLevel
>>> rdd.persist()
>>> rdd.unpersist()
>>> rdd.persist(StorageLevel.DISK_ONLY)
よくある例
随時追加していく予定
word count
>>> rdd.flatMap(lambda x: x.split())\
.map(lambda x: (x, 1))\
.reduceByKey(lambda x, y: x + y)\
.take(5)
DataFrame
特に構造化データを扱うときはこちらの方が便利。
DataFrameをつくる(データの読み込み)
read.json
read.json(file) jsonからデータを読み込む
# $ cat a.json
# {"name":"Ken", "age":35}
# {"name":"Bob", "age":30, "weight":80}
# {"name":"Meg", "age":29, "weight":45}
df = sqlContext.read.json("a.json")
DataFrameを表示する
RDDと同じcollect, takeの他にshow がある
show
show(n) n行表示する(nはデフォルトで20)
>>> df.show()
+---+----+------+
|age|name|weight|
+---+----+------+
| 35| Ken| null|
| 30| Bob| 80|
| 29| Meg| 45|
+---+----+------+
DataFrameの操作
select
select(column) stringかColumnオブジェクトを渡してselectしたDataFrameを返す。カラムを列挙して複数列取得したり、演算することもできる。
>>> df.select("age").show()
+---+
|age|
+---+
| 35|
| 30|
| 29|
+---+
# 次も同じ
>>> df.select(df.age).show() # Columnオブジェクトを渡す
>>> df.select(df["age"]).show() # Columnオブジェクトを渡す
>>> df.select(df.name, df.age).show()
+----+---+
|name|age|
+----+---+
| Ken| 35|
| Bob| 30|
| Meg| 29|
+----+---+
DataframeのColumnオブジェクト
selectで渡すColumnオブジェクトのアクセスの仕方として、Pythonでは次の2パターンが用意されている:
>>> df.age
Column<age>
>>> df["age"]
Column<age>
where/filter
filter(condition) stringの条件に合う行だけからなるDataFrameを返す。 whereはfilterのaliasである。
>>> df.where(df.age >=30).show()
+---+----+------+
|age|name|weight|
+---+----+------+
| 35| Ken| null|
| 30| Bob| 80|
+---+----+------+
sort
sort(column) 指定されたカラムでソートされたDataFrameを返す
>>> df.sort(df.age)
+---+----+------+
|age|name|weight|
+---+----+------+
| 29| Meg| 45|
| 30| Bob| 80|
| 35| Ken| null|
+---+----+------+
limit
limit(n) 先頭n行だけに制限したDataFrameを返す
>>> df.limit(1).show()
+---+----+------+
|age|name|weight|
+---+----+------+
| 35| Ken| null|
+---+----+------+
distinct
distinct() distinctした結果の行だけからなるDataFrameを返す
>>> df.distinct().count()
3
join
join(dataframe, on, how) howのデフォルトはinner
- on: カラム、もしくはカラムのリスト
- how:
"inner","outer","left_outer","right_outer","leftsemi"のいずれか
DataframeからRDDへ変換
DataFrameはRDD上に構築されているので、元となるRDDを取り出すことができる
>>> print(df.rdd.collect())
[Row(age=35, name=u'Ken', weight=None),
Row(age=30, name=u'Bob', weight=80),
Row(age=29, name=u'Meg', weight=45)]
特定の列だけ取り出すにはRowオブジェクトの対応する属性にアクセスする
df.rdd.map(lambda row: (row.age, row.weight)).collect()
[(35, None), (30, 80), (29, 45)]
Dataframeを保存する
toJson
toJson() jsonの形でRDDに変換する。このあとsaveAsTextFileを呼べばjson形式で保存できる。
>>> df.toJSON().saveAsTextFile("b.json")
>>> df2 = sqlContext.read.json("/b.json")
+---+----+------+
|age|name|weight|
+---+----+------+
| 35| Ken| null|
| 30| Bob| 80|
| 29| Meg| 45|
+---+----+------+
今後
Spark StreamingやMllib関連もこちらに追記していくかもしれない。