LoginSignup
7
6

More than 5 years have passed since last update.

TwitterのDM botをpollingベースで作ってみる(TwythonやめてRequestsでAPI直叩きにしてみた)

Last updated at Posted at 2019-01-05

背景

これまで、Twythonを使用してTwitterのDirect Messageをやりとりする私的なbotを作成していた。
(Raspberry Pi上で動かす予定)

前回の記事で、TwitterAPI側の仕様変更にTwythonが追随できておらず、ライブラリソースに手を入れてなんとか動かそうとしていた。

が、どうも色々と変更しなければ動かない部分が多くあり、またソース見る限りではそんなに複雑なこともしていないので、
Requestsをそのまま使ってTwitterAPIを叩いてもそんなに大変じゃないのではないか、つまりTwython使わなくてもよくね?と思い始めてきた。

目的

  • Requestsを使ってTwitterのDM botを作成する。
  • bot自体の機能や目的はこの記事
  • TwitterAPIの仕様変更でストリーミングAPIが使えなくなったため、レート制限に引っかからない程度の間隔でポーリングする。そのあたりの経緯はここ

TwitterAPIの呼び出し方

とりあえず公式ドキュメントを参照する。
使う機能はDM受信、DM送信、ユーザID取得あたり。

基本的にはrequestsのget/postメソッドに適切な引数をつけて呼び出せばOK。JSON形式で結果が返ってくるので、辞書形式に変換して使う。

TwitterAPI呼び出し準備

以前の記事に書いたように、Twitter側でdeveloper accountを作成してConsumer Key, Consumer Secret, Access Token, Access Token Secretの4つのIDを取得しておく。

プログラムの冒頭でこれらのIDを使って、OAuth1Sessionを初期化する。

まず必要なライブラリをimport

import requests
import json
from requests_oauthlib import OAuth1Session

コンストラクタで初期化(実際のコードでは、取得したIDを代入する)

    self.twitter = OAuth1Session(consumer_key, consumer_secret, access_token, access_token_secret)
    self.pollingTime = 70
    self.headers = {"content-type": "application/json"}

DM取得は15分に15回以上?叩くとレート超過でエラーが返るっぽいので、安全のため70秒間隔でポーリングすることにする。chat botとは言いがたいレスポンスの悪さだが、まぁ自分用なので気にしないことにする。
リクエストヘッダは毎回同じなのでここで定義してしまう。

DM受信

  def receive_dm(self):
    url = "https://api.twitter.com/1.1/direct_messages/events/list.json"
    ret_json = self.twitter.get(url, headers = self.headers)
    return json.loads(ret_json.text)

公式のドキュメントを読んでその通りに実装。引数無しで、直近20メッセージ?がリスト形式で取得できる。
データ形式がわかりにくいが、実際ダンプしてみると以下のようになっているようだ。

  • "event"はDMのリストになっていて、
  • 各DMは"created_timestamp" "id" "message_create" "type"の各要素を持っている。
  • "created_timesamp"は単調増加するタイムスタンプ
  • "id"はDM自体を一意に識別するID
  • "message_create"はさらに子要素を持っていて、メッセージの中身はここ
  • "message_create"->"target"->"recipient_id" 受信者のユーザID(スクリーンネームではない)
  • "message_create"->"sender_id" 送信者のユーザID
  • "message_create"->"message_data"->"text" DM本文

今回使うのはこのあたりだけでよさそう。

DM送信

  def send_dm(self, target, msg_text):
    url = "https://api.twitter.com/1.1/direct_messages/events/new.json"
    param = {"event": {"type": "message_create", "message_create": {"target": {"recipient_id": target}, "message_data": {"text": msg_text}}}}
    self.twitter.post(url, headers = self.headers, data = json.dumps(param))

POSTメッセージのdata要素に、JSON形式に変換した引数を放り込んでやればOK。

  • "event"->"type" には"message_create"という文字列を入れておく
  • "event"->"message_create"->"target"->"recipient_id" に受信者のユーザID
  • "event"->"message_create"->"message_data"->"text" DM本文

json.dumps()で文字列展開して、post()に渡す

ユーザID取得

Twitterスクリーンネーム(@で始まるやつ)からユーザIDを取得する。

  def get_user_screen_name(self, user_id):
    url = "https://api.twitter.com/1.1/users/show.json?user_id=" + user_id
    ret_json = self.twitter.get(url, headers = self.headers)
    return (json.loads(ret_json.text))["screen_name"]

引数を与えるのが面倒なので、URLにそのまま含めてしまう。
JSON形式で戻ってくるので、文字列変換した物をjson.loads()に食わせてPython辞書形式に変換。

Bot実装

特に難しいことはないが、polling形式なので既に受信済みのDMは処理しないこと、polling間隔中に複数のDMを受信した場合の処理、DM受信APIでは自分自身の発言も取得されてしまうので無視すること、あたりを気をつければOK。

起動直後の処理

    ret = self.receive_dm()
    last_timestamp = max(i["created_timestamp"] for i in ret["events"])

とりあえず1回DMを受信して、受信できたDMの中でもっとも大きい(新しい)タイムスタンプを記憶しておく。
ここで記憶したタイムスタンプより新しいDMだけを処理の対象とする。

