1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

[Oracle Cloud] Data Integration で OCI SDK を使って、Parameter 付きタスクを実行してみた

Posted at

はじめに

前回の記事では、Data Integration に Parameter を指定して、OCI CLI で実行する方法を紹介しました。今回は、OCI CDK から実行する方法を紹介します。Object Storage にファイルが置かれたことを検知して、Oracle Functions から タスクを起動するときには、基本的には SDK を使っていくのが便利なので、この手順を検証しました。

今回は2 種類の言語で、Task を実行する方法を紹介します

  • Go言語
  • Python

Go言語

まずは、ソースコードを全て載せます。のちほど、ポイントを紹介します。GitHub でも Sample コードとして載せています。よろしければ次のURLからどうぞ。
https://github.com/Sugi275/oci-dataintegration-taskrun-sample/blob/master/main.go

package main

import (
	"context"
	"fmt"

	"github.com/Sugi275/oci-dataintegration-taskrun-sample/loglib"
	"github.com/oracle/oci-go-sdk/common"
	"github.com/oracle/oci-go-sdk/common/auth"
	"github.com/oracle/oci-go-sdk/dataintegration"

	"github.com/google/uuid"
)

func main() {
	loglib.InitSugar()
	defer loglib.Sugar.Sync()

	fmt.Println("Process Start!")

	dataIntegrationClient, err := getDataIntegrationClient()
	if err != nil {
		loglib.Sugar.Error(err)
		return
	}

	createTaskRun(dataIntegrationClient)

	fmt.Println("Process End!")
}

func getDataIntegrationClient() (client dataintegration.DataIntegrationClient, err error) {
	var dataIntegrationClient dataintegration.DataIntegrationClient

	provider, err := auth.InstancePrincipalConfigurationProvider()
	if err != nil {
		loglib.Sugar.Error(err)
		return dataIntegrationClient, err
	}

	dataIntegrationClient, err = dataintegration.NewDataIntegrationClientWithConfigurationProvider(provider)
	if err != nil {
		loglib.Sugar.Error(err)
		return dataIntegrationClient, err
	}

	return dataIntegrationClient, nil
}

func createTaskRun(dataIntegrationClient dataintegration.DataIntegrationClient) {
	var rootObjectValueInterface interface{}
	rootObjectValueInterface = map[string]interface{}{
		"dataFormat": map[string]interface{}{
			"formatAttribute": map[string]interface{}{
				"delimiter":       ",",
				"encoding":        "UTF-8",
				"escapeCharacter": "\\",
				"hasHeader":       "true",
				"modelType":       "CSV_FORMAT",
				"quoteCharacter":  "\"",
				"timestampFormat": "yyyy-MM-dd HH:mm:ss.SSS",
			},
			"type": "CSV",
		},
		"entity": map[string]interface{}{
			"key":          "dataref:4081caef-488a-440b-9d76-871912c4d3f0/input/FILE_ENTITY:paramtest02.csv",
			"modelType":    "FILE_ENTITY",
			"objectStatus": 1,
		},
		"modelType": "ENRICHED_ENTITY",
	}

	// 上記複雑な変数は、次のJSONオブジェジェクトを生成するために必要なもの。OCI SDK の インターフェース上、このような指定方法となる
	//      {
	//         "dataFormat": {
	//             "formatAttribute": {
	//                 "delimiter": ",",
	//                 "encoding": "UTF-8",
	//                 "escapeCharacter": "\\",
	//                 "hasHeader": "true",
	//                 "modelType": "CSV_FORMAT",
	//                 "quoteCharacter": "\"",
	//                 "timestampFormat": "yyyy-MM-dd HH:mm:ss.SSS"
	//             },
	//             "type": "CSV"
	//         },
	//         "entity": {
	//             "key": "dataref:4081caef-488a-440b-9d76-871912c4d3f0/input/FILE_ENTITY:paramtest03.csv",
	//             "modelType": "FILE_ENTITY",
	//             "objectStatus": 1
	//         },
	//         "modelType": "ENRICHED_ENTITY"
	//     }
	//

	parameterValue := map[string]dataintegration.ParameterValue{
		"INPUT_OBJECT_NAME": {RootObjectValue: &rootObjectValueInterface},
	}

	configProvider := dataintegration.CreateConfigProvider{
		Bindings: parameterValue,
	}

	uuidString, err := uuid.NewRandom()
	if err != nil {
		loglib.Sugar.Error(err)
		return
	}

	createTaskRunDetails := dataintegration.CreateTaskRunDetails{
		Key:              common.String(uuidString.String()),                                                                      // Task Run Key に、UUID を与えて、一意の文字列にする
		RegistryMetadata: &dataintegration.RegistryMetadata{AggregatorKey: common.String("f00c0f5c-da6d-4756-9fad-05b30840b181")}, // Application に Publish している Task の Task Key を指定
		ConfigProvider:   &configProvider,
	}

	createTaskRequest := dataintegration.CreateTaskRunRequest{
		WorkspaceId:          common.String("ocid1.disworkspace.oc1.ap-tokyo-1.amaaaaaassl65iqa4726obzimlzokp4p2tscrb3ykye2xin4ltwdnf5ioh4q"),
		ApplicationKey:       common.String("2848c0c8-8400-4ac2-8d75-bcf38ad7c9b2"),
		CreateTaskRunDetails: createTaskRunDetails,
	}

	response, err := dataIntegrationClient.CreateTaskRun(context.Background(), createTaskRequest)
	if err != nil {
		loglib.Sugar.Error(err)
		return
	}

	fmt.Println(response)
}

