記事の目的
AWS Glueを評価するにあたり、まずはVSCodeでPythonコードを書き、すぐに実行できるWindows開発環境を構築するのが目的。想像以上にApache SparkでS3を利用するためのライブラリ依存関係を特定しきるのに苦戦したためメモを残すことにした。
AWS Glueのマネコンでスクリプト編集してジョブを実行できるが、コード編集の効率が悪く、テストのフィードバックサイクルの観点でかなり効率が悪く、従量課金が発生するため避けたい。
また、Glueで利用が推奨されているDynamicFrameとSparkのDataFrameの違い目を見極める上でも手元の純粋Spark環境とAWS Glue環境は両方あったほうがよい。
インストール
ダウンロード先
から最新版をダウンロードして解凍する。
本記事では2024/2/8時点の最新版(spark-3.5.0-bin-hadoop3)を利用。
以降はすべて、C:\spark-3.4.2-bin-hadoop3にコピーされた前提としている。
本記事ではバッチ処理結果をS3に格納するため下記のような環境変数を利用している。
C:\spark-3.4.2-bin-hadoop3\setEnv.bat
set HADOOP_HOME=C:\spark-3.4.2-bin-hadoop3
set JAVA_HOME=C:\jdk-17.0.9
set PATH=%JAVA_HOME%\bin;%HADOOP_HOME%\bin;%PATH%
set hadoop.home.dir=C:\spark-3.4.2-bin-hadoop3
set ACCESS_KEY=XXXXXXX(For S3)
set SECRET_KEY=YYYYYYY(For S3)
Windows環境固有の手順
https://github.com/cdarlint/winutils/tree/master/hadoop-3.3.5/bin
からwinutils.exe,hadoop.dllをダウンロードし、HADOOP_HOME\binの下に配置する。
S3利用環境固有の手順
C:\spark-3.4.2-bin-hadoop3\conf\spark-defaults.conf
に下記二行を追加。
それぞれのjarファイルはmavenrリポジトリからjarをダウンロードしてC:\spark-3.4.2-bin-hadoop3\jarsへコピー。クラスの依存関係を整えるのに苦労したためこの記事を書くモチベーションとなっている。
spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem
spark.driver.extraClassPath C:\spark-3.4.2-bin-hadoop3\jars\hadoop-aws-3.3.5.jar;C:\spark-3.4.2-bin-hadoop3\jars\aws-java-sdk-core-1.12.654.jar;C:\spark-3.4.2-bin-hadoop3\jars\aws-java-sdk-s3-1.12.654.jar;C:\spark-3.4.2-bin-hadoop3\jars\delta-spark_2.13-3.1.0.jar;C:\spark-3.4.2-bin-hadoop3\jars\delta-core_2.13-2.4.0.jar;C:\spark-3.4.2-bin-hadoop3\jars\delta-hive_2.13-3.1.0.jar;C:\spark-3.4.2-bin-hadoop3\jars\delta-contribs_2.13-3.1.0.jar;C:\spark-3.4.2-bin-hadoop3\jars\aws-java-sdk-bundle-1.12.316.jar;C:\spark-3.4.2-bin-hadoop3\jars\hadoop-common-3.3.5.jar
ソースコード(Test6.py)は下記の通り。CSVファイルを読み込み、Spark標準の関数や自前UDFを織り交ぜてデータ変換処理を実行するプログラム。結果はS3へ保存。
ms932を指定している箇所はWindows環境ユニーク処理、それを除けばLinux環境でも同じコードが問題なく動く。
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.sql import functions as F
spark=SparkSession.builder.config("spark.driver.host", "localhost").appName('appname').getOrCreate()
spark._jsc.hadoopConfiguration().set("mapreduce.fileoutputcommitter.algorithm.version", "2")
spark._jsc.hadoopConfiguration().set("spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored", "true")
spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", os.environ['ACCESS_KEY'])
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", os.environ['SECRET_KEY'])
spark._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3.amazonaws.com")
spark._jsc.hadoopConfiguration().set("fs.s3a.fast.upload", "true")
spark._jsc.hadoopConfiguration().set("fs.s3a.fast.upload.buffer", "bytebuffer")
#spark._jsc.hadoopConfiguration().set("fs.s3a.fast.upload.buffer", "array")
def dateconvert(text):
if text == "99999999":
return "9999-12-31"
elif text == "null":
return "0001-01-01"
elif text is None:
return "0001-01-01"
else:
return text[0:4] + '-' + text[4:6] + '-' + text[6:8]
schema = StructType([
StructField("_c0", StringType(), False),
StructField("_c1", StringType(), False),
StructField("_c2", StringType(), False),
StructField("_c3", StringType(), False),
StructField("_c4", StringType(), False),
StructField("_c5", StringType(), False),
StructField("_c6", StringType(), False)
])
csv_encoding = "ms932" # for Windows開発環境
csv_path="C:\Athena_POC\data.csv"
df = spark.read.format('csv').schema(schema).option('encoding', csv_encoding).options(header='False', inferSchema='False').load('C:\Athena_POC\data.csv')
udf_dataconvert = udf(dateconvert, StringType())
df = df.withColumn("_c0", F.trim("_c0"))
df = df.withColumn("_c1", udf_dataconvert(df['_c1']))
df = df.withColumn("_c2", udf_dataconvert(df['_c2']))
df = df.withColumn("_c3", udf_dataconvert(df['_c3']))
df = df.withColumn("_c5", F.lpad("_c5", 7, "0"))
df = df.withColumn("_c6", F.lpad("_c6", 7, "0"))
df = df.withColumn("_c6", F.concat( "_c5", "_c6"))
# write into single file
#df.coalesce(1).write.format('csv').option("compression", "gzip").mode('overwrite').save('C:\spark-3.4.2-bin-hadoop3\converted')
df.coalesce(1).write.format('csv').option("compression", "gzip").mode('overwrite').csv('s3a://xxxxxx/test7_converted/')
df.show(10)
spark.stop()
テストデータの生成プログラムは
CreateData.java
と同じものを利用した。ソース上のC:\Athena_POC\data.csvはAmazon Athena検証で利用したものと全く同じものを利用。
実行
C:\spark-3.4.2-bin-hadoop3>bin\spark-submit.cmd --packages org.apache.hadoop:hadoop-aws:3.3.5 Test6.py
処理の一番最後でShutdownHookManagerのファイル掃除系処理でExceptionがthrowされるがSpark本体のジョブは問題なく実行される。ShutdownManager関連のExceptionはWindows環境ならではのように見える。実際にLinux環境では同じプログラムが同じSparkバージョン環境でノーエラーとなる。
Sparkジョブの実行状況については付属のツールで見ることができる。
最後に
参考にした情報の大半はDataBricks社の方が書いたものだった。ググるとよくヒットする。
DynamicFrameはどう使うべきか?、場合によってはDataFrameを使うべきか?
AWS Glue環境で性能を出すためには何をすべきかを調査し、また記事を投稿する予定。