11
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

生成AIにETLジョブ作らせてみた。-Amazon Q data integration in AWS Glue-

Posted at

1. はじめに

※この記事は、慎重な検証を行った結果に基づくものではなく、参考情報として提供されています。そのため、内容を鵜呑みにせず、自身で確認や検証を行っていただくことをお勧めします。ご理解のほど、よろしくお願いいたします。

Amazon Q data integration in AWS Glueがプレビューされました(2024/01/30)。
Amazon Q とAWS Glueが統合されたことによって、ETLジョブ作成に関する学習時間や労力の削減が期待できます。具体的な機能としては以下の二つです。

i.生成AIに自然言語で質問すると、回答してくれる。
Bedrockベースの生成AIがAWSのドキュメントを参照してより専門的な回答を生成してくれます。
(例)AWS Glue ジョブのメモリ管理を最適化するにはどうすればよいですか?

ii.生成AIに自然言語でETLジョブの作成を依頼できる。
(例)DynamoDB テーブルを読み取り、フィールドをマッピングし、結果を Parquet 形式で Amazon S3 に書き込む Glue ジョブを作成してください

現在は英語のみプレビューが可能で、AWSコンソールの右上のアイコンからアクセスできます。
画像3.png

右下の入力欄に自然言語で依頼

image.png

生成されたスクリプトを、右から左にコピペするだけで簡単にジョブが作れます。

image.png

2. Amazon Q 統合の利点

  • 学習時間の短縮
  • ジョブ作成の時間と労力の削減
  • 問題解決の早期化

AWS GlueではETLジョブをスクリプトで作成することができますが、私のような何も知らないド素人が少しマニュアルを読んだくらいでは、ちょっとした自動化も難しいのが触ってみた感想でした。。。が!

技術の進歩はすごいです、、。プロンプトでスクリプトを生成してくれるようになりました


私:「DynamoDB テーブルを読み取り、フィールドをマッピングし、結果を Parquet 形式で Amazon S3 に書き込む Glue ジョブを手伝ってください」

Amazon Q :

スクリプトの概要
1: 最初に、AWS Glueから必要なすべてのライブラリをインポートします。
2: 次>に、GlueContextオブジェクトをインポートして初期化します。これは、スクリプト作成の観点から最も重要なインポートです。これは、ソースとターゲットのデータセットを定義するための標準的な方法を公開し、すべてのETLスクリプトの出発点となります。
3: その後、SparkContextとSparkSessionを初期化します。これらにより、AWS Glueジョブ内で利用可能なSparkエンジンを設定できます。
4: スクリプト内でジョブ引数を使用するために、getResolvedOptionsを呼び出して準備します。
5: 次のステップは、ジョブを初期化することです。ジョブオブジェクトは、設定を行い、さまざまなオプションのAWS Glue機能の状態を追跡します。スクリプトはジョブオブジェクトなしで実行できますが、後でこれらの機能を統合する際に混乱を避けるため、初期化することが最善の方法です。
6: ソース(DynamoDB)からデータを抽出するために、オプションから動的フレームを作成し、接続タイプを「dynamodb」として設定します。
7: 接続オプションプレースホルダー(['dynamodb.input.tableName', 'dynamodb.throughput.read.percent', 'dynamodb.splits'])を実際の値に置き換えます。
8: ApplyMappingオプションのプレースホルダー(['frame', 'mappings'])を実際の値に置き換えます。 9: ターゲット(S3)にデータを書き込むために、フォーマットをparquetにし、フォーマットオプションを{"compression": "snappy"}にし、接続タイプを's3'にして、オプションから動的フレームを書き込みます。
10: 接続オプションのプレースホルダー(['path'])を実際の値に置き換えます。
11: ジョブブックマークをサポートするために、transformation_ctxパラメータをメソッドに提供します。
12: 最後に、スクリプトの最後でJobオブジェクトのcommitメソッドを呼び出すことにより、ジョブのライフサイクルを終了します。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME"])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

dynamo_DBnode1 = glueContext.create_dynamic_frame.from_options(
    connection_type="dynamodb",
    connection_options={
        "dynamodb.input.tableName": "<table-name>",
        "dynamodb.throughput.read.percent": "0.5",
        "dynamodb.splits": "1",
    },
    transformation_ctx="dynamo_DBnode1",
)

