1
2

Autonomous Database:S3にファイルがアップロードされたらすぐに表にロードし、ロード完了後にS3上のファイルを削除する

Posted at

はじめに

こちらの記事の応用編として、Amazon S3バケットにファイルがアップロードされたら、それをトリガーにしてLambdaファンクションが起動、起動したLambdaファンクションがAutonomous Databaseに接続してアップロードされたファイルを表にロードし、ロード後にロードしたファイルをAmazon S3から削除する仕組みを構築してみました。

処理の流れのイメージは以下のようになります。

  1. Amazon S3のバケットにファイルをアップロード

  2. Amazon S3バケットのオブジェクト作成イベントをLambdaに通知

  3. LambdaからAutonomous Databaseにアクセス

  4. Amazon S3にアップロードされたファイルを表にロードし、ロード後にファイルを削除するPL/SQLプロシージャをLambdaから実行

  5. PL/SQLプロシージャによって、Amazon S3にアップロードされたファイルが表にロードされ、その後ファイルが削除される

※なお、Python、Lambdaに関して初心者ですので、最低限動くといったコーディング、設定しか行っていません。実装にあたっては、エラーハンドリング等を追加し充分に検証を行ってから実装してください。こちらの内容を元に実装されたことによる問題について、当方は一切の責任を負いません。
また、こちらの内容に関するOracle Supportへのご質問等はご遠慮ください。

1. 事前準備

バケットload-and-delete-buketを作成し、empdata1.csvをアップロードします。
スクリーンショット 2024-07-31 18.15.48.png

empdata1.csvの内容は以下の通りです。

empdata1.csv
1,SMITH
2,ALLEN
3,WARD

S3にアップロードされたファイルのロード先となる表empdataを作成します。

SQL> CREATE TABLE empdata (
  2  	 id   NUMBER,
  3  	 name VARCHAR2(20)
  4  );

Table created.

SQL> 

2. PL/SQLプロシージャの作成と動作確認

バケット名とオブジェクト名(ファイル名)をパラメータとして受け取り、パラメータで指定されたS3上のファイルを表にロードし、ロード後にS3上のファイルを削除するプロシージャload_and_deleteを作成します。

SQL> CREATE OR REPLACE PROCEDURE load_and_delete ( bucket_name IN VARCHAR2, object_name IN VARCHAR2)
  2  IS
  3  	 object_uri VARCHAR2(1024) := 'https://'||bucket_name||'.s3.ap-northeast-1.amazonaws.com/'||object_name;
  4  	 table_name VARCHAR2(128)  := 'empdata';
  5  	 cred_name  VARCHAR2(128)  := 'AWS_S3_CRED';
  6  BEGIN
  7  	 DBMS_OUTPUT.PUT_LINE('object_uri:'||object_uri);
  8  
  9  	 DBMS_CLOUD.COPY_DATA(
 10  	     table_name      => table_name,
 11  	     credential_name => cred_name,
 12  	     file_uri_list   => object_uri,
 13  	     format	     => json_object('delimiter' value ',')
 14  	 );
 15  
 16  	 DBMS_CLOUD.DELETE_OBJECT(
 17  	     credential_name => cred_name,
 18  	     object_uri      => object_uri
 19  	 );
 20  END;
 21  /

Procedure created.

SQL> 

対象のバケットにアクセスできることを確認します。

SQL> col object_name for a30
SQL> SELECT object_name, bytes FROM
  2  DBMS_CLOUD.LIST_OBJECTS(
  3  	 'AWS_S3_CRED',
  4  	 'https://load-and-delete-bucket.s3.ap-northeast-1.amazonaws.com/'
  5  );

OBJECT_NAME			                BYTES
------------------------------ ----------
empdata1.csv			               22

SQL> 

empdata表が空であることを確認します。

SQL> SELECT * FROM empdata;

no rows selected

SQL>

バケット名「load-and-delete-bucket」、ファイル名「empdata1.csv」をパラメータとして渡して、プロシージャload_and_deleteを実行してみます。

