Edited at

TreasureData x MQTT translator

More than 3 years have passed since last update.

こんにちは、ちょびえです。私の専門はPHP CoreとかWeb Serviceを勢いでつくったり、とかなんですが最近だと広く浅くをモットーにGREEのインフラ周りのお仕事に携わっています。

はてさて、Treasure Data AdventCalendarを勢いで始めてみましたが、当初のアイデアだとGREEの現場で実際につかってる例を紹介しようかなー、と思っていたのですがそれを空き枠の16記事ぐらいでやると社内での確認とるの大変! とか全記事書くのにフルスイングしないと無理! となってしまいそうです。

そんなこんなで、僕が担当する記事では、こういったアイデアどうなかな?とか実際に現場でカジュアルに使っていたしている事柄について書いていこうかと思いますのでクリスマスまでお付き合いいただければと思います。

一発目の今日はアイデア系から言ってみたいと思います


Webhookを受けとる上で企業内でよくでくわす問題

Github等をはじめとした最近のWeb ServiceはAPIでの連携が簡単になっています。しかしながら、企業でwebhook等利用しようとすると前段にFirewallがいる関係で気軽にwebhookの設定が開発マシンに投げられない、一度設定したとしても後になって社内の他のグループが使いたいので新たなルールを…と結構面倒くさい問題が出てしまいます。

Treasure DataのAPIでも様々な出力先が選べるようになっていますが、社内の特定マシンに対して結果を受け取りたい、といった場合に上記のような権限の問題にひっかかってしまうこともあります。

Screenshot 2014-11-30 18.47.42.png

一つだけならそんな手間でもないですし、問題ないのですが、これが複数になってくるとルールの管理や今後どうしていこうか、といった話になってきてしまいます。

整理するとこんな感じです。


  • 管理者)webhookを受けるにあたって都度firewallやproxyなどにルールなどを追加しなければいけない。特定のマシンへのrouting追加した場合は定期的に利用有無を確認しないといけない

  • 利用者)気軽に使いたいけどいちいち申請とか面倒くさい。

うーん。これはどうにかアイデアを考えないと面倒なままですね。


MQTT - IoT時代のメッセージブローカー -

そんな問題を抱えつつ、どうしていこうかという時にHexaさんのLAN 内に GitHub WebHooks の HTTP リクエストを届けるを読んで、あ、この解法色んな所に使える!アイデア拝借しよう、という事でTDの結果セットをMQTTに書き出すアプローチを取ってみたいと思います。

こうすれば依頼毎にroutingを追加したり、使われなくなった事を定期的に確認しなくてすみます。後から利用したいと言ってくる人への対応等も楽そうです。

因みにMQTTはIBMが作っているプロトコルで、現在ではOASISという団体で標準化の策定をしており現在のversionは3.1.1となっています。Internet Of Things……なんかカッコいい響きですがざっくり言うと現実とインターネットをもっと連携させていこう、みたいな取り組みです。例えば気温等のセンサ情報をインターネットに流して機械学習やリモートコントロール等、夢が広がりングな分野です。

◇ ◇ ◇ ◇

ところが現状のTreasureDataではMQTTへのexportは対応しておりません。

なければ作ればいいだけの話ですし、再現できるように作っておけばトレジャーデータ社に検討してもらうのも楽でしょうからJob Result Output to Web Server (REST)機能を使って

テータを受け取りつつ、MQTTに結果をProxyする機能をGoで作ってみましょう。


構想〜必要なものセットアップ

Treasure Dataのexport先にMQTTが使えるようになるとこんな事が簡単にできるようになります。


  • クエリ実行からの自動処理が容易に組める


    • Production環境でも同様に組んでおけばらくちん



  • 疎結合を強いれるので作った仕組みを他部署へ展開することが容易になる


    • ザッツMicro Serviceですね



  • 管理者はMQTTサーバーの管理だけしてればいいので楽

こんな風に利用者側が自分達で出来るように構築しておくと、説明の際にサンプルコードはここだ、後頑張って!なんかあったら聞いてね!で済むので楽ができそうです。

Screenshot 2014-11-30 18.54.05.png

早速実装に入りたいのですが、その前に必要なツール等の準備をしていきます。

◇ ◇ ◇ ◇

MQTTサーバーを自前で立ててもいいのですが、時雨堂のSangoが便利なので今日はSangoを使います。

Sangoへの登録やmqttcliのインストールはshirouさんの記事をを参考にしてください。

◇ ◇ ◇ ◇

続いて、最近mopemopeさんのツイートで知ったngrokというツールがとても便利でして、これ何かというと*.ngrok.comで公開されている80ポートにランダムなサブドメインを割り振ってくれて、ローカルのポートをそのサブドメインにport forwardingしてくれるツール&サービスです。

Screenshot 2014-11-30 18.55.35.png

https://ngrok.com/

エンジニア向けなサービスはGithub一発でサインアップ終わるのでホントサイコーですね。

登録したらngrokのダウンロードをして設定を済ませます。

https://ngrok.com/download

