0
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

AWS IoTのトピックにJavaプロジェクトからMQTTをpublishした(aws-iot-device-sdk-java使用)

Last updated at Posted at 2022-05-27

はじめに

  • 結論だけ(何が必要なのかだけ)知りたい人は準備と出来上がったソースたちを読んで下さい。(でもできれば私の格闘様子も読んでほしい

  • aws-iot-device-sdk-javaを使用したくても公式ドキュメント(英語)しかなくて本当に辛かったので、私みたいな人を救いたくて自分用に残してたメモをほぼそのまま載せることにしました。救えるかは知りません。

  • AWSの構築手順などは割愛してます。あくまでJavaプロジェクト側の話のみです。

  • aws-iot-device-sdk-javaはv2でなくv1を使用してます。v2はまた少し違いそうなのでご注意を。

  • 全体の参考ドキュメント
    https://github.com/aws/aws-iot-device-sdk-java

  • APIドキュメント
    http://aws-iot-device-sdk-java-docs.s3-website-us-east-1.amazonaws.com

環境

  • Springboot2.6.7 + Thymeleaf
  • Java17.0.2

準備

サンプルのダウンロード

  • 参考ドキュメントを読み進めていたらところどころ「sampleにあるよ」との記載が。

→githubからダウンロードしておく

gradleリポジトリの追加

  • build.gradleのdependenciesに以下を追加
implementation 'com.amazonaws:aws-iot-device-sdk-java:1.3.10'

ライブラリの追加

  • 参考ドキュメントより

You will also need to add two libraries the SDK depends on:
Jackson 2.x, including Jackson-core jackson-core and Jackson-databind jackson-databind
Paho MQTT client for Java 1.1.x. download instructions

Jackson

  • もともと依存関係のjarファイルとして含まれていたのでパス

Paho MQTT client for Java

  • build.gradleのdependenciesに以下を追加
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'

秘密鍵と証明書のダウンロード

  • AWSIoTで作成する証明書のこと。
  • 証明書を作ったときでないと、秘密鍵はダウンロードできないので注意
  • もし作ったときにダウンロードしてない!とか、どこにあるかわからん!とかあれば、再作成してもok
  • AWSIot > セキュリティ > 証明書から、証明書を作成で作成できる(再作成するときはMQTTpublishしたいコアと紐づける)
  • 作成過程で長ったらしい名前のファイルがダウンロードできる。名前は変更してもOK。src/main/resources配下に格納。

実装

  • まずはひたすらドキュメントを読み進める
  • 読んだ感じ、MQTT送信までは以下の流れっぽい
    • AWSIotMqttClientの初期化
    • MQTTの送信

MQTT接続タイプ

  • MQTTまたはMQTT over WebSocketを選択する必要がある
  • 接続タイプの違いによって、AWSIotMqttClientの初期化方法が変わる
  • 以下プロトコルの詳細について書かれたサイトを見ると、双方同じオペレーションを用いることができる→どちらでもよさそう

https://docs.aws.amazon.com/ja_jp/iot/latest/developerguide/protocols.html

  • 今回は証明書と鍵ファイルをあらかじめAWS構築担当者に作成してもらっているので、MQTTを選択することとする

AWSIotMqttClientの初期化

  • AWS IoTサービスにアクセスするためには必須
  • 以下ドキュメントより
String clientEndpoint = "<prefix>-ats.iot.<region>.amazonaws.com";   // use value returned by describe-endpoint --endpoint-type "iot:Data-ATS"
String clientId = "<unique client id>";                              // replace with your own client ID. Use unique client IDs for concurrent connections.
String certificateFile = "<certificate file>";                       // X.509 based certificate file
String privateKeyFile = "<private key file>";                        // PKCS#1 or PKCS#8 PEM encoded private key file

// SampleUtil.java and its dependency PrivateKeyReader.java can be copied from the sample source code.
// Alternatively, you could load key store directly from a file - see the example included in this README.
KeyStorePasswordPair pair = SampleUtil.getKeyStorePasswordPair(certificateFile, privateKeyFile);
AWSIotMqttClient client = new AWSIotMqttClient(clientEndpoint, clientId, pair.keyStore, pair.keyPassword);

// optional parameters can be set before connect()
client.connect();
  • SampleUtil.java and its dependency PrivateKeyReader.java can be copied from the sample source codeと丁寧にコメントに書いてある通り、一部はサンプルコードに記載がある
  • SampleUtil.getKeyStorePasswordPair()の呼び出し元を探していくと、「PublishSubscribeSample.java」というMQTT送信部分のサンプルを発見
  • そこにinitClient()というクライアントを初期化しているメソッドがあった

 →上記から紐づいてサンプルから必要そうなクラスやメソッドを取り出す

  • コマンドラインで実行するベースで書かれているので、メソッドから呼び出す方針に沿ってサンプルを修正しながら実装

publish

  • 上記初期化と合わせて実装した

動かない

  • AWSのトピックにパブリッシュして欲しい~~
  • と思い動かしたのだが以下のエラーが発生
2022/05/19 16:34:44 WARN  [MQTT Rec: xxx-greengrass-core] - Connect request failure
org.eclipse.paho.client.mqttv3.MqttException: 接続喪失
        at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:146)
        at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.io.EOFException: null
        at java.base/java.io.DataInputStream.readByte(Unknown Source)
        at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:65)
        at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:107)
        ... 1 common frames omitted
