目的
ブラウン運動の動画からアボガドロ数を自動で解析するアプリをPythonのCustomTkinterを使って作ります。
原理
ブラウン運動
ブラウン運動する粒子の平均二乗変位$<r^2>$は、拡散係数$D$と時間$t$を用いて次のように表されます。
$$
<r^2> = <x^2 + y^2> = 4 Dt
$$
ここで、$D$は気体定数$R$、温度$T$、アボガドロ定数$N_A$、媒質の粘性係数$\eta$、粒子の半径$a$を用いて次の様に表されます。この式をアインシュタイン・ストークスの関係式と言います。
$$
D=\frac{RT}{N_A}\frac{1}{6 \pi \eta a}
$$
アインシュタインは1905年にアインシュタイン・ストークスの関係式を発表し、ブラウン運動という粒子のマクロな運動からアボガドロ定数を算出できることを理論的に示し、これを1908年にペランが実験的に確かめました。ペランの実験は、ブラウン運動がアボガドロ数ほどの膨大な数の微粒子の運動によって引き起こされる現象であることの実証であり、今日では、これが人類最初の分子の存在を照明した実験とされています。
pythonによるブラウン運動の解析
ペランがブラウン運動の実験をしていた20世紀の初めは、μmスケールの粒子の平均二乗変位を測定するのは大変な作業だったかと思います。しかし、現代では顕微鏡でブラウン運動を動画撮影し、その軌跡を辿ればいいだけです。その軌跡の検出もpythonの物体検出を用いて仕舞えば自動化できます。
本プログラムではガウシアン差分(Diffrence of Gaussian: DoG)の前処理を行い動画中のブラウン運動する粒子を検出します。ガウシアン差分は、画像に対して二つの異なる大きさのカーネルによるガウシアンぼかしをかけてそれらの差をとる処理を行います。小さいカーネルのガウシアンぼかしでカーネルサイズに近い大きさの構造物を際立たせ、大きなカーネルのガウシアンぼかしで背景や照明ムラなどをフィルターします。ガウシアン差分で得られた画像を閾値で2値化し物体を検出します。
動画の$n$枚面のフレームでの粒子の位置を$r(n)$とすると、$<r^2>$は画面内の全ての粒子の$r(n)^2-r(0)^2$の平均です。フレームレートから各フレームでの経過時間$t$がわかるので拡散定数$D$は次のように求まります。
$$
D = \frac{<r^2>}{4t}
$$
あとは、気体定数$R$、温度$T$、媒質の粘性係数$\eta$、粒子の半径$a$を使ってアインシュタイン・ストークスの関係式からアボガドロ数を算出できます。
$$
N_A=\frac{RT}{D}\frac{1}{6 \pi \eta a}
$$
実際にやってみたところアボガドロ数がおよそ$4 \times10^{23}$になりました。正確な値が$6.02 \times10^{23}$なので、まずまずの清土かと思います。
手順
ブラウン運動の動画撮影
ブラウン運動をさせる粒子として簡単なのは牛乳だと思います。牛乳には乳脂肪球と呼ばれるコロイドが分散しています。これの大きさは平均で4 μm程度の大きさと言われています。今回は$a=2$ μmとして扱います。
1)牛乳を水で薄める。
2)スライドガラスに一滴落とす。
3)カバーガラスを乗せる。
4)お好みの顕微鏡で動画撮影。
5)動画にはスケールバーを入れておく。
こんな感じにブラウン運動が見えるかと思います。不規則に動く球状の粒が牛乳の乳脂肪球です。左下の白い棒は10 μmのスケールバーです。
アプリによる解析
アプリを用いてブラウン運動の動画からアボガドロ数を解析する手順を示します。アプリの内容については後述します。
2)スケールバーの長さを測定。
3)スケールバーの長さ、温度$T$、媒質の粘性係数$\eta$、粒子の半径$a$を入力。
4)粒子検出のパラメタを設定。
5)計測開始ボタンを押す。

計測開始ボタンを押すと最初のフレームで検出された粒子に緑の丸と番号が付けられます。

解析が終わると軌跡を黄色で示した画像と計算結果が表示され保存されます。

