前回までのあらすじ
前回まででエラーも考慮して自動測定ができるようになりました。今回はslackのAPIを用いて、測定の終了をslackに通知できるようにします。他のチャットツール(Discord ,Lineなど)であっても、トークンを取得してAPIに要求を送るという流れは同じなので、様々な用途に利用できると思います。
環境
- Windows10
- Anaconda(3.x)
- (2)でpySerialを
conda install
しました
やっていること
pythonのslackclientパッケージが私の環境では動かなかったので、
などを参考にして、
- requests パッケージを用いて
- slackのwebページで取得したトークンを用いて
slackAPIを使って任意のメッセージを、任意のチャンネルに送れます。以下のコードで変更すべき点は
- token
- send_message("ココ", ココ(チャンネル名))
のみで、送れるはずです
mentionの付け方は
このサイトを参考にしてください
import serial
import time
import pandas as pd
## 定数指定
MAX = 40000
COMampere = "COM10"
COMpulse = "COM9"
bitRate = 9600
## 変数初期化
pulse = 0
ampere_list = []
pulse_list = []
ampere_average_list =[]
## 奥原点移動(初期化)
ser = serial.Serial(COMsigma, bitRate, timeout=0.1)
ser.write(b"H:2-\r\n")
# time.sleep(0.1)
# print(ser.read_all())
ser.close()
import requests
class SlackDriver:
def __init__(self, _token):
self._token = _token # api_token
self._headers = {'Content-Type': 'application/json'}
def send_message(self, message, channel):
params = {"token": self._token, "channel": channel, "text": message}
r = requests.get('https://slack.com/api/chat.postMessage',
headers=self._headers,
params=params)
print("return ", r.json())
token = 'xxxx-oooooooooooo-000000000000-hogehoge' # この部分を取得したトークンにしてください
slack = SlackDriver(token)
# 計測開始
try:
while 1:
if pulse >= MAX:
## 位置がMAXまで来ている場合while文を終了
break
if pulse ==2000:
slack.send_message("<@IDhogehoge> 1000おわったよ", "#bot-test") # 取得したIDは<>でくくってください
if pulse ==30000:
slack.send_message("<@IDhogehoge> 15000おわったよ", "#bot-test")
## 現在位置の情報を記録
pulse_list.append(pulse/2)
## 電流を測定する(5回とって平均したものをその位置での値とする)
for i in range(5):
ser = serial.Serial(COMampere,bitRate,timeout=0.1)
ser.write(b"F5, R0,PR2\r\n")
time.sleep(1)
ser.write(b"MD?\r\n")
time.sleep(1)
tmp = ser.read_all()
# 電流が取れていない場合はスキップする
if len(tmp)== 0:
ser.close()
continue
ampere = float(tmp.split()[2])
ampere_average_list.append(ampere)
time.sleep(1)
ser.close()
## 電流とpulse(位置)をlistに追加
ampere_list.append(sum(ampere_average_list)/len(ampere_average_list))
ampere_average_list = []
## 光学台を動かす
pulse += 1000
position = "A:2+P"+str(pulse)+"\r\n"
ser = serial.Serial(COMpulse,bitRate,timeout=0.1)
ser.write(bytes(position, 'UTF-8'))
time.sleep(1)
ser.write(b"G:\r\n")
ser.close()
## リストをdataframeに変える
print(ampere_list)
print(pulse_list)
df = pd.DataFrame({'ampere(A)':ampere_list,'pulse':pulse_list})
def pulseToMilliMeter(pulse):
return pulse*0.006
df["position(mm)"] = df["pulse"].map(pulseToMilliMeter)
df.to_csv('./csv/result.csv',index=False)
plt.figure()
df.plot(x='position(mm)',y='ampere(A)',marker='o')
plt.savefig('./img/sample.png')
plt.close('all')
except IndexError:
ser.close()
slack.send_message("<@IDhogehoge> 測定失敗してるよ", "#bot-test")
# slackに通知したい
slack.send_message("<@IDhogehoge> 測定終わったよ", "#bot-test")
## 変数初期化
pulse = 0
ampere_list = []
pulse_list = []
ampere_average_list =[]
## 奥原点移動(初期化)
ser = serial.Serial(COMpulse, bitRate, timeout=0.1)
ser.write(b"H:2-\r\n")
time.sleep(0.1)
print(ser.read_all())
ser.close()