7
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

この記事は インフォマティカ Advent Calender 2025 Day 1 の記事として書かれています。

はじめに

InformaticaのIntelligent Data Management Cloud(IDMC)において簡単ウィザードを利用した大量データ・リアルタイムデータの転送/レプリケーションを実施する"Cloud Data Ingestion and Replication(CDIR)"は今まで何度もご紹介してきたのですが、CDIRのうち "Streaming” については結構謎に包まれているような雰囲気ですので、利用例をご紹介したいと思います。

ログファイルなど、Textファイルへの書き込みデータをリアルタイムに読み込みCDWHへ転送する

はい、このような要件は皆様もお持ちではないでしょうか?ここではログにどんどん出力される通信記録をCDIR Steamingを利用して読み込み、リアルタイムにGoogle Cloud Storageへレプリケーションする例をご紹介します。

デモデータとCDIR Streamingで今回実装する内容

今回は以下のように、"携帯等の通信データ"が1つのファイルにどんどん書き込まれるような状況を仮想します。このログデータを常に監視し、レコードが来たら取得してGoogle Cloud Storageに投入していく、という処理を実装してみましょう。
ログデータはSecureAgent上に出力される物としています。そのためローカルファイルコネクタを利用。ターゲットとなるGoogle Cloud Storageコネクタ。それぞれ事前に準備してある物を利用します。

通信データ (ソースデータ)

SecureAgentの以下のディレクトリに call_data.json と言う形でデータを約1秒毎に投入するshellを作っています。

/opt/infa01/infa_shared/source/streamdata/nokkia/call_data.json

ファイルの内容はこんな感じです。

call_data.json sample
{"Call Timestamp": "2025-11-25 18:34:25", "Network Code": "NOKKIA", "Network Fault Flag": "N", "Caller ID": 909, "Receiver ID": 817, "Call Duration": 112.73850799650519, "Call Type": "voice", "Data Transfer (MB)": null, "Caller Location": {"Latitude": 49.463562449713145, "Longitude": 16.170931598007627}, "Receiver Location": {"Latitude": 38.513628411254665, "Longitude": 9.186252760351007}}
{"Call Timestamp": "2025-11-25 18:34:26", "Network Code": "NOKKIA", "Network Fault Flag": "N", "Caller ID": 430, "Receiver ID": 561, "Call Duration": 230.6406515400272, "Call Type": "voice", "Data Transfer (MB)": null, "Caller Location": {"Latitude": 43.109448968790694, "Longitude": 8.199045730704487}, "Receiver Location": {"Latitude": 38.19501711184719, "Longitude": 14.192383642609311}}
{"Call Timestamp": "2025-11-25 18:34:27", "Network Code": "NOKKIA", "Network Fault Flag": "N", "Caller ID": 744, "Receiver ID": 484, "Call Duration": 158.0078097188479, "Call Type": "video", "Data Transfer (MB)": null, "Caller Location": {"Latitude": 37.26759624488112, "Longitude": 18.449576823535864}, "Receiver Location": {"Latitude": 34.70898286897633, "Longitude": 5.285980544675219}}
{"Call Timestamp": "2025-11-25 18:34:28", "Network Code": "NOKKIA", "Network Fault Flag": "N", "Caller ID": 399, "Receiver ID": 137, "Call Duration": 282.8880913957357, "Call Type": "voice", "Data Transfer (MB)": null, "Caller Location": {"Latitude": 45.85166348908817, "Longitude": 15.442680897271966}, "Receiver Location": {"Latitude": 31.989156521404436, "Longitude": 17.610559050423728}}
{"Call Timestamp": "2025-11-25 18:34:29", "Network Code": "NOKKIA", "Network Fault Flag": "N", "Caller ID": 785, "Receiver ID": 100, "Call Duration": 254.34046168404137, "Call Type": "data", "Data Transfer (MB)": 454.3266661419987, "Caller Location": {"Latitude": 43.61170470451168, "Longitude": 9.289620451269318}, "Receiver Location": {"Latitude": 36.02721052105766, "Longitude": 18.513080065377252}}
{"Call Timestamp": "2025-11-25 18:34:30", "Network Code": "NOKKIA", "Network Fault Flag": "N", "Caller ID": 384, "Receiver ID": 239, "Call Duration": 109.47890823777739, "Call Type": "data", "Data Transfer (MB)": 6.921931555659491, "Caller Location": {"Latitude": 32.73464608961519, "Longitude": 18.278642736389717}, "Receiver Location": {"Latitude": 31.146056670008228, "Longitude": 10.575613786799607}}

ターゲットとなるGoogle Cloud Storage

以下のBucketの、INFA_Streamフォルダの配下にデータを投入することに致しましょう。先に書きましたとおり、このBucketを参照するためのコネクタは作成してあります。
image.png

実装!

IDMCにログインし、"データ統合" または "データ取り込みおよびレプリケーション" メニューをクリックします。
image.png

データ統合のホーム画面にて "取り込み" メニューをクリック。
image.png
開かれるメニューから "ストリーミング取り込みおよびレプリケーションタスク" メニューに進みます。
image.png

Streaming取り込みタスクのメニューに進みます。
1:定義 では、以下のように、タスクの名称を自由に設定し、ランタイム環境(実行するSecureAgent)を選びます。プロジェクト(タスクの保存先)は必要に応じて。
image.png
[次へ] で進む。

2:ソース
必須となるのは2箇所、まずコネクタを選択します。今回はローカルフラットファイル用のコネクタを選択。次に参照するファイルをフルパスで指定します。
補足:初期開始位置 は 「現在の時刻」か「ファイルの先頭から」か選択出来ます。また、tailモードを「単一ファイル」か「複数ファイル」かを選択できます。今回はデフォルトのまま↓としています。
image.png
[次へ] で進む。

3:ターゲット
設定箇所は3箇所。コネクタを選択すると、Bucketとkeyを入力する必要があります。
Bucket は 今回投入するGoogle Cloud StorageのBucket名を記述。
Key は Bucket配下のディレクトリ構成+ファイル名 を指定します。今回は以下のように記述し、ファイル名に日付時刻タイムスタンプを表記するようにしました。

INFA_Stream/call_data_nokkia/events_${now():format('yyyyMMddHHmmssSSS')}.json

image.png
[次へ] で進む。

4:トランスフォーメーション
出力する形式を選べます。特に指定しない場合は"バイナリ"となります。そしてトランスフォーメーションを追加して、出力するデータをフィルタしたり、分割したりという設定が出来ます。デフォルトは何もせずそのまま転送する形になります。今回もデフォルトのまま次へ。
image.png

5:ランタイムオプション
最後に、実行ジョブの詳細設定を行います。今回はデフォルトのままとします。
image.png

保存してデプロイしましょう。

デプロイ=実行!

CDIR Streamingはデプロイを行い成功すると、そのまま処理が開始されます。
image.png
image.png

実行確認

実行確認をするにはいくつか手段がありますが、今回はマイジョブで確認します。
image.png

ちゃんと稼働中になっていますね。よかった。明示的に停止を行う(かエラーとなる)まで、ずっと稼働します。
image.png

Google Cloud Storageで見てみる

このようにファイルが転送されています。
image.png
クリックしてダウンロードし、Textファイルの中を見てみると。
image.png

簡単ですね。
使い込めば便利なCDIRStreaming、是非ご利用ください。

以上、何かのお役に立てば幸いです。

7
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?