何を、どんな経緯でやってみたのか?
AWS IoTにJVM系言語(Scala)から直接MQTTライブラリを用いて接続するにはどうすればいいかという確認。
下記の記事にあるように、現状AWS IoTに対して直接接続している事例として、mosquittoのCLIを使うもの、Pythonのpaho-mqttを使用するもの、Node.jsで接続するものの3種類が存在します。
ただ、JVM系言語から接続している事例は見つからなかったのでやってみようという話です。
- 太陽光パネルの発電量をAWS IoTとAmazon Elasticsearch Serviceを使って可視化してみる(mosquitto/Python)
- AWS IoTのレイテンシーを測ってみた(1)(Node.js)
尚、AWS IoT用のJava SDKには「AWS IoTに対してPublishする」というAPIのみ存在してます。
上記のSDKを用いた場合、AWS IoTに対してSubscribeすることは出来ません。
つまり、AWS的なスタンスとしては「AWS IoTには投入のみ行い、取得/活用はAWS IoTからフィルタリングした上で別サービスに転送して行ってね」ということなのだと推測されます。
ただ、それだと元々JVM系言語からMQTT Brokerに対してSubscribeして取得データを活用していた場合、いまいち移行しずらいです。というか元々それを前提に作られているコンポーネントではAWS IoTとは連携できないということになってしまいます。
と、ここまででわかったとは思いますが、つまりはAWS IoTを単なるMQTT BrokerのSaaSとして使うにはどうすればいいか、というわけなので、結構奇特なユースケースな気はします。
とはいえ、気になったので出来るかだけは確認しておこうという話です。
尚、AWS IoTは完全従量課金でフルマネージドなMQTT Brokerを使えるため、扱うメッセージがそれほど多くない環境においては単なるMQTT BrokerのSaaSとして扱っても非常に優秀だとは思います。流量多くなったら別サービスに流せばいいわけですし。
AWS IoTの設定
下記ページを参考に、「モノの作成」「モノの証明書を作成」「ポリシの作成と証明書との紐付け」「モノと証明書との紐付け」を行い、証明書ファイルを保存しておきます。
証明書ファイルは下記の名称で保存されているものとします。
- cert.pem
- private.pem
- rootCa.pem
という形で、証明書はファイルとして提供されているという前提で記述します。
AWS IoTに対するPublishを行うコード
AWS IoTに対するPublishを行うコードは下記のようになりました。
尚、SocketFactoryGeneratorで使用している「password」という文字列は実質的に使用されないため、何でも問題ないです。
・・・ならそもそも置くなよ、というわけですが、それはSubscribeの時にでも訂正します^^;
実際のコードは下記のページから確認できます。
https://github.com/kimutansk/aws-iot-example/
import org.eclipse.paho.client.mqttv3.{MqttMessage, MqttConnectOptions, MqttClient}
/** MQTT Publish Test Class */
object MqttPublisher {
def main(args: Array[String]) {
// Connect Target
val brokerURI:String = "ssl://******.iot.ap-northeast-1.amazonaws.com:8883"
// SocketFactoryGenerate
val socketFactory = SocketFactoryGenerator.generateFromFilePath("/etc/cert/rootCA.pem", "/etc/cert/cert.pem", "/etc/cert/private.pem", "password")
// MQTT Client generate
val client:MqttClient = new MqttClient(brokerURI, "mqtt-publisher")
client.setCallback(new PublishMqttCallback)
val options:MqttConnectOptions = new MqttConnectOptions()
options.setSocketFactory(socketFactory)
client.connect(options)
val message:MqttMessage = new MqttMessage("Test Message".getBytes("UTF-8"))
client.publish("test-topic", message)
}
}
import java.io.{ByteArrayInputStream, InputStream, InputStreamReader}
import java.nio.file.{Files, Paths}
import java.security.cert.{CertificateFactory, X509Certificate}
import java.security.{KeyPair, KeyStore, Security}
import javax.net.ssl.{KeyManagerFactory, SSLContext, SSLSocketFactory, TrustManagerFactory}
import org.bouncycastle.cert.X509CertificateHolder
import org.bouncycastle.jce.provider.BouncyCastleProvider
import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter
import org.bouncycastle.openssl.{PEMKeyPair, PEMParser}
/** Factory for [[javax.net.ssl.SSLSocketFactory]] instances. */
object SocketFactoryGenerator {
/**
* Generate [[javax.net.ssl.SSLSocketFactory]] from pem file paths.
*
* @param rootCaFilePath Root CA file path
* @param certFilePath Certificate file path
* @param keyFilePath Private key file path
* @return Generated [[javax.net.ssl.SSLSocketFactory]]
*/
def generateFromFilePath(rootCaFilePath:String, certFilePath:String, keyFilePath:String, keyStorePassword:String):SSLSocketFactory = {
Security.addProvider(new BouncyCastleProvider())
// load Root CA certificate
val rootCaParser:PEMParser = new PEMParser(new InputStreamReader(new ByteArrayInputStream(Files.readAllBytes(Paths.get(rootCaFilePath)))))
val rootCaCertHolder:X509CertificateHolder = rootCaParser.readObject().asInstanceOf[X509CertificateHolder]
val rootCaCert:X509Certificate = convertToJavaCertificate(rootCaCertHolder)
rootCaParser.close()
// load Server certificate
val certParser:PEMParser = new PEMParser(new InputStreamReader(new ByteArrayInputStream(Files.readAllBytes(Paths.get(certFilePath)))))
val serverCertHolder:X509CertificateHolder = certParser.readObject.asInstanceOf[X509CertificateHolder]
val serverCert:X509Certificate = convertToJavaCertificate(serverCertHolder)
certParser.close()
// load Private Key
val keyParser:PEMParser = new PEMParser(new InputStreamReader(new ByteArrayInputStream(Files.readAllBytes(Paths.get(keyFilePath)))))
val pemKeyPair:PEMKeyPair = keyParser.readObject.asInstanceOf[PEMKeyPair]
val keyPair:KeyPair = new JcaPEMKeyConverter().getKeyPair(pemKeyPair)
keyParser.close()
// Root CA certificate is used to authenticate server
val rootCAKeyStore:KeyStore = KeyStore.getInstance(KeyStore.getDefaultType())
rootCAKeyStore.load(null, null)
rootCAKeyStore.setCertificateEntry("ca-certificate", convertToJavaCertificate(rootCaCertHolder))
val tmf:TrustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm())
tmf.init(rootCAKeyStore);
// client key and certificates are sent to server so it can authenticate us
val ks:KeyStore = KeyStore.getInstance(KeyStore.getDefaultType())
ks.load(null, null)
ks.setCertificateEntry("certificate", serverCert)
ks.setKeyEntry("private-key", keyPair.getPrivate(), keyStorePassword.toCharArray, Array(serverCert))
val kmf:KeyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm())
kmf.init(ks, keyStorePassword.toCharArray());
// finally, create SSL socket factory
val context:SSLContext = SSLContext.getInstance("TLSv1.2")
context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null)
context.getSocketFactory()
}
def convertToJavaCertificate(certificateHolder:X509CertificateHolder):X509Certificate = {
val is:InputStream = new ByteArrayInputStream(certificateHolder.toASN1Structure.getEncoded);
try {
CertificateFactory.getInstance("X.509").generateCertificate(is).asInstanceOf[X509Certificate]
} finally is.close()
}
}
import org.eclipse.paho.client.mqttv3.{IMqttDeliveryToken, MqttMessage, MqttCallback}
/** Publish MqttCallBack */
class PublishMqttCallback extends MqttCallback{
// Nop
override def deliveryComplete(iMqttDeliveryToken: IMqttDeliveryToken): Unit = ???
override def messageArrived(s: String, mqttMessage: MqttMessage): Unit = ???
override def connectionLost(throwable: Throwable): Unit = ???
}
実際の動作確認
mosquitto_subコマンドで実際に投入されるかを確かめてみると、下記のようにScala側のコードから投入したメッセージがSubscribe可能であることを確認しました。
# mosquitto_sub --cafile rootCA.pem --cert cert.pem --key private.pem -h ******.iot.ap-northeast-1.amazonaws.com -p 8883 -q 1 -d -t test-topic -i mosquitto-subscriber
Client mosquitto-subscriber sending CONNECT
Client mosquitto-subscriber received CONNACK
Client mosquitto-subscriber sending SUBSCRIBE (Mid: 1, Topic: test-topic, QoS: 1)
Client mosquitto-subscriber received SUBACK
Subscribed (mid: 1): 1
Client mosquitto-subscriber received PUBLISH (d0, q1, r0, m1, 'test-topic', ... (12 bytes))
Client mosquitto-subscriber sending PUBACK (Mid: 1)
Test Message
最後に
というわけで、SSLSocketFactoryを差し替えることで、ScalaからAWS IoTに直接接続し、メッセージのPublishが行えることが確認できました。
おそらくSubscribeも同じ方式で可能だとは思いますが、次に確認してみます。