0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

PySpark の概要

Last updated at Posted at 2024-08-13

PySparkについて触れる機会があったので、学んだ内容をまとめます。

Sparkとは

Apache Sparkは、大規模なデータを処理するためのオープンソースの分散処理フレームワークです。2009年にUC BerkeleyのAMPLabで開始され、その後Databricks社が開発と商用サポートを提供しています。
SparkはJava、Python、R、Scala用のAPIを提供しており、データ分析において言語の障壁なく利用できます。また、複雑な分散処理の裏側を抽象化しているため、利用者はSparkのメリットを手軽に享受できます。

PySparkとは

PySparkは、Apache SparkのPython向けAPIであり、大規模なデータセットを分散処理するためのツールです。
PySparkはSparkの機能をサポートしており、Spark SQL、DataFrames、Structured Streaming、Machine Learning (MLlib)、Spark Coreなどを利用できます。

Sparkを構成するライブラリ

  • Spark Core
    Sparkの中核をなす部分で、タスクの分散伝送、スケジューリング、I/O 機能を提供します。
    このコアライブラリは、Sparkの基本的な機能をサポートしています。

  • Spark SQL
    関係データベースのようなSQLクエリを使用して、構造化データを操作するためのライブラリです。
    データフレームとしてデータを扱い、SQLクエリを実行できます。

  • MLlib (Machine Learning Library):
    機械学習アルゴリズムを提供するライブラリです。
    分類、回帰、クラスタリング、次元削減などのタスクをサポートしています。

  • Spark Streaming
    リアルタイムストリーミングデータを処理するためのライブラリです。
    リアルタイムのデータフローをバッチ処理と同じインターフェースで扱えます。

  • GraphX
    グラフデータ処理のためのライブラリです。
    グラフアルゴリズムやネットワーク分析に使用されます。

SparkContext/SparkSQL/SparkSessionについて

良く使うライブラリなので、まとめておきます。
Spark ContextはApache Sparkのエントリーポイントであり、
Spark SQLはSQLクエリ処理を効率化するライブラリです。
SparkSessionはこれらのコンテキストを隠蔽し、統合的に利用できるものです。

そのため、まずSparkSessionを作成して、SparkSessionを通して、データ処理をしていく
という流れになります。

SparkSessionの役割

  • データソースへの接続: ファイルシステム、Hive、JDBCなど、さまざまなデータソースにアクセスできます。
  • データフレームの作成と操作: データフレームを作成し、SQLクエリやデータフレームAPIを使用してデータを操作できます。
  • 設定管理: Sparkの設定や構成を管理します。
  • アプリケーションのエントリーポイント: Sparkアプリケーションの開始点となります。

基本的には、1つのSparkアプリケーションに対して1つのSparkSessionを作成するのが一般的です。

Sparkの特徴

遅延評価(lazy evaluation)

遅延評価とは?

遅延評価は、プログラムの式(関数呼び出しなど)を必要になるまで実際に評価しない戦略です。
つまり、式が呼び出された時点では一旦保留し、値が必要になった時点で式を評価します。
Sparkでは、変換操作(例: マップ、フィルタ、グループ化など)は遅延評価され、アクション操作(例: collect、count、saveなど)が呼び出された時に実際の計算が行われます。

なぜ遅延評価を使うのか?

遅延評価を使用することで、最適な物理プランを識別し、データ処理を効率的に最適化できます。
アクションが呼び出されるまで、不要な計算を避けることができます。

データ処理のためのデータ構造

RDD (Resilient Distributed Dataset) と データフレーム (DataFrame) です。

RDD (Resilient Distributed Dataset)

RDDは、Sparkの誕生以来、ユーザーの目に触れる主要なAPIでした。
RDDは、不変で分散されたデータのコレクションであり、以下の特性を持っています:

  • 不変性: 一度作成されたRDDは変更できません。
  • 耐障害性: データの失敗に対して頑健です。失われたデータはトランスフォーメーションのログから再計算されます。
  • 分散処理: クラスタ全体に分散してデータを処理します。

データフレーム (DataFrame):

