Help us understand the problem. What is going on with this article?

Azure EventhubsにSASトークンを使う方法(Java)

はじめに

こんにちは、とがりと申します。
SREをしています。

なぜ書いたか

Azure Eventhubsを使っていて調べても全然ドキュメント出てこなく、詰まったから。
本記事はJavaの環境が既に用意されている人を対象にしております。(maven, gradle, etc.)

本題

やりたいこと

アンドロイドアプリからAzure Eventhubsにデータを送信

Shared Access Signature 用いてEventhubsにアクセスしたい。1
しかし、SASを持っていればEventhubsに対して半永久的にアクセスすることができてしまうため
クライアントサイドに配置するのはセキュリティの観点で不安である。
何かいい方法がないかと探していたところSasからTokenを生成できるという情報を得た。
なのでSasTokenを用いてEventhubsにデータを送信する。
詰まった部分もあったので備忘録として記す。

やってみる

  1. SasTokenを作成する
  2. SasTokenを用いてEventhubsにメッセージを送信する
  3. ちゃんとメッセージが届いているかチェックする
1. SasTokenを作成する

Azure ポータル
→Eventhubs 名前空間
→Eventhubs インスタンス
→共有アクセスポリシー
→共有アクセスポリシー名
→主キー&接続文字列–主キー
を使います。
接続文字列–主キーの
Endpoint=sb://(Eventhubs名).servicebus.windows.net/;SharedAccessKeyName=.......;EntityPath=(エントリーポイント)
これを変形して以下の[RESOURCEURI]に用います

スクリーンショット 2020-01-24 9.55.05.png

使う変数
[RESOURCEURI]=sb://(Eventhubs名).servicebus.windows.net/(エントリーポイント)
[KEYNAME]=共有アクセスポリシー名
[KEY]=共有アクセスポリシー名の主キー
SASToken.java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

