背景
これまで、Twythonを使用してTwitterのDirect Messageをやりとりする私的なbotを作成していた。
(Raspberry Pi上で動かす予定)
- Raspberry PiをTwitter botにして家のグローバルIPを調べさせる
- TwitterのUser Streamが廃止になったけどTwythonのStreamerはどうなっているか調べてみた
- Twython3.7.0でDM取得してみる(廃止されたTwitterのUserStreamをpollingで書き直してみる)
前回の記事で、TwitterAPI側の仕様変更にTwythonが追随できておらず、ライブラリソースに手を入れてなんとか動かそうとしていた。
が、どうも色々と変更しなければ動かない部分が多くあり、またソース見る限りではそんなに複雑なこともしていないので、
Requestsをそのまま使ってTwitterAPIを叩いてもそんなに大変じゃないのではないか、つまりTwython使わなくてもよくね?と思い始めてきた。
目的
- Requestsを使ってTwitterのDM botを作成する。
- bot自体の機能や目的はこの記事
- TwitterAPIの仕様変更でストリーミングAPIが使えなくなったため、レート制限に引っかからない程度の間隔でポーリングする。そのあたりの経緯はここ
TwitterAPIの呼び出し方
とりあえず公式ドキュメントを参照する。
使う機能はDM受信、DM送信、ユーザID取得あたり。
基本的にはrequestsのget/postメソッドに適切な引数をつけて呼び出せばOK。JSON形式で結果が返ってくるので、辞書形式に変換して使う。
TwitterAPI呼び出し準備
以前の記事に書いたように、Twitter側でdeveloper accountを作成してConsumer Key, Consumer Secret, Access Token, Access Token Secretの4つのIDを取得しておく。
プログラムの冒頭でこれらのIDを使って、OAuth1Session
を初期化する。
まず必要なライブラリをimport
import requests
import json
from requests_oauthlib import OAuth1Session
コンストラクタで初期化(実際のコードでは、取得したIDを代入する)
self.twitter = OAuth1Session(consumer_key, consumer_secret, access_token, access_token_secret)
self.pollingTime = 70
self.headers = {"content-type": "application/json"}
DM取得は15分に15回以上?叩くとレート超過でエラーが返るっぽいので、安全のため70秒間隔でポーリングすることにする。chat botとは言いがたいレスポンスの悪さだが、まぁ自分用なので気にしないことにする。
リクエストヘッダは毎回同じなのでここで定義してしまう。
DM受信
def receive_dm(self):
url = "https://api.twitter.com/1.1/direct_messages/events/list.json"
ret_json = self.twitter.get(url, headers = self.headers)
return json.loads(ret_json.text)
公式のドキュメントを読んでその通りに実装。引数無しで、直近20メッセージ?がリスト形式で取得できる。
データ形式がわかりにくいが、実際ダンプしてみると以下のようになっているようだ。
- "event"はDMのリストになっていて、
- 各DMは"created_timestamp" "id" "message_create" "type"の各要素を持っている。
- "created_timesamp"は単調増加するタイムスタンプ
- "id"はDM自体を一意に識別するID
- "message_create"はさらに子要素を持っていて、メッセージの中身はここ
- "message_create"->"target"->"recipient_id" 受信者のユーザID(スクリーンネームではない)
- "message_create"->"sender_id" 送信者のユーザID
- "message_create"->"message_data"->"text" DM本文
今回使うのはこのあたりだけでよさそう。
DM送信
def send_dm(self, target, msg_text):
url = "https://api.twitter.com/1.1/direct_messages/events/new.json"
param = {"event": {"type": "message_create", "message_create": {"target": {"recipient_id": target}, "message_data": {"text": msg_text}}}}
self.twitter.post(url, headers = self.headers, data = json.dumps(param))
POSTメッセージのdata要素に、JSON形式に変換した引数を放り込んでやればOK。
- "event"->"type" には"message_create"という文字列を入れておく
- "event"->"message_create"->"target"->"recipient_id" に受信者のユーザID
- "event"->"message_create"->"message_data"->"text" DM本文
json.dumps()
で文字列展開して、post()
に渡す
ユーザID取得
Twitterスクリーンネーム(@で始まるやつ)からユーザIDを取得する。
def get_user_screen_name(self, user_id):
url = "https://api.twitter.com/1.1/users/show.json?user_id=" + user_id
ret_json = self.twitter.get(url, headers = self.headers)
return (json.loads(ret_json.text))["screen_name"]
引数を与えるのが面倒なので、URLにそのまま含めてしまう。
JSON形式で戻ってくるので、文字列変換した物をjson.loads()
に食わせてPython辞書形式に変換。
Bot実装
特に難しいことはないが、polling形式なので既に受信済みのDMは処理しないこと、polling間隔中に複数のDMを受信した場合の処理、DM受信APIでは自分自身の発言も取得されてしまうので無視すること、あたりを気をつければOK。
起動直後の処理
ret = self.receive_dm()
last_timestamp = max(i["created_timestamp"] for i in ret["events"])
とりあえず1回DMを受信して、受信できたDMの中でもっとも大きい(新しい)タイムスタンプを記憶しておく。
ここで記憶したタイムスタンプより新しいDMだけを処理の対象とする。
メインループ
while True:
messages = ret["events"]
new_messages = [m for m in messages if m["created_timestamp"] > last_timestamp]
new_messages.sort(key=(lambda x: x["created_timestamp"]))
for m in new_messages:
sender_name = self.get_user_screen_name(m["message_create"]["sender_id"])
msg_text = m["message_create"]["message_data"]["text"]
if sender_name != my_screen_name:
ret_msg = self.dispatch(msg_text)
self.send_dm(m["message_create"]["sender_id"], ret_msg)
last_timestamp = m["created_timestamp"]
time.sleep(self.pollingTime)
ret = self.receive_dm()
やっていることを簡単に列挙すると、
- 受信したDMリストから、記憶されているタイムスタンプより新しいもののみを抽出
- タイムスタンプ順に並べ替える
- 処理対象DMの送信者IDを見て、自分以外だったらDM本文を取得して処理関数
dispatch()
に回す - タイムスタンプを更新する
- polling間隔だけスリープ
- 最新のDMリストを受信
メッセージ処理関数
def dispatch(self, cmd_text):
if cmd_text == "exit":
exit()
elif cmd_text.startswith("echo "):
return cmd_text[5:]
elif cmd_text == "ip":
return requests.get("http://inet-ip.info/ip").text
以前の記事から更新してない。
Botに機能追加するときはここ。
ログ出力関数
def log(self, log_msg):
log_str = "{0:[%m/%d %H:%M.%S]} ".format(datetime.datetime.now()) + log_msg + "\n"
print(log_str)
self.log_fp.write(log_str)
デバッグ用。現在のタイムスタンプを付加して、ログメッセージを標準出力とファイルに出力する。
ソースコード全貌
Consumer Key, Consumer Secret, Access Token, Access Token Secretはソース中では ***** と表記
#!/usr/bin/python3
import requests
import time
import json
import datetime
from requests_oauthlib import OAuth1Session
consumer_key = "*****"
consumer_secret = "*****"
access_token = "*****"
access_token_secret = "*****"
my_screen_name = "*****"
log_path = "*****"
class MyBot:
def __init__(self):
self.twitter = OAuth1Session(consumer_key, consumer_secret, access_token, access_token_secret)
self.pollingTime = 70
self.headers = {"content-type": "application/json"}
# ログ出力ファイルをオープン
self.log_fp = open(log_path, mode="a")
def __del__(self):
self.log_fp.close()
# DM受信
def receive_dm(self):
url = "https://api.twitter.com/1.1/direct_messages/events/list.json"
ret_json = self.twitter.get(url, headers = self.headers)
return json.loads(ret_json.text)
# DM送信
def send_dm(self, target, msg_text):
url = "https://api.twitter.com/1.1/direct_messages/events/new.json"
param = {"event": {"type": "message_create", "message_create": {"target": {"recipient_id": target}, "message_data": {"text": msg_text}}}}
self.twitter.post(url, headers = self.headers, data = json.dumps(param))
# ユーザID取得
def get_user_screen_name(self, user_id):
url = "https://api.twitter.com/1.1/users/show.json?user_id=" + user_id
ret_json = self.twitter.get(url, headers = self.headers)
return (json.loads(ret_json.text))["screen_name"]
def run(self):
# 受信済みのDMタイムスタンプのうち最新のものを記憶しておく
ret = self.receive_dm()
last_timestamp = max(i["created_timestamp"] for i in ret["events"])
self.log("last timestamp: " + last_timestamp)
# メインループ
while True:
messages = ret["events"]
# 処理済みタイムスタンプより新しいDMのみを抽出し、タイムスタンプ順にソート
new_messages = [m for m in messages if m["created_timestamp"] > last_timestamp]
new_messages.sort(key=(lambda x: x["created_timestamp"]))
for m in new_messages:
# 送信者のスクリーンネームを取得
sender_name = self.get_user_screen_name(m["message_create"]["sender_id"])
# 本文取得
msg_text = m["message_create"]["message_data"]["text"]
self.log("timestamp :" + m["created_timestamp"] + m["message_create"]["message_data"]["text"] + " from " + sender_name)
# 自分自身の発言は除外
if sender_name != my_screen_name:
# メッセージ解釈関数に渡す
ret_msg = self.dispatch(msg_text)
self.send_dm(m["message_create"]["sender_id"], ret_msg)
# タイムスタンプを更新
last_timestamp = m["created_timestamp"]
time.sleep(self.pollingTime)
ret = self.receive_dm()
# メッセージ解釈部。将来はココを拡張する
def dispatch(self, cmd_text):
self.log("dispatch: " + cmd_text)
# bot終了
if cmd_text == "exit":
exit()
# デバッグ用。echoで始まるメッセージはオウム返しする
elif cmd_text.startswith("echo "):
return cmd_text[5:]
# "ip"と聞かれたらグローバルIPを調べて返答する
elif cmd_text == "ip":
return requests.get("http://inet-ip.info/ip").text
def log(self, log_msg):
log_str = "{0:[%m/%d %H:%M.%S]} ".format(datetime.datetime.now()) + log_msg + "\n"
print(log_str)
self.log_fp.write(log_str)
if __name__ == "__main__":
bot = MyBot()
bot.run()
完成。
実行
以前の記事と同じ動作をする・・・・はず。