SQL> set serveroutput on
SQL> EXEC load_and_delete('load-and-delete-bucket','empdata1.csv')
object_uri:https://load-and-delete-bucket.s3.ap-northeast-1.amazonaws.com/empdat
a1.csv

PL/SQL procedure successfully completed.

SQL>

empdata表の内容を確認します。

SQL> SELECT * FROM empdata;

	    ID NAME
---------- --------------------
	     1 SMITH
	     2 ALLEN
	     3 WARD

SQL>

empdata1.csvの内容がempdata表にロードされていることが確認できました。

S3バケットの内容を確認します。

SQL> SELECT * FROM
  2  DBMS_CLOUD.LIST_OBJECTS(
  3  	 'AWS_S3_CRED',
  4  	 'https://load-and-delete-bucket.s3.ap-northeast-1.amazonaws.com/'
  5  );

no rows selected

SQL>

ファイルempdata1.csvが削除されていることが確認できました。

3. Lambdaファンクションの作成

sam initを実行して新規アプリケーションを初期化します。
今回はAWS Quick Start Templatesに用意されているHello World Exampleをテンプレートとして使用しました。
プロジェクト名はload-and-deleteとしました。

(mypy) kyamakaw@kyamakaw-mac sam % sam init

You can preselect a particular runtime or package type when using the `sam init` experience.
Call `sam init --help` to learn more.

Which template source would you like to use?
	1 - AWS Quick Start Templates
	2 - Custom Template Location
Choice: 1

Choose an AWS Quick Start application template
	1 - Hello World Example
	2 - Data processing
	3 - Hello World Example with Powertools for AWS Lambda
	4 - Multi-step workflow
	5 - Scheduled task
	6 - Standalone function
	7 - Serverless API
	8 - Infrastructure event management
	9 - Lambda Response Streaming
	10 - Serverless Connector Hello World Example
	11 - Multi-step workflow with Connectors
	12 - GraphQLApi Hello World Example
	13 - Full Stack
	14 - Lambda EFS example
	15 - DynamoDB Example
	16 - Machine Learning
Template: 1

Use the most popular runtime and package type? (Python and zip) [y/N]: y

Would you like to enable X-Ray tracing on the function(s) in your application?  [y/N]: 

Would you like to enable monitoring using CloudWatch Application Insights?
For more info, please view https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/cloudwatch-application-insights.html [y/N]: 

Would you like to set Structured Logging in JSON format on your Lambda functions?  [y/N]: 

Project name [sam-app]: load-and-delete

    -----------------------
    Generating application:
    -----------------------
    Name: load-and-delete
    Runtime: python3.9
    Architectures: x86_64
    Dependency Manager: pip
    Application Template: hello-world
    Output Directory: .
    Configuration file: load-and-delete/samconfig.toml
    
    Next steps can be found in the README file at load-and-delete/README.md
        

Commands you can use next
=========================
[*] Create pipeline: cd load-and-delete && sam pipeline init --bootstrap
[*] Validate SAM template: cd load-and-delete && sam validate
[*] Test Function in the Cloud: cd load-and-delete && sam sync --stack-name {stack-name} --watch

(mypy) kyamakaw@kyamakaw-mac sam %

作成されたディレクトリload-and-deleteに移動します。

(mypy) kyamakaw@kyamakaw-mac sam % cd load-and-delete 
(mypy) kyamakaw@kyamakaw-mac load-and-delete % ls
README.md	events		samconfig.toml	tests
__init__.py	hello_world	template.yaml
(mypy) kyamakaw@kyamakaw-mac load-and-delete %

ディレクトリhello_worldに移動します。

(mypy) kyamakaw@kyamakaw-mac load-and-delete % cd hello_world 
(mypy) kyamakaw@kyamakaw-mac hello_world % ls
__init__.py		app.py			requirements.txt
(mypy) kyamakaw@kyamakaw-mac hello_world %

Autonomous Databaseへのアクセスにpython-oracledbを使用するので、reqirements.txtにoracledbを追加します。

