LoginSignup
2
4

【OCI】.NET SDKを使ってOCI Streamingにデータを送信してみる

Posted at

はじめに

今回は.NET SDK(ASP.NET C#)を使ってOCI Streamingにデータを送信し、
Pythonで送信されたデータを確認(受信)してみたいと思います。

■概要
OCI StreamingはOracle Cloud Infrastructure(OCI)が提供する
マネージド型のストリーミングデータサービスです。
このサービスは大量のデータをリアルタイムで収集、処理、分析するために設計されており、
Apache Kafkaとの高い互換性を持っています。
Kafkaを使用した経験がある開発者は容易に移行や統合が可能です。
OCI Streamingを使用することでウェブアプリケーションやIoTデバイスなどから
大規模なデータストリーム構成を簡単に構築することができます。

・OCI Streamingの構成イメージ
メッセージ送信側をProducer、受信側をConsumerと呼びます。

image.png

■構成
今回検証ではローカルのクライアントPCにインストールしたVisual Studio(C#)で
テスト用アプリケーションを作成し、データ送信を行います。
受信はOCI上に立てたLinuxからPythonを使ってデータを確認します。

image.png

■事前準備
・アカウント
OCI Streamingが利用できるアカウントを準備します。

必要に応じて以下のようなポリシーを適用
Allow group [グループ名] to manage stream-family in compartment [コンパートメント名]
※ご自身の環境に合わせて適宜設定します。

・APIキー
OCI StreamingにアクセスするためのAPIキーを準備します。

参考
https://docs.oracle.com/ja-jp/iaas/Content/API/Concepts/apisigningkey.htm#two

・ネットワークの設定
適当にVCN、Subnet(今回はPublic)、Internet Gateway、セキュリティリスト等を
作成しておきます。

参考
https://oracle-japan.github.io/ocitutorials/beginners/creating-vcn/

・Linuxの設定
Pythonが利用できる環境を準備します。

・クライアンドPC(Windows)の設定
Visual Studio Community 2022をインストールし、C#が利用できるようにしておきます。
今回はASP.NET Webフォームを利用します。
(コンポーネントとしてはもう古いですが、
手っ取り早くテストができるため今回利用しています)

◆目次

  1. Streamingの作成
  2. .NET SDKでメッセージ送信
  3. Python SDKでメッセージ受信

1. Streamingの作成

初めにStreamingの作成を行います。

左上メニュー(Ξ)からアナリティクスとAI - ストリーミングを選択
image.png

ストリームの作成を選択
image.png

以下の通り入力・選択し、作成ボタンを選択

ストリーム名: 任意の名前(ここではstream01)
ラジオボタン: 新規ストリーム・プールの作成
ストリーム・プール名: 任意の名前(ここではstreampool01)
エンドポイント・タイプ: パブリック・エンドポイント
暗号化設定の構成: Oracle管理キーを使用した暗号化
ストリーム設定の定義 - 保持(時間): 24 / パーティション数: 1

image.png

作成してしばらくすると一覧の表示されるので、名前のリンクを選択
image.png

詳細にOCIDとメッセージ・エンドポイントが表示されているので
こちらをメモしておきます。
image.png

以上でStreamingの作成は終了です。

2. .NET SDKでメッセージ送信

①Visual Studioでプロジェクトを作成

新規プロジェクトの作成で、ASP.NET Webアプリケーション(.NET Framework)を選択
image.png

プロジェクト名を入力して作成を選択
ここでは、Producer_clientとしています。
image.png

空のプロジェクトを選択し、WebフォームとHTTP用の構成に
チェックを入れて作成を選択
image.png

プロジェクトが作成されたらNuGetで以下2つのSDKを追加します。

OCI.DotNetSDK.Common
OCI.DotNetSDK.Streaming

プロジェクト名を右クリックしたら
NuGetパッケージの管理を選択し、
検索ボックスから上記2つのSDKを選択して
それぞれインストールします。
image.png

最終的にインストールされるとこのようになります。
image.png

あとはプロジェクト名から右クリックをし、
image.png

Webフォームを追加します。
image.png

名前はWebForm1としています。
image.png

ソリューションエクスプローラにフォームが追加されたらダブルクリックし
ソースタブに切り替え次のHTMLコードに置き換えます。
image.png

・HTMLコード

<%@ Page Language="C#" AutoEventWireup="true" CodeBehind="WebForm1.aspx.cs" Inherits="Producer_client.WebForm1" Async="true" %>

<!DOCTYPE html>

<html xmlns="http://www.w3.org/1999/xhtml">
<head runat="server">
<meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
    <title></title>
    <style type="text/css">
        .auto-style2 {
            width: 605px;
        }
        .auto-style3 {
            width: 11px;
        }
    </style>
</head>
<body>
    <form id="form1" runat="server">
        <table style="width:100%;">
            <tr>
                <td class="auto-style3">ID</td>
                <td class="auto-style2">
            <asp:TextBox ID="TextBox1" runat="server"></asp:TextBox>
                </td>
            </tr>
            <tr>
                <td class="auto-style3">Value</td>
                <td class="auto-style2">
            <asp:TextBox ID="TextBox2" runat="server"></asp:TextBox>
                </td>
            </tr>
            <tr>
                <td class="auto-style3">Counter</td>
                <td class="auto-style2">
            <asp:TextBox ID="TextBox3" runat="server"></asp:TextBox>
                </td>
            </tr>
            <tr>
                <td class="auto-style3">&nbsp;</td>
                <td class="auto-style2">
        <asp:Button ID="Button1" runat="server" OnClick="Button1_Click" Text="Send" />
                </td>
            </tr>
            <tr>
                <td class="auto-style3">&nbsp;</td>
                <td class="auto-style2">
        <asp:Label ID="Label1" runat="server"></asp:Label>
                </td>
            </tr>
        </table>
    </form>
</body>
</html>

デザインタブに切り替えると次のようなフォームになっていることが確認できます。
image.png

画面の適当なところで右クリックし、コードの表示を選択すると画像のような
コードのスケルトンが書いてあるので、次のソースコードに置き換えます。
image.png

・ソースコード

using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;
using System.Web.UI;

using Oci.Common.Auth;
using Oci.StreamingService;
using Oci.StreamingService.Requests;
using Oci.StreamingService.Models;

namespace Producer_client
{
    public partial class WebForm1 : System.Web.UI.Page
    {
        protected void Page_Load(object sender, EventArgs e)
        {
            // ページ読み込み時のロジック
        }

        protected async void Button1_Click(object sender, EventArgs e)
        {
            string id = TextBox1.Text;
            int loopCount = int.Parse(TextBox3.Text);
            int totalEntriesSent = 0;

            string configurationFilePath = Server.MapPath("~/conf/config");
            var provider = new ConfigFileAuthenticationDetailsProvider(configurationFilePath, "DEFAULT");
            var client = new StreamClient(provider);
            string streamId = "[StreamingのOCID]";
            string serviceEndpoint = "[Streamingのエンドポイント]";
            client.SetEndpoint(serviceEndpoint);

            for (int i = 1; i <= loopCount; i++)
            {
                string value = TextBox2.Text + "_" + i;
                string timestamp = DateTime.UtcNow.ToString("o");
                var data = new
                {
                    ID = id,
                    VALUE = value,
                    TIMESTAMP = timestamp
                };

                string dataJson = JsonSerializer.Serialize(data);
                byte[] encodedData = Encoding.UTF8.GetBytes(dataJson);
                byte[] keyData = Encoding.UTF8.GetBytes(id);

                var messageList = new List<PutMessagesDetailsEntry>
                {
                    new PutMessagesDetailsEntry {
                        Key = keyData,
                        Value = encodedData
                    }
                };

                var messages = new PutMessagesDetails { Messages = messageList };
                var request = new PutMessagesRequest { StreamId = streamId, PutMessagesDetails = messages };

                try
                {
                    var response = await client.PutMessages(request);
                    if (response.PutMessagesResult != null)
                    {
                        totalEntriesSent += response.PutMessagesResult.Entries.Count;
                    }
                }
                catch (Exception ex)
                {
                    Label1.Text += "Error in loop " + i + ": " + ex.Message + "\n";
                }
            }
            Label1.Text = "Total messages sent successfully: " + totalEntriesSent;
        }
    }
}

ソースコードの中の以下2点を実際の値に変更します。
→ string streamId = "[StreamingのOCID]";
→ string serviceEndpoint = "[Streamingのエンドポイント]";

またAPIでアクセスする必要があるため
.NETのプロジェクトフォルダ内にconfフォルダを作成し、
OCIへ接続するための、configと秘密鍵を格納しておきます。
→ string configurationFilePath = Server.MapPath("~/conf/config");

ファイル構造はこのような形になります。
プロジェクトフォルダ [Producer_client]
├─ bin/
├─ obj/
└─ conf/
       ├─ config
       └─ oci_api_key.pem(秘密鍵)

因みにconfigの中身はこのようなイメージとなります。

[DEFAULT]
user=[ユーザOCID]
fingerprint=[フィンガープリント]
key_file=C:/Users/xxx/program/Producer_client/conf/oci_api_key.pem
tenancy=[テナントOCID]
region=ap-tokyo-1

※秘密鍵のファイルのパスは、¥マークではなく、/なのでその点はご注意ください。

HTMLとソースコードを書き換えたら、F5でアプリケーションを実行すると
次のようなページが表示されます。
image.png

そこで次のような値を入れてSendボタンを押します。
ID: id01
Value: testdata
Counter: 5

image.png

正常に送信されると次のようなメッセージが表示されます。
image.png

この送信でデータが5件登録されています。
(Counterの件数=データ件数です)

以上で.NET SDKでメッセージ送信は終了です。

3. Python SDKでメッセージ受信

最後に送信されたデータを確認しますが、
OCIコンソールのStreamingからだと過去1分間しかデータ表示ができないため、
Pythonで表示できるようにしておくと便利です。

今回検証で利用したPythonのバージョンは以下の通りです。

# python --version
Python 3.6.8

※適宜Pythonが実行できる環境を用意しておきます。

あとはOCI Python SDKのインストールします。

pip install oci

続いてAPIキーの設定をするために、ホームディレクトリに
.ociのディレクトリを作成します。

mkdir ~/.oci

.NETと同じように、APIキーのconfigとoci_api_key.pem(秘密鍵)を.oci
ディレクトリ内に配置します。

# ls -l
-rw-r--r-- 1 root root  294 Apr  9 12:25 config
-rw-r--r-- 1 root root 1708 Apr  9 12:26 oci_api_key.pem

準備が整ったらPythonでStreamingデータを全件受信する
Consumerのコードを作成します。

vi consumer_all.py

・Consumerコード

import oci
from oci.config import from_file
import base64

config = from_file()
stream_client = oci.streaming.StreamClient(config, service_endpoint='[Streamingのエンドポイント]')
stream_id = '[StreamingのOCID]'

# TRIM_HORIZON タイプのカーソルを作成し、ストリームの最初から読み取る
cursor_details = oci.streaming.models.CreateCursorDetails(partition="0", type="TRIM_HORIZON")
cursor_response = stream_client.create_cursor(stream_id, cursor_details)
cursor = cursor_response.data.value

# 全てのメッセージを読み取る
while True:
    messages_response = stream_client.get_messages(stream_id, cursor, limit=10000)  # OCIの許容する最大値をセット
    messages = messages_response.data
    if not messages:
        break  # メッセージがなくなったら終了
    for message in messages:
        # メッセージをデコードして表示
        print("オフセット:", message.offset, "メッセージ:", base64.b64decode(message.value).decode('utf-8'))
    # 次のカーソルを更新
    cursor = messages_response.headers["opc-next-cursor"]

※Max 10000万件までは表示できるコードなので必要に応じて適宜変更します。

StreamingのエンドポイントとOCIDを実際の値に変更したら実行してみます。

python3 consumer_all.py

実行結果は以下のようなイメージになっているかと思います。

オフセット: 1 メッセージ: {"ID":"id01","VALUE":"testdata_1","TIMESTAMP":"2024-05-02T10:18:12.8812690Z"}
オフセット: 2 メッセージ: {"ID":"id01","VALUE":"testdata_2","TIMESTAMP":"2024-05-02T10:18:14.4445751Z"}
オフセット: 3 メッセージ: {"ID":"id01","VALUE":"testdata_3","TIMESTAMP":"2024-05-02T10:18:14.5553259Z"}
オフセット: 4 メッセージ: {"ID":"id01","VALUE":"testdata_4","TIMESTAMP":"2024-05-02T10:18:14.6556413Z"}
オフセット: 5 メッセージ: {"ID":"id01","VALUE":"testdata_5","TIMESTAMP":"2024-05-02T10:18:14.7371521Z"}

※このオフセットは連番になっていてデータが登録される度にインクリメントされます。

以上でPython SDKでメッセージ受信は終了です。

おわり

OCI Streamingに溜まったデータをService Connector Hubを介して
オブジェクトストレージに格納したり、Function経由でDBにデータを
登録することも可能です。

2
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
4