Apache Sparkの学習記事です。
クラウドにおけるSparkの利用は別記事を作成したいと思います。
Apache Spark のカバー領域
データ分析タスクにおいてApache Spark
はデータ処理および分析処理の領域をカバーします。
Spark プログラム
SparkのAPIは、Python, Java, Scala, R の言語が用意されています。
今回リンクでpickupする言語は python(pyspark)
とします。
SparkのコンポーネントにSpark-shell
と呼ばれるものが存在します、これはSparkのコマンドをコマンドラインから入力して対話的に実行内容を確認することができます。
また、Sparkのアプリケーションを起動する際、spark-submit
のスクリプトを使用します。
./bin/spark-submit \
--class <main-class> \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]
Spark Cluster
Spark はクラスターモード
と呼ばれるCluster Manager
を利用した構成を組む事が出来ます。クラスターモードのコンポーネントと動作は、以下のイメージです。
全体的な流れとして、spark-submitコマンドにより、Driver Programを起動した後、Cluster Managerに接続、要求したExecutorが起動して、プログラム上の処理をTasksとしてExecutorが処理を実行、計算結果を保存します。
SparkContextの停止(pyspark.SparkContext.stop()) が呼ばれると、Driver Programが終了し、Cluster Managerから取得したリソース群を開放するようなイメージです。
- Worker Node
クラスタ内に1つ以上のWorker Node
を構成し、Executor
(処理を実行し、アプリケーションのデータを格納するプロセス)とTasks
(ジョブの最小単位で実行される計算リソース)を複数持ちます。ExecutorはClusterManagerにより起動されます。
- Cluster Manager
Executorの起動を管理します。
クラスターマネージャは以下の構成からサポートします。
Standalone
、Mesos
、YARN
、Kubernetes
-
Driver Program
アプリケーションは
Driver Program
により起動されます。
SparkContext
オブジェクトがCluster Managerへ接続します。
SparkContextオブジェクトは、クラスター内のWorker Nodeで実行するtasksをexecutorへ送信し結果を回収します。
アプリケーションコードは、SparkContextから、executorに送信されます。
Spark CoreとRDDについて
Apache Sparkは、Spark Core
とその他複数のライブラリから構成されます。
Spark Core
Spark CoreはRDD
の概念を使用して Spark Context
、SparkConf
等のライブラリ、メソッドから構成されるAPIです。
-
Spark Context
Spark ライブラリ内で定義されるクラスの一つでSparkで操作を行う際の入り口となります。Sparkクラスタへの接続を行います。先述した Spark Contextの停止は
pyspark.SparkContext.stop() / sc.stop()
にて行われます。
pyspark.SparkContext
pyspark.SparkContext.stop
from pyspark import SparkContext
# SparkContextの開始
sc = pyspark.SparkContext()
# SparkContextの停止
sc.stop()
- SparkConf
Sparkライブラリ内で定義されるクラスの一つで、Sparkの構成を定義するために使用されます。SparkConf
へ設定値を記述し、SparkContextに渡す事が出来ます。
pyspark.SparkConf
from pyspark import SparkContext
from pyspark import SparkConf
conf = SparkConf()
conf.setMaster("local").setAppName("My app")
sc = pyspark.SparkContext(conf=conf)
- RDD(Resillent Distributed Dataset)
Sparkはデータの分散処理を基本機能としており、分散処理を行うためにデータを統合的に扱う必要があります。Spark では、RDD(Resillent Distributed Dataset)
という抽象化された分散処理用のデータセットを扱います。
pyspark.RDD
例として SparkContext.parallelize()
では、RDDを生成します。
pyspark.SparkContext.parallelize
from pyspark import SparkContext
# 省略#
# RDD(データセット)の生成
rdd = spark_context.parallelize([
(1, 'a'),
(2, 'b'),
(3, 'c'),
(4, 'd'),
])
RDDについて
Sparkは、データの分散処理を行う事をコンセプトに設計されています。
RDDは、各ノードに分散しているデータを単一のオブジェクトとして処理するためのAPIです。
以下が特徴となります。
1.処理種別と遅延評価
RDDの処理は2つあります。
Transform
- マップ、フィルター、読み取りなどの処理
Action
- 処理実行
Sparkでは、Actionが実行されるまでの前処理のTransformの処理は実行されません。これを遅延評価といいます。
そして Action により実行される処理の一連をジョブと呼びます。
2.イミュータブル
RDD は処理毎に新しく生成されるため、一度生成されたデータは不変性を保証しています。つまり、DAG
と呼ばれる有向非巡回性の原理に従っています。
3.フォールトトレラント
障害復旧機能として、データ変換のプロセスを記録し、障害が発生した処理ポイントから変換プロセスを実行し、データ復旧を実行します。
4.分散コレクション
クラスターモードであっても、論理的なコレクションがRDDで各サーバに分散されて処理が実行されます、内部的にはpartition
で区画されています。
データセットのAPI
Spark v2以降は、RDD
,DataFrame
,Dataset
の3種類のデータセットのAPIを扱う事が出来ます。v2以降、扱われるAPIとして DataFrame/Datasetが使用されますが、いずれも上述したRDDの処理プロセスの特性を持ちます。
- Supported Data Types
Spark SQL
先述した DataFrame API
をベースにしたクエリ処理を実行するためのSparkのライブラリとなります。
これは一般的なRDBのクエリ操作、DataFrame APIと同等のクエリ処理が出来ます。
-
標準SQLを利用してデータにアクセス可能
-
CSV,JSON,Parquet,Avro,ORCなどのファイルに対しての操作
-
RDBMSなどへJDBC経由でアクセス
-
Spark Streaming,MLlibなどのAPIとの連携が可能
-
SparkSession
Dataset、DataFrameを使用してSparkをプログラミングするた
めのエントリポイント(API)となります。
基本的な SparkSessionを生成するには、SparkSession.builder定義します。
pyspark.sql.SparkSession
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
- DataFrame
Spark SQLライブラリで定義されるクラスの1つで、構造化データを処理するためにメソッドが提供されています。
pyspark.sql.DataFrame
以下の例では、DataFrameを生成し、表示を行います。
pyspark.sql.SparkSession.createDataFrame
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
df= spark.createDataFrame([{"a": "1", "b": "2", "c": "3"}])
df.show()
# +---+---+---+
# | a | b | c |
# +---+---+---+
# | 1| 2| 3|
# +---+---+---+
- UDF
ユーザが定義した関数を使用して分散処理をする機能となります。
SparkSQLのみではありませんが、以下のようにDataFrame APIを事前に定義された UDF
の関数を使用して使う事も出来ます。
pyspark.sql.functions.udf
from pyspark.sql.types import IntegerType
slen = udf(lambda s: len(s), IntegerType())
@udf
def to_upper(s):
if s is not None:
return s.upper()
@udf(returnType=IntegerType())
def add_one(x):
if x is not None:
return x + 1
df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")).show()
# +----------+--------------+------------+
# |slen(name)|to_upper(name)|add_one(age)|
# +----------+--------------+------------+
# | 8| JOHN DOE| 22|
# +----------+--------------+------------+
Spark Streaming
Sparkは基本バッチ処理を行うフレームワークですが、バッチ処理を連続的に実行する事で、疑似的なストリーム処理を実現します。
この方式をマイクロバッチと呼びます。
Spark Streaming機能は、バッチ処理を一定間隔で実行し続ける事でストリーム処理を実行します。
ストリームデータはデータが無限に発生し、途切れる事がありません。
ストリームデータを一定区間で区切る機能をウィンドウ処理といいます。
ストリームデータを1つのまとまったデータとして処理するためには、一定の区切りのきっかけを与えるトリガーを明示する事が必要となり、ウィンドウ機能により、1つのまとまったデータに対して集計、演算を実施します。
- Streaming Context
Spark Streamingライブラリのエントリポイントであり、Spark StreamingのアプリケーションがSparkクラスタに接続するための機能を提供するクラスです。
pyspark.streaming.StreamingContext
from pyspark.context import SparkContext
from pyspark.streaming import StreamingContext
# 1秒単位でのバッチ処理を繰り返す StreamingContext を生成
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)
- Dstream(Discretized Stream)
受信したデータを連続して処理する内容を定義するクラスです。
ライブデータをStreamingContextのstartメソッドにて開始する際に、処理内容を事前に定義することも、map、window、reduceByKeyAndWindowなどを使用して既存のDStreamを変換することによって生成することもできます。
pyspark.streaming.DStream
pyspark.streaming.StreamingContext.socketTextStream
# localhostへ9999ポートで接続するDStreamを作成します
lines = ssc.socketTextStream("localhost", 9999)
Structed Streaming
Spark SQL
の構造化データに対する処理をストリーム上で実現するAPIです。
Spark Streaming
と比較して、抽象度が高く、理解がしやすいAPIとされているようです。
Structed Streaming
の特徴は、データストリームを継続的に追加されるテーブルとして扱うことです。これにより、バッチ処理モデルと非常によく似た新しいストリーム処理モデルが作成されます。計算処理を静的テーブルのように標準のバッチのようなクエリとして表現し、Sparkはそれを無制限の入力テーブルの増分クエリとして実行します。
Structed Streaming の処理は、公式ドキュメントにある以下のプログラミングモデルが非常に分かりやすかったです。
Programming Model
The key idea in Structured Streaming is to treat a live data stream as a table that is being continuously appended. This leads to a new stream processing model that is very similar to a batch processing model. You will express your streaming computation as standard batch-like query as on a static table, and Spark runs it as an incremental query on the unbounded input table. Let’s understand this model in more detail.
Basic Concepts
Consider the input data stream as the “Input Table”. Every data item that is arriving on the stream is like a new row being appended to the Input Table.
ストリーム処理におけるウインドウ処理
ストリーム処理は、データが発生した順に到着しない性質を持ちます。
EventTime
- イベントが発生した時刻
ProcessingTime
- イベントを処理した時刻
Watermark
- Event Timeをどこまで処理が完了したかを示す概念であり、データ処理の進行度を示すメトリクスとなります
Spark Structed Streamingでは、WaterMarkの基準設定と、どこまでの遅れが許容できるかの範囲について、withWaterMarkを用いて設定を行います。
from pyspark.sql.functions import *
# eventTimeというカラムの時刻を基準にウィンドウ処理を実施
eventTimeAplicationDataFrame = exampleDataFrame.groupBy(window("eventTime", "5 minute")).count()
# processingTimeというカラムに現在時刻を設定し、時刻を基準にウィンドウ処理を実施
processingTimeAplicationDataFrame = exampleDataFrame.withColumn('processingTime','current_timestamp())
.groupBy(window("processingTime", "5 minute")).count()
# eventTimeのカラムを使用してWaterMark(許容遅延)を10分に設定
watermarkAplicationDataFrame = exampleDataFrame.withWatermark ('eventTime',"10 minute").groupBy(window("eventTime", "10 minute","5 minute")). count ()
参考文献
今回は参考文献として以下を利用させていただきました。
- Apache Spark 公式
- アプリケーションエンジニアのためのApache Spark入門
以上となります。