1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

LEDマトリクスを使った電光掲示板風タスクマネージャの開発

Last updated at Posted at 2023-11-11

何を作ったか

LEDマトリクスとラズパイを使って、PCの情報を電光掲示板風に表示するデバイスを開発しました。

写真は全数値同じ色ですが、現在は使用率・温度によって色が変わるようになっています。(例えばGPU温度50℃なら緑、70℃ならオレンジ、90℃なら赤といった風に)

自分の備忘録として仕組みとか使い方を説明します。

仕組み

image.png

  1. ホストPCでデバイス情報を取得
  2. 無線ソケット通信でRaspberry Pi Zero WHに取得したデバイス情報を送信
  3. Raspberry Pi Zero WHのGPIOピンでLEDマトリクスにデータ伝送、表示

[ホストPC] スペック

パーツ 内容
OS Windows 11 Home
CPU 12th Gen Intel(R) Core(TM) i5-12400F

[ホストPC] デバイス情報を取得

欲しいデバイス情報としてはCPU使用率、RAM使用率、CPU温度、GPU温度です。
このうちCPU使用率、RAM使用率はPythonのpsutilで取得可能です。
CPU温度とGPU温度については、後者はnvidia-smiコマンドで楽に取得可能ですが、前者のCPU温度は取得方法が限られます。
この記事が参考になり、方法は以下。

  • sysfs経由で温度を取得する
  • lm_sensors を使用する

しかしこれらの方法はLinux環境の方法です。
WSL上で上記の方法を試してみましたがホストデバイス側のCPU温度は取得できませんでした。
そこでオープンソースのモニタリングソフトウェアであるOpen Hardware MonitorをWindows側(PowerShell)で使用することにしました。

しかしOpen Hardware MonitorはIntel 12th Gen. CPUに対応しておらず、肝心の温度を取得できませんでした。
有志がPRしてビルドしたバージョンがissueに置かれており、それを利用して解決しました。
https://github.com/hexagon-oss/openhardwaremonitor/releases/tag/v0.9.7-alpha11

[ホストPC/ラズパイ] ソケット通信でラズパイにデータ送信

PCとラズパイのリアルタイムデータ伝送は以下の方法がありそうでした。(自分調べ)

  • USB有線シリアル通信
  • Bluetooth無線シリアル通信
  • WiFi無線ソケット通信

USBシリアル通信には専用ケーブルが必要で、Bluetoothは自分のラズパイが不安定だったことから、今回は3番目のソケット通信を選択しました。

ソケット通信実装は以下サイトが参考になりました。
今回だと、デバイス情報を取得するホストPC=クライアント、ラズパイ=サーバ となります。

クライアント編
https://www.raspberrypirulo.net/entry/socket-client

サーバ編
https://www.raspberrypirulo.net/entry/socket-server

[ラズパイ] LEDマトリクスへの表示

ラズパイのGPIO端子を用いたLEDマトリクスの点灯については、自分の昔の記事をご覧ください。
https://qiita.com/Suibari_cha/items/45f4decd07f9dd51159a

使い方

まず、ラズパイ側で以下実行。
root権限が必要なのはLEDマトリクス表示ライブラリの仕様です。

$ source ./venv/bin/activate
$ sudo python ./main.py

これでLEDマトリクスにALL0が表示されます。
次に、ホストPC側で以下実行。

PS> .\venv\Scripts\activate
PS> python .\main.py

これでソケット通信が確立し、一定時間ごとにLEDマトリクス表示が更新されます。

コード

本記事内のURLのコードを切り貼りしており汚いですが… 以下です。

ホストPC(クライアント)側

import socket
import time
from datetime import datetime
import psutil
import clr

HOST_IP = "192.168.0.XXX" # 接続するサーバーのIPアドレス
PORT = 12345 # 接続するサーバーのポート
DATESIZE = 1024  # 受信データバイト数
DLL_PATH = r"C:\Users\xxxxx\OpenHardwareMonitor\OpenHardwareMonitorLib.dll"

clr.AddReference(DLL_PATH)
from OpenHardwareMonitor.Hardware import Computer
c = Computer()

