やりたいこと
前回の記事↓でpythonからMQTTでDr.Sum Data Funnelにデータを送りました。
今回はFunnel側でデータを受け取って、Dr.Sumというデータベースに格納してみます。
Dr.Sumとは
高速集計が得意なデータベースです。
Oracle DBやPostgres、SQL Serverなどが同じ土俵になるかと思います。
「Enterprise Manager」というクライアントツールで操作します。
Dr.Sum Data Funnelとは
継続的に発生する時系列データをリアルタイムに収集、加工するためのツールです。
「Dr.Sum Data Funnel Client」というクライアントツールで操作します。
データを受け取る
・Dr.Sum Data Funnel Clientの画面からデータをパイプライン定義を作っていきます。右クリック→新規作成
・パイプライン定義の名前を決めます。今回は「test」にします。subscribeするデータの型を取得するため、データ取得ボタンをクリック。
・ここでコンソールを開き、前記事で紹介したpublishプログラムを実行します。プログラムでは、”test”というMQTTトピックでpublishしています。
・Dr.Sum Data Funnel Clientの画面に戻り、MQTTトピックに「test」と入力し「データ取得」ボタンをクリックしてsubscribeを開始します。正常にpublishされていれば、画像のようにjson形式のデータを取得できます。後に取得したjsonデータからテーブルを作るため、json部分をコピーしておきます。コピーしたら、データ取得の画面は閉じてOKです。
・定義作成画面に戻ったら、次のように操作していきます。
トリガー間隔を「1秒」に設定
→「テーブル化の設定」ボタンクリック→「データを入力して推定」ボタンクリック
→"パブリッシュされる1回分の推定用データ"の欄に、先程コピーしたjsonデータを貼り付け→中央の「推定」ボタンクリック→右下「OK」ボタンクリック
→右下「OK」ボタンクリック
・次に、時間の項目の処理について定義していきます。これを行わないとDr.Sumへの出力ができません。次のように操作します。
左メニュー「時刻関連項目の設定」→上部のプルダウンから「ARRIVEDTIME」を選択
→右の項目名に「ROUNDTIME」と入力(ここは任意でOK)→右下「追加」をクリック
ここまでで、publishされたデータを受け取る(subscribe)部分は終了です。
「時間関連項目の設定」からはサンプリングされた時刻データを丸めることが出来るので、ミリ秒単位を秒単位に直したりと、データ量を少なくしたいときに便利です。
Dr.Sumにデータを溜める
・まず、データの格納先となるテーブルを作成する必要があるので、Enterprise Managerからテーブルを作成していきます。
今回は「SensorDB」という名前でDBを作成してみました。作り方等はこちらがわかりやすいと思います。
DBを作成したら、画面上部のアイコン群から「SQL Executor」を起動します。
・SQL Executorの画面が開きます。左上の「接続先データベース」が「SensorDB」になっていることを確認します。
・Funnel Clientの画面を開き、左のメニューから「Dr.Sumに出力」クリック→「テーブルの作成SQLをクリップボードにコピー」をクリック
・Enterprise Managerの画面に戻り、画像のエリアにコピーしたテキストを貼り付けます。テーブルを作成するSQLがコピーされています。貼り付けたら左上の右矢印アイコンをクリックして、SQL文を実行します。実行すると、右下のエリアにログが出てきます。正常に実行されているようであれば、SQL Executorの画面を閉じます。
・Enterprise Managerの画面に戻ります。念の為、一度左上の更新ボタンを押し、SensorDBの中のテーブルに「test」ができていることを確認します。
・「test」のテーブルを右クリック→「開く」で、データの項目が揃っていることを確認します。
・Funnel Clientに戻り、Dr.Sumへの出力設定を行います。
「出力する」にチェック→「出力先サーバーの選択」でDr.Sumのアクセス情報でログイン
→データベース名を「SensorDB」に→右のテーブル名から「test」を選択→「パイプライン定義を保存」
ここまでで、設定はばっちりです!
では、早速pythonからデータを送って、Funnelで受け取り、Dr.Sumへ格納してみましょう!
実行
Funnel Clientで先程保存したパイプライン定義を起動します。