#はじめに
とある処理を実装する必要がありました。処理は以下の通りです。
「リスト中の各オブジェクトについて、新規オブジェクトよりも劣っている場合はリストから取り除く」
劣ってるってなんだよ?ってところですが、ここではオブジェクトがタプルで表現されているとし、新規オブジェクトをn
、元々リストにいたオブジェクトをx
とすると、
[n[i] > x[i] for i in range(len(n))]
の結果が[True, True, ..., True]
となる(全部True)場合を言うとします。
ただし、各添え字について値が大きい方がいいのか小さい方がいいのかは問題により変わる(つまり、プログラムはそれに対応しなければいけない)とします。例えば、1軸目を最大化、2軸目は最小化が行われるという場合、取り除くオブジェクトの条件は以下のようになります。
n[0] > x[0] and n[1] < x[1]
以下ではこの処理をPythonでとりあえず書いてみて、その後、段階的に改善していきたいと思います。
#はじめのプログラム
というわけで説明した処理を実装するプログラムです。各軸で最大化なのか最小化なのかは引数maximize
で渡されるとします(変数名がいまいちですが)
def filter00(l, n, maximize):
nl = []
for x in l:
remove = True
for i in range(len(x)):
if maximize[i]:
if n[i] > x[i]:
pass
else:
remove = False
else:
if n[i] < x[i]:
pass
else:
remove = False
if not remove:
nl.append(x)
return nl
削除する前提でチェックし、x
の方がすぐれている軸がある場合は削除をとりやめます。ifがpassでelseでremove
をFalseにしているのが冗長に思いますが、条件ひっくり返したりnot付けたりするのも可読性が下がると思うのでこのままとします(どうせ改善していきますし)
#改善その1:ループに関係ない条件分岐の削除
filter00
関数の以下のコード、
for i in range(len(x)):
if maximize[i]:
if n[i] > x[i]:
pass
else:
remove = False
else:
if n[i] < x[i]:
pass
else:
remove = False
maximize
は引数で渡されるものでループ内では変わりません。毎回条件分岐をしているのが余計な処理に感じます。そこで、if maximize[i]
を外に出せないかを考えます。
方法としては、n[i] > x[i]
とn[i] < x[i]
をlambda式にしてしまうというものになります。Pythonは演算子に対応する関数を実行するoperatorモジュールがあるのでそれを使って書き直してみましょう。
import operator
def filter01(l, n, maximize):
cond = []
for m in maximize:
cond.append(operator.gt if m else operator.lt)
nl = []
for x in l:
remove = True
for i in range(len(x)):
if cond[i](n[i], x[i]):
pass
else:
remove = False
if not remove:
nl.append(x)
return nl
#性能の測定
ところでループによって変わらない条件分岐を削除しましたが本当に速くなったのでしょうか。感覚ではなくちゃんと計測してみましょう。今回は以下のような計測用のスクリプトを作ってみました。
import filters
import time
import random
import numpy as np
random.seed(123)
def perf(ll, n, m, f):
elapsed = []
for l in ll:
start = time.time()
f(l, n, m)
end = time.time()
elapsed.append(end - start)
print(f.__name__, np.average(elapsed))
LOOP = 100
SIZE = 1000
n = (70, 70)
m = (True, True)
ll = [[(random.randint(0, 99), random.randint(0, 99)) for _ in range(SIZE)] for _ in range(LOOP)]
perf(ll, n, m, filters.filter00)
perf(ll, n, m, filters.filter01)
平均値だけで比較するとマサカリが飛んでくる危険がありますが、一応事前調査で平均値でいいかなという結果になっているのでマサカリは投げないでください(笑)
さて、実行結果
filter00 0.00383123636246
filter01 0.00437139987946
予想に反して5ミリ秒遅くなってしまいました。
考えられる可能性として、比較が演算から関数呼び出しに変わったことによるコストが、ループによって変わらない条件分岐を削除したコストを上回ったということがあります。
#改善その2:zip関数を使う
コードも短くなって速度も上がってハッピーという話にしたかったのですが出だしでつまずいてしまいました。
実はこの改善は一通りコードを短くした後の速度検討でわかったものですが、遅いままのコードでいくのもあれなので、ループで複数のリストを扱う際にはzip関数を使うのがよいとは聞くがどのようなものかと試してみました。
def filter02(l, n, maximize):
cond = []
for m in maximize:
cond.append(operator.gt if m else operator.lt)
nl = []
for x in l:
remove = True
for c, a, b in zip(cond, n, x):
if c(a, b):
pass
else:
remove = False
if not remove:
nl.append(x)
return nl
計測結果。なお、filter00とfilter01の値が先ほどと全く同じですが改善プログラムの最終結果を作ったところで計った値を小出しで出しています。悪しからず。
filter00 0.00383123636246
filter01 0.00437139987946
filter02 0.0029260468483
速い。初期コードよりも速くなりました。
filter01
とfilter02
の違いはcond
、n
、x
を添え字で参照しているかzip関数が返すものを受け取っているかの違いです。添え字のアクセスは__getitem__
メソッドが呼ばれるので、関数呼び出しのコストがかかっていたものがなくなったと思われます。と言いつつ、zip関数だって__getitem__
は呼び出さないといけませんがそこら辺はCレベルで処理してるから速いのかなと別の興味がわいてきましたがとりあえずコード改善を進めます。
改善その3:内包表記を使う
みんな大好き内包表記。だらだらforのループを書くなんてダサすぎます。内包表記を使ってすっきり書きましょう。
def filter03(l, n, maximize):
cond = [operator.gt if m else operator.lt for m in maximize]
return [x for x in l if not all([c(a, b) for c, a, b in zip(cond, n, x)])]
いきなりまとめすぎ!とツッコミをいただきそうなのでちゃんと説明します。
あ、でもその前に。先に言っておくとこのコードは行数的には非常に短くなりましたがはっきり言って遅いです。というわけで速いコードを知りたいという方は改善その5まで飛ばしてください。
内側の内包表記
さて、filter03
の説明です。この関数では内包表記を二重に使っています。先に内側の説明。
[c(a, b) for c, a, b in zip(cond, n, x)]
この部分はfilter02
で書いていた内側のループとほぼ同じです。ほぼというのは、filter02
では条件分岐をして単一の変数remove
に値を設定していましたが上の内包表記では、
[True, True]
みたいな感じにリストが返されるという違いがあります。
ここでめんどくさい問題が生じてきます。残す、残さないを判断するにはリストではなくスカラーにしないといけません。そのためにall関数を利用します。all関数はリスト(というかiterable)が全部TrueならTrueを返す関数です。これを使うことで、filter02
で条件分岐、elseの場合にremove
をFalseにしていた処理が実現できます。(いずれかの軸で負けていなければリストの少なくとも1つがFalseになり全体もFalseになります)
外側の内包表記
というわけで内側がわかったので今度は外側。
[x for x in l if not all([c(a, b) for c, a, b in zip(cond, n, x)])]
これがどう動くかというと、
-
l
から要素が一つ取り出されてx
に代入される - その
x
を使ってif ...
の部分が評価される - Trueが返ってくれば
x
をリストに含める
というわけで、外側の内包表記はfilter02
でremove
の値(内側のループで決定される)に応じてappendするしないを処理していたコードに相当します。
実行速度
さて、お待ちかねの実行速度です。
filter00 0.00383123636246
filter01 0.00437139987946
filter02 0.0029260468483
filter03 0.00512305736542
遅い。今まででダントツに遅くなりました。
実は内包表記は関数的に実行される&all関数が登場したことによりオーバーヘッドがかかっていると思われます。
改善その4:内部関数を使う
さて、すでに速度的な優位性はありませんがコード記述量的な優位性はあるので、改善その3のコードをもう少し改善します。filter03
でわかりにくかったのは二重の内包表記なのでその部分を内部関数を使ってもう少しかっこよくします。
def filter04(l, n, maximize):
cond = [operator.gt if m else operator.lt for m in maximize]
def dominated(x, n):
return all([c(a, b) for c, a, b in zip(cond, n, x)])
return [x for x in l if not dominated(x, n)]
if not
の後ろに書かれていた内包表記(とall関数)をdominated
という内部関数にすることで何をしているのかを一言で説明できるようになり可読性が上がりました。
なお、実行速度ですがご想像の通り一番遅いです。
filter00 0.00383123636246
filter01 0.00437139987946
filter02 0.0029260468483
filter03 0.00512305736542
filter04 0.00578190803528
改善その5:知見を活かして速く動くコードを書いてみる
さて、ここまでを振り返ってみましょう。
- ループ中変わらない条件分岐の除去→operatorを使えばできるが関数呼び出し分遅くなる
- zip関数速い
- 内包表記実は遅い(all関数使わないといけなくなるからより遅い)
というわけで、これらを踏まえたコードです。なお、filter021
となっているのはfilter02
をベースに改造を加えたという意味です。
def filter021(l, n, maximize):
nl = []
for x in l:
remove = True
for a, b, m in zip(n, x, maximize):
remove = remove and (a > b if m else a < b)
if not remove:
nl.append(x)
return nl
内側のループ、operator使えないのでand演算子と条件演算子を使ってコンパクトにしました。
さて、実行時間
filter00 0.00383123636246
filter01 0.00437139987946
filter02 0.0029260468483
filter03 0.00512305736542
filter04 0.00578190803528
filter021 0.00255712985992
一番速くなりました
改善その6:特殊化してみる
話はまだ終わりではありません。どうにかしてもっと速くできないでしょうか。
アイデアとしては、よく使う場合とそれ以外(一般)の場合でコードを変えるというものです。つまり、「N変数、最大最小が入り混じることがある」と言っても大体の利用では「2変数、両軸最大化」とすると以下のように書いてしまうことができます。
def filter02X(l, n, maximize):
if len(n) == 2 and maximize == (True, True):
return [x for x in l if not (n[0] > x[0] and n[1] > x[1])]
else:
return filter021(l, n, maximize)
計測結果
filter00 0.00383123636246
filter01 0.00437139987946
filter02 0.0029260468483
filter03 0.00512305736542
filter04 0.00578190803528
filter021 0.00255712985992
filter02X 0.000690214633942
段違いのスピードになりました
言語処理系実装とかで2変数の場合、3変数の場合、それ以上とかしているのはこういう意味があったのですね。
まとめ
この記事ではベタなプログラムを徐々に改善していきました。当初の目論見としてはPythonっぽくないコード(他の言語を学んだ人がPythonでプログラムを書く際に書きがちなコード)をPythonっぽくしていくことで可読性も速度も上がってハッピーという話にしたかったのですが実際計ってみるとそうでもないことがわかったので少し話を変えました。要約すると次の3点になります。
- Pythonは関数呼び出しが遅い。処理の一部を関数に切り出すことはPythonっぽいプログラムだが速度が求められる場合は要検討
- 速度のためにはよくある場合の特殊化を入れるのは悪じゃない
- まあと言いつつもそこまで速度を要求しないようなものなら可読性がいい方がいいよ?
2020/3/22追記
記事の投稿から3年。当時はまだnumpy力も低く、「あの記事で書いた速度改善、今ならnumpy使ってもっと速くできるんだろうなー」とは思ってたものの手をつけていませんでした。
一方で、この記事の元ネタになったプログラムを久しぶりにメンテしたところあまりの遅さに絶望し、numpyを使って速くしました。
すべてのforを消し去りたい
filter021
にはnumpy○チにとって許せないものがあります。
そう、for文です。
というわけで消し去りました。
def filter05(l, n, maximize):
cond = [np.greater if m else np.less for m in maximize]
npa = np.array(l)
check = np.empty(npa.shape, dtype=np.bool)
for i, c in enumerate(cond):
check[:, i] = c(n[i], npa[:, i])
# nに全軸負けている(条件式が全部Trueになる)要素はremove、そうでなければnot remove
not_remove = (check.sum(axis=1) != len(n))
return [x for x, nr in zip(l, not_remove) if nr]
消し去れてないやん!
言い訳すると上記のコードになっているのは理由があります。
- 軸ごとに最大化なのか最小化なのか異なるので(私のnumpy力では)一つの条件式では書けない
- 戻り値はlistでなければいけない。また、元ネタプログラムではlistの要素は「listを継承したクラスのオブジェクト」というめんどくさいものであり、npaに直接not_removeを作用(boolean indexを使用)できない1
ともかく時間を測ってみましょう。
filter00 0.0037975716590881348
filter01 0.004087417125701904
filter02 0.003038032054901123
filter03 0.004926862716674804
filter04 0.005476489067077637
filter021 0.002838168144226074
filter02X 0.0006895852088928222
filter05 0.002588319778442383
誤差ですね。
速くならない理由はlistをnumpyに変換するオーバーヘッドと思われます。
なおlen(l) >> len(n)であれば軸ごと比較をしてるfor文は無視できると思われます。
すべてをnumpyに
先述のように元ネタプログラムでは入力としてnumpy配列を想定できないためこれ以上速くできませんが、試しに入出力をnumpy配列としていいのならどれぐらい速くなるのか試してみました。
def filter06(a, n, maximize):
cond = [np.greater if m else np.less for m in maximize]
check = np.empty(a.shape, dtype=np.bool)
for i, c in enumerate(cond):
check[:, i] = c(n[i], a[:, i])
not_remove = (check.sum(axis=1) != len(n))
return a[not_remove]
計測にかける前にnumpy配列にしておきます。
a = np.array(ll)
perf(a, n, m, filters.filter06)
結果。やはりnumpyだけで完結できれば速い。
filter00 0.0037975716590881348
filter01 0.004087417125701904
filter02 0.003038032054901123
filter03 0.004926862716674804
filter04 0.005476489067077637
filter021 0.002838168144226074
filter02X 0.0006895852088928222
filter05 0.002588319778442383
filter06 0.00030982017517089844
というか特殊化版(filter02X
)なんでこんなに速いのだろう。ちなみにfilter05
やfilter06
を特殊化した版を作って計ってみましたがほとんど変わらない(むしろ遅くなる)結果でした。
2020/3/28追記
filter06
の軸ごと比較してるのどうにか一気にやる方法ないかなーと考えたところ、軸ごと比較は排除できませんでしたがforの後のsumを消す方法は思いつきました。
def filter061(a, n, maximize):
cond = [np.greater if m else np.less for m in maximize]
check = np.ones(a.shape[0], dtype=np.bool)
for i, c in enumerate(cond):
check = check & c(n[i], a[:, i])
not_remove = ~check
return a[not_remove]
計測すると2倍ぐらい速くなります。なおfilter05
にも同じ改善を適用してみましたがやはり変換オーバーヘッドの方が大きかったです。
filter06 0.0003197813034057617
filter061 0.00017987966537475587
比較演算自体を反転し、最後の~
をなくすともう1~2割速くなりますが可読性は落ちます。
def filter062(a, n, maximize):
cond = [np.less if m else np.greater for m in maximize]
not_remove = np.zeros(a.shape[0], dtype=np.bool)
for i, c in enumerate(cond):
not_remove = not_remove | c(n[i], a[:, i])
return a[not_remove]
-
仕様を満たさなくなりますが試しにboolean indexを使ってみたものの速くなりませんでした。 ↩