LoginSignup
1
0

More than 3 years have passed since last update.

【PySpark】dataframe操作サンプルコード集

Last updated at Posted at 2020-10-08

筆者はpython・dataframe・glue等の事前知識がなく都度対応しているので効率的でない、間違っているやり方もあると思います。
その際はご指摘いただけると助かります。

環境構築

AWS Glueのテスト環境をローカルに構築の記事を参考に開発環境を構築
記事内では対話式だがpythonファイルを実行する際は
/aws-glue-libs/bin/gluesparksubmit /src/sample.py --JOB_NAME='dummy'
のようなコマンドを実行すればpythonファイルの実行が可能となる。

docekr run -v
コマンドでボリュームをマウントすればローカルでファイル編集が可能。

サンプルデータの作成

都度ファイルの読み込みは面倒なので配列からdataframeを作成しており、そのコードが以下

from pyspark.sql.types import StructField
from pyspark.sql.session import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql.types import StringType

row_schema = StructType(
    [
        StructField("name", StringType(), True),
        StructField("code", StringType(), True)
     ]
)
spark = SparkSession.builder.master("local").appName("sample").config("spark.some.config.option", "some-value").getOrCreate()
values = [
    ("あいうえお", "4")
]
df = spark.createDataFrame(values, row_schema)

カラム追加

df = df.withColumn('column_name',  'value')

正規表現で置換

df = df.withColumn('name',  F.regexp_replace('goagle', 'go+gle', 'o'))

正規表現で文字列を抽出して置換

df = df.withColumn("name", F.regexp_extract('1234foo', '^\d{3,4}', 0))

特定の条件時に先頭0埋めでカラムを追加

df = df.withColumn("new_name", F.when(F.col("name").rlike("^\d{3,4}"), lpad(F.regexp_extract(F.col('name'), '^\d{3,4}', 0), 6, '0')))

カラムのリネーム

df.withColumnRenamed('name', 'new_name')

カラム削除

df = df.drop('name')

結合

left_df = left_df.join(right_df, left_df.name == right_df.name, 'left_outer')

left_outerのところにはinner、cross、outer、full、left、left_outer、right、right_outer、left_semi、left_antiのどれかが入る

改行文字で行を分割(その他のレコードは同じ値を入れる)

row_schema = StructType(
    [
        StructField("name", StringType(), True),
        StructField("code", StringType(), True)
     ]
)
spark = SparkSession.builder.master("local").appName("sample").config("spark.some.config.option", "some-value").getOrCreate()
values = [
    ("\nあいう\nえお", "4")
]
df = spark.createDataFrame(values, row_schema)
df = df.withColumn('name', F.split(F.col('name'), '\n')) # ameを配列化
df = df.select(F.explode(F.col('name')).alias('name'), 'code') # 分割処理

結果

+---------+-------+
|name     |code   |
+---------+-------+
|あいう   |4       |
|えお     |4      |
+---------+-------+

改行文字で行を分割し行数のカラムを追加(その他のレコードは同じ値を入れ)

row_schema = StructType(
    [
        StructField("name", StringType(), True),
        StructField("code", StringType(), True)
     ]
)
spark = SparkSession.builder.master("local").appName("sample").config("spark.some.config.option", "some-value").getOrCreate()
values = [
    ("\nあいう\nえお", "4")
]
df = spark.createDataFrame(values, row_schema)
df = df.withColumn('name', F.split(F.col('name'), '\n')) # nameを配列化
df = df.select(F.posexplode(F.col('name')).alias('index', 'name'), 'code') # 分割処理

結果

+-----+---------+-------+
|index|name     |code   |
+-----+---------+-------+
|0    |あいう   |4      |
|1    |えお     |4      |
+-----+---------+-------+

メモ

Athenaのviewテーブルは参照できない

An error occurred while calling o59.getCatalogSource.
: java.lang.Error: No classification or connection in db_name.table_name

Athenaのviewテーブルを参照しようとするとエラーメッセージが表示された。
2020年8月現在viewテーブルは参照できないようです。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0