public class SASToken
{
    public static void main(String[] args) {
        long epoch = System.currentTimeMillis() / 1000L;
        int week = 60 * 60 * 24 * 7;
        String resourceUri ="[RESOURCEURI]";
        String keyName = "[KEYNAME]";
        String key = "[KEY]";
        String expiry = Long.toString(epoch + week);
        String sasToken = null;
        try {
            String stringToSign = URLEncoder.encode(resourceUri, "UTF-8") + "\n" + expiry;
            String signature = getHMAC256(key, stringToSign);
            sasToken = "SharedAccessSignature sr=" + URLEncoder.encode(resourceUri, "UTF-8") + "&sig=" +
                    URLEncoder.encode(signature, "UTF-8") + "&se=" + expiry + "&skn=" + keyName;
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        System.out.println(sasToken);
    }

    // hash値を返す関数
    public static String getHMAC256 (String key, String input){
        Mac sha256_HMAC = null;
        String hash = null;
        try {
            sha256_HMAC = Mac.getInstance("HmacSHA256");
            SecretKeySpec secret_key = new SecretKeySpec(key.getBytes(), "HmacSHA256");
            sha256_HMAC.init(secret_key);
            Base64.Encoder encoder = Base64.getEncoder();
            hash = new String(encoder.encode(sha256_HMAC.doFinal(input.getBytes("UTF-8"))));

        } catch (InvalidKeyException e) {
            e.printStackTrace();
        } catch (NoSuchAlgorithmException e) {
            e.printStackTrace();
        } catch (IllegalStateException e) {
            e.printStackTrace();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return hash;
    }
}

これを実行するとsasTokenが得られます。

2. SasTokenを用いてEventhubsにメッセージを送信する
使う変数
[1で得たSasToken]=上記の結果
[NAMESPACENAME]=イベントハブの名前空間
[EVENTHUBNAME]=イベントハブインスタンスの名前
SendToEventhubs.java
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.microsoft.azure.eventhubs.ConnectionStringBuilder;

import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;

import java.io.IOException;
import java.nio.charset.Charset;
import java.time.Instant;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;


public class SendToEventhubs {
    public static void main(String[] args)
            throws EventHubException, ExecutionException, InterruptedException, IOException {

        String sas = "[1で得たSasToken]";
// 認証情報をセット
        final ConnectionStringBuilder connStr = new ConnectionStringBuilder()
                .setNamespaceName("[NAMESPACENAME]")
                .setEventHubName("[EVENTHUBNAME]")
                .setSharedAccessSignature(sas);
        final Gson gson = new GsonBuilder().create();
        final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
// イベントハブクライアントを記述
        final EventHubClient ehClient = EventHubClient.createSync(connStr.toString(), executorService);

        try {
            for (int i = 0; i < 1; i++) {

                String payload = "Hello, Eventhubs !! (ここにメッセージいれる)";
                byte[] payloadBytes = gson.toJson(payload).getBytes(Charset.defaultCharset());

                EventData sendEvent = EventData.create(payloadBytes);
                System.out.println(sendEvent);
// イベントハブに送信
                ehClient.sendSync(sendEvent);
            }
            System.out.println(Instant.now() + ": Send Complete...");
            System.out.println("Press Enter to stop.");
            System.in.read();
        } finally {
            ehClient.closeSync();
            executorService.shutdown();
        }
    }
}

これでイベントの送信が完了しました。ただこれでは確認できないので受信する方法も乗せておきます。

3. ちゃんとメッセージが届いているかチェックする
使う変数
        String consumerGroupName = "$Default";
        String namespaceName = "[NAMESPACENAME]";
        String eventHubName = "[EVENTHUBNAME]";
        String sasKeyName = "[KEYNAME]";
        String sasKey = "[KEY]";
        String storageConnectionString = "[STORAGECONNECTION]"; //ポータル→ストレージアカウント→アクセスキー
        String storageContainerName = "[STORAGECONTAINERNAME]"; //ポータル→ストレージアカウント→コンテナー
        String hostNamePrefix = "";
EventProcessorSample.java
import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventprocessorhost.CloseReason;
import com.microsoft.azure.eventprocessorhost.EventProcessorHost;
import com.microsoft.azure.eventprocessorhost.EventProcessorOptions;
import com.microsoft.azure.eventprocessorhost.ExceptionReceivedEventArgs;
import com.microsoft.azure.eventprocessorhost.IEventProcessor;
import com.microsoft.azure.eventprocessorhost.PartitionContext;

import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;

public class EventProcessorSample
{
    public static void main(String args[]) throws InterruptedException, ExecutionException
    {
        String consumerGroupName = "$Default";
        String namespaceName = "[NAMESPACENAME]";
        String eventHubName = "[EVENTHUBNAME]";
        String sasKeyName = "[KEYNAME]";
        String sasKey = "[KEY]";
        String storageConnectionString = "[STORAGECONNECTION]";
        String storageContainerName = "[STORAGECONTAINERNAME]";
        String hostNamePrefix = "";


        ConnectionStringBuilder eventHubConnectionString = new ConnectionStringBuilder()
                .setNamespaceName(namespaceName)
                .setEventHubName(eventHubName)
                .setSasKeyName(sasKeyName)
                .setSasKey(sasKey);

        EventProcessorHost host = new EventProcessorHost(
                EventProcessorHost.createHostName(hostNamePrefix),
                eventHubName,
                consumerGroupName,
                eventHubConnectionString.toString(),
                storageConnectionString,
                storageContainerName);

        System.out.println("Registering host named " + host.getHostName());
        EventProcessorOptions options = new EventProcessorOptions();
        options.setExceptionNotification(new ErrorNotificationHandler());

        host.registerEventProcessor(EventProcessor.class, options)
                .whenComplete((unused, e) ->
                {
                    if (e != null)
                    {
                        System.out.println("Failure while registering: " + e.toString());
                        if (e.getCause() != null)
                        {
                            System.out.println("Inner exception: " + e.getCause().toString());
                        }
                    }
                })
                .thenAccept((unused) ->
                {
                    System.out.println("Press enter to stop.");
                    try
                    {
                        System.in.read();
                    }
                    catch (Exception e)
                    {
                        System.out.println("Keyboard read failed: " + e.toString());
                    }
                })
                .thenCompose((unused) ->
                {
                    return host.unregisterEventProcessor();
                })
                .exceptionally((e) ->
                {
                    System.out.println("Failure while unregistering: " + e.toString());
                    if (e.getCause() != null)
                    {
                        System.out.println("Inner exception: " + e.getCause().toString());
                    }
                    return null;
                })
                .get(); // Wait for everything to finish before exiting main!

        System.out.println("End of sample");
    }

    // The general notification handler is an object that derives from Consumer<> and takes an ExceptionReceivedEventArgs object
    // as an argument. The argument provides the details of the error: the exception that occurred and the action (what EventProcessorHost
    // was doing) during which the error occurred. The complete list of actions can be found in EventProcessorHostActionStrings.
    public static class ErrorNotificationHandler implements Consumer<ExceptionReceivedEventArgs>
    {
        @Override
        public void accept(ExceptionReceivedEventArgs t)
        {
            System.out.println("SAMPLE: Host " + t.getHostname() + " received general error notification during " + t.getAction() + ": " + t.getException().toString());
        }
    }

    public static class EventProcessor implements IEventProcessor
    {
        private int checkpointBatchingCount = 0;

        // OnOpen is called when a new event processor instance is created by the host.
        @Override
        public void onOpen(PartitionContext context) throws Exception
        {
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " is opening");
        }

        // OnClose is called when an event processor instance is being shut down.
        @Override
        public void onClose(PartitionContext context, CloseReason reason) throws Exception
        {
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " is closing for reason " + reason.toString());
        }

        // onError is called when an error occurs in EventProcessorHost code that is tied to this partition, such as a receiver failure.
        @Override
        public void onError(PartitionContext context, Throwable error)
        {
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " onError: " + error.toString());
        }

