PySparkについて触れる機会があったので、学んだ内容をまとめます。
Sparkとは
Apache Sparkは、大規模なデータを処理するためのオープンソースの分散処理フレームワークです。2009年にUC BerkeleyのAMPLabで開始され、その後Databricks社が開発と商用サポートを提供しています。
SparkはJava、Python、R、Scala用のAPIを提供しており、データ分析において言語の障壁なく利用できます。また、複雑な分散処理の裏側を抽象化しているため、利用者はSparkのメリットを手軽に享受できます。
PySparkとは
PySparkは、Apache SparkのPython向けAPIであり、大規模なデータセットを分散処理するためのツールです。
PySparkはSparkの機能をサポートしており、Spark SQL、DataFrames、Structured Streaming、Machine Learning (MLlib)、Spark Coreなどを利用できます。
Sparkを構成するライブラリ
-
Spark Core
Sparkの中核をなす部分で、タスクの分散伝送、スケジューリング、I/O 機能を提供します。
このコアライブラリは、Sparkの基本的な機能をサポートしています。 -
Spark SQL
関係データベースのようなSQLクエリを使用して、構造化データを操作するためのライブラリです。
データフレームとしてデータを扱い、SQLクエリを実行できます。 -
MLlib (Machine Learning Library):
機械学習アルゴリズムを提供するライブラリです。
分類、回帰、クラスタリング、次元削減などのタスクをサポートしています。 -
Spark Streaming
リアルタイムストリーミングデータを処理するためのライブラリです。
リアルタイムのデータフローをバッチ処理と同じインターフェースで扱えます。 -
GraphX
グラフデータ処理のためのライブラリです。
グラフアルゴリズムやネットワーク分析に使用されます。
SparkContext/SparkSQL/SparkSessionについて
良く使うライブラリなので、まとめておきます。
Spark ContextはApache Sparkのエントリーポイントであり、
Spark SQLはSQLクエリ処理を効率化するライブラリです。
SparkSessionはこれらのコンテキストを隠蔽し、統合的に利用できるものです。
そのため、まずSparkSessionを作成して、SparkSessionを通して、データ処理をしていく
という流れになります。
SparkSessionの役割
- データソースへの接続: ファイルシステム、Hive、JDBCなど、さまざまなデータソースにアクセスできます。
- データフレームの作成と操作: データフレームを作成し、SQLクエリやデータフレームAPIを使用してデータを操作できます。
- 設定管理: Sparkの設定や構成を管理します。
- アプリケーションのエントリーポイント: Sparkアプリケーションの開始点となります。
基本的には、1つのSparkアプリケーションに対して1つのSparkSessionを作成するのが一般的です。
Sparkの特徴
遅延評価(lazy evaluation)
遅延評価とは?
遅延評価は、プログラムの式(関数呼び出しなど)を必要になるまで実際に評価しない戦略です。
つまり、式が呼び出された時点では一旦保留し、値が必要になった時点で式を評価します。
Sparkでは、変換操作(例: マップ、フィルタ、グループ化など)は遅延評価され、アクション操作(例: collect、count、saveなど)が呼び出された時に実際の計算が行われます。
なぜ遅延評価を使うのか?
遅延評価を使用することで、最適な物理プランを識別し、データ処理を効率的に最適化できます。
アクションが呼び出されるまで、不要な計算を避けることができます。
データ処理のためのデータ構造
RDD (Resilient Distributed Dataset) と データフレーム (DataFrame) です。
RDD (Resilient Distributed Dataset)
RDDは、Sparkの誕生以来、ユーザーの目に触れる主要なAPIでした。
RDDは、不変で分散されたデータのコレクションであり、以下の特性を持っています:
- 不変性: 一度作成されたRDDは変更できません。
- 耐障害性: データの失敗に対して頑健です。失われたデータはトランスフォーメーションのログから再計算されます。
- 分散処理: クラスタ全体に分散してデータを処理します。
データフレーム (DataFrame):
データフレームは、RDDと同様に分散されたデータのコレクションですが、名前付きカラムで整理されています。
データフレームはRDBMSにおけるテーブルに似ており、このようにデータを構造化することによってSpark SQLを使ってクエリを実行できます。
データフレームは、構造化データを効率的に扱うために設計されており、開発者は高レベルの抽象化を利用できます。
RDDは柔軟で強力な操作を提供し、データフレームは構造化データを効率的に操作するための高レベルのAPIです。
一般的には、データフレームの方が構造化データを効率的に扱うために設計されており、パフォーマンスが良いです。そのため、特別な理由がない限りはデータフレームを使えばOKです。
今回は取り上げませんが、他にもキャッシュの利用などもできるとのこと。
利用時のポイント
-
細かいファイルを読み込むのは効率が悪い
一般的には、小さいサイズのファイルを大量に読み込むより、大きなサイズの1ファイルを読み込む方が、ファイルのオープンクローズのオーバーヘッドが発生せず、パフォーマンス的に優れる -
できるだけspark functionを使う
SQLで書くと、論理的な流れではない(SQLは後ろから処理されて行くので見にくい)ので、できるだけpysparkのfunctionで記載すると良い
まずは触ってみたい方向け:Docker環境構築
※Dockerはインストールされている前提とします。
jupyter/pyspark-notebookのイメージをPullして起動します
sudo docker pull jupyter/pyspark-notebook
sudo docker run --name spark_test -it -p 8888:8888 -v $PWD:/home/jovyan/work jupyter/pyspark-notebook
起動したらコンソールに表示されているURL(以下のURL)にブラウザからアクセス
http://{ローカルIPアドレス}:8888/lab/tree/work/pyspark_test.ipynb
pullしたdockerについてはこちら
ファイルを読み込んで変換して書き込んでみる
今回は読み込むのはAWS CloudFrontのログです。
CloudFrontログのフォーマットはこちら
サンプルコード
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, StructType, StructField
# Sparkセッションの作成
spark = SparkSession.builder.appName("Example").getOrCreate()
filename = '<読み込み先のパス>'
result_path = '<読み込み先のパス>'
# CloudFrontログのカラムを指定
schema = StructType([
StructField( 'date', StringType(), False),
StructField( 'time', StringType(), False),
StructField( 'location', StringType(), False),
StructField( 'bytes', StringType(), False),
StructField( 'requestip', StringType(), False),
StructField( 'method', StringType(), False),
StructField( 'host', StringType(), False),
StructField( 'uri', StringType(), False),
StructField( 'status', StringType(), False),
StructField( 'referrer', StringType(), False),
StructField( 'useragent', StringType(), False),
StructField( 'querystring', StringType(), False),
StructField( 'cookie', StringType(), False),
StructField( 'resulttype', StringType(), False),
StructField( 'requestid', StringType(), False),
StructField( 'hostheader', StringType(), False),
StructField( 'requestprotocol', StringType(), False),
StructField( 'requestbytes', StringType(), False),
StructField( 'timetaken', StringType(), False),
StructField( 'xforwardedfor', StringType(), False),
StructField( 'sslprotocol', StringType(), False),
StructField( 'sslcipher', StringType(), False),
StructField( 'responseresulttype', StringType(), False),
StructField( 'httpversion', StringType(), False),
StructField( 'filestatus', StringType(), False),
StructField( 'encryptedfields', StringType(), False),
StructField( 'c_port', StringType(), False),
StructField( 'time_to_first_byte', StringType(), False),
StructField( 'x_edge_detailed_result_type', StringType(), False),
StructField( 'sc_content_type', StringType(), False),
StructField( 'sc_content_len', StringType(), False),
StructField( 'sc_range_start', StringType(), False),
StructField( 'sc_range_end', StringType(), False)
])
data = spark.read.schema(schema).options(delimiter='\t').csv(filename)
# URIに[test]が含まれるものだけに絞る
# dateとtimeを結合し、datetimeカラムを作成
# 必要なカラムだけを取得
data = (
data.filter(data.uri.like('%test%'))
.withColumn('date_time', F.concat('date', F.lit(" "), 'time'))
.select('date_time', 'requestip', 'uri', 'referrer', 'useragent', 'querystring', 'cookie')
)
# データを表示する
data.show()
# ファイルに書き込み
data.write.mode("overwrite").parquet(result_path)
解説
まずはSparkSessionを作成します。
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, StructType, StructField
# Sparkセッションの作成
spark = SparkSession.builder.appName("Example").getOrCreate()
次にスキーマを指定して、spark.readでファイルをデータフレームに読み込みます。
CloudFrontログはTSVなので、optionsでタブ区切りを指定しています。
読み込み書き込みのAPIは以下のURLにまとまっています。
# CloudFrontログのカラムを指定
schema = StructType([
StructField( 'date', StringType(), False),
StructField( 'time', StringType(), False),
StructField( 'location', StringType(), False),
StructField( 'bytes', StringType(), False),
StructField( 'requestip', StringType(), False),
StructField( 'method', StringType(), False),
StructField( 'host', StringType(), False),
StructField( 'uri', StringType(), False),
StructField( 'status', StringType(), False),
StructField( 'referrer', StringType(), False),
StructField( 'useragent', StringType(), False),
StructField( 'querystring', StringType(), False),
StructField( 'cookie', StringType(), False),
StructField( 'resulttype', StringType(), False),
StructField( 'requestid', StringType(), False),
StructField( 'hostheader', StringType(), False),
StructField( 'requestprotocol', StringType(), False),
StructField( 'requestbytes', StringType(), False),
StructField( 'timetaken', StringType(), False),
StructField( 'xforwardedfor', StringType(), False),
StructField( 'sslprotocol', StringType(), False),
StructField( 'sslcipher', StringType(), False),
StructField( 'responseresulttype', StringType(), False),
StructField( 'httpversion', StringType(), False),
StructField( 'filestatus', StringType(), False),
StructField( 'encryptedfields', StringType(), False),
StructField( 'c_port', StringType(), False),
StructField( 'time_to_first_byte', StringType(), False),
StructField( 'x_edge_detailed_result_type', StringType(), False),
StructField( 'sc_content_type', StringType(), False),
StructField( 'sc_content_len', StringType(), False),
StructField( 'sc_range_start', StringType(), False),
StructField( 'sc_range_end', StringType(), False)
])
data = spark.read.schema(schema).options(delimiter='\t').csv(filename)
APIを使いデータを操作します。
filter:URIに[test]が含まれるものだけに絞ります。
withColumn:dateとtimeをconcatで結合し、新たなdatetimeカラムを作成しています。
select:必要なカラムをSELECTします。
他にも様々なAPIがありますが、詳しくは公式ドキュメントを参照すると一番早いと思います。
# 必要なカラムだけを取得
data = (
data.filter(data.uri.like('%test%'))
.withColumn('date_time', F.concat('date', F.lit(" "), 'time'))
.select('date_time', 'requestip', 'uri', 'referrer', 'useragent', 'querystring', 'cookie')
)
データを表示します。開発時はよく使うAPIになると思います。
上のデータ処理はここで実際に処理されます。
data.show()
アウトプットはこのようになります。(値はすべてxにしています。)
+---------+---------+---+--------+---------+-----------+------+
|date_time|requestip|uri|referrer|useragent|querystring|cookie|
+---------+---------+---+--------+---------+-----------+------+
|xxxxxxxxx|xxxxxxxxx|xxx|xxxxxxxx|xxxxxxxxx|xxxxxxxxxxx|xxxxxx|
+---------+---------+---+--------+---------+-----------+------+
最後にファイルに書き込みます。今回はparquet形式で書き込んでいます。
data.write.mode("overwrite").parquet(result_path)
今回はSparkのAPIを使ってデータを操作しましたが、SQLを使って操作したい場合は以下のようなソースコードになります。
createOrReplaceTempViewでテーブルを作成して、spark.sqlでSQLを実行します。
data.createOrReplaceTempView("data")
select_sql = '''
SELECT
concat(date, ' ', time) date_time,
requestip,
uri,
referrer,
useragent,
querystring,
cookie
FROM data
WHERE uri LIKE '%test%'
'''
data = spark.sql(select_sql)
参考資料