(mypy) kyamakaw@kyamakaw-mac hello_world % vi requirements.txt 

oracledbを追加して保存します。

requests
oracledb

アプリケーション本体のapp.pyを編集し、以下のような内容に書き換えます。

import json
import oracledb
import os

# 環境変数を取得
un = os.environ.get('PYTHON_USERNAME')
pw = os.environ.get('PYTHON_PASSWORD')
cs = os.environ.get('PYTHON_CONNECTSTRING')

def lambda_handler(event, context):
# イベント通知からバケット名とオブジェクト名を取得
    bucket_name = event['Records'][0]['s3']['bucket']['name']
    object_name = event['Records'][0]['s3']['object']['key']
    
# Autonomous Databaseに接続し、バケット名、オブジェクト名を引数としてload-and-deleteプロシージャを実行
    with oracledb.connect(user=un, password=pw, dsn=cs) as connection:
        with connection.cursor() as cursor:
            cursor.callproc('load_and_delete', [bucket_name, object_name])

    return {
        "statusCode": 200,
        "body": json.dumps({
            "message": "hello world",
            # "location": ip.text.replace("\n", "")
        }),
    }

ディレクトリload-and-deleteに移動します。

(mypy) kyamakaw@kyamakaw-mac hello_world % cd ..

sam buildでアプリケーションをビルドを実行します。

(mypy) kyamakaw@kyamakaw-mac load-and-delete % sam build
Starting Build use cache                                                        
Manifest file is changed (new hash: 2768310a16637613860e1081461caa9f) or        
dependency folder (.aws-sam/deps/18fc3384-c7aa-4080-a286-f650ebdfd1bb) is       
missing for (HelloWorldFunction), downloading dependencies and copying/building 
source                                                                          
Building codeuri: /Users/kyamakaw/sam/load-and-delete/hello_world runtime:      
python3.9 metadata: {} architecture: x86_64 functions: HelloWorldFunction       
 Running PythonPipBuilder:CleanUp                                               
 Running PythonPipBuilder:ResolveDependencies                                   
 Running PythonPipBuilder:CopySource                                            
 Running PythonPipBuilder:CopySource                                            

Build Succeeded

Built Artifacts  : .aws-sam/build
Built Template   : .aws-sam/build/template.yaml

Commands you can use next
=========================
[*] Validate SAM template: sam validate
[*] Invoke Function: sam local invoke
[*] Test Function in the Cloud: sam sync --stack-name {{stack-name}} --watch
[*] Deploy: sam deploy --guided
(mypy) kyamakaw@kyamakaw-mac load-and-delete % 

sam deployでアプリケーションをLambdaにファンクションとしてデプロイします。

(mypy) kyamakaw@kyamakaw-mac load-and-delete % sam deploy --stack-name load-and-delete --guided

Configuring SAM deploy
======================

	Looking for config file [samconfig.toml] :  Found
	Reading default arguments  :  Success

	Setting default arguments for 'sam deploy'
	=========================================
	Stack Name [load-and-delete]: 
	AWS Region [ap-northeast-1]: 
	#Shows you resources changes to be deployed and require a 'Y' to initiate deploy
	Confirm changes before deploy [Y/n]: 
	#SAM needs permission to be able to create roles to connect to the resources in your template
	Allow SAM CLI IAM role creation [Y/n]: 
	#Preserves the state of previously provisioned resources when an operation fails
	Disable rollback [y/N]: 
	HelloWorldFunction has no authentication. Is this okay? [y/N]: y
	Save arguments to configuration file [Y/n]: 
	SAM configuration file [samconfig.toml]: 
	SAM configuration environment [default]: 

	Looking for resources needed for deployment:

	Managed S3 bucket: aws-sam-cli-managed-default-samclisourcebucket-e7cuyvsbwyhw
	A different default S3 bucket can be set in samconfig.toml and auto resolution of buckets turned off by setting resolve_s3=False
                                                                                
        Parameter "stack_name=load-and-delete" in [default.deploy.parameters] is