2022/05/19 16:34:44 INFO  [pool-5-thread-1] - Connection temporarily lost
2022/05/19 16:34:44 INFO  [pool-5-thread-1] - Client connection lost:xxx-greengrass-core
2022/05/19 16:35:14 INFO  [pool-5-thread-1] - Connection is being retried
  • デバッグで処理を追っていると、awsclientがconnectしようとするところで予期せぬNullが発生してエラーになっていることだけはわかった。
  • MqttExceptionやEOFExceptionで調べると、全く同じような悩みを抱えたエンジニアたちが嘆いていた。
  • 解決法は「証明書を作り直してみる」だとか、「ライブラリのバージョンを変えてみる」だとかあって見つかるものを片っ端から試してみたが全く事象解決せず。

しょうもない原因でした

エンドポイントのリージョンが間違っとるやないけ~~~~~!!!!!

  • まったく気づかなかった。調べた解決法の中にもエンドポイントが間違ってるかも?とかあったけど、「合っとるわ!コピーしてきたもん!」とあまり重視してなかった(だめ)
  • どうやらAWS操作時にそういった挙動がされてしまうぽい 
    • リージョンの概念を持たないサービスを開く(例えば IAM。リージョン表示が「グローバル」になる)
       →実はこの時点でURLのパラメータが「region=us-east-1」になっている
    • その後、リージョンが指定できるサービスを開く
    • リージョンが「us-east-1」(バージニア北部)に変わっている
  • リージョンの違いには本当に気をつけましょう。URLでリージョンを指定してログインすると再発防止になるよ。
  • リージョン違いを解消したら、publishまでしていた。(サンプルコードでpublishしたら無限ループで止まらないから慌てて強制終了した)

実装2

  • 上記は動きを担保するためのソースコードだったので、次は目的にあったコードにするぞの気持ち。

スレッドやめる

  • スレッドでpublishしてるけど、今回はMQTTが1度publishされてくれればOKなので、繰り返さなくていい
  • 並行処理もいらない
  • じゃあいらねえじゃあねえかああーーーッ!!!となったので、サンプルのスレッド部分はすべて削除することに。
  • 以下はこの考えに至るまでの経緯(寄り道とも言う)

無限ループをやめさせる

 →フラグ方式で実装してみる

ブロッキング形式を絞る

  • サンプルだとブロッキングとノンブロッキング方式の2つが記載されており、そのままだと2回publishすることとなる(しないで)
  • そもそもブロッキングがわかってないので調べた
    • ブロッキング:操作に対して順番に処理。待機時間が生じる。
    • ノンブロッキング:随時処理の進捗を確認。複数の処理を並行する。待ち時間がない。
  • 大量にメッセージを送るぜ!なら迷わずノンブロッキングだと思うのだが、今回は大量に送らない。(人間によるボタン押下のタイミングで送信される程度)
  • また、複数の処理を並行しない。publishしたら完了とするので、むしろ応答を待ちたい
  • ということでブロッキング形式にした。

初期化~publishを一つのクラスでまとめる

  • プロジェクト後続者に向けてもいる。初期化~publishを一つのクラスにまとめることで、「とりあえずpublishするならここ呼べばOK」状態にしておく。スッキリ。

