6
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

AWS IoTにJVM系言語(Scala)から接続するには(Publish)

Last updated at Posted at 2015-10-27

何を、どんな経緯でやってみたのか?

AWS IoTにJVM系言語(Scala)から直接MQTTライブラリを用いて接続するにはどうすればいいかという確認。
下記の記事にあるように、現状AWS IoTに対して直接接続している事例として、mosquittoのCLIを使うもの、Pythonのpaho-mqttを使用するもの、Node.jsで接続するものの3種類が存在します。
ただ、JVM系言語から接続している事例は見つからなかったのでやってみようという話です。

尚、AWS IoT用のJava SDKには「AWS IoTに対してPublishする」というAPIのみ存在してます。

上記のSDKを用いた場合、AWS IoTに対してSubscribeすることは出来ません。
つまり、AWS的なスタンスとしては「AWS IoTには投入のみ行い、取得/活用はAWS IoTからフィルタリングした上で別サービスに転送して行ってね」ということなのだと推測されます。

ただ、それだと元々JVM系言語からMQTT Brokerに対してSubscribeして取得データを活用していた場合、いまいち移行しずらいです。というか元々それを前提に作られているコンポーネントではAWS IoTとは連携できないということになってしまいます。

と、ここまででわかったとは思いますが、つまりはAWS IoTを単なるMQTT BrokerのSaaSとして使うにはどうすればいいか、というわけなので、結構奇特なユースケースな気はします。
とはいえ、気になったので出来るかだけは確認しておこうという話です。

尚、AWS IoTは完全従量課金でフルマネージドなMQTT Brokerを使えるため、扱うメッセージがそれほど多くない環境においては単なるMQTT BrokerのSaaSとして扱っても非常に優秀だとは思います。流量多くなったら別サービスに流せばいいわけですし。

AWS IoTの設定

下記ページを参考に、「モノの作成」「モノの証明書を作成」「ポリシの作成と証明書との紐付け」「モノと証明書との紐付け」を行い、証明書ファイルを保存しておきます。

証明書ファイルは下記の名称で保存されているものとします。

  • cert.pem
  • private.pem
  • rootCa.pem

という形で、証明書はファイルとして提供されているという前提で記述します。

AWS IoTに対するPublishを行うコード

AWS IoTに対するPublishを行うコードは下記のようになりました。
尚、SocketFactoryGeneratorで使用している「password」という文字列は実質的に使用されないため、何でも問題ないです。
・・・ならそもそも置くなよ、というわけですが、それはSubscribeの時にでも訂正します^^;

実際のコードは下記のページから確認できます。
https://github.com/kimutansk/aws-iot-example/

MqttPublisher.scala
import org.eclipse.paho.client.mqttv3.{MqttMessage, MqttConnectOptions, MqttClient}

/** MQTT Publish Test Class */
object MqttPublisher {
  def main(args: Array[String]) {
    // Connect Target
    val brokerURI:String = "ssl://******.iot.ap-northeast-1.amazonaws.com:8883"

    // SocketFactoryGenerate
    val socketFactory = SocketFactoryGenerator.generateFromFilePath("/etc/cert/rootCA.pem", "/etc/cert/cert.pem", "/etc/cert/private.pem", "password")

    // MQTT Client generate
    val client:MqttClient = new MqttClient(brokerURI, "mqtt-publisher")
    client.setCallback(new PublishMqttCallback)
    val options:MqttConnectOptions = new MqttConnectOptions()
    options.setSocketFactory(socketFactory)
    client.connect(options)


    val message:MqttMessage = new MqttMessage("Test Message".getBytes("UTF-8"))
    client.publish("test-topic", message)
  }
}
SocketFactoryGenerator.scala
import java.io.{ByteArrayInputStream, InputStream, InputStreamReader}
import java.nio.file.{Files, Paths}
import java.security.cert.{CertificateFactory, X509Certificate}
import java.security.{KeyPair, KeyStore, Security}
import javax.net.ssl.{KeyManagerFactory, SSLContext, SSLSocketFactory, TrustManagerFactory}

import org.bouncycastle.cert.X509CertificateHolder
import org.bouncycastle.jce.provider.BouncyCastleProvider
import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter
import org.bouncycastle.openssl.{PEMKeyPair, PEMParser}

/** Factory for [[javax.net.ssl.SSLSocketFactory]] instances. */
object SocketFactoryGenerator {

