はじめに
前回の記事では、Data Integration に Parameter を指定して、OCI CLI で実行する方法を紹介しました。今回は、OCI CDK から実行する方法を紹介します。Object Storage にファイルが置かれたことを検知して、Oracle Functions から タスクを起動するときには、基本的には SDK を使っていくのが便利なので、この手順を検証しました。
今回は2 種類の言語で、Task を実行する方法を紹介します
- Go言語
- Python
Go言語
まずは、ソースコードを全て載せます。のちほど、ポイントを紹介します。GitHub でも Sample コードとして載せています。よろしければ次のURLからどうぞ。
https://github.com/Sugi275/oci-dataintegration-taskrun-sample/blob/master/main.go
package main
import (
"context"
"fmt"
"github.com/Sugi275/oci-dataintegration-taskrun-sample/loglib"
"github.com/oracle/oci-go-sdk/common"
"github.com/oracle/oci-go-sdk/common/auth"
"github.com/oracle/oci-go-sdk/dataintegration"
"github.com/google/uuid"
)
func main() {
loglib.InitSugar()
defer loglib.Sugar.Sync()
fmt.Println("Process Start!")
dataIntegrationClient, err := getDataIntegrationClient()
if err != nil {
loglib.Sugar.Error(err)
return
}
createTaskRun(dataIntegrationClient)
fmt.Println("Process End!")
}
func getDataIntegrationClient() (client dataintegration.DataIntegrationClient, err error) {
var dataIntegrationClient dataintegration.DataIntegrationClient
provider, err := auth.InstancePrincipalConfigurationProvider()
if err != nil {
loglib.Sugar.Error(err)
return dataIntegrationClient, err
}
dataIntegrationClient, err = dataintegration.NewDataIntegrationClientWithConfigurationProvider(provider)
if err != nil {
loglib.Sugar.Error(err)
return dataIntegrationClient, err
}
return dataIntegrationClient, nil
}
func createTaskRun(dataIntegrationClient dataintegration.DataIntegrationClient) {
var rootObjectValueInterface interface{}
rootObjectValueInterface = map[string]interface{}{
"dataFormat": map[string]interface{}{
"formatAttribute": map[string]interface{}{
"delimiter": ",",
"encoding": "UTF-8",
"escapeCharacter": "\\",
"hasHeader": "true",
"modelType": "CSV_FORMAT",
"quoteCharacter": "\"",
"timestampFormat": "yyyy-MM-dd HH:mm:ss.SSS",
},
"type": "CSV",
},
"entity": map[string]interface{}{
"key": "dataref:4081caef-488a-440b-9d76-871912c4d3f0/input/FILE_ENTITY:paramtest02.csv",
"modelType": "FILE_ENTITY",
"objectStatus": 1,
},
"modelType": "ENRICHED_ENTITY",
}
// 上記複雑な変数は、次のJSONオブジェジェクトを生成するために必要なもの。OCI SDK の インターフェース上、このような指定方法となる
// {
// "dataFormat": {
// "formatAttribute": {
// "delimiter": ",",
// "encoding": "UTF-8",
// "escapeCharacter": "\\",
// "hasHeader": "true",
// "modelType": "CSV_FORMAT",
// "quoteCharacter": "\"",
// "timestampFormat": "yyyy-MM-dd HH:mm:ss.SSS"
// },
// "type": "CSV"
// },
// "entity": {
// "key": "dataref:4081caef-488a-440b-9d76-871912c4d3f0/input/FILE_ENTITY:paramtest03.csv",
// "modelType": "FILE_ENTITY",
// "objectStatus": 1
// },
// "modelType": "ENRICHED_ENTITY"
// }
//
parameterValue := map[string]dataintegration.ParameterValue{
"INPUT_OBJECT_NAME": {RootObjectValue: &rootObjectValueInterface},
}
configProvider := dataintegration.CreateConfigProvider{
Bindings: parameterValue,
}
uuidString, err := uuid.NewRandom()
if err != nil {
loglib.Sugar.Error(err)
return
}
createTaskRunDetails := dataintegration.CreateTaskRunDetails{
Key: common.String(uuidString.String()), // Task Run Key に、UUID を与えて、一意の文字列にする
RegistryMetadata: &dataintegration.RegistryMetadata{AggregatorKey: common.String("f00c0f5c-da6d-4756-9fad-05b30840b181")}, // Application に Publish している Task の Task Key を指定
ConfigProvider: &configProvider,
}
createTaskRequest := dataintegration.CreateTaskRunRequest{
WorkspaceId: common.String("ocid1.disworkspace.oc1.ap-tokyo-1.amaaaaaassl65iqa4726obzimlzokp4p2tscrb3ykye2xin4ltwdnf5ioh4q"),
ApplicationKey: common.String("2848c0c8-8400-4ac2-8d75-bcf38ad7c9b2"),
CreateTaskRunDetails: createTaskRunDetails,
}
response, err := dataIntegrationClient.CreateTaskRun(context.Background(), createTaskRequest)
if err != nil {
loglib.Sugar.Error(err)
return
}
fmt.Println(response)
}
main 関数から、getDataIntegrationClient
と createTaskRun
の2つの関数を呼び出しています。最初の getDataIntegrationClient
は見ればわかる通り、Data Integration の Client を生成しています。認証は Instance Principal で行っています。秘密鍵の設定とかは面倒なので、Compute Instance に API の実行権限を付与しています。
func getDataIntegrationClient() (client dataintegration.DataIntegrationClient, err error) {
var dataIntegrationClient dataintegration.DataIntegrationClient
provider, err := auth.InstancePrincipalConfigurationProvider()
if err != nil {
loglib.Sugar.Error(err)
return dataIntegrationClient, err
}
dataIntegrationClient, err = dataintegration.NewDataIntegrationClientWithConfigurationProvider(provider)
if err != nil {
loglib.Sugar.Error(err)
return dataIntegrationClient, err
}
return dataIntegrationClient, nil
}
次に createTaskRun
関数です。データ加工タスクを起動している本命の部分は、この1行です。第二引数の createTaskRequest
変数を作成する部分が、コードの大多数を占めています。
response, err := dataIntegrationClient.CreateTaskRun(context.Background(), createTaskRequest)
各種パラメータをどこから引っ張ってくるかは、前回の OCI CLI の実行方法に書かれています。特筆すべきは、次の変数です。map 変数 rootObjectValueInterface
を定義しています。Key は string
で、Value が interface{}
です。これらの変数の要素が、JSON として変換されるため、このような複雑な構成になっています。
var rootObjectValueInterface interface{}
rootObjectValueInterface = map[string]interface{}{
"dataFormat": map[string]interface{}{
"formatAttribute": map[string]interface{}{
"delimiter": ",",
"encoding": "UTF-8",
"escapeCharacter": "\\",
"hasHeader": "true",
"modelType": "CSV_FORMAT",
"quoteCharacter": "\"",
"timestampFormat": "yyyy-MM-dd HH:mm:ss.SSS",
},
"type": "CSV",
},
"entity": map[string]interface{}{
"key": "dataref:4081caef-488a-440b-9d76-871912c4d3f0/input/FILE_ENTITY:paramtest02.csv",
"modelType": "FILE_ENTITY",
"objectStatus": 1,
},
"modelType": "ENRICHED_ENTITY",
}
map の中に、いくつかの map を入れているため、かなり複雑な構成となっています。これによって生成されるJSONがこのような形です。このあたりの定義方法に注意して、プログラミングを組み立てていくとよいです。
// 上記複雑な変数は、次のJSONオブジェジェクトを生成するために必要なもの。OCI SDK の インターフェース上、このような指定方法となる
// {
// "dataFormat": {
// "formatAttribute": {
// "delimiter": ",",
// "encoding": "UTF-8",
// "escapeCharacter": "\\",
// "hasHeader": "true",
// "modelType": "CSV_FORMAT",
// "quoteCharacter": "\"",
// "timestampFormat": "yyyy-MM-dd HH:mm:ss.SSS"
// },
// "type": "CSV"
// },
// "entity": {
// "key": "dataref:4081caef-488a-440b-9d76-871912c4d3f0/input/FILE_ENTITY:paramtest03.csv",
// "modelType": "FILE_ENTITY",
// "objectStatus": 1
// },
// "modelType": "ENRICHED_ENTITY"
// }
//
Python
次に Python のソースコードを紹介します。Go言語のものと比べて、いくらかシンプルですね。今回の用途では、Python の方が書きやすいと思います。
# ! /usr/bin/env python3
# -*- coding: utf-8 -*-
from sys import meta_path
import oci
import time
import calendar
from oci.data_integration.models import key
from oci.data_integration.models import registry_metadata
from oci.data_integration.models import config_provider
def createTaskRun():
print("Start createTaskRun")
# By default this will hit the auth service in the region returned by http://169.254.169.254/opc/v2/instance/region on the instance.
signer = oci.auth.signers.InstancePrincipalsSecurityTokenSigner()
dataIntegrationClient = oci.data_integration.DataIntegrationClient(
config={'region': 'ap-tokyo-1'}, signer=signer)
workspaceID = "ocid1.disworkspace.oc1.ap-tokyo-1.amaaaaaassl65iqa4726obzimlzokp4p2tscrb3ykye2xin4ltwdnf5ioh4q"
applicationKey = "2848c0c8-8400-4ac2-8d75-bcf38ad7c9b2"
taskKey = "f00c0f5c-da6d-4756-9fad-05b30840b181"
genkey = str(calendar.timegm(time.gmtime()))
metadataRegistry = oci.data_integration.models.RegistryMetadata(
aggregator_key=taskKey)
configProvider = {
"bindings": {
"INPUT_OBJECT_NAME": {
"rootObjectValue": {
"dataFormat": {
"formatAttribute": {
"delimiter": ",",
"encoding": "UTF-8",
"escapeCharacter": "\\",
"hasHeader": "true",
"modelType": "CSV_FORMAT",
"quoteCharacter": "\"",
"timestampFormat": "yyyy-MM-dd HH:mm:ss.SSS"
},
"type": "CSV"
},
"entity": {
"key": "dataref:4081caef-488a-440b-9d76-871912c4d3f0/input/FILE_ENTITY:paramtest02.csv",
"modelType": "FILE_ENTITY",
"name": "paramtest02.csv",
"resourceName": "FILE_ENTITY:paramtest02.csv"
},
"modelType": "ENRICHED_ENTITY"
}
}
}
}
createTaskRunDetails = oci.data_integration.models.CreateTaskRunDetails(
key=genkey, registry_metadata=metadataRegistry, config_provider=configProvider)
taskRunResponse = dataIntegrationClient.create_task_run(
workspaceID, applicationKey, create_task_run_details=createTaskRunDetails)
print("HTTP Response Code : " + taskRunResponse.status)
if __name__ == '__main__':
print("Start Main")
createTaskRun()
print("End Main")
いくつかポイントを紹介します。Python で Instance Pricipal を使っているのがここです。signer を生成して、Data Integration の Client へ与えています。
signer = oci.auth.signers.InstancePrincipalsSecurityTokenSigner()
dataIntegrationClient = oci.data_integration.DataIntegrationClient(
config={'region': 'ap-tokyo-1'}, signer=signer)
余談 : SDK Debug について
Go言語のソースコードで紹介していた、次の複雑な Map 変数があります。この複雑な Map 変数にたどり着くまでに、色々苦労しました。
func createTaskRun(dataIntegrationClient dataintegration.DataIntegrationClient) {
var rootObjectValueInterface interface{}
rootObjectValueInterface = map[string]interface{}{
"dataFormat": map[string]interface{}{
"formatAttribute": map[string]interface{}{
"delimiter": ",",
"encoding": "UTF-8",
"escapeCharacter": "\\",
"hasHeader": "true",
"modelType": "CSV_FORMAT",
"quoteCharacter": "\"",
"timestampFormat": "yyyy-MM-dd HH:mm:ss.SSS",
},
"type": "CSV",
},
"entity": map[string]interface{}{
"key": "dataref:4081caef-488a-440b-9d76-871912c4d3f0/input/FILE_ENTITY:paramtest02.csv",
"modelType": "FILE_ENTITY",
"objectStatus": 1,
},
"modelType": "ENRICHED_ENTITY",
}
はじめは、次のように、JSON 文字列をそのまま指定していました。
var rootObjectValueInterface interface{}
rootObjectValueInterface = `{"dataFormat":{"formatAttribute":{"delimiter":",","encoding":"UTF-8","escapeCharacter":"\\","hasHeader":"true","modelType":"CSV_FORMAT","quoteCharacter":"\"","timestampFormat":"yyyy-MM-dd HH:mm:ss.SSS"},"type":"CSV"},"entity":{"key":"dataref:4081caef-488a-440b-9d76-871912c4d3f0/input/FILE_ENTITY:paramtest05.csv","modelType":"FILE_ENTITY","objectStatus":1},"modelType":"ENRICHED_ENTITY"}`
が、このままだと、どうしても正常に ConfigProvider が動かせませんでした。OCI SDK が、interface{}
を渡す設計のため、なんでも渡せるがゆえに、何を渡せば正解なのかがわかりません。HTTP Request の Body を見れば、悪いところが分かるのですが、HTTPS で通信しているため、tcpdump コマンドでパケットを解析しても、暗号化されていてよくわかりません。
こういう時に使えるのが、OCI SDK の Debug 方法です。OCI SDK によって色々設定方法はあるのですが、Go言語の SDK では、環境変数 OCI_GO_SDK_DEBUG
を設定すれば、SDK 側のデバッグ情報が出せます。次の URL で公開されています。
fish shell での環境変数設定例はこちらです。v
を Value に指定することで、verbose な情報を出力できます。
set -x OCI_GO_SDK_DEBUG v
この指定の結果、実行ログに、ConfigProvider の中身が出力されていることがわかりました。エスケープ文字 "/" が含まれており、なにやら正常なJSONではなさそうなことが分かり、試行錯誤して Map 変数へたどり着けました。
> go run main.go
...snip...
{"configProvider":{"bindings":{"INPUT_OBJECT_NAME":{"rootObjectValue":"{ \"dataFormat\": { \"formatAttribute\": { \"delimiter\": \",\", \"encoding\": \"UTF-8\", \"escapeCharacter\": \"\\\\\", \"hasHeader\": \"true\", \"modelType\": \"CSV_FORMAT\", \"quoteCharacter\": \"\\\"\", \"timestampFormat\": \"yyyy-MM-dd HH:mm:ss.SSS\" }, \"type\": \"CSV\" }, \"entity\": { \"key\": \"dataref:4081caef-488a-440b-9d76-871912c4d3f0/input/FILE_ENTITY:paramtest05.csv\", \"modelType\": \"FILE_ENTITY\", \"objectStatus\": 1 }, \"modelType\": \"ENRICHED_ENTITY\" }"}}},"key":"e61b403c-9c0f-4a89-87a2-11b6b0b38478","registryMetadata":{"aggregatorKey":"f00c0f5c-da6d-4756-9fad-05b30840b181"}}
...snip...
参考URL
Integration Tasks の紹介
https://blogs.oracle.com/dataintegration/integration-tasks-in-oracle-cloud-infrastructure-oci-data-integration
PM Blogs
https://blogs.oracle.com/dataintegration/oracle-cloud-infrastructure-data-integration