0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

AWS IoTにJVM系言語(Scala)から接続するには(Subscribe)

Posted at

何を、どんな経緯でやってみたのか?

下記の前回の記事の続きで、今度はSubscribeをやってみよう、という話です。
あと、前回のコードは本来は指定不要なパスワードを指定していたため、そのあたりの整理も行っています。
その他の経緯は下記の記事を参照。

AWS IoTに対するSubscribeを行うコード

前回の時点で既に接続自体は可能になっているため、単純にPublishの処理をSubscribeに入れ替えただけです。

実際のコードは下記のページから確認できます。
https://github.com/kimutansk/aws-iot-example/

MqttSubscriber.scala
import org.eclipse.paho.client.mqttv3.{MqttClient, MqttConnectOptions}

/**
 * MQTT subscribe test program.
 *
 * Connects to the broker over TLS using the PEM files under `/etc/cert`, subscribes to
 * `test-topic`, lets [[SubscribeMqttCallback]] handle incoming messages for 60 seconds,
 * and then disconnects.
 */
object MqttSubscriber {

  /**
   * Program entry point.
   *
   * @param args command line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Connect target
    val brokerURI: String = "ssl://******.iot.ap-northeast-1.amazonaws.com:8883"

    // Socket factory built from the root CA, the client certificate and the client private key
    val socketFactory = SocketFactoryGenerator.generateFromFilePath("/etc/cert/rootCA.pem", "/etc/cert/cert.pem", "/etc/cert/private.pem")

    // MQTT client
    val client: MqttClient = new MqttClient(brokerURI, "mqtt-subscriber")
    try {
      // The callback must be registered before connecting so that no message is missed.
      client.setCallback(new SubscribeMqttCallback)
      val options: MqttConnectOptions = new MqttConnectOptions()
      options.setSocketFactory(socketFactory)
      client.connect(options)

      client.subscribe("test-topic")
      // Messages are delivered on the client's own threads; keep the main thread alive
      // for the duration of the test window.
      Thread.sleep(60000)
    } finally {
      // Always release the connection and the client's resources, even when connecting,
      // subscribing or sleeping failed; otherwise the client keeps running in the background.
      try {
        if (client.isConnected) client.disconnect()
      } finally {
        client.close()
      }
    }
  }
}
SocketFactoryGenerator.scala
import java.io.{ByteArrayInputStream, InputStream, InputStreamReader}
import java.nio.file.{Files, Paths}
import java.security.cert.{CertificateFactory, X509Certificate}
import java.security.{KeyPair, KeyStore, Security}
import javax.net.ssl.{KeyManagerFactory, SSLContext, SSLSocketFactory, TrustManagerFactory}

import org.bouncycastle.cert.X509CertificateHolder
import org.bouncycastle.jce.provider.BouncyCastleProvider
import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter
import org.bouncycastle.openssl.{PEMKeyPair, PEMParser}

/** Factory for [[javax.net.ssl.SSLSocketFactory]] instances. */
object SocketFactoryGenerator {
  import java.nio.charset.StandardCharsets

  /**
   * Password protecting the private key entry of the in-memory key store.
   * The key store is never persisted; the value only has to be identical when the entry
   * is stored and when the [[javax.net.ssl.KeyManagerFactory]] is initialised.
   */
  private val KeyEntryPassword: String = "DummyPassword"

  /**
   * Generate [[javax.net.ssl.SSLSocketFactory]] from pem file paths.
   *
   * The root CA certificate becomes the only trust anchor used to verify the server, while
   * the certificate and private key are presented to the server as the client identity.
   *
   * @param rootCaFilePath Root CA file path
   * @param certFilePath Certificate file path
   * @param keyFilePath Private key file path
   * @return Generated [[javax.net.ssl.SSLSocketFactory]]
   * @throws IllegalArgumentException if a file does not start with the expected kind of PEM object
   */
  def generateFromFilePath(rootCaFilePath:String, certFilePath:String, keyFilePath:String):SSLSocketFactory = {
    // Registering the provider again when it is already installed leaves the provider list unchanged.
    Security.addProvider(new BouncyCastleProvider())

    val rootCaCert: X509Certificate = loadCertificate(rootCaFilePath)
    val clientCert: X509Certificate = loadCertificate(certFilePath)
    val keyPair: KeyPair = loadKeyPair(keyFilePath)

    // Root CA certificate is used to authenticate server
    val trustStore: KeyStore = KeyStore.getInstance(KeyStore.getDefaultType())
    trustStore.load(null, null)
    trustStore.setCertificateEntry("ca-certificate", rootCaCert)
    val tmf: TrustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm())
    tmf.init(trustStore)

    // client key and certificates are sent to server so it can authenticate us
    val identityStore: KeyStore = KeyStore.getInstance(KeyStore.getDefaultType())
    identityStore.load(null, null)
    identityStore.setCertificateEntry("certificate", clientCert)
    identityStore.setKeyEntry("private-key", keyPair.getPrivate(), KeyEntryPassword.toCharArray, Array(clientCert))
    val kmf: KeyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm())
    kmf.init(identityStore, KeyEntryPassword.toCharArray)

