この記事はfreee データに関わる人たち Advent Calendar 2019の11日目です。
シンプルにAWS Glueで RDB(MySQLとか)から巨大なテーブルデータを取り出すときの話です。
tl;dr
Glueを使ってMySQLなどRDSから億単位のデータを引っこ抜くときは、Glueの並列取り込み機能を使わず、sparkの機能を使おう
やりたいこと & 問題
- RDB(MySQL)の一つのテーブルが 1億件以上データを持っている
- そのままGlueで取り込むと遅い -> なんかGlueで並列読み込みする機能があるらしい
- Glueの並列読み込み機能試したけど、クソ遅い
- Spark自身の機能を使った -> めちゃ早くできた
Glue と Sparkの関係
用語・語弊を生みそうなので、最初に整理
Glue は Managed Sparkと言い換えられます。Sparkを使いやすくしたもの。よってベースは普通のSpark
そこにGlue独自の実装をかぶせてある。このエントリでは
- Glue = (DynamicFrame dyf)
- Spark = (DataFrame df)
とかき分けます。Glueは dyfもdfも両方つかえます。 dyf は dfの拡張版です。
Glueの並列取り込み機能
Glue(dyf)は並列取り込みの機能を持っています。具体的には
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/run-jdbc-parallel-read-job.html
hashfield
に int/bigintなど数値型の主キーとなるカラムを指定
hashpartitions
に N分割するという数値を指定します
この機能のいいところは、glue の jobとして上記パラメータを与えることはもちろん、glue データカタログのテーブルパラメータで指定することもできます。データが太ってきたら、分割数をブラウザポチポチで増やせばいい。
最高!と思ってましたが、実際試してみるとダメダメだった。
Glueの並列取り込みが遅い原因
どうやって分割しているかは、実際にRDBに打ち込むSQLを捕まえればすぐわかります。mysqlサーバ側で show processlist
とかで一旦どんなSQLを叩いているのかを見ると、、、(glueの人のエントリ)
上記例では、 hashfield
に 数値型以外のカラムを指定しているのでMD5とかゴニョって数値型にしていますが、数値型のカラムを指定してればこの処理はなくなります。が、問題は 数値 % 7 = 0
の演算です。7分割する場合だと 7で割って余りを出してます。たしかに分割はできますが
select * from hoge where id % 7 = 0;
select * from hoge where id % 7 = 1;
select * from hoge where id % 7 = 2;
...
select * from hoge where id % 7 = 6;
というSQLが走ることになります。つまり対象テーブルのレコード数が 1億件あれば、
1億レコードのidの値を7で割るという演算が7回も走ることになります。どう考えても遅い=RDB側がしんどいよね。
解決方法 = Sparkの機能
Spark自身が持っている並列取り込み機能
Glueではどうにかならんのか。。
と思ってたら、 Sparkで普通にできるで!
なネタを見つけた。
https://medium.com/@radek.strnad/tips-for-using-jdbc-in-apache-spark-sql-396ea7b2e3d3
見た感じ アホな割り算もやらないし、これでええんとちゃう?と思ったら、普通に行けた。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import SQLContext
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
sqlContext = SQLContext(sc)
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
DB_HOSTNAME="mysql.example.com"
DB_DATABASE="hogedb"
DB_USERNAME="user"
DB_PASSWORD="hogefuga"
df = sqlContext.read.format("jdbc") \
.option("url", "jdbc:mysql://{0}:3306/{1}".format(DB_HOSTNAME, DB_DATABASE])) \
.option("driver", "com.mysql.jdbc.Driver") \
.option("user", DB_USERNAME) \
.option("password", DB_PASSWORD) \
.option("dbtable", "huge_table") \
.option("numPartitions", 100) \
.option("partitionColumn", "id") \ #数値型のカラム、 whereがかかる & ユニークである必要があるので、実質主キー
.option("lowerBound", 1) \
.option("upperBound", 100000000) \ # select max(id) とかで最大値を調べておく必要ある
.load()
1億レコードを100に分割するので 100万レコードに分割して走るとなります。
SELECT * FROM huge_table WHERE owner_id >= 1 and owner_id < 1000000
SELECT * FROM huge_table WHERE owner_id >= 1000000 and owner_id < 2000000
SELECT * FROM huge_table WHERE owner_id >= 2000000 and owner_id < 3000000
...
SELECT * FROM huge_table WHERE owner_id >= 99000000 and owner_id < 100000001
分割数の考え方
Sparkに本来備わっている機能を使えば、巨大なテーブルでも十分なパフォーマンスを得られることはわかりましたが、うまく使うにはもう少し工夫が必要です。
まず、GlueはWorkerTypeと Workerの数を調整することができます。
- WorkerType は EC2のインスタンスタイプのように1WorkerあたりのCPU/メモリに影響します
- Worker数 は文字通りです、どれだけWorkerが動くかです
分割数は、Workerの数を上回っても問題ないです。キューイングされます。では Worker数も分割数もものすごい高い値にすればいいのか?というとそれは間違いです。データソースをRDBとしているので、**Glue(Sparkで)全力で殴るとすぐにRDB側のリソースがサチります。**よって
Worker数をそこそこに押さえて、RDB側の負荷をコントロールし、かつWorkerTypeに合致する分割数を決める
という工夫が必要です。ここらへんはインフラ屋の腕のみせどころです。
まとめ・雑感
データソースRDB(RDS含む)の場合は Glueのウィザードがソレっぽいコードをdyfで自動生成してくれるが、それで対応できるのはせいぜい1000万件レコード以下です。それ以上の場合はdfでsparkの機能でやるとよい。
dyfのメリットとしては、Glueのウィザードで一応コード一切書けなくてもできるが、コード少しは書ける人だったら、データソースがRDBの場合dyfを使うメリット全く無いのでdfだけで書いてよい。