ApplyMappingnode2 = ApplyMapping.apply(
    frame=dynamo_DBnode1,
    mappings=[
        ("old_name_col", "string", "new_name_col", "string"),
        ("old_id_col", "int", "new_id_col", "int"),
    ],
    transformation_ctx="ApplyMappingnode2",
)

s3node3 = glueContext.write_dynamic_frame.from_options(
    frame=ApplyMappingnode2,
    format_options={"compression": "snappy"},
    connection_type="s3",
    format="parquet",
    connection_options={"path": "**********************************************"},
    transformation_ctx="s3node3",
)
job.commit()

(英語のみ対応のため、翻訳したものを記載)


生成したコードの検証は必要ですが、ちょっとしたETLジョブであれば十分に活躍してくれると感じます。
他にもこんなざっくりとした質問にも具体的に答えてくれます。

私:AWS Glueをどのように使えばいいですか?

Amazon Q:

AWS Glueの使い方を始めるためのいくつかの方法を以下に示します。

AWSマネジメントコンソールにサインインし、「アナリティクス」カテゴリの下にある「Glue」に移動します(リンク)。ここで、Glueのビジュアルインターフェースやチュートリアルを探索できます。
AWS Glue Studioを使用して、コードを書かずにETLワークフローを視覚的に作成、実行、監視します。Glue Studioは、ドラッグアンドドロップのシンプルなインターフェースを提供します。
AWS Glue ETL Pythonライブラリを使用して、独自のETLコードを記述します。GlueのETL機能を活用したPythonスクリプトを記述し、Glueジョブとして実行できます。
AWS Glue DataBrewを使用して、コードを書かずにデータを視覚的にクリーニング、正規化、変換し、分析に使用できます。
AWS Glue GitHubリポジトリでサンプルETLコードにアクセスします。GlueのAPIとライブラリを使用して>ETLパイプラインを構築する方法を示す例を確認してください。
AWS Glueドキュメントを参照して、さまざまなユースケースで始める方法に関する詳細なチュートリアルやガイドを入手してください。ドキュメントには、Glueの機能を理解するのに役立つエンドツーエンドの例が提供されています。
始める方法のオプションを確認した後、特定のGlueタスクで助けが必要な場合はお知らせください。

3. 使ってみた感想

  • スクリプトを素直に作成してくれない時がある

私の英語のプロンプトが良くない可能性はありますが、「スクリプトを書いてほしい」などを明示的に伝えたとしても下記のように書いてくれないことがあります。

私:DB2からテーブルを読み取り、データをCSVにしてS3に格納するスクリプトを作成してください.
(Write a script to read a table from DB2, convert the data to CSV format, and store it in S3)

Amqzon Q:

DB2からテーブルを読み取り、データをCSV形式に変換し、S3に格納する手順は以下の通りです。

  1. DB2、S3、およびGlueへのアクセスが必要な権限を持つIAMロールを作成します。
  2. DB2のアンロードユーティリティを使用して、テーブルデータをCSV形式のファイルにエクスポートします。これには、db2 unloadコマンドを使用できます。
  3. AWS CLIまたはお好みのSDKを使用して、CSVファイルをS3バケットにアップロードします。
    aws s3 cp filename.csv s3://bucket-name
  4. AWS Glueクローラーを作成して、S3に格納されたCSVファイルのスキーマを推定します。
  5. Pythonで書かれたAWS Glue ETLスクリプトを使用して、S3のCSVファイルからデータを読み取り、必要に応じて変換し、CSVまたは他の形式で新しいS3ロケーションに出力を書き込みます。
  6. 変換されたデータは、S3に格納され、AthenaやRedshift Spectrumなどのサービスを使用してクエリしたり、Redshiftなどのデータベースにロードして分析したりできます。