    // finally, create SSL socket factory
    val context: SSLContext = SSLContext.getInstance("TLSv1.2")
    context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null)

    context.getSocketFactory()
  }

  /**
   * Convert a BouncyCastle certificate holder to a JCA [[java.security.cert.X509Certificate]].
   *
   * @param certificateHolder Certificate holder to convert
   * @return Certificate re-parsed from the holder's encoded form
   */
  def convertToJavaCertificate(certificateHolder:X509CertificateHolder):X509Certificate = {
    val is: InputStream = new ByteArrayInputStream(certificateHolder.toASN1Structure.getEncoded)
    try {
      CertificateFactory.getInstance("X.509").generateCertificate(is).asInstanceOf[X509Certificate]
    } finally is.close()
  }

  /** Read the first PEM object of the file, hand it to `handle`, and always close the parser. */
  private def withPemObject[A](filePath: String)(handle: AnyRef => A): A = {
    val pemBytes = Files.readAllBytes(Paths.get(filePath))
    // Decode with an explicit charset so the result does not depend on the platform default.
    val parser = new PEMParser(new InputStreamReader(new ByteArrayInputStream(pemBytes), StandardCharsets.UTF_8))
    try handle(parser.readObject()) finally parser.close()
  }

  /** Load the X.509 certificate stored as the first PEM object of the file. */
  private def loadCertificate(filePath: String): X509Certificate =
    withPemObject[X509Certificate](filePath) {
      case holder: X509CertificateHolder => convertToJavaCertificate(holder)
      case other => throw unexpectedPemObject(filePath, "an X.509 certificate", other)
    }

  /** Load the key pair stored as the first PEM object of the file. */
  private def loadKeyPair(filePath: String): KeyPair =
    withPemObject[KeyPair](filePath) {
      case pemKeyPair: PEMKeyPair => new JcaPEMKeyConverter().getKeyPair(pemKeyPair)
      case other => throw unexpectedPemObject(filePath, "a PEM key pair", other)
    }

  /** Build the error raised when a file holds no PEM object or one of an unexpected kind. */
  private def unexpectedPemObject(filePath: String, expected: String, actual: AnyRef): IllegalArgumentException = {
    // `actual` is null when the parser reached the end of the file without finding an object.
    val found = Option(actual).fold("no PEM object")(_.getClass.getName)
    new IllegalArgumentException(s"Expected $expected in $filePath but found $found")
  }
}
SubscribeMqttCallback.scala
import org.eclipse.paho.client.mqttv3.{IMqttDeliveryToken, MqttCallback, MqttMessage}

/**
 * [[org.eclipse.paho.client.mqttv3.MqttCallback]] for a subscribe-only client.
 *
 * Prints every received message to standard output and reports a lost connection on
 * standard error.
 */
class SubscribeMqttCallback extends MqttCallback {

  /**
   * Nothing to do on delivery completion.
   * Implemented as a no-op instead of `???` so that an invocation cannot throw
   * [[scala.NotImplementedError]] out of the callback.
   *
   * @param iMqttDeliveryToken token of the completed delivery (unused)
   */
  override def deliveryComplete(iMqttDeliveryToken: IMqttDeliveryToken): Unit = ()

  /**
   * Print the topic and payload of a received message.
   *
   * @param s topic the message arrived on
   * @param mqttMessage received message
   */
  override def messageArrived(s: String, mqttMessage: MqttMessage): Unit = {
    System.out.println("Message received. : Topic=" + s + ", Payload=" + mqttMessage.toString)
  }

  /**
   * Report the reason the connection was lost rather than failing inside the callback.
   *
   * @param throwable cause of the connection loss
   */
  override def connectionLost(throwable: Throwable): Unit = {
    System.err.println("Connection lost. : Cause=" + throwable)
  }
}

実際の動作確認

mosquitto_pubコマンドで実際にメッセージを投入してみると、下記のようにScala側でMqttMessageを受信できることを確認しました。

  • メッセージPublish側
# mosquitto_pub --cafile rootCA.pem --cert cert.pem --key private.pem -h ******.iot.ap-northeast-1.amazonaws.com -p 8883 -q 1 -d -t test-topic -i mosquitto-publisher -m TestPublishMessage
Client mosquitto-publisher sending CONNECT
Client mosquitto-publisher received CONNACK
Client mosquitto-publisher sending PUBLISH (d0, q1, r0, m1, 'test-topic', ... (18 bytes))
Client mosquitto-publisher received PUBACK (Mid: 1)
Client mosquitto-publisher sending DISCONNECT
  • メッセージSubscribe側
(起動コマンド)
Message received. : Topic=test-topic, Payload=TestPublishMessage

最後に

これで、最終的には異常系などのコードの整理やファイルの配置などの足回りは必要になりますが、AWS IoTに対してJVM言語からPublishとSubscribeが出来ることが確認できました。
AWS IoTの機能は強力ですが、全て使用する必要はない、という状況においてはこうやって中途半端に使用するのもありかもしれませんね。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?