defined as a global parameter [default.global.parameters].                      
        This parameter will be only saved under [default.global.parameters] in  
/Users/kyamakaw/sam/load-and-delete/samconfig.toml.                             

	Saved arguments to config file
	Running 'sam deploy' for future deployments will use the parameters saved above.
	The above parameters can be changed by modifying samconfig.toml
	Learn more about samconfig.toml syntax at 
	https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-config.html

File with same data already exists at                                           
load-and-delete/93c5aeedad5e5506dd94db3354b56a73, skipping upload               

	Deploying with following values
	===============================
	Stack name                   : load-and-delete
	Region                       : ap-northeast-1
	Confirm changeset            : True
	Disable rollback             : False
	Deployment s3 bucket         : aws-sam-cli-managed-default-samclisourcebucket-e7cuyvsbwyhw
	Capabilities                 : ["CAPABILITY_IAM"]
	Parameter overrides          : {}
	Signing Profiles             : {}

Initiating deployment
=====================

File with same data already exists at                                           
load-and-delete/5b105969a61b42a0543907292509f35b.template, skipping upload      


Waiting for changeset to be created..

CloudFormation stack changeset
-------------------------------------------------------------------------------------------------
Operation                LogicalResourceId        ResourceType             Replacement            
-------------------------------------------------------------------------------------------------
+ Add                    HelloWorldFunctionHell   AWS::Lambda::Permissio   N/A                    
                         oWorldPermissionProd     n                                               
+ Add                    HelloWorldFunctionRole   AWS::IAM::Role           N/A                    
+ Add                    HelloWorldFunction       AWS::Lambda::Function    N/A                    
+ Add                    ServerlessRestApiDeplo   AWS::ApiGateway::Deplo   N/A                    
                         yment47fc2d5f9d          yment                                           
+ Add                    ServerlessRestApiProdS   AWS::ApiGateway::Stage   N/A                    
                         tage                                                                     
+ Add                    ServerlessRestApi        AWS::ApiGateway::RestA   N/A                    
                                                  pi                                              
-------------------------------------------------------------------------------------------------


Changeset created successfully. arn:aws:cloudformation:ap-northeast-1:010446572587:changeSet/samcli-deploy1722424715/645c64ba-bbc0-4477-90db-03dc87cd7c34


Previewing CloudFormation changeset before deployment
======================================================
Deploy this changeset? [y/N]: y

2024-07-31 20:18:44 - Waiting for stack create/update to complete

CloudFormation events from stack operations (refresh every 5.0 seconds)
-------------------------------------------------------------------------------------------------
ResourceStatus           ResourceType             LogicalResourceId        ResourceStatusReason   
-------------------------------------------------------------------------------------------------
CREATE_IN_PROGRESS       AWS::CloudFormation::S   load-and-delete          User Initiated         
                         tack                                                                     
CREATE_IN_PROGRESS       AWS::IAM::Role           HelloWorldFunctionRole   -                      
CREATE_IN_PROGRESS       AWS::IAM::Role           HelloWorldFunctionRole   Resource creation      
                                                                           Initiated              
CREATE_COMPLETE          AWS::IAM::Role           HelloWorldFunctionRole   -                      
CREATE_IN_PROGRESS       AWS::Lambda::Function    HelloWorldFunction       -                      
CREATE_IN_PROGRESS       AWS::Lambda::Function    HelloWorldFunction       Resource creation      
                                                                           Initiated              
CREATE_IN_PROGRESS       AWS::Lambda::Function    HelloWorldFunction       Eventual consistency   
                                                                           check initiated        
CREATE_IN_PROGRESS       AWS::ApiGateway::RestA   ServerlessRestApi        -                      
                         pi                                                                       
CREATE_IN_PROGRESS       AWS::ApiGateway::RestA   ServerlessRestApi        Resource creation      
                         pi                                                Initiated              
