glue
Pyspark

glueのpysparkスクリプトでscala製のjarを動かす

比較的に特殊な状況について紹介します。
話題は2つ:

  • pyspark経由でどうやってscala製のjarを動かすのか
  • glueのpysparkスクリプトではどうやってs3のパスを変数として渡して、glue外の機能を使ってS3に出力させるのか

pyspark経由でscala製のjarを動かす

この人のブログを見てください。
Spark - Calling Scala code from PySpark

要点だけつまんで云うと

手順

  • pysparkやspark-shellの実行時に--jarでjarファイルを指定していれば、該当クラスがsc._jvmに入る
  • sc._jvm.MainClassName.MethodName(*args)でjarライブラリー内のメソッドを呼べる

注意点

自分はここらへんの素人なので、元ブログを読むことをおすすめします。

  • Py4J経由でやっているので、java経由でやり取りしている。その為、一部そのまま対応しないクラスが存在する。
    • python側でsparkのオブジェクトを変数として渡したい時に、一旦javaのオブジェクトにunwrapしてから渡そう
    • scala側でインタフェースを用意して、受け取ったjavaのオブジェクトをscalaの該当ものにwrapしてからジョブを実行しよう
  • jarにmain()を定義しているのならこの方法で呼べない可能性がある?
    • 何パターンかを試してたのですが、基本的にエラー終了しているので。純粋にコールの仕方が違ってたかもしれません。

DynamicFrameを使わずにS3に出力する

仕事上は元々

S3入力パス ----(Data Pipeline + EMR + jar in scala)----> S3出力パス
的なことをやってました。

とりあえず同じjarファイルを使って、glueで同じことをしたいですね。

jarファイルの指定

image.png

Dependent jars pathにそのままjarファイルを指定しました。

スクリプトファイル

使ったものはこういう感じです。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

sc._jsc.hadoopConfiguration().set("mapred.output.committer.class", "org.apache.hadoop.mapred.FileOutputCommitter")

sc._jvm.MainClassName.MethodName("S3 input path", "S3 output path")

job.commit()

注意点

sc._jsc.hadoopConfiguration().set("mapred.output.committer.class", "org.apache.hadoop.mapred.FileOutputCommitter")

↑これですけど、ここを参照しました

そうしないとClass org.apache.hadoop.mapred.DirectOutputCommitter not foundというエラーでジョブが失敗するためです。

本当はscalaのソースを書き換えて、そのままdataframeを返してくれれば、↓でDynamicFrameに変換することができて、DynamicFrameの仕様に乗ってすんなりS3に出力できるはずです。

new_df = DynamicFrame.fromDF(result_data_frame, glueContext, "new_df")