【2020/11/12】 コードを修正
つい先日,アップサンプリング・ダウンサンプリングに苦しんでいる姿を目撃したので,改めて実装して確認してみようとした記事です.
それほど需要があるものではないと思いますが,誰かのヒントになればと思いPythonで実装してみました.
コードはこちら
...
開発環境
MacOS High Sierra 10.13.2
Python 3.6.0
numpy 1.14.1
scipy 1.0.0
Sampling Rate Conversionとは?
サンプリング周波数変換とは簡単に言えば,「あるサンプリング周波数でサンプリングされた信号を別のサンプリング周波数でサンプリングされた信号に変換する」処理です.
もとのサンプリング周波数より大きくすることをアップサンプリング(upsampling)と呼び,小さくすることをダウンサンプリング(downsampling)と呼んだりします.
今回は,変換前と変換後のサンプリング周波数が「整数倍 or 整数分の1」の場合のみを対象とします.
ちなみに,10kHzの音を5倍,10倍したからといって10kHz以上の音が入るわけでは有りません.
Upsampling
アップサンプリングは,基本的には補完処理を行います.
もとのサンプリング周波数 fs1 をサンプリング周波数 fs2 に変換する場合,信号の各サンプルの間に (fs2/fs1 - 1) 個の新しい値0のサンプルを追加します.
このようにサンプルを増やしていきますが,こうしたとき波形がギザギザになってしまいます.
これは信号に不要な成分(折り返しノイズ)が入ってしまっているためです.
なので,もとのサンプリング周波数のナイキスト周波数 (fs1/2) 以上の周波数成分を除去するようなLPF(Low Pass Filter)処理を施します.
Downsampling
ダウンサンプリングはアップサンプリングとは対象的に,信号のサンプルを間引きます.
もとのサンプリング周波数 fs1 をサンプリング周波数 fs2 に変換する場合,サンプル1個を取りだした後に (fs2/fs1 - 1) 個のサンプルを捨ててしまいます.
この時,間引いた信号には折り返しが発生する可能性があります(例:44.1kHzを22.05kHzに落とすとき,元信号に20kHzの信号が入ってるとダウンサンプリングしたとき折り返しが発生する).
なので,間引き処理を行う前に変換後のサンプリング周波数の半分(ナイキスト周波数)以上の周波数成分を除去するためにLPF処理を施します.
実装
実装自体は3段階の構成になっています.
ちなみに今回は1chの音源のみを対象としていますが,2chの音源の場合はそれぞれのchのデータに対して同様の処理を行うだけです.
- Wavファイルの読み込み処理
- Sampling Rate Conversion
- Upsampling処理
- Downsampling処理
- Wavファイルの書き出し処理
## Wavファイルの読み込み処理
def read_wav(filename: str) -> Tuple[np.array, int]:
"""wavファイルを読み込んで,データ・サンプリングレートを返す関数
Args:
filename (str): wavファイルパス
Returns:
Tuple[np.array, int]: (信号データ, サンプリングレート)
"""
try:
wf = wave.open(filename)
fs = wf.getframerate()
# -1 ~ 1までに正規化した信号データを読み込む
data = np.frombuffer(wf.readframes(wf.getnframes()), dtype="int16") / 32768.0
return (data, fs)
except Exception as e:
print(e)
exit()
Sampling Rate Conversion
Upsampling処理
def upsampling(conversion_rate: int, data: np.array, fs: int) -> Tuple[np.array, int]:
"""アップサンプリングを行う.
入力として,変換レートとデータとサンプリング周波数.
アップサンプリング後のデータとサンプリング周波数を返す.
Args:
conversion_rate (int): 変換レート
data (np.array): 信号データ
fs (int): サンプリングレート
Returns:
Tuple[np.array, int]: 変換後の信号データとサンプリングレート
"""
# 補間するサンプル数を決める
interpolation_sample_num = conversion_rate-1
# FIRフィルタの用意をする
nyqF = (fs*conversion_rate)/2.0 # 変換後のナイキスト周波数
cF = (fs/2.0-500.)/nyqF # カットオフ周波数を設定(変換前のナイキスト周波数より少し下を設定)
taps = 511 # フィルタ係数(奇数じゃないとだめ)
b = scipy.signal.firwin(taps, cF) # LPFを用意
# 補間処理
up_data = []
for d in data:
up_data.append(d)
# 1サンプルの後に,interpolation_sample_num分だけ0を追加する
for i in range(interpolation_sample_num):
up_data.append(0.0)
# フィルタリング
result_data = scipy.signal.lfilter(b, 1, up_data)
return (result_data, int(fs*conversion_rate))
Downsampling処理
def downsampling(conversion_rate: int, data: np.array, fs: int) -> Tuple[np.array, int]:
"""ダウンサンプリングを行う.
入力として,変換レートとデータとサンプリング周波数.
アップサンプリング後のデータとサンプリング周波数を返す.
Args:
conversion_rate (int): 変換レート
data (np.array): 信号データ
fs (int): サンプリングレート
Returns:
Tuple[np.array, int]: 変換後の信号データとサンプリングレート
"""
# 間引くサンプル数を決める
decimation_sampleNum = conversion_rate-1
# FIRフィルタの用意をする
nyqF = (fs/conversion_rate)/2.0 # 変換後のナイキスト周波数
cF = (fs/conversion_rate/2.0-500.)/nyqF # カットオフ周波数を設定(変換前のナイキスト周波数より少し下を設定)
taps = 511 # フィルタ係数(奇数じゃないとだめ)
b = scipy.signal.firwin(taps, cF) # LPFを用意
# フィルタリング
data = scipy.signal.lfilter(b, 1, data)
# 間引き処理
down_data = []
for i in range(0, len(data), decimation_sampleNum+1):
down_data.append(data[i])
return (down_data, int(fs/conversion_rate))
Wavファイルの書き出し処理
def write_wav(filename: str, data: np.array, fs: int):
"""入力されたファイル名でwavファイルを書き出す.
Args:
filename (str): 出力ファイルパス
data (np.array): 信号データ
fs (int): サンプリングレート
"""
# データを-32768から32767の整数値に変換
data = [int(x * 32767.0) for x in data]
# バイナリ化
binwave = struct.pack("h" * len(data), *data)
wf = wave.Wave_write(filename)
wf.setparams((
1, # channel
2, # byte width
fs, # sampling rate
len(data), # number of frames
"NONE", "not compressed" # no compression
))
wf.writeframes(binwave)
wf.close()
Mainスクリプト
def get_args() -> argparse.Namespace:
"""引数取得
Returns:
argparse.Namespace: 引数情報
"""
parser = argparse.ArgumentParser(
prog="sr_converter.py", usage="convert samplingrate",
add_help=True
)
parser.add_argument("input", type=str, help="input wav file path")
parser.add_argument("output_dir", type=str, help="output dir path")
parser.add_argument(
"--up", type=int, default=None, help="up conversion rate: int"
)
parser.add_argument(
"--down", type=int, default=None, help="down conversion rate: int"
)
return parser.parse_args()
if __name__ == "__main__":
args = get_args()
# テストwavファイルを読み込む
data, fs = read_wav(args.input)
if not os.path.exists(args.output_dir):
os.makedirs(args.output_dir)
base_file_name = os.path.splitext(os.path.basename(args.input))[0]
if args.up is not None:
up_data, up_fs = upsampling(args.up, data, fs)
write_wav(
os.path.join(args.output_dir, base_file_name + "_up.wav"),
up_data, up_fs
)
if args.down is not None:
down_data, down_fs = downsampling(args.down, data, fs)
write_wav(
os.path.join(args.output_dir, base_file_name + "_down.wav"),
down_data, down_fs
)
テスト
今回はテストとして,基本周波数1000Hzの矩形波をtest.wavとして入力します.
そして,4倍のアップサンプリング,1/4のダウンサンプリングを実行してみました.
その結果をup.wav,down.wavとして書き出しました.
それぞれのスペクトログラムは以下の図のようになっています.
ソースコード
ちょっとしたソースコードですが一応公開しておきます.こちら
最後に・・・
復習がてらに作ってみたけれど,あんまり使わないなぁと思ったり・・・.
誰かの参考になれば良いかなと思っています.