目的
Sparkのよく使うAPIを(主に自分用に)メモしておくことで、久しぶりに開発するときでもサクサク使えるようにしたい。とりあえずPython版をまとめておきます(Scala版も時間があれば加筆するかも)
このチートシートはあくまでチートシートなので(引数が省略してあったりします)、時間がある方はきちんと公式APIドキュメント(Spark Python API Docs)を見て下さい。
Spark API チートシート(Python)
以下では次を前提とする
from pyspark import SparkContext
from pyspark.sql import SQLContext
sc = SparkContext()
sqlContext = SQLContext(sc)
RDD
RDDを作る(データの読み込み)
parallelize
sc.parallelize(collection)
リストやタプルからRDDを作る
>>> a = [1, 2, 3, 4, 5]
>>> rdd = sc.parallelize(a)
textFile
sc.textFile(file)
ファイルを読み込む。ワイルドカードや正規表現も使える。
>>> rdd = sc.textFile("./words.txt")
wholeTextFiles
sc.wholeTextFiles(dierctory)
ディレクトリの各ファイルの内容全体をそれぞれRDDの一つの要素に入力する
# $ ls
# a.json b.json c.json
>>> rdd = sc.textWholeFiles("./")
Action
Actionが実行されると初めてTransformationが順に実行される(遅延実行)
要素を返すもの
collect
collect()
全ての要素を返す
>>> print(rdd.collect())
[1, 2, 3, 4, 5]
take
take(n)
最初n個の要素を返す
>>> print(rdd.take(3))
[1, 2, 3]
first
first()
一番最初の要素を返す
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.first()
1
top
top(n)
大きいものからn個要素を返す
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.top(2)
[3, 2]
(統計)量を返すもの
count
count()
要素数を数えて返す
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.count()
3
mean
mean()
平均を返す
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.mean()
3.0
sum
sum()
合計を返す
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.sum()
6
variance
variance()
分散を返す
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.variance()
0.6666666666666666
stdev
stdev()
標準偏差を返す
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.stdev()
0.816496580927726
保存するもの
saveAsTextFile
saveAsTextFile(file)
ファイルを保存する
>>> rdd.saveAsTextFile("./a.txt")
Transformation
Transformationはimmutableな新しいRDDを返す
filter/map/reduce
filter
filter(f)
fが真となる要素だけを含むRDDを返す
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.filter(lambda x: x % 2 == 0).collect()
[2]
map
map(f)
全ての要素にfを作用させたRDDを返す
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.map(lambda x: x * 2).collect()
[2, 4, 6]
flatMap
flatMap(f)
全ての要素にfを作用させたあと、要素内のリストを展開したRDDを返す
>>> rdd = sc.parallelize(["This is a pen", "This is an apple"])
>>> rdd.flatMap(lambda x: x.split()).collect()
['This', 'is', 'a', 'pen', 'This', 'is', 'an', 'apple']
Reduce
reduce(f)
二つの要素にfを作用させ続けて一つの返り値を得る
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.reduce(lambda x, y: x + y)
6
ペアRDDに対する操作
ペアRDDをつくる
ペアRDDはPythonでいうTupleを要素に持つRDD。keyとvalueを扱うことができる。
作り方はkeyBy
を使うかmap
で要素数2のタプルを要素に返す。
keyBy(PairRDD)
keyBy(f)
普通のRDDの要素にfを作用させ、その返り値をkeyに、元の要素をそのままvalueにしたRDDを返す
>>> rdd = sc.parallelize(["Ken 27 180 83", "Bob 32 170 65", "Meg 29 165 45"])
>>> rdd.keyBy(lambda x: x.split()[0]).collect()
[('Ken', 'Ken 27 180 83'), ('Bob', 'Bob 32 170 65'), ('Meg', 'Meg 29 165 45')]
keys
keys
ペアRDDのkeyだけからなるRDDを返す
>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.keys().collect()
['Ken', 'Bob', 'Taka', 'Ken', 'Bob']
values
values
ペアRDDのvlaueだけからなるRDDを返す
>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.values().collect()
[2, 3, 1, 3, 2]
flatMapValues
flatMapValues(f)
PairRDDのvalueにflatMapを作用させてkeyを複製して所謂縦持ちにする
>>> rdd = sc.parallelize([("Ken", "Yumi,Yukiko"), ("Bob", "Meg, Tomomi, Akira"), ("Taka", "Yuki")])
>>> rdd.flatMapValues(lambda x: x.split(","))
[('Ken', 'Yumi'),
('Ken', 'Yukiko'),
('Bob', 'Meg'),
('Bob', ' Tomomi'),
('Bob', ' Akira'),
('Taka', 'Yuki')]
reduceByKey
reduceByKey(f)
同じkeyの要素でグループ化してvalueにreduceを作用させる
>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.reduceByKey(lambda x, y: x + y).collect()
[('Taka', 1), ('Bob', 5), ('Ken', 5)]
countByKey
countByKey()
同じkeyの値がいくつあるか数えてdictで返す
>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.countByKey()
defaultdict(<type 'int'>, {'Ken': 2, 'Bob': 2, 'Taka': 1})
sortByKey
sortByKey
ペアRDDをkeyでソートします
>>> rdd = sc.parallelize([("cba", 2), ("abc", 3), ("bac", 1), ("bbb",
>>> rdd.sortByKey().collect()
[('aaa', 2), ('abc', 3), ('bac', 1), ('bbb', 3), ('cba', 2)]
Join操作
leftOuterJoin
二つのRDDをleft outer joinして、valueに二つの要素のタプルをもつペアRDDを返す
>>> rdd1 = sc.parallelize([("Ken", 1), ("Bob", 2), ("Meg", 3)])
>>> rdd2 = sc.parallelize([("Ken", 1), ("Kaz", 3)])
>>> rdd1.leftOuterJoin(rdd2).collect()
[('Bob', (2, None)), ('Meg', (3, None)), ('Ken', (1, 1))]
rightOuterJoin
二つのRDDをright outer joinして、valueに二つの要素のタプルをもつペアRDDを返す
>>> rdd1 = sc.parallelize([("Ken", 1), ("Bob", 2), ("Meg", 3)])
>>> rdd2 = sc.parallelize([("Ken", 1), ("Kaz", 3)])
>>> rdd1.rightOuterJoin(rdd2).collect()
[('Ken', (1, 1)), ('Kaz', (3, None))]
fullOuterJoin
二つのRDDをfull outer joinして、valueに二つの要素のタプルをもつペアRDDを返す
>>> rdd1 = sc.parallelize([("Ken", 1), ("Bob", 2), ("Meg", 3)])
>>> rdd2 = sc.parallelize([("Ken", 1), ("Kaz", 3)])
>>> rdd1.fullOuterJoin(rdd2).collect()
[('Bob', (2, None)), ('Meg', (3, None)), ('Ken', (1, 1)), ('Kaz', (None, 3))]
ソート操作
sortBy
sortBy(f)
fの返す値によってソートする
>>> rdd = sc.parallelize([("cba", 2), ("abc", 3), ("bac", 1), ("bbb",
>>> rdd.sortBy(lambda (x, y): x).collect() # sortByKeyと同じ
集合操作など
intersection
intersection(rdd)
二つのRDDのintersectionを返す
union
union(rdd)
二つのRDDのunionを返す
zip
zip(rdd)
引数のrddの各要素をvlaueにしたペアRDDを返す
>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.keys().zip(rdd.values())
[('Ken', 2), ('Bob', 3), ('Taka', 1), ('Ken', 3), ('Bob', 2)]
distinct
同じ要素を含まないRDDを返します
サンプリング操作
sample
sample(bool, frac)
サンプリングしたRDDを返す。第一引数で同じ要素の重複を許すか決める。
>>> rdd = sc.parallelize([1, 2, 3, 4, 5])
>>> rdd.sample(True, 0.5).collect()
[1, 5, 5]
>>> rdd.sample(False, 0.5).collect()
[1, 3, 5]
takeSample
takeSmaple(bool, size)
固定されたサイズのサンプルをリストで返す。第一引数で同じ要素の重複を許すか決める。
>>> rdd = sc.parallelize([1, 2, 3, 4, 5])
>>> rdd.takeSample(True, 2)
[5, 5]
>>> rdd.takeSample(False, 2)
[3, 5]
デバッグ
toDebugString
toDebugString()
実行計画を返す
print(rdd.flatMap(lambda x: x.split()).map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y).toDebugString())
(1) PythonRDD[190] at RDD at PythonRDD.scala:43 []
| MapPartitionsRDD[189] at mapPartitions at PythonRDD.scala:374 []
| ShuffledRDD[188] at partitionBy at null:-1 []
+-(1) PairwiseRDD[187] at reduceByKey at <ipython-input-114-71f5cb742e13>:1 []
| PythonRDD[186] at reduceByKey at <ipython-input-114-71f5cb742e13>:1 []
| ParallelCollectionRDD[141] at parallelize at PythonRDD.scala:423 []
永続化
persist
persist()
RDDをそのまま(デフォルトではメモリに)キャッシュする。メモリだけ、メモリが無理ならディスク、ディスクだけ、などの設定が出来る(StorageLevel
で指定)
>>> rdd.persist()
unpersist
unpersist()
RDDの永続化を解く。永続化レベルを変える時などに使う。
>>> from pyspark import StorageLevel
>>> rdd.persist()
>>> rdd.unpersist()
>>> rdd.persist(StorageLevel.DISK_ONLY)
よくある例
随時追加していく予定
word count
>>> rdd.flatMap(lambda x: x.split())\
.map(lambda x: (x, 1))\
.reduceByKey(lambda x, y: x + y)\
.take(5)
DataFrame
特に構造化データを扱うときはこちらの方が便利。
DataFrameをつくる(データの読み込み)
read.json
read.json(file)
jsonからデータを読み込む
# $ cat a.json
# {"name":"Ken", "age":35}
# {"name":"Bob", "age":30, "weight":80}
# {"name":"Meg", "age":29, "weight":45}
df = sqlContext.read.json("a.json")
DataFrameを表示する
RDDと同じcollect
, take
の他にshow
がある
show
show(n)
n行表示する(nはデフォルトで20)
>>> df.show()
+---+----+------+
|age|name|weight|
+---+----+------+
| 35| Ken| null|
| 30| Bob| 80|
| 29| Meg| 45|
+---+----+------+
DataFrameの操作
select
select(column)
stringかColumnオブジェクトを渡してselectしたDataFrameを返す。カラムを列挙して複数列取得したり、演算することもできる。
>>> df.select("age").show()
+---+
|age|
+---+
| 35|
| 30|
| 29|
+---+
# 次も同じ
>>> df.select(df.age).show() # Columnオブジェクトを渡す
>>> df.select(df["age"]).show() # Columnオブジェクトを渡す
>>> df.select(df.name, df.age).show()
+----+---+
|name|age|
+----+---+
| Ken| 35|
| Bob| 30|
| Meg| 29|
+----+---+
DataframeのColumnオブジェクト
selectで渡すColumnオブジェクトのアクセスの仕方として、Pythonでは次の2パターンが用意されている:
>>> df.age
Column<age>
>>> df["age"]
Column<age>
where/filter
filter(condition)
stringの条件に合う行だけからなるDataFrameを返す。 where
はfilter
のaliasである。
>>> df.where(df.age >=30).show()
+---+----+------+
|age|name|weight|
+---+----+------+
| 35| Ken| null|
| 30| Bob| 80|
+---+----+------+
sort
sort(column)
指定されたカラムでソートされたDataFrameを返す
>>> df.sort(df.age)
+---+----+------+
|age|name|weight|
+---+----+------+
| 29| Meg| 45|
| 30| Bob| 80|
| 35| Ken| null|
+---+----+------+
limit
limit(n)
先頭n行だけに制限したDataFrameを返す
>>> df.limit(1).show()
+---+----+------+
|age|name|weight|
+---+----+------+
| 35| Ken| null|
+---+----+------+
distinct
distinct()
distinctした結果の行だけからなるDataFrameを返す
>>> df.distinct().count()
3
join
join(dataframe, on, how)
howのデフォルトはinner
- on: カラム、もしくはカラムのリスト
- how:
"inner"
,"outer"
,"left_outer"
,"right_outer"
,"leftsemi"
のいずれか
DataframeからRDDへ変換
DataFrameはRDD上に構築されているので、元となるRDDを取り出すことができる
>>> print(df.rdd.collect())
[Row(age=35, name=u'Ken', weight=None),
Row(age=30, name=u'Bob', weight=80),
Row(age=29, name=u'Meg', weight=45)]
特定の列だけ取り出すにはRow
オブジェクトの対応する属性にアクセスする
df.rdd.map(lambda row: (row.age, row.weight)).collect()
[(35, None), (30, 80), (29, 45)]
Dataframeを保存する
toJson
toJson()
jsonの形でRDDに変換する。このあとsaveAsTextFile
を呼べばjson形式で保存できる。
>>> df.toJSON().saveAsTextFile("b.json")
>>> df2 = sqlContext.read.json("/b.json")
+---+----+------+
|age|name|weight|
+---+----+------+
| 35| Ken| null|
| 30| Bob| 80|
| 29| Meg| 45|
+---+----+------+
今後
Spark StreamingやMllib関連もこちらに追記していくかもしれない。