学校で組み込みの授業があり、Raspberry Piを使用してシステム開発をするという課題をグループで取り組みました。今回は、そこで作成したもの、実際に学び、苦労した部分について紹介していこうと思います。
Raspberry Piで動作するデジタル砂時計
こちらが私たちのグループが作成したデジタル砂時計です。動画のとおり、傾きによって砂(LEDドット)が落下し、砂時計の砂が落下する様子が描かれています。
Raspberry Pi上でPythonを実行することで処理がスタートします。
加えて、砂が落下し終わるとブザーが鳴り、LINEに通知が行きます。ブザーはボタンを押して止められます。
機能一覧
- 加速度センサ
MPU9250
からの角度取得機能 - 傾きに応じて玉の位置を描写する演算機能
- CLIへの玉・ステータス描写機能
- LEDマトリックス
MAX7219
2台への玉描写機能 - 落下し終わった際の通知機能
- ブザー(単音耳コピの『ビビデバ 星街すいせい』)が流れる
- ブレッドボードにあるボタンで停止可能
-
LINE BOT
、Google Apps Script
経由でLINEに通知を行う
技術スタック
-
Python
- RPI.GPIO(シンプルなGPIO制御)
- luma関連(LED表示)
- curses(CLI上にドットを表示)
-
Google Apps Script
- 落下終了時にLINEへメッセージを送信するためのCallback関数(送信だけならPythonだけでも行けたらしい)
- LINE BOT
- React + Vite
- 事前に玉の位置を描写する処理をHTMLのCanvasで再現
その他
- 友達がレーザーカッターを使用してLED枠に合わせた台座を作成
ソースコード
ただし、main
ブランチは全ての端末(CLI)で動作するものとなっています。raspberrypi
ブランチを取得し、GPIOの端子をつなげることで本来のデバイスで動作します。
アイデアの詳細
一応授業なので「従来の問題点・解決策」などを書く必要があったので、こちらでもざっくり記述します。
従来の問題点(砂時計)
- 砂時計は美しい&時間を測れるという強みがあるが、落ちきっても通知などが鳴らず気づかないリスクがある
- だからといってスマホのタイマーを使用するのは全く趣がない
解決策
- 砂が落ちきったときに通知するSomethingを導入
- 砂の描写はLEDで実施し趣を再現
- 加速度センサーを用いて通常の砂と同じ落下を再現
まるで砂時計をディスってるみたいな言い方ですが、そうでもしないと新システムは作れないのです!今の当たり前、常識に待てをかける、それがエンジニアです。
元ネタ
ここまでで我々が考えたみたいになっていますが、実は全く同じことを既にしている人(デバイス)は存在し、これを見て、じゃあ自分たちも作ってみようとなったのが本来の動機づけとして正しいです。
実際のコード
加速度センサから回転角度を求める
mpu_events.py
# VCCをRaspberry Piの3.3Vに接続
# GNDをRaspberry PiのGNDに接続
# SDAをRaspberry PiのGPIO2 (SDA)に接続
# SCLをRaspberry PiのGPIO3 (SCL)に接続
import smbus
import time
import math
# MPU9250のI2Cアドレス
MPU9250_ADDRESS = 0x68
# レジスタアドレス
ACCEL_XOUT_H = 0x3B
ACCEL_XOUT_L = 0x3C
ACCEL_YOUT_H = 0x3D
ACCEL_YOUT_L = 0x3E
ACCEL_ZOUT_H = 0x3F
ACCEL_ZOUT_L = 0x40
# I2Cバスの初期化
bus = smbus.SMBus(1)
def read_word(sensor_address, reg_address):
high = bus.read_byte_data(sensor_address, reg_address)
low = bus.read_byte_data(sensor_address, reg_address + 1)
value = (high << 8) + low
if value >= 0x8000:
value = -((65535 - value) + 1)
return value
def read_accel_data():
accel_x = read_word(MPU9250_ADDRESS, ACCEL_XOUT_H)
accel_y = read_word(MPU9250_ADDRESS, ACCEL_YOUT_H)
accel_z = read_word(MPU9250_ADDRESS, ACCEL_ZOUT_H)
return accel_x, accel_y, accel_z
# 傾きの計算
def calculate_tilt(accel_x, accel_y, accel_z):
roll = math.atan2(accel_y, accel_z) * 180 / math.pi
pitch = math.atan2(-accel_x, math.sqrt(accel_y**2 + accel_z**2)) * 180 / math.pi
return round(roll), round(pitch)
def get_mpu_angle():
accel_x, accel_y, accel_z = read_accel_data()
return calculate_tilt(accel_x, accel_y, accel_z)
この部分はChatGPTに任せたため、私はあまり理解していないです。ですが動いているので使用しています。
実際に回転を取得する際は、get_mpu_angle()
をフレームごとに呼び出して回転角度を取得しています。
LEDディスプレイにデータを表示
draw.py
CLIの表示も同時に実施しているため、記述量は多いです
# 1. **VCC** - 5V(ピン2または4)
# 2. **GND** - GND(ピン6、9、14、20、25、30、34、39のいずれか)
# 3. **DIN** - MOSI(ピン19)
# 4. **CS** - CE0(ピン24)
# 5. **CLK** - SCLK(ピン23)import math
from luma.core.interface.serial import spi, noop
from luma.core.render import canvas
from luma.led_matrix.device import max7219
import math
from config import GRID_SIZE, FRAMERATE, INIT_ANGLE, balls, is_finish_falling
_grid = [["◯" for _ in range(GRID_SIZE)] for _ in range(GRID_SIZE)]
_logs = []
# 初期設定
serial = spi(port=0, device=0, gpio=noop())
device = max7219(serial, cascaded=2, block_orientation=90, rotate=0)
# 内部で使用する変数
pre_angle = INIT_ANGLE
_count_angle_diff_frame = 1 / FRAMERATE
def draw_grid(balls, index: int):
# balls配列に基づいてドットを配置
for ball in balls:
x, y = ball["x"], ball["y"]
if 0 <= x < GRID_SIZE and 0 <= y < GRID_SIZE:
_grid[y + index * GRID_SIZE][x] = "●"
def draw_routine(stdscr, frame_count: int, ball_count: int, angle: int, curses):
global _grid, _logs, pre_angle, _count_angle_diff_frame
# 空のグリッドを作成
_grid = [["◯" for _ in range(GRID_SIZE)] for _ in range(GRID_SIZE * 2)]
draw_grid(balls[0], 0)
draw_grid(balls[1], 1)
stdscr.clear()
# 1行目のメッセージ
row0_message = f"frame: {frame_count:> 5} ball: {ball_count:> 3} angle: {angle:> 4}° GRID_SIZE: {GRID_SIZE:< 3}"
stdscr.addstr(0, 0, row0_message)
# 2行目のメッセージ
if pre_angle != angle or _count_angle_diff_frame < 1 / FRAMERATE:
if pre_angle != angle:
pre_angle = angle
_count_angle_diff_frame = 0
stdscr.addstr(1, 4, "[")
stdscr.addstr("keypress", curses.color_pair(1))
stdscr.addstr(f"] angleを{angle:> 4}° に変更")
_count_angle_diff_frame += 1
# グリッドを描写
for index, row in enumerate(_grid):
row_text = (" " * GRID_SIZE if index >= GRID_SIZE else "") + " ".join(row)
stdscr.addstr(index + 2, 2, row_text)
with canvas(device) as draw:
for index, ball_items in enumerate(balls):
for ball in ball_items:
x = ball["x"]
y = ball["y"]
if index == 1:
x += GRID_SIZE
draw.point((x, y), fill="white") # 点灯するドットを描画
# これは数字を表示するための行
row_text = ("").join([f"{i:> 2}" for i in range(GRID_SIZE)])[1:]
stdscr.addstr(
GRID_SIZE * 2 + 2, 0 + 2, f"{row_text} {row_text}", curses.color_pair(1)
)
# これは数字を表示するための列
for index in range(GRID_SIZE * 2):
stdscr.addstr(
index + 2, GRID_SIZE * 4 + 1, f"{index:> 2}", curses.color_pair(1)
)
# 角度に応じて星を描写
is_positive_sine = math.sin((angle * math.pi) / 180) >= 0
is_positive_cosine = math.cos((angle * math.pi) / 180) >= 0
stdscr.addstr(
(GRID_SIZE * 2 + 2 if is_positive_cosine else 1),
(GRID_SIZE * 4 + 2 if is_positive_sine else 0),
"☆",
)
# アラームを表示
if is_finish_falling[0]:
stdscr.addstr(1, GRID_SIZE * 4 + 4, "Alerm!")
stdscr.addstr(GRID_SIZE * 2 + 3, 0, "[")
stdscr.addstr("a", curses.color_pair(1))
stdscr.addstr("]で回転, [")
stdscr.addstr("q", curses.color_pair(1))
stdscr.addstr("]または`control + c`で終了")
for index, log in enumerate(_logs):
stdscr.addstr(GRID_SIZE * 2 + 5 + index, 0, log)
stdscr.refresh()
def print_log(log: str):
global _logs
_logs.append(log)
config.py
(共通変数を格納するファイル)にballs
という変数があり、その変数に現在のボール情報が格納されています。その情報を元に、draw_routine()
をフレームごとに実行することで2デバイスのLEDマトリックスが動作します。
with canvas(device) as draw:
for index, ball_items in enumerate(balls):
for ball in ball_items:
x = ball["x"]
y = ball["y"]
if index == 1:
x += GRID_SIZE
draw.point((x, y), fill="white") # 点灯するドットを描画
for文で砂1つ一つをdraw.point
で座標を示すことで実際に描写されます。今回2つ連結していますが、16×8
として扱われているので、2デバイス目の描写はx += GRID_SIZE
としています。
『ビビデバ 星街すいせい』を流す
bibideba.py
import RPi.GPIO as GPIO
import time
import threading
# 設定
BUZZER_PIN = 27 # GPIO 27(PIN 13)
GPIO.setmode(GPIO.BCM)
GPIO.setup(BUZZER_PIN, GPIO.OUT, initial=GPIO.LOW)
buzzer = GPIO.PWM(BUZZER_PIN, 1)
stop_flag = threading.Event()
def play_sound(tone: int, devide: float, pause: int):
if stop_flag.is_set():
return
buzzer.ChangeFrequency(tone)
time.sleep(devide * 0.127)
if pause > 0:
buzzer.stop()
time.sleep(pause)
# ここから音程を刻み込む
buzzer.start(0)
def play_bibideba_chorus_common():
play_sound(430, 1, 0) # BI
play_sound(480, 1, 0) # BBI
play_sound(430, 2, 0) # DI
play_sound(430, 1, 0) # BO
play_sound(480, 1, 0) # BBI
play_sound(430, 2, 0) # DI
play_sound(480, 3, 0) # BOO
play_sound(320, 3, 0) # WA
play_sound(430, 1, 0) # BI
play_sound(480, 1, 0) # BBI
play_sound(430, 1, 0) # DI
play_sound(480, 1, 0) # BO
play_sound(430, 1, 0) # BBI
play_sound(430, 1, 0) # DI
play_sound(480, 2, 0) # BOO
play_sound(320, 4, 0) # BA
play_sound(462, 2, 0) # YEAH
play_sound(430, 2, 0) #
play_sound(382, 2, 0) #
def play_song():
global buzzer
while not stop_flag.is_set():
play_bibideba_chorus_common() # 繰り返し部分
play_sound(320, 2, 0) # 混(こん)
play_sound(382, 1, 0) # 絡(が)
play_sound(430, 1, 0) # (ら)
play_sound(480, 2, 0) # がっ
play_sound(320, 1, 0) # て
play_sound(285, 1, 0) # も
play_sound(320, 2, 0) # 仕様(しよう)
play_sound(480, 3 / 3, 0) # が
play_sound(462, 1 / 3, 0) #
play_sound(480, 4 / 3, 0) # ない
play_sound(320, 1, 0) # ガ
play_sound(382, 1, 0) # ラ
play_sound(430, 1, 0) # ス
play_sound(480, 2, 0) # シュー
play_sound(320, 1, 0) # ズ
play_sound(285, 1, 0) # で
play_sound(320, 2, 0) # お
play_sound(480, 3 / 3, 0) # とっ
play_sound(462, 1 / 3, 0) #
play_sound(480, 8, 0) # ない
play_bibideba_chorus_common() # 繰り返し部分
play_sound(320, 2, 0) # こん
play_sound(382, 1, 0) # や
play_sound(430, 1, 0) # に
play_sound(480, 2, 0) # あす
play_sound(320, 1, 0) # な
play_sound(285, 1, 0) # ど
play_sound(320, 2, 0) # な
play_sound(620, 1 / 3, 0) # ーい
play_sound(660, 8 / 3, 0) # ーい
play_sound(462, 1, 0) # な
play_sound(430, 1, 0) # ら
play_sound(382, 1, 0) # ば
play_sound(430, 2, 0) # 自由
play_sound(382, 1, 0) # に
play_sound(430, 2, 0) # 踊っ
play_sound(382, 1, 0) # た
play_sound(430, 1, 0) # も
play_sound(382, 1, 0) # ん
play_sound(430, 1, 0) # が
play_sound(480, 3, 0) # ち
play_sound(285, 1, 0) # で
play_sound(320, 3, 0) # しょ
def start_playing():
buzzer.start(50)
stop_flag.clear()
song_thread = threading.Thread(target=play_song)
song_thread.start()
def stop_playing():
stop_flag.set()
buzzer.start(0)
def cleanup():
GPIO.cleanup()
# モジュールとして利用できるようにエクスポート
__all__ = ["start_playing", "stop_playing", "cleanup"]
# スクリプトとして実行された場合
if __name__ == "__main__":
try:
start_playing()
except KeyboardInterrupt:
print("プログラムが正常に終了されました")
finally:
cleanup()
ビビデバブーム?ということで、音程は一つ一つ手打ちでブザーで鳴らすプログラムを作成しました。Hzを設定することで正しい音程で音を鳴らすことができます。
start_playing
で曲の開始、stop_playing
で曲の終了ができます。音程部分はもっと定数化したほうが良かったかもしれません。
ボタンで停止
import RPi.GPIO as GPIO
import time
# GPIOモジュールの設定
# GPIO.setmode(GPIO.BCM) # または GPIO.BOARD
BUTTON = 25 # 使用するGPIOピン番号
# GPIOピンの設定
GPIO.setup(BUTTON, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
# メインループ
def button_thread(stop_event, bibideba):
try:
while not stop_event.is_set():
time.sleep(0.1)
btn = GPIO.input(BUTTON)
if btn == 1:
bibideba.stop_playing()
except KeyboardInterrupt:
print("Exiting...")
ボタンの処理が一番シンプルで分かりやすいです。ただし、マルチスレッド(並行処理)で無限ループしているので、while not stop_event.is_set():
の記述がないとうまく動作しなかったです。
ボールの演算
ソースコードは長いので省略します。ボールの演算だけでなく、ボールが落下し終えたときに音楽を流したり、LINEに送信するためのコールバック関数を叩いたりしています。
LINE BOTに落下終了を送信
// LINEBotのチャネルアクセストークン
const LINE_BOT_TOKEN = `LINE_BOT_TOKEN`;
// LINE APIの基本となるURL
const LINE_BASE_URL = "https://api.line.me/v2/bot";
// LINEBotの認証ヘッダー
const HEADER = {
"Content-Type": "application/json; charset=UTF-8",
Authorization: "Bearer " + LINE_BOT_TOKEN,
};
const USER_ID = `LINEのユーザID`;
function doGet() {
var now = new Date();
var formattedDateTime = Utilities.formatDate(
now,
Session.getScriptTimeZone(),
"yyyy-MM-dd HH:mm:ss"
);
var message =
"砂時計のタイマーが終了しました\n" + "現在の日時: " + formattedDateTime;
sendLineMessage(USRE_ID, message);
}
function sendLineMessage(userId, message) {
var url = LINE_BASE_URL + "/message/push";
var postData = {
to: userId,
messages: [
{
type: "text",
text: message,
},
],
};
var options = {
method: "post",
headers: HEADER,
payload: JSON.stringify(postData),
};
UrlFetchApp.fetch(url, options);
}
Google Apps Scriptというやつですね。doGet
がこのプロジェクトのgetメソッドとなっており、デプロイ時に生成されたURLを叩くことでこの処理が実行されます。
学習・苦戦したポイント
ここからはこのシステム開発をして学習・苦戦したポイントを話そうと思います。
WiFiに接続できない
早速トラブルが発生したのですが、先生の言われようがままにraspberry piをアップデートしました。
sudo apt update
そしたら授業の資料とは異なるWiFi設定となり、どこの班もWiFiに接続できない
というトラブルが発生しました。
なお、配られたRaspberry PiのOSはCLIで、よくあるスマホのWiFi設定のように簡単に接続できるものではありません。
有線LANもありましたが、学校特有の認証があり、発表当日までネットに繋がらないというトラブルが発生していました。最後には先生がテザリングして対応するなども。
GUIに書き換えることで解決
あまりドキュメントもない中で、GUIのOSを入れることで難なくWiFiに接続できました。あと単純に黒い画面に抵抗ある人にとっては救世主だったかもしれません。
端子逆に繋いで煙が出た
Raspberry Piは上から1,2,…の順で端子が並んでいると思います。ただ、私はRaspberry Piを逆にした状態40,39,…のように使っていたため、その結果繋いでいたデバイスから煙が出ました。センサー壊れました。
Raspberry Piなのでせいぜい5V程度ですが、本来5V,GNDのところを通信用端子に、通信用端子のところに5Vを入れたせいで、トランジスタの関係でどんどん電圧が増幅された。と、友達が推測してました。5Vでも平気で壊れるので、皆様お気をつけください。
raspberrypiはpipに制限がある
結論venv
を構築することで解決しますが、最初見たとき絶望していました。何にもできひんやん、と。今では当たり前のようにvenv
使用するようになったため、ここは非常に勉強になりました。
授業での勇者になった
他の班が常に苦戦してる中で、我々の班は優秀な人がいたおかげで、カメラの起動、ネット接続、センサ接続をどの班よりも早く実行し、先生に頼られる存在にまでなりました。
これについては、好奇心の多さ、実行回数の多さが功を奏したのでしょう。
LINE BOTの仕組みを理解した
他の班がこぞってLINEの送信機能(Notify使用)を実装してたので、悔しくて私も急遽LINEで砂落下し終わったときに通知行くように実装しました。
先輩の講習会で一度LINE BOTをいじったことがあり、当初は全く理解できませんでしたが、今は全然理解してしまっている、我ながらエレガントゥ
気合の入れようがすごい
今回のLEDマトリックスや加速度センサは、たまたまArduino電子工作キットを2人持ってたことで実現できました。その他、秋葉原の秋月電子通商に赴いたり、気合の入れようが凄まじかったです。
終わりに
IoTデバイスなども元は小さいデバイスの組み合わせで成り立っている形なので、本当にアイデア次第で何でも作れるんだと実感しました。学校の授業で1,2位を争うくらいの満足度でした。
昔からおもちゃなどを分解して基盤などがどうなっているか確認してた歴史があったので、かなり没頭できました。