はじめに
FITS(Flexible Image Transport System)は、天文学や物理学の分野で広く使われているファイルフォーマットです。本記事では、Pythonを用いてFITSファイルを生成し、その中から特定の条件に基づいてデータの一部だけをフィルタリングする2つのプログラムを紹介します。
具体的には、gen_mock.py
でモックデータを生成し、cut_eventfile.py
で特定の条件に基づいてデータをカットする方法について説明します。
1. モックデータ生成スクリプト gen_mock.py
このスクリプトでは、時間情報(TIME
)とランダムな数値(PHA
)を持つFITSファイルを生成します。天文データを扱う際に、観測データの代わりにモックデータを使用してアルゴリズムの動作確認を行うことができます。
コードの説明
#!/usr/bin/env python
import numpy as np
from astropy.io import fits
# モックFITSファイルを生成する関数
def create_mock_fits(fname):
# TIME列を定義(0秒から100秒まで、0.5秒刻み)
time_data = np.arange(0, 100, 0.5)
# PHA列をランダムな値で生成(0から65536の範囲)
ref_data = np.random.randint(0, 65536, len(time_data))
# FITSカラムの作成
col1 = fits.Column(name='TIME', format='E', array=time_data)
col2 = fits.Column(name='PHA', format='E', array=ref_data)
# HDU(Header/Data Unit)の作成
hdu = fits.BinTableHDU.from_columns([col1, col2])
# ファイルに書き込み
hdu.writeto(fname, overwrite=True)
print(f"Mock FITS file created: {fname}")
# モックFITSファイルの出力先
mock_fits_filename = 'mock_fits.evt'
# モックFITSファイルを作成
create_mock_fits(mock_fits_filename)
ポイント
-
time_data
は0.5秒刻みの時間データです。 -
PHA
はFITSデータのランダムな参考値として使われ、0から65536の範囲でランダムな値が生成されます。 -
BinTableHDU
を使用して、複数のカラムを持つFITSファイルを生成します。 - 最終的に、指定したファイル名(例:
mock_fits.evt
)にFITSファイルとして保存されます。
2. データフィルタリングスクリプト cut_eventfile.py
次に、生成したモックFITSファイルを元に、特定の条件を満たすデータの前後1秒(デフォルトの設定)のデータをフィルタリングして新しいFITSファイルに保存するスクリプトです。
コードの説明
#!/usr/bin/env python
import argparse
from astropy.io import fits
import numpy as np
def main():
# argparseを使用してコマンドライン引数を設定
parser = argparse.ArgumentParser(description="FITSファイルを処理して、特定の条件を満たすデータの前後1秒のデータを抽出します。")
parser.add_argument("fname", type=str, help="処理するFITSファイルの名前")
parser.add_argument("--threshold",'-t', type=float, default=65000., help="フィルタリングに使用する閾値")
parser.add_argument("--column", '-c', type=str, default='PHA', help="フィルタリング対象となるカラム名")
parser.add_argument("--timewindow", '-w', type=float, default=1.0, help="前後何秒間のデータを抽出するか")
args = parser.parse_args()
fname = args.fname
threshold = args.threshold
column = args.column
timewindow = args.timewindow
# 出力ファイル名を作成
outftag = fname.replace(".evt", "").replace(".gz", "")
output_fname = f"sc_{outftag}_{column}_{int(threshold):d}_{int(timewindow):d}.evt"
# FITSファイルを読み込む
hdu = fits.open(args.fname)[1]
data = hdu.data
# 時間データとフィルタリング対象のカラムデータを取得
time_array = data["TIME"]
ref_array = data[column]
# 閾値を超えるデータのインデックスを取得
condition_indices = np.where(ref_array >= threshold)[0]
# 前後timewindow秒のデータを保持するためのマスクを作成
mask = np.zeros(len(time_array), dtype=bool)
# 閾値を満たすインデックスごとに前後timewindow秒のデータを抽出
for idx in condition_indices:
current_time = time_array[idx]
# TIMEカラムがcurrent_time ± timewindow秒の範囲内にあるかをチェック
time_condition = (time_array >= current_time - timewindow) & (time_array <= current_time + timewindow)
mask |= time_condition # 条件を満たすデータのマスクを結合
# マスクを適用してフィルタリングされたデータを取得
filtered_data = data[mask]
# フィルタリングされたデータを新しいFITSファイルに保存
hdu_new = fits.BinTableHDU(filtered_data, header=hdu.header)
hdu_new.writeto(output_fname, overwrite=True)
print(f"フィルタリングされたデータが{output_fname}に保存されました。")
if __name__ == "__main__":
main()
ポイント
- このスクリプトは
argparse
を使用して、コマンドラインからFITSファイル名、閾値、カラム名、時間ウィンドウを指定します。 - 特定の閾値(デフォルトは
65000
)を満たすデータのインデックスを取得し、その前後timewindow
(デフォルトは1秒)のデータを抽出します。 - マスクを使ってフィルタリングし、新しいFITSファイルとして保存します。
- 出力ファイル名は、
sc_{fname}_{column}_{threshold}_{timewindow}.evt
の形式で保存されます。
3. スクリプトの実行方法
まず、gen_mock.py
を実行してモックデータを生成します。
python gen_mock.py
次に、生成されたFITSファイルに対してフィルタリングを行います。
python cut_eventfile.py mock_fits.evt --threshold 60000 --timewindow 2
上記の例では、PHA
カラムの値が60000
以上のデータを基準に、その前後2秒間のデータがフィルタリングされ、新しいFITSファイルに保存されます。
まとめ
この2つのPythonスクリプトを組み合わせることで、FITSファイルの生成とフィルタリングの確認を行うことができます。より高度な条件判定や処理などを行いたい場合は、
など参考にしてください。