class SocketClient():

    def __init__(self, host, port):
        self.host = host
        self.port = port
        self.socket = None

    def send_recv(self, input_data):
        
        # sockインスタンスを生成
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            # ソケットをオープンにして、サーバーに接続
            sock.connect((self.host, self.port))
            input_data = " ".join(str(elm) for elm in input_data)
            print('[{0}] input data : {1}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), input_data) )
            # 入力データをサーバーへ送信
            sock.send(input_data.encode('utf-8'))
            # サーバーからのデータを受信
            rcv_data = sock.recv(DATESIZE)            
            rcv_data = rcv_data.decode('utf-8')
            print('[{0}] recv data : {1}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), rcv_data) )

def get_device_info():

    result = []

    # OpenHardwareMonitor更新
    c.Hardware[0].Update()
    c.Hardware[1].Update()

    # CPU使用率
    result.append(psutil.cpu_percent(interval=1))

    # RAM使用率
    result.append(psutil.virtual_memory().percent)

    # CPU温度
    cpu = c.Hardware[0]
    for a in range (0, len(cpu.Sensors)):
        if str(cpu.Sensors[a].SensorType) == "Temperature":
            if str(cpu.Sensors[a].Name) == "CPU Package":
                cpu_temp = cpu.Sensors[a].get_Value()
                result.append(cpu_temp)

    # GPU温度
    result.append(c.Hardware[1].Sensors[0].get_Value())

    return result

if __name__ == '__main__':

    client = SocketClient(HOST_IP, PORT)

    # OpenHardwareMonitor設定
    c.CPUEnabled = True
    c.GPUEnabled = True
    c.Open()

    while True:
        result = get_device_info()
        client.send_recv(result)
        
        time.sleep(2)

ラズパイ(サーバ)側

import socket
import threading
from datetime import datetime
from rgbmatrix import RGBMatrix, RGBMatrixOptions, graphics
import time

HOST_IP = "192.168.0.XXX"
PORT = 12345
CLIENTNUM = 1
DATASIZE = 1024
CPU_TEMP_MAX = 80
CPU_TEMP_MIN = 20
GPU_TEMP_MAX = 100
GPU_TEMP_MIN = 40

cpu_use_ratio = 0
ram_use_ratio = 0
cpu_temp = 0
gpu_temp = 0
rcv_data_decode = [0, 0, CPU_TEMP_MIN, GPU_TEMP_MIN]

class SocketServer():
    def __init__(self, host, port):
        self.host = host
        self.port = port

    # サーバー起動
    def run_server(self):

        # server_socketインスタンスを生成
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
            server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            server_socket.bind((self.host, self.port))
            server_socket.listen(CLIENTNUM)
            print('[{}] run server'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))

            while True:
                # クライアントからの接続要求受け入れ
                client_socket, address = server_socket.accept()
                print('[{0}] connect client -> address : {1}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), address) )
                client_socket.settimeout(60)
                # クライアントごとにThread起動 send/recvのやり取りをする
                t = threading.Thread(target = self.conn_client, args = (client_socket,address))
                t.setDaemon(True)
                t.start()

    # クライアントごとにThread起動する関数
    def conn_client(self, client_socket, address):
        global rcv_data_decode

        with client_socket:
            while True:
                # クライアントからデータ受信
                rcv_data = client_socket.recv(DATASIZE)
                if rcv_data:
                    # データ受信したデータをそのままクライアントへ送信
                    client_socket.send(rcv_data)
                    rcv_data_decode = rcv_data.decode('utf-8').split(' ') # 'AAA BBB CCC'を配列化する
                else:
                    break

        print('[{0}] disconnect client -> address : {1}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), address) )

class createLED():
    # パネル設定用関数
    def __init__(self):
        self.font = []             # フォント
        self.textColor = []        # テキストカラー
        self.flagPreparedToDisplay = False # メイン関数を表示する準備ができたかどうかのフラグ

        # LEDマトリクス設定
        options = RGBMatrixOptions()
        options.rows = 32
        options.cols = 64
        options.gpio_slowdown = 0
        self.matrix = RGBMatrix(options = options)

        # キャンバス作成
        self.offscreen_canvas = self.matrix.CreateFrameCanvas()

        # 文字設定
        print("setting LED matrix options...")
        self.font.append(graphics.Font())
        self.font[0].LoadFont("./src/font/misaki_bdf_2021-05-05/misaki_gothic_2nd.bdf")
        
        # パネル点灯する関数(一時的)
    def displayLEDTemp(self):
        print('display "WAITING..."')
        font_simple = graphics.Font()
        font_simple.LoadFont("./work/rpi-rgb-led-matrix/fonts/5x8.bdf")
        textColor_simple = graphics.Color(255, 255, 0)
        while (self.flagPreparedToDisplay == False):
        graphics.DrawText(self.offscreen_canvas, font_simple, 0, 31, textColor_simple, "WAITING...") # 静止文字
        self.offscreen_canvas = self.matrix.SwapOnVSync(self.offscreen_canvas)

    # パネル点灯する関数(メイン)
    def displayLEDMain(self):
        global rcv_data_decode

        # Start loop
        i = 0
        pos = self.offscreen_canvas.width
        self.flagPreparedToDisplay = True
        print("display LED, Press CTRL-C to stop")
        while True:
            for j in range(len(rcv_data_decode)):
                self.textColor.append(graphics.Color())
                if (j == 2): # CPU temperatur
                    r_value = getColorByMaxAndMin(float(rcv_data_decode[j]), CPU_TEMP_MIN, CPU_TEMP_MAX, 0, 255)
                    g_value = getColorByMaxAndMin(float(rcv_data_decode[j]), CPU_TEMP_MIN, CPU_TEMP_MAX, 255, 0)
                elif (j == 3): # GPU Temperature
                    r_value = getColorByMaxAndMin(float(rcv_data_decode[j]), GPU_TEMP_MIN, GPU_TEMP_MAX, 0, 255)
                    g_value = getColorByMaxAndMin(float(rcv_data_decode[j]), GPU_TEMP_MIN, GPU_TEMP_MAX, 255, 0)
                else:
                    r_value = 255 * float(rcv_data_decode[j])/100
                    g_value = 255 * (100 - float(rcv_data_decode[j]))/100
                self.textColor[j] = graphics.Color(r_value, g_value, 0)

            self.offscreen_canvas.Clear()
            graphics.DrawText(self.offscreen_canvas, self.font[0], 0, 7,    self.textColor[0], "CPUUSG: "+str(rcv_data_decode[0])) # 静止文字
            graphics.DrawText(self.offscreen_canvas, self.font[0], 0, 14+1, self.textColor[1], "RAMUSG: "+str(rcv_data_decode[1])) # 静止文字
            graphics.DrawText(self.offscreen_canvas, self.font[0], 0, 21+2, self.textColor[2], "CPUTMP: "+str(rcv_data_decode[2])) # 静止文字
            graphics.DrawText(self.offscreen_canvas, self.font[0], 0, 28+3, self.textColor[3], "GPUTMP: "+str(rcv_data_decode[3])) # 静止文字
            
            time.sleep(0.05)
            self.offscreen_canvas = self.matrix.SwapOnVSync(self.offscreen_canvas)

# (xmin, ymin)と(xmax, ymax)を通る一次関数の導出
def getColorByMaxAndMin(x, xmin, xmax, ymin, ymax):
    inc = (ymax - ymin) / (xmax - xmin)
    y = inc * x + (ymin - inc * xmin)
    return y

if __name__ == "__main__":

    LED = createLED()
    t = threading.Thread(target=LED.displayLEDMain, daemon=True)
    t.start()
    SocketServer(HOST_IP, PORT).run_server()

おわりに

本筋と少しそれますが、LEDマトリクスを点灯させた状態でラズパイの電源を切ったとき、LEDマトリクスも消灯してほしいですが、ドットが一部発光したままになることがあります。
なぜでしょうか。GPIOがHに張り付いている? 電気回路知識がないのでわかりません。
詳しい方がいたら教えてほしいです。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?