はじめに
イベントの発生時刻が、[0.1, 0.5, 0.6, 0.7, 1.1, .... ] のように発生時刻の点列として与えられているデータに対して、単位時間あたりの発生頻度をpythonで計算する方法を紹介します。
愚直にやる slow な方法と、np.searchsorted
を用いた高速な方法の2つを紹介します。
この記事の実行例は、
にあります。
numpy.searchsorted の詳細は、
を参考にしてください。とっても使いやすくて便利な関数です。
単位時間当たりの発生頻度曲線の高速&低速計算コード
ここでは、
xdata = np.random.uniform(0, 10000, 100000) # 0から10000までのランダムなデータを100000点生成
で擬似的にデータを生成して、それを、np.searchsortedを用いた fast_lc という関数と、愚直に計算する slow_lc という2つの関数を用いてライトカーブ(時系列データの単位時間あたりの発生率)を計算し、それらのスピードを比較します。
最後に図をプロットして目で結果が同じことを確認してみます。
import numpy as np # NumPyライブラリをnpという名前でインポート
import time # timeモジュールをインポート(実行時間計測用)
import matplotlib.pyplot as plt # Matplotlibのpyplotモジュールをpltという名前でインポート
def fast_lc(tstart, tstop, binsize, x):
times = np.arange(tstart, tstop, binsize)[:-1] # tstartからtstopまでbinsize間隔で数値を生成し、最後の数値を除去
n_bins = len(times) # timesの長さ(要素数)を取得
x_lc, y_lc = np.zeros(n_bins), np.zeros(n_bins) # xとyのデータを保存するための配列を初期化
x_err = np.full(n_bins, 0.5 * binsize) # xのエラーを保存するための配列を初期化
y_err = np.zeros(n_bins) # yのエラーを保存するための配列を初期化
x = np.sort(x) # xを昇順にソート
for i, t in enumerate(times): # timesの各要素に対してループ
start, end = np.searchsorted(x, [t, t + binsize])# x内でのtとt+binsizeの位置を探す
count = end - start # イベントの数を計算
x_lc[i] = t + 0.5 * binsize # xデータを保存
y_lc[i] = count / binsize # yデータを保存
y_err[i] = np.sqrt(count) / binsize # yのエラーを計算し保存
return x_lc, x_err, y_lc, y_err # 結果を返す
def slow_lc(tstart, tstop, binsize, x):
times = np.arange(tstart, tstop, binsize)[:-1] # tstartからtstopまでbinsize間隔で数値を生成し、最後の数値を除去
if not times.size: # timesが空の場合
print("No events, check data") # エラーメッセージを表示
x_lc, y_lc, x_err, y_err = [], [], [], [] # データを保存するためのリストを初期化
for t in times: # timesの各要素に対してループ
in_bin = (x > t) & (x <= t + binsize) # イベントが現在のbin内にあるかの条件
count = len(x[in_bin]) # イベントの数を計算
x_lc.append(t + 0.5 * binsize) # xデータを追加
y_lc.append(count / binsize) # yデータを追加
x_err.append(0.5 * binsize) # xのエラーを追加
y_err.append(np.sqrt(count) / binsize) # yのエラーを追加
return np.array(x_lc), np.array(x_err), np.array(y_lc), np.array(y_err) # 結果を返す
# Time comparison and plotting
xdata = np.random.uniform(0, 10000, 100000) # 0から10000までのランダムなデータを100000点生成
# For the fast_lc function
start_time = time.time() # 現在の時間を取得
x_fast, x_err_fast, y_fast, y_err_fast = fast_lc(0, 10000, 16, xdata) # fast_lc関数を実行
dt_fast = time.time() - start_time # 実行時間を計算
print(f"Fast LC: {dt_fast:.6f} seconds") # 実行時間を表示
# For the slow_lc function
start_time = time.time() # 現在の時間を取得
x_slow, x_err_slow, y_slow, y_err_slow = slow_lc(0, 10000, 16, xdata) # slow_lc関数を実行
dt_slow = time.time() - start_time # 実行時間を計算
print(f"Slow LC: {dt_slow:.6f} seconds") # 実行時間を表示
# Plotting
plt.figure(figsize=(12, 6)) # プロットのフィギュアサイズを指定
# Fast LC plot
plt.errorbar(x_fast, y_fast, xerr=x_err_fast, yerr=y_err_fast, fmt='o', markersize=5, label=f"Fast LC: {dt_fast:.6f} seconds", alpha=0.7) # Fast LCの結果をプロット
# Slow LC plot
plt.errorbar(x_slow, y_slow, xerr=x_err_slow, yerr=y_err_slow, fmt='s', markersize=5, label=f"Slow LC: {dt_slow:.6f} seconds", alpha=0.7) # Slow LCの結果をプロット
plt.xlabel("Time") # x軸のラベルを設定
plt.ylabel("Rate") # y軸のラベルを設定
plt.title("Comparison between Fast and Slow LC") # グラフのタイトルを設定
plt.legend() # 凡例を表示
plt.grid(True) # グリッドを表示
plt.tight_layout() # レイアウトを整える
plt.savefig("qiita_make_lc_speed_comparison.png") # 図を保存する
plt.show() # プロットを表示
これを実行すると計算時間は、google colab 上では、
Fast LC: 0.022879 seconds
Slow LC: 0.118692 seconds
のように、slow の方が5倍くらい遅いという結果が出ています。
生成された図を比較してみると、
このように、よく拡大すると、完璧に一致していることがわかります。
高速化の理由
np.searchsorted
は、ソートされた配列内に値を挿入するときのインデックスを返す関数です。つまり、指定された値がソートされた配列に挿入されるべき場所を示すインデックスを返してくれます。
例えば、a = [1, 3, 4, 4, 6, 7]
というソートされた配列があるとき、np.searchsorted(a, 5)
を呼び出すと、戻り値は4になります。これは、5を配列aに挿入するときの適切な位置が、4と6の間のインデックス4になるからです。
この機能をある時間間隔内に発生する頻度を数える関数の文脈で考えると、xdataは時刻が保存された配列で、その値は昇順にソートされていると仮定しています(この仮定が正しくない場合、関数の先頭でxdata = np.sort(xdata)という行を追加して、データを必ずソートしてください)。
例えば、xdata
という時刻の配列と、ltime という時刻の一点を使って説明すると、
start_idx = np.searchsorted(xdata, ltime)
とすると、これは、時刻 ltime
が xdata
配列に挿入されるべき位置を探しています。これは、現在のタイムビンが始まるインデックスに対応します。
同様に、ltime + timebinsize を用いると、
end_idx = np.searchsorted(xdata, ltime + timebinsize)
この行では、ltime + timebinsize
が xdata
配列に挿入されるべき位置を探しています。これは、現在のタイムビンが終わるインデックスに対応します。
この2つのインデックスを使用することで、時間ビンの内のイベントの数(events_in_bin = end_idx - start_idx
)を計算できます。これは、xdata
配列の start_idx
と end_idx
の間にある要素の数に相当します。
この方法を使用することで、条件式を用いて部分配列を取得するよりも大幅に高速にデータの範囲を取得できます。
上の例では、表記を簡略化し、
start, end = np.searchsorted(x, [t, t + binsize])# x内でのtとt+binsizeの位置を探す
count = end - start # イベントの数を計算
とすることで、t
と t + binsize
に該当する start
と end
の index を計算し、その差 count = end - start
から binsize
内のカウントを算出しています。