LoginSignup
34
40

More than 5 years have passed since last update.

Pythonでwebsocket clientを作ってみる。(access token認証)

Last updated at Posted at 2015-08-11

たまたまnulabのTypetalkというアプリのAPI( https://developer.nulab-inc.com/ja/docs/typetalk )をいじる機会があったのでwebsocketの実装を行ってみた。
pythonでやったことがなかったのでメモ

開発環境

MaxOSX 10.10.3
Python 2.7.9

インストール

pythonで必要なパッケージをインストールする

pip install websocket-client

を実行してpythonのwebsocket-clientをインストールする。

websocketの実装

今回,Typetalkを使ったためtype talkからaccess tokenを取得,websocketclientを定義するところを紹介
このコードによって登録ユーザーの全ての挙動を確認することができるようになる,

main.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import sys
import urllib
import urllib2
import json
import string

import websocket
import requests

class TypeTalkSample:
    access_token = None

    def __init__(self):
        # typetalkからaccess tokenを取得する 
        client_id = "xxxxxxx"
        client_secret = "xxxxxxx"
        params = {
            'client_id': client_id,
            'client_secret': client_secret,
            'grant_type': 'client_credentials',
            'scope': 'topic.read,topic.post',
        }
        data = urllib.urlencode(params)
        res = urllib2.urlopen(urllib2.Request('https://typetalk.in/oauth2/access_token', data))

        # access tokenを保持する
        self.access_token = json.load(res)['access_token']

        # websocketを定義する,ここでheaderにauthorizationを登録することでaccess tokenを使ったwebsocketを設定することができる。
        websocket.enableTrace(True)
        ws = websocket.WebSocketApp("wss://typetalk.in/api/v1/streaming", header=["Authorization: Bearer %s" % self.access_token], on_open=self.on_open, on_message=self.on_message, on_error=self.on_error, on_close=self.on_close)

        # websocketを起動する。Ctrl+Cで終了するようにする。
        try:
            ws.run_forever()
        except KeyboardInterrupt:
            ws.close()

    # ここで定義したメソッドがwebsocketのコールバック関数になる。
    # messageをうけとったとき
    def on_message(self, ws, message):
        print message

    # エラーが起こった時
    def on_error(self, ws, error):
        print error

    # websocketを閉じた時
    def on_close(self, ws):
        print 'disconnected streaming server'

    # websocketを開いた時
    def on_open(self, ws):
        print 'connected streaming server'


if __name__ == "__main__":
    typetalk = TypeTalkSample()

リンク

Typetalk api (https://developer.nulab-inc.com/ja/docs/typetalk)

34
40
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
34
40