証明書などのパスを設定値化する

  • ソースコードにベタ書きは流石にセンスを疑われてしまうので、かっこよく(?)設定ファイルに記載してセンスを褒められたい
  • それに、絶対パスで記載するとかマジで嫌われて距離を取られてしまうので、クラスパス(src/main/resource配下)から見た相対パスで書きたい

相対パスでのファイル渡し方

  • ファイルを実際に開いているところを変えた
//filenameにはsrc/main/resourceから見たパスを入れる
File file = null;
        try {
            file = new ClassPathResource(filename).getFile();
        } catch (IOException e1) {
            e1.printStackTrace();
        }

設定ファイル化

  • 設定ファイルにエンドポイント、クライアントid、証明書ファイルのパスを定義
aws.client.endpoint=AWSのエンドポイント
aws.client.id=任意のクライアントID。他と重複しなければなんでもOK
aws.client.certificatefile=証明書ファイル名
aws.client.privatekeyfile=秘密鍵ファイル名
  • 設定ファイルの値を取ってくる設定値クラス追加
@Component
@PropertySource("classpath:application.properties")
@Data
public class Config {

    @Value("${aws.client.endpoint}")
    private String awsClientEndpoint;

    @Value("${aws.client.id}")
    private String awsClientId;

    @Value("${aws.client.certificatefile}")
    private String awsClientCertificatefile;

    @Value("${aws.client.privatekeyfile}")
    private String awsClientPrivatekeyfile;
}

出来上がったソースたち

  • サンプルから引っ張り出してできたものの一覧化
  • 正直サンプルそのまま持ってきたところ(特に秘密鍵や証明書付近)は内容がよくわかってない(ここだけの話です)
ソース名 概要
Asn1Object.java サンプルそのまま。秘密鍵を解析するときに使っている。
DerParser.java サンプルそのまま。秘密鍵を解析するときに使っている。
KeyStorePasswordPair.java SampleUtil.javaにあった。どこに入れるか迷って結局独立したクラスにした
MqttClient.java SampleUtil.javaからAWSIoTクライアントの初期化、AWSIoTクライアントへの接続、publish部分を抜粋している。
MqttUtil.java サンプルに存在していた秘密鍵・証明書関連の処理をまとめた。
TopicListener.java もとはTestTopicLister.java。テストってクラス名に入ってんのやばいなと思って名前だけ変えた。
Config.java application.propertiesの設定値を読み込む。(前述)
  • サンプルそのまま持ってきたところはサンプルを見てください。
  • オリジナル(サンプルを抜粋したところとか)は以下に載せるので参考までに。。
KeyStorePasswordPair.java
@Data
@AllArgsConstructor
public class KeyStorePasswordPair {
    private KeyStore keyStore;

    private String keyPassword;

}
MqttClient.java
@Component
public class MqttClient {

    @Autowired
    Config config;

    @Autowired
    ResourceLoader resourceLoader;

    private static final AWSIotQos TOPIC_QOS = AWSIotQos.QOS0;

    /**
     * AWSに接続し、MQTTをpublishする
     *
     * @param topicName AWSのトピック名
     * @param message   publishするメッセージ
     * @throws AWSIotException
     * @throws InterruptedException
     * @throws AWSIotTimeoutException
     * @throws GeneralSecurityException
     * @throws IOException
     */
    public void publish(String topicName, String message) throws AWSIotException, InterruptedException,
            AWSIotTimeoutException, IOException, GeneralSecurityException {

        String clientEndpoint = config.getAwsClientEndpoint();
        String clientId = config.getAwsClientId();
        String certificateFile = config.getAwsClientCertificatefile();
        String privateKeyFile = config.getAwsClientPrivatekeyfile();
        // AWSIoTクライアントの初期化
        AWSIotMqttClient awsIotClient = initClient(clientEndpoint, clientId, certificateFile, privateKeyFile);

        // AWSIoTクライアントへの接続
        awsIotClient.connect();

        // メッセージのpublish
        publishMessage(awsIotClient, topicName, message);

        // AWSIoTクライアントの切断
        awsIotClient.disconnect();
    }

