TwitterのUser Streams APIは廃止されました。本記事のソースはこのままだと動かなくなる可能性があります。
目的
- 海外出張中やら、カフェでドヤリングしているときにたまにVPNで自宅ネットワークにつなぎたくなることがある。
- NASにアクセスしたいとか、おま国とか
- グローバルIP記録しててもたまに変わっててアクセスできなくなる
- DynamicDNSを使うほどの頻度でもない
- 組込み屋としては使ってみなきゃ、と思って昔買ったRaspberryPiがタンスの肥やしになっている
- raspbianをインストールしてSSHでヘッドレス運用できるようにした状態で放置してあった
- Twitter APIとかその手のものを使ってみたいと思っていた
- いつもC/C++ばっかり使ってるのでPythonで遊んでみたかった
やりたいこと
- RaspberryPiを使って、自分用Twitter botを作る
- DMを使って話しかけると、グローバルIPを調べて返してくれる
- 今後、Twitter botをゲートウェイにして家庭内IoTとかいろいろやってみたい
基本構成
こんな感じ
準備(RaspberryPi側)
raspbian StretchがインストールされたRaspberryPiにSSHでログイン。
最初からヘッドレスインストールするやり方もあるけど、以前X含めてインストールしてあったのでそのまま使う
Windows側クライアントはWindows subsystem on LinuxでDebianを入れてあったので、bashからSSHを使うことにする。
以下raspbian上で作業。
Python3を入れる
# apt install python3 python3-pip
今回はTwythonというライブラリを使ってみる。あとはIP取得用のRequests
# pip3 install twython
# pip3 install requests
グローバルIPを調べるにはお手軽に inet-ip.info を使う
>>> import requests
>>> requests.get('http://inet-ip.info/ip').text
'xxx.xxx.xxx.xxx'
>>>
これで取得可能
準備(Twitter側)
アカウント作成
- 自分のTwitterアカウントを使ってもいいけど、今回はbot用に新規アカウントを取得する。
- 遙か昔にIRC1上で、AoE2の仲間内対戦のチーム分けをするbotを作ってたなぁ、というのを思い出してaoemon1と命名。
- 一応鍵付きにして、自分のアカウントと相互フォローしておく。
- ここで電話番号を登録しておかないと、次のアプリ作成で引っかかるらしいので登録しておく。
- メインの電話番号はメインアカウントに紐付け済みだったのでNVMOの適当な番号(SMSが受け取れること)を投入
- SMSが送られてくるので認証
アプリ作成
- アプリ作成ページ (https://apps.twitter.com/) にアクセスしてCreate New App
- アプリ名とかURLとかは適当でいいらしい
- 今回はDMを使用するので、アプリ権限にdirect messageの権限を追加しておく
- Consumer Key, Consumer Secret, Access Token, Access Token Secretを取得する
ソース
Consumer Key, Consumer Secret, Access Token, Access Token Secretは先ほど取得した文字列を使う。
(ソース中では ***** と表記)
Streaming APIを使用すると、自分が発言したDMもストリーミングされてくるので、発言者をチェックして自分の発言には反応しないようにする。
あとは将来の拡張に備えて適当にコマンド解釈部を独立させておく
#!/usr/bin/python3
from twython import TwythonStreamer
from twython import Twython
import requests
consumer_key = '*****'
consumer_secret = '*****'
access_token = '*****'
access_token_secret = '*****'
my_screen_name = 'aoemon1'
twitter = Twython(consumer_key, consumer_secret, access_token, access_token_secret)
class MyStreamer(TwythonStreamer):
# 何かイベントが起きるとon_success()が呼ばれるのでオーバーライドしておく
def on_success(self, data):
# DMだった場合、"direct_message"というkeyが含まれる
if 'direct_message' in data:
#print(data.keys())
msg = data['direct_message']
#print(msg.keys())
# 送り主が自分だった場合は"ハウリング"するので除外
if msg['sender_screen_name'] != my_screen_name:
ret_msg = self.dispatch(msg['text'])
# コマンド解釈部に送る
self.send_dm(msg['sender_screen_name'], ret_msg)
def on_error(self, status_code, data):
print(status_code, data)
# DMを送り返す
def send_dm(self, target, msg_text):
twitter.send_direct_message(screen_name = target, text = msg_text)
# コマンド解釈部。将来はここを拡張する
def dispatch(self, cmd_text):
# bot終了
if cmd_text == 'exit':
exit()
# デバッグ用。echoで始まるメッセージはオウム返しする
elif cmd_text.startswith('echo '):
return cmd_text[5:]
# "ip"と聞かれたらグローバルIPを調べて返答する
elif cmd_text == 'ip':
return requests.get('http://inet-ip.info/ip').text
if __name__ == "__main__":
stream = MyStreamer(consumer_key, consumer_secret, access_token, access_token_secret)
stream.user()
完成!
実行
バックグラウンドで実行、シェルをログアウトした後も実行し続けるようにする
$ nohup ./aoemon_bot.py &
シェルから終了させたいときは、プロセスIDを調べてKILL
$ ps -e
$ kill -9 [PID]
実行結果
こんな感じにIPを聞くと教えてくれる。これでドヤリングが捗るな。
次は野球の速報を伝えてくれるbotでも作ろうかな・・・