8
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Cloud Functions で IoT CoreをPubSubしてみる

Posted at

はじめに

Cloud FunctionsはGoogle Cloud Platformのサーバレスアプリケーション。 AWS Lambda
Azure Functionsと同様の短時間実行型の関数機能を提供するサービスである。Cloud Functionsは現時点でベータ版である。

このFunctionsで IoT Core のトピックをサブスクライブしてみることにする。

やってみた

Google Cloud IoT Coreを使ってみる で作成したプロジェクト下で、IoT Coreレジストリのトピックをサブスクライブしてみる。

まず、GCPコンソールの左上ハンバーガーメニューから Functions をクリックしてFunctionsのコンソールに移動する。

スクリーンショット 2018-03-22 21.48.09.png

はじめて使う場合はAPIが無効になっているので有効化する。

スクリーンショット 2018-03-22 21.48.21.png

関数を作成 をクリックする。

スクリーンショット 2018-03-22 21.49.45.png

関数の作成画面。関数のソースコードを作成する方法はいくつかあるが、今回はインラインエディタを使ってソースコードを作成する。

スクリーンショット 2018-03-22 21.50.14.png

関数名を btf-test-iot を入力する
割り当てられるメモリ 256MB を選択する
トリガー Cloud Pub/Sub トピック を選択する
トピック btf-test-topic を選択 (IoT Coreで作成したトピック)
ソースコード インライン エディタ を選択する
実行する関数 subscribe を入力する

スクリーンショット 2018-03-22 22.47.28.png

トリガーで Cloud Pub/Sub トピック を選択すると、ソースコードがPub/Subトリガー専用のサンプルコードに切り替わる。ソースコードはトリガーに応じたサンプルコードが用意されているようだ。今回はこれをそのまま利用する。

index.js
/**
 * Triggered from a message on a Cloud Pub/Sub topic.
 *
 * @param {!Object} event The Cloud Functions event.
 * @param {!Function} The callback function.
 */
exports.subscribe = (event, callback) => {
  // The Cloud Pub/Sub Message object.
  const pubsubMessage = event.data;

  // We're just going to log the message to prove that
  // it worked.
  console.log(Buffer.from(pubsubMessage.data, 'base64').toString());

  // Don't forget to call the callback.
  callback();
};

すべて設定できたら 作成 をクリックする。

スクリーンショット 2018-03-22 21.54.35.png

しばらくすると、関数が作成される。

スクリーンショット 2018-03-22 22.52.58.png

IoT Coreにメッセージをパブリッシュするサンプルコードを作成する。 Google Cloud IoT Coreを使ってみる で使った公式のサンプルコードはちょっと冗長なのでこれを参考にカンタンなものをつくってみた。

[PROJECT ID] を実際のプロジェクトID
[REGION] をIoT Coreレジストリのリージョン
[REGISTRY ID] をIoT CoreレジストリID
[DEVICE ID] をデバイスID

にそれぞれ書き換える。

index.js
'use strict';

const fs = require('fs');
const jwt = require('jsonwebtoken');
const mqtt = require('mqtt');

const PUBLISH_INTERVAL_MILLISEC = 5000;

const MQTT_HOST = 'mqtt.googleapis.com';
const MQTT_PORT = 8883;

const PROJECT_ID = '[PROJECT ID]';
const CLOUD_REGION = '[REGION]';
const REGISTRY_ID = '[REGISTRY ID]';
const DEVICE_ID = '[DEVICE ID]';

const PRIVATE_KEY_FILE = './rsa_private.pem';
const ALGORITHM = 'RS256';
const MESSAGE_TYPE = 'events';

const mqttClientId = `projects/${PROJECT_ID}/locations/${CLOUD_REGION}/registries/${REGISTRY_ID}/devices/${DEVICE_ID}`;
const mqttTopic = `/devices/${DEVICE_ID}/${MESSAGE_TYPE}`;

// Create a Cloud IoT Core JWT for the given project id, signed with the given
// private key.
// [START iot_mqtt_jwt]
function createJwt (projectId, privateKeyFile, algorithm) {
  // Create a JWT to authenticate this device. The device will be disconnected
  // after the token expires, and will have to reconnect with a new token. The
  // audience field should always be set to the GCP project id.
  const token = {
    'iat': parseInt(Date.now() / 1000),
    'exp': parseInt(Date.now() / 1000) + 20 * 60, // 20 minutes
    'aud': projectId
  };
  const privateKey = fs.readFileSync(privateKeyFile);
  return jwt.sign(token, privateKey, { algorithm: algorithm });
}
// [END iot_mqtt_jwt]

// With Google Cloud IoT Core, the username field is ignored, however it must be
// non-empty. The password field is used to transmit a JWT to authorize the
// device. The "mqtts" protocol causes the library to connect using SSL, which
// is required for Cloud IoT Core.
let settings = {
  host: MQTT_HOST,
  port: MQTT_PORT,
  clientId: mqttClientId,
  username: 'unused',
  password: createJwt(PROJECT_ID, PRIVATE_KEY_FILE, ALGORITHM),
  protocol: 'mqtts',
  secureProtocol: 'TLSv1_2_method'
};

let client = mqtt.connect(settings);

let count = 1;

/* MAIN */
setInterval(function () {

  const payload = `${DEVICE_ID}-payload-${count}`;

  // Publish "payload" to the MQTT topic. qos=1 means at least once delivery.
  // Cloud IoT Core also supports qos=0 for at most once delivery.
  console.log('Publishing message:', payload);
  client.publish(mqttTopic, payload, { qos: 1 }, function (err) {
    if (err) {
      throw new Error(err);
    }
  });

  count++;

}, PUBLISH_INTERVAL_MILLISEC);

実行する。5秒に一度メッセージがパブリッシュされる。

node index.js

Publishing message: sensor1-payload-1
Publishing message: sensor1-payload-2
Publishing message: sensor1-payload-3
Publishing message: sensor1-payload-4
Publishing message: sensor1-payload-5

Functionsのコンソールから btf-test-iot Functionの右側オプションメニューから ログを表示 をクリックする。

スクリーンショット 2018-03-22 23.02.11.png

画面上部の再生ボタンをクリックしてログのストリーミングを有効にする。

スクリーンショット 2018-03-22 23.03.24.png

サブスクライブされたメッセージがログに表示されていく。うまくいったようだ。

スクリーンショット 2018-03-22 23.11.11.png

まとめ

インラインエディタが便利。ソースコードはZIPでアップロードしたりCloud Storageに置いたりできるけどインラインエディタのサンプルをいじったほうが楽かもと思った。ローカルで開発する場合は cloud-functions-emulator でデバッグするのが便利そう。

GCPでサーバレスするには Cloud Pub/Sub をうまく利用するのがキモになる気がする。FunctionsのPub/Subトリガーを活用して柔軟にサーバレスできるようになりたいもんである。

8
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?