最近のアトリエ秋葉原さんのイベントで、質問やコメントをSlackに投稿すると現場のPepperが発話するというアプリがすごく良くて、自分も作ってみようと思ったのがきっかけです。
実際にアプリを作られた方にお聞きしたところ、SlackのReal Time Messaging APIを使用しているとのこと。ハードルが高そう・・(実際自分にはハードルが高かった)と思ったのですが、先駆者の方々の情報を元に、なんとか形になりましたので投稿します。
注意事項
ソフトバンクロボティクス株式会社のPepperを活用し、独自開発したものです。
開発環境
MacOS Sierra 10.12.2
Choregraphe 2.4.3.28
Choregraphe 2.5.5.5
Pepper
参考
こちらの記事を参考にさせていただきました。
Pepper ハッカソンに役に立ちそうな ボックスを 4つ
プログラム
プログラムは、Slackの特定のchannelに投稿されたメッセージを Animated Say Text で発話するというものです。頭(前方)をタッチで終了です。
Real Time Messaging APIの接続について
SlackのReal Time Messaging APIの接続先は固定ではなく、rtm.startメソッドで動的に取得したURLを使用して、WebSocketで接続する必要があります。また、このURLの接続までの有効期限は30秒です。
また、接続後もしばらくすると自動で切断されてしまうため、こまめにpingを送り、接続を維持します。
WebSocketライブラリの準備
PepperにはPythonのWebSocket関連のモジュールがありません。そのため、プロジェクト内に必要なモジュールを保存しておき、使用するボックスでその保存先をロードパスに含める必要があります。
プロジェクトフォルダにlibフォルダを作成し、その配下にモジュールを格納します。
モジュールの取得方法については、こちらの記事を参考にいたしました。
Pepper ハッカソンに役に立ちそうな ボックスを 4つ
使用するボックス側では、 importの前に、先ほどのプロジェクトフォルダのlibも検索対象になるように設定します。(ビヘイビアパスを基点にした相対パスでlibを指定しているので、ビヘイビアの位置を変更したらそれに合わせて変更してください)
また、ボックス終了時にパスから削除します。
# websocket関連のモジュールのパスを追加
import sys, os
self.folderName = os.path.join(self.framemanager.getBehaviorPath(self.behaviorId), "../lib")
if self.folderName not in sys.path:
sys.path.append(self.folderName)
self.pathModified = True
if self.pathModified and self.folderName and self.folderName in sys.path:
sys.path.remove(self.folderName)
作成したボックスについて
全部で4つ作成しました。ボックスは https://github.com/piroku/SlackSample で公開しています。
bx_Slack_ChannelListボックス
channels.listメソッドを使用し、channelのリストを取得します。あとで使用するReal Time Messaging APIのmessageイベントでは、投稿されたchannelがid値で送られてくるので、channel名を変換するマスタとして取得しておきます。取得した情報は、ALMemoryを使用して、キー(SlackSample/Channels)に格納します。
ボックスのパラメータ
channels.listメソッドのurlとSlackのWebAPIを使用するためのtokenを指定します。tokenは、こちらで取得したものをパラメータに設定してください。
ソース(一部)
def onInput_onStart(self):
try:
import requests, json
# Slack Web API token
token = self.getParameter('token')
# Web API https://slack.com/api/channels.list
url = self.getParameter('url')
# self.logger.info(url)
payload = {'token' : token}
response = requests.post(url, data=payload).json()
okresult = str(response['ok'])
if(okresult=="True"):
# Channel Listを取得 メモリに格納
channels = json.dumps(response['channels'])
self.memory.insertData("SlackSample/Channels", channels)
# onStopped
self.onStopped()
elif(okresult=="False"):
raise Exception('channels.list error: %s' % response['error'].encode('utf-8'))
else:
raise Exception('channels.list error')
except Exception as e:
#self.logger.error('Failed to http post')
#self.logger.error('message:' + str(e))
# onErrorに出力
raise Exception, e
bx_Slack_rtmstartボックス
rtm.startメソッドを使用し、Real Time Messaging APIの接続先のURLを取得します。取得したURLの文字列は、onStopped()で出力します。
ボックスのパラメータ
rtm.startメソッドのurlとSlackのWebAPIを使用するためのtokenを指定します。(tokenは前述のものと同じです)
ソース(一部)
def onInput_onStart(self):
try:
import requests, json
# Slack Web API token
token = self.getParameter('token')
# Web API rtm.start url https://slack.com/api/rtm.start
url = self.getParameter('url')
# self.logger.info(url)
payload = {'token' : token}
response = requests.post(url, data=payload).json()
okresult = str(response['ok'])
if(okresult=="True"):
# WebSocketのurlを取得。
wsurl = response['url'].encode('utf-8')
# onStoppedに出力
self.onStopped(wsurl)
elif(okresult=="False"):
raise Exception('rtm.start error: %s' % response['error'].encode('utf-8'))
else:
raise Exception('rtm.start error')
except Exception as e:
#self.logger.error('Failed to http post')
#self.logger.error('message:' + str(e))
# onErrorに出力
raise Exception, e
bx_Slack_rtmボックス
bx_Slack_rtmstartボックスで取得したURLをonStartに指定し、WebSocketで接続します。接続が確立するとhelloイベントが送られてきますので、pingのタイマーを開始します。また、イベントはon_messageに通知されてくるので、イベントのうちmessageイベントのみを onTypeMessage()に出力します。
ボックスのパラメータ
ソース
path文字列を修正しました
naoqi2.5にて No module named urllib.parse のエラーが発生する問題を修正。(sixのバージョン違いによる) yuya-mさんありがとうございます。
class MyClass(GeneratedClass):
def __init__(self):
GeneratedClass.__init__(self)
def onLoad(self):
self.framemanager = ALProxy('ALFrameManager')
self.folderName = None
self.pathModified = False
self.ws = None
self.bOpening = False
self.periodic_task = None
def onUnload(self):
if self.ws != None:
self.ws.close()
if self.pathModified and self.folderName and self.folderName in sys.path:
sys.path.remove(self.folderName)
self.folderName = None
self.pathModified = False
self.on_pingStop
self.ws = None
self.bOpening = False
self.periodic_task = None
def onInput_onStart(self, p):
# naoqi2.5
system = ALProxy('ALSystem')
version = system.systemVersion()
system = None
# websocket関連のライブラリのパスを追加
import sys, os
self.folderName = os.path.join(self.framemanager.getBehaviorPath(self.behaviorId), "../lib/")
if self.folderName not in sys.path:
if version.startswith('2.5.') == True:
# naoqi2.5 six対策
sys.path.insert(0, self.folderName)
else:
sys.path.append(self.folderName)
self.pathModified = True
if self.bOpening:
self.logger.info('websocket client already opening..')
return
# websocket通信の開始
import websocket
self.serverURI = p
websocket.enableTrace(True)
self.logger.info(self.serverURI)
self.ws = websocket.WebSocketApp(self.serverURI,
on_message = self.on_message,
on_error = self.on_error,
on_close = self.on_close)
self.ws.on_open = self.on_open
try:
self.ws.run_forever()
except Exception as e:
# self.logger.error('Failed to websocket')
# self.logger.error('message:' + str(e))
raise Exception, e
def onInput_sendMessage(self, data):
import json
if self.bOpening:
#self.logger.info("send!!")
data = json.dumps(data)
self.ws.send(data)
self.onMessageSent()
def on_message(self, ws, message):
import json
data = json.loads(message)
self.onMessage(json.dumps(data))
type = data['type'].encode('utf-8')
self.logger.info(type)
# type判定
if(type == "hello"):
# 接続成功
#self.logger.info("start!!")
self.on_pingStart()
elif(type == "message"):
self.onTypeMessage(json.dumps(data))
elif(type == "pong"):
# pingの返答
pass
def on_error(self, ws, error):
self.bOpening = False
self.logger.info('WebSocket Error: %s' % error)
def on_close(self, ws):
self.bOpening = False
self.on_PingStop()
self.logger.info('Websocket closed')
def on_open(self, ws):
self.bOpening = True
self.logger.info('Websocket open')
def on_pingStart(self):
import qi
self.periodic_task = qi.PeriodicTask()
self.periodic_task.setName("SlackPingTask")
self.periodic_task.setCallback(self.on_ping)
self.periodic_task.setUsPeriod(self.getParameter('ping (s)')*1000*1000)
self.periodic_task.start(False)
def on_ping(self):
import json
ping = {'type':'ping'}
# ping を送る。
self.onInput_sendMessage(ping)
def on_pingStop(self):
if self.periodic_task != None:
self.periodic_task.stop()
def onInput_onStop(self):
self.onUnload() #it is recommended to reuse the clean-up as the box is stopped
self.onStopped() #activate the output of the box
bx_filter_messageボックス
messageイベントには、Slackチームの全てのchannelの投稿が通知されてきます。全ての投稿を発話するわけにはいかないので、特定のchannelの投稿かどうかをこのボックスで判断します。messageイベントで通知されるjsonデータには、channelの情報がありますが、channel名ではなく、IDとなっています。そのため、事前にbx_Slack_ChannelListボックスで取得したChannelListからchannel名に変換し、対象のchannelかどうかを判断しています。対象のChannelについてはパラメータとしました。
対象のChannelの投稿のtext値は、onSayMessage()に出力されますので、Animated Sayなどで発話させることができます。
ボックスのパラメータ
ソース(一部)
def onInput_onStart(self, p):
targetChannel = self.getParameter('channel')
if(targetChannel == ''):
self.logger.error("Parameter 'channel' isnot set")
else:
import json
data = json.loads(p)
# メモリに格納したchannellistを元に投稿されたchannelのid値からname値に変換する
msgChannelName = ''
msgChannelId = data['channel'].encode('utf-8')
msgText = data["text"].encode("utf-8")
#self.logger.info(msgText)
slackChannels=self.memory.getData("SlackSample/Channels")
channelList=json.loads(slackChannels)
for ch in channelList:
id = ch["id"].encode('utf-8')
if( id == msgChannelId):
msgChannelName = ch['name'].encode('utf-8')
break;
# 対象のchannelであれば、メッセージをonSayMessageに出力
if(targetChannel == msgChannelName):
self.onSayMessage(msgText)
self.onStopped()
最後に
ほぼリアルタイムで投稿された内容でPepperが喋るので、WebSocketってすごい!と思いました。応用として、離れた人とのコミュニケーションや、投稿をコマンドに見立てて、Pepperそのものを遠隔操作してしまうなどといったことに使えないかなぁと思っています。こんな風に使ってみたよとかありましたらコメントいただけると幸いです。