RedshiftのデータをAWS GlueでParquetに変換してRedshift Spectrumで利用するときにハマったことや確認したことを記録しています。
前提
Parquet化してSpectrumを利用するユースケースとして以下を想定しています。
- テーブルにある、全データをParquet化した後にテーブルを削除(または、全データを洗い替えする)
-> Redshift Spectrumからのみ利用するようにする。 - テーブル内の一部データ(特定の日付以前のデータのみ)をParquet化して、テーブルからParquet済みのデータを削除する。
-> 利用頻度の高いデータはRedshiftに残しておき、利用頻度の低い過去データはRedshift Spectrumに移す。 - 新規に取り込むデータで極端にデータ量が多いことが想定されていて、Redshiftに貯めるとノード追加の頻度が非常に増えそうな場合
-> 最初からSpectrumにのみデータを保管して、必要に応じてサマリーテーブル的なものをRedshiftに作る。
GlueのCrawlerは使わない
GlueのCrawlerは便利ですが、少し利用してみて以下の点で難があるので利用していません。
回避方法があるのかもしれませんが、その辺りを調査するよりもローカルにSparkを立ててpySparkのコードを動作確認しながら書いてしまった方が早いので詳しく調査はしていません。
- Redshiftのテーブル内のデータに改行が含まれる場合はたぶんClassifierをしないと、Glue Job実行時にエラーになる※
- Crawl対象のテーブルが1テーブルのみしか指定できない。
- CrawlしたテーブルをGlue Job(DynamicFrame)で使うと、テーブルの全データをtempディレクトリにUnloadしてしまう。
※Classifierで回避できるかは確認はしていないが、Classifierを使わずRedshiftのテーブルをDynamic Frameで利用するとエラーになった。
処理の流れ
基本的には以下の流れで作業しています。
- RedshiftでUnloadしてS3に保存
- Glue JobでParquetに変換(GlueのData catalogは利用しない)
- Redshift Spectrumで利用
TIPS
1. DynamicFrameとDataFrameの変換
AWS Black Belt - AWS Glueで説明のあった通りです。
# 他のimportはGlue Job作成のものと同じため省略
from awsglue.dynamicframe import DynamicFrame
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "db", table_name = "table", redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasource0")
result = datasource0.toDF() # DataFrameに変換
new_datasource0 = DynamicFrame.fromDF(result, glueContext, "new_datasource0") # Dynamic
2. 改行を含むデータの取扱
改行を含むとエラーになるのでUnload時に改行コードを置換する。
Unload時のオプションは適切なものを設定してください。
UNLOAD ('
SELECT
user_id,
ip,
replace(replace(url, \'\n\', \'
\'), \'\r\', \'
\') as url,
visit_time,
browser,
browser_ver,
os,
device
FROM db.access_log
')
TO 's3://backup/prod/db/access_log/'
IAM_ROLE 'IAMRoleを書く'
ESCAPE
ADDQUOTES
GZIP
DELIMITER AS ','
ALLOWOVERWRITE;
GlueScript側でreplaceしたものを元に戻します。
# 他のimportはGlue Job作成のものと同じため省略
from pyspark.sql.functions import *
# DynamicFrameを作成するところまでは省略
result = datasource0.toDF()
result = result.withColumn('url', regexp_replace('url', '
', '\n'))
result = result.withColumn('url', regexp_replace('url', '
', '\r'))
new_datasource0 = DynamicFrame.fromDF(result, glueContext, "new_datasource0")
3. デフォルト値の設定
Data Frameのfillnaを使ってデフォルト値(空文字)を設定していきます。
# 他のimportはGlue Job作成のものと同じため省略
from pyspark.sql.functions import *
# DynamicFrameを作成するところまでは省略
result = datasource0.toDF()
result = result.fillna({'url': ''})
result = result.fillna({'device': ''})
new_datasource0 = DynamicFrame.fromDF(result, glueContext, "new_datasource0")
4. DataFrameでのSchemaの利用
DynamicFrameを使わずに、直接DataFrameを使うことも出来ます。
# 他のimportはGlue Job作成のものと同じため省略
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SQLContext
# Glue周りの初期化は省略
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
fields = StructType([
StructField("user_id", StringType(), False),
StructField("ip", StringType(), False),
StructField("url", StringType(), False),
StructField("visit_time", TimestampType(), False),
StructField("browser", StringType(), False),
StructField("browser_ver", StringType(), False),
StructField("os", StringType(), False),
StructField("device", StringType(), False),
])
df = spark.read.csv("s3://backup/prod/db/access_logs/", schema=fields)
5.データの保存
DynamicFrameにてParquet(snappy圧縮)で保存する。
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://data/spectrum/acces_logs_archives"}, format = "parquet", transformation_ctx = "datasink4")
DataFrameにてParquet(gzip圧縮)で保存する。
df.write.mode("overwrite").format("parquet").option("compression", "gzip").mode("overwrite").save("s3://data/spectrum/acces_logs_archives")
試した感じでは、snappyよりもgzipのほうが若干ではあるが圧縮後のデータサイズは小さくなる傾向がある。
6. パラメータの渡し方
AWS SDK(Rubyなど)から呼び出すときも、画面から設定するのと同じようにする必要があります。
resp = glue.start_job_run({job_name: name,
arguments:{"--START_DATE" => start_date.strftime("%Y-%m-%d"),
"--END_DATE" => end_date.strftime("%Y-%m-%d") }
})
job_id = resp.job_run_id
7.Pythonライブラリの利用
S3にアップロードして指定するだけです。(当然Glueからアクセス可能な設定にしておく必要はあります)
単一のファイルの場合はここに一つ指定するだけ良い。複数指定する場合は、カンマ区切り(スペースなし)で続けて指定します。※
※現状のWebUIだと一つしか指定出来ないので、text box上で複数ファイルを記述する必要があります。
上図のPython library pathに以下のように指定します。
s3:/バケット名/library/python/boto3-1.4.7.zip,s3://バケット名/library/python/requests-requests-v2.18.4-26.zip
利用可能なファイル形式はアプリケーションの提出 - アプリケーションの依存性をバンドルにある通り、.py .egg .zipになります。
以下コードの様に利用できます。
import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql import SQLContext
import requests
import boto3
sc = SparkContext()
glueContext = GlueContext(sc)
sqlContext = SQLContext(sc)
## @params: [IN_PATH, IN_PATH]
args = getResolvedOptions(sys.argv, ['IN_PATH', 'OUT_PATH'])
print('-------------------------')
r = requests.get('https://github.com/timeline.json')
print(r)
print('-------------------------')
もう少し確認する
Glueのログを見てみると以下のようにspark submitのオプションが設定されていることがわかります。
--py-files /tmp/PyGlue.zip --jars /opt/amazon/superjar/glue-assembly.jar
起動シーケンスを見ていると以下のようなログが出力されているので、これらのライブラリはデフォルトで入っていることがわかります。
pyGlueはおそらくaws-glue-libsのことだと思われる。
17/09/20 08:23:01 DEBUG Client: PYTHONPATH -> {{PWD}}/pyspark.zip<CPS>{{PWD}}/py4j-0.10.4-src.zip<CPS>{{PWD}}/PyGlue.zip
py4jも含まれているので、Javaのコードを実行することが出来ようになっています。
8.Java/Scalaライブラリの利用
Glueはpy4jが標準で使えるようになっているので、以下のようなコードをBuildしたものを呼び出してみます。
package sample
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
case class Person(name: String, age: Long)
object BasicFilter {
def main(args:Array[String]) {
print("main")
}
def hello() {
val spark = SparkSession.builder().getOrCreate()
import spark.implicits._
val seq = Seq(Person("hoge", 20), Person("foo", 30), Person("bar", 40))
val ds = seq.toDS()
ds.printSchema
print(ds.first)
}
}
Glue Scriptでは以下のような感じ(関連箇所のみ抜粋)で呼び出すことが出来ます。
java_import(sc._jvm, "sample.BasicFilter")
sc._jvm.BasicFilter.hello()
9.ISO8601型のデータを処理する
spark.read.jsonなどでデータを読み込む時に、TimestampType()を指定するとエラーになるのでStringTypeで読み込んだのちにwithColumnで処理します。
まずは検証用にデータを準備します。
>>> fields_sample = StructType([
StructField("id", StringType(), False),
StructField("request_time", StringType(), False),
])
>>> samples = [[1, '20170104T0850+0900']]
>>> df_sample = sqlContext.createDataFrame(sc.parallelize(samples), fields_sample)
>>> df_sample.printSchema()
root
|-- id: string (nullable = false)
|-- request_time: string (nullable = false)
動作を確認していきます。
castは意図した動作にならいので、以下のようにto_timestampを利用してStringTypeからTimestampTypeに変換します。
>>> # castではうまくいかない
>>> df_sample.withColumn("request_time", df_sample["request_time"].cast(TimestampType()))
DataFrame[id: string, request_time: timestamp]
>>> df_sample.withColumn("request_time", df_sample["request_time"].cast(TimestampType())).take(1)
[Row(id=u'1', request_time=None)]
>>> df_sample.withColumn("request_time", to_timestamp("request_time", "yyyymmdd'T'HHmm")).printSchema()
root
|-- id: string (nullable = false)
|-- request_time: timestamp (nullable = true)
>>> df_sample.withColumn("request_time", to_timestamp("request_time", "yyyymmdd'T'HHmm")).take(1)
[Row(id=u'1', request_time=datetime.datetime(2017, 1, 4, 8, 50))]
10. Parquetで出力するとDecimal型が含まれるとエラーになる
AWS GlueでDecimal型のデータを含むデータをParquetとして出力すると、Redshift Spectrumで読み込む際にエラーになります。
DataFrameでもDynamicFrameでも、どちらを利用していいても発生します。
原因はMapRのサイト書かれていることな気がします。
対応していてはwriteLegacyFormatをtrueにします。
// scala
val spark: SparkContext = new SparkContext()
val glueContext: GlueContext = new GlueContext(spark)
glueContext.setConf("spark.sql.parquet.writeLegacyFormat", "true")