アプリ
アプリの内容を抜粋して解説します。作成はChatGPTを用いて行っています。
ソースコード全文はこちら
import os
import threading
from dataclasses import dataclass
from tkinter import filedialog, messagebox
import numpy as np
import cv2
import customtkinter as ctk
from PIL import Image, ImageTk
# -------------------------
# Constants
# -------------------------
R_GAS = 8.314462618 # J/(mol*K)
MIN_TRACKED_PARTICLES = 5
# 線形フィットに使う点数(= 時間範囲の長さ)
# 600フレーム/30fpsなら、400点≈13秒分
FIT_FRAMES = 400
@dataclass
class DetectParams:
blur: int # DoGの小さいぼかし(粒子スケール)
circularity_min: float
area_min: float
area_max: float
max_track_dist_px: float
bg_blur: int # DoGの大きいぼかし(背景スケール)
dog_thr: float # DoG後の閾値(0~1)
@dataclass
class VideoInfo:
path: str
cap: cv2.VideoCapture
w: int
h: int
fps: float
total: int
def bgr_to_photo_letterbox(bgr, out_w, out_h):
"""Resize with aspect ratio and letterbox into fixed out_w/out_h. Returns PhotoImage and mapping params."""
h, w = bgr.shape[:2]
scale = min(out_w / w, out_h / h)
nw, nh = max(1, int(w * scale)), max(1, int(h * scale))
resized = cv2.resize(bgr, (nw, nh), interpolation=cv2.INTER_AREA)
canvas = np.zeros((out_h, out_w, 3), dtype=resized.dtype)
x0 = (out_w - nw) // 2
y0 = (out_h - nh) // 2
canvas[y0:y0 + nh, x0:x0 + nw] = resized
rgb = cv2.cvtColor(canvas, cv2.COLOR_BGR2RGB)
pil = Image.fromarray(rgb)
return ImageTk.PhotoImage(pil), scale, x0, y0
def _contours_to_particles(bin_img, params: DetectParams):
"""Return accepted particle info: list of dict: {'cx','cy','area','radius_px','circ'}"""
contours, _ = cv2.findContours(bin_img, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
out = []
for c in contours:
area = float(cv2.contourArea(c))
if area < params.area_min or area > params.area_max:
continue
peri = float(cv2.arcLength(c, True))
if peri <= 0:
continue
circ = float(4.0 * np.pi * area / (peri * peri))
if circ < params.circularity_min:
continue
m = cv2.moments(c)
if m["m00"] == 0:
continue
cx = float(m["m10"] / m["m00"])
cy = float(m["m01"] / m["m00"])
radius_px = float(np.sqrt(area / np.pi)) # equivalent circle radius (参考表示用)
out.append({"cx": cx, "cy": cy, "area": area, "radius_px": radius_px, "circ": circ})
return out
def detect_particles_dog(bgr, params: DetectParams):
"""DoG (small blur - large blur) + threshold. ※マスク機能は削除済み"""
gray = cv2.cvtColor(bgr, cv2.COLOR_BGR2GRAY).astype(np.float32)
small = int(params.blur)
if small < 0:
small = 0
if small % 2 == 0:
small += 1
if small < 3:
small = 3
large = int(params.bg_blur)
if large % 2 == 0:
large += 1
if large <= small:
large = small + 2
if large < 9:
large = 9
g1 = cv2.GaussianBlur(gray, (small, small), 0)
g2 = cv2.GaussianBlur(gray, (large, large), 0)
dog = g1 - g2
dog = dog - dog.min()
dog = dog / (dog.max() + 1e-6)
thr = float(params.dog_thr)
thr = max(0.0, min(1.0, thr))
bin_img = (dog > thr).astype(np.uint8) * 255
# 小さなゴミを消す(任意)
bin_img = cv2.morphologyEx(bin_img, cv2.MORPH_OPEN, np.ones((3, 3), np.uint8), iterations=1)
return _contours_to_particles(bin_img, params)
def track_nearest(prev_pts, curr_pts, max_dist):
"""Nearest-neighbor assignment. prev_pts list[(x,y)]/None -> new list same length."""
if len(curr_pts) == 0:
return [None for _ in prev_pts]
curr = np.array(curr_pts, dtype=np.float32)
used = np.zeros(len(curr_pts), dtype=bool)
new_pts = []
for p in prev_pts:
if p is None:
new_pts.append(None)
continue
px, py = p
d2 = (curr[:, 0] - px) ** 2 + (curr[:, 1] - py) ** 2
d2[used] = np.inf
j = int(np.argmin(d2))
if not np.isfinite(d2[j]):
new_pts.append(None)
continue
if np.sqrt(d2[j]) <= max_dist:
used[j] = True
new_pts.append((float(curr[j, 0]), float(curr[j, 1])))
else:
new_pts.append(None)
return new_pts
def draw_trajectories_on_frame(frame_bgr, det0, tracks, show_ids=True):
"""
frame_bgr: 最終フレーム(BGR)
det0: 初期検出 list of dict (cx,cy,radius_px)
tracks: list of list of (x,y) or None
"""
out = frame_bgr.copy()
for i, tr in enumerate(tracks):
# tr には None が混ざるので、連続区間ごとに polyline を描く
segment = []
segments = []
for p in tr:
if p is None:
if len(segment) >= 2:
segments.append(segment)
segment = []
else:
segment.append((int(round(p[0])), int(round(p[1]))))
if len(segment) >= 2:
segments.append(segment)
for seg in segments:
cv2.polylines(out, [np.array(seg, dtype=np.int32)], False, (0, 255, 255), 1, cv2.LINE_AA)
# 開始点(最初に見つかった点)と終点(最後に見つかった点)
p0 = next((p for p in tr if p is not None), None)
p1 = next((p for p in reversed(tr) if p is not None), None)
if p0 is not None:
cv2.circle(out, (int(round(p0[0])), int(round(p0[1]))), 3, (0, 255, 0), -1)
if p1 is not None:
cv2.circle(out, (int(round(p1[0])), int(round(p1[1]))), 3, (0, 0, 255), -1)
if show_ids:
cv2.putText(out, str(i),
(int(round(p1[0])) + 4, int(round(p1[1])) + 4),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 200, 255), 1, cv2.LINE_AA)
# 初期検出円(薄い緑)※参考表示
for d in det0:
cx, cy = int(round(d["cx"])), int(round(d["cy"]))
rr = int(max(2, round(d["radius_px"])))
cv2.circle(out, (cx, cy), rr, (0, 120, 0), 1)
return out
class BrownianNaApp(ctk.CTk):
def __init__(self):
super().__init__()
self.title("Brownian Motion → MSD → D → Avogadro (DoG / Scroll / 軌跡表示)")
self.geometry("1360x860")
ctk.set_appearance_mode("System")
ctk.set_default_color_theme("blue")
# preview fixed size
self.prev_w = 600
self.prev_h = 400
self.vinfo: VideoInfo | None = None
self.last_bgr = None
self._photo = None
# frame slider throttle
self.frame_job = None
self.pending_frame = 0
self.throttle_ms = 40
# scale bar x-range
self.scale_start = 0
self.scale_end = 0
self._build_ui()
self._enable_controls(False)
# ---------------- UI ----------------
def _build_ui(self):
top = ctk.CTkFrame(self)
top.pack(fill="x", padx=12, pady=12)
self.btn_pick = ctk.CTkButton(top, text="動画ファイル選択", command=self.pick_video)
self.btn_pick.pack(side="left", padx=(10, 8), pady=10)
self.lbl_path = ctk.CTkLabel(top, text="未選択", anchor="w")
self.lbl_path.pack(side="left", fill="x", expand=True, padx=8)
main = ctk.CTkFrame(self)
main.pack(fill="both", expand=True, padx=12, pady=(0, 12))
# left: preview + sliders
left = ctk.CTkFrame(main)
left.pack(side="left", fill="both", expand=True, padx=12, pady=12)
prev_frame = ctk.CTkFrame(left, width=self.prev_w + 40, height=self.prev_h + 40)
prev_frame.pack(pady=(0, 10))
prev_frame.pack_propagate(False)
self.preview_label = ctk.CTkLabel(prev_frame, text="動画を選択してください", width=self.prev_w, height=self.prev_h)
self.preview_label.place(relx=0.5, rely=0.5, anchor="center")
# n slider
ctk.CTkLabel(left, text="nフレーム目(スケールバーが写っているフレーム)").pack(anchor="w")
self.slider_n = ctk.CTkSlider(left, from_=0, to=1, command=self.on_frame_drag)
self.slider_n.pack(fill="x", pady=(6, 0))
self.slider_n.bind("<ButtonRelease-1>", self.on_frame_release)
self.lbl_n = ctk.CTkLabel(left, text="Frame: - / -", anchor="w")
self.lbl_n.pack(fill="x", pady=(4, 12))
# scale sliders separated clearly
ctk.CTkLabel(left, text="スケールバー位置(横)").pack(anchor="w")
row_s = ctk.CTkFrame(left)
row_s.pack(fill="x", pady=(6, 0))
ctk.CTkLabel(row_s, text="start").pack(side="left", padx=(0, 8))
self.slider_start = ctk.CTkSlider(row_s, from_=0, to=1, command=lambda v: self.on_scale_change())
self.slider_start.pack(side="left", fill="x", expand=True)
row_e = ctk.CTkFrame(left)
row_e.pack(fill="x", pady=(6, 0))
ctk.CTkLabel(row_e, text="end").pack(side="left", padx=(0, 18))
self.slider_end = ctk.CTkSlider(row_e, from_=0, to=1, command=lambda v: self.on_scale_change())
self.slider_end.pack(side="left", fill="x", expand=True)
self.lbl_scale = ctk.CTkLabel(left, text="start=- px / end=- px (len=- px)", anchor="w")
self.lbl_scale.pack(fill="x", pady=(6, 0))
# right: settings + run + results (Scrollable!)
right = ctk.CTkScrollableFrame(main, width=520)
right.pack(side="left", fill="y", padx=(0, 12), pady=12)
self._bind_mousewheel(right)
# scale length
sec_scale = ctk.CTkFrame(right)
sec_scale.pack(fill="x", padx=12, pady=(12, 8))
ctk.CTkLabel(sec_scale, text="スケールバー長さ(実空間)").pack(anchor="w")
self.entry_scale_len = ctk.CTkEntry(sec_scale, placeholder_text="例: 10 (μm)")
self.entry_scale_len.pack(fill="x", pady=(6, 0))
self.entry_scale_len.insert(0, "10")
ctk.CTkLabel(sec_scale, text="※単位は μm として計算します", anchor="w").pack(fill="x", pady=(4, 0))
# Na inputs (T, eta, a manual)
sec_na = ctk.CTkFrame(right)
sec_na.pack(fill="x", padx=12, pady=(0, 8))
ctk.CTkLabel(sec_na, text="Na計算パラメータ(aは入力)").pack(anchor="w")
rowT = ctk.CTkFrame(sec_na)
rowT.pack(fill="x", pady=(6, 0))
ctk.CTkLabel(rowT, text="T [K]").pack(side="left", padx=(0, 6))
self.entry_T = ctk.CTkEntry(rowT, placeholder_text="例: 298.15")
self.entry_T.pack(side="left", fill="x", expand=True)
self.entry_T.insert(0, "298.15")
rowEta = ctk.CTkFrame(sec_na)
rowEta.pack(fill="x", pady=(6, 0))
ctk.CTkLabel(rowEta, text="η [Pa·s]").pack(side="left", padx=(0, 6))
self.entry_eta = ctk.CTkEntry(rowEta, placeholder_text="水: 0.001")
self.entry_eta.pack(side="left", fill="x", expand=True)
self.entry_eta.insert(0, "0.001")
rowA = ctk.CTkFrame(sec_na)
rowA.pack(fill="x", pady=(6, 0))
ctk.CTkLabel(rowA, text="a [m]").pack(side="left", padx=(0, 6))
self.entry_a = ctk.CTkEntry(rowA, placeholder_text="例: 5e-7 (半径)")
self.entry_a.pack(side="left", fill="x", expand=True)
self.entry_a.insert(0, "5e-7")
# reference info (optional)
self.lbl_a_ref = ctk.CTkLabel(sec_na, text="※aは“半径”をmで入力(例: 1 μm → 1e-6)", anchor="w")
self.lbl_a_ref.pack(fill="x", pady=(6, 10))
# detection params UI (DoG)
sec_det = ctk.CTkFrame(right)
sec_det.pack(fill="x", padx=12, pady=(0, 8))
ctk.CTkLabel(sec_det, text="検出パラメータ(DoG方式)").pack(anchor="w")
self.ent_blur = self._labeled_entry(sec_det, "DETECT_BLUR (small)", "5")
self.ent_bg_blur = self._labeled_entry(sec_det, "BG_BLUR (large)", "31")
self.ent_dog_thr = self._labeled_entry(sec_det, "DOG_THR (0-1)", "0.60")
self.ent_circ = self._labeled_entry(sec_det, "CIRCULARITY_MIN", "0.50")
self.ent_area_min = self._labeled_entry(sec_det, "AREA_MIN", "8")
self.ent_area_max = self._labeled_entry(sec_det, "AREA_MAX", "800")
self.ent_track = self._labeled_entry(sec_det, "MAX_TRACK_DIST_PX", "25")
# run + progress
self.btn_run = ctk.CTkButton(right, text="計測開始", command=self.start_measurement)
self.btn_run.pack(fill="x", padx=12, pady=(10, 8))
self.progress = ctk.CTkProgressBar(right)
self.progress.pack(fill="x", padx=12, pady=(0, 10))
self.progress.set(0)
self.lbl_status = ctk.CTkLabel(right, text="Ready", anchor="w")
self.lbl_status.pack(fill="x", padx=12)
# results
sec_res = ctk.CTkFrame(right)
sec_res.pack(fill="x", padx=12, pady=(10, 12))
ctk.CTkLabel(sec_res, text="結果 / 検出ログ").pack(anchor="w")
self.txt_res = ctk.CTkTextbox(sec_res, height=320)
self.txt_res.pack(fill="x", pady=(6, 0))
self._write_results("まだ計測していません。")
def _bind_mousewheel(self, scrollable: ctk.CTkScrollableFrame):
try:
canvas = scrollable._parent_canvas
except Exception:
return
def _on_mousewheel(event):
delta = int(-1 * (event.delta / 120))
canvas.yview_scroll(delta, "units")
scrollable.bind("<Enter>", lambda e: self.bind_all("<MouseWheel>", _on_mousewheel))
scrollable.bind("<Leave>", lambda e: self.unbind_all("<MouseWheel>"))
def _labeled_entry(self, parent, label, default):
row = ctk.CTkFrame(parent)
row.pack(fill="x", pady=(6, 0))
ctk.CTkLabel(row, text=label).pack(side="left", padx=(0, 8))
ent = ctk.CTkEntry(row)
ent.pack(side="left", fill="x", expand=True)
ent.insert(0, default)
return ent
def _write_results(self, s: str):
self.txt_res.configure(state="normal")
self.txt_res.delete("1.0", "end")
self.txt_res.insert("1.0", s)
self.txt_res.configure(state="disabled")
def _enable_controls(self, enabled: bool):
state = "normal" if enabled else "disabled"
for w in [self.slider_n, self.slider_start, self.slider_end, self.btn_run]:
try:
w.configure(state=state)
except Exception:
pass
# ---------------- video ----------------
def pick_video(self):
path = filedialog.askopenfilename(
title="動画を選択",
filetypes=[("Video", "*.mp4 *.mov *.mkv *.avi *.webm"), ("All", "*.*")]
)
if not path:
return
if self.vinfo is not None:
try:
self.vinfo.cap.release()
except Exception:
pass
cap = cv2.VideoCapture(path)
if not cap.isOpened():
messagebox.showerror("Error", "動画を開けませんでした。")
return
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = float(cap.get(cv2.CAP_PROP_FPS)) or 30.0
total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
self.vinfo = VideoInfo(path=path, cap=cap, w=w, h=h, fps=fps, total=total)
self.lbl_path.configure(text=os.path.basename(path))
maxf = max(0, total - 1)
self.slider_n.configure(from_=0, to=maxf)
self.slider_n.set(0)
self.pending_frame = 0
self.lbl_n.configure(text=f"Frame: 0 / {maxf}")
self.slider_start.configure(from_=0, to=max(0, w - 2))
self.slider_end.configure(from_=1, to=max(1, w - 1))
self.slider_start.set(0)
self.slider_end.set(max(1, w - 1))
self.on_scale_change()
frame = self._read_frame(0)
if frame is None:
messagebox.showerror("Error", "フレームを読み込めませんでした。")
return
self.last_bgr = frame
self._update_preview()
self._enable_controls(True)
def _read_frame(self, n: int):
if self.vinfo is None:
return None
self.vinfo.cap.set(cv2.CAP_PROP_POS_FRAMES, int(n))
ok, bgr = self.vinfo.cap.read()
if not ok:
return None
return bgr
# ---------------- frame slider (throttled) ----------------
def on_frame_drag(self, value):
if self.vinfo is None:
return
n = int(round(float(value)))
n = max(0, min(n, self.vinfo.total - 1))
self.slider_n.set(n)
self.pending_frame = n
self.lbl_n.configure(text=f"Frame: {n} / {self.vinfo.total - 1}")
if self.frame_job is None:
self.frame_job = self.after(self.throttle_ms, self._apply_pending_frame)
def on_frame_release(self, event):
if self.vinfo is None:
return
if self.frame_job is not None:
try:
self.after_cancel(self.frame_job)
except Exception:
pass
self.frame_job = None
self.pending_frame = int(round(float(self.slider_n.get())))
self._apply_pending_frame(force=True)
def _apply_pending_frame(self, force: bool = False):
self.frame_job = None
frame = self._read_frame(self.pending_frame)
if frame is None:
return
self.last_bgr = frame
self._update_preview()
# ---------------- scale sliders ----------------
def on_scale_change(self):
if self.vinfo is None:
return
s = int(self.slider_start.get())
e = int(self.slider_end.get())
if e <= s:
e = s + 1
self.slider_end.set(e)
self.scale_start, self.scale_end = s, e
self.lbl_scale.configure(text=f"start={s} px / end={e} px (len={e - s} px)")
self._update_preview()
def _update_preview(self, overlay_bgr=None):
if self.vinfo is None:
return
bgr = overlay_bgr.copy() if overlay_bgr is not None else (self.last_bgr.copy() if self.last_bgr is not None else None)
if bgr is None:
return
s, e = int(self.scale_start), int(self.scale_end)
y = int(self.vinfo.h * 0.92)
cv2.line(bgr, (s, y), (e, y), (0, 255, 255), 3)
self._photo, *_ = bgr_to_photo_letterbox(bgr, self.prev_w, self.prev_h)
self.preview_label.configure(image=self._photo, text="")
# ---------------- parameter read ----------------
def _get_params(self) -> DetectParams:
def get_int(ent, default):
try:
return int(float(ent.get().strip()))
except Exception:
return default
def get_float(ent, default):
try:
return float(ent.get().strip())
except Exception:
return default
blur = get_int(self.ent_blur, 5)
bg_blur = get_int(self.ent_bg_blur, 31)
dog_thr = get_float(self.ent_dog_thr, 0.60)
circ = get_float(self.ent_circ, 0.50)
area_min = get_float(self.ent_area_min, 8.0)
area_max = get_float(self.ent_area_max, 800.0)
maxd = get_float(self.ent_track, 25.0)
blur = max(0, blur)
bg_blur = max(0, bg_blur)
if bg_blur % 2 == 0:
bg_blur += 1
if blur % 2 == 0:
blur += 1
circ = max(0.0, min(circ, 1.0))
dog_thr = max(0.0, min(dog_thr, 1.0))
area_min = max(1.0, area_min)
area_max = max(area_min + 1.0, area_max)
maxd = max(1.0, maxd)
return DetectParams(
blur=blur,
circularity_min=circ,
area_min=area_min,
area_max=area_max,
max_track_dist_px=maxd,
bg_blur=bg_blur,
dog_thr=dog_thr,
)
# ---------------- measurement ----------------
def start_measurement(self):
if self.vinfo is None:
return
try:
scale_um = float(self.entry_scale_len.get().strip())
if scale_um <= 0:
raise ValueError
except Exception:
messagebox.showwarning("入力", "スケールバー長さ(μm)を正しく入力してください。")
return
try:
T = float(self.entry_T.get().strip())
eta = float(self.entry_eta.get().strip())
a = float(self.entry_a.get().strip()) # 半径[m]
if T <= 0 or eta <= 0 or a <= 0:
raise ValueError
except Exception:
messagebox.showwarning("入力", "T, η, a(m, 半径)を正しく入力してください。")
return
px_len = max(1, int(self.scale_end - self.scale_start))
um_per_px = scale_um / px_len
m_per_px = um_per_px * 1e-6
params = self._get_params()
self.btn_run.configure(state="disabled")
self.progress.set(0)
self.lbl_status.configure(text="Measuring...")
self._write_results("計測中...")
t = threading.Thread(
target=self._measure_worker,
args=(m_per_px, T, eta, a, params),
daemon=True
)
t.start()
def _measure_worker(self, m_per_px: float, T: float, eta: float, a: float, params: DetectParams):
try:
cap = self.vinfo.cap
total = self.vinfo.total
fps = self.vinfo.fps
cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
ok, frame0 = cap.read()
if not ok:
self._ui_error("フレーム0の読み込みに失敗しました。")
return
det0 = detect_particles_dog(frame0, params)
if len(det0) < MIN_TRACKED_PARTICLES:
self._ui_error(
f"粒子が十分に検出できませんでした(採用 {len(det0)} 個)。\n"
"ヒント:\n"
"- DOG_THR を下げる(例 0.60→0.55)\n"
"- AREA_MIN を下げる(例 8→5)\n"
"- CIRCULARITY_MIN を下げる(例 0.50→0.35)\n"
"- BG_BLUR を上げる(例 31→51)\n"
)
return
# 0フレーム検出オーバーレイ
overlay0 = frame0.copy()
for i, d in enumerate(det0):
cx, cy = int(round(d["cx"])), int(round(d["cy"]))
rr = int(max(2, round(d["radius_px"])))
cv2.circle(overlay0, (cx, cy), rr, (0, 255, 0), 2)
cv2.putText(overlay0, str(i), (cx + rr + 2, cy + rr + 2),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1, cv2.LINE_AA)
self.after(0, lambda img=overlay0: self._update_preview(overlay_bgr=img))
init_pts = [(d["cx"], d["cy"]) for d in det0]
tracked = list(init_pts)
init = list(init_pts)
# 軌跡保存
tracks = [[] for _ in range(len(init_pts))]
for i, p in enumerate(init_pts):
tracks[i].append(p)
times = []
msd = []
last_good_frame = frame0
last_fn = 0
for fn in range(1, total):
cap.set(cv2.CAP_PROP_POS_FRAMES, fn)
ok, fr = cap.read()
if not ok:
break
det = detect_particles_dog(fr, params)
curr_pts = [(d["cx"], d["cy"]) for d in det]
tracked = track_nearest(tracked, curr_pts, max_dist=params.max_track_dist_px)
for i, p in enumerate(tracked):
tracks[i].append(p)
r2_list = []
for p0, p in zip(init, tracked):
if p is None:
continue
dx = (p[0] - p0[0]) * m_per_px
dy = (p[1] - p0[1]) * m_per_px
r2_list.append(dx * dx + dy * dy)
if len(r2_list) >= MIN_TRACKED_PARTICLES:
times.append(fn / fps)
msd.append(float(np.mean(r2_list)))
if fn % 10 == 0:
prog = min(0.95, fn / max(1, total - 1))
self.after(0, lambda vv=prog: self.progress.set(vv))
last_good_frame = fr
last_fn = fn
if len(msd) < 5:
self._ui_error("有効な追跡点が少なすぎて ⟨r²⟩ を評価できませんでした。")
return
times = np.array(times, dtype=np.float64)
msd = np.array(msd, dtype=np.float64)
nfit = min(FIT_FRAMES, len(times))
tfit = times[:nfit]
yfit = msd[:nfit]
A = np.vstack([tfit, np.ones_like(tfit)]).T
s, b = np.linalg.lstsq(A, yfit, rcond=None)[0]
D = s / 4.0
if D <= 0:
self._ui_error("推定された D が 0 以下です。検出/追跡条件を見直してください。")
return
Na = (R_GAS * T) / (6.0 * np.pi * eta * a * D)
# 最終フレームに軌跡を描画して保存&表示
traj_img = draw_trajectories_on_frame(last_good_frame, det0, tracks, show_ids=True)
base = os.path.splitext(os.path.basename(self.vinfo.path))[0]
out_png = os.path.join(os.path.dirname(self.vinfo.path), f"{base}_trajectories_frame{last_fn}.png")
cv2.imwrite(out_png, traj_img)
self.after(0, lambda img=traj_img: self._update_preview(overlay_bgr=img))
res = (
"=== 検出結果(0フレーム)===\n"
f"採用粒子数: {len(det0)}\n"
f"スケール換算: {m_per_px:.3e} m/px\n\n"
"=== MSD / 拡散係数 ===\n"
"⟨r²⟩(2D): r² = dx² + dy²\n"
f"fit点数: {nfit}\n"
f"傾き s = {s:.3e} (m²/s)\n"
f"D = {D:.3e} m²/s\n\n"
"=== アボガドロ数 ===\n"
"N_A = RT / (6π η a D)\n"
f"T = {T} K\n"
f"η = {eta} Pa·s\n"
f"a(入力)= {a:.3e} m\n"
f"N_A = {Na:.3e} 1/mol\n\n"
"=== 軌跡表示 ===\n"
f"最終フレーム番号: {last_fn}\n"
f"軌跡画像保存: {out_png}\n\n"
"=== 使用パラメータ(DoG)===\n"
f"DETECT_BLUR(small) = {params.blur}\n"
f"BG_BLUR(large) = {params.bg_blur}\n"
f"DOG_THR = {params.dog_thr}\n"
f"CIRCULARITY_MIN = {params.circularity_min}\n"
f"AREA_MIN / AREA_MAX = {params.area_min} / {params.area_max}\n"
f"MAX_TRACK_DIST_PX = {params.max_track_dist_px}\n"
)
self.after(0, lambda: self._finish_ok(res))
except Exception as e:
self._ui_error(str(e))
def _finish_ok(self, res: str):
self.progress.set(1.0)
self.lbl_status.configure(text="Done")
self._write_results(res)
self.btn_run.configure(state="normal")
def _ui_error(self, msg: str):
self.after(0, lambda: messagebox.showerror("Error", msg))
self.after(0, lambda: self.lbl_status.configure(text="Error"))
self.after(0, lambda: self.btn_run.configure(state="normal"))
self.after(0, lambda: self.progress.set(0))
if __name__ == "__main__":
app = BrownianNaApp()
app.mainloop()
粒子の検出
以下の関数でガウシアン差分および2値化を行っています。
def detect_particles_dog(bgr, params: DetectParams):
"""DoG (small blur - large blur) + threshold. """
gray = cv2.cvtColor(bgr, cv2.COLOR_BGR2GRAY).astype(np.float32)
small = int(params.blur)
if small < 0:
small = 0
if small % 2 == 0:
small += 1
if small < 3:
small = 3
large = int(params.bg_blur)
if large % 2 == 0:
large += 1
if large <= small:
large = small + 2
if large < 9:
large = 9
g1 = cv2.GaussianBlur(gray, (small, small), 0)
g2 = cv2.GaussianBlur(gray, (large, large), 0)
dog = g1 - g2
dog = dog - dog.min()
dog = dog / (dog.max() + 1e-6)
thr = float(params.dog_thr)
thr = max(0.0, min(1.0, thr))
bin_img = (dog > thr).astype(np.uint8) * 255
# 小さなゴミを消す(任意)
bin_img = cv2.morphologyEx(bin_img, cv2.MORPH_OPEN, np.ones((3, 3), np.uint8), iterations=1)
return _contours_to_particles(bin_img, params)
OpenCVのcv2.GaussianBlur()がガウシアンぼかしを行う関数です。小さいカーネルのg1、大きいカーネルのg2から差分のdogを計算します。
g1 = cv2.GaussianBlur(gray, (small, small), 0)
g2 = cv2.GaussianBlur(gray, (large, large), 0)
dog = g1 - g2
dogに対して閾値thrとの大小関係の正誤判定を行い2値化しています。
bin_img = (dog > thr).astype(np.uint8) * 255
そして次の関数で球形の検出と位置の計測をしています。球形から大きく外れるものなどは棄却しています。
def _contours_to_particles(bin_img, params: DetectParams):
"""Return accepted particle info: list of dict: {'cx','cy','area','radius_px','circ'}"""
contours, _ = cv2.findContours(bin_img, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
out = []
for c in contours:
area = float(cv2.contourArea(c))
if area < params.area_min or area > params.area_max:
continue
peri = float(cv2.arcLength(c, True))
if peri <= 0:
continue
circ = float(4.0 * np.pi * area / (peri * peri))
if circ < params.circularity_min:
continue
m = cv2.moments(c)
if m["m00"] == 0:
continue
cx = float(m["m10"] / m["m00"])
cy = float(m["m01"] / m["m00"])
radius_px = float(np.sqrt(area / np.pi)) # equivalent circle radius (参考表示用)
out.append({"cx": cx, "cy": cy, "area": area, "radius_px": radius_px, "circ": circ})
return out
<r^2>の計測
各粒子の初期位置initを軌跡の配列にtracksに入れます。
det0 = detect_particles_dog(frame0, params)
init_pts = [(d["cx"], d["cy"]) for d in det0]
tracked = list(init_pts)
init = list(init_pts)
# 軌跡保存
tracks = [[] for _ in range(len(init_pts))]
for i, p in enumerate(init_pts):
tracks[i].append(p)
fn枚目のフレームに対して同様に粒子の検出を行い、track_nearest()で位置と粒子の対応付けを行います。粒子と位置の対応つけから、実スケールでの初期位置からの移動距離dx、dyを計算し、$r^2 = x^2 + y^2$を求めます。msdには$r^2$の平均値、つまり平均二乗変位$<r^2>$を配列として加えていきます。
for fn in range(1, total):
det = detect_particles_dog(fr, params)
curr_pts = [(d["cx"], d["cy"]) for d in det]
tracked = track_nearest(tracked, curr_pts, max_dist=params.max_track_dist_px)
for i, p in enumerate(tracked):
tracks[i].append(p)
r2_list = []
for p0, p in zip(init, tracked):
if p is None:
continue
dx = (p[0] - p0[0]) * m_per_px
dy = (p[1] - p0[1]) * m_per_px
r2_list.append(dx * dx + dy * dy)
if len(r2_list) >= MIN_TRACKED_PARTICLES:
times.append(fn / fps)
msd.append(float(np.mean(r2_list)))
位置と粒子の対応付けを行うtrack_nearest()ですが、基本的には一つ前の検出位置のうち、最も近いものを次の位置とするという関数です。ただし、max_dist以上離れた場合は違う粒子としています。該当する粒子がない場合はNoneを返します。
def track_nearest(prev_pts, curr_pts, max_dist):
"""Nearest-neighbor assignment. prev_pts list[(x,y)]/None -> new list same length."""
if len(curr_pts) == 0:
return [None for _ in prev_pts]
curr = np.array(curr_pts, dtype=np.float32)
used = np.zeros(len(curr_pts), dtype=bool)
new_pts = []
for p in prev_pts:
if p is None:
new_pts.append(None)
continue
px, py = p
d2 = (curr[:, 0] - px) ** 2 + (curr[:, 1] - py) ** 2
d2[used] = np.inf
j = int(np.argmin(d2))
if not np.isfinite(d2[j]):
new_pts.append(None)
continue
if np.sqrt(d2[j]) <= max_dist:
used[j] = True
new_pts.append((float(curr[j, 0]), float(curr[j, 1])))
else:
new_pts.append(None)
return new_pts
アボガドロ定数の計算
$<r^2>$と$t$の配列であるyfitとtfitから$<r^2> = 4Dt +b$の式に最小二乗近似することにより$D$を求めています。理想的には切片$b=0$ですが、実測の検出誤差などの影響を考慮して入れてあります。$D$が求まれば、アボガドロ定数の計算はアインシュタイン・ストークスの関係式の通りです。
nfit = min(FIT_FRAMES, len(times))
tfit = times[:nfit]
yfit = msd[:nfit]
A = np.vstack([tfit, np.ones_like(tfit)]).T
s, b = np.linalg.lstsq(A, yfit, rcond=None)[0]
D = s / 4.0
Na = (R_GAS * T) / (6.0 * np.pi * eta * a * D)
結果
まず粒子の軌跡ですが、一部途中で見失うものもあるものの、とてもブラウン運動らしいランダムウォークの軌跡が得られました。とても良いです。
定数として、$T=298.15$ K、$\eta=0.001$ Pa・s、$a=2$ μmを用いると$N_A=4.308 \times 10^{23}$になりました。結構理論値に近くなってよかったなと思います。
まとめ
ブラウン運動の動画からアボガドロ数を自動で解析するアプリをCustomTkinterを使って作りました。精度もまずますです。あとは粒子のサイズを正確に測定したり、温度を厳格に測ったりすることで精度が上げられると思います。何より、歴史的に分子の存在証明である実験の解析を自動化できたことはよかったかなと思います。






