17
21

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Apache Spark を Jupyter Notebook で試す (on ローカル Docker

Posted at

以前 Spark を使ってたのですが今は使ってなくて,
そのうち忘れそうなので基本をメモしておくことにしました.

(全体的に聞きかじりの知識なので間違ってる点はコメント・編集リクエストを期待します)

使う

Jupyter + PySpark な環境が動く Docker イメージが用意されているので,ローカルで試すには便利です:
https://hub.docker.com/r/jupyter/pyspark-notebook/

PySpark とは,という話ですが,Spark 自体は Scala だけど,
Python で使えるやつがあってそれが PySpark だという話があります.

IPC でがんばってるという仕組みになっていたはずなので,
Scala <-> Python の変換のコストが結構でかいうんぬんみたいな話題もあります.

さて,使ってみましょう:

docker run -it -p 8888:8888 jupyter/pyspark-notebook

これを実行すると Terminal に 8888 番にトークンがついた URL が流れてくるので,
(To access the notebook, ... のあたり)
おもむろにアクセスすると Jupyter のページが出てきて,
Jupyter Notebook でコーディングできる簡単環境のできあがりです.

Home_Page_-_Select_or_create_a_notebook.png

ここの New から Notebook: Python3 を選択すれば Notebook を開けます

Untitled_-_Jupyter_Notebook.png

試す

動くかどうかのテストコードは以下で,サンプルからとってきました (https://jupyter-docker-stacks.readthedocs.io/en/latest/using/specifics.html#in-a-python-notebook):

from pyspark.sql import SparkSession

spark: SparkSession = SparkSession.builder.appName("SimpleApp").getOrCreate()

# do something to prove it works
spark.sql('SELECT "Test" as c1').show()

SparkSession というやつはよくわからないけど,
Spark 自体のインスタンスみたいなものという認識です.

これを実行して表がでれば OK です:

Untitled_-_Jupyter_Notebook.png

データを扱う

こういうデータを対象にしてみましょう:

id name gender age
1 サトシ male 10
2 シゲル male 10
3 カスミ female 12

入力と定義

Python で素朴にデータを定義するとこうなりますね:

from typing import List, Tuple

Trainer = Tuple[int, str, str, int]
trainers: List[Trainer] = [
    (1, 'サトシ', 'male',   10),
    (2, 'シゲル', 'male',   10),
    (3, 'カスミ', 'female', 12),
]

各行の型は Python の typing でいう Tuple[int, str, str, int] となりますね.

で,Spark でもスキーマの定義があります:

from pyspark.sql.types import StructField, StructType, StringType, IntegerType

trainers_schema = StructType([
    StructField('id',      IntegerType(), True),
    StructField('name',    StringType(),  True),
    StructField('gender',  StringType(),  True),
    StructField('age',     IntegerType(), True),
])

これで Spark 側での列のスキーマを定義できます.

Python で定義したデータを Spark の DataFrame に変換するにはこうします:

from pyspark.sql import DataFrame

trainers_df: DataFrame = spark.createDataFrame(
    spark.sparkContext.parallelize(trainers),
    trainers_schema
)

これで trainers_df という DataFrame ができました.

データソースとして CSV とか MySQL とかそういうものから読み込めるので,
実際にはコード上で定義するよりそういうデータソースから読み込むことになるでしょう.
(場合により後述する JDBC とか,Hadoop の設定が必要です)

これをダンプして確認したい場合はこうします:

trainers_df.show()

そうすると,表に整形されたテキストが数行出力されます:

Untitled_-_Jupyter_Notebook.png

+---+------+------+---+
| id|  name|gender|age|
+---+------+------+---+
|  1|サトシ|  male| 10|
|  2|シゲル|  male| 10|
|  3|カスミ|female| 12|
+---+------+------+---+

集計と出力

ダンプではなく値をもらうには .collect() すればいいです:

result = trainers_df.collect()
print(result)

CSV に書き出すときはこういう雰囲気で DataFrame を書き出します:

trainers_df.coalesce(1).write.mode('overwrite').csv("path/to/output.csv")

入力同様,他にも S3, MySQL とか Elasticsearch とかいろいろ出力先がある雰囲気です.

.coalesce(1) はパーティションごとの分割されているデータを,
1つのパーティションに coalesce するというものです.
こうしないと,分割されたまま CSV 出力されます.

Hadoop の hdfs コマンドをつかって,
分割されたものを1つにまとめて取得するという手段もあります.

基本的に遅延評価になっていて,
.collect() みたいな操作をしてはじめて評価されるようになっているので,
そんなに頻繁に集計はしないはずです.

基本

これだけではただ表示しただけでまったく意味がないので適当な操作をしてみましょう:

trainers_df.createOrReplaceTempView('trainers');

male_trainers_df = spark.sql('''
    SELECT *
    FROM   trainers
    WHERE  gender = 'male'
''')
male_trainers_df.show()

これはこういう結果を得ます:

id name gender age
1 サトシ male 10
2 シゲル male 10

DataFrame.createOrReplaceTempView(name)DataFrame を,
一時的な SQL の View として登録することができるものです.

これで spark.sql(query) で登録した View を対象に SQL の操作した結果の DF を得ることができるので,
こうすれば,全く臆することなく慣れ親しんだ SQL を使って Spark を使うことができて,
心理的障壁も学習コストも低いというマジックになっています.

View に登録しなくても,DataFrame のままコードで記述するという方法もあります:

male_trainers_df = trainers_df.filter(trainers_df['gender'] == 'male')

こっちのほうが使いやすいケースもあるのでケースバイケースですね.

応用

SQL を使うことができるのだから,基本的な操作では別に問題ないのですが,
たいてい Spark を使いたいケースというのはなにかユーザー定義の操作をしたい状況になっていそうですね.

たとえば自分が過去やりたかったケースとしては,
「記事本文を形態素解析して分かち書きする」というものがあって,
これは SQL だけでは実現しがたいですね.

ただ,Python 上であれば MeCab があるので,
MeCab のライブラリを使って形態素解析してやれば何も考えなくても分解されてやってくるので,
僕のように全然わかってなくてもとりあえず MeCab に投げればいけるという手段をとれます.

そういう操作を Spark 上で DataFrame に対して行うにはどうすればいいかというと,
UDF (User-Defined Function) を定義するといいです.

(※ DataFrame ではなく RDD というものに対しては直接 lambda を適用できるという技がありますが,
  これはパフォーマンスが悪いというのがあります).

UDF を定義するには次のような定義を行います:

from pyspark.sql.functions import udf

@udf(StringType())
def name_with_suffix(name: str, gender: str) -> str:
    return name + {'male': 'くん', 'female': 'さん'}.get(gender, '')

spark.udf.register('name_with_suffix', name_with_suffix)

UDF となる関数に @udf(ReturnType) デコレーターを適用することで,
その関数は UDF として定義できるようになります.
それを Spark SQL で使うには spark.udf.register(udf_name, udf) して登録すれば,
COUNT() とかと同じ用にそのまま使えます.

ちなみにデコレーターを使わなくても,udf_fn = udf(fn) すれば既存の関数を適用できます.

この例としてあげたものは gender に応じて,
namegender に応じた suffix をつけるというものです.
この関数を UDF として適用してみましょう:

dearest_trainers = spark.sql('''
    SELECT name_with_suffix(name, gender)
    FROM   trainers
''')
dearest_trainers.show()

結果はこうなります:

name_with_suffix(name, gender)
サトシくん
シゲルくん
カスミさん

今回の例であれば SQL でも CASE を駆使して書けるというご意見がありますが,そのとおりです.

やりたいことによっては便利に使えるでしょう.

UDF

さて,前述した形態素解析して分かち書きするというものですが,
これはイメージとしてこのような関数になります
(実際には MeCab をカッコよく使います):

import re

# 半角/全角スペースや約物で文字列を分割する
@udf(ArrayType(StringType()))
def wakachi(text: str) -> List[str]:
    return [
        word
        for word
        in re.split('[  !…]+', text)
        if len(word) > 0
    ]

これを適用するのも同じくそのまま使えば OK です.
サンプルコードを今一度データを変更しつつ書いてみます:

Trainer = Tuple[int, str, str, int, str]
trainers: List[Trainer] = [
    (1, 'サトシ', 'male',   10, 'ポケモン ゲット だぜ'),
    (2, 'シゲル', 'male',   10, 'このおれさまが せかいで いちばん! つよいって ことなんだよ!'),
    (3, 'カスミ', 'female', 12, 'わたしの ポリシーはね… みず タイプ ポケモンで せめて せめて …せめまくる ことよ!'),
]

trainers_schema = StructType([
    StructField('id',      IntegerType(), True),
    StructField('name',    StringType(),  True),
    StructField('gender',  StringType(),  True),
    StructField('age',     IntegerType(), True),
])

trainers_df = spark.createDataFrame(
    spark.sparkContext.parallelize(trainers),
    trainers_schema
)
trainers_df.createOrReplaceTempView('trainers');

wakachi_trainers_df = spark.sql('''
    SELECT id, name, wakachi(comment)
    FROM   trainers
''')
wakachi_trainers_df.show()

ここでポイントになるのは,
今回の UDF は str を受け取って List[str] として展開するということですね.
これを実行してみるとこうなります:

id name wakachi(comment)
1 サトシ [ポケモン, ゲット, だぜ]
2 シゲル [このおれさまが, せかいで, い...
3 カスミ [わたしの, ポリシーはね, みず...

展開されたセルはリストになっていて,
列のなかに更に列がある入れ子状態になっています.

これをそれぞれの str を列として展開したい場合どうすればいいかというと,
展開する関数を更に適用すればいいです:

from pyspark.sql.functions import explode

wakachi_trainers_df = spark.sql('''
    SELECT id, name, explode(wakachi(comment))
    FROM   trainers
''')
wakachi_trainers_df.show()

explode という関数があるので,
これを適用すれば入れ子になった要素がそれぞれの列として展開されます:

id name col
1 サトシ ポケモン
1 サトシ ゲット
1 サトシ だぜ
2 シゲル このおれさまが
2 シゲル せかいで
2 シゲル いちばん
2 シゲル つよいって
2 シゲル ことなんだよ
3 カスミ わたしの
3 カスミ ポリシーはね
3 カスミ みず
3 カスミ タイプ
3 カスミ ポケモンで
3 カスミ せめて
3 カスミ せめて
3 カスミ せめまくる
3 カスミ ことよ

ジョイン

さらなるポイントとして DataFrame どうしの JOIN ができます.
普通の MySQL とかの JOIN と変わらずに結合につかうカラムを指定して,
それをもとに DataFrame を結合するものです.

サンプルコードを更に追加して JOIN を使ってみます:

Pkmn = Tuple[int, int, str, int]
pkmns: List[Pkmn] = [
    (1, 1, 'ピカチュウ', 99),
    (2, 1, 'リザードン', 99),
    (3, 2, 'イーブイ',   50),
    (4, 3, 'トサキント', 20),
    (5, 3, 'ヒトデマン', 30),
    (6, 3, 'スターミー', 40),
]
pkmns_schema = StructType([
    StructField('id',         IntegerType(), True),
    StructField('trainer_id', IntegerType(), True),
    StructField('name',       StringType(),  True),
    StructField('level',      IntegerType(), True),
])
pkmns_df = spark.createDataFrame(
    spark.sparkContext.parallelize(pkmns),
    pkmns_schema
)
pkmns_df.createOrReplaceTempView('pkmns');

trainer_and_pkmns_df = spark.sql('''
    SELECT     *
    FROM       trainers
    INNER JOIN pkmns
          ON   trainers.id = pkmns.trainer_id
''')
trainer_and_pkmns_df.show()
id name gender age comment id trainer_id name level
1 サトシ male 10 ポケモン ゲット だぜ 1 1 ピカチュウ 99
1 サトシ male 10 ポケモン ゲット だぜ 2 1 リザードン 99
3 カスミ female 12 わたしの ポリシーはね… みず タ... 4 3 トサキント 20
3 カスミ female 12 わたしの ポリシーはね… みず タ... 5 3 ヒトデマン 30
3 カスミ female 12 わたしの ポリシーはね… みず タ... 6 3 スターミー 40
2 シゲル male 10 このおれさまが せかいで いちばん... 3 2 イーブイ 50

ちなみに INNER JOIN, OUTER JOIN の他に種類がいっぱいあります.
こちらの記事がわかりやすいので引用します:

これで集合操作ができるので便利という感じです.

JOIN の概念はこのページのベン図がわかりやすいので引用します:

ポイントとしてやはり JOIN はコストがかかっていて遅いです.
クラスタを組んでたとしたら,各所に分散したデータから見つけて JOIN して戻してとかそういう操作が行われているようです.

ので,後述するパフォーマンスチューニングが必要になってきます.

パフォーマンス

現実のケースとして,膨大なデータセットと格闘するのはそうとう辛いものがあります.
というのも,4時間とかかかるものだったら,最後の方で落ちたらまたやり直しかとなって,
二回ミスると一日の業務時間を捧げたことになってしまって残業が確定します.

なので,そういうパフォーマンスを改善するために,JOIN の効率を上げるようにデータを削減したり,
パーティションの区切りかたを変えたり,
パーティションをなるべくクラスタ上に小さく断片化させないような工夫が必要です.

Broadcast Join というもので,あえて全クラスタにデータセットを重複して配置することで,
JOIN 時にデータセットの検索のコストを下げるとかそういうものもあります.

重要なテクニックとして,
各チェックポイント的なところで適宜 DataFrame を .cache() しておくことで,
パフォーマンスが劇的に改善されるというものもあります.

パフォーマンスについての公式ページを見るとそういうテクニックがあって参考になります:

MySQL

さて,よくあるのが MySQL のデータベースから読み込んでうんぬんしたいというのがあります.
このケースでは MySQL を扱うために JDBC の MySQL コネクタを用意する必要がありますが,
こちらのかたのエントリと,その Docker イメージが参考になります:

しかしながら,MySQL は Spark では扱いづらいみたいなところもあります.
(いろいろハマりポイントがあります)

実際

Spark が威力を発揮するのは:

  • データがとにかくでかい
  • 適用したい処理が互いに依存しない
  • 各操作に副作用がなくて内部の操作で完結する (外の API への操作とかがない)

というものだと思っています.

あと Spark は複数台でクラスタを作って worker に仕事をさせるのがキモなので,
現実的には AWS におまかせするということで,Amazon EMR とか AWS Glue でやるのがよさそうですね.
ローカルだとクラスタを作らずに動くので,本気の巨大データを打ち込んでもパフォーマンスは出ずに恩恵にはあずかれないからです.

メモリの限界にぶち当たったり,
節約できても処理全体にバッチ流して二週間かかりますみたいな巨大データだと,
素朴なものでも自前で分割して複数プロセスにわけて実行とかすればできるかもしれないけど,
Spark にできることなら任せるのもよさげです.

17
21
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
17
21

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?