0
0

misskeyのストリーミングAPI単体のラッパーを作ったので紹介する

Posted at

こんにちは、六角レンチです。
今回は、misskeyのストリーミングAPI単体のラッパーであるBromineCoreを作ったので使い方を書こうと思います。

元になった記事

BromineCoreには元になった記事があります

この記事を元にBromineCoreは作られています(といっても原型はほぼない)
最近更新が入ったっぽいので見てみるのもいいかも

参考になるもの

公式のストリーミングAPIのドキュメントです。
一回読み通すのをおすすめします

ぶろみねくんです
BromineCoreはこのbotのコア部分の実装です
ただリファクタリングとかしてないし適当に書いているので見づらいです
絵文字追加やアナウンスに反応したり、じゃんけんも不定期に開催してリバーシ対戦もできます

インストール

PYPIに登録してあるので以下のコマンドで簡単にインストールできます

pip install brominecore

依存しているライブラリはwebsocketsだけです
また、対応しているバージョンは3.9以降です。

まずはノートを見てみよう

接続先はmisskey.ioにします
トークンは特に必要ありませんが、もし付けたい場合、初期化時の引数tokenに渡すことでつけることができます

import asyncio

from brcore import Bromine


INSTANCE = "misskey.io"
brm = Bromine(instance=INSTANCE)


async def on_note(note: dict):
    print("-"*10)  # 仕切り
    body = note["body"]  # body情報だけほしい
    name = body["user"]["name"] if body["user"]["name"] is not None else body["user"]["username"]
    if body["text"] is None and body["renoteId"] is not None and len(body["files"]) == 0:
        # リノート
        print(f"{name}がリノート")
        await on_note({"body": body["renote"]})
        return
    else:
        # 普通のノート
        print(f"{name}がノート")
        print(body['text'])
        if len(body["files"]) != 0:
            print(f"files: {len(body['files'])}")
        print(f"reply: {body['repliesCount']}, renote: {body['renoteCount']}, reaction: {body['reactionCount']}")
        print("-"*10)  # 仕切り
    print()  # 最後に改行入れとく


brm.ws_connect("localTimeline", on_note)

try:
    asyncio.run(brm.main())
except KeyboardInterrupt:
    print("finish...")

非同期関数on_notelocalTimelineに接続させる感じです
Bromine.mainを開始すると処理を始めます

on_noteは非同期関数とは名ばかりで同期的な処理をしているだけですが、ws_connectには非同期関数が必要なので非同期関数にしています

こんな感じに表示されます
image.png

add_ws_connectにはパラメーターを乗せることもできます。
例えば、reversiGameチャンネルの場合gameIdパラメーターが必要です。

また、ws_connectにはデコレータ版としてws_connect_decoが用意されています
使い方は非同期関数の上に接続したいチャンネルを渡してくっつけるだけです
パラメーターを乗せることはできません

@brm.ws_connect_deco("localTimeline")
async def on_note(note: dict):

チャンネルから切断するには、ws_disconnectを使います。
この時、識別idが必要です
(ws_connectの返り値は識別idになっている。idを指定することも可能。デコレータにはないので注意)

再接続時に何か実行させる

websocketはよく切れます。

BromineCoreは自動で再接続するのであまり心配はいらないですが、再接続した時に何か実行したりしたいときがあると思います
そのために、add_comebackというメソッドが用意してあります。

async def on_comeback():
    print("再接続完了!")


brm.add_comeback(on_comeback)

こちらもデコレーター版としてadd_comeback_decoが用意されています

@brm.add_comeback_deco()
async def on_comeback():
    print("再接続完了!")

websocketに接続して交信を開始する前に実行したい場合には、引数としてblockをTrueにすることでブロッキングに実行させることができます

async def on_comeback():
    print("再接続完了!")


brm.add_comeback(on_comeback, block=True)

また、ws_disconnectと同様に解除するためdel_comebackが用意してあります。
ws_disconnectと同様、識別idが必要です。(注意事項はws_connectと同様)

ノートの講読

ノートについたリアクションをリアルタイムで検知できます。

例えば...misskey.ioはドスケベ破廉恥な絵が多いことで有名です(要出典)
そこで、特定のリアクションがついたファイル付きノートを検出する物を作ってみましょう

class DosukebeWatcher:
    _DOSUKEBE_REACTIONS = set((
        "dosukebe",
        "dosukebefes",
        "doecchi",
        "etti_da_ooo",
        "etti_dane",
        "eti",
    ))

    def __init__(self, noteid: str) -> None:
        self.noteid = noteid
        self._wait_event = asyncio.Event()

    async def on_note_updated(self, info: dict):
        """ノートの情報を処理する"""
        if info["type"] == "reacted":
            reaction = info["body"]["reaction"].strip(":").rstrip("@.")
            if reaction in self._DOSUKEBE_REACTIONS:
                # ドスケベな絵の可能性が高い!!!!
                print("ドスケベな絵がある可能性を検出!!!!")
                print(f"noteid: {self.noteid}")
                self._wait_event.set()

    async def wait_timer(self):
        """タイマー"""
        await asyncio.sleep(30)
        self._wait_event.set()
        print("すけべじゃなかったな...")

    async def wait(self):
        """待たせるやつ"""
        await self._wait_event.wait()


