Posted at

AWS Glue + PySpark逆引きメモ

More than 1 year has passed since last update.

PySparkでバッチ処理をやろうとしたときに必要だったこと


操作


DataFrameでRDB的操作

調べると方法がいろいろあるのでうれしい


カラムの並び替え

dataframe.select(["col1", "col2", "col3"])


カラムのリネーム

selectExpr, withColumnRenamed, alias or as, 最後に普通にSQL使う方法


固定値追加

lit('固定値') を使う


DataFrameを一時Viewとして登録


registerTempTable メソッドを呼び出す際に RDD から Spark SQL の SchemaRDD へ暗黙変換が実施されます。

registerTempTable の引数としてテーブル名を渡す事で、SQL 内でこのテーブル名を使用できるようになります。


#...SparkSQLを実行したり、RDDを変形したりする処理…

df.registerTempTable('table_name') # 結果を一時テーブルにできる


DataFrameでファイル操作

やり方がいろいろある


取り込み

S3からの取り込み、ローカルのファイルの取り込み。どちらもSparkContextに生えているtextFileでできるようだ(便利すぎる…)。まだ正規化されてないデータの処理に使えそう。(以下もどこかのサイトで調べたコードなのだが、記録してなかったのでURLなし。)

# Loads RDD

lines = sc.textFile("s3://hoge/hoge.csv")
# Split lines into columns; change split() argument depending on deliminiter e.g. '\t'
parts = lines.map(lambda l: l.split(','))
# Convert RDD into DataFrame
df = spark.createDataFrame(parts, ['your', 'column', 'name'])
df.printSchema() # スキーマ表示
df.show() # プレビュー表示


書き出し

ファイルを書き出すには .write.save() を使えばいいのだが、複数ファイルに分かれてしまうと困る。その場合は repartition(1) , coalesece(1) という関数でファイルをまとめることができる。

df

.repartition(1)
.write.format("com.databricks.spark.csv")
.option("header", "true")
.save("mydata.csv")


RDD

データ自体を変形するときはRDDを直接いじったほうが自由度が高い。というかいったんRDDを取り出してしまえばPythonでできることはすべてできるのでアルゴリズムの組み立てで困ることはなさそうだ。正規化されてないデータを処理する際に役立ちそう。

APIドキュメント → pyspark.RDD - A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.


連番

rdd_indexed = df.rdd.zipWithIndex().map(lambda x: (x[0][0],x[0][1],x[1]+1))

df = rdd_indexed.toDF(['id','score','rowNum'])
df.show()