main 関数から、getDataIntegrationClientcreateTaskRun の2つの関数を呼び出しています。最初の getDataIntegrationClient は見ればわかる通り、Data Integration の Client を生成しています。認証は Instance Principal で行っています。秘密鍵の設定とかは面倒なので、Compute Instance に API の実行権限を付与しています。

func getDataIntegrationClient() (client dataintegration.DataIntegrationClient, err error) {
	var dataIntegrationClient dataintegration.DataIntegrationClient

	provider, err := auth.InstancePrincipalConfigurationProvider()
	if err != nil {
		loglib.Sugar.Error(err)
		return dataIntegrationClient, err
	}

	dataIntegrationClient, err = dataintegration.NewDataIntegrationClientWithConfigurationProvider(provider)
	if err != nil {
		loglib.Sugar.Error(err)
		return dataIntegrationClient, err
	}

	return dataIntegrationClient, nil
}

次に createTaskRun 関数です。データ加工タスクを起動している本命の部分は、この1行です。第二引数の createTaskRequest 変数を作成する部分が、コードの大多数を占めています。

 response, err := dataIntegrationClient.CreateTaskRun(context.Background(), createTaskRequest)

各種パラメータをどこから引っ張ってくるかは、前回の OCI CLI の実行方法に書かれています。特筆すべきは、次の変数です。map 変数 rootObjectValueInterface を定義しています。Key は string で、Value が interface{} です。これらの変数の要素が、JSON として変換されるため、このような複雑な構成になっています。

	var rootObjectValueInterface interface{}
	rootObjectValueInterface = map[string]interface{}{
		"dataFormat": map[string]interface{}{
			"formatAttribute": map[string]interface{}{
				"delimiter":       ",",
				"encoding":        "UTF-8",
				"escapeCharacter": "\\",
				"hasHeader":       "true",
				"modelType":       "CSV_FORMAT",
				"quoteCharacter":  "\"",
				"timestampFormat": "yyyy-MM-dd HH:mm:ss.SSS",
			},
			"type": "CSV",
		},
		"entity": map[string]interface{}{
			"key":          "dataref:4081caef-488a-440b-9d76-871912c4d3f0/input/FILE_ENTITY:paramtest02.csv",
			"modelType":    "FILE_ENTITY",
			"objectStatus": 1,
		},
		"modelType": "ENRICHED_ENTITY",
	}

