比較的に特殊な状況について紹介します。
話題は2つ:
- pyspark経由でどうやってscala製のjarを動かすのか
- glueのpysparkスクリプトではどうやってs3のパスを変数として渡して、glue外の機能を使ってS3に出力させるのか
pyspark経由でscala製のjarを動かす
この人のブログを見てください。
Spark - Calling Scala code from PySpark
要点だけつまんで云うと
手順
- pysparkやspark-shellの実行時に
--jar
でjarファイルを指定していれば、該当クラスがsc._jvm
に入る -
sc._jvm.MainClassName.MethodName(*args)
でjarライブラリー内のメソッドを呼べる
注意点
自分はここらへんの素人なので、元ブログを読むことをおすすめします。
-
Py4J
経由でやっているので、java経由でやり取りしている。その為、一部そのまま対応しないクラスが存在する。- python側でsparkのオブジェクトを変数として渡したい時に、一旦javaのオブジェクトにunwrapしてから渡そう
- scala側でインタフェースを用意して、受け取ったjavaのオブジェクトをscalaの該当ものにwrapしてからジョブを実行しよう
- jarに
main()
を定義しているのならこの方法で呼べない可能性がある?- 何パターンかを試してたのですが、基本的にエラー終了しているので。純粋にコールの仕方が違ってたかもしれません。
DynamicFrameを使わずにS3に出力する
仕事上は元々
S3入力パス ----(Data Pipeline + EMR + jar in scala)----> S3出力パス
的なことをやってました。
とりあえず同じjarファイルを使って、glueで同じことをしたいですね。
jarファイルの指定
Dependent jars pathにそのままjarファイルを指定しました。
スクリプトファイル
使ったものはこういう感じです。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
sc._jsc.hadoopConfiguration().set("mapred.output.committer.class", "org.apache.hadoop.mapred.FileOutputCommitter")
sc._jvm.MainClassName.MethodName("S3 input path", "S3 output path")
job.commit()
注意点
sc._jsc.hadoopConfiguration().set("mapred.output.committer.class", "org.apache.hadoop.mapred.FileOutputCommitter")
↑これですけど、ここを参照しました。
そうしないとClass org.apache.hadoop.mapred.DirectOutputCommitter not found
というエラーでジョブが失敗するためです。
本当はscalaのソースを書き換えて、そのままdataframeを返してくれれば、↓でDynamicFrameに変換することができて、DynamicFrameの仕様に乗ってすんなりS3に出力できるはずです。
new_df = DynamicFrame.fromDF(result_data_frame, glueContext, "new_df")