Edited at

PythonのrequestsでmastodonのStreamingAPIを叩く

More than 1 year has passed since last update.


はじめに

前回の記事 PythonのrequestsでmastodonのAPIを叩く の続きというかオマケという感じの記事です。


実装

Mastodon.py の実装を参考にした部分もありますが、概ねこんな感じでストリームが流れるようになります。(ちなみに、Mastodon.pyのGithubリポジトリの方はstreaming.pyがあって実装されてるっぽいんですが、pipでインストールできるやつはバージョン同じなのにこれが実装されてません)

使い方はdocstringのUsageの部分を見てもらえればと。ちなみに前回の続きなのでaccess_tokenは取得済みという前提です。

# -*- coding: utf-8 -*-

import operator
from urllib.parse import urljoin

import requests

class MstdnStream:
"""Mastodon Steam Class

Usage::

>>> from mstdn import MstdnStream, MstdnStreamListner
>>> listener = MstdnStreamListner()
>>> stream = MstdnStream('https://pawoo.net', 'your-access-token', listener)
>>> stream.public()

"""
def __init__(self, base_url, access_token, listener):
self.base_url = base_url
self.session = requests.Session()
self.session.headers.update({'Authorization': 'Bearer ' + access_token})
self.listener = listener

def public(self):
url = urljoin(self.base_url, '/api/v1/streaming/public')
resp = self.session.get(url, stream=True)
resp.raise_for_status()
event = {}
for line in resp.iter_lines():
line = line.decode('utf-8')

if not line:
# End of content.
method_name = "on_{event}".format(event=event['event'])
f = operator.methodcaller(method_name, event['data'])
f(self.listener)
# refreash
event = {}
continue

if line.startswith(':'):
# TODO: Handle heatbeat
print('startwith ":" {line}'.format(line=line))
else:
key, value = line.split(': ', 1)
if key in event:
event[key] += value
else:
event[key] = value

class MstdnStreamListner:

def on_update(self, data):
print(data)

def on_notification(self, data):
print(data)

def on_delete(self, data):
print("Deleted: {id}".format(id=data))

tootsuite/mastodon/streaming/index.js を見ると、ドキュメントに載ってないエンドポイントがありますね。

  app.get('/api/v1/streaming/user', (req, res) => {

streamFrom(`timeline:${req.accountId}`, req, streamToHttp(req, res), streamHttpEnd(req))
})

app.get('/api/v1/streaming/public', (req, res) => {
streamFrom('timeline:public', req, streamToHttp(req, res), streamHttpEnd(req), true)
})

app.get('/api/v1/streaming/public/local', (req, res) => {
streamFrom('timeline:public:local', req, streamToHttp(req, res), streamHttpEnd(req), true)
})

app.get('/api/v1/streaming/hashtag', (req, res) => {
streamFrom(`timeline:hashtag:${req.params.tag}`, req, streamToHttp(req, res), streamHttpEnd(req), true)
})

app.get('/api/v1/streaming/hashtag/local', (req, res) => {
streamFrom(`timeline:hashtag:${req.params.tag}:local`, req, streamToHttp(req, res), streamHttpEnd(req), true)
})

public が連合タイムラインで public/local がローカルタイムラインになるのかな?この辺は調べてないです。

上の例では public 向けにしか実装してないので、必要なエンドポイントの分だけメソッドを用意してください。ちなみにpathが変わるのとhashtag関係だとクエリパラメータに tag を渡さないといけないのでそれに対応するくらいで、メインロジックはたぶん全く同じになると思います。

いちおう例です。


class MstdnStream:

...

def _dispatch_event(self, event):
method_name = "on_{event}".format(event=event['event'])
f = operator.methodcaller(method_name, event['data'])
f(self.listener)

def _handle_heartbeat(self, line):
# TODO: Handle heatbeat
print(line)

def _run(self, path, params=None):
if params is None:
params = {}
url = urljoin(self.base_url, path)
resp = self.session.get(url, params=params, stream=True)
resp.raise_for_status()

event = {}
for line in resp.iter_lines():
line = line.decode('utf-8')

if not line:
# End of content.
self._dispatch_event(event)

# Format dict
event = {}
continue

if line.startswith(':'):
self._handle_heartbeat(line)
else:
key, value = line.split(': ', 1)
if key in event:
event[key] += value
else:
event[key] = value

def public(self):
self._run('/api/v1/streaming/public')

def local(self):
self._run('/api/v1/streaming/public/local')

def user(self):
self._run('/api/v1/streaming/user')

def hashtag(self, tag):
self._run('/api/v1/streaming/hashtag', params={'tag': tag})

def local_hashtag(self, tag):
self._run('/api/v1/streaming/hashtag/local', params={'tag': tag})

実際に上の例動かしてみて、hashtag関係以外はちゃんと動くんですけど、hashtagの2つだけ、heartbeatコメントが帰ってくるだけでコンテンツが全くヒットしないんですよね…。ブラウザ上ではちゃんと結果が流れてくるタグを指定してるのでタグが悪いわけでもなし。

デバッガで止めて、リクエストしたurlとかパラメータとかヘッダーとか確認したけどおかしそうなところなかったのでサーバーサイドの問題かも。


Streaming Requests

requests には Streaming Requests という便利な機能があるので、これを使えば簡単にstreamを取得できます。

import json

import requests

r = requests.get('http://httpbin.org/stream/20', stream=True)

for line in r.iter_lines():

# filter out keep-alive new lines
if line:
decoded_line = line.decode('utf-8')
print(json.loads(decoded_line))

stream=True でリクエスト送って、レスポンスの iter_lines() を呼ぶだけという簡単さ。


レスポンスの中身

MastodonのStreaming APIのページ にもあるように、

event: <イベント名>

data: <データ>

という3行からなります。3行目は空行で、1コンテンツの終わりを示します。heartbeatにあまり詳しくないんですが、heartbeatコメントとして4行目に :thump というコメントが来る場合もあります。

event: <イベント名>

data: <データ>

:thump

Listenerクラスのほうでイベントの種類ごとにメソッド用意してやって、そのメソッドにdataを渡せばOKです。


おまけ

pawoo.netから✗✗な画像を拾ってくるpythonスクリプト書いた のようなもののStreaming API バージョン

たまに拡張子のない画像があるのですが面倒なのでjpg決め打ちです…

# -*- coding: utf-8 -*-

+ import json
+ import os
+ from urllib.request import urlretrieve

...

+ DEST_DIR = 'images'

class MstdnStream:

...

class MstdnStreamListner:

def on_update(self, data):
- print(data)
+ if not os.path.exists(DEST_DIR):
+ os.mkdir(DEST_DIR)
+
+ status = json.loads(data)
+
+ if status['sensitive']:
+ for media in status['media_attachments']:
+ if media['type'] == 'image':
+ media_url = media['url']
+ ext = os.path.splitext(media_url)[1][:4]
+ if not ext:
+ ext = '.jpg'
+ filename = "{media_id}{ext}".format(media_id=str(media['id']), ext=ext)
+ dest = os.path.join(DEST_DIR, filename)
+ urlretrieve(media_url, dest)
+ print("Downloaded: {dest}".format(dest=dest))

...

みんなよく画像DLするコード書くのに with open('filename', 'w')... とかしてるの見るんですけど、urlretrieve(<元画像URL>, <保存先のローカルファイルパス>) 1行でいいのです。Pythonえらい。


おわりに

やっぱり requests 便利だねってことで。

だいぶ説明端折ってしまってますが、気が向いたら編集します。

あ、mastodonのアカウント、イラスト投稿メインですけど一応載せときますね @kk6@pawoo.net