map の中に、いくつかの map を入れているため、かなり複雑な構成となっています。これによって生成されるJSONがこのような形です。このあたりの定義方法に注意して、プログラミングを組み立てていくとよいです。

	// 上記複雑な変数は、次のJSONオブジェジェクトを生成するために必要なもの。OCI SDK の インターフェース上、このような指定方法となる
	//      {
	//         "dataFormat": {
	//             "formatAttribute": {
	//                 "delimiter": ",",
	//                 "encoding": "UTF-8",
	//                 "escapeCharacter": "\\",
	//                 "hasHeader": "true",
	//                 "modelType": "CSV_FORMAT",
	//                 "quoteCharacter": "\"",
	//                 "timestampFormat": "yyyy-MM-dd HH:mm:ss.SSS"
	//             },
	//             "type": "CSV"
	//         },
	//         "entity": {
	//             "key": "dataref:4081caef-488a-440b-9d76-871912c4d3f0/input/FILE_ENTITY:paramtest03.csv",
	//             "modelType": "FILE_ENTITY",
	//             "objectStatus": 1
	//         },
	//         "modelType": "ENRICHED_ENTITY"
	//     }
	//

Python

次に Python のソースコードを紹介します。Go言語のものと比べて、いくらかシンプルですね。今回の用途では、Python の方が書きやすいと思います。

# ! /usr/bin/env python3
# -*- coding: utf-8 -*-

from sys import meta_path
import oci
import time
import calendar

from oci.data_integration.models import key
from oci.data_integration.models import registry_metadata
from oci.data_integration.models import config_provider


def createTaskRun():
    print("Start createTaskRun")

    # By default this will hit the auth service in the region returned by http://169.254.169.254/opc/v2/instance/region on the instance.
    signer = oci.auth.signers.InstancePrincipalsSecurityTokenSigner()

    dataIntegrationClient = oci.data_integration.DataIntegrationClient(
        config={'region': 'ap-tokyo-1'}, signer=signer)

    workspaceID = "ocid1.disworkspace.oc1.ap-tokyo-1.amaaaaaassl65iqa4726obzimlzokp4p2tscrb3ykye2xin4ltwdnf5ioh4q"
    applicationKey = "2848c0c8-8400-4ac2-8d75-bcf38ad7c9b2"
    taskKey = "f00c0f5c-da6d-4756-9fad-05b30840b181"
    genkey = str(calendar.timegm(time.gmtime()))

    metadataRegistry = oci.data_integration.models.RegistryMetadata(
        aggregator_key=taskKey)

    configProvider = {
        "bindings": {
            "INPUT_OBJECT_NAME": {
                "rootObjectValue": {
                    "dataFormat": {
                        "formatAttribute": {
                            "delimiter": ",",
                            "encoding": "UTF-8",
                            "escapeCharacter": "\\",
                            "hasHeader": "true",
                            "modelType": "CSV_FORMAT",
                            "quoteCharacter": "\"",
                            "timestampFormat": "yyyy-MM-dd HH:mm:ss.SSS"
                        },
                        "type": "CSV"
                    },
                    "entity": {
                        "key": "dataref:4081caef-488a-440b-9d76-871912c4d3f0/input/FILE_ENTITY:paramtest02.csv",
                        "modelType": "FILE_ENTITY",
                        "name": "paramtest02.csv",
                        "resourceName": "FILE_ENTITY:paramtest02.csv"
                    },
                    "modelType": "ENRICHED_ENTITY"
                }
            }
        }
    }

    createTaskRunDetails = oci.data_integration.models.CreateTaskRunDetails(
        key=genkey, registry_metadata=metadataRegistry, config_provider=configProvider)

    taskRunResponse = dataIntegrationClient.create_task_run(
        workspaceID, applicationKey, create_task_run_details=createTaskRunDetails)

    print("HTTP Response Code : " + taskRunResponse.status)


if __name__ == '__main__':
    print("Start Main")
    createTaskRun()
    print("End Main")

いくつかポイントを紹介します。Python で Instance Pricipal を使っているのがここです。signer を生成して、Data Integration の Client へ与えています。

    signer = oci.auth.signers.InstancePrincipalsSecurityTokenSigner()

    dataIntegrationClient = oci.data_integration.DataIntegrationClient(
        config={'region': 'ap-tokyo-1'}, signer=signer)

余談 : SDK Debug について

Go言語のソースコードで紹介していた、次の複雑な Map 変数があります。この複雑な Map 変数にたどり着くまでに、色々苦労しました。

