3
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Windows環境でPySparkを使おう[修正版 2022/1/24]

Last updated at Posted at 2022-01-10

(2022.1.24修正)
SparkをWindowsを動作させるためのwinutilsの関係で訂正を行いました。
SparkをWindows環境で動作させるためにcdarlint/winutilssteveloughran/winutilsが良く参照されていますが、このバイナリはWindows11環境で動作しなかったので、winutilsを再構築し、GitHubで公開で公開しました。
その手順を説明します。


0.PySparkとは?

Apache Sparkは分散処理などを活用して、大容量データを分析するためのライブラリです。それをPythonのインターフェースから利用できるようにしたのがPySparkです。

さまざまなデータソースに適応でき、柔軟なデータ処理ができますので、まさにデータ処理を統一する際などに最適です。

用語 内容
Apache Spark 巨大なデータを高速に処理できる
PySpark Apache SparkをPythonから使うことができるようにしたライブラリ

以下では主にWindows環境でのPySparkの利用をしてみます。

1.Apache Spark

1.1. Apache Sparkの環境構築

winutilsのREADME.mdに従って環境を構築してください。

1.2.動作確認

以上ができたら、コマンドラインからspark-shellを起動してみます。

>spark-shell

2.pyspark

続いてPython環境でSparkを使えるようにしていきましょう。

2.1. pysparkのインストール

ご利用のPython環境で、pysparkを追加します。

pip install pyspark

2.2.環境変数の設定2

PYSPARK_PYTHON

利用するPython環境を環境変数PYSPARK_PYTHONに設定します。

set PYSPARK_PYTHON=C:\xxxx\bin\python.exe

2.3.動作確認

PySparkを起動してみましょう。

> pyspark

起動できたのであれば、プログラムを作っていきましょう。

3. PySparkを試す

基本的にはSparkContextオブジェクトを作り、データを操作します。ここでは二つのデータ形式について操作してみましょう。

3.1. RDDを使ってみる

RDDとはRDD(Resilient Distributed Dataset)のことです。Apache Sparkのプログラミングでは、基本的にRDDにデータを保持して操作することが基本です。

PySparkではSparkContextオブジェクトを作成し、そこからさまざまなデータ処理を行います。

import pyspark
from pyspark import SparkContext

# SparkContextの作成
sc = SparkContext(appName='spark_sample')

# RDDを作成する。
rdd = sc.parallelize([
    (1, 'Foo'),
    (2, 'Bar'),
    (3, 'Baz'),
])

# RDDにかけるFilter関数(2以上の要素を取り出す)
def filter_func(x):
    n,s = x
    return n >= 2

rdd = rdd.filter(filter_func)
# 結果の表示
print(rdd.collect())

RDDにはfilter/mapなど関数型プログラミングで馴染みのメソッドが準備されていますので、自在な処理が可能です。

3.2. DataFrameを使ってみる

DataFrameはデータベースのようなカラムを持ったテーブル構造です。RDDにデータのスキーマを与えて作成することができます。これによりSQLライクの検索も可能になります。

続いてJSONをDataFrame化してデータ処理を行ってみましょう。

sample.json

sample.json
{"name":"Alice","year":20}
{"name":"Bob","year":25}

さきほどはSparkContextをエントリとして使いましたが、_データセットやデータフレームのAPIを使用する際のエントリポイントは、SparkSessionです。 SparkSessionは、builderを使用して以下のように作成します。

spark = SparkSession.builder \
    .master("local") \
    .appName("AppName") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

SparkSession作成し、データを処理してみます。JSONから取り込んだ場合、自動的にスキーマも推測されていますので、以下では複数行からなるJSONデータを取り込んでデータフレームを作成し、検索を行ってみます。

pythonコード

import pyspark
from pyspark.sql import SparkSession

# SparkSessionの作成
spark = SparkSession.builder \
    .master("local") \
    .appName("JSON SQL") \
    .getOrCreate()

# DataFrameの作成
df = spark.read.json('sample.json')

# JSONを読み込んだのでスキーマを確認
df.printSchema()

# SparkSessionにテーブルとして登録
df.registerTempTable('people')

# Spark SQLによる検索
selected = spark.sql('SELECT * FROM people WHERE name==\"Alice\"')
selected.show()

さまざまなDBへの接続

さまざまなデータベースに接続する場合、connectorをconfigに与えることで接続できます。たとえば、さまざまなデータベースの接続が、Spark Packageとして提供されています。

RDBMS(MySQL)

まずはRDBMSであるMySQLに接続してみましょう。MySQL用のJDBCドライバのJARファイルを用意しておく必要があります。

以下から、JARファイルをダウンロードして、適当な位置にJARファイルを設置します。
https://jar-download.com/artifacts/mysql/mysql-connector-java

こちらのJARファイルをSparkSessionで適用する必要があります。

from pyspark.sql import SparkSession

spark = SparkSession.builder.config("spark.jars", "...jars\\mysql-connector-java-8.0.27.jar") \
.master("local").appName("PySpark_MySQL_test").getOrCreate()

df = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/my_schema") \
.option("driver", "com.mysql.cj.jdbc.Driver").option("dbtable", "shema_name") \
.option("user", "xxxx").option("password", "xxxx").load()

df.show()

MongoDB(NoSQL)

NoSQLであるMongoDBについても同様にアクセスできます。MongoDBのコネクタはSpark Packageで提供されていますので以下のようにアクセスできます。

MongoDBは以下のようにSparkSessionを作成できます。

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("myApp") \
.config("spark.mongodb.input.uri", "mongodb://localhost:27017/db_name.collection_name") \
.config("spark.mongodb.output.uri", "mongodb://localhost:27017/db_name.collection_name") \
.config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1') \
.getOrCreate()

df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
df.show()
3
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?