2026年のゴールデンウィーク。
暇をもてあそんだ筆者は、何を思ったのかIoTに挑戦しました。
営業職の筆者でもできたため、エンジニアの方々ならよりスムーズにできそうです!
何を作ったのか(動画つき)
Azure IoT Hub経由でJSON形式のメッセージをLEDデバイスに送ります。
そのメッセージが、LEDデバイスにリアルタイムで表示されるというものです。
↓ 上記メッセージを送った際の表示
作ったものの全体像
用語は後で解説します!
この項では、なんとなくの概要把握でOKです。
全体像は、ざっくり以下です。
使用したもの一覧とコストは以下です。
| アイテム | 費用 |
|---|---|
| Pico 2 W Unicorn Cosmic | 約12,000円 |
| USBケーブル(データ転送対応) | 0円 ※家にあったものを使用 |
| Azure IoT Hub(Standard Free) | 無料 |
| Thonny IDE | 無料 |
| 美咲フォント | 無料 |
最も高いPico 2 W Unicorn Cosmicは、1年以上前に購入して置物になっていたものです。
コストを気にされる方は、より小さなサイズのものを検討されると良いでしょう。
Azure IoT Hubとは(概要)
ここから、使用したサービスと実装プロセスを書いていきます。
まずAzure IoT Hubとは、Microsoftが提供するIoT関連サービスです。
インターネット経由で、多数のIoTデバイスを管理することができます。
今回Azure IoT Hubが必要である理由は、直接PCやスマホからLEDデバイスにメッセージを送れないためです。
厳密には直接送ることも可能ですが、セキュリティ面や手間を鑑みると、Azure IoT Hubを使う方が良いです。
Azure IoT Hubが解決してくれることをまとめました。
| 項目 | 説明 |
|---|---|
| 認証・セキュリティ | SASトークンという仕組みで、正規のデバイスだけが接続可能 |
| どこからでも送れる | 場所を問わず、どこからでも送信可能 |
| 複数デバイスの管理 | LEDデバイスを増やしても一元管理可能 |
Azure IoT Hubとは(補足説明)
プランと料金
Azure IoT Hubには3つのプランがあります。
今回はStandardプラン(Standard Free)を選択しました。
理由は、C2Dメッセージ(クラウドからデバイスへのメッセージ)は、Standardプランでしか対応していないためです。
| プラン | 月額 | C2Dメッセージ対応 |
|---|---|---|
| Free | 無料 | × |
| Basic | $10〜 | × |
| Standard | 無料~ | ○ |
今回は、StandardプランのFreeエディション(Standard Free)を選びました。
1日あたりのメッセージ数に制限1があるものの、個人利用では十分と判断したためです。
Azure IoT Hubのプランと料金の詳細は、下記のページをご参照ください。
→https://azure.microsoft.com/ja-jp/pricing/details/iot-hub/?msockid=06ddd506de2b6a552783c397df9b6b0b
余談ですが、FreeプランとBasicプランは、D2Cメッセージ(デバイスからクラウドへのメッセージ)のみ対応しています。
認証方法
認証方法には、SASトークンを採用しました。
SASトークンとは、ざっくり言うと期限付き入館証です。
個人利用には十分だと判断し、かつ実装のハードルが比較的低かったため、採用に至りました。
他の方法とのざっくり比較表も置いておきます。
| 方法 | セキュリティ | 今回の用途 |
|---|---|---|
| SASトークン | ○ | ◎採用 |
| X.509証明書 | ◎ | △ややオーバー? |
| TPMチップ | ◎◎ | ×今回のLEDデバイスに非対応 |
SASトークンを採用する場合、設定情報やそれをまとめたファイルを、GitHubにアップしないようにしましょう!
基本実装:英語テキストをLEDデバイスに表示しよう
Thonny IDEのインストール
ここから実装に入ります。
まずはThonny IDEをインストールしましょう。
今回使うLEDデバイスには画面もキーボードもなく、直接操作できません。
そのため、PCにThonny IDEを入れてUSBで繋ぎ、編集する必要があります。
Thonny接続中は、Azure IoT Hubからのメッセージは届きません。
設定が終わったらUSBを抜いてUSB充電器に繋ぎ直す必要があります。
USBは充電専用のものでなく、データ転送対応のものを使いましょう。
私が使ったLEDデバイスはMicro USB(昔のAndroidに使われていたもの)を使いますが、家にあったUSBが充電専用であることに気づかず、時間を溶かしてしまいました。
Thonny IDEのセットアップ
私が使うLEDデバイスに搭載されているRasberry Pi(Raspberry Pi Pico 2 W)は、実はPythonには対応しておりません。
MicroPythonという、Pythonの軽量版を使う必要があります。
そのため、インタプリタをMicroPythonに設定する必要があります。
やり方はとてもシンプル。
右下のインタプリタ表示をクリックして出てくる下記の画面にて、「MicroPython (Raspberry Pi Pico)」を選択します。
下部の「ポートまたはWebREPL」は、自動検出されます。
自動検出されない場合、USBポートが充電専用のものであるなど、何かしら問題があると言えます。
必要なファイルを用意
main.pyとconfig.pyを作りましょう。
すでに存在する場合は、上書きしてもらって良いです。
| ファイル名 | 概要 |
|---|---|
| main.py | メインプログラム |
| config.py | 設定情報 |
| lib/umqtt/ | (設定不要)通信ライブラリ |
上記の表をベースに説明します。
一番下の"lib/umqtt/"は、MQTTをMicroPythonでカンタンに使えるようにしたライブラリです。
MQTTは、IoT向けの軽量通信プロトコルです。
今回使うLEDデバイスには最初から内臓されているため、設定不要です。
もし内臓されていないデバイスを使う際は、以下のステップでインストールしましょう。
・Thonnyのメニュー →「ツール」→「パッケージを管理」
・検索欄に micropython-umqtt.simple と入力
・「インストール」 をクリック
次に"config.py"は、WiFiやMicrosoft Azureへの接続情報をまとめた設定ファイルです。
このファイルをmain.pyと分けることで、セキュリティや保守性が高くなります。
最後に"main.py"です。
大きく5つを書く必要があります。
| 概要 | 詳細 |
|---|---|
| ライブラリの読み込み | 必要なライブラリの一括読み込み |
| WiFi接続 | "config.py"のWiFi情報を使ってインターネット接続 |
| SASトークン生成 | 先ほど紹介したSASトークンを生成 |
| Azure IoT Hub接続 | SASトークンを使ってIoT HubにMQTTで接続 |
| メッセージ受信・LED表示 | IoT Hubからメッセージが届いたら解析してLEDに反映 |
Pico 2 W Unicorn Cosmicのサンプルコードも、参考になります。
Azure IoT Hub側のセットアップ
Azureポータルで、IoT Hubを作成しましょう。
・IoT Hub作成(StandardのFree枠)
・デバイスを登録
・以下3つをメモ:
IOT_HUB_HOSTNAME = "xxx.azure-devices.net"
DEVICE_ID = "デバイスID"
DEVICE_KEY = "プライマリキー"
config.pyに情報を記入
いったん放置したconfig.pyに、以下の情報を記入しましょう。
WIFI_SSID = "自宅またはテザリングするスマホのWiFi名"
WIFI_PASSWORD = "WiFiパスワード"
IOT_HUB_HOSTNAME = "xxx.azure-devices.net"
DEVICE_ID = "デバイスID"
DEVICE_KEY = "プライマリキー"
応用編:日本語も表示したい
ただ、ひとつ落とし穴があります。
それは、Pico 2 W Unicorn CosmicのMicroPythonに最初から入っているフォントは、ASCII文字のみ対応ということです。
つまり、プラスαで設定しないと、日本語を表示できません。
ただ、安心してください。
Raspberry Pi Pico MicroPython用のフォントライブラリがすでに存在する、「美咲フォント」を使えば、比較的ラクに実装できます。
当初、自作フォントを作ろうとしましたが、カタカナの再現が難しく断念しました…。
既存のフォントを利用するのが確実ですし、早いと思います。
個人が作成・公開しているライブラリです。
公式サポートはないため、利用は自己責任でお願いします。
導入前にコードの中身を確認することをおすすめします。
「美咲フォント」のフォントデータ自体は、本家サイトから取得できます。
→http://littlelimit.net/misaki.htm
ご不安であれば、Pico向けの実装をご自身で行うのも良いでしょう。
美咲フォントをダウンロード
実装プロセスを書いていきます。
まずはGitHubにアクセスいただき、「Code」→「Download ZIP」をクリックします。
ZIPを解凍後、misakifontフォルダを取り出します。
解凍後のフォルダ
└── 📁 misakifont ← これだけ使う
├── __init__.py
├── misakifont.py
├── misakifontdata.py
└── tma_jp_utl.py
ThonnyでPicoにアップロード
「Stop」ボタンでプログラム停止後、左側パネル(PC側)でmisakifontフォルダを右クリック、「Upload to /」をクリックいただきます。
右側パネル(Pico側)にmisakifontフォルダが出現すればOKです。
📁 Raspberry Pi Pico
├── 📁 lib
├── 📁 misakifont ← これが増えていればOK
├── config.py
└── main.py
main.pyに美咲フォントを組み込む
①インポートを追加
from misakifont import MisakiFont
②初期化
self.mf = MisakiFont()
③文字を描画するメソッドを追加
def draw_char_misaki(self, char, x_offset, y_offset):
code = ord(char)
fontdata = self.mf.font(code)
# 全角=8px、半角=実際の幅
if self.mf.isZenkaku(code):
char_width = 8
else:
char_width = self.get_actual_char_width(fontdata)
# 1ピクセルずつ描画
for row in range(8):
for col in range(char_width):
if fontdata[row] & (0x80 >> col):
self.graphics.pixel(x_offset + col, y_offset + row)
return char_width + 1
④日本語・英語を自動判定して切り替え
def scroll_text_display(self):
# 日本語が含まれているか判定
has_jp = any(ord(c) > 0x7F for c in self.current_text)
if has_jp:
# 美咲フォントで描画
self.draw_text_misaki(self.current_text, x, y)
else:
# デフォルトASCIIフォントで描画
self.graphics.text(self.current_text, x, y, scale=1)
「日本語+英語」のメッセージを送った際に、英語の右端が切れてしまったため、日本語が含まれるかを判定して描画方法を自動で切り替えています。
また小文字が間延びして見えてしまったため、フォント幅を固定するのではなく、フォントデータから実際の文字幅を計算しています。
ひらがな・カタカナだけではなく漢字にも対応しています。
ただ、今回採用したライブラリ版は計1,710字であり、難しめの漢字には対応しておりません。
例)大阪の「阪」など
"config.py"と"main.py"のサンプルコードは、本記事の末尾で紹介いたします。
動作確認
Azureポータルからメッセージを送信してみましょう。
「Azure IoT Hub」>「デバイス」>「デバイスへのメッセージ」から送ることができます。
{"text": "こんにちは", "color": "white"}
{"text": "ゴールデンウイークだよ", "color": "white", "background": [0, 20 0]}
まとめ
今回は、LEDにリアルタイムで日本語を表示させるよう、Azure IoT Hubなどを使って実装を進めました。
デバイスの費用はかかるものの、それ以外のランニングコストは、ほぼ無料で済みます。
今後は、Azure Functionsで定期的にメッセージを自動表示したり、connpass APIと連携してコミュニティ情報を自動表示したりしていきたいです。
オマケ その①:"config.py"のサンプルコード
# WiFi接続設定
WIFI_SSID = "hogehoge" # 実際のWiFiネットワーク名に置き換え
WIFI_PASSWORD = "hogehoge" # 実際のWiFiパスワードに置き換え
# Azure IoT Hub接続設定
IOT_HUB_HOSTNAME = "hogehoge.azure-devices.net" # IoT Hubホスト名
DEVICE_ID = "hogehoge" # 一意のデバイス識別子
DEVICE_KEY = "hogehoge=" # デバイスプライマリキー
# ディスプレイ設定
DEFAULT_BRIGHTNESS = 0.5 # LED明度 (0.0-1.0)
DEFAULT_SCROLL_SPEED = 0.1 # スクロール速度 (秒)
DEFAULT_COLOR = (255, 255, 255) # デフォルト色 (RGB)
# システム動作設定
TELEMETRY_INTERVAL = 30 # テレメトリ送信間隔(秒)
CONNECTION_CHECK_INTERVAL = 60 # 接続確認間隔(秒)
MAX_RETRY_ATTEMPTS = 3 # 最大リトライ回数
RETRY_DELAY = 5 # リトライ間隔(秒)
# デバッグ設定
DEBUG_MODE = True # デバッグ出力の有効/無効
LOG_LEVEL = "INFO" # ログレベル: DEBUG, INFO, WARNING, ERROR
# セキュリティ設定
SAS_TOKEN_EXPIRY_HOURS = 1 # SASトークン有効期間(時間)
SSL_VERIFY = True # SSL証明書検証の有効/無効
KEEP_ALIVE_INTERVAL = 60 # MQTT Keep-Alive間隔(秒)
# パフォーマンス設定
MEMORY_WARNING_THRESHOLD = 10000 # メモリ警告閾値(バイト)
MEMORY_CRITICAL_THRESHOLD = 5000 # メモリクリティカル閾値(バイト)
GC_COLLECTION_INTERVAL = 300 # ガベージコレクション間隔(秒)
オマケ その②:"main.py"のサンプルコード
# main.pyは美咲フォントライブラリ(個人作成)を使用しています。
# 事前にThonnyでmisakifontフォルダをPicoにアップロードしてください。
import network
import time
import json
import gc
import hashlib
import ubinascii
import ssl
from misakifont import MisakiFont
# Cosmic Unicorn用のインポート
from cosmic import CosmicUnicorn
from picographics import PicoGraphics, DISPLAY_COSMIC_UNICORN
# MQTTクライアント
# Pimoroniカスタムファームウェアにはumqttが内蔵されているため
# 通常このtry/exceptは不要です。
# ただし標準MicroPython環境など、umqttが内蔵されていない環境で
# 動かす場合に備えて保険として残しています。
try:
from umqtt.simple import MQTTClient
MQTT_AVAILABLE = True
except ImportError:
print("警告: umqtt.simpleが利用できません")
MQTT_AVAILABLE = False
# config.pyから設定をインポート
try:
import config
except ImportError:
print("エラー: config.pyが見つかりません")
raise
# HMAC-SHA256の手動実装(MicroPython用)
# MicroPythonにはhmacライブラリが標準搭載されていないため手動実装。
# SASトークン生成時の署名計算に使用します。
def hmac_sha256(key, message):
block_size = 64
if len(key) > block_size:
key = hashlib.sha256(key).digest()
key = key + b'\x00' * (block_size - len(key))
o_key_pad = bytes([b ^ 0x5c for b in key])
i_key_pad = bytes([b ^ 0x36 for b in key])
inner_hash = hashlib.sha256(i_key_pad + message).digest()
return hashlib.sha256(o_key_pad + inner_hash).digest()
# NTPサーバーから時刻を取得して設定する関数
# Pico Wは電源オフで時刻がリセットされるため起動時に同期が必要です。
# SASトークンの有効期限計算に正確な時刻が必要なため、
# IoT Hub接続前に必ず呼び出します。
def set_time_ntp():
import ntptime
try:
print("NTP時刻同期中...")
ntptime.settime()
print("時刻同期完了: " + str(time.localtime()))
return True
except Exception as e:
print("NTP同期エラー: " + str(e))
return False
# テキストに日本語(0x7F以上の文字)が含まれるか確認する関数
# ASCII文字(英数字)は0x7F以下、日本語は0x80以上のコードを持つため
# この条件で判定できます。日本語ありなら美咲フォント、なしならASCIIフォントで描画します。
def has_japanese(text):
return any(ord(c) > 0x7F for c in text)
class CosmicUnicornAzureIoT:
def __init__(self):
print("Cosmic Unicorn Azure IoT システム初期化中...")
# ハードウェアの初期化
self.cosmic = CosmicUnicorn()
self.graphics = PicoGraphics(DISPLAY_COSMIC_UNICORN)
# ディスプレイサイズ(Cosmic Unicornは32×32固定)
self.width = 32
self.height = 32
self.cosmic.set_brightness(0.5)
# 美咲フォントの初期化(日本語表示に必要)
print("美咲フォント読み込み中...")
self.mf = MisakiFont()
print("美咲フォント読み込み完了")
# 表示パラメータの初期値
self.current_text = "Starting..."
self.text_color = (255, 255, 255) # 白
self.background_color = (0, 0, 0) # 黒
self.scroll_position = 0 # スクロール位置
# ネットワーク関連
self.wlan = network.WLAN(network.STA_IF)
self.mqtt_client = None
self.connected = False
# システム状態管理
self.running = False
self.loop_counter = 0 # MQTTメッセージ確認の間隔制御に使用
print("初期化完了")
# WiFiネットワークへの接続
# config.pyのSSID/パスワードを使用。接続状況をLEDに表示します。
# タイムアウトは15秒。
def connect_wifi(self):
self.display_status("WiFi...", (255, 255, 0))
self.wlan.active(True)
if not self.wlan.isconnected():
print("WiFiに接続中: " + config.WIFI_SSID)
self.wlan.connect(config.WIFI_SSID, config.WIFI_PASSWORD)
timeout = 15
while not self.wlan.isconnected() and timeout > 0:
time.sleep(1)
timeout -= 1
print(".", end="")
if self.wlan.isconnected():
ip_info = self.wlan.ifconfig()
print("\nWiFi接続成功: IP=" + str(ip_info))
self.display_status("WiFi OK", (0, 255, 0))
return True
else:
print("\nWiFi接続失敗")
self.display_status("WiFi NG", (255, 0, 0))
return False
return True
# Azure IoT Hub用SASトークンの生成
# DEVICE_KEYをもとにHMAC-SHA256で署名した期限付きトークンを生成します。
# デフォルトの有効期限は3600秒(1時間)。
def generate_sas_token(self, expiry=3600):
try:
resource_uri = config.IOT_HUB_HOSTNAME + "/devices/" + config.DEVICE_ID
expires = int(time.time()) + expiry
string_to_sign = resource_uri + "\n" + str(expires)
# DEVICE_KEYをBase64デコードして署名を生成
key = ubinascii.a2b_base64(config.DEVICE_KEY)
signature = hmac_sha256(key, string_to_sign.encode())
signature_b64 = ubinascii.b2a_base64(signature).decode().strip()
# URLエンコード(+, /, = をエスケープ)
signature_encoded = signature_b64.replace('+', '%2B').replace('/', '%2F').replace('=', '%3D')
sas_token = "SharedAccessSignature sr=" + resource_uri + "&sig=" + signature_encoded + "&se=" + str(expires)
return sas_token
except Exception as e:
print("SASトークン生成エラー: " + str(e))
return None
# MQTT C2Dメッセージ受信コールバック
# Azure IoT HubからC2Dメッセージを受信した際に自動で呼び出されます。
# JSONを解析して text / color / background / brightness を反映します。
def mqtt_callback(self, topic, msg):
try:
print("受信トピック: " + topic.decode())
print("受信メッセージ: " + msg.decode())
message_data = json.loads(msg.decode())
if 'text' in message_data:
self.update_display_text(message_data['text'])
if 'color' in message_data:
self.update_display_color(message_data['color'])
if 'background' in message_data:
self.update_background_color(message_data['background'])
if 'brightness' in message_data:
# 輝度は0.1〜1.0の範囲に制限
brightness = max(0.1, min(1.0, float(message_data['brightness'])))
self.cosmic.set_brightness(brightness)
print("輝度更新: " + str(brightness))
except Exception as e:
print("メッセージ処理エラー: " + str(e))
# Azure IoT HubへのMQTT接続
# SASトークンで認証し、ポート8883(SSL)でMQTT接続します。
# C2Dメッセージを受信するためのトピックを購読します。
def connect_azure_iot(self):
if not MQTT_AVAILABLE:
print("エラー: MQTTライブラリが利用できません")
self.display_status("MQTT NG", (255, 0, 0))
return False
self.display_status("IoT Hub...", (255, 255, 0))
try:
# NTP時刻同期(SASトークンの期限計算に必要)
set_time_ntp()
sas_token = self.generate_sas_token()
if not sas_token:
return False
# Azure IoT Hub MQTTのユーザー名フォーマット
username = config.IOT_HUB_HOSTNAME + "/" + config.DEVICE_ID + "/?api-version=2021-04-12"
# SSL設定(証明書検証なし)
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ssl_context.verify_mode = ssl.CERT_NONE
self.mqtt_client = MQTTClient(
client_id=config.DEVICE_ID,
server=config.IOT_HUB_HOSTNAME,
port=8883, # MQTTSポート(SSL暗号化)
user=username,
password=sas_token,
ssl=ssl_context
)
self.mqtt_client.set_callback(self.mqtt_callback)
self.mqtt_client.connect()
# C2Dメッセージ受信用トピックを購読
c2d_topic = "devices/" + config.DEVICE_ID + "/messages/devicebound/#"
self.mqtt_client.subscribe(c2d_topic)
self.connected = True
self.display_status("IoT OK", (0, 255, 0))
return True
except Exception as e:
print("Azure IoT Hub接続エラー: " + str(e))
self.display_status("IoT NG", (255, 0, 0))
return False
# ステータスメッセージをLEDに表示する
# WiFi接続中・IoT Hub接続中などの状態をLEDに表示します。
# ASCIIフォントを使用し、1.5秒間表示後に処理を続けます。
def display_status(self, status_text, color=(255, 255, 255)):
self.graphics.set_pen(self.graphics.create_pen(
self.background_color[0],
self.background_color[1],
self.background_color[2]
))
self.graphics.clear()
self.graphics.set_pen(self.graphics.create_pen(
color[0], color[1], color[2]
))
# 文字幅を計算して中央揃え(ASCIIフォントは1文字6px)
text_width = len(status_text) * 6
x_pos = max(0, (self.width - text_width) // 2)
self.graphics.text(status_text, x_pos, 12, scale=1)
self.cosmic.update(self.graphics)
time.sleep(1.5)
# 表示テキストを更新する
# テキスト変更時はスクロール位置をリセットして先頭から表示します。
def update_display_text(self, new_text):
self.current_text = str(new_text)
self.scroll_position = 0
print("テキスト更新: " + self.current_text)
# 文字色を更新する
# RGBリスト(例: [255, 0, 0])または色名(例: "red")で指定できます。
def update_display_color(self, color_data):
try:
if isinstance(color_data, list) and len(color_data) == 3:
self.text_color = tuple(color_data)
elif isinstance(color_data, str):
color_map = {
'red': (255, 0, 0),
'green': (0, 255, 0),
'blue': (0, 0, 255),
'yellow': (255, 255, 0),
'purple': (255, 0, 255),
'cyan': (0, 255, 255),
'white': (255, 255, 255),
'orange': (255, 165, 0)
}
self.text_color = color_map.get(color_data.lower(), (255, 255, 255))
print("文字色更新: " + str(self.text_color))
except Exception as e:
print("文字色更新エラー: " + str(e))
# 背景色を更新する
# RGBリスト(例: [80, 60, 0])または色名(例: "black")で指定できます。
def update_background_color(self, color_data):
try:
if isinstance(color_data, list) and len(color_data) == 3:
self.background_color = tuple(color_data)
elif isinstance(color_data, str):
color_map = {
'red': (255, 0, 0),
'green': (0, 255, 0),
'blue': (0, 0, 255),
'yellow': (255, 255, 0),
'purple': (255, 0, 255),
'cyan': (0, 255, 255),
'white': (255, 255, 255),
'orange': (255, 165, 0),
'black': (0, 0, 0)
}
self.background_color = color_map.get(color_data.lower(), (0, 0, 0))
print("背景色更新: " + str(self.background_color))
except Exception as e:
print("背景色更新エラー: " + str(e))
# フォントデータから実際に使われている幅を計算する
# 半角文字は8px幅より狭いことが多いため、実際に点灯しているピクセルの
# 最大列を調べることで正確な文字幅を求めます。
def get_actual_char_width(self, fontdata, max_width=8):
actual_width = 0
for row in range(8):
for col in range(max_width):
if fontdata[row] & (0x80 >> col):
if col + 1 > actual_width:
actual_width = col + 1
return actual_width if actual_width > 0 else max_width
# 文字の表示幅を返す(全角=8px、半角=実幅)
# 全角文字(日本語)は8px固定。半角文字(英数字)は実際の幅を計算します。
def get_char_width(self, char):
if self.mf.isZenkaku(ord(char)):
return 8
else:
fontdata = self.mf.font(ord(char))
return self.get_actual_char_width(fontdata, max_width=8)
# テキスト全体のピクセル幅を返す
# スクロール範囲の計算や中央揃えに使用します。
# 文字幅 + 1px(文字間スペース)の合計です。
def get_text_pixel_width(self, text):
total = 0
for char in text:
total += self.get_char_width(char) + 1
return total
# 美咲フォントで1文字を描画する
# 8行分のビットマップを1ピクセルずつ描画します。
# 画面範囲外のピクセルはスキップします(スクロール時の安全対策)。
def draw_char_misaki(self, char, x_offset, y_offset):
code = ord(char)
fontdata = self.mf.font(code)
# 全角は8px固定、半角は実幅を計算
if self.mf.isZenkaku(code):
char_width = 8
else:
char_width = self.get_actual_char_width(fontdata, max_width=8)
pen = self.graphics.create_pen(
self.text_color[0],
self.text_color[1],
self.text_color[2]
)
self.graphics.set_pen(pen)
for row in range(8):
for col in range(char_width):
if fontdata[row] & (0x80 >> col):
px = x_offset + col
py = y_offset + row
if 0 <= px < self.width and 0 <= py < self.height:
self.graphics.pixel(px, py)
return char_width + 1 # 次の文字の開始位置(文字幅 + 1px間隔)
# 美咲フォントでテキスト全体を描画する
# 1文字ずつ描画し、描画した文字幅分だけx座標をずらしていきます。
def draw_text_misaki(self, text, x_start, y_start):
x = x_start
for char in text:
x += self.draw_char_misaki(char, x, y_start)
# テキストスクロール表示(自動切り替え版)
# 日本語を含む場合は美咲フォント、英数字のみの場合はASCIIフォントで描画します。
# テキスト幅が32px以内なら中央固定表示、超える場合はスクロール表示します。
def scroll_text_display(self):
try:
# 背景色でクリア
self.graphics.set_pen(self.graphics.create_pen(
self.background_color[0],
self.background_color[1],
self.background_color[2]
))
self.graphics.clear()
if has_japanese(self.current_text):
# 美咲フォントで描画(日本語あり・英語混在も対応)
text_pixel_width = self.get_text_pixel_width(self.current_text)
y_pos = (self.height - 8) // 2 # 縦中央揃え(32-8)÷2=12
if text_pixel_width <= self.width:
# 短いテキスト:中央固定表示
x_start = (self.width - text_pixel_width) // 2
self.draw_text_misaki(self.current_text, x_start, y_pos)
else:
# 長いテキスト:左からスクロール
x_start = self.width - self.scroll_position
self.draw_text_misaki(self.current_text, x_start, y_pos)
self.scroll_position += 1
if self.scroll_position > text_pixel_width + self.width:
self.scroll_position = 0 # 一周したらリセット
else:
# ASCIIフォントで描画(英数字のみ)
self.graphics.set_pen(self.graphics.create_pen(
self.text_color[0],
self.text_color[1],
self.text_color[2]
))
text_width = len(self.current_text) * 6 # ASCIIは1文字6px
if text_width <= self.width:
# 短いテキスト:中央固定表示
x_pos = (self.width - text_width) // 2
self.graphics.text(self.current_text, x_pos, 12, scale=1)
else:
# 長いテキスト:左からスクロール
x_pos = self.width - self.scroll_position
self.graphics.text(self.current_text, x_pos, 12, scale=1)
self.scroll_position += 1
if self.scroll_position > text_width + self.width:
self.scroll_position = 0 # 一周したらリセット
self.cosmic.update(self.graphics)
except Exception as e:
print("表示更新エラー: " + str(e))
# メインシステムループ
# WiFi接続 → IoT Hub接続 → メッセージ受信ループの順で処理します。
# 10ループに1回MQTTメッセージを確認し、毎ループでLED表示を更新します。
def run(self):
print("システム開始...")
if not self.connect_wifi():
print("WiFi接続に失敗しました")
return
if not self.connect_azure_iot():
print("Azure IoT Hub接続に失敗しました")
return
self.update_display_text("IoT Hub Ready!")
self.running = True
print("メインループ開始...")
try:
while self.running:
self.loop_counter += 1
# 10ループに1回MQTTメッセージを確認
# (毎ループ確認するとスクロールがカクつくため間引いています)
if self.connected and self.loop_counter >= 10:
self.loop_counter = 0
try:
self.mqtt_client.check_msg()
except Exception as e:
print("メッセージ確認エラー: " + str(e))
self.connected = False
# 毎ループでLED表示を更新(スクロールを滑らかにするため)
self.scroll_text_display()
time.sleep(0.03) # スクロール速度調整(秒)
# ガベージコレクション(メモリ解放)
if self.loop_counter == 0:
gc.collect()
except KeyboardInterrupt:
print("\nユーザーによるシステム停止")
except Exception as e:
print("システムエラー: " + str(e))
finally:
if self.mqtt_client and self.connected:
try:
self.mqtt_client.disconnect()
except:
pass
print("システム停止完了")
if __name__ == "__main__":
try:
system = CosmicUnicornAzureIoT()
system.run()
except Exception as e:
print("システム実行エラー: " + str(e))
finally:
print("プログラム終了")
-
IoT Hub ユニットごとのメッセージの合計数が、1日当たり8,000という制約があります。 ↩