データフレームは、RDDと同様に分散されたデータのコレクションですが、名前付きカラムで整理されています。
データフレームはRDBMSにおけるテーブルに似ており、このようにデータを構造化することによってSpark SQLを使ってクエリを実行できます。
データフレームは、構造化データを効率的に扱うために設計されており、開発者は高レベルの抽象化を利用できます。

RDDは柔軟で強力な操作を提供し、データフレームは構造化データを効率的に操作するための高レベルのAPIです。
一般的には、データフレームの方が構造化データを効率的に扱うために設計されており、パフォーマンスが良いです。そのため、特別な理由がない限りはデータフレームを使えばOKです。

今回は取り上げませんが、他にもキャッシュの利用などもできるとのこと。

利用時のポイント

  • 細かいファイルを読み込むのは効率が悪い
    一般的には、小さいサイズのファイルを大量に読み込むより、大きなサイズの1ファイルを読み込む方が、ファイルのオープンクローズのオーバーヘッドが発生せず、パフォーマンス的に優れる

  • できるだけspark functionを使う
    SQLで書くと、論理的な流れではない(SQLは後ろから処理されて行くので見にくい)ので、できるだけpysparkのfunctionで記載すると良い

まずは触ってみたい方向け:Docker環境構築

※Dockerはインストールされている前提とします。
jupyter/pyspark-notebookのイメージをPullして起動します

sudo docker pull jupyter/pyspark-notebook
sudo docker run --name spark_test -it -p 8888:8888 -v $PWD:/home/jovyan/work jupyter/pyspark-notebook

起動したらコンソールに表示されているURL(以下のURL)にブラウザからアクセス

http://{ローカルIPアドレス}:8888/lab/tree/work/pyspark_test.ipynb

pullしたdockerについてはこちら

ファイルを読み込んで変換して書き込んでみる

今回は読み込むのはAWS CloudFrontのログです。
CloudFrontログのフォーマットはこちら

サンプルコード

from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, StructType, StructField

# Sparkセッションの作成
spark = SparkSession.builder.appName("Example").getOrCreate()

filename = '<読み込み先のパス>'
result_path = '<読み込み先のパス>'

# CloudFrontログのカラムを指定
schema = StructType([
    StructField( 'date', StringType(), False),
    StructField( 'time', StringType(), False),
    StructField( 'location', StringType(), False),
    StructField( 'bytes', StringType(), False),
    StructField( 'requestip', StringType(), False),
    StructField( 'method', StringType(), False),
    StructField( 'host', StringType(), False),
    StructField( 'uri', StringType(), False),
    StructField( 'status', StringType(), False),
    StructField( 'referrer', StringType(), False),
    StructField( 'useragent', StringType(), False),
    StructField( 'querystring', StringType(), False),
    StructField( 'cookie', StringType(), False),
    StructField( 'resulttype', StringType(), False),
    StructField( 'requestid', StringType(), False),
    StructField( 'hostheader', StringType(), False),
    StructField( 'requestprotocol', StringType(), False),
    StructField( 'requestbytes', StringType(), False),
    StructField( 'timetaken', StringType(), False),
    StructField( 'xforwardedfor', StringType(), False),
    StructField( 'sslprotocol', StringType(), False),
    StructField( 'sslcipher', StringType(), False),
    StructField( 'responseresulttype', StringType(), False),
    StructField( 'httpversion', StringType(), False),
    StructField( 'filestatus', StringType(), False),
    StructField( 'encryptedfields', StringType(), False),
    StructField( 'c_port', StringType(), False),
    StructField( 'time_to_first_byte', StringType(), False),
    StructField( 'x_edge_detailed_result_type', StringType(), False),
    StructField( 'sc_content_type', StringType(), False),
    StructField( 'sc_content_len', StringType(), False),
    StructField( 'sc_range_start', StringType(), False),
    StructField( 'sc_range_end', StringType(), False)
])

data = spark.read.schema(schema).options(delimiter='\t').csv(filename)

# URIに[test]が含まれるものだけに絞る
# dateとtimeを結合し、datetimeカラムを作成
# 必要なカラムだけを取得
data = (
    data.filter(data.uri.like('%test%'))
        .withColumn('date_time', F.concat('date', F.lit(" "), 'time'))
        .select('date_time', 'requestip', 'uri', 'referrer', 'useragent', 'querystring', 'cookie')
)

