numpyでランレングス圧縮(Run length encoding; 連長圧縮)を高速化する
はじめに
この記事ではnumpyを利用してランレングス圧縮を高速化するアルゴリズムを説明します。ランレングス圧縮とは、Wikipediaによると
連長圧縮
出典: フリー百科事典『ウィキペディア(Wikipedia)』
連長圧縮(れんちょうあっしゅく)は、データ圧縮アルゴリズムの一つで、可逆圧縮に分類される。ランレングス圧縮、RLE (Run Length Encoding) とも呼ばれる。
だそうです。例えば、次のようなデータのシーケンス[AAAAABBBCCCCCC ...] があったときに、
[A5B3C6]のように、データとその長さの組み合わせで表現する方法です。データの種類が少なく、繰り返し現れる回数が多い場合、効率よくデータを圧縮できます。文字データにも数値データにも使われますが、文字はUnicodeなどで数値に変換できるので、本稿では数値に限定してお話します。
PythonでのRLEの実装としてpipに収録されているpython-rleがありますが、forループを回しているためデータが多い時にはパフォーマンスの低下が問題になります。今回、numpyを利用して、配列を配列のまま扱う、numpythonicな書き方で高速化ができたので解説します。
forループによる実装
一般的なforループによるRLEの実装はこんな感じです。
def RLE_for(sequence):
#戻り値の初期化
comp_seq = list() # 圧縮されたデータのリスト
lengths = list() # データの連続する長さのリスト
# 最初の要素を記録
comp_seq.append(sequence[0])
temp = sequence[0]
length = 1
# 2番目から最後まで
for i in range(1, len(sequence)):
if sequence[i] != temp: # 新しいデータが来たら、これまでのデータとその長さを記録
lengths.append(length)
comp_seq.append(sequence[i])
temp = sequence[i]
length = 1
else: # 前と同じデータが来たら、lengthをインクリメント
length += 1
# 最後にlengthを追加
lengths.append(length)
return comp_seq, lengths
こんな感じです。実行してみましょう。
sequence = np.array([1, 1, 2, 2, 2, 3, 3, 3, 3, 3,
5, 5, 5, 5, 3, 5, 3, 8, 8, 8, 8])
print(RLE_for(sequence))
> ([1, 2, 3, 5, 3, 5, 3, 8], [2, 3, 5, 4, 1, 1, 1, 4])
うまくいっているようです。
numpyによる実装
Forループを使わずにnumpythonicに書くとこんな感じです。
def RLE_numpy(sequence):
diff_seq = np.diff(sequence) # sequence[i+1] - sequence[i]のアレイ。隣と同じだと0になる。
# newdata は、前の要素とインデックスが違うときだけTrueになるBoolのアレイ。
newdata = np.append(True, diff_seq != 0) # 先頭をTrueにして、2番目以降をappendで追加している。
comp_seq = sequence[newdata] # sequence から、newdataがTrueの要素だけ抜き出す
comp_seq_index = np.where(newdata)[0] # newdataがTrueの要素が、アレイの何番目に来るか取得
comp_seq_index = np.append(comp_seq_index, len(sequence)) # アレイの終了をつける
lengths = np.diff(comp_seq_index) # newdataがTrueになっている位置の差がlengthになる
return comp_seq, lengths
こちらも実行してみましょう。
sequence = np.array([1, 1, 2, 2, 2, 3, 3, 3, 3, 3,
5, 5, 5, 5, 3, 5, 3, 8, 8, 8, 8])
print(RLE_numpy(sequence))
> (array([1, 2, 3, 5, 3, 5, 3, 8]), array([2, 3, 5, 4, 1, 1, 1, 4], dtype=int64))
こちらもうまくいっているようです。
nkayさんの方法(2020/12/14追記)
nkayさんからコメントでより高速な方法を教えていただきました!
関数名はRLE_numpy2にさせていただきました.
def RLE_numpy2(sequence):
comp_seq_index, = np.concatenate(([True], sequence[1:] != sequence[:-1], [True])).nonzero()
return sequence[comp_seq_index[:-1]], np.ediff1d(comp_seq_index)
速度の比較
次のコードで速度を比較してみましょう。Sequenceはだいたい4か5になります。
import time
size = int(10e5)
sequence = ((np.random.normal(0.5, 0.02, size)*10)).astype(int)
start = time.time()
print(RLE_for(sequence))
print("RLE_for elapsed time:{:.5f}".format(time.time()-start) + " s")
start = time.time()
print(RLE_numpy(sequence))
print("RLE_numpy elapsed time:{:.5f}".format(time.time()-start) + " s")
start = time.time()
print(RLE_numpy2(sequence))
print("RLE_numpy2 elapsed time:{:.5f}".format(time.time()-start) + " s")
sizeの値を変えた時の実行速度の比較がこちらです。
size | for | numpy | numpy2 |
---|---|---|---|
$10^4$ | 0.34s | 0.0048s | 0.0025s |
$10^5$ | 0.96s | 0.037s | 0.017s |
$10^6$ | 9.2s | 0.23s | 0.16s |
$10^7$ | 80.8s | 2.36s | 1.31s |
numpyにすることで10倍以上の高速化ができました。nkayさんのnumpy2ではさらに2倍程度の高速化ができています。
おわりに
筆者は、こちらの圧縮をGDSII Layout作成に利用しています。