はじめに
AWS Glueは2017年8月に発表された、フルマネージドでサーバレスなETLサービスです。
RDSからS3にデータを抽出したり、S3にあるログファイルをカタログに登録してAmazon Athenaで解析したりできます。
現在は、バージニア北部・オハイオ・オレゴンの3つのリージョンのみしかサポートされていませんが、もうまもなく東京リージョンもサポートされるのではないでしょうか。
本記事では、AWS Glueを使用してRDSインスタンスのデータを特定のカラムのみマスキングしてから、CSV形式でS3に抽出してみます!
前提
以下のRDSインスタンスが準備されているものとします。
- MySQL RDSインスタンス
また、今回は以下のようなテーブルを使用します。
ID, ユーザ名、パスワードの3フィールドのみの簡単なテーブルです。
(まずパスワードが平文で保存されていることはありえませんが。。)
#+---+---------+------------+
#| ID| username| password|
#+---+---------+------------+
#| 1| alice| alicepass|
#| 2| bob| bobpass|
#| 3| charlie| charliepass|
#| 4| dave| davepass|
#| 5| ellen| ellenpass|
#+---+---------+------------+
また、事前に任意のS3バケットに以下のファイルをアップロードしておいてください。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# ---------- ここに書いていく ----------
job.commit()
ETL処理の作成
AWS GlueでETL処理を行うには、主に以下の手順で行います。
(DataCatalogを使用しない場合は、2と3は省略可能です。)
- Connectionの作成
- Crawlerの作成
- Crawlerを実行し、DataCatalogの登録
- Jobの作成
- JobのPythonスクリプトの記述
- Jobの実行
今回のケースでは、DataCatalogの登録にこれといったメリットが感じられないので使用しないことにします。
1. Connectionを作成する
まずはRDSへの接続情報を保持するConnectionを作成してみましょう。
-
AWS Glueのコンソール画面の左メニューからConnectionsを選択します。
-
Add connectionボタンをクリックします。
-
表示されたフォームに以下の通りに入力します。
Connection name 任意 Connection type Amazon RDS Database engine MySQL -
Nextをクリックします。
-
フォームのInstance, Database name, Username, Passwordの欄を自分の環境のものに合わせて指定してください。
-
Nextをクリックします。
-
問題なければFinishをクリックしてConnectionを作成します。
2. Jobを作成する
次に、ETL処理を行うJobを作成しましょう。
-
AWS Glueのコンソール画面の左メニューからJobsを選択します。
-
Add jobボタンをクリックします。
-
表示されたフォームに以下の通りに入力します。
Name 任意 IAM role Glue実行用のサービスロール This job runs An existing script that you provide S3 path where the script is stored 最初にアップロードしたS3のjob.pyファイル Temporary directory 任意のS3パス -
Nextをクリックします。
-
Connectionの一覧から先ほど作成したConnectionを探し、Addをクリックします。
-
Nextをクリックします。
-
問題なければFinishをクリックしてJobを作成します。
無事作成が完了するとスクリプトエディタが表示されるかと思います。
3. JobのPythonスクリプトを書く
ここからは具体的にETL処理を行うスクリプトを記述していきます。
データベースに接続する(Extract)
データベースに接続する際には、DynamicFrame
というものを作成します。
AWS Glueでは、以下3つの方法で作成することができます。
create_dynamic_frame.from_catalog | AWS Glueのデータカタログから作成します |
create_dynamic_frame.from_rdd | Resilient Distributed Dataset (RDD)から作成します |
create_dynamic_frame.from_options | JDBCやS3などの接続タイプを指定して作成します |
今回は、DataCatalogは使用しないのでcreate_dynamic_frame.from_options
を使用します。
glueContext.create_dynamic_frame.from_options(
connection_type='mysql',
connection_options={
'url': 'JDBC_URL',
'user': 'USER_NAME',
'password': 'PASSWORD',
'dbtable': 'TABLE_NAME'
})
connection_type
は、jdbc
と思ってしまうところですが、どうやらjdbc
ではエラーになってしまうようですので、接続先データベースのベンダー名を指定します。今回で言えば、mysql
です。
connection_options
については、直接記述するのもいいですが、せっかくConnectionを登録しているので、その情報を利用してみましょう。
Connectionから接続情報を取得
以下のようにして、AWS Glueに登録したConnectionの情報を取得することができます。
glueContext.extract_jdbc_conf(connection_name='CONNECTION_NAME')
この関数を使用することで、次のような情報が取れます。
url | JDBCのURL |
vendor | vendor名 |
user | ユーザ名 |
password | パスワード |
ただ、ここで注意があります。
AWS GlueのConnectionに登録されているJDBCのURLは、jdbc:[ベンダー名]://[ホスト名]:[ポート番号]/[データベース名]
(Oracleの場合は、jdbc:[ベンダー名]:thin://@[ホスト名]:[ポート番号]/[データベース名]
)となっているのですが、この関数で取れるURLは、jdbc:[ベンダー名]://[ホスト名]:[ポート番号]
とデータベース名の部分が欠けていることに注意してください。また、Oracleのみ、jdbc:oracle:thin://@
のところが、jdbc:oracle://
という形で取得されるので、ここも注意してください。
この情報を用いて、以下のようにDynamicFrameを作成できます。
urlの部分は仕方なく、取得したURLにデータベース名を追加してあります。
# Connectionの情報を取得
jdbc_conf = glueContext.extract_jdbc_conf(connection_name='CONNECTION_NAME')
# DynamicFrameを作成
dynamicframe = glueContext.create_dynamic_frame.from_options(
connection_type='mysql',
connection_options={
'url': "{0}/{1}".format(jdbc_conf['url'], 'DB_NAME'),
'user': jdbc_conf['user'],
'password': jdbc_conf['password'],
'dbtable': 'TABLE_NAME'
})
マスキングしてみる(Transform)
次に、データのマスキングを行なってみましょう。
AWS GlueのビルトインTransformに、Map
というものがありますので、それを使ってマスキングを行います。
今回は、password
カラムのデータをすべて****************
に変えちゃいましょう。
# マスク用の関数
def mask(dynamicRecord):
dynamicRecord['password'] = '****************'
return dynamicRecord
# DynamicFrameにマスク用の関数を適用
masked_dynamicframe = Map.apply(frame=dynamicframe, f=mask)
データをCSV形式で出力する(Load)
最後は、データをCSV形式で書き出してみましょう。
データの書き出しは、以下3つの方法で行えます。
write_dynamic_frame.from_catalog | AWS Glueのデータカタログを指定して書き出します |
write_dynamic_frame.from_options | JDBCやS3などの接続タイプを指定して書き出します |
write_dynamic_frame.from_jdbc_conf | JDBCオプションを指定して書き出します |
今回は、CSV形式でS3に書き出すので、write_dynamic_frame.from_options
を使用します。
S3のバケットは任意のものを指定してください。
# DynamicFrameをCSV形式でS3に書き出す
glueContext.write_dynamic_frame.from_options(
frame=masked_dynamicframe,
connection_type='s3',
connection_options={'path': "s3://{0}/{1}".format('CSV_BACKET_NAME', 'TABLE_NAME')},
format='csv')
最終的なコードはこのようになりました。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
# マスク用の関数
def mask(dynamicRecord):
dynamicRecord['password'] = '****************'
return dynamicRecord
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# Connectionの情報を取得
jdbc_conf = glueContext.extract_jdbc_conf(connection_name='CONNECTION_NAME')
# DynamicFrameを作成
dynamicframe = glueContext.create_dynamic_frame.from_options(
connection_type='mysql',
connection_options={
'url': "{0}/{1}".format(jdbc_conf['url'], 'DB_NAME'),
'user': jdbc_conf['user'],
'password': jdbc_conf['password'],
'dbtable': 'TABLE_NAME'
})
# DynamicFrameにマスク用の関数を適用
masked_dynamicframe = Map.apply(frame=dynamicframe, f=mask)
# DynamicFrameをCSV形式でS3に書き出す
glueContext.write_dynamic_frame.from_options(
frame=masked_dynamicframe,
connection_type='s3',
connection_options={'path': "s3://{0}/{1}".format('CSV_BACKET_NAME', 'TABLE_NAME')},
format='csv')
job.commit()
4. Jobの実行
最後にRun JobボタンをクリックしてJobを実行してみましょう。
出力されたCSVを確認するとしっかりとデータがマスクされています!
id,username,password
1,alice,****************
2,bob,****************
3,charlie,****************
4,dave,****************
5,ellen,****************
おわりに
AWS Glueを使用してRDSインスタンスのデータを特定のカラムのみマスキングしてから、CSV形式でS3に抽出してみました。
今回はCSV形式での出力を行いましたが、RDSインスタンスのデータベースにデータを書き出すこともできます。ただ、この場合は書き出し先のテーブルの末尾にデータが追加されてしまいますので、もしテーブル内のデータをTruncateしてから書き出したい場合はPySparkのAPIを使用する必要があります。
AWS GlueのDynamicFrameは、PySparkのDataFrameをラップしていますので、DataFrameへの変換が可能です。DataFrameに変換すればもっと柔軟なETL処理を行うことができます。
AWS Glueは、新しいサービスということもあり日々進化しています。昨日できなかったことが今日できたりします。今後、DynamicFrameでもっと多くのことができるようになり、AWS Glueがさらに便利に使えることを期待しています。