前回、弊社がサービス提供しておりますKCCS APIデータ配信サービスのGoogle Cloud Pub/Subを利用した災害データPUSH配信機能を使って、slackへ連携する方法をご説明致しました。
PUSH配信で利用するサブスクリプションにおけるデータ連携はpull方式とpush方式があり、前回はpull方式で抽出しましたが、今回はpush方式によるslack連携についてご説明したいと思います。
まずは、サブスクリプションで配信する先のエンドポイントを作成します。
Google Cloud Functionsを使ったエンドポイントの作成
ソースは以下に公開しております。
今回はこちらのソースコードをダウンロードし、zipで圧縮、CloudStorageにアップロードします。
Cloud Functionsで関数の作成をします。
この関数のトリガーURLをサブスクリプションのエンドポイントに設定することで、
サブスクリプションからデータが配信されたときに、関数が実行されます。
ランタイムにGO 1.13を選択し、アップロードしたCloudStorageのパスを指定します。
ランタイム環境変数に下記を設定し、デプロイします。
SLACK_URL: webhookのURL
ICON_EMOJI: slack投稿時に使うアイコン 例) :warning:
USERNAME: slack投稿時のユーザ名
サブスクリプションの設定
Cloud Functionsの設定画面でトリガーURLを確認します。
確認したトリガーURLを、サブスクリプションの画面からエンドポイントに設定します。
これで、設定は完了です!
slackの投稿確認
データがサブスクリプションからpushされるとslackに投稿されます。
push方式なので、pull方式よりもタイムラグなく配信されます。
まとめ
push方式とpull方式の違いですが、
push方式の方がリアルタイムに配信可能ですが、pull方式は必要な時までにデータをためておき、取得したいタイミングで取得できるという利点があります。
また、個人的には、pull方式の方が、GASでも実装可能なので、扱いやすいという印象があります!
おまけ
コードの内容について、Mainのみ掲載します。
以下サイトを参考にMain関数を作成しています。
package endpoint
import (
"os"
"fmt"
"bytes"
"net/http"
"io/ioutil"
"encoding/json"
"encoding/base64"
"kccs.co.jp/endpoint/structs"
)
// リクエスト時この関数が実行される
func EndpointSample(w http.ResponseWriter, r *http.Request) {
// Pubsubから配信されるデータの構造体を定義する
var pr structs.PushRequest
// 受信したJSON文字列を構造体に変換する
// JSONの構成については右記参照: https://cloud.google.com/pubsub/docs/push#receiving_messages
err := json.NewDecoder(r.Body).Decode(&pr)
fmt.Printf("received_data: %+v\n", pr)
// 変換に失敗した場合はBadRequestを返す
if err != nil {
http.Error(w, fmt.Sprintf("Could not decode body: %v", err), http.StatusBadRequest)
return
}
// 構造体のデータ部分をBase64デコードする
json_str, err := base64.StdEncoding.DecodeString(pr.Message.Data)
// 変換に失敗した場合はBadRequestを返す
if err != nil {
http.Error(w, fmt.Sprintf("Could not decode body: %v", err), http.StatusBadRequest)
return
}
fmt.Println("json_str: "+string(json_str))
// data_type_codeを取得する
var rd_tmp map[string]interface{}
json.Unmarshal(json_str, &rd_tmp)
data_type_code := rd_tmp["data_type_code"].(string)
var result int
var message string
// data_type_codeごとに処理を分岐する
switch data_type_code {
// 気象警報
case "VXWW50":
// var rd structs.VXWW50ReceivedData
result = 200
message = "OK"
err = nil
// 震度データ
case "VXSE53":
var rd structs.VXSE53ReceivedData
json.Unmarshal(json_str, &rd)
notify_message := rd.MakeNotifyMessage()
result, message, err = notifySlack(notify_message)
}
// functionのレスポンスを返す
if err != nil {
w.WriteHeader(500)
fmt.Fprint(w, "Internal Server Error")
} else {
w.WriteHeader(result)
fmt.Fprint(w, message)
}
}
// slackに通知する
func notifySlack(message string) (int, string, error) {
// リクエストボディを作る
username := os.Getenv("USERNAME")
icon_emoji := os.Getenv("ICON_EMOJI")
message_json := map[string]string{"username": username, "icon_emoji": icon_emoji, "text": message}
marshaled_json, err := json.Marshal(message_json)
fmt.Println("marshaled_json: "+string(marshaled_json))
converted_io := bytes.NewReader(marshaled_json)
// リクエスト実行
url := os.Getenv("SLACK_URL")
req, err := http.NewRequest("POST", url, converted_io)
req.Header.Set("Content-Type", "application/json")
client := &http.Client{}
resp,err := client.Do(req)
defer resp.Body.Close()
body, _ := ioutil.ReadAll(resp.Body)
if err != nil {
return 0, "", err
} else {
return resp.StatusCode, string(body), err
}
}