4
3

More than 1 year has passed since last update.

Apache Spark アーキテクチャ

Posted at

Apache Sparkの学習記事です。
クラウドにおけるSparkの利用は別記事を作成したいと思います。

Apache Spark のカバー領域

データ分析タスクにおいてApache Sparkはデータ処理および分析処理の領域をカバーします。

image.png

Spark プログラム

SparkのAPIは、Python, Java, Scala, R の言語が用意されています。
今回リンクでpickupする言語は python(pyspark)とします。

SparkのコンポーネントにSpark-shellと呼ばれるものが存在します、これはSparkのコマンドをコマンドラインから入力して対話的に実行内容を確認することができます。

また、Sparkのアプリケーションを起動する際、spark-submitのスクリプトを使用します。

./bin/spark-submit \
  --class <main-class> \
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]

Spark Cluster

Spark はクラスターモードと呼ばれるCluster Managerを利用した構成を組む事が出来ます。クラスターモードのコンポーネントと動作は、以下のイメージです。

全体的な流れとして、spark-submitコマンドにより、Driver Programを起動した後、Cluster Managerに接続、要求したExecutorが起動して、プログラム上の処理をTasksとしてExecutorが処理を実行、計算結果を保存します。
SparkContextの停止(pyspark.SparkContext.stop()) が呼ばれると、Driver Programが終了し、Cluster Managerから取得したリソース群を開放するようなイメージです。

image.png

  • Worker Node

クラスタ内に1つ以上のWorker Nodeを構成し、Executor(処理を実行し、アプリケーションのデータを格納するプロセス)とTasks(ジョブの最小単位で実行される計算リソース)を複数持ちます。ExecutorはClusterManagerにより起動されます。

  • Cluster Manager

Executorの起動を管理します。
クラスターマネージャは以下の構成からサポートします。
StandaloneMesosYARNKubernetes

  • Driver Program

アプリケーションは Driver Programにより起動されます。
SparkContextオブジェクトがCluster Managerへ接続します。
SparkContextオブジェクトは、クラスター内のWorker Nodeで実行するtasksをexecutorへ送信し結果を回収します。
アプリケーションコードは、SparkContextから、executorに送信されます。

Spark CoreとRDDについて

Apache Sparkは、Spark Coreとその他複数のライブラリから構成されます。

image.png

Spark Core

Spark CoreはRDDの概念を使用して Spark ContextSparkConf等のライブラリ、メソッドから構成されるAPIです。

  • Spark Context

Spark ライブラリ内で定義されるクラスの一つでSparkで操作を行う際の入り口となります。Sparkクラスタへの接続を行います。先述した Spark Contextの停止はpyspark.SparkContext.stop() / sc.stop()にて行われます。

pyspark.SparkContext

pyspark.SparkContext.stop

sample1.py
from pyspark import SparkContext

#SparkContextの開始
sc = pyspark.SparkContext()

#SparkContextの停止
sc.stop()
  • SparkConf

Sparkライブラリ内で定義されるクラスの一つで、Sparkの構成を定義するために使用されます。SparkConfへ設定値を記述し、SparkContextに渡す事が出来ます。

pyspark.SparkConf

sample2.py
from pyspark import SparkContext
from pyspark import SparkConf

conf = SparkConf()
conf.setMaster("local").setAppName("My app")
sc = pyspark.SparkContext(conf=conf)

  • RDD(Resillent Distributed Dataset)

Sparkはデータの分散処理を基本機能としており、分散処理を行うためにデータを統合的に扱う必要があります。Spark では、RDD(Resillent Distributed Dataset)という抽象化された分散処理用のデータセットを扱います。

pyspark.RDD

例として SparkContext.parallelize()では、RDDを生成します。

pyspark.SparkContext.parallelize

sample3.py
from pyspark import SparkContext

#省略#

#RDD(データセット)の生成
rdd = spark_context.parallelize([
  (1, 'a'),
  (2, 'b'),
  (3, 'c'),
  (4, 'd'),
])

RDDについて

Sparkは、データの分散処理を行う事をコンセプトに設計されています。
RDDは、各ノードに分散しているデータを単一のオブジェクトとして処理するためのAPIです。
以下が特徴となります。

image.png

1.処理種別と遅延評価

RDDの処理は2つあります。

Transform - マップ、フィルター、読み取りなどの処理
Action - 処理実行

Sparkでは、Actionが実行されるまでの前処理のTransformの処理は実行されません。これを遅延評価といいます。
そして Action により実行される処理の一連をジョブと呼びます。

2.イミュータブル

RDD は処理毎に新しく生成されるため、一度生成されたデータは不変性を保証しています。つまり、DAGと呼ばれる有向非巡回性の原理に従っています。

3.フォールトトレラント
障害復旧機能として、データ変換のプロセスを記録し、障害が発生した処理ポイントから変換プロセスを実行し、データ復旧を実行します。

4.分散コレクション
クラスターモードであっても、論理的なコレクションがRDDで各サーバに分散されて処理が実行されます、内部的にはpartitionで区画されています。

データセットのAPI

Spark v2以降は、RDD,DataFrame,Datasetの3種類のデータセットのAPIを扱う事が出来ます。v2以降、扱われるAPIとして DataFrame/Datasetが使用されますが、いずれも上述したRDDの処理プロセスの特性を持ちます。

image.png
サポートされるデータタイプは以下に記載があります。

  • Supported Data Types

Spark SQL

先述した DataFrame APIをベースにしたクエリ処理を実行するためのSparkのライブラリとなります。
これは一般的なRDBのクエリ操作、DataFrame APIと同等のクエリ処理が出来ます。

  • 標準SQLを利用してデータにアクセス可能
  • CSV,JSON,Parquet,Avro,ORCなどのファイルに対しての操作
  • RDBMSなどへJDBC経由でアクセス
  • Spark Streaming,MLlibなどのAPIとの連携が可能

  • SparkSession

