AWS GlueでDBのデータをマスキングして出力してみる

はじめに

AWS Glueは2017年8月に発表された、フルマネージドでサーバレスなETLサービスです。
RDSからS3にデータを抽出したり、S3にあるログファイルをカタログに登録してAmazon Athenaで解析したりできます。
現在は、バージニア北部・オハイオ・オレゴンの3つのリージョンのみしかサポートされていませんが、もうまもなく東京リージョンもサポートされるのではないでしょうか。

本記事では、AWS Glueを使用してRDSインスタンスのデータを特定のカラムのみマスキングしてから、CSV形式でS3に抽出してみます!

前提

以下のRDSインスタンスが準備されているものとします。

  • MySQL RDSインスタンス

また、今回は以下のようなテーブルを使用します。
ID, ユーザ名、パスワードの3フィールドのみの簡単なテーブルです。
(まずパスワードが平文で保存されていることはありえませんが。。)

#+---+---------+------------+
#| ID| username|    password|
#+---+---------+------------+
#|  1|    alice|   alicepass|
#|  2|      bob|     bobpass|
#|  3|  charlie| charliepass|
#|  4|     dave|    davepass|
#|  5|    ellen|   ellenpass|
#+---+---------+------------+

また、事前に任意のS3バケットに以下のファイルをアップロードしておいてください。

job.py
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# ---------- ここに書いていく ----------

job.commit()

ETL処理の作成

AWS GlueでETL処理を行うには、主に以下の手順で行います。
(DataCatalogを使用しない場合は、2と3は省略可能です。)

  1. Connectionの作成
  2. Crawlerの作成
  3. Crawlerを実行し、DataCatalogの登録
  4. Jobの作成
  5. JobのPythonスクリプトの記述
  6. Jobの実行

今回のケースでは、DataCatalogの登録にこれといったメリットが感じられないので使用しないことにします。

1. Connectionを作成する

まずはRDSへの接続情報を保持するConnectionを作成してみましょう。

  1. AWS Glueのコンソール画面の左メニューからConnectionsを選択します。
  2. Add connectionボタンをクリックします。
  3. 表示されたフォームに以下の通りに入力します。

    Connection name 任意
    Connection type Amazon RDS
    Database engine MySQL
  4. Nextをクリックします。

  5. フォームのInstance, Database name, Username, Passwordの欄を自分の環境のものに合わせて指定してください。

  6. Nextをクリックします。

  7. 問題なければFinishをクリックしてConnectionを作成します。

2. Jobを作成する

次に、ETL処理を行うJobを作成しましょう。

  1. AWS Glueのコンソール画面の左メニューからJobsを選択します。
  2. Add jobボタンをクリックします。
  3. 表示されたフォームに以下の通りに入力します。

    Name 任意
    IAM role Glue実行用のサービスロール
    This job runs An existing script that you provide
    S3 path where the script is stored 最初にアップロードしたS3のjob.pyファイル
    Temporary directory 任意のS3パス
  4. Nextをクリックします。

  5. Connectionの一覧から先ほど作成したConnectionを探し、Addをクリックします。

  6. Nextをクリックします。

  7. 問題なければFinishをクリックしてJobを作成します。

無事作成が完了するとスクリプトエディタが表示されるかと思います。

3. JobのPythonスクリプトを書く

ここからは具体的にETL処理を行うスクリプトを記述していきます。

データベースに接続する(Extract)

データベースに接続する際には、DynamicFrameというものを作成します。
AWS Glueでは、以下3つの方法で作成することができます。

create_dynamic_frame.from_catalog AWS Glueのデータカタログから作成します
create_dynamic_frame.from_rdd Resilient Distributed Dataset (RDD)から作成します
create_dynamic_frame.from_options JDBCやS3などの接続タイプを指定して作成します

今回は、DataCatalogは使用しないのでcreate_dynamic_frame.from_optionsを使用します。

glueContext.create_dynamic_frame.from_options(
    connection_type='mysql',
    connection_options={
        'url': 'JDBC_URL',
        'user': 'USER_NAME',
        'password': 'PASSWORD',
        'dbtable': 'TABLE_NAME'
    })

connection_typeは、jdbcと思ってしまうところですが、どうやらjdbcではエラーになってしまうようですので、接続先データベースのベンダー名を指定します。今回で言えば、mysqlです。
connection_optionsについては、直接記述するのもいいですが、せっかくConnectionを登録しているので、その情報を利用してみましょう。

Connectionから接続情報を取得

以下のようにして、AWS Glueに登録したConnectionの情報を取得することができます。

glueContext.extract_jdbc_conf(connection_name='CONNECTION_NAME')

この関数を使用することで、次のような情報が取れます。

url JDBCのURL
vendor vendor名
user ユーザ名
password パスワード

