IoTLT Advent Calendar 2024 8日目です!
最近平成レトロにハマりつつあるVanです。
今回はまさに時代を象徴する新元号の改元発表をハックしたいと思います。
○○おじさんになりたい
"平成"の元号を発表した小淵官房長官は "平成おじさん"
"令和"の元号を発表した菅官房長官は "令和おじさん"
というネットミームがあるみたいですね。
"令和"の発表前はネットやテレビで予想合戦が白熱していて、発表の当日もちょっとしたお祭り騒ぎ?になっていた気がします。
私も "アレ" を体験してみたい!と思いました。
どうやって実現するか
単純な話、自分が考えた文字(元号)を表示するだけですが、電子工作でよく使われるLCD(液晶ディスプレイ)ではドット文字で味気がないと考えました。
そこで、電子書籍で使われるe-Paper(電子ペーパー)を使うことでリアリティを出すことができるのではないかと考えました。
e-Paperを調査していたところ、以下の記事を見つけたのでこれを参考に開発を進めることを決めました。
e-PaperはWaveshare Electronics製の7.5インチのものを使用しました。
制御基板との接続が面倒そうですが、ちょうど家にRaspberry Pi Pico Wが転がっていたのでRaspberry Pi PicoのHAT(Hardware Attached on Top)付きのものを購入しました。
システム構成
LINEを起点に3パターンの元号表示をできるようにしました。
①. 自分の好きな2文字をe-Paperで表示する。
(LINEのメッセージをもとにCloud Runで処理)
②. 「かっこいい」、「かわいい」など抽象的なイメージから元号をAIが考え、e-Paperで表示する。
(LINEのメッセージをChatGPTに入力し、出力結果をCloud Runで処理)
③. 入力画像をもとにイメージにあった元号をAIが考え、e-Paperに表示する。
(LINEからImgBBに画像をアップロードし、そのURLをChatGPTに入力し、出力結果をCloud Runで処理)
↓
- Cloud Runでは入力された文字を、PythonのPILライブラリで筆文字画像に成形してモノクロのbmpファイルをFirebaseのStorageに保存します。
↓
- FirebaseのStorageに保存したbmpファイルを同じくFirebaseのRealtime Databaseに流します。
↓
- さらにFirebaseのREST APIを使ったストリーミング機能により、画像の更新があったタイミングでRaspberry Pi Pico WはFirebaseのRealtime Databaseの値を取得し、e-Paperに画像を表示します。
ざっとこんな感じの流れです。
Cloud RunもFirebaseも、そもそもPythonでコードを書くことも初めてだったのでChatGPTのOpenAI o1に頼ってなんとか制作することができました。
サーバー側(Cloud Run側)のディレクトリ構成
line-to-epaper/
├ app.py
├ Dockerfile(後述)
├ requirements.txt(後述)
├ brush.font.ttf
└ serviceAcconutKey.json
brush.font.ttfは以下の筆文字のフリーフォントを格納しています。
JIS第一水準漢字2965字が収録されているということで大体の漢字は対応してそうです。
serviceAccountKey.jsonはFirebaseのサービスアカウントに関連付けられた認証情報ファイルです。
Firebaseのコンソールからプロジェクトの設定→サービスアカウント→ 新しい秘密鍵を生成 から取得することができます。
app.pyのソースコードはこちら
# 必要なモジュールをインポートします
# OS(オペレーティングシステム)関連の機能を使用するためのモジュール
import os
# 正規表現(パターンマッチング)を扱うためのモジュール
import re
# ログを記録するためのモジュール
import logging
# ウェブアプリケーションを構築するためのFlask関連のモジュール
from flask import Flask, request, abort
# LINE BotのAPIを扱うためのモジュール
from linebot import LineBotApi, WebhookHandler
# LINEの署名検証エラーを扱うための例外
from linebot.exceptions import InvalidSignatureError
# LINEで扱うメッセージやイベントのモデル
from linebot.models import (
MessageEvent,
TextMessage,
ImageMessage,
TextSendMessage,
VideoMessage, # 動画メッセージを扱うためのモデル
AudioMessage, # 音声メッセージを扱うためのモデル
LocationMessage, # 位置情報メッセージを扱うためのモデル
StickerMessage # スタンプメッセージを扱うためのモデル
)
# 画像の生成や編集を行うためのPillowライブラリ
from PIL import Image, ImageDraw, ImageFont
# Firebaseの機能を使用するためのモジュール
import firebase_admin
# Firebaseの認証、ストレージ、データベース機能
from firebase_admin import credentials, storage, db
# OpenAIのAPIを使用するためのモジュール
from openai import OpenAI
# HTTPリクエストを行うためのモジュール
import requests
# JSONデータを扱うためのモジュール
import json
# バイナリデータをBase64エンコード/デコードするためのモジュール
import base64
# 一時ファイルを作成するためのモジュール
import tempfile
# OpenAIのクライアントを初期化
client = OpenAI()
# ログの基本設定をINFOレベルに設定
logging.basicConfig(level=logging.INFO)
# ロガーを取得
logger = logging.getLogger(__name__)
# Flaskアプリケーションを初期化
app = Flask(__name__)
# LINE APIの設定を環境変数から取得
LINE_CHANNEL_SECRET = os.getenv('LINE_CHANNEL_SECRET')
LINE_CHANNEL_ACCESS_TOKEN = os.getenv('LINE_CHANNEL_ACCESS_TOKEN')
# 必要な環境変数が設定されているか確認
if not LINE_CHANNEL_SECRET or not LINE_CHANNEL_ACCESS_TOKEN:
logger.error("環境変数 LINE_CHANNEL_SECRET と LINE_CHANNEL_ACCESS_TOKEN が設定されていません。")
raise ValueError("環境変数 LINE_CHANNEL_SECRET と LINE_CHANNEL_ACCESS_TOKEN を設定してください。")
# LINE Bot APIとWebhookハンドラーを初期化
line_bot_api = LineBotApi(LINE_CHANNEL_ACCESS_TOKEN)
handler = WebhookHandler(LINE_CHANNEL_SECRET)
# Firebaseの設定を環境変数から取得
FIREBASE_STORAGE_BUCKET = os.getenv('FIREBASE_STORAGE_BUCKET')
FIREBASE_DB_URL = os.getenv('FIREBASE_DB_URL') # Realtime DatabaseのURL
# 必要な環境変数が設定されているか確認
if not FIREBASE_STORAGE_BUCKET or not FIREBASE_DB_URL:
logger.error("環境変数 FIREBASE_STORAGE_BUCKET または FIREBASE_DB_URL が設定されていません。")
raise ValueError("環境変数 FIREBASE_STORAGE_BUCKET と FIREBASE_DB_URL を設定してください。")
try:
# サービスアカウントキーのJSONファイルから認証情報を取得
cred = credentials.Certificate('serviceAccountKey.json') # サービスアカウントキーのパス
# Firebaseアプリを初期化
firebase_admin.initialize_app(cred, {
'storageBucket': FIREBASE_STORAGE_BUCKET,
'databaseURL': FIREBASE_DB_URL
})
# Firebaseストレージのバケットを取得
bucket = storage.bucket()
logger.info("Firebase Storage と Realtime Database に正常に接続しました。")
except Exception as e:
# Firebaseの初期化中にエラーが発生した場合、ログを記録し処理を停止
logger.error(f"Firebase の初期化中にエラーが発生しました: {e}")
raise
# OpenAI APIキーを環境変数から取得
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')
# APIキーが設定されているか確認
if not OPENAI_API_KEY:
logger.error("環境変数 OPENAI_API_KEY が設定されていません。")
raise ValueError("環境変数 OPENAI_API_KEY を設定してください。")
# OpenAIクライアントにAPIキーを設定
client.api_key = OPENAI_API_KEY
logger.info("OpenAI APIキーを正常に設定しました。")
# ImgBB APIキーを環境変数から取得
IMGBB_API_KEY = os.getenv('IMGBB_API_KEY')
# APIキーが設定されているか確認
if not IMGBB_API_KEY:
logger.error("環境変数 IMGBB_API_KEY が設定されていません。")
raise ValueError("環境変数 IMGBB_API_KEY を設定してください。")
# 二文字の漢字を判定するための正規表現パターン
KANJI_REGEX = re.compile(r'^[\u4e00-\u9faf]{2}$')
def is_valid_input(text):
"""
入力が3文字以内であるかどうかをチェックします。
"""
if len(text) > 3:
return False # 文字数が3を超える場合は無効
return True # 上記以外の場合は有効
def is_two_kanji(text):
"""
テキストが二文字の漢字であるかを判定します。
"""
return bool(KANJI_REGEX.match(text))
def create_bmp(text):
"""
テキストからBMP画像を作成する関数です。
"""
try:
# 画像のサイズと色の設定
width, height = 480, 800 # 画像サイズを480x800に設定
background_color = 'white' # 背景色を白に設定
text_color = 'black' # 文字色を黒に設定
# モノクロの新しい画像を作成
image = Image.new('1', (width, height), color=1) # '1'はモノクロ画像を意味する
draw = ImageDraw.Draw(image) # 画像に描画するためのオブジェクト
# フォントの設定(筆文字風のフォントを指定)
font_path = 'brush_font.ttf' # フォントファイルのパス
# 文字数に応じてフォントサイズと位置を調整
if len(text) == 1:
font_size = 100 # 1文字の場合の初期フォントサイズ
font = ImageFont.truetype(font_path, font_size)
# 画像サイズに合わせてフォントサイズを調整
while True:
total_height = sum([draw.textsize(char, font=font)[1] for char in text])
if total_height < height * 0.5:
font_size += 5
font = ImageFont.truetype(font_path, font_size)
else:
break
elif len(text) == 2:
font_size = 100 # 2文字の場合の初期フォントサイズ
font = ImageFont.truetype(font_path, font_size)
while True:
total_height = sum([draw.textsize(char, font=font)[1] for char in text]) + 20
if total_height < height * 0.9:
font_size += 5
font = ImageFont.truetype(font_path, font_size)
else:
break
else:
font_size = 100 # 3文字の場合の初期フォントサイズ
font = ImageFont.truetype(font_path, font_size)
while True:
total_height = sum([draw.textsize(char, font=font)[1] for char in text]) + 40
if total_height < height * 0.9:
font_size += 5
font = ImageFont.truetype(font_path, font_size)
else:
break
# フォントの読み込みとサイズ調整が成功したことをログに記録
logger.info(f"フォント {font_path} を正常に読み込み、最適なサイズ {font_size} に調整しました。")
except IOError as e:
# フォントの読み込みに失敗した場合、ログを記録し処理を停止
logger.error(f"フォントファイル {font_path} の読み込みに失敗しました: {e}")
raise
try:
# 縦書きのテキストを描画
char_spacing = 20 # 文字間のスペース
if len(text) == 1:
total_height = draw.textsize(text, font=font)[1]
else:
total_height = sum([draw.textsize(char, font=font)[1] for char in text]) + char_spacing * (len(text) -1)
start_y = (height - total_height) // 2 # 垂直方向の開始位置を計算
for i, char in enumerate(text):
# 各文字の位置を計算して描画
if len(text) == 1:
text_width, text_height = draw.textsize(char, font=font)
x = (width - text_width) // 2
y = (height - text_height) // 2
else:
text_width, text_height = draw.textsize(char, font=font)
x = (width - text_width) // 2
y = start_y + i * (text_height + char_spacing)
# 文字を描画
draw.text((x, y), char, font=font, fill=0)
# BMPファイルの作成が成功したことをログに記録
logger.info("縦書きでBMPファイルを作成しました。")
# 画像を90度回転(横向きに)して保存
image = image.rotate(90, expand=True)
except Exception as e:
# テキストの描画中にエラーが発生した場合、ログを記録し処理を停止
logger.error(f"縦書きテキストの描画中にエラーが発生しました: {e}")
raise
# BMPファイルとして保存
bmp_path = f'{text}.bmp'
image.save(bmp_path, format='BMP')
logger.info(f"BMPファイル {bmp_path} を正常に保存しました。")
# 作成したファイルのパスを返す
return bmp_path
def upload_to_firebase(file_path):
"""
ファイルをFirebase Storageにアップロードする関数です。
"""
try:
# アップロード先のパスを設定
blob = bucket.blob(f'kanji_bmp/{os.path.basename(file_path)}')
# ファイルをアップロード
blob.upload_from_filename(file_path)
# ファイルを公開設定にする
blob.make_public()
# アップロード成功のログを記録
logger.info(f"ファイル {file_path} を Firebase Storage に正常にアップロードしました。URL: {blob.public_url}")
# アップロードしたファイルのURLを返す
return blob.public_url
except Exception as e:
# アップロード中にエラーが発生した場合、ログを記録し処理を停止
logger.error(f"Firebase Storage へのアップロード中にエラーが発生しました: {e}")
raise
def generate_two_kanji(message):
"""
メッセージに基づいて漢字二文字を生成する関数です。
"""
try:
# OpenAIに送信するプロンプトを作成
prompt = (
f"ユーザーからのメッセージ: '{message}' に基づいて、日本の元号のような印象を与える適切な漢字二文字を提案してください。"
"返答は常用漢字二文字のみとしてください。"
"返答は必ず二文字のみとしてください。"
)
logger.info(f"OpenAIに送信するプロンプト: {prompt}")
# OpenAI APIを呼び出して漢字を生成
response = client.chat.completions.create(
model="gpt-4o-mini", # 使用するモデルを指定
messages=[
{"role": "system", "content": "あなたは日本の元号に詳しいアシスタントです。"},
{"role": "user", "content": prompt}
],
max_tokens=10,
temperature=0.7,
)
# 生成された漢字を取得
kanji = response.choices[0].message.content.strip()
# 生成された漢字が正しい形式かチェック
if not KANJI_REGEX.match(kanji):
logger.error(f"生成された漢字が正しくありません: '{kanji}'")
raise ValueError("OpenAIが有効な漢字二文字を返しませんでした。")
# 生成された漢字をログに記録
logger.info(f"OpenAIから生成された漢字二文字: {kanji}")
# 漢字を返す
return kanji
except Exception as e:
# OpenAI APIの呼び出し中にエラーが発生した場合、ログを記録し処理を停止
logger.error(f"OpenAI APIの呼び出し中にエラーが発生しました: {e}")
raise
def upload_image_to_imgbb(image_bytes):
"""
画像のバイナリデータをImgBBにアップロードし、URLを取得する関数です。
"""
try:
logger.info("ImgBBへの画像アップロードを開始します。")
url = "https://api.imgbb.com/1/upload"
# ImgBB APIキーと画像の有効期限を設定
params = {
"key": IMGBB_API_KEY,
"expiration": 60 # 1分後に画像を自動削除
}
# 画像データをBase64エンコード
encoded_image = base64.b64encode(image_bytes).decode('utf-8')
# リクエストデータを準備
data = {
"image": encoded_image
}
# POSTリクエストを送信して画像をアップロード
response = requests.post(url, params=params, data=data)
response.raise_for_status() # HTTPエラーがあれば例外を発生
# レスポンスをJSON形式で取得
result = response.json()
if result.get('success'):
# アップロード成功時の処理
image_url = result['data']['url']
logger.info(f"ImgBBへのアップロードに成功しました。URL: {image_url}")
return image_url # アップロードした画像のURLを返す
else:
# レスポンスで失敗が返された場合の処理
logger.error(f"ImgBBからのレスポンスで失敗しました: {result}")
raise ValueError("ImgBBへのアップロードが失敗しました。")
except requests.exceptions.HTTPError as http_err:
# HTTPエラーが発生した場合、ログを記録し処理を停止
logger.error(f"ImgBBへのアップロード中にHTTPエラーが発生しました: {http_err}")
logger.error(f"レスポンス内容: {response.text}")
raise
except Exception as e:
# その他のエラーが発生した場合、ログを記録し処理を停止
logger.error(f"ImgBBへのアップロード中にエラーが発生しました: {e}")
raise
def analyze_image_with_openai(image_url):
"""
OpenAI APIを使用して画像を分析し、適切な漢字二文字を提案する関数です。
"""
try:
# OpenAIに送信するプロンプトを作成
prompt = (
"画像からイメージされる、日本の元号のような印象を与える適切な漢字二文字を提案してください。"
"返答は常用漢字二文字のみとしてください。"
"返答は必ず二文字のみとしてください。"
)
logger.info("OpenAIに画像分析を依頼します。")
# OpenAI APIを呼び出して漢字を生成
response = client.chat.completions.create(
model="gpt-4o-mini", # 使用するモデルを指定
messages=[
{
"role": "user",
"content": [
{"type": "text", "text": prompt},
{
"type": "image_url",
"image_url": {
"url": image_url,
}
},
],
}
],
)
# 生成された漢字を取得
kanji = response.choices[0].message.content.strip()
# 生成された漢字が正しい形式かチェック
if not KANJI_REGEX.match(kanji):
logger.error(f"生成された漢字が正しくありません: '{kanji}'")
raise ValueError("OpenAIが有効な漢字二文字を返しませんでした。")
# 生成された漢字をログに記録
logger.info(f"OpenAIから生成された漢字二文字: {kanji}")
# 漢字を返す
return kanji
except Exception as e:
# OpenAIによる画像分析中にエラーが発生した場合、ログを記録し処理を停止
logger.error(f"OpenAIによる画像分析中にエラーが発生しました: {e}")
raise
def start_loading_animation(UserID):
"""
LINEのローディングアニメーションを開始する関数です。
"""
try:
# ローディングアニメーションの開始エンドポイントURL
url_loading = "https://api.line.me/v2/bot/chat/loading/start"
# HTTPリクエストのヘッダーを設定
headers_loading = {
"Content-Type": "application/json",
"Authorization": f'Bearer {LINE_CHANNEL_ACCESS_TOKEN}'
}
# ローディングアニメーションのパラメータを設定
payload_loading = {
"chatId": UserID,
"loadingSeconds": 20 # ローディングアニメーションの表示時間(秒)
}
# POSTリクエストを送信してローディングアニメーションを開始
response_loading = requests.post(url_loading, headers=headers_loading, data=json.dumps(payload_loading))
if response_loading.status_code != 202:
# ローディングアニメーションの開始に失敗した場合、ログを記録
logger.error(f"ローディングアニメーションの開始に失敗しました。ステータスコード: {response_loading.status_code}, レスポンス: {response_loading.text}")
else:
# ローディングアニメーションの開始に成功した場合、ログを記録
logger.info("ローディングアニメーションを正常に開始しました。") # 成功時のログを追加
except Exception as e:
# ローディングアニメーションの開始中にエラーが発生した場合、ログを記録
logger.error(f"ローディングアニメーションの開始中にエラーが発生しました: {e}")
# Flaskのルートを設定し、LINEからのWebhookイベントを受け取る
@app.route("/callback", methods=['POST'])
def callback():
# LINEからの署名を取得
signature = request.headers.get('X-Line-Signature', '')
# リクエストボディを取得
body = request.get_data(as_text=True)
logger.info(f"リクエストボディ: {body}")
# 署名の検証とハンドラーの呼び出し
try:
handler.handle(body, signature)
except InvalidSignatureError:
# 署名が無効な場合、エラーログを記録して400エラーを返す
logger.error("無効な署名です。")
abort(400) # 不正なリクエストとして処理
except Exception as e:
# その他のエラーが発生した場合、エラーログを記録して500エラーを返す
logger.error(f"Webhook ハンドリング中にエラーが発生しました: {e}")
abort(500) # サーバーエラーとして処理
# 正常終了を返す
return 'OK'
# テキストメッセージを処理するハンドラー
@handler.add(MessageEvent, message=TextMessage)
def handle_text_message(event):
# ローディングアニメーションの開始
UserID = event.source.user_id
start_loading_animation(UserID)
# 受信したメッセージを取得し、前後の空白を削除
user_message = event.message.text.strip()
logger.info(f"受信メッセージ: {user_message}")
try:
if user_message == "リセット":
# ユーザーからのメッセージが「リセット」の場合の処理
reset_url = "https://storage.googleapis.com/YOUR_HOST_ID.firebasestorage.app/reset"
ref = db.reference('/latest_image_url')
ref.set(reset_url) # Realtime DatabaseにリセットURLを設定
logger.info(f"Realtime DatabaseにリセットURL {reset_url} を書き込みました。")
reply_text = "リセットを受け付けました!"
else:
if len(user_message) <= 3:
# メッセージが3文字以内の場合、内容に関係なくBMP画像を生成・アップロード
bmp_path = create_bmp(user_message)
file_url = upload_to_firebase(bmp_path)
# Firebase Realtime Databaseに最新の画像URLを更新
ref = db.reference('/latest_image_url')
ref.set(file_url)
logger.info(f"Realtime DatabaseにURL {file_url} を書き込みました。")
# 一時ファイルを削除
os.remove(bmp_path)
logger.info(f"一時ファイル {bmp_path} を削除しました。")
# 返信メッセージを設定
reply_text = f"画像が保存されました!\nURL: {file_url}"
else:
# メッセージが4文字以上の場合、OpenAIで漢字二文字を生成・アップロード
kanji = generate_two_kanji(user_message)
bmp_path = create_bmp(kanji)
file_url = upload_to_firebase(bmp_path)
# Firebase Realtime Databaseに最新の画像URLを更新
ref = db.reference('/latest_image_url')
ref.set(file_url)
logger.info(f"Realtime DatabaseにURL {file_url} を書き込みました。")
# 一時ファイルを削除
os.remove(bmp_path)
logger.info(f"一時ファイル {bmp_path} を削除しました。")
# 返信メッセージを設定
reply_text = f"画像が保存されました!\nURL: {file_url}"
except Exception as e:
# 処理中にエラーが発生した場合、エラーログを記録しエラーメッセージを設定
logger.error(f"処理中にエラーが発生しました: {e}")
reply_text = "処理中にエラーが発生しました。もう一度お試しください。"
# LINEに返信
try:
line_bot_api.reply_message(
event.reply_token,
TextSendMessage(text=reply_text)
)
logger.info("LINEに返信メッセージを送信しました。")
except Exception as e:
# 返信中にエラーが発生した場合、エラーログを記録
logger.error(f"LINEへの返信中にエラーが発生しました: {e}")
# VideoMessageを処理するハンドラー
@handler.add(MessageEvent, message=VideoMessage)
def handle_video_message(event):
# 無効な入力です。というメッセージを返信します。
try:
line_bot_api.reply_message(
event.reply_token,
TextSendMessage(text="無効な入力です。") # ユーザーに無効な入力であることを通知
)
logger.info("動画メッセージが送信されたため、無効な入力と返信しました。")
except Exception as e:
# 返信中にエラーが発生した場合、エラーログを記録
logger.error(f"LINEへの返信中にエラーが発生しました: {e}")
# AudioMessageを処理するハンドラー
@handler.add(MessageEvent, message=AudioMessage)
def handle_audio_message(event):
# 無効な入力です。というメッセージを返信します。
try:
line_bot_api.reply_message(
event.reply_token,
TextSendMessage(text="無効な入力です。") # ユーザーに無効な入力であることを通知
)
logger.info("音声メッセージが送信されたため、無効な入力と返信しました。")
except Exception as e:
# 返信中にエラーが発生した場合、エラーログを記録
logger.error(f"LINEへの返信中にエラーが発生しました: {e}")
# LocationMessageを処理するハンドラー
@handler.add(MessageEvent, message=LocationMessage)
def handle_location_message(event):
# 無効な入力です。というメッセージを返信します。
try:
line_bot_api.reply_message(
event.reply_token,
TextSendMessage(text="無効な入力です。") # ユーザーに無効な入力であることを通知
)
logger.info("位置情報メッセージが送信されたため、無効な入力と返信しました。")
except Exception as e:
# 返信中にエラーが発生した場合、エラーログを記録
logger.error(f"LINEへの返信中にエラーが発生しました: {e}")
# StickerMessageを処理するハンドラー
@handler.add(MessageEvent, message=StickerMessage)
def handle_sticker_message(event):
# 無効な入力です。というメッセージを返信します。
try:
line_bot_api.reply_message(
event.reply_token,
TextSendMessage(text="無効な入力です。") # ユーザーに無効な入力であることを通知
)
logger.info("スタンプメッセージが送信されたため、無効な入力と返信しました。")
except Exception as e:
# 返信中にエラーが発生した場合、エラーログを記録
logger.error(f"LINEへの返信中にエラーが発生しました: {e}")
# ImageMessageを処理するハンドラー
@handler.add(MessageEvent, message=ImageMessage)
def handle_image_message(event):
# ローディングアニメーションの開始
UserID = event.source.user_id
start_loading_animation(UserID)
# 受信した画像メッセージのログを記録
logger.info("画像メッセージを受信しました。")
try:
# 画像データを取得
message_content = line_bot_api.get_message_content(event.message.id)
# 一時ファイルを作成して画像データを書き込む
with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
for chunk in message_content.iter_content():
tmp_file.write(chunk)
tmp_file_path = tmp_file.name
# 一時ファイルを開いて画像のバイナリデータを読み込む
with open(tmp_file_path, "rb") as image_file:
image_bytes = image_file.read()
# 一時ファイルを削除
os.remove(tmp_file_path)
logger.info(f"一時ファイル {tmp_file_path} を削除しました。")
# ImgBBに画像をアップロードしてURLを取得
image_url = upload_image_to_imgbb(image_bytes)
# OpenAIで画像を分析して漢字二文字を取得
kanji = analyze_image_with_openai(image_url)
# 漢字からBMPファイルを作成
bmp_path = create_bmp(kanji)
# Firebase Storageにアップロード
file_url = upload_to_firebase(bmp_path)
# Firebase Realtime Databaseに最新の画像URLを更新
ref = db.reference('/latest_image_url')
ref.set(file_url)
logger.info(f"Realtime DatabaseにURL {file_url} を書き込みました。")
# 一時ファイルを削除
os.remove(bmp_path)
logger.info(f"一時ファイル {bmp_path} を削除しました。")
# 成功メッセージを設定
reply_text = f"画像が保存されました!\nURL: {file_url}"
except Exception as e:
# 画像の処理中にエラーが発生した場合、エラーログを記録しエラーメッセージを設定
logger.error(f"画像の処理中にエラーが発生しました: {e}")
reply_text = "画像の処理中にエラーが発生しました。もう一度お試しください。"
# LINEに返信
try:
line_bot_api.reply_message(
event.reply_token,
TextSendMessage(text=reply_text)
)
logger.info("LINEに返信メッセージを送信しました。")
except Exception as e:
# 返信中にエラーが発生した場合、エラーログを記録
logger.error(f"LINEへの返信中にエラーが発生しました: {e}")
# アプリケーションを起動する部分
if __name__ == "__main__":
# ポート番号を環境変数から取得(デフォルトは8080)
port = int(os.getenv("PORT", 8080))
# アプリケーションを起動
app.run(host="0.0.0.0", port=port)
Cloud Runへのデプロイ
Cloud Runのインラインエディタで直接コードを書くことも可能です。
ただ、筆文字画像を生成する際に筆文字のフリーフォントも同じディレクトリに格納する必要があるのですが、コンソール上ではアップロードができませんでした。
そのため、Dockerイメージをビルド・プッシュしてCloud Runにデプロイしています。
手順としては以下の通りです。
①. Docker Desktopのセットアップ
ダウンロードページからDocker Desktopをインストール。
インストールが完了したら立ち上げる。
②. Google Cloudプロジェクトのセットアップ
ダウンロードページからgcloud CLIをインストール。
インストール後に次のコマンドで認証:
gcloud auth login
プロジェクトを設定:
gcloud config set project [YOUR_PROJECT_ID]
③. Dockerfileを作成
Dockerfile
FROM python:3.9-slim
WORKDIR /app
# 必要なファイルを先にコピー
COPY requirements.txt /app/
COPY serviceAccountKey.json /app/
COPY brush_font.ttf /app/
COPY app.py /app/
# 必要なライブラリのインストール
RUN pip install --no-cache-dir -r requirements.txt
# その他のファイルをコピー(必要に応じて)
COPY . /app
# ポートの設定
ENV PORT=8080
CMD ["gunicorn", "--bind", "0.0.0.0:8080", "app:app"]
④. requirements.txtを作成
requirements.txt
Flask
line-bot-sdk
firebase-admin
Pillow>=9.0.0,<10.0.0
gunicorn
openai>=1.0.0
requests
⑤. Cloud Runへのデプロイ
プロジェクトディレクトリへ移動:
cd C:\[YOUR_PROJECT_DIRECTORY]
Dockerイメージのビルド:
docker build -t gcr.io/[YOUR_PROJECT_ID]/line-to-epaper .
Google Container Registryへのプッシュ:
gcloud auth configure-docker
docker push gcr.io/[YOUR_PROJECT_ID]/line-to-epaper
Cloud Runにデプロイ:
gcloud run deploy line-to-epaper --image gcr.io/[YOUR_PROJECT_ID]/line-to-epaper --platform managed --region asia-northeast1 --allow-unauthenticated
⑦. 各種キーを環境変数に設定
Google CloudのコンソールからCloud Run
→サービスを選択→新しいリビジョンの編集とデプロイ
→コンテナ内の変数とシークレット
のタブを選択→'環境変数'に各種キーの名前と値を入力し→デプロイ
クライアント側(Raspberry Pi Pico W側)のディレクトリ構成
RaspberryPiPicoW_Gengo/
├ main.py
└ wifi_config.py
wifi_config.pyには接続する可能性のあるSSIDとパスワードのペアをリストにしています。
WIFI_CONFIG = [
{"ssid": "SSID1", "password": "PASSWORD1"},
{"ssid": "SSID2", "password": "PASSWORD2"},
{"ssid": "SSID3", "password": "PASSWORD3"}
]
main.pyのソースコードはこちら
import network # ネットワーク接続に必要なモジュールをインポートします
import urequests # HTTPリクエストを送信するためのモジュールをインポートします
import time # 時間操作を行うためのモジュールをインポートします
import machine # ハードウェア操作を行うためのモジュールをインポートします
from machine import Pin, SPI # ピン制御とSPI通信のためのクラスをインポートします
import framebuf # 画像データを扱うためのモジュールをインポートします
import gc # ガベージコレクション(メモリ管理)のためのモジュールをインポートします
import socket # ネットワークソケット通信を行うためのモジュールをインポートします
import ssl # SSL/TLSセキュア通信を行うためのモジュールをインポートします
import ujson # JSON形式のデータを扱うためのモジュールをインポートします
EPD_WIDTH = 800 # e-Paperディスプレイの幅を800ピクセルに設定します
EPD_HEIGHT = 480 # e-Paperディスプレイの高さを480ピクセルに設定します
RST_PIN = 12 # リセットピンをGPIO12に設定します
DC_PIN = 8 # データ/コマンド制御ピンをGPIO8に設定します
CS_PIN = 9 # チップセレクトピンをGPIO9に設定します
BUSY_PIN = 13 # ビジーピンをGPIO13に設定します
class EPD_7in5_B:
def __init__(self):
self.reset_pin = Pin(RST_PIN, Pin.OUT) # リセットピンを出力モードで初期化します
self.busy_pin = Pin(BUSY_PIN, Pin.IN, Pin.PULL_UP) # ビジーピンを入力モードで初期化します
self.cs_pin = Pin(CS_PIN, Pin.OUT) # チップセレクトピンを出力モードで初期化します
self.width = EPD_WIDTH # ディスプレイの幅を設定します
self.height = EPD_HEIGHT # ディスプレイの高さを設定します
self.spi = SPI(1) # SPI通信をチャンネル1で初期化します
self.spi.init(baudrate=4000000) # SPIの通信速度を4MHzに設定します
self.dc_pin = Pin(DC_PIN, Pin.OUT) # データ/コマンド制御ピンを出力モードで初期化します
self.buffer_black = bytearray(self.height * self.width // 8) # 黒色用のバッファを作成します
self.buffer_red = bytearray(self.height * self.width // 8) # 赤色用のバッファを作成します
self.imageblack = framebuf.FrameBuffer(self.buffer_black, self.width, self.height, framebuf.MONO_HLSB) # 黒色バッファをフレームバッファとして初期化します
self.imagered = framebuf.FrameBuffer(self.buffer_red, self.width, self.height, framebuf.MONO_HLSB) # 赤色バッファをフレームバッファとして初期化します
self.init() # ディスプレイを初期化します
def digital_write(self, pin, value):
pin.value(value) # 指定したピンに値(0または1)を設定します
def digital_read(self, pin):
return pin.value() # 指定したピンの値を読み取ります
def delay_ms(self, delaytime):
time.sleep(delaytime / 1000.0) # 指定したミリ秒だけ待機します
def spi_writebyte(self, data):
self.spi.write(bytearray(data)) # SPIを通じてデータを送信します
def reset(self):
self.digital_write(self.reset_pin, 1) # リセットピンを高電位に設定します
self.delay_ms(200) # 200ミリ秒待機します
self.digital_write(self.reset_pin, 0) # リセットピンを低電位に設定します
self.delay_ms(2) # 2ミリ秒待機します
self.digital_write(self.reset_pin, 1) # リセットピンを再び高電位に設定します
self.delay_ms(200) # 200ミリ秒待機します
def send_command(self, command):
self.digital_write(self.dc_pin, 0) # コマンドモードに設定します
self.digital_write(self.cs_pin, 0) # チップセレクトをアクティブにします
self.spi_writebyte([command]) # コマンドをSPIで送信します
self.digital_write(self.cs_pin, 1) # チップセレクトを非アクティブにします
def send_data(self, data):
self.digital_write(self.dc_pin, 1) # データモードに設定します
self.digital_write(self.cs_pin, 0) # チップセレクトをアクティブにします
self.spi_writebyte([data]) # データをSPIで送信します
self.digital_write(self.cs_pin, 1) # チップセレクトを非アクティブにします
def send_data1(self, buf):
self.digital_write(self.dc_pin, 1) # データモードに設定します
self.digital_write(self.cs_pin, 0) # チップセレクトをアクティブにします
self.spi.write(buf) # バッファデータをSPIで送信します
self.digital_write(self.cs_pin, 1) # チップセレクトを非アクティブにします
def WaitUntilIdle(self):
print("e-Paper busy") # ディスプレイが忙しいことを表示します
while self.digital_read(self.busy_pin) == 0: # ビジーピンが0の間待機します
self.delay_ms(20) # 20ミリ秒待機します
self.delay_ms(20) # さらに20ミリ秒待機して安定させます
print("e-Paper busy release") # ディスプレイがアイドル状態になったことを表示します
def TurnOnDisplay(self):
self.send_command(0x12) # ディスプレイを更新するコマンドを送信します
self.delay_ms(100) # 100ミリ秒待機します
self.WaitUntilIdle() # ディスプレイがアイドルになるまで待機します
def init(self):
self.reset() # ディスプレイをリセットします
self.send_command(0x06) # ブーストソフトスタート設定コマンドを送信します
self.send_data(0x17) # パラメータを送信します
self.send_data(0x17) # パラメータを送信します
self.send_data(0x28) # パラメータを送信します
self.send_data(0x17) # パラメータを送信します
self.send_command(0x04) # 電源オンコマンドを送信します
self.delay_ms(100) # 100ミリ秒待機します
self.WaitUntilIdle() # ディスプレイがアイドルになるまで待機します
self.send_command(0x00) # パネル設定コマンドを送信します
self.send_data(0x0F) # パネル設定データを送信します
self.send_command(0x61) # 解像度設定コマンドを送信します
self.send_data(0x03) # ソース解像度の高バイトを送信します
self.send_data(0x20) # ソース解像度の低バイトを送信します
self.send_data(0x01) # ゲート解像度の高バイトを送信します
self.send_data(0xE0) # ゲート解像度の低バイトを送信します
self.send_command(0x15) # データ設定コマンドを送信します
self.send_data(0x00) # データ設定データを送信します
self.send_command(0x50) # VCOMとデータ間隔設定コマンドを送信します
self.send_data(0x11) # パラメータを送信します
self.send_data(0x07) # パラメータを送信します
self.send_command(0x60) # TCON設定コマンドを送信します
self.send_data(0x22) # パラメータを送信します
def Clear(self):
self.send_command(0x10) # 黒色のメモリに書き込むコマンドを送信します
for _ in range(0, self.height): # ディスプレイの高さ分繰り返します
line = bytearray([0xFF] * (self.width // 8)) # 1行分のデータを白で埋めます
self.send_data1(line) # データを送信します
self.send_command(0x13) # 赤色のメモリに書き込むコマンドを送信します
for _ in range(0, self.height): # ディスプレイの高さ分繰り返します
line = bytearray([0x00] * (self.width // 8)) # 1行分のデータを黒で埋めます
self.send_data1(line) # データを送信します
self.TurnOnDisplay() # ディスプレイを更新します
def sleep(self):
self.send_command(0x02) # 電源オフコマンドを送信します
self.WaitUntilIdle() # ディスプレイがアイドルになるまで待機します
self.send_command(0x07) # ディープスリープコマンドを送信します
self.send_data(0xa5) # ディープスリープの確認コードを送信します
def display(self):
high = self.height # ディスプレイの高さを取得します
if self.width % 8 == 0: # 幅が8で割り切れるか確認します
wide = self.width // 8 # 割り切れる場合はそのままバイト数を計算します
else:
wide = self.width // 8 + 1 # 割り切れない場合は1バイト追加します
self.send_command(0x10) # 黒色データを書き込むコマンドを送信します
for i in range(0, wide): # 幅分繰り返します
self.send_data1(self.buffer_black[(i * high) : ((i+1) * high)]) # 黒色バッファからデータを送信します
self.send_command(0x13) # 赤色データを書き込むコマンドを送信します
for i in range(0, wide): # 幅分繰り返します
self.send_data1(self.buffer_red[(i * high) : ((i+1) * high)]) # 赤色バッファからデータを送信します
self.TurnOnDisplay() # ディスプレイを更新します
def connect_wifi():
import wifi_config # Wi-Fiの設定を読み込みます
wlan = network.WLAN(network.STA_IF) # Wi-Fiステーションインターフェースを取得します
wlan.active(True) # Wi-Fiを有効にします
led = Pin('LED', Pin.OUT) # LEDを制御するためのピンを設定します
while not wlan.isconnected(): # Wi-Fiに接続されていない間、繰り返します
print('Connecting to network...') # ネットワークに接続中であることを表示します
for config in wifi_config.WIFI_CONFIG: # 設定ファイル内のWi-Fi設定を順に試します
ssid = config["ssid"] # SSIDを取得します
password = config["password"] # パスワードを取得します
print(f'Trying to connect to {ssid}') # 接続を試みるSSIDを表示します
wlan.connect(ssid, password) # 指定したSSIDとパスワードで接続を試みます
start_time = time.time() # 接続開始時間を記録します
while not wlan.isconnected(): # 接続が成功するまで待機します
led.on() # LEDを点灯します
time.sleep(0.5) # 0.5秒待機します
led.off() # LEDを消灯します
time.sleep(0.5) # 0.5秒待機します
if time.time() - start_time > 10: # 10秒以上経過した場合
print(f'Failed to connect to {ssid}') # 接続失敗を表示します
break # 次のSSIDを試します
if wlan.isconnected(): # 接続に成功した場合
break # ループを抜けます
if not wlan.isconnected(): # すべてのSSIDで接続できなかった場合
print('Could not connect to any Wi-Fi network. Retrying...') # 再試行することを表示します
led.off() # LEDを消灯します
print('Network configuration:', wlan.ifconfig()) # ネットワーク設定を表示します
def parse_bmp_header_stream(stream):
header = stream.read(54) # BMPのヘッダー部分(54バイト)を読み込みます
if len(header) < 54: # ヘッダーの長さが54バイト未満の場合
print('Invalid BMP header') # 無効なBMPヘッダーであることを表示します
return None # 関数を終了します
if header[0:2] != b'BM': # ヘッダーの最初の2バイトが'BM'でない場合
print('Not a BMP file') # BMPファイルでないことを表示します
return None # 関数を終了します
size = int.from_bytes(header[2:6], 'little') # ファイルサイズを取得します
data_offset = int.from_bytes(header[10:14], 'little') # 画像データの開始位置を取得します
width = int.from_bytes(header[18:22], 'little') # 画像の幅を取得します
height = int.from_bytes(header[22:26], 'little') # 画像の高さを取得します
bits_per_pixel = int.from_bytes(header[28:30], 'little') # ピクセルあたりのビット数を取得します
compression = int.from_bytes(header[30:34], 'little') # 圧縮方式を取得します
if compression != 0: # 圧縮方式が0(圧縮なし)でない場合
print('Unsupported BMP compression') # サポートされていない圧縮方式であることを表示します
return None # 関数を終了します
palette_size = data_offset - 54 # パレットのサイズを計算します
palette = stream.read(palette_size) # パレットデータを読み込みます
return width, height, bits_per_pixel, data_offset, palette # 解析結果を返します
def stream_latest_image_url(epd):
host = 'YOUR_HOST_ID-default-rtdb.firebaseio.com' # Firebaseのホスト名を設定します
path = '/latest_image_url.json' # データベース内のパスを設定します
addr_info = socket.getaddrinfo(host, 443) # ホスト名からアドレス情報を取得します
addr = addr_info[0][-1] # 最初のアドレス情報を選択します
s = socket.socket() # ソケットを作成します
s.connect(addr) # サーバーに接続します
s = ssl.wrap_socket(s) # SSL/TLSでソケットをラップします
request = 'GET {} HTTP/1.1\r\nHost: {}\r\nAccept: text/event-stream\r\n\r\n'.format(path, host) # HTTPリクエストを作成します
s.write(request.encode()) # リクエストを送信します
buffer = b"" # 受信データを格納するバッファを初期化します
while True:
try:
data = s.readline() # サーバーから1行のデータを読み込みます
if not data: # データがない場合
break # ループを抜けます
buffer += data # バッファにデータを追加します
if b'\n\n' in buffer: # バッファ内にイベントの終わりがある場合
events = buffer.split(b'\n\n') # イベントごとに分割します
for event_data in events[:-1]: # 最後の未完のイベントを除いて繰り返します
lines = event_data.split(b'\n') # イベントデータを行ごとに分割します
event_type = None # イベントの種類を初期化します
event_payload = None # イベントのデータを初期化します
for line in lines: # 各行を処理します
if line.startswith(b'event:'): # 'event:'で始まる行の場合
event_type = line[len(b'event:'):].strip() # イベントの種類を取得します
elif line.startswith(b'data:'): # 'data:'で始まる行の場合
event_payload = line[len(b'data:'):].strip() # イベントのデータを取得します
if event_type == b'put' and event_payload: # イベントの種類が'put'でデータが存在する場合
payload = ujson.loads(event_payload) # JSONデータを解析します
image_url = payload.get('data') # 新しい画像のURLを取得します
if image_url: # 画像URLが存在する場合
print('New image URL:', image_url) # 新しい画像URLを表示します
if image_url == "https://storage.googleapis.com/YOUR_HOST_ID.firebasestorage.app/reset": # リセットURLの場合
epd.Clear() # ディスプレイをクリアします
#epd.delay_ms(2000) # 2秒待機します
#print("sleep") # スリープモードに入ることを表示します
#epd.sleep() # ディスプレイをスリープモードにします
else:
#epd.init() # ディスプレイを初期化します
epd.Clear() # ディスプレイをクリアします
download_and_process_image(image_url, epd) # 画像をダウンロードして処理します
buffer = events[-1] # 未処理のデータをバッファに残します
except Exception as e:
print('Exception in stream_latest_image_url:', e) # エラーが発生した場合は表示します
break # ループを抜けます
s.close() # ソケットを閉じます
def load_bmp_from_stream(epd, stream):
try:
width = epd.width # ディスプレイの幅を取得します
height = epd.height # ディスプレイの高さを取得します
row_size = ((width + 7) // 8 + 3) & ~3 # 各行のバイト数を計算します(4の倍数にパディング)
flip_vertical = True # 画像を上下反転させるかどうかを設定します
for y in range(height): # 画像の高さ分繰り返します
row = stream.read(row_size) # 各行のデータを読み込みます
if not row: # データが読み込めなかった場合
print('Failed to read row data at row', y) # エラーメッセージを表示します
return False # 関数を終了します
for x in range(width): # 画像の幅分繰り返します
byte_index = x // 8 # 現在のピクセルが含まれるバイトのインデックスを計算します
bit_index = 7 - (x % 8) # 現在のピクセルのビット位置を計算します
pixel = (row[byte_index] >> bit_index) & 0x1 # ピクセルの値を取得します(0または1)
y_pos = height - 1 - y if flip_vertical else y # ピクセルのY座標を計算します(上下反転する場合)
if pixel:
epd.imageblack.pixel(x, y_pos, 1) # ピクセルが1の場合、黒色を設定します(反転済み)
else:
epd.imageblack.pixel(x, y_pos, 0) # ピクセルが0の場合、白色を設定します(反転済み)
return True # 正常に読み込めたことを返します
except Exception as e:
print("Failed to load BMP:", e) # エラーが発生した場合は表示します
return False # 関数を終了します
def download_and_process_image(url, epd):
try:
response = urequests.get(url, stream=True) # 画像をストリームでダウンロードします
if response.status_code == 200: # ダウンロードが成功した場合
raw = response.raw # 生のデータストリームを取得します
header_info = parse_bmp_header_stream(raw) # BMPヘッダーを解析します
if header_info is None: # ヘッダー解析に失敗した場合
print('Failed to parse BMP header') # エラーメッセージを表示します
return # 関数を終了します
width, height, bits_per_pixel, data_offset, palette = header_info # ヘッダー情報を取得します
if width != epd.width or abs(height) != epd.height: # 画像サイズがディスプレイと一致しない場合
print(f"Image size {width}x{height} does not match display size {epd.width}x{epd.height}") # エラーメッセージを表示します
return # 関数を終了します
bytes_read = 54 + len(palette) # 読み込んだバイト数を計算します
bytes_to_skip = data_offset - bytes_read # 追加でスキップするバイト数を計算します
if bytes_to_skip > 0: # スキップするバイトがある場合
raw.read(bytes_to_skip) # 指定バイト数をスキップします
epd.imageblack.fill(0xFF) # 黒色バッファを白で埋めます
epd.imagered.fill(0x00) # 赤色バッファを黒で埋めます
if load_bmp_from_stream(epd, raw): # BMPデータをストリームから読み込みます
epd.display() # 画像をディスプレイに表示します
print('Image displayed on e-Paper.') # 表示完了を通知します
else:
print('Failed to load BMP image.') # 画像読み込み失敗を通知します
else:
print('Failed to download image:', response.status_code) # ダウンロード失敗を通知します
except Exception as e:
print('Exception in download_and_process_image:', e) # エラーが発生した場合は表示します
def main():
connect_wifi() # Wi-Fiに接続します
epd = EPD_7in5_B() # e-Paperディスプレイのインスタンスを作成します
while True: # 無限ループを開始します
print('Starting stream to listen for updates...') # 更新を待機することを表示します
stream_latest_image_url(epd) # 画像URLのストリーミングを開始します
print('Stream ended. Reconnecting in 5 seconds...') # ストリームが終了したことを表示します
time.sleep(5) # 5秒待機して再接続します
if __name__ == '__main__':
main() # スクリプトが直接実行された場合にメイン関数を呼び出します
ケース(額縁)への組み込み
額縁は100円ショップのSeriaでポストカードサイズのマット紙つきのものを購入しました。
e-Paperのサイズに合わせてマット紙をカットしました。
額縁の中心にe-Paperが配置されるように貼り付けます。
額縁の裏蓋を閉めたらHATに乗ったラズパイPicoWとモバイルバッテリーを貼り付けて接続します。
モバイルバッテリーは低電流モード対応のものを使用する必要があります。
ラズパイPicoWの消費電流が低いので通常のモバイルバッテリーだと保護機能が働いて電源の供給ができません。
そんなことはつゆ知らず、モバイルバッテリーを買い直すことになってしまいました。
今回はオウルテックのものを使用しました。
デモ
e-Paperの表示に時間がかかるのは仕様です。それもロマンと思えなくもないですね。
おまけ
赤ちゃんが生まれたら命名書を書く方も多いのではないでしょうか。
3文字までの好きな文字を表示できるようにしているので命名書としても使えます!
さいごに
自分がやりたいモノづくりはやっぱりハードウェアが伴うものだと実感させられました。
今後はモーターやソレノイドなどを使ってもっと動きのある製品を開発していきたいです!
また平成レトロネタ投下していきます!
IoTLT Advent Calendar 2024 明日は@take12012さんの記事です!
お楽しみに✌