はじめに
以前、Data Integrationの機能を使ってOCIオブジェクトストレージのデータを
ADWにインポートするというブログを書きましたが、その応用編として
AWS S3のデータをインポートするというのを試してみたいと思います。
■以前のブログ
今回はS3にアップロードされたjsonファイル(salesデータ)をData Integrationを
使ってダウンロードしてきて、ファイルの中身を加工してからADWにインポートします。
*前回からの変更点
・S3に接続するためのデータアセットの設定
・Order IDやタイムスタンプ等、jsonデータに存在しない値の設定
基本的にはS3用のデータアセットが追加になるぐらいで、
あとはデータフロー部分の処理が少し追加になる程度の変更です。
■事前準備
・アカウント
Data Integration, ADW, オブジェクトストレージが使用できるOCIアカウントを準備
AWS側も同様にS3にアクセス可能なアカウントを準備
・アクセスキー
S3側でアクセスキーを作成
・ADW
事前にインスタンスを作成
・オブジェクトストレージ / S3
OCI, AWSとも事前にバケットを作成
・ネットワークの設定
今回は特にVCNを利用しません。
※OCI関連の設定について、詳細を確認したい場合は前回のブログを参考にしてください。
◆目次
1.jsonファイルのアップロード
2.スキーマ/テーブル作成
3.Data Integration設定
4.データ・フローの実行テスト
1. jsonファイルのアップロード
まず初めに、S3にアップロードするjsonファイルを準備します。
sales.json
[
{
"id": 1,
"unit price": 200,
"quantity": 10
},
{
"id": 2,
"unit price": 350,
"quantity": 50
},
{
"id": 3,
"unit price": 1200,
"quantity": 200
},
{
"id": 4,
"unit price": 3000,
"quantity": 5
},
{
"id": 5,
"unit price": 5000,
"quantity": 3
}
]
AWSのコンソールにアクセスし、S3を選択して、対象のバケット内に
sales.jsonファイルをアップロード

以上でjsonファイルのアップロードは終了です。
2.スキーマ/テーブル作成
続いてスキーマ/テーブルを作成しますので、
ADWの詳細ページからデータベース・アクションのSQLを選択

SQLが実行可能なワークシートが表示されるので
ADMINでスキーマ(diuser)を作成

・スキーマ(ユーザ)作成SQL
CREATE USER diuser IDENTIFIED BY "<パスワード>";
GRANT CREATE SESSION TO diuser;
GRANT CREATE TABLE TO diuser;
GRANT UNLIMITED TABLESPACE TO diuser;
・テーブル作成SQL
create table DIUSER.tblSales
(
id number generated always as identity primary key,
order_id varchar2(50),
unit_price number(10, 2),
sales_quantity number(10, 0),
sales_amount number(10, 0),
file_name varchar2(255),
create_timestamp timestamp
)
※id列は自動採番になっています。
以上でスキーマ/テーブル作成は終了です。
3.Data Integration設定
続いてData Integrationの設定となります。
構成については以下のようなイメージとなります。
※今回オブジェクトストレージは直接的に使用しないのですが、
Data Integrationが内部的に使用するため
オブジェクトストレージもデータアセットとして
作成しておく必要があります。
■機能概要
・ワークスペース
Data Integration全体を管理する仮想スペース
・プロジェクト
タスク(データフロー等)の設計のまとまり
※プログラムのソースコードのようなもの
・アプリケーション
タスクの実行環境
※プログラムの実行ファイルのようなもの
・データアセット
データソース、ターゲットに対する接続ポイント
・手順の流れ
[1] ワークスペースの作成
[2] アプリケーションの作成
[3] データアセットの作成
- 一時保管用[オブジェクトストレージ]の接続ポイント作成
- ターゲット[ADW]の接続ポイント作成
- データソース[S3]の接続ポイント作成
[4] プロジェクトの作成
- データフローの作成
- タスクの作成
[5] プロジェクトタスクのアプリケーションへのPublish(公開)
それでは早速、Data Integrationの設定に入ります。
[1] ワークスペースの作成
左上メニュー[Ξ]からアナリティクスとAI → データ統合を選択

プライベート・ネットワークの有効化のチェックを外して、作成を選択

※注意事項
Data IntegrationからS3にアクセスする時、Internet Gateway経由では
通信できない仕様(?)となっているため注意が必要です。
ワークスペースをVCNに配置しない:S3アクセス可
ワークスペースをVCN Public Subnetに配置 [Internet Gateway経由]:S3アクセス不可
ワークスペースをVCN Private Subnetに配置 [NAT Gateway経由]:S3アクセス可
上記のことから今回はVCN上に配置しない構成としています。
[2] アプリケーションの作成
ワークスペースが作成されると一覧の表示されるので
名前のリンクを選択

[3] データアセットの作成
- 一時保管用[オブジェクトストレージ]の接続ポイント作成
- ターゲット[ADW]の接続ポイント作成
任意の名前をつけて、タイプがOracle Autonomous Data Warehouseに
なっていることを確認
ここではDA_ConADW01

