はじめに
現在オシゴトでSparkを導入しようとしていたので、今更感がありつつも入門記事を書こうと思っていたのですが、マネージドサービスのAWS Glueが東京リージョンに来たのでその話を書くことにしました。
やったこと
現時点でGlueはPythonのみサポートのようですが、ScalaやJavaもPy4J経由で呼び出せるようなので動かしてみました。
Scalaコード
とりあえず動くことを確認したかったので、S3から読み取ったCSVをそのまま吐き出すだけです。
面白くないです。
AssemblyでFatJarに固めた後、S3にアップロードしておきます。
注意点として、Python側で生成したSparkContext内のjvmに対して呼び出しを行うため、Scala側ではSparkContext
の生成は行いません。
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.StructType
case class TestCSV(
id: Int,
name: String
)
object GlueScala {
def main() {
val spark = SparkSession.builder().getOrCreate()
val ds: Dataset[TestCSV] = {
import spark.implicits._
val schema = ScalaReflection.schemaFor[TestCSV].dataType.asInstanceOf[StructType]
spark.read
.schema(schema)
.option("header", true)
.csv("s3n://path/to/input.csv").as[TestCSV]
}
ds.write.csv("s3n://path/to/output")
}
}
CSVファイルはこんな感じ。
id,name
1,a
2,b
3,c
...
GlueのETL Jobを作成する
Glueの画面から、ETL -> Jobsと進み、Add Jobを押した後のメニューでA new script to be authored by you
を選びました。
権限
以下の権限を持つIAMロールも作成/設定する必要があります。
- Glueの実行権限
- 処理対象データや、Pythonコード、Scalaで作成したJarファイルを置くS3バケットへの参照権限
- 処理結果を吐き出すためのS3書き込み権限
Jarの場所を設定
Scalaで作成したJarファイルへのパスを通してあげましょう。
Script libraries and job parameters (optional)
エリア内のDependent jars path
の欄にS3のパスを設定します。
このエリアで並列度の設定などもできるようですが今回はデフォルトのままです。
Pythonコードを書く
上記のAdd Jobの設定後出てくるConnectionの画面でOKを押すと、Pythonコードを書くためのエディタが出てきます。
Pythonでは、SparkContextの作成とScalaのメソッドを呼び出す部分だけを記述しました。
S3へアクセスするためキーを設定している個所については、IAMロールの権限を引き継げそうな気もしますが書かないと動かなかったのでとりあえず書いてしまいました。
今後試行錯誤してみます。
import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from py4j.java_gateway import java_import
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "XXXXXXXXXXXXXX")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "XXXXXXXXXXXXXX")
sc._jsc.hadoopConfiguration().set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
java_import(sc._jvm, "GlueScala")
sc._jvm.GlueScala.main()
なお、上記の権限で言及しているとおり、ここで作成したPythonコードはS3上に置かれます。
動かす
あとはPythonエディタの画面の Run job
を押すだけです。
もちろんJob一覧から選択してRun job
を押してもよいです。
Scalaの ds.write.csv
で指定したS3パスにCSVファイルが吐かれることでしょう。
さいごに
というわけでGlue上でScalaコードを動かす事ができました。
実用性については試行錯誤や検証が必要かと思いますが、EMRクラスタを立てなくても動かせる気軽さがうれしいですね。