たまたまnulabのTypetalkというアプリのAPI( https://developer.nulab-inc.com/ja/docs/typetalk )をいじる機会があったのでwebsocketの実装を行ってみた。
pythonでやったことがなかったのでメモ
##開発環境
MaxOSX 10.10.3
Python 2.7.9
##インストール
pythonで必要なパッケージをインストールする
pip install websocket-client
を実行してpythonのwebsocket-clientをインストールする。
##websocketの実装
今回,Typetalkを使ったためtype talkからaccess tokenを取得,websocketclientを定義するところを紹介
このコードによって登録ユーザーの全ての挙動を確認することができるようになる,
main.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import urllib
import urllib2
import json
import string
import websocket
import requests
class TypeTalkSample:
access_token = None
def __init__(self):
# typetalkからaccess tokenを取得する
client_id = "xxxxxxx"
client_secret = "xxxxxxx"
params = {
'client_id': client_id,
'client_secret': client_secret,
'grant_type': 'client_credentials',
'scope': 'topic.read,topic.post',
}
data = urllib.urlencode(params)
res = urllib2.urlopen(urllib2.Request('https://typetalk.in/oauth2/access_token', data))
# access tokenを保持する
self.access_token = json.load(res)['access_token']
# websocketを定義する,ここでheaderにauthorizationを登録することでaccess tokenを使ったwebsocketを設定することができる。
websocket.enableTrace(True)
ws = websocket.WebSocketApp("wss://typetalk.in/api/v1/streaming", header=["Authorization: Bearer %s" % self.access_token], on_open=self.on_open, on_message=self.on_message, on_error=self.on_error, on_close=self.on_close)
# websocketを起動する。Ctrl+Cで終了するようにする。
try:
ws.run_forever()
except KeyboardInterrupt:
ws.close()
# ここで定義したメソッドがwebsocketのコールバック関数になる。
# messageをうけとったとき
def on_message(self, ws, message):
print message
# エラーが起こった時
def on_error(self, ws, error):
print error
# websocketを閉じた時
def on_close(self, ws):
print 'disconnected streaming server'
# websocketを開いた時
def on_open(self, ws):
print 'connected streaming server'
if __name__ == "__main__":
typetalk = TypeTalkSample()
リンク
Typetalk api (https://developer.nulab-inc.com/ja/docs/typetalk)