@brm.ws_connect_deco("localTimeline")
async def on_note(note: dict):
    if len((files := note["body"]["files"])) != 0 and any(i["isSensitive"] for i in files):
        # センシティブ判定の絵が一つでもついている時
        print("センシティブな絵がついたノートを検出...")
        noteid = note["body"]["id"]

        sukebe = DosukebeWatcher(noteid)
        timer = asyncio.create_task(sukebe.wait_timer())

        brm.ws_subnote(noteid, sukebe.on_note_updated)  # 講読を開始
        await sukebe.wait()
        brm.ws_unsubnote(noteid)  # 講読を外す

        # タイマーの処理
        timer.cancel()
        try:
            await timer
        except asyncio.CancelledError:
            pass

DosukebeWatcher.on_note_updatedがノートの情報を処理する関数。

なんか複雑に見えますが、フラッグが立つのを待ってるだけです。
フラッグが立つ条件として

  • 一定の時間経つか(DosukebeWatcher.wait_timer)
  • 破廉恥なリアクションが来るか(DosukebeWatcher.on_note_updated)

の二つがある感じ

すけべなノートを検出するとこんな感じになります
Screenshot_20240828_025623.png
センシティブな画像が貼ってあるのに、破廉恥なリアクションがつかなかったらすけべじゃなかったな...を出力します

検出したノート

検出したノートはこちら:eyes::eyes::eyes::eyes::eyes::eyes::eyes::eyes:
破廉恥すぎるので注意:eyes::eyes::eyes::eyes::eyes::eyes::eyes::eyes:
https://misskey.io/notes/9xgalbnsx3nm01ft



破廉恥画像検出プログラムは一旦置いて、ws_unsubnoteはnoteidが識別idになります

また、このws_subnoteにもデコレーター版であるws_subnote_decoが一応存在します。

@brm.ws_subnote_deco("hoge_note_id")
async def on_note_updated(info: dict):
    ...

色々な変数

BromineCoreにはアクセス可能な変数が何個か用意されています

cooltime

int
接続が切れてから再接続するまで待つ秒数です。
デフォルト値は5
変更が可能です

5回以上連続で接続に失敗している場合、この変数によらず30秒待ちます。

is_running

bool
Bromine.mainが実行されているか
アクセスのみ可能です

loglevel

int
BromineCoreのloggerのloglevelを指定するもの
デフォルト値はlogging.DEBUG
変更が可能です

expect_info_func

websocketの受信において、辞書にない場所から送られてきた情報を処理させる関数
変更が可能
非同期関数でなければなりません。

特定の情報を処理する場合(typeがなにかわかっている場合)、後述する低レベルAPIのws_type_idを推奨します

低レベルAPI

低レベルAPIとして、

  • add_ws_reconnectとその逆のdel_ws_reconnect
  • add_ws_type_idとその逆のdel_ws_type_id
  • ws_send

の三種類が存在します

ws_reconnect

再接続した際、websocketに情報を送る

引数は
add_ws_reconnectの場合、[type、識別id、body情報]
del_ws_reconnectの場合、[type、識別id]
が必要

ws_type_id

受信した際、typeと識別idをもとに関数を実行させる
引数は
add_ws_type_idの場合、[type、識別id、非同期関数]
del_ws_type_idの場合、[type、識別id]
が必要

また、識別idをALLMATCHにすることで、ワイルドカードにすることが可能。

例えば、絵文字追加を検出したい場合、emojiAddedが送られてくるので、下のようにする

async def on_emoji_add(info: dict):
    print("絵文字追加!")
    print(f"追加された絵文字の名前: {info['emoji']['name']}")


brm.add_ws_type_id("emojiAdded", "ALLMATCH", on_emoji_add)

ws_send

websocketに情報を送る
正確には、websocketに情報を送るキューに情報を送る

Bromine.mainが実行されていない場合、RuntimeErrorを送出する

引数は、[type, body情報]

さいごに

ストリーミングAPI単体の実装がなかったので作ってみました

作ってから気づきましたが、低レベルAPIって勝手にできるものなんですね:thinking:
特に意識してなかったのに、いつの間にかws_connectws_subnoteは低レベルAPIを利用する感じになってました
動きをわかりやすくしようとすると分かれるのかな?

また、この記事はBromineCoreのドキュメントみたいな感じで作っていますが、作るにあたって苦労したところや疑問を感じたところ、なぜストリーミングAPI単体の実装なのか等を別で記事にしてみたいなと思っています

おまけ

実はこの記事書いている途中でreconnectrecconectと書き間違えていることに気づき、それを直すためだけに新しくBromineCore v0.14をリリースして、なおったと思ってよく見てみたらreconectに間違えて修正してしまい、さらに追加でBromineCore v0.15をリリースする羽目になりました(泣きそう)

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0