5
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Kafka Streamを使ってLineに通知する

Last updated at Posted at 2022-04-16

はじめに

ESP32ではじめようIoT開発ハンズオンでkafkaを利用するので、少し発展させてLineに通知する所を組んでみる。

image.png

ここでいうところのkafka stream -> Firebase -> Line

開発環境

  • windows 10
  • Software
    • node.js: 16.13.1(12.0以降であれば問題ないハズ)

結果

drawing kafkaからの通知に成功

Line Messaging API側

やることは

  • Firebase Functions & Firestoreアプリを作成する
  • Line Developers ConsoleからMessaging APIのchannelを作る
  • Firebase FunctionsとLine Messaging APIを連動させる
  • 自身の端末から認証する

Firebase Functions & Firestoreアプリを作成する

Firebase consoleから新規プロジェクトを作成する

  • プロジェクト名: esp32-iot-notificator

プロジェクトが出来たらFirestore Databaseを作成する

image.png

Databaseが作成出来たらコレクションの要素を作成しておく。コレクション名はline-notificate-users

image.png

続いてfunctionsの設定。利用するには従量制への変更が必要なので勇気を出して切り替える。

image.png

早速プロジェクトをローカルで作成してゆく

firebase login --no-localhost --interactive
firebase init

下記にチェックを付ける

  • firestore
  • functions

ひとまずfunctions/src/index.tsを修正して

import * as functions from "firebase-functions";

export const callback = functions.https.onRequest((request, response) => {
  functions.logger.info("response", request.body);
  response.status(200).send("callback");
});

デプロイ

firebase deploy

firebaseの画面でcallbackが登録されていればOK。

image.png

Line Developers ConsoleからMessaging APIのchannelを作る

まずは、LINEのMessaging APIを用いて能動的通知をする方法で紹介されている通りにチャンネルを作る。

image.png
あふれ出る名前の適当さ加減

Webhook URLに先ほどfunctionに登録されたcallbackのURLを設定する。ここで設定したコールバックはLineユーザーがアプリを登録しりchannelにメッセージを送った際などに呼ばれる。

image.png

Firebase FunctionsとLine Messaging APIを連動させる

まずはやる事を整理する

image.png

なので、下記タイミングで発動するfunctionsを組む必要がある

  • ユーザーがchannelを友達承認した
  • メッセージを能動的に打つ

ユーザーがchanngelを友達承認したorメッセージを送った

今回はメッセージを受け取ったら自動的に登録するようにする。因みにメッセージを受け取った際にcallbackが受け取るリクエストは下記の様になっている(2022/04/11現在)

{
    "events":[
        {
            "message":{
                "id":"15899930248440",
                "type":"text",
                "text":"おい"
            },
            "source":{
                "type":"user",
                "userId":"<lineユーザーのID>"
            },
            "type":"message",
            "replyToken":"<返信などで必要となるトークン>",
            "timestamp":1649672154163,
            "mode":"active"
        }
    ],
    "destination":"<channelのユーザーID>"
}

ここで取得しなければならないのはevents[].source.userId。因みにこのuserIdはLineのプロフィールに表示されるIDではない。メッセージの送受信で必要となる別のID。

callbackで受け取ってFirestoreに格納する

import * as functions from "firebase-functions";
import * as admin from "firebase-admin";

admin.initializeApp();
const db = admin.firestore();
db.settings({
  ignoreUndefinedProperties: true,
});

export const callback = functions.https.onRequest((request, response) => {
  const userId = request.body.events[0].source.userId;
  db.collection("line-notificate-users").doc(userId).set({})
      .then(() => {
        response.status(200).send("Complete");
      })
      .catch((error) => {
        response.status(500).send(error);
      });
});

メッセージを能動的に打つ

メッセージを打つ部分はkafkaから呼ばれる想定。firestoreにため込まれたユーザに対してメッセージを打つ。

まずは必要なパッケージをpackage.jsonのdependenciesに追加する。

    "axios": "^0.26.1"

軽くインストール

npm install

index.tsにfunctionを追加

import * as functions from "firebase-functions";
import * as admin from "firebase-admin";
import axios, {AxiosInstance} from "axios";

admin.initializeApp();
const db = admin.firestore();
db.settings({
  ignoreUndefinedProperties: true,
});

const lineToken = "<line messaging apiのトークン>";
const lineEndpoint = "https://api.line.me/v2/bot/message/push";

export const notificate = functions.https.onRequest((request, response) => {
  functions.logger.info("request.body.message", request.body.message);
  const message = request.body.message;
  db.collection("line-notificate-users").get()
      .then((snapshot) => {
        snapshot.forEach((doc) => {
          const userId = doc.id;
          const data = {
            to: userId,
            messages: [
              {
                type: "text",
                text: message,
              },
            ],
          };
          const apiClient: AxiosInstance = axios.create({
            headers: {
              "Content-type": "application/json",
              "Authorization": "Bearer " + lineToken,
            },
          });
          return apiClient.post(lineEndpoint, JSON.stringify(data))
              .then((data) => {
                console.log(data);
                functions.logger.info("send succeed");
                response.status(200).send("complete");
              })
              .catch((error) => {
                functions.logger.error("send failure", error);
                response.status(500).send(error);
              });
        });
      })
      .catch((error) => {
        functions.logger.error("snapshot failure", error);
        response.status(500).send(error);
      });
});

