LoginSignup
136
114

More than 3 years have passed since last update.

Spark API チートシート

Last updated at Posted at 2016-07-08

目的

Sparkのよく使うAPIを(主に自分用に)メモしておくことで、久しぶりに開発するときでもサクサク使えるようにしたい。とりあえずPython版をまとめておきます(Scala版も時間があれば加筆するかも)

このチートシートはあくまでチートシートなので(引数が省略してあったりします)、時間がある方はきちんと公式APIドキュメント(Spark Python API Docs)を見て下さい。

Spark API チートシート(Python)

以下では次を前提とする

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext()
sqlContext = SQLContext(sc)

RDD

RDDを作る(データの読み込み)

parallelize

sc.parallelize(collection) リストやタプルからRDDを作る

>>> a = [1, 2, 3, 4, 5]
>>> rdd = sc.parallelize(a)
textFile

sc.textFile(file) ファイルを読み込む。ワイルドカードや正規表現も使える。

>>> rdd = sc.textFile("./words.txt")
wholeTextFiles

sc.wholeTextFiles(dierctory) ディレクトリの各ファイルの内容全体をそれぞれRDDの一つの要素に入力する

# $ ls
# a.json b.json c.json
>>> rdd = sc.textWholeFiles("./")

Action

Actionが実行されると初めてTransformationが順に実行される(遅延実行)

要素を返すもの

collect

collect() 全ての要素を返す

>>> print(rdd.collect())
[1, 2, 3, 4, 5]
take

take(n) 最初n個の要素を返す

>>> print(rdd.take(3))
[1, 2, 3]
first

first() 一番最初の要素を返す

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.first()
1
top

top(n) 大きいものからn個要素を返す

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.top(2)
[3, 2]

(統計)量を返すもの

count

count() 要素数を数えて返す

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.count()
3
mean

mean() 平均を返す

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.mean()
3.0
sum

sum() 合計を返す

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.sum()
6
variance

variance() 分散を返す

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.variance()
0.6666666666666666
stdev

stdev() 標準偏差を返す

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.stdev()
0.816496580927726

保存するもの

saveAsTextFile

saveAsTextFile(file) ファイルを保存する

>>> rdd.saveAsTextFile("./a.txt")

Transformation

Transformationはimmutableな新しいRDDを返す

filter/map/reduce

filter

filter(f) fが真となる要素だけを含むRDDを返す

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.filter(lambda x: x % 2 == 0).collect()
[2]
map

map(f) 全ての要素にfを作用させたRDDを返す

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.map(lambda x: x * 2).collect()
[2, 4, 6]
flatMap

flatMap(f) 全ての要素にfを作用させたあと、要素内のリストを展開したRDDを返す

>>> rdd = sc.parallelize(["This is a pen", "This is an apple"])
>>> rdd.flatMap(lambda x: x.split()).collect()
['This', 'is', 'a', 'pen', 'This', 'is', 'an', 'apple']
Reduce

reduce(f) 二つの要素にfを作用させ続けて一つの返り値を得る

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.reduce(lambda x, y: x + y)
6

ペアRDDに対する操作

ペアRDDをつくる

ペアRDDはPythonでいうTupleを要素に持つRDD。keyとvalueを扱うことができる。
作り方はkeyByを使うかmapで要素数2のタプルを要素に返す。

keyBy(PairRDD)

keyBy(f) 普通のRDDの要素にfを作用させ、その返り値をkeyに、元の要素をそのままvalueにしたRDDを返す

>>> rdd = sc.parallelize(["Ken 27 180 83", "Bob 32 170 65", "Meg 29 165 45"])
>>> rdd.keyBy(lambda x: x.split()[0]).collect()
[('Ken', 'Ken 27 180 83'), ('Bob', 'Bob 32 170 65'), ('Meg', 'Meg 29 165 45')]
keys

keys ペアRDDのkeyだけからなるRDDを返す

>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.keys().collect()
['Ken', 'Bob', 'Taka', 'Ken', 'Bob']
values

values ペアRDDのvlaueだけからなるRDDを返す

>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.values().collect()
[2, 3, 1, 3, 2]
flatMapValues

flatMapValues(f) PairRDDのvalueにflatMapを作用させてkeyを複製して所謂縦持ちにする

>>> rdd = sc.parallelize([("Ken", "Yumi,Yukiko"), ("Bob", "Meg, Tomomi, Akira"), ("Taka", "Yuki")])
>>> rdd.flatMapValues(lambda x: x.split(","))
[('Ken', 'Yumi'),
 ('Ken', 'Yukiko'),
 ('Bob', 'Meg'),
 ('Bob', ' Tomomi'),
 ('Bob', ' Akira'),
 ('Taka', 'Yuki')]