ユーザ名 [admin] /パスワードを入力し、TNSを選択

下記の項目を入力・選択し、接続テストで問題がないことを確認して
作成を選択
オブジェクト・ストレージ・データ・アセット: 前段で設定したアセット (DA_ConOS01)
接続: デフォルト
コンパートメント: 任意
バケット: 事前に作成しておいたバケット(ここではbucketDI)

- データソース[S3]の接続ポイント作成
以下項目を入力し、下にスクロール
名前: (任意) ここではDA_ConS3
タイプ: Amazon S3
リージョン: (バケットのリージョン) ここではap-northeast-1(東京リージョン)
アカウントID: AWSのアカウントID

以下の通り設定し、接続のテストが成功したら作成を選択
名前: デフォルト接続
識別子: DEFAULT_CONNECTION
アクセス・キー: AWSのアクセスキー
秘密キー: AWSのシークレットアクセスキー

以上でオブジェクトストレージ、ADW、S3のデータアセットができました。
[4] プロジェクトの作成
・データ・フロー
データフローが起動したら、SOURCE, EXPRESSION, TARGETを配置後にリンク

SOURCEのプロパティの詳細を開き、以下の通り設定
データ・アセット: CA_ConS3
接続: デフォルト接続
スキーマ: S3のバケット
データ・エンティティ: sales.json ※ファイル形式はjsonを選択

拡張オプションタブのファイル・メタデータを属性としてフェッチオプションも選択
※ファイル名を取得する際に必要なオプション

次にEXPRESSIONのプロパティの詳細を開き、式の追加を選択

・ORDER_IDの設定
式の追加では以下を入力
識別子: EXPRESSION_ORDER_ID
データ型: VARCHAR
長さ: 50 (長さは状況に応じて変更)

式ビルダーでは以下の項目を右ペインにドラッグ&ドロップ
ファンクション - Unique ID - UUID()
検証して問題なければ追加を選択

・SALES_AMOUNTの設定
再度式の追加で以下を入力
識別子: EXPRESSION_SALES_AMOUNT
データ型: INTEGER

式ビルダーでは以下の項目を右ペインにドラッグ&ドロップ
受信 - SALES_JSON - unit price & quantity

単価(unit price)× 数量(quantity)= 売上(SALES_AMOUNT)を算出
EXPRESSION_1.SALES_JSON.unit_price * EXPRESSION_1.SALES_JSON.quantity
上記、検証して問題なければ追加を選択
・FILE_NAMEの設定
続けて式の追加で以下を入力
識別子: EXPRESSION_FILE_NAME
データ型: VARCHAR
長さ: 50 (長さは状況に応じて変更)

式ビルダーでは以下の項目を右ペインにドラッグ&ドロップ
受信 - SALES_JSON - file_name
検証して問題なければ追加を選択

・CREATE_TIMESTAMPの設定
最後の式の追加で以下を入力
識別子: EXPRESSION_CREATE_TIMESTAMP
データ型: TIMESTAMP

式ビルダーでは以下の項目を右ペインにドラッグ&ドロップ
ファンクション - Date/Time - CURRENT_TIMESTAMP
検証して問題なければ追加を選択

データフローの最後ステップとしてTARGETのプロパティの詳細を開き、以下の通り設定
データ・アセット: CA_ConADW
接続: デフォルト接続
スキーマ: DIUSER (事前に作成しておいたスキーマ)
データ・エンティティ: TBLSALES(事前に作成しておいたテーブル)

マップタブを開くと左側にjsonファイルや先程設定したEXPRESSIONの項目が
表示されているので、ドラッグ&ドロップで右側のTBLSALESとマッピングさせる
※既に自動マッピングされているものはそのままにするか、クリアして再マッピング

最終的に以下のようにマッピング
・TBLSALES
ID: マッピングしない(テーブルで自動採番されるため)
ORDER_ID: EXPRESSION_ORDER_ID
UNIT_PRICE: unit_price
SALES_QUANTITY: quantity
SALES_AMOUNT: EXPRESSION_SALES_AMOUNT
FILE_NAME: file_name
CREATE_TIMESTAMP: EXPRESSION_CREATE_TIMESTAMP
最後に検証をして問題がなければデータフローの作成は完了です。
保存して終了します。

一覧のタスクの右端にあるプロパティからアプリケーションに公開を選択

事前に作成したアプリケーション(di_ap01)を選択して公開を選択
しばらくするとアプリケーションに公開される

以上でData Integration設定は終了です。
4.データ・フローの実行テスト
最後にData Integration設定が正常にできているか確認をします。
※もちろんスケジュールに登録すれば定期的に実行させることも可能です。
最後にデータが登録されているかテーブルを検索
select * from DIUSER.TBLSALES;

以上でデータ・フローの実行テストは終了です。
おわり
以上、Data Integrationを使えばS3とのデータ連携や
データの加工も比較的簡単にできることが分かりました。


