        // onEvents is called when events are received on this partition of the Event Hub.
        @Override
        public void onEvents(PartitionContext context, Iterable<EventData> events) throws Exception
        {
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " got event batch");
            int eventCount = 0;
            for (EventData data : events)
            {
                try
                {
                    System.out.println("SAMPLE (" + context.getPartitionId() + "," + data.getSystemProperties().getOffset() + "," +
                            data.getSystemProperties().getSequenceNumber() + "): " + new String(data.getBytes(), "UTF8"));
                    eventCount++;

                    // Checkpointing persists the current position in the event stream for this partition and means that the next
                    // time any host opens an event processor on this event hub+consumer group+partition combination, it will start
                    // receiving at the event after this one.
                    this.checkpointBatchingCount++;
                    if ((checkpointBatchingCount % 5) == 0)
                    {
                        System.out.println("SAMPLE: Partition " + context.getPartitionId() + " checkpointing at " +
                                data.getSystemProperties().getOffset() + "," + data.getSystemProperties().getSequenceNumber());
                        // Checkpoints are created asynchronously. It is important to wait for the result of checkpointing
                        // before exiting onEvents or before creating the next checkpoint, to detect errors and to ensure proper ordering.
                        context.checkpoint(data).get();
                    }
                }
                catch (Exception e)
                {
                    System.out.println("Processing failed for an event: " + e.toString());
                }
            }
            System.out.println("SAMPLE: Partition " + context.getPartitionId() + " batch size was " + eventCount + " for host " + context.getOwner());
        }
    }
}

こちらのコードを実行することでEventhubs に送信されたデータをリアルタイムに受信することができます。

私が詰まったところ

  • SASTokenを作成するところでResourceUriが何を指しているかわからなかった。
  • SasTokenがどこからどこまでかわからなかった。srからなのか、=の後からなのか全体なのか。 (SharedAccessSignature sr=...)
  • メッセージを送信するときにSasトークンと他になんの情報が必要なのか。

これらをそれぞれ通りの組み合わせでトライエラーをする必要があったので時間と労力がかかった。
この記事を読んでくださったみなさんの何らかの助けとなれば嬉しいです。

参考文献

Java を使用して Azure Event Hubs との間でイベントを送受信する
Shared Access Signature (SAS) を使用して Event Hubs リソースへのアクセスを認証する

最後に

最後まで読んでいただきありがとうございます!
疑問点などございましたら、コメントお待ちしております。

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした