はじめに
OCI Data Integration のデータフローで「ファンクション演算子」が利用できるようになりました。これにより従来 OCI Data Integration の組み込み機能だけでは対応が難しかったデータ変換処理を OCI Functions のアプリケーションに任せることができます。
ドキュメントはこちらから
ここでは、OCI Data Integration と OCI Functions 間のインターフェース(入出力データ)の話と、それに即した Pythonアプリケーションの書き方についてお話ししたいと思います。
OCI Data Integration 側の準備
ファンクション演算子のプロパティ設定で
- 入力属性 (Input Attributes)
- 出力属性 (Output Attributes)
- 機能属性 (Function Configuration)
を構成します。入力属性と出力属性はOCI Functions アプリケーションのリクエスト/レスポンスの Json データのスキーマとなります。機能属性の情報もリクエストデータに含まれますので、これをアプリケーションの実行パラメータとして使うことができます。
OCI Functions の送受信データ
- Request: Jsonオブジェクト
{
"data" : "{{ 入力属性スキーマを持つレコードのjsonオブジェクトを複数改行区切りでまとめてbase64エンコードした文字列 }}"
"parameters : {{ 機能属性のjsonオブジェクト}}
}
具体的には、このようなJsonとなります。
{
"data": "eyJmaXJzd...(途中略)...WMwNi02MDExLTMwYjQtNTllOC1kMTY0N2VlZDQ5ZjEifQo=",
"parameters": {
"param1": "param1value"
}
}
さらに、"data" の値であるbase64エンコードされた文字列をデコードすると
{"firstname":"James","lastname":"Brown","secret_id_field":"0927e5a8-9097-4cad-a7fa-b6167184c744"}
{"firstname":"David","lastname":"Paich","secret_id_field":"58e25c06-6011-30b4-59e8-d1647eed49f1"}
のように、各レコードが改行で区切られた形になっています (Json Linesフォーマット)。
"secret_id_field" はOCI Data Integrationによって入力データの各レコードに振られる一意のIDで、出力データを返す時も各レコードに必ず同じIDをつけなければなりません。OCI Data IntegrationはこのIDを使って、レコードの紐付けをおこないます。
- Response: Jsonオブジェクト
[
{{ 出力属性スキーマを持つレコードのjsonオブジェクト }},
{{ 出力属性スキーマを持つレコードのjsonオブジェクト }},
...
{{ 出力属性スキーマを持つレコードのjsonオブジェクト }}
]
こちらはごく一般的なJson配列です。
OCI Functions で変換アプリケーションを書く
多分一番シンプルな "無変換" する変換アプリケーションを公開しています。入力データをそのまま出力データとして返すプログラムです。
プログラム本体が func.py、テストプログラムの方が functest.py、ですが、functest.py ではOCI Data Integration に代わって入力データを作成しているので、こちらを参考にしてもらうと、全体の入出力が理解できると思います。
import io
import json
import base64
from fdk import fixtures
import pytest
import func
@pytest.mark.asyncio
async def test_request():
rows = [
{"firstname":"James","lastname":"Brown","secret_id_field":"0927e5a8-9097-4cad-a7fa-b6167184c744"},
{"firstname":"David","lastname":"Paich","secret_id_field":"58e25c06-6011-30b4-59e8-d1647eed49f1"}
]
parameters = {"param1" : "param1value"}
data = []
for row in rows:
data.append(json.dumps(row) + '\n')
request = {
"data" : base64.b64encode(''.join(data).encode('utf-8')).decode('utf-8'),
"parameters" : parameters
}
input_content = io.BytesIO(json.dumps(request).encode("utf-8"))
call = await fixtures.setup_fn_call(func.handler, content=input_content)
content, status, headers = await call
assert 200 == status
assert rows == json.loads(content)
プログラム本体の func.py では地道に io.BytesIO から改行区切りされた Json レコードまで戻していきます。
body: bytes = data.getvalue() # io.BytesIO -> bytes
request: json = json.loads(body) # jsonオブジェクト("data" と "paramteres" ノードが存在)
rows: str = base64.b64decode(request.get("data")).decode() # 改行区切りされた Json 文字列
変換処理のポイントとしては、改行区切りのJsonレコードを扱うために pandas の DataFrame を使っている点です。
df = pandas.read_json(rows, lines=True)
読み込んだ後 DataFrame のまま処理を続けて最後にJson配列の文字列に落とすと、そのままレスポンスの出力データとして使えます。実際に変換処理を行う場合は
def process(df: pandas.DataFrame) -> str:
df['lastname'] = df['lastname'].apply(lambda x: str(x).upper()) # 姓を大文字に変換
return df.to_json(orient='records')
などとやって下さい。
注意点など
- 一度にOCI Functionsに渡されるレコード数は、OCI Data Integration の機能属性 BATCH_SIZE でコントロールできます。OCI Functionsのメモリーが足りなかったり、タイムアウトにひっかかるような場合は BATCH_SIZE を小さな値にします。
- 出力データに "secret_id_field" を忘れずに。
まとめ
ということで、OCI Data Integration から OCI Functions に渡すデータが少しだけ分かりづらいですが、この投稿を参考していただいて、バリバリ OCI Data Integration を活用していただければ幸いです。