背景と目的
SQLの学習のためネットワーク経由でデータをデータベースに追加する方法を検討しました。
作成したもの
データベース(MariaDB)をRaspberryPi上に作成し、iPhoneのMESHアプリから日時情報と温度情報を送信して、RaspberryPiで起動するローカルWebサーバ(Flask)で受信してデータベースに追加処理します。
RaspberryPiであれば温度センサーを使うことも考えましたが、ネットワーク経由でデータ処理を確認したかったので手元にあるもので作ることにしました。
また、温度情報はMESHの温度・湿度(Temperature & Humidity)タグを使用して、jQueryのajaxメソッドを使用してカスタムタグから受信した日時情報と温度情報をPOST送信します。
データベースへの追加処理はPyMySQLライブラリを使用します。
動作検証1:ボタンタグでテストデータを送信するカスタムタグ
はじめに動作検証としてボタンタグをトリガーにしてカスタムタグからデータ送信する方法を確認しました。カスタムタグはMESH SDKを使用して作成します。
テストデータはjson形式で作成してPOST送信します。
カスタムタグではTagが持つ各Function(表示名,コネクタ,設定パラメタ,ソースコード)の動作を定義するために、Functionが持つ4つのメソッド {Initialize, Receive, Execute, Result} を実装しますが、今回はFunction内のメソッド間で変数参照はないため、Executeメソッドのみ実装します。
// MESH_SDK
// アクセス先(ポート番号はデフォルト値)
var apiURL = "http://[サーバーのipアドレス]:5000/send_data";
// テストデータ
// ボタンタグをトリガーに送信させる
var send_data = {
"type": "testType",
"testTypeValue" : "testTypeValue1"
};
// HTTPリクエスト(POST)送信
// HTTPレスポンスはローカルWebサーバー側(Flask)に返る
// log("HTTP Request / POST());
ajax ({
url : apiURL,
data : JSON.stringify(send_data),
type : "post",
contentType : "application/json",
dataType : "json",
timeout : 5000,
success : function (contents) {
log("Done");
// 処理を継続
callbackSuccess({
resultType : "continue"
});
},
error : function (request, errorMessage) {
log("Error");
// 処理を継続
callbackSuccess({
resultType : "continue"
});
}
});
// 非同期のレスポンスを待つため、処理を停止する
// log("Stop processing and wait for response");
return {
resultType : "pause"
};
ローカルWebサーバー(Flask)側
ソースコードはこちらの記事を参考にさせていただきました。
IPアドレスの指定はFlask公式のQuickstartを参考にしました。
from flask import Flask
from flask import request
app = Flask(__name__)
@app.route('/send_data', methods=['POST'])
def send_data():
if request.method == 'POST':
data = request.json
print(data)
return data
if __name__ == "__main__":
# ローカルネットワーク内の接続を有効にしてサーバーを起動する
app.run(host='0.0.0.0')
実行結果
カスタムタグで作成したデータをサーバー側(RaspberryPi)で受信できることを確認しました。
$ python app.py
WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
* Serving Flask app 'app'
* Debug mode: off
* Running on all addresses (0.0.0.0)
WARNING: This is a development server. Do not use it in a production deployment.
* Running on http://127.0.0.1:5000
* Running on http://[サーバーのipアドレス]:5000 (Press CTRL+C to quit)
{'type': 'testType', 'testTypeValue': 'testTypeValue1'}
[iPhoneのipアドレス] - - [タイムスタンプ] "POST /test HTTP/1.1" 200 -
動作検証2:日時情報と温度情報の作成(MESH SDK)してMariaDBにデータ追加(INCERT)する
動作検証1でカスタムタグのHTTPリクエスト(POST)動作を確認できたため、続けて日時情報と温度情報の作成してMariaDBにデータ追加(INCERT)します。
日時情報と温度情報の送信(MESH SDK)
日時情報はMESHアプリでは取得できないためDateオブジェクトで作成して、
温度情報はSDKでの取得方法記載の定義値をそれぞれ送信用データとして作成します。
// MESH_SDK
// アクセス先(ポート番号はデフォルト値)
var apiURL = "http://[サーバーのipアドレス]:5000/send_data";
// 日付オブジェクトを作成
var date = new Date();
// ハイフン区切りの文字列へ変換
function getDateString(date) {
return date.getFullYear() + '-'
+ ('0' + (date.getMonth() + 1)).slice(-2) + '-'
+ ('0' + date.getDate()).slice(-2) + (' ')
+ ('0' + date.getHours()).slice(-2) + (':')
+ ('0' + date.getMinutes()).slice(-2) + (':')
+ ('0' + date.getSeconds()).slice(-2)
};
// 変換済み日時データ(string型)
var dtNow = getDateString(date);
// 温度・湿度(Temperature & Humidity)タグ情報
var send_data = {
"dateValue": dtNow,
"temperatureValue" : messageValues.temperatureValue
};
// HTTPリクエスト(POST)送信
// HTTPレスポンスはローカルWebサーバー側(Flask)に返る
// log("HTTP Request / POST());
ajax ({
url : apiURL,
data : JSON.stringify(send_data),
type : "post",
contentType : "application/json",
dataType : "json",
timeout : 5000,
success : function (contents) {
log("Success");
// 処理を継続
callbackSuccess({
resultType : "continue"
});
},
error : function(request, errorMessage) {
log("Error: Please check recipe or server satus");
// 処理を継続
callbackSuccess( {
resultType : "continue"
});
}
});
// 非同期のレスポンスを待つため、処理を停止する
// log("Stop processing and wait for response");
return {
resultType : "pause"
};
作成したレシピ
レシピではロジック機能のタイマーを使用して2分間隔で温度情報を送信し、データの確認のためモバイル機能のファイルを使用して日時情報と温度情報をファイルに出力しました。
また、データ送信を常に有効にするためバックグラウンドで実行する設定にしました。
データベースの仕様について
カラムはid,date,temperatureとして追加しました。
MariaDB [(none)]> CREATE DATABASE mesh_temp_db;
Query OK, 1 row affected (0.001 sec)
MariaDB [(none)]> show databases;
+--------------------+
| Database |
+--------------------+
| information_schema |
| mesh_temp_db |
| myTestDB1 |
| mysql |
| performance_schema |
| usage_status_table |
+--------------------+
6 rows in set (0.001 sec)
MariaDB [(none)]> USE mesh_temp_db;
Database changed
MariaDB [mesh_temp_db]> CREATE TABLE mesh_temp_table(id INT AUTO_INCREMENT, date DATETIME, temperature FLOAT, PRIMARY KEY (id));
Query OK, 0 rows affected (0.103 sec)
MariaDB [mesh_temp_db]> show tables;
+------------------------+
| Tables_in_mesh_temp_db |
+------------------------+
| mesh_temp_table |
+------------------------+
1 row in set (0.001 sec)
MariaDB [mesh_temp_db]> DESCRIBE mesh_temp_table;
+-------------+----------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+-------------+----------+------+-----+---------+----------------+
| id | int(11) | NO | PRI | NULL | auto_increment |
| date | datetime | YES | | NULL | |
| temperature | float | YES | | NULL | |
+-------------+----------+------+-----+---------+----------------+
3 rows in set (0.005 sec)
ローカルWebサーバー(Flask)側(SQL処理追加版)
import pymysql.cursors
from datetime import datetime
from flask import Flask
from flask import request
app = Flask(__name__)
@app.route('/send_data', methods=['POST'])
def send_data():
if request.method == 'POST':
data = request.json
incert_sql(data['dateValue'],data['temperatureValue'])
return data
def incert_sql(data1,data2):
# RapberyyPiで受信処理する場合
# 他デバイスで接続する場合はhost(ipアドレス)とportを追加する
connection = pymysql.connect(host='localhost',
user='root',
# mysqlで作成したユーザーのパスワードを指定する
password=[password],
database='mesh_temp_db',
cursorclass=pymysql.cursors.DictCursor)
with connection:
with connection.cursor() as cursor:
# string型のためdatetime型に変換する
dt_now = datetime.strptime(data1, '%Y-%m-%d %H:%M:%S')
# executeメソッドで実行する
sql = "INSERT INTO `mesh_temp_table` (`date`, `temperature`) VALUES (%s, %s)"
cursor.execute(sql, (dt_now,data2))
# コミットしてトランザクション実行
connection.commit()
# 終了処理
cursor.close()
if __name__ == "__main__":
# ローカルネットワーク内の接続を有効にしてサーバーを起動する
app.run(host='0.0.0.0')
実行結果
ファイルとデータベースの結果から日時情報と温度情報を追加できています。
日時情報も2分刻みで差異なく記録できています。
id1,id2で秒数の差異がありますが、個人的には許容範囲でした。
エラー処理とログ出力の対応
動作検証2でカスタムタグからローカルWebサーバー(Flask)経由でDBにデータを追加することを確認できました。
ここまででネットワーク経由でデータをデータベースに追加する目的は達成しましたが、SQL文の実行結果を確認するためにはDBを確認しなければわかりません。また、DB接続時のエラーを考慮していないため、例えばDBが起動していない場合はconnection()によるコンストラクタでOperationalError(予期しない接続遮断の発生、データソース名が見つからなかった、トランザクションが処理できなかった、処理中に発生したメモリアロケーションエラーなど)が発生し、サーバーが停止します。
そこでエラー発生時にサーバーが停止しないよう、エラー処理を作成し、エラー情報はログに出力して確認できるようにします。さらに、SQL文の実行結果をMESH側のログから確認できるようにします。
シーケンス図
動作検証2のコードをベースにシーケンス図を作成しました。
DBベースの接続から終了処理までをincert_sql()
で実行し、エラー処理を複数考慮しているため縦に長いシーケンスになっています。
追加する処理としては、DBベースとローカルWebサーバー間の情報を./db_processing.log
のログファイルに出力します。また、MESH側でSQL文の実行結果を確認するために、send_data()
の中で実行結果(json)を作成して戻り値として返すことで、ajaxメソッドのコールバックされるイベントハンドラで参照して実行結果を確認してログ出力します。
エラーについて
PyMYSQLではDBベースへの接続処理(connection)とカーソル処理で例外処理が定義されています。
仕様についてはPyMySQL’s documentation!に記載されていますが、PEP 249 – Python Database API Specification v2.0に基づき実装されています。
今回、エラー処理ではエラーが複数定義されているためtry文で実行時に検出したエラーを例外としてキャッチし、その時のエラー名を(e.__class__.__name__)
で取得してログに出力するようにしました。
日時情報と温度情報の送信してDBの実行結果を受け付ける(MESH SDK)
エラーはerrorMessageで受信しますが、ローカルWebサーバーとDBの実行結果を区別するため、DBの実行結果は通信が成功した場合にコールバックされるイベントハンドラで参照し結果をログに出力します。
// Execute
// アクセス先(ポート番号はデフォルト値)
var apiURL = "http://[サーバーのipアドレス]:5000/send_data";
// 日付オブジェクトを作成
var date = new Date();
// ハイフン区切りの文字列へ変換
function getDateString(date) {
return date.getFullYear() + "-"
+ ("0" + (date.getMonth() + 1)).slice(-2) + "-"
+ ("0" + date.getDate()).slice(-2) + (" ")
+ ("0" + date.getHours()).slice(-2) + (":")
+ ("0" + date.getMinutes()).slice(-2) + (":")
+ ("0" + date.getSeconds()).slice(-2)
};
// 変換済み日時データ(string型)
var dtNow = getDateString(date);
// 温度・湿度(Temperature & Humidity)タグ情報
var send_data = {
"dateValue": dtNow,
"temperatureValue" : messageValues.temperatureValue
};
// HTTPリクエスト(POST)送信
// HTTPレスポンスはローカルWebサーバー側(Flask)に返る
// log("HTTP Request / POST());
ajax ({
url : apiURL,
data : JSON.stringify(send_data),
type : "post",
contentType : "application/json",
dataType : "json",
timeout : 5000,
success : function(contents){
if(contents["result_num"] == 0){
// ローカルWebサーバー(Flask)は起動してるが、DBが停止している場合
log(contents["error_info"] + ": DB Incert NG. Please Check DB Status.");
// 処理を継続
callbackSuccess({
resultType : "continue"
});
}
else{
// 正常終了
log("Success: DB Incert OK. / " + contents['dateValue'] + " / " + contents['temperatureValue']);
// 処理を継続
callbackSuccess({
resultType : "continue"
});
}
},
// エラー
error : function(request, errorMessage){
if(errorMessage == "error"){
// ローカルWebサーバー(Flask)が停止している場合はerrorが返る
log(errorMessage + ": Please Check Local Web Server Status.");
}else if(errorMessage == "parsererror"){
// パース処理でエラーになる場合があるのでチェックする
log(errorMessage + ": Please Check Local Web Server Response Data.");
}
// 処理を継続
callbackSuccess({
resultType : "continue"
});
}
});
// 処理を停止し、応答を待つ
// log("Stop processing and wait for response");
return {
resultType : "pause"
};
ローカルWebサーバー(Flask)側(エラー処理とログ出力処理追加版)
ログ出力は[Python]ログ出力の基礎と考え方を参考にさせていただきました。
ログのフォーマットでは重大度、イベントの日時、ファイル名、関数名、行番号、ロガー名、メッセージを出力します。
import pymysql.cursors
from datetime import datetime
from flask import Flask
from flask import request
import logging
# ログ出力設定
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
handler = logging.FileHandler('./db_process.log')
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(name)s - %(filename)s - %(funcName)s - %(lineno)d - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
# SQL実行結果判別用定数
INCERT_DB_NG = 0
INCERT_DB_OK = 1
# Flaskクラスのインスタンス作成
app = Flask(__name__)
# エンドポイント設定
# send_data()を指定したURLにマッピング
@app.route('/send_data', methods=['POST'])
# HTTPリクエスト(POST)を受けてSQLを実行する関数
def send_data():
result = INCERT_DB_NG
error_status = 'NoError'
if request.method == 'POST':
data = request.json
# テーブルにデータを登録し、実行結果を取得する
result, error_status = incert_sql(data['dateValue'],data['temperatureValue'])
# 実行結果をログに出力
logger.info('incert_db_result: %s error_status: %s data: %s', result, error_status, data )
# 実行結果作成(json)
response_json = {'result_num': result, 'error_info': error_status}
response_json.update(data);
# 実行結果を返す
return response_json
# テーブルにデータを登録する関数
def incert_sql(data1,data2):
# SQL実行結果用変数を初期化
result = INCERT_DB_OK
error_status = 'NoError'
# RapberyyPiで受信処理する場合
# 他デバイスで接続する場合はhost(ipアドレス)とportを追加する
try:
# 接続先DB情報を指定してpymysql.connections.Connection objectを作成し、DBに接続する(ソケット通信)
# See `Connection <https://www.python.org/dev/peps/pep-0249/#connection-objects>`_ in the specification.
connection = pymysql.connect(host='localhost',
user='root',
# mysqlで作成したユーザーのパスワードを指定する
password=[password],
database='mesh_temp_db',
cursorclass=pymysql.cursors.DictCursor)
# connect()メソッドエラーチェック
# GithubよりエラーはBEP仕様に従い設定しているので各エラーをチェックする
# Connection Object / See Connection in the specification.
# https://pymysql.readthedocs.io/en/latest/modules/connections.html
# connection.py
# https://github.com/PyMySQL/PyMySQL/blob/main/pymysql/connections.py
# errp.py
# https://github.com/PyMySQL/PyMySQL/blob/main/pymysql/err.py
# PEP 249 – Python Database API Specification v2.0 / Module Interface / Exceptions
# https://peps.python.org/pep-0249/#connection-objects
except (pymysql.err.InterfaceError,\
pymysql.err.DatabaseError,\
pymysql.err.DataError,\
pymysql.err.OperationalError,\
pymysql.err.ProgrammingError,\
pymysql.err.NotSupportedError
) as e:
# エラー名を取得
error_status = e.__class__.__name__
logger.error('error_status: %s', error_status)
result = INCERT_DB_NG
return result, error_status
# pymysql.cursors.DictCursor objectを作成してレコードを挿入する
cursor = connection.cursor()
# string型のためdatetime型に変換する
dt_now = datetime.strptime(data1, '%Y-%m-%d %H:%M:%S')
# executeメソッドで実行する
sql = "INSERT INTO `mesh_temp_table` (`date`, `temperature`) VALUES (%s, %s)"
# execute()メソッドエラーチェック
# https://github.com/PyMySQL/PyMySQL/blob/main/pymysql/cursors.py
# より、Exceptionsはconnect()メソッド同様にエラー定義が存在するため同様にエラーチェックする
try:
# :return: Number of affected rows.
cursor.execute(sql, (dt_now,data2))
except (pymysql.err.InterfaceError,\
pymysql.err.DatabaseError,\
pymysql.err.DataError,\
pymysql.err.OperationalError,\
pymysql.err.ProgrammingError,\
pymysql.err.NotSupportedError
) as e:
# エラー名を取得
error_status = e.__class__.__name__
logger.error('error_status: %s', error_status)
result = INCERT_DB_NG
return result, error_status
# コミットしてトランザクション実行
try:
# 戻り値なし
# See `Connection.commit() <https://www.python.org/dev/peps/pep-0249/#commit>`_in the specification.
connection.commit()
# https://github.com/PyMySQL/PyMySQL/blob/d1748350b9b6b4efdcead428fad2fbcdb7cfddd0/pymysql/connections.py#L791
# err.InterfaceError
# https://github.com/PyMySQL/PyMySQL/blob/d1748350b9b6b4efdcead428fad2fbcdb7cfddd0/pymysql/connections.py#L441
# err.OperationalError
except (pymysql.err.InterfaceError,\
pymysql.err.OperationalError
) as e:
# エラー名を取得
error_status = e.__class__.__name__
logger.error('error_status: %s', error_status)
result = INCERT_DB_NG
return result, error_status
# 終了処理
# With文を使用しない場合は、自動的に接続を閉じないためclose()メソッドで明示的に閉じる
# pymysql.cursors.DictCursor destructor / 戻り値なし / 例外なし
cursor.close()
try:
# pymysql.connections.Connection destructor / 戻り値なし / 例外あり
# See `Connection.close() <https://www.python.org/dev/peps/pep-0249/#Connection.close>`_in the specification.
# Close the connection now (rather than whenever .__del__() is called).
connection.close()
except (pymysql.err.Error) as e:
# :raise Error: If the connection is already closed.
# エラー名を取得
error_status = e.__class__.__name__
logger.error('error_status: %s', error_status)
result = INCERT_DB_NG
return result, error_status
else:
# INCERT_DB_OK
return result, error_status
if __name__ == "__main__":
# ローカルネットワーク内の接続を有効にしてサーバーを起動する
app.run(host='0.0.0.0')
実行結果
1分間隔で下記処理を①→①→②→①→③→①の順に実施し、テーブルの値とログを確認しました。
①追加処理
②ローカルWebサーバー停止処理
③DBの停止処理
mesh_temp_tableから、データが①の時のみ追加されており、
ログ(MESH)から計6回分の実行結果とそれぞれについてDBへの追加処理が成功した場合とエラー情報を出力できていることを確認できました。(②のケースでは通信が失敗した場合にコールバックされるイベントハンドラで処理されるため、db_process.logには出力されていません。)
また、db_process.logでは行番号は上記app.py記載の数字とは若干のずれがありますが、DB停止時のエラー(OperationalError)を出力できていることを確認できました。
2022-08-13 17:51:38,319 - INFO - __main__ - app.py - send_data - 37 - incert_db_result: 1 error_status: NoError data: {'dateValue': '2022-08-13 17:51:38', 'temperatureValue': 24.8}
2022-08-13 17:52:38,351 - INFO - __main__ - app.py - send_data - 37 - incert_db_result: 1 error_status: NoError data: {'dateValue': '2022-08-13 17:52:38', 'temperatureValue': 24.8}
2022-08-13 17:54:38,401 - INFO - __main__ - app.py - send_data - 37 - incert_db_result: 1 error_status: NoError data: {'dateValue': '2022-08-13 17:54:38', 'temperatureValue': 24.9}
2022-08-13 17:55:38,529 - ERROR - __main__ - app.py - incert_sql - 87 - error_status: OperationalError
2022-08-13 17:55:38,531 - INFO - __main__ - app.py - send_data - 37 - incert_db_result: 0 error_status: OperationalError data: {'dateValue': '2022-08-13 17:55:38', 'temperatureValue': 24.9}
2022-08-13 17:56:38,668 - INFO - __main__ - app.py - send_data - 37 - incert_db_result: 1 error_status: NoError data: {'dateValue': '2022-08-13 17:56:38', 'temperatureValue': 24.9}
まとめ
MESH + Flask + MariaDBの構成で温度・湿度タグから取得した温度情報をローカルWebサーバ経由でデータベースに追加する方法を検討しました。
RapberryPiとMESHで作成して確認したところ、複数他接続のケースは確認していませんが、一定間隔の追加要求に対して正しく処理(データ追加)できることがわかりました。
また、エラー処理とログ出力に対応することでエラー発生時に確認が容易になり、SQL文の実行結果をMESH側のログからHTTPリクエスト(DB追加要求)に対する実行結果の確認できるようにしました。
エラー処理については、今回全てのエラーケースは確認していませんが、PEP249とPyMYSQLのコード(GitHub)と仕様から確認しました。MySQL/MariaDB等に接続する他のドライバについても同じ処理をしていると思うので何かの参考になればと思います。
今後はデータ追加処理とは別にデータベースからデータを取得して解析する方法を検討してみたいと思います。データ取得処理はPandas,Matplotlibを使えば比較的自由にデータ処理できると考えています。
その他
使用したMESHアプリについてはIPhoneのバッググラウンド実行をサポートしているためbluetoothの接続が問題なければ正しく動作しました。ただ、スマホと距離が離れるとボタンタグに比べて温度・湿度タグは接続が切れることがありました。公式では最大通信距離が見通し距離約10mとありますが、電源を接続すると比較的安定したため、スマホとの距離を置く場合は個人的には電源接続をした方が良いかと思います。
また、以前投稿した記事でMESHハブアプリケーションを使用しましたが、MESHハブアプリケーションはサポートがRaspberry PiでOSがRaspberry Pi OS Stretch with Desktop (GUI) / Lite (CUI)3B+/3Bのみで更新も止まっています。公式もサポートを継続するのかわからない状態ですが、個人的には開発言語をJavascript以外もサポートしてくれたら嬉しいのですが、このまま更新はされないのではないかと思っています。
参考
【Flask】JQueryでAjax通信をする手順。〜 JavaScript初心者向け 〜
https://sunnyday-travel-aso-6487.ssl-lolipop.jp/programing/python/flask/ajax/
Flask Quickstart
https://flask.palletsprojects.com/en/2.2.x/quickstart/
PythonでMySQLを操作する(PyMySQL)
https://python-work.com/pymysql/
JavaScriptのDate型を「2019-11-06」みたいな形式に変換
https://qiita.com/takezone/items/8134cc49df0793dec325
Python で例外のエラーメッセージを取得するコード例
https://srbrnote.work/archives/5125