Dataset、DataFrameを使用してSparkをプログラミングするた
めのエントリポイント(API)となります。
基本的な SparkSessionを生成するには、SparkSession.builder定義します。

pyspark.sql.SparkSession

sample4.py
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
  • DataFrame

Spark SQLライブラリで定義されるクラスの1つで、構造化データを処理するためにメソッドが提供されています。

pyspark.sql.DataFrame

以下の例では、DataFrameを生成し、表示を行います。

pyspark.sql.SparkSession.createDataFrame

sample5.py
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

df= spark.createDataFrame([{"a": "1", "b": "2", "c": "3"}])

df.show()

#+---+---+---+
#| a | b | c |
#+---+---+---+
#|  1|  2|  3|
#+---+---+---+
  • UDF

ユーザが定義した関数を使用して分散処理をする機能となります。
SparkSQLのみではありませんが、以下のようにDataFrame APIを事前に定義された UDFの関数を使用して使う事も出来ます。

pyspark.sql.functions.udf

sample6.py

from pyspark.sql.types import IntegerType
slen = udf(lambda s: len(s), IntegerType())
@udf
def to_upper(s):
    if s is not None:
        return s.upper()

@udf(returnType=IntegerType())
def add_one(x):
    if x is not None:
        return x + 1

df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")).show()

#+----------+--------------+------------+
#|slen(name)|to_upper(name)|add_one(age)|
#+----------+--------------+------------+
#|         8|      JOHN DOE|          22|
#+----------+--------------+------------+

Spark Streaming

Sparkは基本バッチ処理を行うフレームワークですが、バッチ処理を連続的に実行する事で、疑似的なストリーム処理を実現します。
この方式をマイクロバッチと呼びます。
Spark Streaming機能は、バッチ処理を一定間隔で実行し続ける事でストリーム処理を実行します。

ストリームデータはデータが無限に発生し、途切れる事がありません。
ストリームデータを一定区間で区切る機能をウィンドウ処理といいます。
ストリームデータを1つのまとまったデータとして処理するためには、一定の区切りのきっかけを与えるトリガーを明示する事が必要となり、ウィンドウ機能により、1つのまとまったデータに対して集計、演算を実施します。

  • Streaming Context

Spark Streamingライブラリのエントリポイントであり、Spark StreamingのアプリケーションがSparkクラスタに接続するための機能を提供するクラスです。

pyspark.streaming.StreamingContext

sample6.py
from pyspark.context import SparkContext
from pyspark.streaming import StreamingContext

#1秒単位でのバッチ処理を繰り返す StreamingContext を生成
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)

  • Dstream(Discretized Stream)

受信したデータを連続して処理する内容を定義するクラスです。
ライブデータをStreamingContextのstartメソッドにて開始する際に、処理内容を事前に定義することも、map、window、reduceByKeyAndWindowなどを使用して既存のDStreamを変換することによって生成することもできます。

pyspark.streaming.DStream

pyspark.streaming.StreamingContext.socketTextStream

sample7.py
# localhostへ9999ポートで接続するDStreamを作成します
lines = ssc.socketTextStream("localhost", 9999)

Structed Streaming

Spark SQLの構造化データに対する処理をストリーム上で実現するAPIです。
Spark Streamingと比較して、抽象度が高く、理解がしやすいAPIとされているようです。

Structed Streamingの特徴は、データストリームを継続的に追加されるテーブルとして扱うことです。これにより、バッチ処理モデルと非常によく似た新しいストリーム処理モデルが作成されます。計算処理を静的テーブルのように標準のバッチのようなクエリとして表現し、Sparkはそれを無制限の入力テーブルの増分クエリとして実行します。

Structed Streaming の処理は、公式ドキュメントにある以下のプログラミングモデルが非常に分かりやすかったです。

Programming Model
The key idea in Structured Streaming is to treat a live data stream as a table that is being continuously appended. This leads to a new stream processing model that is very similar to a batch processing model. You will express your streaming computation as standard batch-like query as on a static table, and Spark runs it as an incremental query on the unbounded input table. Let’s understand this model in more detail.

Basic Concepts
Consider the input data stream as the “Input Table”. Every data item that is arriving on the stream is like a new row being appended to the Input Table.

ストリーム処理におけるウインドウ処理

ストリーム処理は、データが発生した順に到着しない性質を持ちます。

EventTime - イベントが発生した時刻
ProcessingTime - イベントを処理した時刻
Watermark - Event Timeをどこまで処理が完了したかを示す概念であり、データ処理の進行度を示すメトリクスとなります

Spark Structed Streamingでは、WaterMarkの基準設定と、どこまでの遅れが許容できるかの範囲について、withWaterMarkを用いて設定を行います。

sample7.py

from pyspark.sql.functions import * 

# eventTimeというカラムの時刻を基準にウィンドウ処理を実施
eventTimeAplicationDataFrame = exampleDataFrame.groupBy(window("eventTime", "5 minute")).count()

# processingTimeというカラムに現在時刻を設定し、時刻を基準にウィンドウ処理を実施
processingTimeAplicationDataFrame = exampleDataFrame.withColumn('processingTime','current_timestamp())
.groupBy(window("processingTime", "5 minute")).count()

# eventTimeのカラムを使用してWaterMark(許容遅延)を10分に設定

watermarkAplicationDataFrame = exampleDataFrame.withWatermark ('eventTime',"10 minute").groupBy(window("eventTime", "10 minute","5 minute")). count ()

参考文献

今回は参考文献として以下を利用させていただきました。

  • Apache Spark 公式

  • アプリケーションエンジニアのためのApache Spark入門

以上となります。

4
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
3