func createTaskRun(dataIntegrationClient dataintegration.DataIntegrationClient) {
	var rootObjectValueInterface interface{}
	rootObjectValueInterface = map[string]interface{}{
		"dataFormat": map[string]interface{}{
			"formatAttribute": map[string]interface{}{
				"delimiter":       ",",
				"encoding":        "UTF-8",
				"escapeCharacter": "\\",
				"hasHeader":       "true",
				"modelType":       "CSV_FORMAT",
				"quoteCharacter":  "\"",
				"timestampFormat": "yyyy-MM-dd HH:mm:ss.SSS",
			},
			"type": "CSV",
		},
		"entity": map[string]interface{}{
			"key":          "dataref:4081caef-488a-440b-9d76-871912c4d3f0/input/FILE_ENTITY:paramtest02.csv",
			"modelType":    "FILE_ENTITY",
			"objectStatus": 1,
		},
		"modelType": "ENRICHED_ENTITY",
	}

はじめは、次のように、JSON 文字列をそのまま指定していました。

	var rootObjectValueInterface interface{}
	rootObjectValueInterface = `{"dataFormat":{"formatAttribute":{"delimiter":",","encoding":"UTF-8","escapeCharacter":"\\","hasHeader":"true","modelType":"CSV_FORMAT","quoteCharacter":"\"","timestampFormat":"yyyy-MM-dd HH:mm:ss.SSS"},"type":"CSV"},"entity":{"key":"dataref:4081caef-488a-440b-9d76-871912c4d3f0/input/FILE_ENTITY:paramtest05.csv","modelType":"FILE_ENTITY","objectStatus":1},"modelType":"ENRICHED_ENTITY"}`

が、このままだと、どうしても正常に ConfigProvider が動かせませんでした。OCI SDK が、interface{}を渡す設計のため、なんでも渡せるがゆえに、何を渡せば正解なのかがわかりません。HTTP Request の Body を見れば、悪いところが分かるのですが、HTTPS で通信しているため、tcpdump コマンドでパケットを解析しても、暗号化されていてよくわかりません。

こういう時に使えるのが、OCI SDK の Debug 方法です。OCI SDK によって色々設定方法はあるのですが、Go言語の SDK では、環境変数 OCI_GO_SDK_DEBUG を設定すれば、SDK 側のデバッグ情報が出せます。次の URL で公開されています。

fish shell での環境変数設定例はこちらです。v を Value に指定することで、verbose な情報を出力できます。

set -x OCI_GO_SDK_DEBUG v

この指定の結果、実行ログに、ConfigProvider の中身が出力されていることがわかりました。エスケープ文字 "/" が含まれており、なにやら正常なJSONではなさそうなことが分かり、試行錯誤して Map 変数へたどり着けました。

> go run main.go
...snip...
{"configProvider":{"bindings":{"INPUT_OBJECT_NAME":{"rootObjectValue":"{ \"dataFormat\": { \"formatAttribute\": { \"delimiter\": \",\", \"encoding\": \"UTF-8\", \"escapeCharacter\": \"\\\\\", \"hasHeader\": \"true\", \"modelType\": \"CSV_FORMAT\", \"quoteCharacter\": \"\\\"\", \"timestampFormat\": \"yyyy-MM-dd HH:mm:ss.SSS\" }, \"type\": \"CSV\" }, \"entity\": { \"key\": \"dataref:4081caef-488a-440b-9d76-871912c4d3f0/input/FILE_ENTITY:paramtest05.csv\", \"modelType\": \"FILE_ENTITY\", \"objectStatus\": 1 }, \"modelType\": \"ENRICHED_ENTITY\" }"}}},"key":"e61b403c-9c0f-4a89-87a2-11b6b0b38478","registryMetadata":{"aggregatorKey":"f00c0f5c-da6d-4756-9fad-05b30840b181"}}
...snip...

参考URL

Integration Tasks の紹介
https://blogs.oracle.com/dataintegration/integration-tasks-in-oracle-cloud-infrastructure-oci-data-integration

PM Blogs
https://blogs.oracle.com/dataintegration/oracle-cloud-infrastructure-data-integration

Blogs
https://medium.com/@dave.allan.us

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?