何を、どんな経緯でやってみたのか?
下記の前回の記事の続きで、今度はSubscribeをやってみよう、という話です。
あと、前回のコードは本来は指定不要なパスワードを指定していたため、そのあたりの整理も行っています。
その他の経緯は下記の記事を参照。
AWS IoTに対するSubscribeを行うコード
前回の時点で接続自体は既にできるようになっているため、単純にPublishとSubscribeを入れ替えただけです。
実際のコードは下記のページから確認できます。
https://github.com/kimutansk/aws-iot-example/
MqttSubscriber.scala
import org.eclipse.paho.client.mqttv3.{MqttClient, MqttConnectOptions}
/**
 * MQTT subscribe test entry point.
 *
 * Connects to the broker over TLS using a socket factory built by
 * [[SocketFactoryGenerator]], subscribes to "test-topic" and keeps the
 * process alive for 60 seconds while the registered
 * [[SubscribeMqttCallback]] receives messages.
 */
object MqttSubscriber {

  /**
   * Runs the subscribe test.
   *
   * @param args command line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // Connect Target
    val brokerURI: String = "ssl://******.iot.ap-northeast-1.amazonaws.com:8883"

    // SocketFactoryGenerate
    val socketFactory = SocketFactoryGenerator.generateFromFilePath(
      "/etc/cert/rootCA.pem", "/etc/cert/cert.pem", "/etc/cert/private.pem")

    // MQTT Client generate
    val client: MqttClient = new MqttClient(brokerURI, "mqtt-subscriber")
    client.setCallback(new SubscribeMqttCallback)

    val options: MqttConnectOptions = new MqttConnectOptions()
    options.setSocketFactory(socketFactory)

    try {
      client.connect(options)
      client.subscribe("test-topic")
      // Listening window: messages are handled by the callback in the meantime.
      Thread.sleep(60000)
    } finally {
      // Always release the connection and the client's resources, even when
      // connecting, subscribing or sleeping fails.
      try {
        if (client.isConnected) client.disconnect()
      } finally {
        client.close()
      }
    }
  }
}
SocketFactoryGenerator.scala
import java.io.{ByteArrayInputStream, InputStream, InputStreamReader}
import java.nio.file.{Files, Paths}
import java.security.cert.{CertificateFactory, X509Certificate}
import java.security.{KeyPair, KeyStore, Security}
import javax.net.ssl.{KeyManagerFactory, SSLContext, SSLSocketFactory, TrustManagerFactory}
import org.bouncycastle.cert.X509CertificateHolder
import org.bouncycastle.jce.provider.BouncyCastleProvider
import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter
import org.bouncycastle.openssl.{PEMKeyPair, PEMParser}
/** Factory for [[javax.net.ssl.SSLSocketFactory]] instances. */
object SocketFactoryGenerator {

  /**
   * Password for the private key entry of the in-memory key store.
   * The key store is built from scratch and never written out here; the value
   * only has to be identical for `setKeyEntry` and `KeyManagerFactory.init`.
   */
  private val KeyPassword: String = "DummyPassword"

  /**
   * Generate [[javax.net.ssl.SSLSocketFactory]] from pem file paths.
   *
   * The root CA certificate becomes the only trusted certificate; the
   * certificate and private key are presented to the server as the client
   * identity. The resulting context is created for "TLSv1.2".
   *
   * @param rootCaFilePath Root CA file path
   * @param certFilePath Certificate file path
   * @param keyFilePath Private key file path
   * @return Generated [[javax.net.ssl.SSLSocketFactory]]
   * @throws IllegalArgumentException if a file does not start with the expected kind of PEM object
   */
  def generateFromFilePath(rootCaFilePath:String, certFilePath:String, keyFilePath:String):SSLSocketFactory = {
    Security.addProvider(new BouncyCastleProvider())

    // load Root CA certificate, client certificate and private key (in this order)
    val rootCaCert: X509Certificate = loadCertificate(rootCaFilePath)
    val clientCert: X509Certificate = loadCertificate(certFilePath)
    val keyPair: KeyPair = loadKeyPair(keyFilePath)

    // Root CA certificate is used to authenticate server
    val trustStore: KeyStore = KeyStore.getInstance(KeyStore.getDefaultType)
    trustStore.load(null, null) // null stream and password: initialise an empty key store
    trustStore.setCertificateEntry("ca-certificate", rootCaCert)
    val tmf: TrustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
    tmf.init(trustStore)

    // client key and certificates are sent to server so it can authenticate us
    val ks: KeyStore = KeyStore.getInstance(KeyStore.getDefaultType)
    ks.load(null, null) // null stream and password: initialise an empty key store
    ks.setCertificateEntry("certificate", clientCert)
    ks.setKeyEntry("private-key", keyPair.getPrivate, KeyPassword.toCharArray, Array(clientCert))
    val kmf: KeyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
    kmf.init(ks, KeyPassword.toCharArray)

    // finally, create SSL socket factory
    val context: SSLContext = SSLContext.getInstance("TLSv1.2")
    context.init(kmf.getKeyManagers, tmf.getTrustManagers, null)
    context.getSocketFactory
  }

