0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Misskeyの端末クライアントを作る

Last updated at Posted at 2023-10-24

Misskeyのチャットbotを作るつもりがいつの間にか端末クライアントを作ってたので、あまり深入りしないためにここに公開しておきます。端末内に画像表示は色々試したものの未だ鬼門みたいで、面倒になって画像/動画表示のサブウインドウにChromiumを使ったら何か良い感じになった気がします(少しバグってるかも)。

書き込みにも一応対応してるものの、入力の操作とか長文の入力の表示とかがこんがらがったので簡易対応です。

sudo apt install python3-selenium # chromium-chromedriver
pip install Misskey.py asyncio websockets json
python misskey_client.py
# SPDX-License-Identifier: Apache-2.0
import asyncio
import json
import websockets
from misskey import Misskey

TOKEN='ここにトークン'
mi = Misskey('misskey.io', i=TOKEN)


RED   = "\033[1;31m"  
BLUE  = "\033[1;34m"
CYAN  = "\033[1;36m"
GREEN = "\033[0;32m"
RESET = "\033[0;0m"
BOLD    = "\033[;1m"
REVERSE = "\033[;7m"

CURSOR_UP = "\033[A"
CURSOR_DOWN = '\033[B'
CURSOR_RIGHT = '\033[C'
CURSOR_LEFT = '\033[D'

CLEAR_LINE = "\033[2K"
CURSOR_RESET = "\033[G"

CLEAR_AFTER = "\x1b[J"

#user_id = mi.i()['id']
user_name = mi.i()['name']

WS_URL='wss://misskey.io/streaming?i='+TOKEN



import subprocess

# Get the position of the terminal window
result = subprocess.check_output("xwininfo -id $(xdotool getactivewindow) | grep 'Absolute upper-left X:'", shell=True).decode("utf-8")
terminal_x = int(result.split(":")[1].strip())
result = subprocess.check_output("xwininfo -id $(xdotool getactivewindow) | grep 'Absolute upper-left Y:'", shell=True).decode("utf-8")
terminal_y = int(result.split(":")[1].strip())
result = subprocess.check_output("xwininfo -id $(xdotool getactivewindow) | grep 'Width:'", shell=True).decode("utf-8")
terminal_width = int(result.split(":")[1].strip())

chrome_x = terminal_x
chrome_y = terminal_y

# Use chronium as co-window!
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome import service as fs
from selenium.common.exceptions import WebDriverException
driver = None
def open_cowindow():
    chrome_options = Options()
    chrome_options.add_experimental_option("excludeSwitches", ['enable-automation'])
    chrome_options.add_argument("--app=data:,")
    global driver
    chrome_service = fs.Service(executable_path="/usr/lib/chromium-browser/chromedriver")
    driver = webdriver.Chrome(service=chrome_service , options=chrome_options)
    script = f"""
    window.resizeTo(400, 400);
    window.moveTo({chrome_x}, {chrome_y});
    document.title="ここに画像と動画が表示されます。"
"""
    driver.execute_script(script)
    driver.execute_script(script) # Omajinai

open_cowindow()

def update_driver_url(url, title=None):
    for i in range(2):
        try:
#            driver.execute_script('location.href="%s";'%url)
            driver.get(url)
            if title:
                driver.execute_script('document.title="%s";'%title.split("\n")[0])
        except WebDriverException: 
            open_cowindow()



import termios
import os, signal, sys, threading
old_settings = termios.tcgetattr(sys.stdin)
new_settings = termios.tcgetattr(sys.stdin)
new_settings[3] &= ~(termios.ICANON | termios.ECHO | termios.ECHOE) # for handling key input and display
termios.tcsetattr(sys.stdin, termios.TCSANOW, new_settings)

def sigint_handler(sig, frame):
    sys.stdout.write(RESET + CURSOR_DOWN + CURSOR_RESET + CLEAR_AFTER)
    sys.stdout.flush()
    termios.tcsetattr(sys.stdin, termios.TCSANOW, old_settings)
#    print("Ctrl+C pressed. Exiting gracefully.")
    sys.exit(128 + int(signal.SIGINT))

signal.signal(signal.SIGINT, sigint_handler)

stdout_lock = threading.Lock() # protection for sys.stdout, user_text and cursor_str_pos

user_text = ""
cursor_str_pos = 0 # cursor position on user_text
prefix = user_name + ": "
cursor_position_in_display = len(prefix)
user_text_raw_position_in_display = 0

def remove_char_at_position(input_string, position):
    if 0 <= position < len(input_string):
        new_string = input_string[:position] + input_string[position+1:]
        return new_string
    else:
        return input_string 