    /**
     * AWSIoTクライアントの初期化
     *
     * @param clientEndpoint  接続するAWSのエンドポイント
     * @param clientId        クライアントID
     * @param certificateFile 証明書のパス
     * @param privateKeyFile  秘密鍵のパス
     * @return AWSIoTクライアント
     * @throws GeneralSecurityException
     * @throws IOException
     */
    private AWSIotMqttClient initClient(String clientEndpoint, String clientId, String certificateFile,
            String privateKeyFile) throws IOException, GeneralSecurityException {

        if (certificateFile != null && privateKeyFile != null) {
            KeyStorePasswordPair pair = MqttUtil.getKeyStorePasswordPair(certificateFile, privateKeyFile,
                    resourceLoader);
            return new AWSIotMqttClient(clientEndpoint, clientId, pair.getKeyStore(), pair.getKeyPassword());
        }
        return null;
    }

    /**
     * メッセージのpublish
     *
     * @param awsIotClient AWSIoTクライアント
     * @param topicName    AWSのトピック名
     * @param message      publishするメッセージ
     * @throws InterruptedException
     * @throws AWSIotException
     * @throws AWSIotTimeoutException
     */
    private void publishMessage(AWSIotMqttClient awsIotClient, String topicName, String message)
            throws InterruptedException, AWSIotException, AWSIotTimeoutException {

        AWSIotTopic topic = new TopicListener(topicName, TOPIC_QOS);

        awsIotClient.subscribe(topic, true);

        try {
            awsIotClient.publish(topicName, message);
        } catch (AWSIotException e) {
            throw e;
        }
    }
}
MqttUtil.java
public class MqttUtil {

    /**
     * 証明書、秘密鍵からキーストアとパスワードのペアを取得する
     *
     * @param certificateFile 証明書のパス
     * @param privateKeyFile  秘密鍵のパス
     * @param resourceLoader  ResourceLoader
     * @return キーストアとパスワードのペア
     * @throws IOException
     * @throws GeneralSecurityException
     */
    public static KeyStorePasswordPair getKeyStorePasswordPair(String certificateFile, String privateKeyFile,
            ResourceLoader resourceLoader) throws IOException, GeneralSecurityException {
        if (certificateFile == null || privateKeyFile == null) {
            return null;
        }

        // 秘密鍵の読み取り
        PrivateKey privateKey = loadPrivateKeyFromFile(privateKeyFile, null, resourceLoader);

        // 証明書の読み取り
        List<Certificate> certChain = loadCertificatesFromFile(certificateFile, resourceLoader);

        if (certChain == null || privateKey == null) {
            return null;
        }

        return getKeyStorePasswordPair(certChain, privateKey);
    }

    /**
     * 証明書と秘密鍵の中身からキーストアとパスワードのペアを取得する
     * 
     * @param certificates 証明書の中身
     * @param privateKey   秘密鍵の中身
     * @return キーストアとパスワードのペア
     */
    private static KeyStorePasswordPair getKeyStorePasswordPair(final List<Certificate> certificates,
            final PrivateKey privateKey) {
        KeyStore keyStore;
        String keyPassword;
        try {
            keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
            keyStore.load(null);

            // randomly generated key password for the key in the KeyStore
            keyPassword = new BigInteger(128, new SecureRandom()).toString(32);

            Certificate[] certChain = new Certificate[certificates.size()];
            certChain = certificates.toArray(certChain);
            keyStore.setKeyEntry("alias", privateKey, keyPassword.toCharArray(), certChain);
        } catch (KeyStoreException | NoSuchAlgorithmException | CertificateException | IOException e) {
            return null;
        }

        return new KeyStorePasswordPair(keyStore, keyPassword);
    }

    /**
     * 秘密鍵の読み取り
     *
     * @param filename       秘密鍵のパス
     * @param algorithm      証明書アルゴリズム(デフォルト:RSA)
     * @param resourceLoader ResourceLoader
     * @return 読み取った秘密鍵
     * @throws IOException
     * @throws GeneralSecurityException
     */
    private static PrivateKey loadPrivateKeyFromFile(final String filename, final String algorithm,
            ResourceLoader resourceLoader) throws IOException, GeneralSecurityException {
        PrivateKey privateKey = null;

        URL url = null;
        try {
            url = resourceLoader.getResource("classpath:" + filename).getURL();
        } catch (IOException e) {
            throw e;
        }
        try (DataInputStream stream = new DataInputStream(url.openStream())) {
            privateKey = getPrivateKey(stream, algorithm);
        } catch (IOException | GeneralSecurityException e) {
            throw e;
        }
        return privateKey;
    }