CREATE_COMPLETE          AWS::ApiGateway::RestA   ServerlessRestApi        -                      
                         pi                                                                       
CREATE_IN_PROGRESS       AWS::ApiGateway::Deplo   ServerlessRestApiDeplo   -                      
                         yment                    yment47fc2d5f9d                                 
CREATE_IN_PROGRESS       AWS::Lambda::Permissio   HelloWorldFunctionHell   -                      
                         n                        oWorldPermissionProd                            
CREATE_IN_PROGRESS       AWS::Lambda::Permissio   HelloWorldFunctionHell   Resource creation      
                         n                        oWorldPermissionProd     Initiated              
CREATE_IN_PROGRESS       AWS::ApiGateway::Deplo   ServerlessRestApiDeplo   Resource creation      
                         yment                    yment47fc2d5f9d          Initiated              
CREATE_COMPLETE          AWS::Lambda::Function    HelloWorldFunction       -                      
CREATE_COMPLETE          AWS::Lambda::Permissio   HelloWorldFunctionHell   -                      
                         n                        oWorldPermissionProd                            
CREATE_COMPLETE          AWS::ApiGateway::Deplo   ServerlessRestApiDeplo   -                      
                         yment                    yment47fc2d5f9d                                 
CREATE_IN_PROGRESS       AWS::ApiGateway::Stage   ServerlessRestApiProdS   -                      
                                                  tage                                            
CREATE_IN_PROGRESS       AWS::ApiGateway::Stage   ServerlessRestApiProdS   Resource creation      
                                                  tage                     Initiated              
CREATE_COMPLETE          AWS::ApiGateway::Stage   ServerlessRestApiProdS   -                      
                                                  tage                                            
CREATE_COMPLETE          AWS::CloudFormation::S   load-and-delete          -                      
                         tack                                                                     
-------------------------------------------------------------------------------------------------

CloudFormation outputs from deployed stack
-------------------------------------------------------------------------------------------------
Outputs                                                                                         
-------------------------------------------------------------------------------------------------
Key                 HelloWorldFunctionIamRole                                                   
Description         Implicit IAM Role created for Hello World function                          
Value               arn:aws:iam::010446572587:role/load-and-delete-HelloWorldFunctionRole-      
bHr0gamyWnao                                                                                    

Key                 HelloWorldApi                                                               
Description         API Gateway endpoint URL for Prod stage for Hello World function            
Value               https://halylnj5x6.execute-api.ap-northeast-1.amazonaws.com/Prod/hello/     

Key                 HelloWorldFunction                                                          
Description         Hello World Lambda Function ARN                                             
Value               arn:aws:lambda:ap-northeast-1:010446572587:function:load-and-delete-        
HelloWorldFunction-EX6cP40ohBwp                                                                 
-------------------------------------------------------------------------------------------------


Successfully created/updated stack - load-and-delete in ap-northeast-1

(mypy) kyamakaw@kyamakaw-mac load-and-delete % 

デプロイが完了したら、AWSコンソールからデプロイされたLambdaファンクションを確認します。
スクリーンショット 2024-07-31 20.20.33.png

Lambdaファンクションが作成できました。

4. Lambdaファンクションの設定

ファンクション名をクリックして、ファンクションの詳細を表示します。
スクリーンショット 2024-07-31 20.20.47.png

「設定」タブをクリックします。
スクリーンショット 2024-07-31 20.20.59.png

