はじめに
今回は.NET SDK(ASP.NET C#)を使ってOCI Streamingにデータを送信し、
Pythonで送信されたデータを確認(受信)してみたいと思います。
■概要
OCI StreamingはOracle Cloud Infrastructure(OCI)が提供する
マネージド型のストリーミングデータサービスです。
このサービスは大量のデータをリアルタイムで収集、処理、分析するために設計されており、
Apache Kafkaとの高い互換性を持っています。
Kafkaを使用した経験がある開発者は容易に移行や統合が可能です。
OCI Streamingを使用することでウェブアプリケーションやIoTデバイスなどから
大規模なデータストリーム構成を簡単に構築することができます。
・OCI Streamingの構成イメージ
メッセージ送信側をProducer、受信側をConsumerと呼びます。
■構成
今回検証ではローカルのクライアントPCにインストールしたVisual Studio(C#)で
テスト用アプリケーションを作成し、データ送信を行います。
受信はOCI上に立てたLinuxからPythonを使ってデータを確認します。
■事前準備
・アカウント
OCI Streamingが利用できるアカウントを準備します。
必要に応じて以下のようなポリシーを適用
Allow group [グループ名] to manage stream-family in compartment [コンパートメント名]
※ご自身の環境に合わせて適宜設定します。
・APIキー
OCI StreamingにアクセスするためのAPIキーを準備します。
参考
https://docs.oracle.com/ja-jp/iaas/Content/API/Concepts/apisigningkey.htm#two
・ネットワークの設定
適当にVCN、Subnet(今回はPublic)、Internet Gateway、セキュリティリスト等を
作成しておきます。
参考
https://oracle-japan.github.io/ocitutorials/beginners/creating-vcn/
・Linuxの設定
Pythonが利用できる環境を準備します。
・クライアンドPC(Windows)の設定
Visual Studio Community 2022をインストールし、C#が利用できるようにしておきます。
今回はASP.NET Webフォームを利用します。
(コンポーネントとしてはもう古いですが、
手っ取り早くテストができるため今回利用しています)
◆目次
- Streamingの作成
- .NET SDKでメッセージ送信
- Python SDKでメッセージ受信
1. Streamingの作成
初めにStreamingの作成を行います。
左上メニュー(Ξ)からアナリティクスとAI - ストリーミングを選択
以下の通り入力・選択し、作成ボタンを選択
ストリーム名: 任意の名前(ここではstream01)
ラジオボタン: 新規ストリーム・プールの作成
ストリーム・プール名: 任意の名前(ここではstreampool01)
エンドポイント・タイプ: パブリック・エンドポイント
暗号化設定の構成: Oracle管理キーを使用した暗号化
ストリーム設定の定義 - 保持(時間): 24 / パーティション数: 1
作成してしばらくすると一覧の表示されるので、名前のリンクを選択
詳細にOCIDとメッセージ・エンドポイントが表示されているので
こちらをメモしておきます。
以上でStreamingの作成は終了です。
2. .NET SDKでメッセージ送信
①Visual Studioでプロジェクトを作成
新規プロジェクトの作成で、ASP.NET Webアプリケーション(.NET Framework)を選択
プロジェクト名を入力して作成を選択
ここでは、Producer_clientとしています。
空のプロジェクトを選択し、WebフォームとHTTP用の構成に
チェックを入れて作成を選択
プロジェクトが作成されたらNuGetで以下2つのSDKを追加します。
OCI.DotNetSDK.CommonOCI.DotNetSDK.Streaming
プロジェクト名を右クリックしたら
NuGetパッケージの管理を選択し、
検索ボックスから上記2つのSDKを選択して
それぞれインストールします。
ソリューションエクスプローラにフォームが追加されたらダブルクリックし
ソースタブに切り替え次のHTMLコードに置き換えます。
・HTMLコード
<%@ Page Language="C#" AutoEventWireup="true" CodeBehind="WebForm1.aspx.cs" Inherits="Producer_client.WebForm1" Async="true" %>
<!DOCTYPE html>
<html xmlns="http://www.w3.org/1999/xhtml">
<head runat="server">
<meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
<title></title>
<style type="text/css">
.auto-style2 {
width: 605px;
}
.auto-style3 {
width: 11px;
}
</style>
</head>
<body>
<form id="form1" runat="server">
<table style="width:100%;">
<tr>
<td class="auto-style3">ID</td>
<td class="auto-style2">
<asp:TextBox ID="TextBox1" runat="server"></asp:TextBox>
</td>
</tr>
<tr>
<td class="auto-style3">Value</td>
<td class="auto-style2">
<asp:TextBox ID="TextBox2" runat="server"></asp:TextBox>
</td>
</tr>
<tr>
<td class="auto-style3">Counter</td>
<td class="auto-style2">
<asp:TextBox ID="TextBox3" runat="server"></asp:TextBox>
</td>
</tr>
<tr>
<td class="auto-style3"> </td>
<td class="auto-style2">
<asp:Button ID="Button1" runat="server" OnClick="Button1_Click" Text="Send" />
</td>
</tr>
<tr>
<td class="auto-style3"> </td>
<td class="auto-style2">
<asp:Label ID="Label1" runat="server"></asp:Label>
</td>
</tr>
</table>
</form>
</body>
</html>
デザインタブに切り替えると次のようなフォームになっていることが確認できます。
画面の適当なところで右クリックし、コードの表示を選択すると画像のような
コードのスケルトンが書いてあるので、次のソースコードに置き換えます。
・ソースコード
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;
using System.Web.UI;
using Oci.Common.Auth;
using Oci.StreamingService;
using Oci.StreamingService.Requests;
using Oci.StreamingService.Models;
namespace Producer_client
{
public partial class WebForm1 : System.Web.UI.Page
{
protected void Page_Load(object sender, EventArgs e)
{
// ページ読み込み時のロジック
}
protected async void Button1_Click(object sender, EventArgs e)
{
string id = TextBox1.Text;
int loopCount = int.Parse(TextBox3.Text);
int totalEntriesSent = 0;
string configurationFilePath = Server.MapPath("~/conf/config");
var provider = new ConfigFileAuthenticationDetailsProvider(configurationFilePath, "DEFAULT");
var client = new StreamClient(provider);
string streamId = "[StreamingのOCID]";
string serviceEndpoint = "[Streamingのエンドポイント]";
client.SetEndpoint(serviceEndpoint);
for (int i = 1; i <= loopCount; i++)
{
string value = TextBox2.Text + "_" + i;
string timestamp = DateTime.UtcNow.ToString("o");
var data = new
{
ID = id,
VALUE = value,
TIMESTAMP = timestamp
};
string dataJson = JsonSerializer.Serialize(data);
byte[] encodedData = Encoding.UTF8.GetBytes(dataJson);
byte[] keyData = Encoding.UTF8.GetBytes(id);
var messageList = new List<PutMessagesDetailsEntry>
{
new PutMessagesDetailsEntry {
Key = keyData,
Value = encodedData
}
};
var messages = new PutMessagesDetails { Messages = messageList };
var request = new PutMessagesRequest { StreamId = streamId, PutMessagesDetails = messages };
try
{
var response = await client.PutMessages(request);
if (response.PutMessagesResult != null)
{
totalEntriesSent += response.PutMessagesResult.Entries.Count;
}
}
catch (Exception ex)
{
Label1.Text += "Error in loop " + i + ": " + ex.Message + "\n";
}
}
Label1.Text = "Total messages sent successfully: " + totalEntriesSent;
}
}
}
ソースコードの中の以下2点を実際の値に変更します。
→ string streamId = "[StreamingのOCID]";
→ string serviceEndpoint = "[Streamingのエンドポイント]";
またAPIでアクセスする必要があるため
.NETのプロジェクトフォルダ内にconfフォルダを作成し、
OCIへ接続するための、configと秘密鍵を格納しておきます。
→ string configurationFilePath = Server.MapPath("~/conf/config");
ファイル構造はこのような形になります。
プロジェクトフォルダ [Producer_client]
├─ bin/
├─ obj/
└─ conf/
├─ config
└─ oci_api_key.pem(秘密鍵)
因みにconfigの中身はこのようなイメージとなります。
[DEFAULT]
user=[ユーザOCID]
fingerprint=[フィンガープリント]
key_file=C:/Users/xxx/program/Producer_client/conf/oci_api_key.pem
tenancy=[テナントOCID]
region=ap-tokyo-1
※秘密鍵のファイルのパスは、¥マークではなく、/なのでその点はご注意ください。
HTMLとソースコードを書き換えたら、F5でアプリケーションを実行すると
次のようなページが表示されます。
そこで次のような値を入れてSendボタンを押します。
ID: id01
Value: testdata
Counter: 5
この送信でデータが5件登録されています。
(Counterの件数=データ件数です)
以上で.NET SDKでメッセージ送信は終了です。
3. Python SDKでメッセージ受信
最後に送信されたデータを確認しますが、
OCIコンソールのStreamingからだと過去1分間しかデータ表示ができないため、
Pythonで表示できるようにしておくと便利です。
今回検証で利用したPythonのバージョンは以下の通りです。
# python --version
Python 3.6.8
※適宜Pythonが実行できる環境を用意しておきます。
あとはOCI Python SDKのインストールします。
pip install oci
続いてAPIキーの設定をするために、ホームディレクトリに
.ociのディレクトリを作成します。
mkdir ~/.oci
.NETと同じように、APIキーのconfigとoci_api_key.pem(秘密鍵)を.oci
ディレクトリ内に配置します。
# ls -l
-rw-r--r-- 1 root root 294 Apr 9 12:25 config
-rw-r--r-- 1 root root 1708 Apr 9 12:26 oci_api_key.pem
準備が整ったらPythonでStreamingデータを全件受信する
Consumerのコードを作成します。
vi consumer_all.py
・Consumerコード
import oci
from oci.config import from_file
import base64
config = from_file()
stream_client = oci.streaming.StreamClient(config, service_endpoint='[Streamingのエンドポイント]')
stream_id = '[StreamingのOCID]'
# TRIM_HORIZON タイプのカーソルを作成し、ストリームの最初から読み取る
cursor_details = oci.streaming.models.CreateCursorDetails(partition="0", type="TRIM_HORIZON")
cursor_response = stream_client.create_cursor(stream_id, cursor_details)
cursor = cursor_response.data.value
# 全てのメッセージを読み取る
while True:
messages_response = stream_client.get_messages(stream_id, cursor, limit=10000) # OCIの許容する最大値をセット
messages = messages_response.data
if not messages:
break # メッセージがなくなったら終了
for message in messages:
# メッセージをデコードして表示
print("オフセット:", message.offset, "メッセージ:", base64.b64decode(message.value).decode('utf-8'))
# 次のカーソルを更新
cursor = messages_response.headers["opc-next-cursor"]
※Max 10000万件までは表示できるコードなので必要に応じて適宜変更します。
StreamingのエンドポイントとOCIDを実際の値に変更したら実行してみます。
python3 consumer_all.py
実行結果は以下のようなイメージになっているかと思います。
オフセット: 1 メッセージ: {"ID":"id01","VALUE":"testdata_1","TIMESTAMP":"2024-05-02T10:18:12.8812690Z"}
オフセット: 2 メッセージ: {"ID":"id01","VALUE":"testdata_2","TIMESTAMP":"2024-05-02T10:18:14.4445751Z"}
オフセット: 3 メッセージ: {"ID":"id01","VALUE":"testdata_3","TIMESTAMP":"2024-05-02T10:18:14.5553259Z"}
オフセット: 4 メッセージ: {"ID":"id01","VALUE":"testdata_4","TIMESTAMP":"2024-05-02T10:18:14.6556413Z"}
オフセット: 5 メッセージ: {"ID":"id01","VALUE":"testdata_5","TIMESTAMP":"2024-05-02T10:18:14.7371521Z"}
※このオフセットは連番になっていてデータが登録される度にインクリメントされます。
以上でPython SDKでメッセージ受信は終了です。
おわり
OCI Streamingに溜まったデータをService Connector Hubを介して
オブジェクトストレージに格納したり、Function経由でDBにデータを
登録することも可能です。