(2022.1.24修正)
SparkをWindowsを動作させるためのwinutilsの関係で訂正を行いました。
SparkをWindows環境で動作させるためにcdarlint/winutils、steveloughran/winutilsが良く参照されていますが、このバイナリはWindows11環境で動作しなかったので、winutilsを再構築し、GitHubで公開で公開しました。
その手順を説明します。
0.PySparkとは?
Apache Sparkは分散処理などを活用して、大容量データを分析するためのライブラリです。それをPythonのインターフェースから利用できるようにしたのがPySparkです。
さまざまなデータソースに適応でき、柔軟なデータ処理ができますので、まさにデータ処理を統一する際などに最適です。
用語 | 内容 |
---|---|
Apache Spark | 巨大なデータを高速に処理できる |
PySpark | Apache SparkをPythonから使うことができるようにしたライブラリ |
以下では主にWindows環境でのPySparkの利用をしてみます。
1.Apache Spark
1.1. Apache Sparkの環境構築
winutilsのREADME.mdに従って環境を構築してください。
1.2.動作確認
以上ができたら、コマンドラインからspark-shellを起動してみます。
>spark-shell
2.pyspark
続いてPython環境でSparkを使えるようにしていきましょう。
2.1. pysparkのインストール
ご利用のPython環境で、pysparkを追加します。
pip install pyspark
2.2.環境変数の設定2
PYSPARK_PYTHON
利用するPython環境を環境変数PYSPARK_PYTHONに設定します。
set PYSPARK_PYTHON=C:\xxxx\bin\python.exe
2.3.動作確認
PySparkを起動してみましょう。
> pyspark
起動できたのであれば、プログラムを作っていきましょう。
3. PySparkを試す
基本的にはSparkContextオブジェクトを作り、データを操作します。ここでは二つのデータ形式について操作してみましょう。
3.1. RDDを使ってみる
RDDとはRDD(Resilient Distributed Dataset)のことです。Apache Sparkのプログラミングでは、基本的にRDDにデータを保持して操作することが基本です。
PySparkではSparkContextオブジェクトを作成し、そこからさまざまなデータ処理を行います。
import pyspark
from pyspark import SparkContext
# SparkContextの作成
sc = SparkContext(appName='spark_sample')
# RDDを作成する。
rdd = sc.parallelize([
(1, 'Foo'),
(2, 'Bar'),
(3, 'Baz'),
])
# RDDにかけるFilter関数(2以上の要素を取り出す)
def filter_func(x):
n,s = x
return n >= 2
rdd = rdd.filter(filter_func)
# 結果の表示
print(rdd.collect())
RDDにはfilter/mapなど関数型プログラミングで馴染みのメソッドが準備されていますので、自在な処理が可能です。
3.2. DataFrameを使ってみる
DataFrameはデータベースのようなカラムを持ったテーブル構造です。RDDにデータのスキーマを与えて作成することができます。これによりSQLライクの検索も可能になります。
続いてJSONをDataFrame化してデータ処理を行ってみましょう。
sample.json
{"name":"Alice","year":20}
{"name":"Bob","year":25}
さきほどはSparkContextをエントリとして使いましたが、_データセットやデータフレームのAPIを使用する際のエントリポイントは、SparkSessionです。 SparkSessionは、builderを使用して以下のように作成します。
spark = SparkSession.builder \
.master("local") \
.appName("AppName") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
SparkSession作成し、データを処理してみます。JSONから取り込んだ場合、自動的にスキーマも推測されていますので、以下では複数行からなるJSONデータを取り込んでデータフレームを作成し、検索を行ってみます。
pythonコード
import pyspark
from pyspark.sql import SparkSession
# SparkSessionの作成
spark = SparkSession.builder \
.master("local") \
.appName("JSON SQL") \
.getOrCreate()
# DataFrameの作成
df = spark.read.json('sample.json')
# JSONを読み込んだのでスキーマを確認
df.printSchema()
# SparkSessionにテーブルとして登録
df.registerTempTable('people')
# Spark SQLによる検索
selected = spark.sql('SELECT * FROM people WHERE name==\"Alice\"')
selected.show()
さまざまなDBへの接続
さまざまなデータベースに接続する場合、connectorをconfigに与えることで接続できます。たとえば、さまざまなデータベースの接続が、Spark Packageとして提供されています。
RDBMS(MySQL)
まずはRDBMSであるMySQLに接続してみましょう。MySQL用のJDBCドライバのJARファイルを用意しておく必要があります。
以下から、JARファイルをダウンロードして、適当な位置にJARファイルを設置します。
https://jar-download.com/artifacts/mysql/mysql-connector-java
こちらのJARファイルをSparkSessionで適用する必要があります。
from pyspark.sql import SparkSession
spark = SparkSession.builder.config("spark.jars", "...jars\\mysql-connector-java-8.0.27.jar") \
.master("local").appName("PySpark_MySQL_test").getOrCreate()
df = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/my_schema") \
.option("driver", "com.mysql.cj.jdbc.Driver").option("dbtable", "shema_name") \
.option("user", "xxxx").option("password", "xxxx").load()
df.show()
MongoDB(NoSQL)
NoSQLであるMongoDBについても同様にアクセスできます。MongoDBのコネクタはSpark Packageで提供されていますので以下のようにアクセスできます。
MongoDBは以下のようにSparkSessionを作成できます。
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("myApp") \
.config("spark.mongodb.input.uri", "mongodb://localhost:27017/db_name.collection_name") \
.config("spark.mongodb.output.uri", "mongodb://localhost:27017/db_name.collection_name") \
.config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1') \
.getOrCreate()
df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
df.show()