
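Notes on using Glue ㊱ (parallelizing reads from an RDB)

Posted at 2019-03-10

By default, Glue reads from RDS with a single executor. If that read runs out of memory, parallelize it.

The setup used in this article looks like the diagram below. Read "Redshift" in the diagram as "RDS".

Glue reads the data from RDS, converts it to Parquet, and writes it to S3.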

glue_redshift-Final.png

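Overall flow

  • Preparation
  • Create the Glue job (default)
  • Check the logs
  • Create the Glue job (parallelized)
  • Check the logs
  • Check the output

### Preparation

For how to create a self-referencing security group and what it is for, see the articles below.
They also cover how to create the Glue Connection.
https://qiita.com/pioho07/items/05c912333e88788a1391
https://qiita.com/pioho07/items/3a07cf6dccb8dfe046ff

Create the RDS instance, then create the schema and table and import the data. Those steps are omitted here. To make the queries visible, set general_log to 1 in the RDS parameter group.
The imported data is shown below.

  • Column information
    deviceid,uuid,appid,country,year,month,day,hour

* The RDS instance is the same one used in "Notes on using Glue ㉟".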

cvlog
iphone,11111,001,JP,2017,12,14,12
android,11112,001,FR,2017,12,14,14
iphone,11113,009,FR,2017,12,16,21
iphone,11114,007,AUS,2017,12,17,18
other,11115,005,JP,2017,12,29,15
iphone,11116,001,JP,2017,12,15,11
pc,11118,001,FR,2017,12,01,01
pc,11117,009,FR,2017,12,02,18
iphone,11119,007,AUS,2017,11,21,14
other,11110,005,JP,2017,11,29,15
iphone,11121,001,JP,2017,11,11,12
android,11122,001,FR,2017,11,30,20
iphone,11123,009,FR,2017,11,14,14
iphone,11124,007,AUS,2017,12,17,14
iphone,11125,005,JP,2017,11,29,15
iphone,11126,001,JP,2017,12,19,08
android,11127,001,FR,2017,12,19,14
iphone,11128,009,FR,2017,12,09,04
iphone,11129,007,AUS,2017,11,30,14

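Confirm the data with a SELECT on the RDS side. Because deviceid is varchar(5), values such as iphone and android are stored truncated to five characters.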

MySQL [db]> select * from cvlog;
+----------+-------+-------+---------+------+-------+------+------+
| deviceid | uuid  | appid | country | year | month | day  | hour |
+----------+-------+-------+---------+------+-------+------+------+
| iphon    | 11111 |     1 | JP      | 2017 |    12 |   14 |   12 |
| andro    | 11112 |     1 | FR      | 2017 |    12 |   14 |   14 |
| iphon    | 11113 |     9 | FR      | 2017 |    12 |   16 |   21 |
| iphon    | 11114 |     7 | AUS     | 2017 |    12 |   17 |   18 |
| other    | 11115 |     5 | JP      | 2017 |    12 |   29 |   15 |
| iphon    | 11116 |     1 | JP      | 2017 |    12 |   15 |   11 |
| pc       | 11118 |     1 | FR      | 2017 |    12 |    1 |    1 |
| pc       | 11117 |     9 | FR      | 2017 |    12 |    2 |   18 |
| iphon    | 11119 |     7 | AUS     | 2017 |    11 |   21 |   14 |
| other    | 11110 |     5 | JP      | 2017 |    11 |   29 |   15 |
| iphon    | 11121 |     1 | JP      | 2017 |    11 |   11 |   12 |
| andro    | 11122 |     1 | FR      | 2017 |    11 |   30 |   20 |
| iphon    | 11123 |     9 | FR      | 2017 |    11 |   14 |   14 |
| iphon    | 11124 |     7 | AUS     | 2017 |    12 |   17 |   14 |
| iphon    | 11125 |     5 | JP      | 2017 |    11 |   29 |   15 |
| iphon    | 11126 |     1 | JP      | 2017 |    12 |   19 |    8 |
| andro    | 11127 |     1 | FR      | 2017 |    12 |   19 |   14 |
| iphon    | 11128 |     9 | FR      | 2017 |    12 |    9 |    4 |
| iphon    | 11129 |     7 | AUS     | 2017 |    11 |   30 |   14 |
+----------+-------+-------+---------+------+-------+------+------+
MySQL [db]> show create table cvlog\G
*************************** 1. row ***************************
       Table: cvlog
Create Table: CREATE TABLE `cvlog` (
  `deviceid` varchar(5) DEFAULT NULL,
  `uuid` int(11) DEFAULT NULL,
  `appid` int(11) DEFAULT NULL,
  `country` varchar(5) DEFAULT NULL,
  `year` int(11) DEFAULT NULL,
  `month` int(11) DEFAULT NULL,
  `day` int(11) DEFAULT NULL,
  `hour` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1

### Create the Glue job

Create and run a job with the code below.
RDS->S3

se2_job21
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_36_db_cvlog", transformation_ctx = "datasource0" )

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("country", "string", "country", "string"), ("month", "int", "month", "int"), ("hour", "int", "hour", "int"), ("year", "int", "year", "int"), ("appid", "int", "appid", "int"), ("deviceid", "string", "deviceid", "string"), ("uuid", "int", "uuid", "int"), ("day", "int", "day", "int")], transformation_ctx = "applymapping1")

resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")

dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out17"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

Check the logs

The MySQL query log shows that Glue runs a single SELECT over all rows.

MySQL [(none)]> select event_time, argument from mysql.general_log order by event_time desc \G

*************************** 32. row ***************************
event_time: 2019-03-10 08:57:02
  argument: SELECT `deviceid`,`uuid`,`appid`,`country`,`year`,`month`,`day`,`hour` FROM cvlog

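Create the Glue job (parallelized)

Use hashfield in the additional options (additional_options) of from_catalog in the DynamicFrameReader class.

additional_options – Additional options provided to AWS Glue. To use a JDBC connection that performs parallel reads, you can set the hashfield, hashexpression, or hashpartitions options.

Here "country" is specified as the hashfield key. (A key whose values are evenly distributed is preferable.)
The only change is the addition of the following option.

additional_options = {"hashfield": "country"} )
  • hashfield: for non-numeric columns such as strings. The specified column is hashed and the hashed value is used as the key in the WHERE clause
  • hashexpression: for numeric columns. The specified column (or an SQL expression that returns a whole number) is used to build the key in the WHERE clause
  • hashpartitions: the number of parallel reads of the JDBC table. If this property is not set, the default value is 7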
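
As a rough illustration of how the three options combine, the sketch below builds the `additional_options` dictionary in plain Python. The helper name `build_parallel_read_options` is hypothetical and not part of the Glue API; only the option keys come from the article. Passing `hashpartitions` as a string is an assumption based on how the AWS documentation examples are written.

```python
def build_parallel_read_options(hashfield=None, hashexpression=None, hashpartitions=None):
    """Build an additional_options dict for from_catalog (hypothetical helper)."""
    options = {}
    if hashfield is not None:
        # Column whose hashed value splits the table into partitions.
        options["hashfield"] = hashfield
    if hashexpression is not None:
        # Numeric column or SQL expression returning a whole number.
        options["hashexpression"] = hashexpression
    if hashpartitions is not None:
        if int(hashpartitions) < 1:
            raise ValueError("hashpartitions must be a positive integer")
        # Assumption: the value is passed as a string, as in the AWS examples.
        options["hashpartitions"] = str(hashpartitions)
    return options


# Same option as in this article, plus an explicit partition count.
print(build_parallel_read_options(hashfield="country", hashpartitions=4))
```

The resulting dictionary would be passed as `additional_options=` to `glueContext.create_dynamic_frame.from_catalog(...)` inside a Glue job.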

* Links to the official documentation are at the end of this article.

se2_job21
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_36_db_cvlog", transformation_ctx = "datasource0", additional_options = {"hashfield": "country"} )

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("country", "string", "country", "string"), ("month", "int", "month", "int"), ("hour", "int", "hour", "int"), ("year", "int", "year", "int"), ("appid", "int", "appid", "int"), ("deviceid", "string", "deviceid", "string"), ("uuid", "int", "uuid", "int"), ("day", "int", "day", "int")], transformation_ctx = "applymapping1")

resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")

dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out17"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

This option can also be set in the Glue table properties, which means it can be used without passing it through the DynamicFrame call.
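
The article sets the table property in the console. A minimal sketch of doing the same with boto3 might look like the following. The helper names `with_hash_options` and `apply_to_catalog` are hypothetical, and the list of keys accepted by `TableInput` is an assumption that may be incomplete for newer API versions. The pure function is kept separate so it can be tried without AWS access.

```python
# Keys that UpdateTable accepts in TableInput (assumed list). get_table also
# returns read-only keys such as DatabaseName and CreateTime that must be dropped.
TABLE_INPUT_KEYS = {
    "Name", "Description", "Owner", "LastAccessTime", "LastAnalyzedTime",
    "Retention", "StorageDescriptor", "PartitionKeys", "ViewOriginalText",
    "ViewExpandedText", "TableType", "Parameters", "TargetTable",
}


def with_hash_options(table, hashfield, hashpartitions=None):
    """Return a TableInput dict with hashfield/hashpartitions added to Parameters."""
    table_input = {k: v for k, v in table.items() if k in TABLE_INPUT_KEYS}
    # Copy so the dict returned by get_table is left untouched.
    params = dict(table_input.get("Parameters", {}))
    params["hashfield"] = hashfield
    if hashpartitions is not None:
        params["hashpartitions"] = str(hashpartitions)
    table_input["Parameters"] = params
    return table_input


def apply_to_catalog(database, table_name, hashfield, hashpartitions=None):
    """Read the catalog table, add the options, and write it back."""
    import boto3  # imported lazily so with_hash_options works without boto3

    glue = boto3.client("glue")
    table = glue.get_table(DatabaseName=database, Name=table_name)["Table"]
    glue.update_table(
        DatabaseName=database,
        TableInput=with_hash_options(table, hashfield, hashpartitions),
    )
```

With the names used in this article, the call would be `apply_to_catalog("se2", "se2_36_db_cvlog", "country")`.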

スクリーンショット 0031-03-10 21.42.30.png

Check the logs

The MySQL query log shows that Glue now runs hash-partitioned queries (the read is parallelized).

MySQL [(none)]> select event_time, argument from mysql.general_log order by event_time desc \G


*************************** 116. row ***************************
event_time: 2019-03-10 12:53:11
  argument: SELECT `deviceid`,`uuid`,`appid`,`country`,`year`,`month`,`day`,`hour` FROM (select * from cvlog WHERE CONV(SUBSTRING(MD5(CONCAT('',country)), -8, 8), 16, 10) % 7 = 2) as cvlog
*************************** 117. row ***************************
event_time: 2019-03-10 12:53:11
  argument: SELECT `deviceid`,`uuid`,`appid`,`country`,`year`,`month`,`day`,`hour` FROM (select * from cvlog WHERE CONV(SUBSTRING(MD5(CONCAT('',country)), -8, 8), 16, 10) % 7 = 3) as cvlog
*************************** 118. row ***************************
event_time: 2019-03-10 12:53:11
  argument: SELECT `deviceid`,`uuid`,`appid`,`country`,`year`,`month`,`day`,`hour` FROM (select * from cvlog WHERE CONV(SUBSTRING(MD5(CONCAT('',country)), -8, 8), 16, 10) % 7 = 1) as cvlog
*************************** 119. row ***************************
event_time: 2019-03-10 12:53:11
  argument: SELECT `deviceid`,`uuid`,`appid`,`country`,`year`,`month`,`day`,`hour` FROM (select * from cvlog WHERE CONV(SUBSTRING(MD5(CONCAT('',country)), -8, 8), 16, 10) % 7 = 0) as cvlog
*************************** 139. row ***************************
event_time: 2019-03-10 12:53:11
  argument: SELECT `deviceid`,`uuid`,`appid`,`country`,`year`,`month`,`day`,`hour` FROM (select * from cvlog WHERE CONV(SUBSTRING(MD5(CONCAT('',country)), -8, 8), 16, 10) % 7 = 4) as cvlog
*************************** 143. row ***************************
event_time: 2019-03-10 12:53:11
  argument: SELECT `deviceid`,`uuid`,`appid`,`country`,`year`,`month`,`day`,`hour` FROM (select * from cvlog WHERE CONV(SUBSTRING(MD5(CONCAT('',country)), -8, 8), 16, 10) % 7 = 6) as cvlog
*************************** 155. row ***************************
event_time: 2019-03-10 12:53:11
  argument: SELECT `deviceid`,`uuid`,`appid`,`country`,`year`,`month`,`day`,`hour` FROM (select * from cvlog WHERE CONV(SUBSTRING(MD5(CONCAT('',country)), -8, 8), 16, 10) % 7 = 5) as cvlog
*************************** 165. row ***************************
event_time: 2019-03-10 12:53:07
  argument: SELECT * FROM (select * from cvlog WHERE CONV(SUBSTRING(MD5(CONCAT('',country)), -8, 8), 16, 10) % 7 = 1) as cvlog WHERE 1=0
*************************** 166. row ***************************
event_time: 2019-03-10 12:53:07
  argument: SELECT * FROM (select * from cvlog WHERE CONV(SUBSTRING(MD5(CONCAT('',country)), -8, 8), 16, 10) % 7 = 1) as cvlog WHERE 1=0
*************************** 176. row ***************************
event_time: 2019-03-10 12:53:07
  argument: SELECT * FROM (select * from cvlog WHERE CONV(SUBSTRING(MD5(CONCAT('',country)), -8, 8), 16, 10) % 7 = 2) as cvlog WHERE 1=0
*************************** 177. row ***************************
event_time: 2019-03-10 12:53:07
  argument: SELECT * FROM (select * from cvlog WHERE CONV(SUBSTRING(MD5(CONCAT('',country)), -8, 8), 16, 10) % 7 = 2) as cvlog WHERE 1=0
*************************** 187. row ***************************
event_time: 2019-03-10 12:53:07
  argument: SELECT * FROM (select * from cvlog WHERE CONV(SUBSTRING(MD5(CONCAT('',country)), -8, 8), 16, 10) % 7 = 3) as cvlog WHERE 1=0
*************************** 188. row ***************************
event_time: 2019-03-10 12:53:07
  argument: SELECT * FROM (select * from cvlog WHERE CONV(SUBSTRING(MD5(CONCAT('',country)), -8, 8), 16, 10) % 7 = 3) as cvlog WHERE 1=0
*************************** 198. row ***************************
event_time: 2019-03-10 12:53:07
  argument: SELECT * FROM (select * from cvlog WHERE CONV(SUBSTRING(MD5(CONCAT('',country)), -8, 8), 16, 10) % 7 = 4) as cvlog WHERE 1=0
*************************** 199. row ***************************
event_time: 2019-03-10 12:53:07
  argument: SELECT * FROM (select * from cvlog WHERE CONV(SUBSTRING(MD5(CONCAT('',country)), -8, 8), 16, 10) % 7 = 4) as cvlog WHERE 1=0
*************************** 209. row ***************************
event_time: 2019-03-10 12:53:07
  argument: SELECT * FROM (select * from cvlog WHERE CONV(SUBSTRING(MD5(CONCAT('',country)), -8, 8), 16, 10) % 7 = 5) as cvlog WHERE 1=0
*************************** 210. row ***************************
event_time: 2019-03-10 12:53:07
  argument: SELECT * FROM (select * from cvlog WHERE CONV(SUBSTRING(MD5(CONCAT('',country)), -8, 8), 16, 10) % 7 = 5) as cvlog WHERE 1=0
*************************** 220. row ***************************
event_time: 2019-03-10 12:53:07
  argument: SELECT * FROM (select * from cvlog WHERE CONV(SUBSTRING(MD5(CONCAT('',country)), -8, 8), 16, 10) % 7 = 6) as cvlog WHERE 1=0
*************************** 221. row ***************************
event_time: 2019-03-10 12:53:07
  argument: SELECT * FROM (select * from cvlog WHERE CONV(SUBSTRING(MD5(CONCAT('',country)), -8, 8), 16, 10) % 7 = 6) as cvlog WHERE 1=0
*************************** 231. row ***************************
event_time: 2019-03-10 12:53:06
  argument: SELECT * FROM (select * from cvlog WHERE CONV(SUBSTRING(MD5(CONCAT('',country)), -8, 8), 16, 10) % 7 = 0) as cvlog WHERE 1=0
*************************** 232. row ***************************
event_time: 2019-03-10 12:53:06
  argument: SELECT * FROM (select * from cvlog WHERE CONV(SUBSTRING(MD5(CONCAT('',country)), -8, 8), 16, 10) % 7 = 0) as cvlog WHERE 1=0
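
The pattern in the log above can be reproduced with a few lines of Python, which makes it easier to see how the partition count and the hash column end up in the WHERE clause. This is only a sketch that mimics the logged query text. The function name `partition_queries` is hypothetical, and this is not how Glue generates the queries internally.

```python
def partition_queries(table, columns, hashfield, partitions=7):
    """Rebuild the per-partition SELECT statements seen in the MySQL general log."""
    column_list = ",".join(f"`{c}`" for c in columns)
    queries = []
    for i in range(partitions):
        # Last 8 hex digits of MD5(column), converted to decimal, modulo the partition count.
        predicate = (
            f"CONV(SUBSTRING(MD5(CONCAT('',{hashfield})), -8, 8), 16, 10)"
            f" % {partitions} = {i}"
        )
        queries.append(
            f"SELECT {column_list} FROM (select * from {table} WHERE {predicate}) as {table}"
        )
    return queries


columns = ["deviceid", "uuid", "appid", "country", "year", "month", "day", "hour"]
for query in partition_queries("cvlog", columns, "country"):
    print(query)
```

Each of the seven generated statements selects a disjoint subset of rows, so running them concurrently reads the whole table exactly once.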

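With so few rows it is hard to see, but the rows are split across partitions. The table has only three distinct country values, so at most three of the seven queries return rows.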

MySQL [db]> SELECT `deviceid`,`uuid`,`appid`,`country`,`year`,`month`,`day`,`hour` FROM (select * from cvlog WHERE CONV(SUBSTRING(MD5(CONCAT('',country)), -8, 8), 16, 10) % 7 = 0) as cvlog
    -> ;
+----------+-------+-------+---------+------+-------+------+------+
| deviceid | uuid  | appid | country | year | month | day  | hour |
+----------+-------+-------+---------+------+-------+------+------+
| andro    | 11112 |     1 | FR      | 2017 |    12 |   14 |   14 |
| iphon    | 11113 |     9 | FR      | 2017 |    12 |   16 |   21 |
| iphon    | 11114 |     7 | AUS     | 2017 |    12 |   17 |   18 |
| pc       | 11118 |     1 | FR      | 2017 |    12 |    1 |    1 |
| pc       | 11117 |     9 | FR      | 2017 |    12 |    2 |   18 |
| iphon    | 11119 |     7 | AUS     | 2017 |    11 |   21 |   14 |
| andro    | 11122 |     1 | FR      | 2017 |    11 |   30 |   20 |
| iphon    | 11123 |     9 | FR      | 2017 |    11 |   14 |   14 |
| iphon    | 11124 |     7 | AUS     | 2017 |    12 |   17 |   14 |
| andro    | 11127 |     1 | FR      | 2017 |    12 |   19 |   14 |
| iphon    | 11128 |     9 | FR      | 2017 |    12 |    9 |    4 |
| iphon    | 11129 |     7 | AUS     | 2017 |    11 |   30 |   14 |
+----------+-------+-------+---------+------+-------+------+------+
12 rows in set (0.00 sec)

MySQL [db]> SELECT * FROM (select * from cvlog WHERE CONV(SUBSTRING(MD5(CONCAT('',country)), -8, 8), 16, 10) % 7 = 1) as cvlog WHERE 1=0;
Empty set (0.00 sec)

MySQL [db]> SELECT `deviceid`,`uuid`,`appid`,`country`,`year`,`month`,`day`,`hour` FROM (select * from cvlog WHERE CONV(SUBSTRING(MD5(CONCAT('',country)), -8, 8), 16, 10) % 7 = 1) as cvlog;
+----------+-------+-------+---------+------+-------+------+------+
| deviceid | uuid  | appid | country | year | month | day  | hour |
+----------+-------+-------+---------+------+-------+------+------+
| iphon    | 11111 |     1 | JP      | 2017 |    12 |   14 |   12 |
| other    | 11115 |     5 | JP      | 2017 |    12 |   29 |   15 |
| iphon    | 11116 |     1 | JP      | 2017 |    12 |   15 |   11 |
| other    | 11110 |     5 | JP      | 2017 |    11 |   29 |   15 |
| iphon    | 11121 |     1 | JP      | 2017 |    11 |   11 |   12 |
| iphon    | 11125 |     5 | JP      | 2017 |    11 |   29 |   15 |
| iphon    | 11126 |     1 | JP      | 2017 |    12 |   19 |    8 |
+----------+-------+-------+---------+------+-------+------+------+
7 rows in set (0.00 sec)

MySQL [db]> SELECT * FROM (select * from cvlog WHERE CONV(SUBSTRING(MD5(CONCAT('',country)), -8, 8), 16, 10) % 7 = 2) as cvlog WHERE 1=0;
Empty set (0.00 sec)

MySQL [db]> SELECT `deviceid`,`uuid`,`appid`,`country`,`year`,`month`,`day`,`hour` FROM (select * from cvlog WHERE CONV(SUBSTRING(MD5(CONCAT('',country)), -8, 8), 16, 10) % 7 = 2) as cvlog;
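
To check how evenly a candidate hashfield would spread rows before running the job, the same expression can be approximated offline. The sketch below assumes that Python's `hashlib.md5` over the column text matches MySQL's `MD5()` for these ASCII values. The function name `hash_partition` is hypothetical, and the row counts per country are taken from the sample data in this article. If the assumption holds, the result should agree with the query output above (12 rows in partition 0 and 7 rows in partition 1).

```python
import hashlib
from collections import Counter


def hash_partition(value, partitions=7):
    """Approximate CONV(SUBSTRING(MD5(CONCAT('',col)), -8, 8), 16, 10) % partitions."""
    digest = hashlib.md5(value.encode("latin-1")).hexdigest()
    # Last 8 hex characters interpreted as an unsigned integer.
    return int(digest[-8:], 16) % partitions


# Rows per country in the cvlog sample data (19 rows in total).
country_counts = {"JP": 7, "FR": 8, "AUS": 4}

rows_per_partition = Counter()
for country, rows in country_counts.items():
    rows_per_partition[hash_partition(country)] += rows

for partition in range(7):
    print(partition, rows_per_partition.get(partition, 0))
```

A column with many distinct, evenly spread values fills more of the partitions, which is why the article recommends an evenly distributed key.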

Check the output

Checking with Athena shows that the results are the same for both Glue jobs.

スクリーンショット 0031-03-10 18.24.05.png

See also

Reading from JDBC tables in parallel
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/run-jdbc-parallel-read-job.html

DynamicFrameReader class
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-dynamic-frame-reader.html

Summary of the Glue usage notes
https://qiita.com/pioho07/items/32f76a16cbf49f9f712f
