こんにちは、六角レンチです。
今回は、misskeyのストリーミングAPI単体のラッパーであるBromineCoreを作ったので使い方を書こうと思います。
元になった記事
BromineCoreには元になった記事があります
この記事を元にBromineCoreは作られています(といっても原型はほぼない)
最近更新が入ったっぽいので見てみるのもいいかも
参考になるもの
公式のストリーミングAPIのドキュメントです。
一回読み通すのをおすすめします
ぶろみねくんです
BromineCoreはこのbotのコア部分の実装です
ただリファクタリングとかしてないし適当に書いているので見づらいです
絵文字追加やアナウンスに反応したり、じゃんけんも不定期に開催してリバーシ対戦もできます
インストール
PYPIに登録してあるので以下のコマンドで簡単にインストールできます
pip install brominecore
依存しているライブラリはwebsocketsだけです
また、対応しているバージョンは3.9以降です。
まずはノートを見てみよう
接続先はmisskey.ioにします
トークンは特に必要ありませんが、もし付けたい場合、初期化時の引数tokenに渡すことでつけることができます
import asyncio
from brcore import Bromine
INSTANCE = "misskey.io"
brm = Bromine(instance=INSTANCE)
async def on_note(note: dict):
print("-"*10) # 仕切り
body = note["body"] # body情報だけほしい
name = body["user"]["name"] if body["user"]["name"] is not None else body["user"]["username"]
if body["text"] is None and body["renoteId"] is not None and len(body["files"]) == 0:
# リノート
print(f"{name}がリノート")
await on_note({"body": body["renote"]})
return
else:
# 普通のノート
print(f"{name}がノート")
print(body['text'])
if len(body["files"]) != 0:
print(f"files: {len(body['files'])}")
print(f"reply: {body['repliesCount']}, renote: {body['renoteCount']}, reaction: {body['reactionCount']}")
print("-"*10) # 仕切り
print() # 最後に改行入れとく
brm.ws_connect("localTimeline", on_note)
try:
asyncio.run(brm.main())
except KeyboardInterrupt:
print("finish...")
非同期関数on_note
をlocalTimeline
に接続させる感じです
Bromine.main
を開始すると処理を始めます
on_note
は非同期関数とは名ばかりで同期的な処理をしているだけですが、ws_connect
には非同期関数が必要なので非同期関数にしています
add_ws_connect
にはパラメーターを乗せることもできます。
例えば、reversiGame
チャンネルの場合gameId
パラメーターが必要です。
また、ws_connect
にはデコレータ版としてws_connect_deco
が用意されています
使い方は非同期関数の上に接続したいチャンネルを渡してくっつけるだけです
パラメーターを乗せることはできません
@brm.ws_connect_deco("localTimeline")
async def on_note(note: dict):
チャンネルから切断するには、ws_disconnect
を使います。
この時、識別idが必要です
(ws_connect
の返り値は識別idになっている。idを指定することも可能。デコレータにはないので注意)
再接続時に何か実行させる
websocketはよく切れます。
BromineCoreは自動で再接続するのであまり心配はいらないですが、再接続した時に何か実行したりしたいときがあると思います
そのために、add_comeback
というメソッドが用意してあります。
async def on_comeback():
print("再接続完了!")
brm.add_comeback(on_comeback)
こちらもデコレーター版としてadd_comeback_deco
が用意されています
@brm.add_comeback_deco()
async def on_comeback():
print("再接続完了!")
websocketに接続して交信を開始する前に実行したい場合には、引数としてblockをTrueにすることでブロッキングに実行させることができます
async def on_comeback():
print("再接続完了!")
brm.add_comeback(on_comeback, block=True)
また、ws_disconnect
と同様に解除するためdel_comeback
が用意してあります。
ws_disconnect
と同様、識別idが必要です。(注意事項はws_connect
と同様)
ノートの講読
ノートについたリアクションをリアルタイムで検知できます。
例えば...misskey.ioはドスケベ破廉恥な絵が多いことで有名です(要出典)
そこで、特定のリアクションがついたファイル付きノートを検出する物を作ってみましょう
class DosukebeWatcher:
_DOSUKEBE_REACTIONS = set((
"dosukebe",
"dosukebefes",
"doecchi",
"etti_da_ooo",
"etti_dane",
"eti",
))
def __init__(self, noteid: str) -> None:
self.noteid = noteid
self._wait_event = asyncio.Event()
async def on_note_updated(self, info: dict):
"""ノートの情報を処理する"""
if info["type"] == "reacted":
reaction = info["body"]["reaction"].strip(":").rstrip("@.")
if reaction in self._DOSUKEBE_REACTIONS:
# ドスケベな絵の可能性が高い!!!!
print("ドスケベな絵がある可能性を検出!!!!")
print(f"noteid: {self.noteid}")
self._wait_event.set()
async def wait_timer(self):
"""タイマー"""
await asyncio.sleep(30)
self._wait_event.set()
print("すけべじゃなかったな...")
async def wait(self):
"""待たせるやつ"""
await self._wait_event.wait()
@brm.ws_connect_deco("localTimeline")
async def on_note(note: dict):
if len((files := note["body"]["files"])) != 0 and any(i["isSensitive"] for i in files):
# センシティブ判定の絵が一つでもついている時
print("センシティブな絵がついたノートを検出...")
noteid = note["body"]["id"]
sukebe = DosukebeWatcher(noteid)
timer = asyncio.create_task(sukebe.wait_timer())
brm.ws_subnote(noteid, sukebe.on_note_updated) # 講読を開始
await sukebe.wait()
brm.ws_unsubnote(noteid) # 講読を外す
# タイマーの処理
timer.cancel()
try:
await timer
except asyncio.CancelledError:
pass
DosukebeWatcher.on_note_updated
がノートの情報を処理する関数。
なんか複雑に見えますが、フラッグが立つのを待ってるだけです。
フラッグが立つ条件として
- 一定の時間経つか(
DosukebeWatcher.wait_timer
) - 破廉恥なリアクションが来るか(
DosukebeWatcher.on_note_updated
)
の二つがある感じ
すけべなノートを検出するとこんな感じになります
センシティブな画像が貼ってあるのに、破廉恥なリアクションがつかなかったらすけべじゃなかったな...
を出力します
検出したノート
検出したノートはこちら
破廉恥すぎるので注意
https://misskey.io/notes/9xgalbnsx3nm01ft
破廉恥画像検出プログラムは一旦置いて、ws_unsubnote
はnoteidが識別idになります
また、このws_subnote
にもデコレーター版であるws_subnote_deco
が一応存在します。
@brm.ws_subnote_deco("hoge_note_id")
async def on_note_updated(info: dict):
...
色々な変数
BromineCoreにはアクセス可能な変数が何個か用意されています
cooltime
int
接続が切れてから再接続するまで待つ秒数です。
デフォルト値は5
変更が可能です
5回以上連続で接続に失敗している場合、この変数によらず30秒待ちます。
is_running
bool
Bromine.main
が実行されているか
アクセスのみ可能です
loglevel
int
BromineCoreのloggerのloglevelを指定するもの
デフォルト値はlogging.DEBUG
変更が可能です
expect_info_func
websocketの受信において、辞書にない場所から送られてきた情報を処理させる関数
変更が可能
非同期関数でなければなりません。
特定の情報を処理する場合(typeがなにかわかっている場合)、後述する低レベルAPIのws_type_id
を推奨します
低レベルAPI
低レベルAPIとして、
-
add_ws_reconnect
とその逆のdel_ws_reconnect
-
add_ws_type_id
とその逆のdel_ws_type_id
ws_send
の三種類が存在します
ws_reconnect
再接続した際、websocketに情報を送る
引数は
add_ws_reconnect
の場合、[type、識別id、body情報]
del_ws_reconnect
の場合、[type、識別id]
が必要
ws_type_id
受信した際、typeと識別idをもとに関数を実行させる
引数は
add_ws_type_id
の場合、[type、識別id、非同期関数]
del_ws_type_id
の場合、[type、識別id]
が必要
また、識別idをALLMATCH
にすることで、ワイルドカードにすることが可能。
例えば、絵文字追加を検出したい場合、emojiAdded
が送られてくるので、下のようにする
async def on_emoji_add(info: dict):
print("絵文字追加!")
print(f"追加された絵文字の名前: {info['emoji']['name']}")
brm.add_ws_type_id("emojiAdded", "ALLMATCH", on_emoji_add)
ws_send
websocketに情報を送る
正確には、websocketに情報を送るキューに情報を送る
Bromine.main
が実行されていない場合、RuntimeError
を送出する
引数は、[type, body情報]
さいごに
ストリーミングAPI単体の実装がなかったので作ってみました
作ってから気づきましたが、低レベルAPIって勝手にできるものなんですね
特に意識してなかったのに、いつの間にかws_connect
やws_subnote
は低レベルAPIを利用する感じになってました
動きをわかりやすくしようとすると分かれるのかな?
また、この記事はBromineCoreのドキュメントみたいな感じで作っていますが、作るにあたって苦労したところや疑問を感じたところ、なぜストリーミングAPI単体の実装なのか等を別で記事にしてみたいなと思っています
おまけ
実はこの記事書いている途中でreconnect
をrecconect
と書き間違えていることに気づき、それを直すためだけに新しくBromineCore v0.14をリリースして、なおったと思ってよく見てみたらreconect
に間違えて修正してしまい、さらに追加でBromineCore v0.15をリリースする羽目になりました(泣きそう)