( Here are the steps to read a table from DB2, convert the data to CSV format, and store it in S3:

  1. Create an IAM role with the necessary permissions to access DB2, S3 and Glue.
  2. Use the DB2 unload utility to export the table data to a file in CSV format. The
    db2 unload
  3. Upload the CSV file to an S3 bucket using the AWS CLI or SDK of your choice.
    aws s3 cp filename.csv s3://bucket-name
  4. Create an AWS Glue crawler to infer the schema of the CSV file stored in S3.
  5. Use AWS Glue ETL scripts written in Python to read the data from the S3 CSV file, transform if needed, and write the output to a new S3 location in CSV or other format.
  6. The transformed data in S3 can now be queried using services like Athena, Redshift Spectrum or loaded to databases like Redshift for analytics )
  • ビジュアルエディターには対応していない

現時点ではビジュアルエディターでのジョブ作成には対応していません。「ビジュアルエディターでの作成手順を教えてください」と文言を入れてもスクリプトが生成されたので、ひょっとするとトラブルシューティング自体も未対応なのかも?しれないです。

私:DynamoDB テーブルを読み取り、フィールドをマッピングし、結果を Parquet 形式で Amazon S3 に書き込む AWS Glue noビジュアルエディターで作成する手順を教えてください

(Please provide the steps to create an AWS Glue job using the visual editor to read a DynamoDB table, map the fields, and write the results in Parquet format to Amazon S3)

AmazonQ:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME"])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

dynamo_DBnode1 = glueContext.create_dynamic_frame.from_options(
    connection_type="dynamodb",
    connection_options={
        "dynamodb.input.tableName": "<table-name>",
        "dynamodb.throughput.read.percent": "0.5",
        "dynamodb.splits": "1",
    },
    transformation_ctx="dynamo_DBnode1",
)

ApplyMappingnode2 = ApplyMapping.apply(
    frame=dynamo_DBnode1,
    mappings=[
        ("old_name_col", "string", "new_name_col", "string"),
        ("old_id_col", "int", "new_id_col", "int"),
    ],
    transformation_ctx="ApplyMappingnode2",
)

s3node3 = glueContext.write_dynamic_frame.from_options(
    frame=ApplyMappingnode2,
    format_options={"compression": "snappy"},
    connection_type="s3",
    format="parquet",
    connection_options={"path": "**********************************************"},
    transformation_ctx="s3node3",
)
job.commit()
  • ChatGPT gpt-4との比較
    今回のプレビュー機能ではチャットベースでの応答のみとなっており、Amazon Qが直接AWSのリソースを操作するわけではありません。
    上記を踏まえると、ほかの生成AIでも事足りるわけです。そこで、それぞれに同じプロンプトを入力して結果を比較しました。
    どちらも「Help me with a Glue job that reads my DynamoDB table, maps the fields, and writes the results to Amazon S3 in Parquet format」を入力として与えています。

結論から言うと、スクリプト自体に明確な差は見られませんでした。引数の数の違いはそれぞれ省略可能なものを省略しているかどうかによるもの、順番の違いはPythonの仕様上問題ない、「write_dynamic_frame.from_options」の戻り値を格納していないのも同様に問題ないです。
むしろ13行目まで全く同じであることに非常に驚きました。

しかしながら、スクリプトの説明やその他補足情報に関しては大きく差が出ました、Amazon Q の方がより専門的かつ具体的に説明をしてくれています。


1. ChatGpt

To create a Glue job that reads data from your DynamoDB table, maps the fields, and writes the results to Amazon S3 in Parquet format, follow these steps:

  1. Create an IAM role for Glue:
    Create an IAM role with the necessary permissions for Glue to access your DynamoDB table and S3 bucket.

  2. Create a Glue Crawler:
    Set up a Glue Crawler to discover the schema of your DynamoDB table and create a table definition in the Glue Data Catalog.

  3. Create a Glue Job:
    Create a Glue ETL job using Python or Scala with the necessary libraries to read from DynamoDB, map the fields, and write to S3 in Parquet format.

  4. Run the Glue Job:
    Execute the Glue job to perform the ETL process.

Here's a Python script using Glue's PySpark library to perform the ETL process:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## Initialize the GlueContext and SparkContext
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

## Define input and output locations
dynamodb_table_name = "your-dynamodb-table-name"
s3_output_path = "s3://your-bucket-name/output-path/"