ただ、ここで注意があります。
AWS GlueのConnectionに登録されているJDBCのURLは、jdbc:[ベンダー名]://[ホスト名]:[ポート番号]/[データベース名](Oracleの場合は、jdbc:[ベンダー名]:thin://@[ホスト名]:[ポート番号]/[データベース名])となっているのですが、この関数で取れるURLは、jdbc:[ベンダー名]://[ホスト名]:[ポート番号]とデータベース名の部分が欠けていることに注意してください。また、Oracleのみ、jdbc:oracle:thin://@のところが、jdbc:oracle://という形で取得されるので、ここも注意してください。

この情報を用いて、以下のようにDynamicFrameを作成できます。
urlの部分は仕方なく、取得したURLにデータベース名を追加してあります。

job.py
# Connectionの情報を取得
jdbc_conf = glueContext.extract_jdbc_conf(connection_name='CONNECTION_NAME')

# DynamicFrameを作成
dynamicframe = glueContext.create_dynamic_frame.from_options(
                   connection_type='mysql',
                   connection_options={
                       'url': "{0}/{1}".format(jdbc_conf['url'], 'DB_NAME'),
                       'user': jdbc_conf['user'],
                       'password': jdbc_conf['password'],
                       'dbtable': 'TABLE_NAME'
                   })

マスキングしてみる(Transform)

次に、データのマスキングを行なってみましょう。
AWS GlueのビルトインTransformに、Mapというものがありますので、それを使ってマスキングを行います。
今回は、passwordカラムのデータをすべて****************に変えちゃいましょう。

job.py
# マスク用の関数
def mask(dynamicRecord):
    dynamicRecord['password'] = '****************'
    return dynamicRecord

# DynamicFrameにマスク用の関数を適用
masked_dynamicframe = Map.apply(frame=dynamicframe, f=mask)

データをCSV形式で出力する(Load)

最後は、データをCSV形式で書き出してみましょう。
データの書き出しは、以下3つの方法で行えます。

write_dynamic_frame.from_catalog AWS Glueのデータカタログを指定して書き出します
write_dynamic_frame.from_options JDBCやS3などの接続タイプを指定して書き出します
write_dynamic_frame.from_jdbc_conf JDBCオプションを指定して書き出します

今回は、CSV形式でS3に書き出すので、write_dynamic_frame.from_optionsを使用します。
S3のバケットは任意のものを指定してください。

job.py
# DynamicFrameをCSV形式でS3に書き出す
glueContext.write_dynamic_frame.from_options(
    frame=masked_dynamicframe,
    connection_type='s3',
    connection_options={'path': "s3://{0}/{1}".format('CSV_BACKET_NAME', 'TABLE_NAME')},
    format='csv')

最終的なコードはこのようになりました。

job.py
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# マスク用の関数
def mask(dynamicRecord):
    dynamicRecord['password'] = '****************'
    return dynamicRecord

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)


# Connectionの情報を取得
jdbc_conf = glueContext.extract_jdbc_conf(connection_name='CONNECTION_NAME')

# DynamicFrameを作成
dynamicframe = glueContext.create_dynamic_frame.from_options(
                   connection_type='mysql',
                   connection_options={
                       'url': "{0}/{1}".format(jdbc_conf['url'], 'DB_NAME'),
                       'user': jdbc_conf['user'],
                       'password': jdbc_conf['password'],
                       'dbtable': 'TABLE_NAME'
                   })

# DynamicFrameにマスク用の関数を適用
masked_dynamicframe = Map.apply(frame=dynamicframe, f=mask)

# DynamicFrameをCSV形式でS3に書き出す
glueContext.write_dynamic_frame.from_options(
    frame=masked_dynamicframe,
    connection_type='s3',
    connection_options={'path': "s3://{0}/{1}".format('CSV_BACKET_NAME', 'TABLE_NAME')},
    format='csv')


job.commit()

4. Jobの実行

最後にRun JobボタンをクリックしてJobを実行してみましょう。

出力されたCSVを確認するとしっかりとデータがマスクされています!

output.csv
id,username,password
1,alice,****************
2,bob,****************
3,charlie,****************
4,dave,****************
5,ellen,****************

おわりに

AWS Glueを使用してRDSインスタンスのデータを特定のカラムのみマスキングしてから、CSV形式でS3に抽出してみました。
今回はCSV形式での出力を行いましたが、RDSインスタンスのデータベースにデータを書き出すこともできます。ただ、この場合は書き出し先のテーブルの末尾にデータが追加されてしまいますので、もしテーブル内のデータをTruncateしてから書き出したい場合はPySparkのAPIを使用する必要があります。
AWS GlueのDynamicFrameは、PySparkのDataFrameをラップしていますので、DataFrameへの変換が可能です。DataFrameに変換すればもっと柔軟なETL処理を行うことができます。

AWS Glueは、新しいサービスということもあり日々進化しています。昨日できなかったことが今日できたりします。今後、DynamicFrameでもっと多くのことができるようになり、AWS Glueがさらに便利に使えることを期待しています。

参考

(公式)AWS Glue Documentation