  /**
   * Convert a BouncyCastle certificate holder into a JCA [[java.security.cert.X509Certificate]]
   * by re-parsing its encoded form with the "X.509" certificate factory.
   *
   * @param certificateHolder Certificate holder to convert
   * @return Converted certificate
   */
  def convertToJavaCertificate(certificateHolder:X509CertificateHolder):X509Certificate = {
    val is: InputStream = new ByteArrayInputStream(certificateHolder.toASN1Structure.getEncoded)
    try {
      CertificateFactory.getInstance("X.509").generateCertificate(is).asInstanceOf[X509Certificate]
    } finally is.close()
  }

  /** Read the first PEM object of the file; the parser is closed even when reading fails. */
  private def readPemObject(filePath: String): AnyRef = {
    val parser: PEMParser =
      new PEMParser(new InputStreamReader(new ByteArrayInputStream(Files.readAllBytes(Paths.get(filePath)))))
    try parser.readObject() finally parser.close()
  }

  /** Load the first PEM object of the file as an X.509 certificate. */
  private def loadCertificate(filePath: String): X509Certificate =
    readPemObject(filePath) match {
      case holder: X509CertificateHolder => convertToJavaCertificate(holder)
      case other => throw new IllegalArgumentException(unexpectedContent(filePath, "an X.509 certificate", other))
    }

  /** Load the first PEM object of the file as a key pair. */
  private def loadKeyPair(filePath: String): KeyPair =
    readPemObject(filePath) match {
      case pemKeyPair: PEMKeyPair => new JcaPEMKeyConverter().getKeyPair(pemKeyPair)
      case other => throw new IllegalArgumentException(unexpectedContent(filePath, "a PEM key pair", other))
    }

  /** Build the error message for a PEM file whose first object is missing (null) or of the wrong type. */
  private def unexpectedContent(filePath: String, expected: String, actual: AnyRef): String = {
    val found: String = Option(actual).map(_.getClass.getName).getOrElse("no PEM object")
    s"Expected $expected in $filePath but found $found"
  }
}
SubscribeMqttCallback.scala
import org.eclipse.paho.client.mqttv3.{IMqttDeliveryToken, MqttCallback, MqttMessage}
/**
 * [[org.eclipse.paho.client.mqttv3.MqttCallback]] for the subscribe test.
 *
 * Prints every received message to standard output and reports a lost
 * connection on standard error instead of throwing from the callback.
 */
class SubscribeMqttCallback extends MqttCallback {

  /**
   * No-op: a completed delivery needs no handling in this callback.
   *
   * @param iMqttDeliveryToken Token of the completed delivery (ignored)
   */
  override def deliveryComplete(iMqttDeliveryToken: IMqttDeliveryToken): Unit = ()

  /**
   * Print the topic and payload of a received message.
   *
   * @param s Topic the message arrived on
   * @param mqttMessage Received message
   */
  override def messageArrived(s: String, mqttMessage: MqttMessage): Unit =
    println(s"Message received. : Topic=$s, Payload=${mqttMessage.toString}")

  /**
   * Report the lost connection together with its cause.
   *
   * @param throwable Cause of the connection loss
   */
  override def connectionLost(throwable: Throwable): Unit =
    System.err.println(s"Connection lost. : Cause=$throwable")
}
実際の動作確認
mosquitto_pubコマンドで実際にメッセージを投入してみたところ、下記のようにScala側でMqttMessageを受信できることを確認できました。
- メッセージPublish側
# mosquitto_pub --cafile rootCA.pem --cert cert.pem --key private.pem -h ******.iot.ap-northeast-1.amazonaws.com -p 8883 -q 1 -d -t test-topic -i mosquitto-publisher -m TestPublishMessage
Client mosquitto-publisher sending CONNECT
Client mosquitto-publisher received CONNACK
Client mosquitto-publisher sending PUBLISH (d0, q1, r0, m1, 'test-topic', ... (18 bytes))
Client mosquitto-publisher received PUBACK (Mid: 1)
Client mosquitto-publisher sending DISCONNECT
- メッセージSubscribe側
(起動コマンド)
Message received. : Topic=test-topic, Payload=TestPublishMessage
最後に
最終的には異常系の処理などのコードの整理や、ファイルの配置といった足回りの整備が必要になりますが、これでAWS IoTに対してJVM言語からPublishとSubscribeができることを確認できました。
AWS IoTの機能は強力ですが、全て使用する必要はない、という状況においてはこうやって中途半端に使用するのもありかもしれませんね。