筆者はpython・dataframe・glue等の事前知識がなく都度対応しているので効率的でない、間違っているやり方もあると思います。
その際はご指摘いただけると助かります。
環境構築
AWS Glueのテスト環境をローカルに構築の記事を参考に開発環境を構築
記事内では対話式だがpythonファイルを実行する際は
/aws-glue-libs/bin/gluesparksubmit /src/sample.py --JOB_NAME='dummy'
のようなコマンドを実行すればpythonファイルの実行が可能となる。
docekr run -v
コマンドでボリュームをマウントすればローカルでファイル編集が可能。
サンプルデータの作成
都度ファイルの読み込みは面倒なので配列からdataframeを作成しており、そのコードが以下
from pyspark.sql.types import StructField
from pyspark.sql.session import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql.types import StringType
row_schema = StructType(
[
StructField("name", StringType(), True),
StructField("code", StringType(), True)
]
)
spark = SparkSession.builder.master("local").appName("sample").config("spark.some.config.option", "some-value").getOrCreate()
values = [
("あいうえお", "4")
]
df = spark.createDataFrame(values, row_schema)
カラム追加
df = df.withColumn('column_name', 'value')
正規表現で置換
df = df.withColumn('name', F.regexp_replace('goagle', 'go+gle', 'o'))
正規表現で文字列を抽出して置換
df = df.withColumn("name", F.regexp_extract('1234foo', '^\d{3,4}', 0))
特定の条件時に先頭0埋めでカラムを追加
df = df.withColumn("new_name", F.when(F.col("name").rlike("^\d{3,4}"), lpad(F.regexp_extract(F.col('name'), '^\d{3,4}', 0), 6, '0')))
カラムのリネーム
df.withColumnRenamed('name', 'new_name')
カラム削除
df = df.drop('name')
結合
left_df = left_df.join(right_df, left_df.name == right_df.name, 'left_outer')
left_outer
のところにはinner、cross、outer、full、left、left_outer、right、right_outer、left_semi、left_antiのどれかが入る
改行文字で行を分割(その他のレコードは同じ値を入れる)
row_schema = StructType(
[
StructField("name", StringType(), True),
StructField("code", StringType(), True)
]
)
spark = SparkSession.builder.master("local").appName("sample").config("spark.some.config.option", "some-value").getOrCreate()
values = [
("\nあいう\nえお", "4")
]
df = spark.createDataFrame(values, row_schema)
df = df.withColumn('name', F.split(F.col('name'), '\n')) # ameを配列化
df = df.select(F.explode(F.col('name')).alias('name'), 'code') # 分割処理
結果
+---------+-------+
|name |code |
+---------+-------+
|あいう |4 |
|えお |4 |
+---------+-------+
改行文字で行を分割し行数のカラムを追加(その他のレコードは同じ値を入れ)
row_schema = StructType(
[
StructField("name", StringType(), True),
StructField("code", StringType(), True)
]
)
spark = SparkSession.builder.master("local").appName("sample").config("spark.some.config.option", "some-value").getOrCreate()
values = [
("\nあいう\nえお", "4")
]
df = spark.createDataFrame(values, row_schema)
df = df.withColumn('name', F.split(F.col('name'), '\n')) # nameを配列化
df = df.select(F.posexplode(F.col('name')).alias('index', 'name'), 'code') # 分割処理
結果
+-----+---------+-------+
|index|name |code |
+-----+---------+-------+
|0 |あいう |4 |
|1 |えお |4 |
+-----+---------+-------+
メモ
Athenaのviewテーブルは参照できない
An error occurred while calling o59.getCatalogSource.
: java.lang.Error: No classification or connection in db_name.table_name
Athenaのviewテーブルを参照しようとするとエラーメッセージが表示された。
2020年8月現在viewテーブルは参照できないようです。