ここではサンプルのIoTセンサーからのデータを使ってTensorFlowで機械学習し、検知されたセンサーデータからアマチュア/普通/プロの判定をしてみます。
1. TensorFlowを準備します
- BluemixのDockerコンテナ環境にTensorFlowをインストールするを参照し、Bluemix環境にTensorFlowを準備します
- 「デプロイ」ステージを構成するステップで、portとして、8888以外に、6006と9000も開けておくようにします
3. Python実行環境にflaskとpandasを入れておきます。flaskはpip istall flask
などで、pandasはpip install pandas
などでインストール可能です
4. TenserFlowのDockerコンテナのデプロイによりBluemixから提供されるIPアドレスのうち、「パブリックIP」を使用してブラウザからアクセスします。flaskサーバーは「プライベートIP」を指定して9000ポートを開けます
2. サンプルを準備します
- 用意したTensorFlow環境に下記のソースを配置します。
-
model.py
でモデルを定義します- 隠れ層2でそれぞれ32、16個のReLuで構成しています
-
COLUMN_SIZE
に対象にするデータの項目数を指定します。このサンプルでは、'accel', 'gyro', 'temp', 'humid'を対象にしています -
actual_class
に判定に使用する目的変数の項目数を指定します。このサンプルでは amateur = 0, regular = 1, pros = 2の3個の値を使用しています - ここではGradientDescentOptimizerを使用していますが、AdamOptimizerなど異なるアルゴリズムを使用する事も可能です
- 学習100回毎に学習とテストの精度を出力します
- その他詳細はTensorFlowのTutorialをご参照ください
# -*- coding: utf-8 -*-
import tensorflow as tf
import numpy as np
class Model:
HIDDEN_UNIT_SIZE = 32
HIDDEN_UNIT_SIZE2 = 16
# 対象データの項目数
COLUMN_SIZE = 4
def __init__(self):
self.__setup_placeholder()
self.__setup_model()
self.__setup_ops()
def __enter__(self):
self.session = tf.Session()
return self
def __exit__(self, exc_type, exc_value, traceback):
self.session.close()
return False
def save(self, path):
saver = tf.train.Saver()
saver.save(self.session, path)
def restore(self, path):
saver = tf.train.Saver()
saver.restore(self.session, path)
def __setup_placeholder(self):
column_size = Model.COLUMN_SIZE
self.iot_data = tf.placeholder("float", [None, column_size])
# 目的変数が3値
self.actual_class = tf.placeholder("float", [None, 3])
self.keep_prob = tf.placeholder("float")
self.label = tf.placeholder("string")
def __setup_model(self):
column_size = Model.COLUMN_SIZE
w1 = tf.Variable(tf.truncated_normal([column_size, Predictor.HIDDEN_UNIT_SIZE], stddev=0.1))
b1 = tf.Variable(tf.constant(0.1, shape=[Predictor.HIDDEN_UNIT_SIZE]))
h1 = tf.nn.relu(tf.matmul(self.iot_data, w1) + b1)
w2 = tf.Variable(tf.truncated_normal([Predictor.HIDDEN_UNIT_SIZE, Predictor.HIDDEN_UNIT_SIZE2], stddev=0.1))
b2 = tf.Variable(tf.constant(0.1, shape=[Predictor.HIDDEN_UNIT_SIZE2]))
h2 = tf.nn.relu(tf.matmul(h1, w2) + b2)
h2_drop = tf.nn.dropout(h2, self.keep_prob)
# 目的変数が3値
w2 = tf.Variable(tf.truncated_normal([Predictor.HIDDEN_UNIT_SIZE2, 3], stddev=0.1))
# 目的変数が3値
b2 = tf.Variable(tf.constant(0.1, shape=[3]))
self.output = tf.nn.softmax(tf.matmul(h2_drop, w2) + b2)
def __setup_ops(self):
cross_entropy = -tf.reduce_sum(self.actual_class * tf.log(self.output))
self.summary = tf.scalar_summary(self.label, cross_entropy)
# 異なるアルゴリズムを使用可能
# self.train_op = tf.train.AdamOptimizer(0.0001).minimize(cross_entropy)
self.train_op = tf.train.GradientDescentOptimizer(0.0001).minimize(cross_entropy)
self.merge_summaries = tf.merge_summary([self.summary])
correct_prediction = tf.equal(tf.argmax(self.output,1), tf.argmax(self.actual_class,1))
self.accuracy = tf.reduce_mean(tf.cast(correct_prediction, "float"))
class Trainer(Model):
def train(self, steps, data):
self.__prepare_train(self.session)
for i in range(steps):
self.__do_train(self.session, i, data)
if i %100 == 0:
self.__add_summary(self.session, i, data)
self.__print_status(self.session, i, data)
def __prepare_train(self, session):
self.summary_writer = tf.train.SummaryWriter('logs', graph_def=session.graph_def)
session.run(tf.initialize_all_variables())
def __do_train(self, session, i, data):
session.run(self.train_op, feed_dict=self.train_feed_dict(data))
def __add_summary(self, session, i, data):
summary_str = session.run(self.merge_summaries, feed_dict=self.train_feed_dict(data))
summary_str += session.run(self.merge_summaries, feed_dict=self.test_feed_dict(data))
self.summary_writer.add_summary(summary_str, i)
def __print_status(self, session, i, data):
train_accuracy = session.run(self.accuracy, feed_dict=self.train_feed_dict(data))
test_accuracy = session.run(self.accuracy, feed_dict=self.test_feed_dict(data))
print 'step {} ,train_accuracy={} ,test_accuracy={} '.format(i, train_accuracy, test_accuracy)
def train_feed_dict(self, data):
return {
self.iot_data: data.train_data(),
self.actual_class: data.train_up_down(),
self.keep_prob: 0.8,
self.label: "train"
}
def test_feed_dict(self, data):
return {
self.iot_data: data.test_data(),
self.actual_class: data.test_up_down(),
self.keep_prob: 1,
self.label: "test"
}
class Predictor(Model):
def predict( self, data ):
return self.session.run(tf.argmax(self.output,1), feed_dict=self.predict_feed_dict(data))
def predict_feed_dict(self, data):
return {
self.iot_data: data,
self.keep_prob: 1,
self.label: "predict"
}
-
iot_results_loader.py
で学習用のデータをロードします- ここでは、サンプル用データを使用しています
-
outcome
項で結果を定義しています - 全体の2/3を学習データに、1/3をテストデータに使用しています
# -*- coding: utf-8 -*-
import pandas as pd
class IoTResults:
def __float__(data):
return 1.0
def __init__(self, data):
self.raw = data.copy()
self.data = data
def predictor_data(self):
return self.data
def train_data(self):
return self.__drop_outcome(self.__train_data())
def test_data(self):
return self.__drop_outcome(self.__test_data())
def train_up_down(self):
return self.__good_bad(self.__train_data()["outcome"])
def test_up_down(self):
return self.__good_bad(self.__test_data()["outcome"])
def __train_data(self):
# 全データの 2/3 を訓練データとして使用
return self.data.loc[lambda df: df.index % 3 != 0, :]
def __test_data(self):
# 全データの 1/3 をテストデータとして使用
return self.data.loc[lambda df: df.index % 3 == 0, :]
def __drop_outcome(self, data):
return data.drop("outcome", axis=1)
def __good_bad(self, outcome):
return outcome.apply(
lambda p: pd.Series([
1 if p <= 0 else 0,
1 if p ==1 else 0,
1 if p > 1 else 0
], index=['amateur', 'regular', 'pros']))
class IoTResultsLoader:
def retrieve_iot_data(self):
df = pd.DataFrame(
{'accel': [5,5,23,8,7,8,16,0,7,5,6,18,7,0,8,13,6,7,8,6,8,16,8,1,8,7,18,10,8,7,9,0,7,6,14,12,8,5,7,6,8,1,8,7,10,6,7,7,8,18,7,6,8,0,8,6,5,0,8,16,7,8,6,7,9,9,8,7,8,0,8,7,6,7,9,8,9,15,8,7,6,1,8,7,9,7,6,9,8,14,7,6,7,9,8,0,7,8,8,9],
'gyro': [4,8,20,6,2,4,12,2,12,6,8,14,6,2,8,22,0,10,6,4,8,18,2,4,4,8,20,8,4,6,4,2,4,6,12,12,2,6,4,2,10,2,8,4,6,2,0,4,0,14,2,4,0,4,10,8,4,2,4,12,8,4,2,0,4,0,8,4,6,2,2,0,4,8,6,4,0,12,0,4,8,1,10,4,2,0,2,4,6,12,4,2,2,0,4,3,4,0,2,4],
'temp': [20,18,25,21,23,25,26,8,19,18,20,48,22,6,24,42,23,19,20,18,23,35,22,6,24,22,40,19,20,30,22,7,28,24,33,28,25,30,28,29,27,6,12,20,18,20,22,21,22,34,28,30,26,7,25,20,19,6,21,27,18,20,18,20,24,26,22,28,24,8,24,25,21,22,28,26,19,32,23,21,24,6,21,20,19,22,25,27,26,34,24,20,23,19,22,7,27,23,24,18],
'humid': [50,44,58,56,48,42,60,40,58,54,56,60,44,40,58,62,50,52,54,56,48,46,50,42,56,60,64,52,56,60,58,44,44,48,54,52,56,60,62,64,58,48,58,48,48,50,54,60,62,52,48,44,50,54,52,58,46,48,50,56,62,60,54,48,44,52,54,58,50,48,48,52,58,62,60,52,60,58,56,48,44,52,56,54,58,48,52,54,50,58,54,56,62,60,60,52,48,46,58,48],
'outcome': [1,1,0,1,1,1,0,2,1,1,1,0,1,2,1,0,1,1,1,1,1,0,1,2,1,1,0,1,1,1,1,2,1,1,0,0,1,1,1,1,1,2,1,1,1,1,1,1,1,0,1,1,1,2,1,1,1,2,1,0,1,1,1,1,1,1,1,1,1,2,1,1,1,1,1,1,1,0,1,1,1,2,1,1,1,1,1,1,1,0,1,1,1,1,1,2,1,1,1,1]})
return df
-
train.py
で学習させます- 必要に応じ学習回数を変更して試してみます
- 学習結果が./model.ckptに保存されます
# -*- coding: utf-8 -*-
from iot_results_loader import *
from model import *
loader = IoTResultsLoader()
data = IoTResults(loader.retrieve_iot_data())
with Trainer() as trainer:
# 学習回数
trainer.train(3000, data)
trainer.save("./model.ckpt")
-
server.py
でサーバーを実行します- flaskで9000ポートでアクセスします。TenserFlowのDockerコンテナのデプロイによりBluemixから提供されるIPアドレスのうち、パブリックIPを使用してブラウザからアクセスします。flaskサーバーはプライベートIPへ向け9000ポートを開けます
# -*- coding: utf-8 -*-
import pandas as pd
from flask import Flask, jsonify, request
from iot_results_loader import *
from model import *
loader = IoTResultsLoader()
predictor = Predictor()
predictor.__enter__()
predictor.restore("./model.ckpt")
iot_data = loader.retrieve_iot_data()
# webapp
app = Flask(__name__)
@app.route('/api/predictor', methods=['POST'])
def predict():
data = pd.DataFrame(eval(request.data))
results = predictor.predict(IoTResults(data).predictor_data().iloc[[0]])
return jsonify(result=("pros!" if results[0] == 2 else "regular" if results[0] == 1 else "amateur!"))
if __name__ == '__main__':
app.run(host='172.30.0.3', port=9000)
- Node-REDからアクセスします
[{"id":"e1d1df28.30cdb","type":"inject","z":"23ccc58c.26b582","name":"","topic":"","payload":"","payloadType":"date","repeat":"","crontab":"","once":false,"x":120,"y":100,"wires":[["c3e01383.ee175"]]},{"id":"81d9e766.6e82","type":"http request","z":"23ccc58c.26b582","name":"","method":"POST","ret":"txt","url":"http://xx.xx.xx.xx:9000/api/predictor","tls":"","x":450,"y":160,"wires":[["40a2e5d8.976e6c"]]},{"id":"11916de2.fb6c0a","type":"debug","z":"23ccc58c.26b582","name":"","active":true,"console":"false","complete":"false","x":690,"y":240,"wires":[]},{"id":"c3e01383.ee175","type":"template","z":"23ccc58c.26b582","name":"amateur","field":"payload","fieldType":"msg","format":"handlebars","syntax":"mustache","template":"{'accel': [13], 'gyro': [20], 'temp': [25], 'humid': [58]}\t","x":280,"y":100,"wires":[["81d9e766.6e82"]]},{"id":"c00d807a.6e942","type":"template","z":"23ccc58c.26b582","name":"pros","field":"payload","fieldType":"msg","format":"handlebars","syntax":"mustache","template":"{'accel': [0], 'gyro': [2], 'temp': [8], 'humid': [40]}\t","x":270,"y":220,"wires":[["81d9e766.6e82"]]},{"id":"ba871882.501c9","type":"template","z":"23ccc58c.26b582","name":"regular","field":"payload","fieldType":"msg","format":"handlebars","syntax":"mustache","template":"{'accel': [5], 'gyro': [4], 'temp': [20], 'humid': [50]}\t","x":270,"y":160,"wires":[["81d9e766.6e82"]]},{"id":"473e7329.e15e1c","type":"inject","z":"23ccc58c.26b582","name":"","topic":"","payload":"","payloadType":"date","repeat":"","crontab":"","once":false,"x":120,"y":220,"wires":[["c00d807a.6e942"]]},{"id":"ca48901b.3e3678","type":"inject","z":"23ccc58c.26b582","name":"","topic":"","payload":"","payloadType":"date","repeat":"","crontab":"","once":false,"x":120,"y":160,"wires":[["ba871882.501c9"]]},{"id":"f662f04b.eb2808","type":"template","z":"23ccc58c.26b582","name":"出力","field":"payload","fieldType":"msg","format":"handlebars","syntax":"mustache","template":"あなたは {{payload}} ですね!","x":530,"y":240,"wires":[["11916de2.fb6c0a"]]},{"id":"40a2e5d8.976e6c","type":"function","z":"23ccc58c.26b582","name":"取り出し","func":"var data = JSON.parse(msg.payload);\nmsg.payload = data.result;\nreturn msg;\n","outputs":1,"noerr":0,"x":620,"y":160,"wires":[["f662f04b.eb2808"]]}]
3. 実行します
-
python server.py
でサーバーを実行します -
python -m tensorflow.tensorboard --logdir=./logs
などでtensorboardを稼働しブラウザで6006ポートへアクセスして学習の状況などを確認します。IPアドレスはTenserFlowのDockerコンテナのデプロイによりBluemixから提供されるIPアドレスのうち、パブリックIPを使用してブラウザからアクセスします。
参考
TensorFlowを使った為替(FX)のトレードシステムを作るチュートリアルを大変参考にさせていただきました。どうもありがとうございます。