## Read data from DynamoDB table
datasource0 = glueContext.create_dynamic_frame.from_options(
    connection_type="dynamodb",
    connection_options={
        "dynamodb.input.tableName": dynamodb_table_name,
        "dynamodb.throughput.read.percent": "1.0"
    }
)

## Map the fields as needed
mapped_datasource = datasource0.apply_mapping([
    ("field1", "string", "new_field1", "string"),
    ("field2", "string", "new_field2", "string"),
    # Add more field mappings as needed
])

## Write the results to S3 in Parquet format
glueContext.write_dynamic_frame.from_options(
    frame=mapped_datasource,
    connection_type="s3",
    connection_options={"path": s3_output_path},
    format="parquet"
)

job.commit()

Replace your-dynamodb-table-name with your DynamoDB table name, and s3://your-bucket-name/output-path/ with the desired S3 output path. Update the apply_mapping function with the appropriate field mappings.

Upload this script to an S3 bucket and use it as the source for your Glue job. Run the job to perform the ETL process.


2. Amazon Q

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME"])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

dynamo_DBnode1 = glueContext.create_dynamic_frame.from_options(
    connection_type="dynamodb",
    connection_options={
        "dynamodb.input.tableName": "<table-name>",
        "dynamodb.throughput.read.percent": "0.5",
        "dynamodb.splits": "1",
    },
    transformation_ctx="dynamo_DBnode1",
)

ApplyMappingnode2 = ApplyMapping.apply(
    frame=dynamo_DBnode1,
    mappings=[
        ("old_name_col", "string", "new_name_col", "string"),
        ("old_id_col", "int", "new_id_col", "int"),
    ],
    transformation_ctx="ApplyMappingnode2",
)

s3node3 = glueContext.write_dynamic_frame.from_options(
    frame=ApplyMappingnode2,
    format_options={"compression": "snappy"},
    connection_type="s3",
    format="parquet",
    connection_options={"path": "**********************************************"},
    transformation_ctx="s3node3",
)
job.commit()

Script Summary
1: First we import all necessary libraries from AWS Glue.
2: We then import and initialize a GlueContext object. This is the most important import, from the script writing perspective. This exposes standard methods for defining source and target datasets, which is the starting point for any ETL script.
3: Then we initialize a SparkContext and SparkSession. These allow you to configure the Spark engine available inside the AWS Glue job.
4: We call getResolvedOptions to prepare your job arguments for use within the script.
5: Next step is to initialize the Job. The Job object sets configuration and tracks the state of various optional AWS Glue features. Your script can run without a Job object, but the best practice is to initialize it so that you don't encounter confusion if those features are later integrated.
6: To extract the data from source - DynamoDB, Create dynamic frame from options and connection type as 'dynamodb'
7: Replace the connection option place holder - ['dynamodb.input.tableName', 'dynamodb.throughput.read.percent', 'dynamodb.splits'] with actual values
8: Replace the ApplyMapping option place holder - ['frame', 'mappings'] with actual values
9: To write the data to target - S3, Write dynamic frame from options using format - parquet, format options - {"compression": "snappy"} and connection type as 's3'
10: Replace the connection option place holder - ['path'] with actual values
11: We provide the transformation_ctx parameter to the method in order to support job bookmarks.
12: Finally, we conclude job lifecycle at the end of your script by calling the commit method on the Job object.


4. まとめ

2024/01/31の記事投稿時点

  • 日本語対応はしていない
  • チャットベースの応答のみでAWSリソースへの操作はしてくれない
  • 簡単なETLジョブのスクリプト作成の助けになる
  • プレビュー版とは言え、具体的かつ専門的な説明をしてくれる
  • ChatGPTに比べ、多少ファインチューニングされているように見える
  • スクリプトを生成させるにはプロンプトエンジニアリング的な要素が必要(?)

2023年のAWS re:Inventでの発表以来、個人的には最も関心があった機能でしたので早速触ってみて、記事に起こしました。機能として優れている点は多々ありますが、全部AIまかせでETLジョブが自動作成される未来はもう少し先の話かな、と感じました。
しかしながらプレビュー版を経て、アップデートを重ねるうちに回答精度諸々は改善されていくと思うので、今後に期待。

11
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
11
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?