2
0

More than 1 year has passed since last update.

OCI Data Integration から OCI Functions を呼び出す

Last updated at Posted at 2021-10-14

はじめに

OCI Data Integration のデータフローで「ファンクション演算子」が利用できるようになりました。これにより従来 OCI Data Integration の組み込み機能だけでは対応が難しかったデータ変換処理を OCI Functions のアプリケーションに任せることができます。
image.png
ドキュメントはこちらから

ここでは、OCI Data Integration と OCI Functions 間のインターフェース(入出力データ)の話と、それに即した Pythonアプリケーションの書き方についてお話ししたいと思います。

OCI Data Integration 側の準備

ファンクション演算子のプロパティ設定で

  • 入力属性 (Input Attributes)
  • 出力属性 (Output Attributes)
  • 機能属性 (Function Configuration)

を構成します。入力属性と出力属性はOCI Functions アプリケーションのリクエスト/レスポンスの Json データのスキーマとなります。機能属性の情報もリクエストデータに含まれますので、これをアプリケーションの実行パラメータとして使うことができます。

OCI Functions の送受信データ

  • Request: Jsonオブジェクト
{
  "data" : "{{ 入力属性スキーマを持つレコードのjsonオブジェクトを複数改行区切りでまとめてbase64エンコードした文字列 }}"
  "parameters : {{ 機能属性のjsonオブジェクト}}
}

具体的には、このようなJsonとなります。

{
  "data": "eyJmaXJzd...(途中略)...WMwNi02MDExLTMwYjQtNTllOC1kMTY0N2VlZDQ5ZjEifQo=",
  "parameters": {
    "param1": "param1value"
  }
}

さらに、"data" の値であるbase64エンコードされた文字列をデコードすると

{"firstname":"James","lastname":"Brown","secret_id_field":"0927e5a8-9097-4cad-a7fa-b6167184c744"}
{"firstname":"David","lastname":"Paich","secret_id_field":"58e25c06-6011-30b4-59e8-d1647eed49f1"}

のように、各レコードが改行で区切られた形になっています (Json Linesフォーマット)。
"secret_id_field" はOCI Data Integrationによって入力データの各レコードに振られる一意のIDで、出力データを返す時も各レコードに必ず同じIDをつけなければなりません。OCI Data IntegrationはこのIDを使って、レコードの紐付けをおこないます。

  • Response: Jsonオブジェクト
[
  {{ 出力属性スキーマを持つレコードのjsonオブジェクト }},
  {{ 出力属性スキーマを持つレコードのjsonオブジェクト }},
  ...
  {{ 出力属性スキーマを持つレコードのjsonオブジェクト }}
] 

こちらはごく一般的なJson配列です。

OCI Functions で変換アプリケーションを書く

多分一番シンプルな "無変換" する変換アプリケーションを公開しています。入力データをそのまま出力データとして返すプログラムです。

プログラム本体が func.py、テストプログラムの方が functest.py、ですが、functest.py ではOCI Data Integration に代わって入力データを作成しているので、こちらを参考にしてもらうと、全体の入出力が理解できると思います。

import io
import json
import base64
from fdk import fixtures
import pytest
import func

@pytest.mark.asyncio
async def test_request():
    rows = [
        {"firstname":"James","lastname":"Brown","secret_id_field":"0927e5a8-9097-4cad-a7fa-b6167184c744"},
        {"firstname":"David","lastname":"Paich","secret_id_field":"58e25c06-6011-30b4-59e8-d1647eed49f1"}
    ]
    parameters = {"param1" : "param1value"}
    data = []
    for row in rows:
        data.append(json.dumps(row) + '\n')
    request = {
        "data" : base64.b64encode(''.join(data).encode('utf-8')).decode('utf-8'),
        "parameters" : parameters
    }
    input_content = io.BytesIO(json.dumps(request).encode("utf-8"))
    call = await fixtures.setup_fn_call(func.handler, content=input_content)

    content, status, headers = await call

    assert 200 == status
    assert rows == json.loads(content)

プログラム本体の func.py では地道に io.BytesIO から改行区切りされた Json レコードまで戻していきます。

body: bytes = data.getvalue() # io.BytesIO -> bytes
request: json = json.loads(body) # jsonオブジェクト("data" と "paramteres" ノードが存在)
rows: str = base64.b64decode(request.get("data")).decode() # 改行区切りされた Json 文字列

変換処理のポイントとしては、改行区切りのJsonレコードを扱うために pandas の DataFrame を使っている点です。

df = pandas.read_json(rows, lines=True)

読み込んだ後 DataFrame のまま処理を続けて最後にJson配列の文字列に落とすと、そのままレスポンスの出力データとして使えます。実際に変換処理を行う場合は

def process(df: pandas.DataFrame) -> str:
    df['lastname'] = df['lastname'].apply(lambda x: str(x).upper()) # 姓を大文字に変換
    return df.to_json(orient='records')

などとやって下さい。

注意点など

  • 一度にOCI Functionsに渡されるレコード数は、OCI Data Integration の機能属性 BATCH_SIZE でコントロールできます。OCI Functionsのメモリーが足りなかったり、タイムアウトにひっかかるような場合は BATCH_SIZE を小さな値にします。
  • 出力データに "secret_id_field" を忘れずに。

まとめ

ということで、OCI Data Integration から OCI Functions に渡すデータが少しだけ分かりづらいですが、この投稿を参考していただいて、バリバリ OCI Data Integration を活用していただければ幸いです。

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0