今回は18883ポートにアクセスしたいのでこのようにコマンドを発行して外部からアクセス出来るようにします。

ngrok -authenticate <設定画面のAPIKEY> 18883

ngrok (Ctrl+C to quit)

Tunnel Status online
Version 1.7/1.6
Forwarding http://78b2e0b3.ngrok.com -> 127.0.0.1:18883
Forwarding https://78b2e0b3.ngrok.com -> 127.0.0.1:18883
Web Interface 127.0.0.1:4040
# Conn 0
Avg Conn Time 0.00ms

これで78b2e0b3.ngrok.comでlocalの18883ポートにアクセスできるようになりました。

あ、ちなみにngrok自体はOSSなので自前で建てることもできます。

https://github.com/inconshreveable/ngrok

ngrokは会社内でつかうとビミョーって言われる所もあると思うので、もし会社で使う場合は事前に確認しておきましょう


TD -> MQTT Translatorの実装

それではTD REST Outputの結果を受け取ってMQTTに流す部分をGoで実装していきます。

package main

// 動かし方
// go get github.com/chobie/momonga/client
// go run main.go
import (
"fmt"
"net"
"net/http"
"io/ioutil"
"github.com/chobie/momonga/client"
)

type MqttTranslator struct{
Client *client.Client
}

func (m *MqttTranslator) ServeHTTP(w http.ResponseWriter, r *http.Request) {
switch r.URL.String() {
case "/", "/favicon.ico":
fmt.Fprint(w, "{\"result\": \"fault\"}")
default:
data, err := ioutil.ReadAll(r.Body)
if err != nil {
fmt.Fprint(w, "{\"result\": \"fault\"}")
panic(err)
}

// slash始まりは許可されていないので取り除く
m.Client.Publish(r.URL.String()[1:], data, 0)
fmt.Fprint(w, "{\"result\": \"ok\"}")
}
}

func main() {
mqtt_address := "lite.mqtt.shiguredo.jp"
mqtt_port := 1883

// せっかく自分でmomonga MQTT Clientっつーのを書いてるんでつかいます
opt := client.Option{
TransporterCallback: func() (net.Conn, error) {
return net.Dial("tcp", fmt.Sprintf("%s:%d", mqtt_address, mqtt_port))
},
Keepalive: 0,
Magic: []byte("MQTT"),
Version: 4,
Identifier: "td-mqtt-translator", // MQTTのidentifierはユニークでなければいけません。通常はランダムな文字列を付与したりします
UserName: "chobie@github",
Password: "*************",
}
c := client.NewClient(opt)
c.Connect()
t := &MqttTranslator{
Client: c,
}

go http.ListenAndServe("localhost:18883", t)
select{}
}

それではテストしてみましょう。


実際に試してみる

改めて構成図の確認です。TD -> REST OUTPUT -> MQTT Translator -> MQTT (Sango) <- Monitorという図式になります。

Screenshot 2014-11-30 19.05.33.png

実際にTDでクエリを投げてみます。Job Result Output to Web Server (REST)

機能を利用するので下記のように--result引数でRESTに書き出す指定を行いコマンドを実行します。

chobie% td query \

--result 'webs://78b2e0b3.ngrok.com/chobie@github/hoge' \
-w -d testdb \
"SELECT v['code'] AS code, COUNT(1) AS cnt FROM www_access GROUP BY v['code']"

続いてmqttcliで接続をします。これでクエリの結果がうまくSangoに通知されてmqttcliで結果を受け取れれば成功です。

chobie% MQTT_HOST="lite.mqtt.shiguredo.jp" MQTT_USERNAME="chobie@github" MQTT_PASSWORD="**********" mqttcli sub -t "chobie@github/#"

{"column_names":["code","cnt"],"column_types":["string","long"],"data":[["200",4981],["404",17],["500",2]]}

やりましたね!これでTDからの結果をMQTTに通知出来るようになりました。

と、いうことで。Web Service等からMQTTにProxyしてあげることで企業内でのWeb APIの受け取りも柔軟に、かつ申請不要で自分達で出来るようになりそうです。


まとめ

TD -> MQTT translatorを使うことで受け口を一本化しつつ柔軟な機能を提供出来るようになりました。


  • 管理者)今後webhook受け取る場合のルールの追加は不要(MQTTサーバー、TranslatorだけメンテしてればOK)。新しいサービスが来たとしてもそのままぽこぽこMQTTに投げればいいだけだし楽

  • 利用者)マニュアルや他の人のコードを読んで自分達の好きに作れる。いちいち管理者に確認しなくていいので楽

実際問題MQTTのPayloadは250Mbytesまでの制限がありますし、Sangoの無料枠では2014/12月現在6Kの制限があるので、Job Result Output to Web Server (REST)

は結果セットだけでなくIDとかメタ情報を通知してもらえると嬉しいですねー。

1日目はアイデア系ですがいかがでしたでしょうか?こういったパターンでMQTTを使うのは意外と使い勝手いいんじゃないかなー、と思っています。