AWS Lake Formationとは?
Lake FormationはAWS上でのデータレイク構築を容易にするサービスです。
複数の機能を持っていますが、特にセキュリティ面でIAMポリシーを補強するセキュリティコントロールを実現できます。
AWS Lake Formationのセキュリティコントロール
AWSでデータレイクを構築するときの標準的な構成は、実データをS3に配置し、それらのデータのカタログ情報をGlue Data Catalogとして管理する構成になります。
Lake Formationがない場合はS3のバケットポリシーやGlue Data Catalogのポリシーなどの複数のIAMポリシーでセキュリティコントロールを行う必要があり非常に面倒でした。
Lake Formationはデータレイクに含まれるデータベースやテーブルなどの単位でユーザごとにSELECT, INSERTなどの権限制御を行うことができるようになります。従来のRDBで行うような権限制御を行うことができるようになります。
詳しくはBlack Beltの資料をご覧ください。
AWS Lake Formationのセキュリティコントロールに対応しているサービス
以下のサービスはLake Formationのセキュリティコントロールに対応しています。
- Amazon Athena
- AWS Glue
- Amazon Redshift
- Amazon EMR
例えばLake FormationでAユーザにXテーブルのSELECT権限だけを与えた場合にAmazon Athenaでクエリを実行すると
-
SELECT * FROM Xテーブル
は成功する -
INSERT INTO Xテーブル ...
は権限がないため失敗する
という挙動になります。
また、Lake Formation側でセキュリティの設定を行えば対応サービス側で何か設定を行う必要はなくLake Formationのセキュリティ設定が各サービスで適用されます。
AWS Lake Formationに対応していない場合
AWS Lake Formationに対応していない場合、例えば自作のPythonプログラムでデータレイクのテーブルのデータを参照したい場合にLake Formationのセキュリティコントロールを使うことはできないのでしょうか?
Lake Formationが提供しているQuerying APIを使用するとLake Formationで設定したセキュリティで動作させることができます。
Querying APIとは?
Querying APIはLake Formationが提供しているGoverned Tablesに対してクエリを行うためのAPIです。
Governed TablesはS3にParquetなどで保存したデータに対して、いわゆるRDBで行えるようなトランザクションの制御を実現するテーブルの形式です。知っている方であれば、Apache Hudi, Apache Iceberg, Delta Lakeと同じような機能を持つものと思っていただけるとイメージしやすいのではないかと思います。
PythonプログラムをLake Formationのセキュリティコントロールで制御する
実はQuerying APIはGoverned Tables以外の普通のテーブルに対しても実行することができます。普通のテーブルに対してはもちろんトランザクション制御を行うことはできませんが、Lake Formationのセキュリティの制御は適用されます。
以下がQuerying APIを使ったテーブルのデータ参照をするPythonプログラムです。Pythonを実行するIAMユーザにデータ参照を行うテーブルに対してLake FormationでDESCRIBE, SELECT権限を与えると実行が可能になります。一方権限がない場合は start_query_planning
で Insufficient Lake Formation Permission
になり権限がないため適切にエラーになることがわかります。
import boto3
lf = boto3.client("lakeformation")
# Querying APIを使うときはstart_transactionを実行しTransactionIdを取得する
r = lf.start_transaction(TransactionType="READ_ONLY")
txid = r["TransactionId"]
# データベース名とTransactionIdとPartiQL形式のクエリを指定する
r = lf.start_query_planning(QueryPlanningContext={'DatabaseName': 'db1','TransactionId': txid},QueryString='select * from tbl1')
qid = r['QueryId']
# クエリIDを指定してWork Unitを取得
r = lf.get_work_units(QueryId = qid)
wuid = r['WorkUnitRanges'][0]['WorkUnitIdMin']
token = r['WorkUnitRanges'][0]['WorkUnitToken']
# Work Unitから結果を取得する。結果はApache Arrowのストリームとして返却される
r = lf.get_work_unit_results(QueryId=qid, WorkUnitId=wuid, WorkUnitToken=token)
# Apache Arrowのストリームから結果を取得しprintで表示
import pyarrow as pa
s = pa.ipc.RecordBatchStreamReader(r['ResultStream'])
r = s.read_all()
print(r)
取得したデータについてはApache Arrowのストリームで返却されるので、必要に応じてPandasのDataFrameなどに変換して処理を続けることもできます。