# データを表示する
data.show()
                
# ファイルに書き込み
data.write.mode("overwrite").parquet(result_path)

解説

まずはSparkSessionを作成します。

from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, StructType, StructField

# Sparkセッションの作成
spark = SparkSession.builder.appName("Example").getOrCreate()

次にスキーマを指定して、spark.readでファイルをデータフレームに読み込みます。
CloudFrontログはTSVなので、optionsでタブ区切りを指定しています。
読み込み書き込みのAPIは以下のURLにまとまっています。

# CloudFrontログのカラムを指定
schema = StructType([
    StructField( 'date', StringType(), False),
    StructField( 'time', StringType(), False),
    StructField( 'location', StringType(), False),
    StructField( 'bytes', StringType(), False),
    StructField( 'requestip', StringType(), False),
    StructField( 'method', StringType(), False),
    StructField( 'host', StringType(), False),
    StructField( 'uri', StringType(), False),
    StructField( 'status', StringType(), False),
    StructField( 'referrer', StringType(), False),
    StructField( 'useragent', StringType(), False),
    StructField( 'querystring', StringType(), False),
    StructField( 'cookie', StringType(), False),
    StructField( 'resulttype', StringType(), False),
    StructField( 'requestid', StringType(), False),
    StructField( 'hostheader', StringType(), False),
    StructField( 'requestprotocol', StringType(), False),
    StructField( 'requestbytes', StringType(), False),
    StructField( 'timetaken', StringType(), False),
    StructField( 'xforwardedfor', StringType(), False),
    StructField( 'sslprotocol', StringType(), False),
    StructField( 'sslcipher', StringType(), False),
    StructField( 'responseresulttype', StringType(), False),
    StructField( 'httpversion', StringType(), False),
    StructField( 'filestatus', StringType(), False),
    StructField( 'encryptedfields', StringType(), False),
    StructField( 'c_port', StringType(), False),
    StructField( 'time_to_first_byte', StringType(), False),
    StructField( 'x_edge_detailed_result_type', StringType(), False),
    StructField( 'sc_content_type', StringType(), False),
    StructField( 'sc_content_len', StringType(), False),
    StructField( 'sc_range_start', StringType(), False),
    StructField( 'sc_range_end', StringType(), False)
])
data = spark.read.schema(schema).options(delimiter='\t').csv(filename)

APIを使いデータを操作します。
filter:URIに[test]が含まれるものだけに絞ります。
withColumn:dateとtimeをconcatで結合し、新たなdatetimeカラムを作成しています。
select:必要なカラムをSELECTします。

他にも様々なAPIがありますが、詳しくは公式ドキュメントを参照すると一番早いと思います。


# 必要なカラムだけを取得
data = (
    data.filter(data.uri.like('%test%'))
        .withColumn('date_time', F.concat('date', F.lit(" "), 'time'))
        .select('date_time', 'requestip', 'uri', 'referrer', 'useragent', 'querystring', 'cookie')
)

データを表示します。開発時はよく使うAPIになると思います。
上のデータ処理はここで実際に処理されます。

data.show()

アウトプットはこのようになります。(値はすべてxにしています。)
+---------+---------+---+--------+---------+-----------+------+
|date_time|requestip|uri|referrer|useragent|querystring|cookie|
+---------+---------+---+--------+---------+-----------+------+
|xxxxxxxxx|xxxxxxxxx|xxx|xxxxxxxx|xxxxxxxxx|xxxxxxxxxxx|xxxxxx|
+---------+---------+---+--------+---------+-----------+------+

最後にファイルに書き込みます。今回はparquet形式で書き込んでいます。

data.write.mode("overwrite").parquet(result_path)

今回はSparkのAPIを使ってデータを操作しましたが、SQLを使って操作したい場合は以下のようなソースコードになります。
createOrReplaceTempViewでテーブルを作成して、spark.sqlでSQLを実行します。

data.createOrReplaceTempView("data")
select_sql = '''
SELECT 
    concat(date, ' ', time) date_time,
    requestip,
    uri,
    referrer,
    useragent,
    querystring,
    cookie
FROM data
WHERE uri LIKE '%test%'
'''
data = spark.sql(select_sql)

参考資料

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?