はじめに
Raspberry Shake 4D (https://raspberryshake.org) というIoT地震計を2017年から自宅床下で稼働させています(これまでに2回SDカードを故障させましたw)。
それまでも、Arduio+6軸加速度センサーを使った地震計もどきを作っていました(こっちは町工場などで設置するのが目的でしたが)が、学術的なお墨付きのあるやつがあるなら、その方が確実ということでの導入でした。
うちの地震計で検出した地震データはRaspberry Shakeコミュニティに送信され、世界中の地震ネットワークに記録されています。
大きな地震があっては困るのですが、遠くの地震でも自宅の地震計で波形検出することがあります。その際に自宅の震度は幾つかというのがわからない時があります。
同じ地域なのに「いや、あれは震度1じゃないだろ」と思うことも多々あり、せっかく地震計があるのだから自力で震度計算できないかとずっと思ってきました。
Raspberry Shakeは検出データをUDPで流す機能を持っていますので、UDP パケットを受けて、気象庁の計測震度アルゴリズム: https://www.jma.go.jp/jma/kishou/know/jishin/kyoshin/kaisetsu/calc_sindo.html をリアルタイムに走らせ、TUI と Web ブラウザの両方に表示するシステムを作りました。
GitHub: https://github.com/Masakai/earthQuake
このダッシュボードはあくまで実験的に実装したものです。気象庁の実データでの計算アルゴリズムの検証を予定していますが、表示されている震度については正式なものではなく目安であることをご承知おきください。
全体構成
jma_intensity_realtime.py がコア。TUI と Web はどちらもそこから SharedState, recv_loop_fn, compute_loop, AlertSpeaker を import して使います。計算ロジックは一箇所だけです。
TUI ダッシュボード
rich ライブラリで作ったターミナルUI です。SSH越しでも動きます。最近GUIではなくターミナルでCLIがマイブームなので、最初にTUIを作りました。こっちはVoiceVoxを使ってアラームを喋ってくれます。
Web ダッシュボード
でも、TUIで確認できてしまうと表示をもっとリッチにしたくなります。で、ブラウザ版も追加しました。
FastAPI + WebSocket + Leaflet.js で構成したブラウザ版です。震源をクリックするとその周辺にズームし、市区町村単位で震度が色分け表示されます。
こっちもアラーム音声はありますが、ブラウザからは聞こえません。
気象庁計測震度の実装
JMA フィルタ
気象庁の計測震度は、3成分(Z/N/E)の加速度に周波数フィルタをかけた後、0.3秒以上継続する最大値を拾って次式で計算します。
I = 2 × log₁₀(a) + 0.94
フィルタの伝達関数は FL(低域)・FH(高域)・FF(補正)の積です。
def jma_frequency_response(f):
f = np.asarray(f, dtype=float)
# 低域フィルタ
FL = np.sqrt(1.0 - np.exp(-(f / 0.5) ** 3.0))
# 高域フィルタ(12次多項式近似)
y = f / 10.0
poly = (1.0 + 0.694 * y**2 + 0.241 * y**4 + 0.0557 * y**6 +
0.009664 * y**8 + 0.00134 * y**10 + 0.000155 * y**12)
FH = poly ** -0.5
# 補正項(f=0 は 0)
FF = np.where(f > 0.0, f ** -0.5, 0.0)
H = FL * FH * FF
H[~np.isfinite(H)] = 0.0
return H
FFT で周波数域に変換して適用し、irfft で戻します。
def apply_jma_filter_time(acc, fs):
n = len(acc)
spec = np.fft.rfft(acc)
f = np.fft.rfftfreq(n, d=1.0 / fs)
H = jma_frequency_response(f)
return np.fft.irfft(spec * H, n=n)
0.3秒閾値
「0.3秒以上継続する最大値」は np.partition で O(n) に取ります。毎フレーム呼んでも重くなりません。
def a_threshold_for_03s(vec_abs, fs):
k = int(round(0.3 * fs))
k = max(1, min(k, len(vec_abs)))
idx = len(vec_abs) - k
part = np.partition(vec_abs, idx)
return float(part[idx])
震度確定のタイミング問題
STA/LTA トリガが発火した瞬間に震度を記録すると、計算窓(デフォルト90秒)にまだ静穏期間のデータが残っていて値が低くなります。pending_event パターンで窓長経過後に確定します。
if triggered:
pending_event = (t_now, datetime.now().strftime("%H:%M:%S"))
if pending_event is not None:
trig_time, trig_ts = pending_event
if t_now - trig_time >= args.rt_window:
shared.add_event(trig_ts, I_final, scale)
alert.speak(scale, I_final)
pending_event = None
3スレッド構成
スレッドと asyncio の橋渡しは SharedState.snapshot() が担当します。ロックを取ってコピーを返すだけなのでデッドロックがありません。
def snapshot(self):
with self._lock:
return {
"I_final": self.I_final,
"scale": self.scale,
"events": list(self.events),
"p2p_quakes": list(self.p2p_quakes),
# ...
}
Web 版の実装ポイント
バックグラウンドタブ問題
タブをバックグラウンドに移すと requestAnimationFrame が止まるのに WebSocket は受信し続けます。復帰時に溜まったメッセージが一気に処理されて画面が高速スクロールする問題がありました。
visibilitychange で WebSocket を切断・再接続するのがシンプルな解決策です。
document.addEventListener('visibilitychange', () => {
if (document.visibilityState === 'hidden') {
disconnect();
} else {
connect();
}
});
さらに、WebSocket が複数メッセージを一瞬で受け取っても DOM 更新は 1 フレームに 1 回に抑えます。
let _pendingData = null;
let _dashPending = false;
function updateDashboard(data) {
_pendingData = data;
if (_dashPending) return;
_dashPending = true;
requestAnimationFrame(() => {
const d = _pendingData;
_dashPending = false;
// DOM 更新
});
}
市区町村単位の震度色分け
P2P地震情報(code=551)の points 配列に各地点の住所と震度が入っています。
{
"points": [
{ "pref": "岩手県", "addr": "宮古市", "scale": 30 }
]
}
全国一括 GeoJSON は 8.5MB あるので、都道府県別ファイル(75〜760KB)を必要な分だけ並列 fetch してキャッシュします。
const CITY_BASE = 'https://raw.githubusercontent.com/smartnews-smri/japan-topography/main/data/municipality/geojson/s0010/';
async function fetchPrefGeoJSON(prefName) {
if (_cityGeoCache[prefName]) return _cityGeoCache[prefName];
const code = PREF_CODE[prefName];
if (!code) return null;
const res = await fetch(`${CITY_BASE}${code}.json`);
const data = await res.json();
_cityGeoCache[prefName] = data;
return data;
}
GeoJSON の N03_004(市区町村名)と points[].addr を照合するため、住所文字列から市区町村名を正規表現で抽出します。
function extractCity(addr) {
const m = addr.match(/^(.+?[市区町村郡])/);
return m ? m[1] : addr;
}
地図の差分チェック
震源リストが変わっていなければ毎秒再描画しません。マーカーが再生成されると開いているポップアップが閉じてしまうため。
const key = (quakes || []).map(q => q.id || q.time || '').join(',');
if (key === _mapQuakeKey) return;
_mapQuakeKey = key;
震源マーカー(M比例サイズの×印)
L.divIcon でインライン SVG を生成し、M × 2.8(min 8px / max 22px)の×印にします。
function crossIcon(color, size) {
const s = size || 20;
const svg = `<svg width="${s}" height="${s}" viewBox="0 0 ${s} ${s}"
xmlns="http://www.w3.org/2000/svg">
<line x1="0" y1="0" x2="${s}" y2="${s}"
stroke="${color}" stroke-width="2.5" stroke-linecap="round"/>
<line x1="${s}" y1="0" x2="0" y2="${s}"
stroke="${color}" stroke-width="2.5" stroke-linecap="round"/>
</svg>`;
return L.divIcon({
html: svg, className: '',
iconSize: [s, s], iconAnchor: [s/2, s/2], popupAnchor: [0, -s/2]
});
}
震源までの直線距離
navigator.geolocation でブラウザの位置を取得し、ハバーサイン公式で震源との直線距離を表示します。ローカル運用前提なので許可ダイアログを出して取得します。
function haversineKm(lat1, lng1, lat2, lng2) {
const R = 6371;
const dLat = (lat2 - lat1) * Math.PI / 180;
const dLng = (lng2 - lng1) * Math.PI / 180;
const a = Math.sin(dLat/2)**2 +
Math.cos(lat1 * Math.PI/180) * Math.cos(lat2 * Math.PI/180) *
Math.sin(dLng/2)**2;
return R * 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1-a));
}
位置情報の取得は非同期なので、取得完了後に表示中のテーブルと地図を再描画します。
navigator.geolocation.getCurrentPosition(pos => {
_userLat = pos.coords.latitude;
_userLng = pos.coords.longitude;
if (_latestP2pQuakes.length > 0) {
updateP2PTable(_latestP2pQuakes);
_mapQuakeKey = ''; // 強制再描画
updateMap(_latestP2pQuakes);
}
});
トリガ履歴と P2P 地震の時刻照合
トリガ履歴の行をクリックしたとき、±10分以内の P2P 地震を自動で照合して地図をズームします。トリガ時刻は "HH:MM:SS"(ローカル時刻)、P2P は "YYYY-MM-DD HH:MM"(JST)。どちらも JST なので分単位で比較できます。
function _findMatchingQuake(trigTs) {
const trigMin = _trigTsToMin(trigTs);
let best = null, bestDiff = Infinity;
for (const q of _latestP2pQuakes) {
const qMin = _quakeTsToMin(q.time);
if (isNaN(qMin)) continue;
const diff = Math.abs(trigMin - qMin);
if (diff <= 10 && diff < bestDiff) {
best = q;
bestDiff = diff;
}
}
return best;
}
シミュレーターで実データなしに開発
テストするからと言って、都合よく地震は来ないので simulate_udp.py を作りました。目標震度を指定すると、それに対応した合成正弦波を UDP パケットとして送出します。
I = 2×log₁₀(a) + 0.94 → a = 10^((I - 0.94) / 2)
たとえば震度3.0なら a ≈ 0.050 m/s²。これをフィルタ特性の逆算に組み込んで目標周波数(デフォルト 2Hz)の振幅を決め、StationXML の感度値を逆に掛けて counts 値に変換してから送ります。
# 震度3.0の合成波を60秒間送出(静穏20秒→揺れ→静穏)
python simulate_udp.py --intensity 3.0 --duration 60 --quiet-sec 20
あくまでもシミュレートするためのデータですので、そのうち気象庁の実測データをもとに計算が正しいかどうかを確認しようと思っています。
起動方法
pip install numpy obspy rich websocket-client fastapi uvicorn
# TUI
python jma_intensity_tui.py --station R38DC
# Web(TUI と同時起動する場合は --bind で別 UDP ポートを指定)
python jma_intensity_web.py --station R38DC --web-port 8080
TUI と Web を同一マシンで同時に動かす場合は、Raspberry Shake 側の DATACAST 送信先を2エントリ登録してそれぞれ別の UDP ポートを割り当てます。
まとめ
| 課題 | 対策 |
|---|---|
| JMAフィルタの精度 | FFT適用 + verify_filter.py で5項目テスト |
| 震度確定タイミング |
pending_event パターンで rt-window 秒後に確定 |
| バックグラウンドタブの高速スクロール |
visibilitychange で WS 切断・再接続 + rAF デバウンス |
| 地図の毎秒再描画 | ID リスト差分チェック、変化なしはスキップ |
| GeoJSON の重さ | 都道府県別ファイルを必要な分だけ fetch してキャッシュ |
| 実機なし開発 | 合成波 UDP シミュレーター |
コアの計算ライブラリを TUI / Web の両方から import する構成にしたことで、「動作確認はターミナル、遠隔モニタリングはブラウザ」を同じロジックで実現できました。
株式会社リバーランズ・コンサルティング 坂井正徳 : https://riverruns-consulting.com