  /**
   * Generate [[javax.net.ssl.SSLSocketFactory]] from pem file paths.
   *
   * @param rootCaFilePath Root CA file path
   * @param certFilePath Certificate file path
   * @param keyFilePath Private key file path
   * @return Generated [[javax.net.ssl.SSLSocketFactory]]
   */
  def generateFromFilePath(rootCaFilePath:String, certFilePath:String, keyFilePath:String, keyStorePassword:String):SSLSocketFactory = {
    Security.addProvider(new BouncyCastleProvider())

    // load Root CA certificate
    val rootCaParser:PEMParser  = new PEMParser(new InputStreamReader(new ByteArrayInputStream(Files.readAllBytes(Paths.get(rootCaFilePath)))))
    val rootCaCertHolder:X509CertificateHolder = rootCaParser.readObject().asInstanceOf[X509CertificateHolder]
    val rootCaCert:X509Certificate = convertToJavaCertificate(rootCaCertHolder)
    rootCaParser.close()

    // load Server certificate
    val certParser:PEMParser = new PEMParser(new InputStreamReader(new ByteArrayInputStream(Files.readAllBytes(Paths.get(certFilePath)))))
    val serverCertHolder:X509CertificateHolder = certParser.readObject.asInstanceOf[X509CertificateHolder]
    val serverCert:X509Certificate = convertToJavaCertificate(serverCertHolder)
    certParser.close()

    // load Private Key
    val keyParser:PEMParser = new PEMParser(new InputStreamReader(new ByteArrayInputStream(Files.readAllBytes(Paths.get(keyFilePath)))))
    val pemKeyPair:PEMKeyPair = keyParser.readObject.asInstanceOf[PEMKeyPair]
    val keyPair:KeyPair = new JcaPEMKeyConverter().getKeyPair(pemKeyPair)
    keyParser.close()

    // Root CA certificate is used to authenticate server
    val rootCAKeyStore:KeyStore = KeyStore.getInstance(KeyStore.getDefaultType())
    rootCAKeyStore.load(null, null)
    rootCAKeyStore.setCertificateEntry("ca-certificate", convertToJavaCertificate(rootCaCertHolder))
    val tmf:TrustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm())
    tmf.init(rootCAKeyStore);

    // client key and certificates are sent to server so it can authenticate us
    val ks:KeyStore  = KeyStore.getInstance(KeyStore.getDefaultType())
    ks.load(null, null)
    ks.setCertificateEntry("certificate", serverCert)
    ks.setKeyEntry("private-key", keyPair.getPrivate(), keyStorePassword.toCharArray, Array(serverCert))
    val kmf:KeyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm())
    kmf.init(ks, keyStorePassword.toCharArray());

    // finally, create SSL socket factory
    val context:SSLContext = SSLContext.getInstance("TLSv1.2")
    context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null)

    context.getSocketFactory()
  }


  def convertToJavaCertificate(certificateHolder:X509CertificateHolder):X509Certificate = {
     val is:InputStream = new ByteArrayInputStream(certificateHolder.toASN1Structure.getEncoded);
    try {
      CertificateFactory.getInstance("X.509").generateCertificate(is).asInstanceOf[X509Certificate]
    } finally is.close()
  }
}
PublishMqttCallback.scala
import org.eclipse.paho.client.mqttv3.{IMqttDeliveryToken, MqttMessage, MqttCallback}

/** Publish MqttCallBack */
class PublishMqttCallback extends MqttCallback{
  // Nop
  override def deliveryComplete(iMqttDeliveryToken: IMqttDeliveryToken): Unit = ???

  override def messageArrived(s: String, mqttMessage: MqttMessage): Unit = ???

  override def connectionLost(throwable: Throwable): Unit = ???
}

実際の動作確認

mosquitto_subコマンドで実際に投入されるかを確かめてみると、下記のようにScala側のコードから投入したメッセージがSubscribe可能であることを確認しました。

# mosquitto_sub --cafile rootCA.pem --cert cert.pem --key private.pem -h ******.iot.ap-northeast-1.amazonaws.com -p 8883 -q 1 -d -t test-topic -i mosquitto-subscriber
Client mosquitto-subscriber sending CONNECT
Client mosquitto-subscriber received CONNACK
Client mosquitto-subscriber sending SUBSCRIBE (Mid: 1, Topic: test-topic, QoS: 1)
Client mosquitto-subscriber received SUBACK
Subscribed (mid: 1): 1
Client mosquitto-subscriber received PUBLISH (d0, q1, r0, m1, 'test-topic', ... (12 bytes))
Client mosquitto-subscriber sending PUBACK (Mid: 1)
Test Message

最後に

というわけで、SSLSocketFactoryを差し替えることで、ScalaからAWS IoTに直接接続し、メッセージのPublishが行えることが確認できました。
おそらくSubscribeも同じ方式で可能だとは思いますが、次に確認してみます。

6
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?