自身の端末から認証する

Line Developer Consoleのchannel -> Messagee APIにあるQRコードを自身のLineから読み込ませて友達になる。

image.png

スタンプなりなんなりを送るとデフォルトのメッセージが返ってくる。

drawing

下記コマンドでメッセージを送ってみる

curl -v -X POST https://us-central1-esp32-iot-notificator-42297.cloudfunctions.net/notificate \
-H 'Content-Type: application/json' \
-d '{ "message": "hello bun bun youtu" }'

Lineに通知が来ることを確認する

drawing

Kafka Stream側アプリを組む

イベントでは照度と水分を計測するので、今回はStreamingで照度が急激に上がった際を検出して通知を出す。

  • Brokerクラスタの構築は割愛

さっそくTopologyを定義する。流れとしては

  • handson0429-summaryのトピックを読み取り
  • jsonパース
  • key単位に異常を検知する(DeviceMetricAbnormalDetector)
  • 異常検知したらfirebaseを叩く
class AbnormalDetectTopology {
  private static final Logger log = LoggerFactory.getLogger(AbnormalDetectTopology.class);

  public static Topology build() {
    StreamsBuilder builder = new StreamsBuilder();
    Consumed<String, String> consumerOption = Consumed.with(Serdes.String(), Serdes.String());
    KStream<String, String> events = builder.stream("handson0429-summary", consumerOption);

    events
        .filter((key, value) -> value != null)
        .filter((key, value) -> !value.equals(""))
        .mapValues(value -> DeviceMetric.parse(value))
        .filter((key, value) -> !value.getDeviceId().equals(""))
        .map((key, value) -> KeyValue.pair(value.getDeviceId(), value))
        .groupByKey(Grouped.with(Serdes.String(), JsonSerdes.DeviceMetric()))
        .aggregate(
            () -> new DeviceMetricAbnormalDetector(),
            (key, value, detector) -> {
              detector.detect(value.getMoist(), value.getLight());
              return detector;
            },
            Materialized.as("metric-abnormal-detector")
                .with(Serdes.String(), JsonSerdes.DeviceMetricAbnormalDetector()))
        .filter((key, value) -> value.abnormal)
        .toStream()
        .foreach(
            (key, value) -> {
              try {
                var client = HttpClient.newHttpClient();
                var json = "{ \"message\" : \"" + key + "が眩しいってさ\" }";
                var req =
                    HttpRequest.newBuilder()
                        .uri( URI.create( System.getenv("NOTIFICATE_ENDPOINT")))
                        .header("Content-Type", "application/json")
                        .POST(BodyPublishers.ofString(json))
                        .build();
                var res = client.send(req, HttpResponse.BodyHandlers.ofString());
                System.out.println(res.body());
              } catch (Exception e) {
                System.out.println(e.toString());
              }
            });

    return builder.build();
  }
}

DeviceMetricAbnormalDetectorはこんな感じ

public class DeviceMetricAbnormalDetector {
  public Long moist = 0L;
  public Long light = 0L;
  public Boolean abnormal = false;

  public DeviceMetricAbnormalDetector() {}

  public void detect(Long newMoist, Long newLight) {
    if ((newLight - light) > 1000L) {
      abnormal = true;
    } else {
      abnormal = false;
    }
    light = newLight;
    moist = newMoist;
  }

  @Override
  public String toString() {
    return "{"
        + "moist='"
        + moist
        + "'"
        + ", light='"
        + light
        + "'"
        + ", abnormal='"
        + abnormal
        + "'"
        + "}";
  }
}

AppからTopologyを指定する


class App {
  public static void main(String[] args) {
    Topology topology = AbnormalDetectTopology.build();

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "device-metric-summarizer");
    props.put(
        StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
        "kafka-broker-001:9092,kafka-broker-002:9092,kafka-broker-003:9092");
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    props.put(StreamsConfig.STATE_DIR_CONFIG, "state-store");
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
    props.put(
        StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);

    System.out.println(topology.describe());
    KafkaStreams streams = new KafkaStreams(topology, props);
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    streams.cleanUp();
    streams.start();
  }
}

残りはイベントまでにアップします。

ダミーデータを流して通知が来ることを確認する

スクリプトを組んで叩いてみる

#!/bin/bash

message='{"deviceId":"1001","moist":0 ,"light":0}'
./bin/kafka-console-producer.sh \
    --topic handson0429-summary \
    --property key.separator=, \
    --property parse.key=true \
    --bootstrap-server localhost:9092 << EOF
1001,${message}
EOF


message='{"deviceId":"1001","moist":1001,"light":1001}'
./bin/kafka-console-producer.sh \
    --topic handson0429-summary \
    --property key.separator=, \
    --property parse.key=true \
    --bootstrap-server localhost:9092 << EOF
1001,${message}
EOF

結果

drawing

ふりかえり

ハンズオンではGrafanaつかった可視化までやるけど、時間が余れば通知まで実施する。
通知まで組み込むとIoT感がぐっと上がるってことが分かった。受動最高。

drawing

調子に乗って飛ばし過ぎた

5
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?