def insert_char_at_position(input_string, char_to_insert, position):
    if 0 <= position <= len(input_string):
        return input_string[:position] + char_to_insert + input_string[position:]
    else:
        return input_string

import shutil, wcwidth


def input_loop():
    global user_text
    global cursor_str_pos
    global cursor_position_in_display
    global user_text_raw_position_in_display
    global prefix
    input_buf = b""
    tmp_buf = b""
    while True:
        term_width, _ = shutil.get_terminal_size()
        prev_text = user_text[0:cursor_str_pos]
        c = sys.stdin.buffer.raw.read(1)
        with stdout_lock:
            if c == b"\n":
                mi.notes_create(user_text, visibility="followers", local_only=True) # XXX: まだテスト
                cursor_str_pos = 0
                user_text = ""
            elif c == b"\x7f":
                user_text = input_buf.decode("utf-8", errors="ignore")
                if cursor_str_pos > 0:
                     user_text = remove_char_at_position(user_text, cursor_str_pos-1)
                     cursor_str_pos -= 1
                input_buf = user_text.encode("utf-8")

            elif c == b'\x1b':
                user_text = input_buf.decode("utf-8", errors="ignore")
                input_buf = user_text.encode("utf-8")

                c2 = sys.stdin.buffer.raw.read(1)
                c3 = sys.stdin.buffer.raw.read(1)
                if c2 == b'[':
                    if c3 == b'A': # UP
                        pass
                    elif c3 == b'B': #DOWN
                        pass
                    elif c3 == b'C': #RIGHT
                        pass
                    elif c3 == b'D': #LEFT
                        pass
                    elif c3 == b'3': #DEL
                        c4 = sys.stdin.buffer.raw.read(1)
                        if c4 == b'~':
                            if cursor_str_pos < len(user_text):
                                user_text = remove_char_at_position(user_text, cursor_str_pos)
                                input_buf = user_text.encode("utf-8")
                    elif c3 == b'H': #HOME
                        pass
                    elif c3 == b'F': #END
                        pass
            else: # TODO: Ctrl+H handling?
                tmp_buf += c
                tmp_text = tmp_buf.decode("utf-8", errors="ignore")
                if len(tmp_text):
                    user_text = insert_char_at_position(user_text, tmp_text, cursor_str_pos)
                    cursor_str_pos += 1
                    tmp_buf = b""
                    input_buf = user_text.encode("utf-8")
                

        stdout_write_override("")
        sys.stdout.flush()

input_thread = threading.Thread(target=input_loop)
input_thread.daemon = True
input_thread.start()

def str_to_term_str(s, prefix):
    term_width, _ = shutil.get_terminal_size()
    plen = wcwidth.wcswidth(prefix)
    l = 0
    r = s
    for i, c in enumerate(s):
        clen = wcwidth.wcwidth(c)
        if l + clen < term_width - plen:
            l += clen
        else:
            r = s[0:i]
            break
    return r

#added_len = 0
tail_ln = False
def stdout_write_override(s):
    with stdout_lock:
        _stdout_write(CLEAR_LINE + CURSOR_RESET + CURSOR_UP)
        _stdout_write(s)
        _stdout_write(RED + "\n" +  prefix + RESET + str_to_term_str(user_text, prefix))
#        _stdout_write(CURSOR_RESET + CURSOR_RIGHT*cursor_position_in_display)

_stdout_write = sys.stdout.write
sys.stdout.write = stdout_write_override


#https://misskey-hub.net/docs/api/streaming
even_note = False
def show_notes(data):
    note = data["body"]["body"]
    if "files" in note and len(note["files"]):
        update_driver_url(note["files"][0]["url"], str(note["text"]))
    global even_note
    note_template = GREEN + "%s" + RESET + ":" + (BLUE if even_note else CYAN)  + " %s"
    even_note = not even_note
    print(note_template%(note["user"]["name"], str(note["text"])))

async def runner():
    while True:
        try:
            async with websockets.connect(WS_URL) as ws:
                await ws.send(json.dumps({
                    "type": "connect",
                    "body": {
                        "channel": "localTimeline",
                        "id": "test"
                    }
                }))
                while True:
                    data = json.loads(await ws.recv())
                    show_notes(data)
        except websockets.exceptions.ConnectionClosedOK:
            print("Connection closed with code 1001. Retrying in a moment...")
            await asyncio.sleep(10)  # Add a delay before retrying
        except Exception as e:
            print(f"An error occurred: {e}")

asyncio.get_event_loop().run_until_complete(runner())
``
0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?