はじめに
Data Integration は、様々なデータアセットからデータを取り込み、データのクレンジング・変換・再形成・変換などの ETL 処理が出来るフルマネージドサービスです。サーバー管理が不要で、GUI で視覚的にわかりやすく ETL 処理を表現できます。
2020.09.13 現在、次のデータアセットに対応しています。取り込み元、書き込み先の両方に利用できます。
- Object Storage
- Oracle Database
- Autonomous Data Warehouse
- Autonomous Transaction Processing
Data Integration のチュートリアルが、Document に公開されています。Document では文章で書かれている箇所が多く、分かりにくい部分もあります。今回の記事では、このチュートリアルの内容を、スクリーンショット付きで紹介していきます。
チュートリアルURL
https://docs.cloud.oracle.com/en-us/iaas/data-integration/tutorial/tutorials/index.htm
チュートリアルの内容は次の内容を行っていきます。
- Object Storage に格納した、csv ファイル・json ファイルのデータを加工して、Autonomous Data Warehouse に格納する。
Data Integration の概念
Data Integration は、若干複雑な概念があります。チュートリアルの中で各種用語が出てくるので、どういった役割かざっくり把握しておくとよいでしょう。
公式の定義や、詳細な説明は Document に書かれています。
https://docs.cloud.oracle.com/en-us/iaas/data-integration/using/overview.htm
IAM Policy
Data Integration を実行するために、IAM Policy の設定が必要です。チュートリアル的に始める場合は、次の設定でとりあえず大丈夫です。
allow service dataintegration to use virtual-network-family in tenancy
allow service dataintegration to inspect compartments in tenancy
allow service dataintegration to inspect users in tenancy
allow any-user to use buckets in tenancy where ALL {request.principal.type='disworkspace'}
allow any-user to manage objects in tenancy where ALL {request.principal.type='disworkspace'}
allow any-user {PAR_MANAGE} in tenancy where ALL {request.principal.type='disworkspace'}
Data Integration がアクセスするリソースや許可範囲を狭くしたい場合は若干変更することも出来ます。詳細は次の Document を参照してください。
https://docs.cloud.oracle.com/en-us/iaas/data-integration/using/preparing-for-connectivity.htm#dis-access
Create Subnet
Data Integrator は、VCN 内に作ることも出来ますし、VCN 外に作ることも出来ます。一度作ると変更は出来ないため、VCN 内に作るのが良いでしょう。VCN 内のリソース・VCN 外のリソースの両方にアクセスが出来て便利です。
VCN 内に作るときは、Route Table や Security List の関係上、Data Integration 専用の Subnet を作るのが便利です。
Data Integration が Object Storage などにアクセスするため、Service Gateway を設定します。All Service 用の Service Gateway が良いです。
Data Integration の作成に関するハマりどころ
Data Integration 上で様々な物を作成・定義しますが、2020年9月13日現在、日本語を入力するとエラーになります。Task の Run や、Data を参照したときに次のエラーが出たときは、日本語の入力を全て除去してみると良いです
DIS_DATA_XPLORER_0008 - Error in getting interactive job run for opcRequestID: csidc42cde1649b58a4c7cc685fff2f4/748d205d42a94bc8b168219be93a5458, Caused by: java.nio.charset.MalformedInputException: Input length = 1.
Create Workspace
Data Integration で Workspace を作成します。ハンバーガーメニューから Data Integration を選択します。
Create Workspace を押します。
各種パラメータを入れて、Create をします。日本語などのマルチバイトを入れないように注意しましょう。
Workspace は、Subnet を指定して作ります。
Creating となります。
自分の環境では、約 15 分ほど待機すると、Active となりました。
Import Sample Data
次のチュートリアルで公開されている Sample データをダウンロードして、Object Storage の Bucket へ格納します。
https://docs.cloud.oracle.com/en-us/iaas/data-integration/tutorial/tutorials/00_get_sample_data.htm
次の URL にアクセスして、Sample Data をダウンロードします。
https://objectstorage.us-ashburn-1.oraclecloud.com/n/dispaas/b/oci-dataintegration-lab/o/CUSTOMERS.json
https://objectstorage.us-ashburn-1.oraclecloud.com/n/dispaas/b/oci-dataintegration-lab/o/REVENUE.csv
適当に Object Storage の Bucket を作成して、ダウンロードしてきた 2個のファイルをアップロードします。この時に namespace 名も後から使うので、コピーしておくと良いです。
- namespace :
idclaqkuq6tu
Connect to Data Sources
データを読み込む元と、データを書き込む先の定義をしていきます。
この手順に関するチュートリアルは、つぎの Document に書かれています。
https://docs.cloud.oracle.com/en-us/iaas/data-integration/tutorial/tutorials/02-connecting-to-data-sources.htm
Creating the Source Data Asset
作成した Workspace の詳細画面へ移動
Data Assets を選択
Create Data Assets
Type で選択可能なものは次の 4種類です。
- Oracle Object Storage
- Oracle Database
- Oracle Autonomous Data Warehouse
- Oracle Autonomous Transaction Processing
今回は、Oracle Object Storage と、Oracle Autonomous Data Warehouse を定義します。各項目に日本語などのマルチバイトを入力すると、エラーになるので注意。英語を入力しましょう
入力するパラメータ例
- Name :
Data_Lake
- Description : なし
- URL :
https://objectstorage.ap-tokyo-1.oraclecloud.com
- Object Storage Namespace :
idclaqkuq6tu
- Tenancy OCID : 自分の OCID
- Test Connection : 押して Successful になるか確認
Create した Data Asset のページに自動的に移動
Bucket の欄も見えます
Preparing the Target Database
Autonomous Data Warehouse 側で、ETL で準備したデータを書き込むときに使う、ユーザーとテーブルを準備します。
SQL Developer Web を開きます。
admin で入る
user を作成。パスワードは適当に変更します。
create user beta identified by "mIfoO8_fai12#-gai897fao";
grant DWROLE to BETA;
alter user BETA quota 200M on data;
実行
Table 作成
CREATE TABLE "BETA"."CUSTOMERS_TARGET"
("CUST_ID" NUMBER,
"LAST_NAME" VARCHAR2(200 BYTE),
"FIRST_NAME" VARCHAR2(200 BYTE),
"FULL_NAME" VARCHAR2(200 BYTE),
"STREET_ADDRESS" VARCHAR2(400 BYTE),
"POSTAL_CODE" VARCHAR2(10 BYTE),
"CITY_ID" NUMBER,
"CITY" VARCHAR2(100 BYTE),
"STATE_PROVINCE_ID" NUMBER,
"STATE_PROVINCE" VARCHAR2(100 BYTE),
"COUNTRY_ID" NUMBER,
"COUNTRY" VARCHAR2(400 BYTE),
"CONTINENT_ID" NUMBER,
"CONTINENT" VARCHAR2(400 BYTE),
"AGE" NUMBER,
"COMMUTE_DISTANCE" NUMBER,
"CREDIT_BALANCE" NUMBER,
"EDUCATION" VARCHAR2(40 BYTE),
"EMAIL" VARCHAR2(416 BYTE),
"FULL_TIME" VARCHAR2(40 BYTE),
"GENDER" VARCHAR2(6 BYTE),
"HOUSEHOLD_SIZE" NUMBER,
"INCOME" NUMBER,
"INCOME_LEVEL" VARCHAR2(20 BYTE),
"INSUFF_FUNDS_INCIDENTS" NUMBER,
"JOB_TYPE" VARCHAR2(200 BYTE),
"LATE_MORT_RENT_PMTS" NUMBER,
"MARITAL_STATUS" VARCHAR2(8 BYTE),
"MORTGAGE_AMT" NUMBER,
"NUM_CARS" NUMBER,
"NUM_MORTGAGES" NUMBER,
"PET" VARCHAR2(40 BYTE),
"PROMOTION_RESPONSE" NUMBER,
"RENT_OWN" VARCHAR2(40 BYTE),
"SEG" NUMBER,
"WORK_EXPERIENCE" NUMBER,
"YRS_CURRENT_EMPLOYER" NUMBER,
"YRS_CUSTOMER" NUMBER,
"YRS_RESIDENCE" NUMBER,
"COUNTRY_CODE" VARCHAR2(2 BYTE),
"ORDER_NUMBER" NUMBER,
"REVENUE" NUMBER
) SEGMENT CREATION IMMEDIATE
PCTFREE 10 PCTUSED 40 INITRANS 1 MAXTRANS 255
NOCOMPRESS LOGGING
STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1
BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT)
TABLESPACE "USERS" ;
--------------------------------------------------------
-- Constraints for Table CUSTOMERS_TARGET
--------------------------------------------------------
ALTER TABLE "BETA"."CUSTOMERS_TARGET" MODIFY ("CUST_ID" NOT NULL ENABLE);
ALTER TABLE "BETA"."CUSTOMERS_TARGET" MODIFY ("SEG" NOT NULL ENABLE);
実行
Creating the Target Data Asset
ADW を Asset として登録します
入力するパラメータ
- Name :
Data_Warehouse
- Description : 空白
- Type :
Oracle Autonomous Data Warehouse
- Wallet File : ダウンロードして貼り付け
- Service Name : 適切に指定
- Name : 任意の名前
- Username : ADW 上で作成したユーザー名を指定
- Password : ユーザーのパスワードを指定
- Test Connection : 成功を確認
自動的にページが変わります
BETA Schemas が見える
作った Table が見える
作ったテーブルの構造も見える
Ingest and Transform Data Using a Data Flow
ETL の流れを Data Flow で定義します。Document はこちら
https://docs.cloud.oracle.com/en-us/iaas/data-integration/tutorial/tutorials/03-ingest-and-transform-data-using-a-data-flow.htm
Creating a Project and a Data Flow
まず Project を作成
Create Project
名前を入れます。各項目に日本語などのマルチバイトを入力すると、エラーになるので注意。英語を入力しましょう
作成される
Create Data Flow
ETL の流れを表現する、Data Flow の名前を入れます。
Name : Load Customers and Revenue Data
と入れて、Save を押します。
Adding Source Operators
データソースとして、Data Asset で定義した Object Storage を持ってきます。左のところにあるアイコンを拡大
次の6種類のアイコンがあります
- Inputs/Outputs
- Source : データの読み込み元
- Target : データの書き込み先
- Shaping
- Filter : フィルター
- Join : 結合
- Expression : 新たな列を作成
- Aggregate : 集合処理
Source をドラッグドロップして、右クリックし、Details を押します。
Data Entity から、Select を押します。
各種パラメータを入力します。
名前変更
Attributes の Tab から、先ほど指定した JSON データがどのような属性を持っているか確認できます。
また、実際のデータの中身も確認できます。各種 ETL 処理を組み立てるときに、リアルタイムに中身を見れるので便利です。
同様の作業を実施して、REVENUE.csv を定義する
なお、csv の場合は、ヘッダーの有無、エスケープ文字や、分割文字などが指定可能
Filtering and Transforming Data
次に、データのフィルター処理を行います。Filter Icon をドラッグアンドドロップで置いて、マウスで線を引きます。
Filter Icon をクリックして、Filter Condition を Create します
Filter Condition 画面が開く
sta を入れて、ORDER_STATUS
を表示
ドラッグアンドドロップで右側にもっていく
次の文字列に変更します。1-Booked
以外の行をフィルターで除外する設定をいれて、Create を押します。
FILTER_1.REVENUE_CSV.ORDER_STATUS='1-Booked'
Filter Icon を選択してData Tab を見ると、Filter 後のデータが表示されています。1-Booked
のデータしか存在していないため、Filter されていることが分かります。
同様に Customers にも Filter を追加して、Condition の Create を押します
COUNTRY_CODE
を右側に持って行き、US
を指定して、Create を押します。
FILTER_2.CUSTOMERS_JSON.COUNTRY_CODE='US'
Data Tab を見ると、US で Filter されています。
Expression を追加して、Customer の Filter に繋げて、Add を押します。
Expression は、新しい列を生成する処理です。Firstname と Lastname をくっつけて、Fullname を生成します。
- Identifier : 新たに生成する列の名前
- Data Type : 新たに生成する列の Type
- Length : 新たに生成する列の長さ
- Functions から、Concat を見つける
右側に持ってくる
次の文字列を指定して、Fisrtname, Lastname を空白で結合します。
CONCAT(CONCAT(EXPRESSION_1.CUSTOMERS_JSON.FIRST_NAME, ' '), EXPRESSION_1.CUSTOMERS_JSON.LAST_NAME)
Data Tab を見ることで、Full Name が追加されていることが分かります。
Joining Data
2つのデータを結合します。JOIN Icon を追加して、Condition を Create
JOSIN に使う条件を指定して、create をします。CUST_ID と CUST_KEY が両方一致しているもののみ、JOIN します。
JOIN_1_1.CUSTOMERS_JSON.CUST_ID=JOIN_1_2.REVENUE_CSV.CUST_KEY
Data Tab で Join 後のデータが見えます。
Adding a Target Operator
Target Icon を追加して、データ処理したあとに ADW にデータを書き込みます。Integration Strategy を Insert にしているため、単純に Table に行を追加する動きになります。
Data Entity から、Select を押します。
Data Assets に定義した ADW の情報を引っ張ってきて、Select を押します。どこのスキーマの、どこのテーブルに書き込むのか指定しています。
Target Icon を選択したあと、Attributes Tab で、Autonomous Database (Target) 側で持っている Table が表示されています。 Exclude することも可能。
Map Tab を見ると、ADW 側のテーブルで持っている 42列のうち、Map 済み 41、Map 無し 1 とわかります。Map 無しの部分はデータが格納されないため、要確認です。黄色背景のところが空白になっており、Mapping されていないことがわかります。
左の FULLNAME から、右の FULL_NAME へドラッグすることで、手動でマップできます。
また、Filter を Attributes not mapped にすることで、Map されていない列のみ表示が出来ます。
データ加工処理の設定が終わったため、一旦 Validate を実行します。何かしらエラーが出ていないか確認します。
Error が無いことを確認出来ました。
Save and Close で保存して閉じます。
Create an Integration Task to Configure and Run a Data Flow
Data Flow の定義が出来たので、実際に処理として動かしていきます。Document はこちら
https://docs.cloud.oracle.com/en-us/iaas/data-integration/tutorial/tutorials/04-define-an-integration-task-to-configure-and-run-a-data-flow.htm
Creating an Integration Task
Projects を開きます。
Dl_Lab を選択します。
Tasks から、Integration タスクを定義します。これはまだ実行ではなく、Task の定義だけです。今回は取り上げませんが、Integration Task を使うと、Data Flow の各処理にパラメータを挿入できます。また別途紹介します。
Name : Load Customers Lab
を入れたあとに、Select で作った Data Flow を指定します。
作ったものを選択します。
Validate 状態が Successful を確認して、Save します。
Integration Task が、Task として定義されました。
Creating an Application
定義した Integration Task を実行するために、Application という枠組みを定義します。Application を押します。
Create Application
Name : Lab Application
を入れて、Create します。
Publishing a Task to an Application
Project の Tasks から、Application の Tasks へ Publish します。Publish しただけでは、Task は実行されません。Task の内容がコピーされるイメージです。Application 上で Task を実行する仕組みとなっており、このような操作が必要です。
おそらく、Data Flow 定義者と、Task 実行者の権限を分けたときに、分かりやすい構成になることを意図していると想像しています。
Publish を押します。
View を押すと、Application 画面に移動します。
Application の Tasks に登録した Task があります。Run を押すと、Task を実行できます。
Loading... と10秒ごど表示されたあとに、右上に成功と表示されます。
画面を更新すると、Queued となっています。
画面を更新すると、Running となっています。
Success となりました。次の情報が表示されています。
- Insert した行
- Data Read の容量
- 実行時間
memo : Errorになったときは、Log を出せます
エラーログがざっと表示されます。
ADW の確認
Data Integration で Task 実行の結果、ADW のテーブルにデータが格納されています。
Task Monitor
Task がどれだけ成功して、失敗したか確認できる、Monitor 用の画面があります。
参考URL
OCI Document
https://docs.cloud.oracle.com/en-us/iaas/data-integration/using/index.htm
Service Tour (Movie)
https://docs.cloud.oracle.com/en-us/data-integration/service-tour/index.html
Data Flow Tour (Movie)
https://docs.cloud.oracle.com/en-us/data-integration/dataflow-tour/index.html
PM Blogs
https://blogs.oracle.com/dataintegration/oracle-cloud-infrastructure-data-integration