「トリガー」を選択し、「トリガーを追加」をクリックします。
スクリーンショット 2024-07-31 20.22.50.png
ソースにS3を選択、バケットにload-and-delete-bucketを指定します。
イベントタイプはデフォルトの「すべてのオブジェクト作成イベント」のみにします。
スクリーンショット 2024-07-31 20.23.10.png
入力が完了したら、「追加」をクリックしてトリガーを追加します。
スクリーンショット 2024-07-31 20.23.18.png
Lambdaファンクションにトリガーが追加されました。
スクリーンショット 2024-07-31 20.23.36.png
左側のメニューから「環境変数」を選択します。
スクリーンショット 2024-07-31 20.23.46.png
「編集」をクリックします。
スクリーンショット 2024-07-31 20.24.01.png
Lambdaファンクションの環境変数を設定します。
今回は、Autonomous Databaseに接続する際のユーザ名、パスワード、接続文字列を環境変数に設定し、Lambdaファンクションから参照します。
PYTHON_USERNAME:ユーザ名
PYTHON_PASSWORD:パスワード
PYTHON_CONNECTSTRING:接続文字列
環境変数のキーにPYTHON_USERNAME、PYTHON_PASSWORD、PYTHON_CONNECTSTRINGを追加し、それぞれの値を設定して「保存」をクリックします。
スクリーンショット 2024-07-31 20.25.03.png
環境変数が設定できました。
スクリーンショット 2024-07-31 20.25.14.png

スクリーンショット 2024-07-31 20.25.22.png

5. 動作確認

TRANCATE TABLE文でempdata表を空にします。

SQL> TRUNCATE TABLE empdata;

Table truncated.

SQL> SELECT * FROM empdata;

no rows selected

SQL>

AWSコンソールからload-and-delete-bucketにアクセスし、「アップロード」をクリックします。
スクリーンショット 2024-07-31 20.25.51.png
empdata1.csvをアップロードします
empdata1.csvの内容は以下の通りです。

empdata1.csv
1,SMITH
2,ALLEN
3,WARD

スクリーンショット 2024-07-31 20.26.06.png

スクリーンショット 2024-07-31 20.26.21.png
アップロードが完了したら、「閉じる」をクリックします。
スクリーンショット 2024-07-31 20.26.30.png
バケット内にアップロードしたはずのファイルがありません。
スクリーンショット 2024-07-31 20.31.42.png

empdata表の内容を確認してみます。

SQL> SELECT * FROM empdata;

	    ID NAME
---------- --------------------
	     1 SMITH
	     2 ALLEN
	     3 WARD

SQL>

S3にアップロードしたファイルempdata1.csvの内容がempdata表にロードされていることが確認できました。

さらにS3にファイルempdata2.csvをアップロードしてみます。
empdata2.csvの内容は以下の通りです。

empdata2.csv
4,JACKSON
5,BROWN
6,TURNER

empdata2.csvをアップロードします
スクリーンショット 2024-07-31 20.39.38.png

スクリーンショット 2024-07-31 20.39.47.png

アップロードが完了したら、「閉じる」をクリックします。
スクリーンショット 2024-07-31 20.39.56.png
先ほどと同様に、バケット内にアップロードしたはずのファイルがありません。
スクリーンショット 2024-07-31 20.40.06.png

empdata表の内容を確認してみます。

SQL> SELECT * FROM empdata;

	    ID NAME
---------- --------------------
	     1 SMITH
	     2 ALLEN
	     3 WARD
	     4 JACKSON
	     5 BROWN
	     6 TURNER

6 rows selected.

SQL>

empdata2.csvの内容がempdata表に追加されていることが確認できました。

6. まとめ

Amazon S3にファイルがアップされたことをトリガーにして、そのファイルをAutonomous Database内の表にロードし、データのロード後にロードしたファイルをS3から削除する処理を実装することができました。
今回作成したPL/SQLプロシージャload_and_deleのDELETE_OBJECTプロシージャの部分をMOVE_OBJECTに変更することで、データのロード後にロードが完了したファイルを別のバケットに移動するといったことも可能になります。

参考情報

Amazonリソース名(ARN)を使用したAWSリソースへのアクセス
AWS CLIの最新バージョンのインストールまたは更新
AWS SAM CLI のインストール
sam init コマンドを使用してアプリケーションを作成する
sam build
sam deploy
The python-oracledb driver for Oracle Database
DBMS_CLOUD.LIST_OBJECTSファンクション
DBMS_CLOUD.COPY_DATAプロシージャ
DBMS_CLOUD.DELETE_OBJECTプロシージャ

1
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
2