メインループ

    while True:
      messages = ret["events"]
      new_messages = [m for m in messages if m["created_timestamp"] > last_timestamp]
      new_messages.sort(key=(lambda x: x["created_timestamp"]))
      for m in new_messages:
        sender_name = self.get_user_screen_name(m["message_create"]["sender_id"])
        msg_text = m["message_create"]["message_data"]["text"]
        if sender_name != my_screen_name:
          ret_msg = self.dispatch(msg_text)
          self.send_dm(m["message_create"]["sender_id"], ret_msg)
        last_timestamp = m["created_timestamp"]
      time.sleep(self.pollingTime)
      ret = self.receive_dm()

やっていることを簡単に列挙すると、

  • 受信したDMリストから、記憶されているタイムスタンプより新しいもののみを抽出
  • タイムスタンプ順に並べ替える
  • 処理対象DMの送信者IDを見て、自分以外だったらDM本文を取得して処理関数dispatch()に回す
  • タイムスタンプを更新する
  • polling間隔だけスリープ
  • 最新のDMリストを受信

メッセージ処理関数

  def dispatch(self, cmd_text):
    if cmd_text == "exit":
      exit()
    elif cmd_text.startswith("echo "):
      return cmd_text[5:]
    elif cmd_text == "ip":
      return requests.get("http://inet-ip.info/ip").text

以前の記事から更新してない。
Botに機能追加するときはここ。

ログ出力関数

  def log(self, log_msg):
    log_str = "{0:[%m/%d %H:%M.%S]} ".format(datetime.datetime.now()) + log_msg + "\n"
    print(log_str)
    self.log_fp.write(log_str)

デバッグ用。現在のタイムスタンプを付加して、ログメッセージを標準出力とファイルに出力する。

ソースコード全貌

Consumer Key, Consumer Secret, Access Token, Access Token Secretはソース中では ***** と表記

#!/usr/bin/python3

import requests
import time
import json
import datetime
from requests_oauthlib import OAuth1Session

consumer_key = "*****"
consumer_secret = "*****"
access_token = "*****"
access_token_secret = "*****"

my_screen_name = "*****"
log_path = "*****"

class MyBot:
  def __init__(self):
    self.twitter = OAuth1Session(consumer_key, consumer_secret, access_token, access_token_secret)
    self.pollingTime = 70
    self.headers = {"content-type": "application/json"}

    # ログ出力ファイルをオープン
    self.log_fp = open(log_path, mode="a")

  def __del__(self):
    self.log_fp.close()

  # DM受信
  def receive_dm(self):
    url = "https://api.twitter.com/1.1/direct_messages/events/list.json"
    ret_json = self.twitter.get(url, headers = self.headers)
    return json.loads(ret_json.text)

  # DM送信
  def send_dm(self, target, msg_text):
    url = "https://api.twitter.com/1.1/direct_messages/events/new.json"
    param = {"event": {"type": "message_create", "message_create": {"target": {"recipient_id": target}, "message_data": {"text": msg_text}}}}
    self.twitter.post(url, headers = self.headers, data = json.dumps(param))

  # ユーザID取得
  def get_user_screen_name(self, user_id):
    url = "https://api.twitter.com/1.1/users/show.json?user_id=" + user_id
    ret_json = self.twitter.get(url, headers = self.headers)
    return (json.loads(ret_json.text))["screen_name"]

  def run(self):
    # 受信済みのDMタイムスタンプのうち最新のものを記憶しておく
    ret = self.receive_dm()
    last_timestamp = max(i["created_timestamp"] for i in ret["events"])

    self.log("last timestamp: " + last_timestamp)
    # メインループ
    while True:
      messages = ret["events"]
      # 処理済みタイムスタンプより新しいDMのみを抽出し、タイムスタンプ順にソート
      new_messages = [m for m in messages if m["created_timestamp"] > last_timestamp]
      new_messages.sort(key=(lambda x: x["created_timestamp"]))
      for m in new_messages:
        # 送信者のスクリーンネームを取得
        sender_name = self.get_user_screen_name(m["message_create"]["sender_id"])
        # 本文取得
        msg_text = m["message_create"]["message_data"]["text"]
        self.log("timestamp :" + m["created_timestamp"] + m["message_create"]["message_data"]["text"] + " from " + sender_name)
        # 自分自身の発言は除外
        if sender_name != my_screen_name:
          # メッセージ解釈関数に渡す
          ret_msg = self.dispatch(msg_text)
          self.send_dm(m["message_create"]["sender_id"], ret_msg)
        # タイムスタンプを更新
        last_timestamp = m["created_timestamp"]
      time.sleep(self.pollingTime)
      ret = self.receive_dm()

  # メッセージ解釈部。将来はココを拡張する
  def dispatch(self, cmd_text):
    self.log("dispatch: " + cmd_text)
    # bot終了
    if cmd_text == "exit":
      exit()
    # デバッグ用。echoで始まるメッセージはオウム返しする
    elif cmd_text.startswith("echo "):
      return cmd_text[5:]
    # "ip"と聞かれたらグローバルIPを調べて返答する
    elif cmd_text == "ip":
      return requests.get("http://inet-ip.info/ip").text

  def log(self, log_msg):
    log_str = "{0:[%m/%d %H:%M.%S]} ".format(datetime.datetime.now()) + log_msg + "\n"
    print(log_str)
    self.log_fp.write(log_str)

if __name__ == "__main__":
  bot = MyBot()
  bot.run()

完成。

実行

以前の記事と同じ動作をする・・・・はず。

7
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
6