はじめに
-
結論だけ(何が必要なのかだけ)知りたい人は準備と出来上がったソースたちを読んで下さい。(
でもできれば私の格闘様子も読んでほしい) -
aws-iot-device-sdk-javaを使用したくても公式ドキュメント(英語)しかなくて本当に辛かったので、私みたいな人を救いたくて自分用に残してたメモをほぼそのまま載せることにしました。救えるかは知りません。
-
AWSの構築手順などは割愛してます。あくまでJavaプロジェクト側の話のみです。
-
aws-iot-device-sdk-javaはv2でなくv1を使用してます。v2はまた少し違いそうなのでご注意を。
-
全体の参考ドキュメント
https://github.com/aws/aws-iot-device-sdk-java -
APIドキュメント
http://aws-iot-device-sdk-java-docs.s3-website-us-east-1.amazonaws.com
環境
- Springboot2.6.7 + Thymeleaf
- Java17.0.2
準備
サンプルのダウンロード
- 参考ドキュメントを読み進めていたらところどころ「sampleにあるよ」との記載が。
→githubからダウンロードしておく
gradleリポジトリの追加
- build.gradleのdependenciesに以下を追加
implementation 'com.amazonaws:aws-iot-device-sdk-java:1.3.10'
ライブラリの追加
- 参考ドキュメントより
You will also need to add two libraries the SDK depends on:
Jackson 2.x, including Jackson-core jackson-core and Jackson-databind jackson-databind
Paho MQTT client for Java 1.1.x. download instructions
Jackson
- もともと依存関係のjarファイルとして含まれていたのでパス
Paho MQTT client for Java
- build.gradleのdependenciesに以下を追加
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
秘密鍵と証明書のダウンロード
- AWSIoTで作成する証明書のこと。
- 証明書を作ったときでないと、秘密鍵はダウンロードできないので注意
- もし作ったときにダウンロードしてない!とか、どこにあるかわからん!とかあれば、再作成してもok
- AWSIot > セキュリティ > 証明書から、証明書を作成で作成できる(再作成するときはMQTTpublishしたいコアと紐づける)
- 作成過程で長ったらしい名前のファイルがダウンロードできる。名前は変更してもOK。src/main/resources配下に格納。
実装
- まずはひたすらドキュメントを読み進める
- 読んだ感じ、MQTT送信までは以下の流れっぽい
-
AWSIotMqttClient
の初期化 - MQTTの送信
-
MQTT接続タイプ
- MQTTまたはMQTT over WebSocketを選択する必要がある
- 接続タイプの違いによって、AWSIotMqttClientの初期化方法が変わる
- 以下プロトコルの詳細について書かれたサイトを見ると、双方同じオペレーションを用いることができる→どちらでもよさそう
https://docs.aws.amazon.com/ja_jp/iot/latest/developerguide/protocols.html
- 今回は証明書と鍵ファイルをあらかじめAWS構築担当者に作成してもらっているので、MQTTを選択することとする
AWSIotMqttClientの初期化
- AWS IoTサービスにアクセスするためには必須
- 以下ドキュメントより
String clientEndpoint = "<prefix>-ats.iot.<region>.amazonaws.com"; // use value returned by describe-endpoint --endpoint-type "iot:Data-ATS"
String clientId = "<unique client id>"; // replace with your own client ID. Use unique client IDs for concurrent connections.
String certificateFile = "<certificate file>"; // X.509 based certificate file
String privateKeyFile = "<private key file>"; // PKCS#1 or PKCS#8 PEM encoded private key file
// SampleUtil.java and its dependency PrivateKeyReader.java can be copied from the sample source code.
// Alternatively, you could load key store directly from a file - see the example included in this README.
KeyStorePasswordPair pair = SampleUtil.getKeyStorePasswordPair(certificateFile, privateKeyFile);
AWSIotMqttClient client = new AWSIotMqttClient(clientEndpoint, clientId, pair.keyStore, pair.keyPassword);
// optional parameters can be set before connect()
client.connect();
-
SampleUtil.java and its dependency PrivateKeyReader.java can be copied from the sample source code
と丁寧にコメントに書いてある通り、一部はサンプルコードに記載がある -
SampleUtil.getKeyStorePasswordPair()
の呼び出し元を探していくと、「PublishSubscribeSample.java」というMQTT送信部分のサンプルを発見 - そこに
initClient()
というクライアントを初期化しているメソッドがあった
→上記から紐づいてサンプルから必要そうなクラスやメソッドを取り出す
- コマンドラインで実行するベースで書かれているので、メソッドから呼び出す方針に沿ってサンプルを修正しながら実装
publish
- 上記初期化と合わせて実装した
動かない
- AWSのトピックにパブリッシュして欲しい~~
- と思い動かしたのだが以下のエラーが発生
2022/05/19 16:34:44 WARN [MQTT Rec: xxx-greengrass-core] - Connect request failure
org.eclipse.paho.client.mqttv3.MqttException: 接続喪失
at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:146)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.io.EOFException: null
at java.base/java.io.DataInputStream.readByte(Unknown Source)
at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:65)
at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:107)
... 1 common frames omitted
2022/05/19 16:34:44 INFO [pool-5-thread-1] - Connection temporarily lost
2022/05/19 16:34:44 INFO [pool-5-thread-1] - Client connection lost:xxx-greengrass-core
2022/05/19 16:35:14 INFO [pool-5-thread-1] - Connection is being retried
- デバッグで処理を追っていると、awsclientがconnectしようとするところで予期せぬNullが発生してエラーになっていることだけはわかった。
- MqttExceptionやEOFExceptionで調べると、全く同じような悩みを抱えたエンジニアたちが嘆いていた。
- 解決法は「証明書を作り直してみる」だとか、「ライブラリのバージョンを変えてみる」だとかあって見つかるものを片っ端から試してみたが全く事象解決せず。
しょうもない原因でした
エンドポイントのリージョンが間違っとるやないけ~~~~~!!!!!
- まったく気づかなかった。調べた解決法の中にもエンドポイントが間違ってるかも?とかあったけど、「合っとるわ!コピーしてきたもん!」とあまり重視してなかった(だめ)
- どうやらAWS操作時にそういった挙動がされてしまうぽい
- リージョンの概念を持たないサービスを開く(例えば IAM。リージョン表示が「グローバル」になる)
→実はこの時点でURLのパラメータが「region=us-east-1」になっている - その後、リージョンが指定できるサービスを開く
- リージョンが「us-east-1」(バージニア北部)に変わっている
- リージョンの概念を持たないサービスを開く(例えば IAM。リージョン表示が「グローバル」になる)
- リージョンの違いには本当に気をつけましょう。URLでリージョンを指定してログインすると再発防止になるよ。
- リージョン違いを解消したら、publishまでしていた。(サンプルコードでpublishしたら無限ループで止まらないから慌てて強制終了した)
実装2
- 上記は動きを担保するためのソースコードだったので、次は目的にあったコードにするぞの気持ち。
スレッドやめる
- スレッドでpublishしてるけど、今回はMQTTが1度publishされてくれればOKなので、繰り返さなくていい
- 並行処理もいらない
- じゃあいらねえじゃあねえかああーーーッ!!!となったので、サンプルのスレッド部分はすべて削除することに。
- 以下はこの考えに至るまでの経緯(寄り道とも言う)
無限ループをやめさせる
-
前述の通り、サンプルコードだと無限ループでpublishする。スレッドが停止されていないから、だということだけはわかったが、止め方がわからないので調べる
-
thread.start()で始まっとるんやからstopやろ()大人なめんなよ、と思ったら非推奨でした(大人なのに号泣した)
-
以下の2種類のやり方がありそう
- フラグを立てて、trueの間は繰り返しfalseになったらやめさせるという構造にする
- interrupt()を使用する(割り込み用)
-
今回は割り込みしたいわけではなく、一度publish後終わってもらえればそれでよい。
→フラグ方式で実装してみる
ブロッキング形式を絞る
- サンプルだとブロッキングとノンブロッキング方式の2つが記載されており、そのままだと2回publishすることとなる(しないで)
- そもそもブロッキングがわかってないので調べた
- ブロッキング:操作に対して順番に処理。待機時間が生じる。
- ノンブロッキング:随時処理の進捗を確認。複数の処理を並行する。待ち時間がない。
- 大量にメッセージを送るぜ!なら迷わずノンブロッキングだと思うのだが、今回は大量に送らない。(人間によるボタン押下のタイミングで送信される程度)
- また、複数の処理を並行しない。publishしたら完了とするので、むしろ応答を待ちたい
- ということでブロッキング形式にした。
初期化~publishを一つのクラスでまとめる
- プロジェクト後続者に向けてもいる。初期化~publishを一つのクラスにまとめることで、「とりあえずpublishするならここ呼べばOK」状態にしておく。スッキリ。
証明書などのパスを設定値化する
- ソースコードにベタ書きは流石にセンスを疑われてしまうので、かっこよく(?)設定ファイルに記載してセンスを褒められたい
- それに、絶対パスで記載するとかマジで嫌われて距離を取られてしまうので、クラスパス(src/main/resource配下)から見た相対パスで書きたい
相対パスでのファイル渡し方
- ファイルを実際に開いているところを変えた
//filenameにはsrc/main/resourceから見たパスを入れる
File file = null;
try {
file = new ClassPathResource(filename).getFile();
} catch (IOException e1) {
e1.printStackTrace();
}
設定ファイル化
- 設定ファイルにエンドポイント、クライアントid、証明書ファイルのパスを定義
aws.client.endpoint=AWSのエンドポイント
aws.client.id=任意のクライアントID。他と重複しなければなんでもOK
aws.client.certificatefile=証明書ファイル名
aws.client.privatekeyfile=秘密鍵ファイル名
- 設定ファイルの値を取ってくる設定値クラス追加
@Component
@PropertySource("classpath:application.properties")
@Data
public class Config {
@Value("${aws.client.endpoint}")
private String awsClientEndpoint;
@Value("${aws.client.id}")
private String awsClientId;
@Value("${aws.client.certificatefile}")
private String awsClientCertificatefile;
@Value("${aws.client.privatekeyfile}")
private String awsClientPrivatekeyfile;
}
出来上がったソースたち
- サンプルから引っ張り出してできたものの一覧化
- 正直サンプルそのまま持ってきたところ(特に秘密鍵や証明書付近)は内容がよくわかってない(ここだけの話です)
ソース名 | 概要 |
---|---|
Asn1Object.java | サンプルそのまま。秘密鍵を解析するときに使っている。 |
DerParser.java | サンプルそのまま。秘密鍵を解析するときに使っている。 |
KeyStorePasswordPair.java | SampleUtil.javaにあった。どこに入れるか迷って結局独立したクラスにした |
MqttClient.java | SampleUtil.javaからAWSIoTクライアントの初期化、AWSIoTクライアントへの接続、publish部分を抜粋している。 |
MqttUtil.java | サンプルに存在していた秘密鍵・証明書関連の処理をまとめた。 |
TopicListener.java | もとはTestTopicLister.java。テストってクラス名に入ってんのやばいなと思って名前だけ変えた。 |
Config.java | application.propertiesの設定値を読み込む。(前述) |
- サンプルそのまま持ってきたところはサンプルを見てください。
- オリジナル(サンプルを抜粋したところとか)は以下に載せるので参考までに。。
@Data
@AllArgsConstructor
public class KeyStorePasswordPair {
private KeyStore keyStore;
private String keyPassword;
}
@Component
public class MqttClient {
@Autowired
Config config;
@Autowired
ResourceLoader resourceLoader;
private static final AWSIotQos TOPIC_QOS = AWSIotQos.QOS0;
/**
* AWSに接続し、MQTTをpublishする
*
* @param topicName AWSのトピック名
* @param message publishするメッセージ
* @throws AWSIotException
* @throws InterruptedException
* @throws AWSIotTimeoutException
* @throws GeneralSecurityException
* @throws IOException
*/
public void publish(String topicName, String message) throws AWSIotException, InterruptedException,
AWSIotTimeoutException, IOException, GeneralSecurityException {
String clientEndpoint = config.getAwsClientEndpoint();
String clientId = config.getAwsClientId();
String certificateFile = config.getAwsClientCertificatefile();
String privateKeyFile = config.getAwsClientPrivatekeyfile();
// AWSIoTクライアントの初期化
AWSIotMqttClient awsIotClient = initClient(clientEndpoint, clientId, certificateFile, privateKeyFile);
// AWSIoTクライアントへの接続
awsIotClient.connect();
// メッセージのpublish
publishMessage(awsIotClient, topicName, message);
// AWSIoTクライアントの切断
awsIotClient.disconnect();
}
/**
* AWSIoTクライアントの初期化
*
* @param clientEndpoint 接続するAWSのエンドポイント
* @param clientId クライアントID
* @param certificateFile 証明書のパス
* @param privateKeyFile 秘密鍵のパス
* @return AWSIoTクライアント
* @throws GeneralSecurityException
* @throws IOException
*/
private AWSIotMqttClient initClient(String clientEndpoint, String clientId, String certificateFile,
String privateKeyFile) throws IOException, GeneralSecurityException {
if (certificateFile != null && privateKeyFile != null) {
KeyStorePasswordPair pair = MqttUtil.getKeyStorePasswordPair(certificateFile, privateKeyFile,
resourceLoader);
return new AWSIotMqttClient(clientEndpoint, clientId, pair.getKeyStore(), pair.getKeyPassword());
}
return null;
}
/**
* メッセージのpublish
*
* @param awsIotClient AWSIoTクライアント
* @param topicName AWSのトピック名
* @param message publishするメッセージ
* @throws InterruptedException
* @throws AWSIotException
* @throws AWSIotTimeoutException
*/
private void publishMessage(AWSIotMqttClient awsIotClient, String topicName, String message)
throws InterruptedException, AWSIotException, AWSIotTimeoutException {
AWSIotTopic topic = new TopicListener(topicName, TOPIC_QOS);
awsIotClient.subscribe(topic, true);
try {
awsIotClient.publish(topicName, message);
} catch (AWSIotException e) {
throw e;
}
}
}
public class MqttUtil {
/**
* 証明書、秘密鍵からキーストアとパスワードのペアを取得する
*
* @param certificateFile 証明書のパス
* @param privateKeyFile 秘密鍵のパス
* @param resourceLoader ResourceLoader
* @return キーストアとパスワードのペア
* @throws IOException
* @throws GeneralSecurityException
*/
public static KeyStorePasswordPair getKeyStorePasswordPair(String certificateFile, String privateKeyFile,
ResourceLoader resourceLoader) throws IOException, GeneralSecurityException {
if (certificateFile == null || privateKeyFile == null) {
return null;
}
// 秘密鍵の読み取り
PrivateKey privateKey = loadPrivateKeyFromFile(privateKeyFile, null, resourceLoader);
// 証明書の読み取り
List<Certificate> certChain = loadCertificatesFromFile(certificateFile, resourceLoader);
if (certChain == null || privateKey == null) {
return null;
}
return getKeyStorePasswordPair(certChain, privateKey);
}
/**
* 証明書と秘密鍵の中身からキーストアとパスワードのペアを取得する
*
* @param certificates 証明書の中身
* @param privateKey 秘密鍵の中身
* @return キーストアとパスワードのペア
*/
private static KeyStorePasswordPair getKeyStorePasswordPair(final List<Certificate> certificates,
final PrivateKey privateKey) {
KeyStore keyStore;
String keyPassword;
try {
keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
keyStore.load(null);
// randomly generated key password for the key in the KeyStore
keyPassword = new BigInteger(128, new SecureRandom()).toString(32);
Certificate[] certChain = new Certificate[certificates.size()];
certChain = certificates.toArray(certChain);
keyStore.setKeyEntry("alias", privateKey, keyPassword.toCharArray(), certChain);
} catch (KeyStoreException | NoSuchAlgorithmException | CertificateException | IOException e) {
return null;
}
return new KeyStorePasswordPair(keyStore, keyPassword);
}
/**
* 秘密鍵の読み取り
*
* @param filename 秘密鍵のパス
* @param algorithm 証明書アルゴリズム(デフォルト:RSA)
* @param resourceLoader ResourceLoader
* @return 読み取った秘密鍵
* @throws IOException
* @throws GeneralSecurityException
*/
private static PrivateKey loadPrivateKeyFromFile(final String filename, final String algorithm,
ResourceLoader resourceLoader) throws IOException, GeneralSecurityException {
PrivateKey privateKey = null;
URL url = null;
try {
url = resourceLoader.getResource("classpath:" + filename).getURL();
} catch (IOException e) {
throw e;
}
try (DataInputStream stream = new DataInputStream(url.openStream())) {
privateKey = getPrivateKey(stream, algorithm);
} catch (IOException | GeneralSecurityException e) {
throw e;
}
return privateKey;
}
/**
* 秘密鍵の解析
*
* @param stream 秘密鍵ファイルのstream
* @param algorithm 証明書アルゴリズム(デフォルト:RSA)
* @return 解析した秘密鍵
* @throws IOException
* @throws GeneralSecurityException
*/
private static PrivateKey getPrivateKey(InputStream stream, String algorithm)
throws IOException, GeneralSecurityException {
PrivateKey key = null;
boolean isRSAKey = false;
BufferedReader br = new BufferedReader(new InputStreamReader(stream, "UTF-8"));
StringBuilder builder = new StringBuilder();
boolean inKey = false;
for (String line = br.readLine(); line != null; line = br.readLine()) {
if (!inKey) {
if (line.startsWith("-----BEGIN ") && line.endsWith(" PRIVATE KEY-----")) {
inKey = true;
isRSAKey = line.contains("RSA");
}
continue;
} else {
if (line.startsWith("-----END ") && line.endsWith(" PRIVATE KEY-----")) {
inKey = false;
isRSAKey = line.contains("RSA");
break;
}
builder.append(line);
}
}
KeySpec keySpec = null;
byte[] encoded = Base64.decodeBase64(builder.toString());
if (isRSAKey) {
keySpec = getRSAKeySpec(encoded);
} else {
keySpec = new PKCS8EncodedKeySpec(encoded);
}
KeyFactory kf = KeyFactory.getInstance((algorithm == null) ? "RSA" : algorithm);
key = kf.generatePrivate(keySpec);
return key;
}
/**
* RSA秘密鍵の解析
*
* @param keyBytes 秘密鍵ファイルのエンコード結果
* @return RSA秘密鍵
* @throws IOException
*/
private static RSAPrivateCrtKeySpec getRSAKeySpec(byte[] keyBytes) throws IOException {
DerParser parser = new DerParser(keyBytes);
Asn1Object sequence = parser.read();
if (sequence.getType() != DerParser.SEQUENCE)
throw new IOException("Invalid DER: not a sequence"); //$NON-NLS-1$
// Parse inside the sequence
parser = sequence.getParser();
parser.read(); // Skip version
BigInteger modulus = parser.read().getInteger();
BigInteger publicExp = parser.read().getInteger();
BigInteger privateExp = parser.read().getInteger();
BigInteger prime1 = parser.read().getInteger();
BigInteger prime2 = parser.read().getInteger();
BigInteger exp1 = parser.read().getInteger();
BigInteger exp2 = parser.read().getInteger();
BigInteger crtCoef = parser.read().getInteger();
RSAPrivateCrtKeySpec keySpec = new RSAPrivateCrtKeySpec(modulus, publicExp, privateExp, prime1, prime2, exp1,
exp2, crtCoef);
return keySpec;
}
/**
* 証明書の読み取り
*
* @param filename 証明書のパス
* @param resourceLoader ResourceLoader
* @return 読み取った証明書
* @throws IOException
*/
private static List<Certificate> loadCertificatesFromFile(final String filename, ResourceLoader resourceLoader)
throws IOException {
URL url = null;
try {
url = resourceLoader.getResource("classpath:" + filename).getURL();
} catch (IOException e) {
throw e;
}
try (BufferedInputStream stream = new BufferedInputStream(url.openStream())) {
final CertificateFactory certFactory = CertificateFactory.getInstance("X.509");
return (List<Certificate>) certFactory.generateCertificates(stream);
} catch (IOException | CertificateException e) {
}
return null;
}
}
- 実際に動かすときはMqttClient.javaのpublishメソッドを以下のような感じで呼び出せばOK
mqttClient.publish(publishしたいAWSのトピック名, publishするメッセージ);
- AWSIoTのMQTT テストクライアントからトピックのサブスクリプトをして監視していると、publishされたメッセージが表示されるはず
まとめ
- AWS側の設定は担当外で設定してないので、本当に情報皆無ですみません
- 英語のドキュメントつらい。英語アレルギーで死ぬんじゃないかと思った。日本語でお願いします。
- 公式くん、「今はaws-iot-device-sdk-javaはv1じゃなくてv2を使ってね!」と言っているので、今更役に立たないかもですが誰かに届けば嬉しいなあ、、