この記事は インフォマティカ Advent Calender 2025 Day 1 の記事として書かれています。
はじめに
InformaticaのIntelligent Data Management Cloud(IDMC)において簡単ウィザードを利用した大量データ・リアルタイムデータの転送/レプリケーションを実施する"Cloud Data Ingestion and Replication(CDIR)"は今まで何度もご紹介してきたのですが、CDIRのうち "Streaming” については結構謎に包まれているような雰囲気ですので、利用例をご紹介したいと思います。
ログファイルなど、Textファイルへの書き込みデータをリアルタイムに読み込みCDWHへ転送する
はい、このような要件は皆様もお持ちではないでしょうか?ここではログにどんどん出力される通信記録をCDIR Steamingを利用して読み込み、リアルタイムにGoogle Cloud Storageへレプリケーションする例をご紹介します。
デモデータとCDIR Streamingで今回実装する内容
今回は以下のように、"携帯等の通信データ"が1つのファイルにどんどん書き込まれるような状況を仮想します。このログデータを常に監視し、レコードが来たら取得してGoogle Cloud Storageに投入していく、という処理を実装してみましょう。
ログデータはSecureAgent上に出力される物としています。そのためローカルファイルコネクタを利用。ターゲットとなるGoogle Cloud Storageコネクタ。それぞれ事前に準備してある物を利用します。
通信データ (ソースデータ)
SecureAgentの以下のディレクトリに call_data.json と言う形でデータを約1秒毎に投入するshellを作っています。
/opt/infa01/infa_shared/source/streamdata/nokkia/call_data.json
ファイルの内容はこんな感じです。
{"Call Timestamp": "2025-11-25 18:34:25", "Network Code": "NOKKIA", "Network Fault Flag": "N", "Caller ID": 909, "Receiver ID": 817, "Call Duration": 112.73850799650519, "Call Type": "voice", "Data Transfer (MB)": null, "Caller Location": {"Latitude": 49.463562449713145, "Longitude": 16.170931598007627}, "Receiver Location": {"Latitude": 38.513628411254665, "Longitude": 9.186252760351007}}
{"Call Timestamp": "2025-11-25 18:34:26", "Network Code": "NOKKIA", "Network Fault Flag": "N", "Caller ID": 430, "Receiver ID": 561, "Call Duration": 230.6406515400272, "Call Type": "voice", "Data Transfer (MB)": null, "Caller Location": {"Latitude": 43.109448968790694, "Longitude": 8.199045730704487}, "Receiver Location": {"Latitude": 38.19501711184719, "Longitude": 14.192383642609311}}
{"Call Timestamp": "2025-11-25 18:34:27", "Network Code": "NOKKIA", "Network Fault Flag": "N", "Caller ID": 744, "Receiver ID": 484, "Call Duration": 158.0078097188479, "Call Type": "video", "Data Transfer (MB)": null, "Caller Location": {"Latitude": 37.26759624488112, "Longitude": 18.449576823535864}, "Receiver Location": {"Latitude": 34.70898286897633, "Longitude": 5.285980544675219}}
{"Call Timestamp": "2025-11-25 18:34:28", "Network Code": "NOKKIA", "Network Fault Flag": "N", "Caller ID": 399, "Receiver ID": 137, "Call Duration": 282.8880913957357, "Call Type": "voice", "Data Transfer (MB)": null, "Caller Location": {"Latitude": 45.85166348908817, "Longitude": 15.442680897271966}, "Receiver Location": {"Latitude": 31.989156521404436, "Longitude": 17.610559050423728}}
{"Call Timestamp": "2025-11-25 18:34:29", "Network Code": "NOKKIA", "Network Fault Flag": "N", "Caller ID": 785, "Receiver ID": 100, "Call Duration": 254.34046168404137, "Call Type": "data", "Data Transfer (MB)": 454.3266661419987, "Caller Location": {"Latitude": 43.61170470451168, "Longitude": 9.289620451269318}, "Receiver Location": {"Latitude": 36.02721052105766, "Longitude": 18.513080065377252}}
{"Call Timestamp": "2025-11-25 18:34:30", "Network Code": "NOKKIA", "Network Fault Flag": "N", "Caller ID": 384, "Receiver ID": 239, "Call Duration": 109.47890823777739, "Call Type": "data", "Data Transfer (MB)": 6.921931555659491, "Caller Location": {"Latitude": 32.73464608961519, "Longitude": 18.278642736389717}, "Receiver Location": {"Latitude": 31.146056670008228, "Longitude": 10.575613786799607}}
ターゲットとなるGoogle Cloud Storage
以下のBucketの、INFA_Streamフォルダの配下にデータを投入することに致しましょう。先に書きましたとおり、このBucketを参照するためのコネクタは作成してあります。

実装!
IDMCにログインし、"データ統合" または "データ取り込みおよびレプリケーション" メニューをクリックします。

データ統合のホーム画面にて "取り込み" メニューをクリック。

開かれるメニューから "ストリーミング取り込みおよびレプリケーションタスク" メニューに進みます。

Streaming取り込みタスクのメニューに進みます。
1:定義 では、以下のように、タスクの名称を自由に設定し、ランタイム環境(実行するSecureAgent)を選びます。プロジェクト(タスクの保存先)は必要に応じて。

[次へ] で進む。
2:ソース
必須となるのは2箇所、まずコネクタを選択します。今回はローカルフラットファイル用のコネクタを選択。次に参照するファイルをフルパスで指定します。
補足:初期開始位置 は 「現在の時刻」か「ファイルの先頭から」か選択出来ます。また、tailモードを「単一ファイル」か「複数ファイル」かを選択できます。今回はデフォルトのまま↓としています。

[次へ] で進む。
3:ターゲット
設定箇所は3箇所。コネクタを選択すると、Bucketとkeyを入力する必要があります。
Bucket は 今回投入するGoogle Cloud StorageのBucket名を記述。
Key は Bucket配下のディレクトリ構成+ファイル名 を指定します。今回は以下のように記述し、ファイル名に日付時刻タイムスタンプを表記するようにしました。
INFA_Stream/call_data_nokkia/events_${now():format('yyyyMMddHHmmssSSS')}.json
4:トランスフォーメーション
出力する形式を選べます。特に指定しない場合は"バイナリ"となります。そしてトランスフォーメーションを追加して、出力するデータをフィルタしたり、分割したりという設定が出来ます。デフォルトは何もせずそのまま転送する形になります。今回もデフォルトのまま次へ。

5:ランタイムオプション
最後に、実行ジョブの詳細設定を行います。今回はデフォルトのままとします。

保存してデプロイしましょう。
デプロイ=実行!
CDIR Streamingはデプロイを行い成功すると、そのまま処理が開始されます。


実行確認
実行確認をするにはいくつか手段がありますが、今回はマイジョブで確認します。

ちゃんと稼働中になっていますね。よかった。明示的に停止を行う(かエラーとなる)まで、ずっと稼働します。

Google Cloud Storageで見てみる
このようにファイルが転送されています。

クリックしてダウンロードし、Textファイルの中を見てみると。

簡単ですね。
使い込めば便利なCDIRStreaming、是非ご利用ください。
以上、何かのお役に立てば幸いです。
