Misskeyのチャットbotを作るつもりがいつの間にか端末クライアントを作ってたので、あまり深入りしないためにここに公開しておきます。端末内に画像表示は色々試したものの未だ鬼門みたいで、面倒になって画像/動画表示のサブウインドウにChromiumを使ったら何か良い感じになった気がします(少しバグってるかも)。
書き込みにも一応対応してるものの、入力の操作とか長文の入力の表示とかがこんがらがったので簡易対応です。
sudo apt install python3-selenium # chromium-chromedriver
pip install Misskey.py asyncio websockets json
python misskey_client.py
# SPDX-License-Identifier: Apache-2.0
import asyncio
import json
import websockets
from misskey import Misskey
TOKEN='ここにトークン'
mi = Misskey('misskey.io', i=TOKEN)
RED = "\033[1;31m"
BLUE = "\033[1;34m"
CYAN = "\033[1;36m"
GREEN = "\033[0;32m"
RESET = "\033[0;0m"
BOLD = "\033[;1m"
REVERSE = "\033[;7m"
CURSOR_UP = "\033[A"
CURSOR_DOWN = '\033[B'
CURSOR_RIGHT = '\033[C'
CURSOR_LEFT = '\033[D'
CLEAR_LINE = "\033[2K"
CURSOR_RESET = "\033[G"
CLEAR_AFTER = "\x1b[J"
#user_id = mi.i()['id']
user_name = mi.i()['name']
WS_URL='wss://misskey.io/streaming?i='+TOKEN
import subprocess
# Get the position of the terminal window
result = subprocess.check_output("xwininfo -id $(xdotool getactivewindow) | grep 'Absolute upper-left X:'", shell=True).decode("utf-8")
terminal_x = int(result.split(":")[1].strip())
result = subprocess.check_output("xwininfo -id $(xdotool getactivewindow) | grep 'Absolute upper-left Y:'", shell=True).decode("utf-8")
terminal_y = int(result.split(":")[1].strip())
result = subprocess.check_output("xwininfo -id $(xdotool getactivewindow) | grep 'Width:'", shell=True).decode("utf-8")
terminal_width = int(result.split(":")[1].strip())
chrome_x = terminal_x
chrome_y = terminal_y
# Use chronium as co-window!
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome import service as fs
from selenium.common.exceptions import WebDriverException
driver = None
def open_cowindow():
chrome_options = Options()
chrome_options.add_experimental_option("excludeSwitches", ['enable-automation'])
chrome_options.add_argument("--app=data:,")
global driver
chrome_service = fs.Service(executable_path="/usr/lib/chromium-browser/chromedriver")
driver = webdriver.Chrome(service=chrome_service , options=chrome_options)
script = f"""
window.resizeTo(400, 400);
window.moveTo({chrome_x}, {chrome_y});
document.title="ここに画像と動画が表示されます。"
"""
driver.execute_script(script)
driver.execute_script(script) # Omajinai
open_cowindow()
def update_driver_url(url, title=None):
for i in range(2):
try:
# driver.execute_script('location.href="%s";'%url)
driver.get(url)
if title:
driver.execute_script('document.title="%s";'%title.split("\n")[0])
except WebDriverException:
open_cowindow()
import termios
import os, signal, sys, threading
old_settings = termios.tcgetattr(sys.stdin)
new_settings = termios.tcgetattr(sys.stdin)
new_settings[3] &= ~(termios.ICANON | termios.ECHO | termios.ECHOE) # for handling key input and display
termios.tcsetattr(sys.stdin, termios.TCSANOW, new_settings)
def sigint_handler(sig, frame):
sys.stdout.write(RESET + CURSOR_DOWN + CURSOR_RESET + CLEAR_AFTER)
sys.stdout.flush()
termios.tcsetattr(sys.stdin, termios.TCSANOW, old_settings)
# print("Ctrl+C pressed. Exiting gracefully.")
sys.exit(128 + int(signal.SIGINT))
signal.signal(signal.SIGINT, sigint_handler)
stdout_lock = threading.Lock() # protection for sys.stdout, user_text and cursor_str_pos
user_text = ""
cursor_str_pos = 0 # cursor position on user_text
prefix = user_name + ": "
cursor_position_in_display = len(prefix)
user_text_raw_position_in_display = 0
def remove_char_at_position(input_string, position):
if 0 <= position < len(input_string):
new_string = input_string[:position] + input_string[position+1:]
return new_string
else:
return input_string
def insert_char_at_position(input_string, char_to_insert, position):
if 0 <= position <= len(input_string):
return input_string[:position] + char_to_insert + input_string[position:]
else:
return input_string
import shutil, wcwidth
def input_loop():
global user_text
global cursor_str_pos
global cursor_position_in_display
global user_text_raw_position_in_display
global prefix
input_buf = b""
tmp_buf = b""
while True:
term_width, _ = shutil.get_terminal_size()
prev_text = user_text[0:cursor_str_pos]
c = sys.stdin.buffer.raw.read(1)
with stdout_lock:
if c == b"\n":
mi.notes_create(user_text, visibility="followers", local_only=True) # XXX: まだテスト
cursor_str_pos = 0
user_text = ""
elif c == b"\x7f":
user_text = input_buf.decode("utf-8", errors="ignore")
if cursor_str_pos > 0:
user_text = remove_char_at_position(user_text, cursor_str_pos-1)
cursor_str_pos -= 1
input_buf = user_text.encode("utf-8")
elif c == b'\x1b':
user_text = input_buf.decode("utf-8", errors="ignore")
input_buf = user_text.encode("utf-8")
c2 = sys.stdin.buffer.raw.read(1)
c3 = sys.stdin.buffer.raw.read(1)
if c2 == b'[':
if c3 == b'A': # UP
pass
elif c3 == b'B': #DOWN
pass
elif c3 == b'C': #RIGHT
pass
elif c3 == b'D': #LEFT
pass
elif c3 == b'3': #DEL
c4 = sys.stdin.buffer.raw.read(1)
if c4 == b'~':
if cursor_str_pos < len(user_text):
user_text = remove_char_at_position(user_text, cursor_str_pos)
input_buf = user_text.encode("utf-8")
elif c3 == b'H': #HOME
pass
elif c3 == b'F': #END
pass
else: # TODO: Ctrl+H handling?
tmp_buf += c
tmp_text = tmp_buf.decode("utf-8", errors="ignore")
if len(tmp_text):
user_text = insert_char_at_position(user_text, tmp_text, cursor_str_pos)
cursor_str_pos += 1
tmp_buf = b""
input_buf = user_text.encode("utf-8")
stdout_write_override("")
sys.stdout.flush()
input_thread = threading.Thread(target=input_loop)
input_thread.daemon = True
input_thread.start()
def str_to_term_str(s, prefix):
term_width, _ = shutil.get_terminal_size()
plen = wcwidth.wcswidth(prefix)
l = 0
r = s
for i, c in enumerate(s):
clen = wcwidth.wcwidth(c)
if l + clen < term_width - plen:
l += clen
else:
r = s[0:i]
break
return r
#added_len = 0
tail_ln = False
def stdout_write_override(s):
with stdout_lock:
_stdout_write(CLEAR_LINE + CURSOR_RESET + CURSOR_UP)
_stdout_write(s)
_stdout_write(RED + "\n" + prefix + RESET + str_to_term_str(user_text, prefix))
# _stdout_write(CURSOR_RESET + CURSOR_RIGHT*cursor_position_in_display)
_stdout_write = sys.stdout.write
sys.stdout.write = stdout_write_override
#https://misskey-hub.net/docs/api/streaming
even_note = False
def show_notes(data):
note = data["body"]["body"]
if "files" in note and len(note["files"]):
update_driver_url(note["files"][0]["url"], str(note["text"]))
global even_note
note_template = GREEN + "%s" + RESET + ":" + (BLUE if even_note else CYAN) + " %s"
even_note = not even_note
print(note_template%(note["user"]["name"], str(note["text"])))
async def runner():
while True:
try:
async with websockets.connect(WS_URL) as ws:
await ws.send(json.dumps({
"type": "connect",
"body": {
"channel": "localTimeline",
"id": "test"
}
}))
while True:
data = json.loads(await ws.recv())
show_notes(data)
except websockets.exceptions.ConnectionClosedOK:
print("Connection closed with code 1001. Retrying in a moment...")
await asyncio.sleep(10) # Add a delay before retrying
except Exception as e:
print(f"An error occurred: {e}")
asyncio.get_event_loop().run_until_complete(runner())
``