reduceByKey

reduceByKey(f) 同じkeyの要素でグループ化してvalueにreduceを作用させる

>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.reduceByKey(lambda x, y: x + y).collect()
[('Taka', 1), ('Bob', 5), ('Ken', 5)]
countByKey

countByKey() 同じkeyの値がいくつあるか数えてdictで返す

>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.countByKey()
defaultdict(<type 'int'>, {'Ken': 2, 'Bob': 2, 'Taka': 1})
sortByKey

sortByKey ペアRDDをkeyでソートします

>>> rdd = sc.parallelize([("cba", 2), ("abc", 3), ("bac", 1), ("bbb", 
>>> rdd.sortByKey().collect()
[('aaa', 2), ('abc', 3), ('bac', 1), ('bbb', 3), ('cba', 2)]

Join操作

leftOuterJoin

二つのRDDをleft outer joinして、valueに二つの要素のタプルをもつペアRDDを返す

>>> rdd1 = sc.parallelize([("Ken", 1), ("Bob", 2), ("Meg", 3)])
>>> rdd2 = sc.parallelize([("Ken", 1), ("Kaz", 3)])
>>> rdd1.leftOuterJoin(rdd2).collect()
[('Bob', (2, None)), ('Meg', (3, None)), ('Ken', (1, 1))]
rightOuterJoin

二つのRDDをright outer joinして、valueに二つの要素のタプルをもつペアRDDを返す

>>> rdd1 = sc.parallelize([("Ken", 1), ("Bob", 2), ("Meg", 3)])
>>> rdd2 = sc.parallelize([("Ken", 1), ("Kaz", 3)])
>>> rdd1.rightOuterJoin(rdd2).collect()
[('Ken', (1, 1)), ('Kaz', (3, None))]
fullOuterJoin

二つのRDDをfull outer joinして、valueに二つの要素のタプルをもつペアRDDを返す

>>> rdd1 = sc.parallelize([("Ken", 1), ("Bob", 2), ("Meg", 3)])
>>> rdd2 = sc.parallelize([("Ken", 1), ("Kaz", 3)])
>>> rdd1.fullOuterJoin(rdd2).collect()
[('Bob', (2, None)), ('Meg', (3, None)), ('Ken', (1, 1)), ('Kaz', (None, 3))]

ソート操作

sortBy

sortBy(f) fの返す値によってソートする

>>> rdd = sc.parallelize([("cba", 2), ("abc", 3), ("bac", 1), ("bbb", 
>>> rdd.sortBy(lambda (x, y): x).collect() # sortByKeyと同じ

集合操作など

intersection

intersection(rdd) 二つのRDDのintersectionを返す

union

union(rdd) 二つのRDDのunionを返す

zip

zip(rdd) 引数のrddの各要素をvlaueにしたペアRDDを返す

>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.keys().zip(rdd.values())
[('Ken', 2), ('Bob', 3), ('Taka', 1), ('Ken', 3), ('Bob', 2)]
distinct

同じ要素を含まないRDDを返します

サンプリング操作

sample

sample(bool, frac) サンプリングしたRDDを返す。第一引数で同じ要素の重複を許すか決める。

>>> rdd = sc.parallelize([1, 2, 3, 4, 5])
>>> rdd.sample(True, 0.5).collect()
[1, 5, 5]
>>> rdd.sample(False, 0.5).collect()
[1, 3, 5]
takeSample

takeSmaple(bool, size) 固定されたサイズのサンプルをリストで返す。第一引数で同じ要素の重複を許すか決める。

>>> rdd = sc.parallelize([1, 2, 3, 4, 5])
>>> rdd.takeSample(True, 2)
[5, 5]
>>> rdd.takeSample(False, 2)
[3, 5]

デバッグ

toDebugString

toDebugString() 実行計画を返す

print(rdd.flatMap(lambda x: x.split()).map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y).toDebugString())
(1) PythonRDD[190] at RDD at PythonRDD.scala:43 []
 |  MapPartitionsRDD[189] at mapPartitions at PythonRDD.scala:374 []
 |  ShuffledRDD[188] at partitionBy at null:-1 []
 +-(1) PairwiseRDD[187] at reduceByKey at <ipython-input-114-71f5cb742e13>:1 []
    |  PythonRDD[186] at reduceByKey at <ipython-input-114-71f5cb742e13>:1 []
    |  ParallelCollectionRDD[141] at parallelize at PythonRDD.scala:423 []

永続化

persist

persist() RDDをそのまま(デフォルトではメモリに)キャッシュする。メモリだけ、メモリが無理ならディスク、ディスクだけ、などの設定が出来る(StorageLevelで指定)

>>> rdd.persist()
unpersist

unpersist() RDDの永続化を解く。永続化レベルを変える時などに使う。

>>> from pyspark import StorageLevel
>>> rdd.persist()
>>> rdd.unpersist()
>>> rdd.persist(StorageLevel.DISK_ONLY)

よくある例

随時追加していく予定

word count
>>> rdd.flatMap(lambda x: x.split())\
       .map(lambda x: (x, 1))\
       .reduceByKey(lambda x, y: x + y)\
       .take(5)

DataFrame

特に構造化データを扱うときはこちらの方が便利。

DataFrameをつくる(データの読み込み)

read.json

read.json(file) jsonからデータを読み込む

# $ cat a.json
# {"name":"Ken", "age":35}
# {"name":"Bob", "age":30, "weight":80}
# {"name":"Meg", "age":29, "weight":45}
df = sqlContext.read.json("a.json")

DataFrameを表示する

RDDと同じcollect, takeの他にshow がある

show

show(n) n行表示する(nはデフォルトで20)

>>> df.show()
+---+----+------+
|age|name|weight|
+---+----+------+
| 35| Ken|  null|
| 30| Bob|    80|
| 29| Meg|    45|
+---+----+------+

DataFrameの操作

select

select(column) stringかColumnオブジェクトを渡してselectしたDataFrameを返す。カラムを列挙して複数列取得したり、演算することもできる。

>>> df.select("age").show()
+---+
|age|
+---+
| 35|
| 30|
| 29|
+---+

# 次も同じ
>>> df.select(df.age).show() # Columnオブジェクトを渡す
>>> df.select(df["age"]).show() # Columnオブジェクトを渡す
>>> df.select(df.name, df.age).show()
+----+---+
|name|age|
+----+---+
| Ken| 35|
| Bob| 30|
| Meg| 29|
+----+---+
DataframeのColumnオブジェクト

selectで渡すColumnオブジェクトのアクセスの仕方として、Pythonでは次の2パターンが用意されている:

>>> df.age
Column<age>
>>> df["age"]
Column<age>
where/filter

filter(condition) stringの条件に合う行だけからなるDataFrameを返す。 wherefilterのaliasである。

>>> df.where(df.age >=30).show()
+---+----+------+
|age|name|weight|
+---+----+------+
| 35| Ken|  null|
| 30| Bob|    80|
+---+----+------+
sort

sort(column) 指定されたカラムでソートされたDataFrameを返す

>>> df.sort(df.age)
+---+----+------+
|age|name|weight|
+---+----+------+
| 29| Meg|    45|
| 30| Bob|    80|
| 35| Ken|  null|
+---+----+------+
limit

limit(n) 先頭n行だけに制限したDataFrameを返す

>>> df.limit(1).show()
+---+----+------+
|age|name|weight|
+---+----+------+
| 35| Ken|  null|
+---+----+------+
distinct

distinct() distinctした結果の行だけからなるDataFrameを返す

>>> df.distinct().count()
3
join

join(dataframe, on, how) howのデフォルトはinner

  • on: カラム、もしくはカラムのリスト
  • how: "inner", "outer", "left_outer", "right_outer", "leftsemi" のいずれか

DataframeからRDDへ変換

DataFrameはRDD上に構築されているので、元となるRDDを取り出すことができる

>>> print(df.rdd.collect())
[Row(age=35, name=u'Ken', weight=None),
 Row(age=30, name=u'Bob', weight=80),
 Row(age=29, name=u'Meg', weight=45)]

特定の列だけ取り出すにはRowオブジェクトの対応する属性にアクセスする

df.rdd.map(lambda row: (row.age, row.weight)).collect()
[(35, None), (30, 80), (29, 45)]

Dataframeを保存する

toJson

toJson() jsonの形でRDDに変換する。このあとsaveAsTextFileを呼べばjson形式で保存できる。

>>> df.toJSON().saveAsTextFile("b.json")
>>> df2 = sqlContext.read.json("/b.json")
+---+----+------+
|age|name|weight|
+---+----+------+
| 35| Ken|  null|
| 30| Bob|    80|
| 29| Meg|    45|
+---+----+------+

今後

Spark StreamingやMllib関連もこちらに追記していくかもしれない。

136
114
3

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
136
114