    /**
     * 秘密鍵の解析
     *
     * @param stream    秘密鍵ファイルのstream
     * @param algorithm 証明書アルゴリズム(デフォルト:RSA)
     * @return 解析した秘密鍵
     * @throws IOException
     * @throws GeneralSecurityException
     */
    private static PrivateKey getPrivateKey(InputStream stream, String algorithm)
            throws IOException, GeneralSecurityException {
        PrivateKey key = null;
        boolean isRSAKey = false;

        BufferedReader br = new BufferedReader(new InputStreamReader(stream, "UTF-8"));
        StringBuilder builder = new StringBuilder();
        boolean inKey = false;
        for (String line = br.readLine(); line != null; line = br.readLine()) {
            if (!inKey) {
                if (line.startsWith("-----BEGIN ") && line.endsWith(" PRIVATE KEY-----")) {
                    inKey = true;
                    isRSAKey = line.contains("RSA");
                }
                continue;
            } else {
                if (line.startsWith("-----END ") && line.endsWith(" PRIVATE KEY-----")) {
                    inKey = false;
                    isRSAKey = line.contains("RSA");
                    break;
                }
                builder.append(line);
            }
        }
        KeySpec keySpec = null;
        byte[] encoded = Base64.decodeBase64(builder.toString());
        if (isRSAKey) {
            keySpec = getRSAKeySpec(encoded);
        } else {
            keySpec = new PKCS8EncodedKeySpec(encoded);
        }
        KeyFactory kf = KeyFactory.getInstance((algorithm == null) ? "RSA" : algorithm);
        key = kf.generatePrivate(keySpec);

        return key;
    }

    /**
     * RSA秘密鍵の解析
     *
     * @param keyBytes 秘密鍵ファイルのエンコード結果
     * @return RSA秘密鍵
     * @throws IOException
     */
    private static RSAPrivateCrtKeySpec getRSAKeySpec(byte[] keyBytes) throws IOException {

        DerParser parser = new DerParser(keyBytes);

        Asn1Object sequence = parser.read();
        if (sequence.getType() != DerParser.SEQUENCE)
            throw new IOException("Invalid DER: not a sequence"); //$NON-NLS-1$

        // Parse inside the sequence
        parser = sequence.getParser();

        parser.read(); // Skip version
        BigInteger modulus = parser.read().getInteger();
        BigInteger publicExp = parser.read().getInteger();
        BigInteger privateExp = parser.read().getInteger();
        BigInteger prime1 = parser.read().getInteger();
        BigInteger prime2 = parser.read().getInteger();
        BigInteger exp1 = parser.read().getInteger();
        BigInteger exp2 = parser.read().getInteger();
        BigInteger crtCoef = parser.read().getInteger();

        RSAPrivateCrtKeySpec keySpec = new RSAPrivateCrtKeySpec(modulus, publicExp, privateExp, prime1, prime2, exp1,
                exp2, crtCoef);

        return keySpec;
    }

    /**
     * 証明書の読み取り
     *
     * @param filename       証明書のパス
     * @param resourceLoader ResourceLoader
     * @return 読み取った証明書
     * @throws IOException
     */
    private static List<Certificate> loadCertificatesFromFile(final String filename, ResourceLoader resourceLoader)
            throws IOException {

        URL url = null;
        try {
            url = resourceLoader.getResource("classpath:" + filename).getURL();
        } catch (IOException e) {
            throw e;
        }

        try (BufferedInputStream stream = new BufferedInputStream(url.openStream())) {
            final CertificateFactory certFactory = CertificateFactory.getInstance("X.509");
            return (List<Certificate>) certFactory.generateCertificates(stream);
        } catch (IOException | CertificateException e) {
        }
        return null;
    }

}
  • 実際に動かすときはMqttClient.javaのpublishメソッドを以下のような感じで呼び出せばOK
mqttClient.publish(publishしたいAWSのトピック名, publishするメッセージ);
  • AWSIoTのMQTT テストクライアントからトピックのサブスクリプトをして監視していると、publishされたメッセージが表示されるはず

まとめ

  • AWS側の設定は担当外で設定してないので、本当に情報皆無ですみません
  • 英語のドキュメントつらい。英語アレルギーで死ぬんじゃないかと思った。日本語でお願いします。
  • 公式くん、「今はaws-iot-device-sdk-javaはv1じゃなくてv2を使ってね!」と言っているので、今更役に立たないかもですが誰かに届けば嬉しいなあ、、
0
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?