LoginSignup
11

More than 3 years have passed since last update.

posted at

updated at

Organization

AWS Glue で 億超えレコードなテーブルからETLする

この記事はfreee データに関わる人たち Advent Calendar 2019の11日目です。
シンプルにAWS Glueで RDB(MySQLとか)から巨大なテーブルデータを取り出すときの話です。

eyecatch_aws-glue_1200x630.jpeg

tl;dr

Glueを使ってMySQLなどRDSから億単位のデータを引っこ抜くときは、Glueの並列取り込み機能を使わず、sparkの機能を使おう

やりたいこと & 問題

  • RDB(MySQL)の一つのテーブルが 1億件以上データを持っている
  • そのままGlueで取り込むと遅い -> なんかGlueで並列読み込みする機能があるらしい
  • Glueの並列読み込み機能試したけど、クソ遅い
  • Spark自身の機能を使った -> めちゃ早くできた

Glue と Sparkの関係

用語・語弊を生みそうなので、最初に整理
Glue は Managed Sparkと言い換えられます。Sparkを使いやすくしたもの。よってベースは普通のSpark
そこにGlue独自の実装をかぶせてある。このエントリでは
- Glue = (DynamicFrame dyf)
- Spark = (DataFrame df)
とかき分けます。Glueは dyfもdfも両方つかえます。 dyf は dfの拡張版です。

Glueの並列取り込み機能

Glue(dyf)は並列取り込みの機能を持っています。具体的には
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/run-jdbc-parallel-read-job.html
hashfield に int/bigintなど数値型の主キーとなるカラムを指定
hashpartitions に N分割するという数値を指定します

この機能のいいところは、glue の jobとして上記パラメータを与えることはもちろん、glue データカタログのテーブルパラメータで指定することもできます。データが太ってきたら、分割数をブラウザポチポチで増やせばいい。

最高!と思ってましたが、実際試してみるとダメダメだった。

Glueの並列取り込みが遅い原因

どうやって分割しているかは、実際にRDBに打ち込むSQLを捕まえればすぐわかります。mysqlサーバ側で show processlist とかで一旦どんなSQLを叩いているのかを見ると、、、(glueの人のエントリ)

上記例では、 hashfield に 数値型以外のカラムを指定しているのでMD5とかゴニョって数値型にしていますが、数値型のカラムを指定してればこの処理はなくなります。が、問題は 数値 % 7 = 0 の演算です。7分割する場合だと 7で割って余りを出してます。たしかに分割はできますが

select * from hoge where id % 7 = 0;
select * from hoge where id % 7 = 1;
select * from hoge where id % 7 = 2;
...
select * from hoge where id % 7 = 6;

というSQLが走ることになります。つまり対象テーブルのレコード数が 1億件あれば、
1億レコードのidの値を7で割るという演算が7回も走ることになります。どう考えても遅い=RDB側がしんどいよね。

解決方法 = Sparkの機能

Spark自身が持っている並列取り込み機能

Glueではどうにかならんのか。。
と思ってたら、 Sparkで普通にできるで! なネタを見つけた。
https://medium.com/@radek.strnad/tips-for-using-jdbc-in-apache-spark-sql-396ea7b2e3d3

見た感じ アホな割り算もやらないし、これでええんとちゃう?と思ったら、普通に行けた。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import SQLContext

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
sqlContext = SQLContext(sc)
glueContext = GlueContext(sc)

spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

DB_HOSTNAME="mysql.example.com"
DB_DATABASE="hogedb"
DB_USERNAME="user"
DB_PASSWORD="hogefuga"

df = sqlContext.read.format("jdbc") \
    .option("url", "jdbc:mysql://{0}:3306/{1}".format(DB_HOSTNAME, DB_DATABASE])) \
    .option("driver", "com.mysql.jdbc.Driver") \
    .option("user", DB_USERNAME) \
    .option("password", DB_PASSWORD) \
    .option("dbtable", "huge_table") \
    .option("numPartitions", 100) \
    .option("partitionColumn", "id") \  #数値型のカラム、 whereがかかる & ユニークである必要があるので、実質主キー
    .option("lowerBound", 1) \ 
    .option("upperBound", 100000000) \ # select max(id) とかで最大値を調べておく必要ある
    .load()

1億レコードを100に分割するので 100万レコードに分割して走るとなります。

SELECT * FROM huge_table WHERE owner_id >= 1 and owner_id < 1000000
SELECT * FROM huge_table WHERE owner_id >= 1000000 and owner_id < 2000000
SELECT * FROM huge_table WHERE owner_id >= 2000000 and owner_id < 3000000
...
SELECT * FROM huge_table WHERE owner_id >= 99000000 and owner_id < 100000001

分割数の考え方

Sparkに本来備わっている機能を使えば、巨大なテーブルでも十分なパフォーマンスを得られることはわかりましたが、うまく使うにはもう少し工夫が必要です。
まず、GlueはWorkerTypeと Workerの数を調整することができます。

  • WorkerType は EC2のインスタンスタイプのように1WorkerあたりのCPU/メモリに影響します
  • Worker数 は文字通りです、どれだけWorkerが動くかです

分割数は、Workerの数を上回っても問題ないです。キューイングされます。では Worker数も分割数もものすごい高い値にすればいいのか?というとそれは間違いです。データソースをRDBとしているので、Glue(Sparkで)全力で殴るとすぐにRDB側のリソースがサチります。よって
Worker数をそこそこに押さえて、RDB側の負荷をコントロールし、かつWorkerTypeに合致する分割数を決める
という工夫が必要です。ここらへんはインフラ屋の腕のみせどころです。

まとめ・雑感

データソースRDB(RDS含む)の場合は Glueのウィザードがソレっぽいコードをdyfで自動生成してくれるが、それで対応できるのはせいぜい1000万件レコード以下です。それ以上の場合はdfでsparkの機能でやるとよい。
dyfのメリットとしては、Glueのウィザードで一応コード一切書けなくてもできるが、コード少しは書ける人だったら、データソースがRDBの場合dyfを使うメリット全く無いのでdfだけで書いてよい。

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
What you can do with signing up
11