DataLakeとは
https://aws.amazon.com/jp/big-data/datalakes-and-analytics/what-is-a-data-lake/
AWS公式の記述を引用すると
データレイクは、規模にかかわらず、すべての構造化データと非構造化データを保存できる一元化されたリポジトリです。データをそのままの形で保存できるため、データを構造化しておく必要がありません。また、ダッシュボードや可視化、ビッグデータ処理、リアルタイム分析、機械学習など、さまざまなタイプの分析を実行し、的確な意思決定に役立てることができます。
当記事ではDataLakeのデータストアをS3とし、既存のRDSに蓄積されている既存ログ(レガシー)をGlueを使ってS3に移行する手順を紹介します。
なお、S3に移行したログの可視化や活用方法は当記事の対象範囲外です。
(ただしAthenaを使ってログを抽出できることは確認します)
また、今後蓄積されるログはRDSではなくS3に蓄積し続けることになりますが、当記事では既存ログの移行方法を紹介するまでに留めます。
(やるとすればKinesisFirehoseを使うことになるでしょう)
構成図
RDSはVPC内のプライベートサブネットに配置されています。
-
クローリング
GlueからVPC内のRDSへ接続しクローリングを行ないデータカタログを作ります。 -
ジョブ
S3へ物理的にログを移行します -
クエリ発行
S3へ移行されたログを抽出してみます
GlueからRDSへ接続する
Glue設定
レガシーログが格納されているRDSを選択し、画面通りに設定を進めて完了します。

セキュリティグループ作成
RDSをVPCに配置している場合はセキュリティグループが必要です。
以下を参考に、Glueの自己参照型セキュリティグループを作成します。
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/setup-vpc-for-glue-access.html
全てのトラフィックを、 自身のセキュリティグループからのみ呼ばれるように 設定します。

このセキュリティグループを、RDSのセキュリティグループに全てのTCPを開放して設定します。(以下はRDSにアタッチされているセキュリティグループの例)

接続テスト
ここまで進めればGlueの接続より接続テストをしてみましょう。
ここで失敗すれば設定等に不備がある可能性があります。見直しましょう。
クローリング①
データベース作成
クローリング実行
データストアはJDBCを選択し、先ほど作成した接続を選択します。
実行はオンデマンドで実行とし、最後に手動実行します。

実行後、左メニューのテーブルにテーブルが追加されていることを確認しましょう。
S3バケット作成
Glueのテーブルにクローリングの結果が格納されていますが、物理的なログの格納場所はS3となります。
この次の手順でジョブを作成しログを流し込みますが、そのためのS3バケットを事前に作成します。
適当なバケット名を指定して作成しておきましょう。
Glueジョブ作成
ジョブにはスクリプトを作成する必要があります。
今回はAWS Glue が生成する提案されたスクリプトを選択して進めます。

なお、IAMロールを作成する必要があるので存在しない場合は新規で作成します。
アタッチするポリシーはAWSGlueServiceRoleとAmazonS3FullAccessにします。
その他オプションについては自由ですが、モニタリングオプションは有効化しておくのがおすすめです。
エラーになった場合に原因が特定されやすくなります。

そして設定を進めていくとスクリプトの編集画面に進みます。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
〜(省略)〜
## @type: DataSink
## @args: [database = "xxxxx", table_name = "xxx_logs", transformation_ctx = "datasink5"]
## @return: datasink5
## @inputs: [frame = resolvechoice4]
# datasink5 = glueContext.write_dynamic_frame.from_catalog(frame = resolvechoice4, database = "xxxxx", table_name = "xxx_logs", transformation_ctx = "datasink5")
datasink5 = glueContext.write_dynamic_frame.from_options(frame = resolvechoice4, connection_type = "s3", connection_options = {"path": "s3://xxx/yyy/zzz"}, format = "parquet", transformation_ctx = "datasink5")
job.commit()
最終行付近にあるglueContext.write_dynamic_frame.from_catalogの部分を編集します。
今回はS3に配置するため、以下のようにします。
glueContext.write_dynamic_frame.from_options(frame = resolvechoice4, connection_type = "s3", connection_options = {"path": "s3://xxx/yyy/zzz"}, format = "parquet", transformation_ctx = "datasink5")
s3://xxxがバケット、yyy/zzzはキー(プレフィックス)になりますが、S3ではキーで区切ることでデータストアのパーティションとして定義することができます。パーティションについての解説は当記事の対象範囲外とします。
クローリング②
ジョブの実行が完了すると、S3に.parquet形式のログデータが格納されていると思います。
このデータをデータカタログとして登録するためにクローラの作成を行ないます。
前回の手順ではデータストアをRDSとしましたが、今回はS3です。

クローラを実行し、左メニューのテーブルにS3をデータストアとしたテーブルが追加されていることを確認しましょう。
なお、クローラを実行しないとデータカタログが更新されませんので
商用で活用する場合は定期的に実行するようにスケジュールを設定することをおすすめします。
Athenaによるクエリ実行
Athenaによるログの抽出は簡単です。SQLとほぼ同じ形式で書けます。
クローリングしたデータが抽出できることを確認しましょう。





