二次元のメトリックな地図における最短経路を動的に探索する必要があったため、重み付きグラフの最短経路探索アルゴリズムをpython3で実装し、速度を計測してみました。
今回実装したアルゴリズム
今回は、下記のような状況下で最短経路を動的探索する必要がありました。
- 最短経路探索の要件
- 各ノードはメトリックな地図上の「座標 (x, y)」によって特徴づけられる
- スタートノードとゴールノードが一つ与えられる
- エッジの重みはノード間のユークリッド距離で良い
上記要件から、今回はWikipediaの解説を参考にA*アルゴリズムとダイクストラアルゴリズムを実装しました。なおA*のヒューリスティック関数は、当該ノードとゴールノード間のユークリッド距離としました。
計測結果
50 x 50 のグリッド上に配置したノードを少しだけランダムに位置をずらし、ある一定の確率でグリッドの辺を消去して生成したグラフを使って、各アルゴリズムの計算速度を求めてみました。驚くなかれ、なんと100倍もの速度差が!ほんとに実装大丈夫なのかな・・・
アルゴリズム | 計算時間の例(秒) |
---|---|
(普通の)A* | 4.088 |
(普通の)ダイクストラ | 5.747 |
(速度改善した)A* | 0.037 |
-
計測した環境
- iMac (Retina 5K, 27-inch, 2017), 4.2 GHz Intel Core i7, 32 GB 2400 MHz DDR4
- macOS Mojave, 10.14.6
- python, 3.8.1
実装
(普通の)A*とダイクストラ
WikipediaのA*の解説「アルゴリズムの流れ」をほぼそのまま。mの状態判定の順序が違う程度。
_find_candidate
が(たぶん)O(n)なので、ノード数が多くなると遅そう。またremoveもたぶんO(n)で遅そう。
なおWikipediaに「優先度付きキュー使え」と書いてあったので heapq
を使ってみましたが、生のpython listに対して min(open_list)
等の操作をする実装でも、計算速度が数%遅くなる程度の影響しかありませんでした。あれー?
class Searcher(metaclass=ABCMeta):
def __init__(self, start: Node, goal: Node):
self.start = start
self.goal = goal
def calculate(self) -> List[Node]:
target: CalcNode
open_list: List[CalcNode] = []
close_list: List[CalcNode] = []
heapq.heapify(open_list)
start = CalcNode.new(self.start)
start.fs = self._hs(start)
heapq.heappush(open_list, start)
while True:
if len(open_list) == 0:
print('No route found')
return []
target = heapq.heappop(open_list)
if target.node == self.goal:
break
close_list.append(target)
target.gs = target.fs - self._hs(target)
for candidate in target.nexts:
fs = target.gs + self._cost(target, candidate) + self._hs(candidate)
opened_candidate = self._find_candidate(open_list, candidate)
if opened_candidate is not None:
if fs < opened_candidate.fs:
open_list.remove(opened_candidate)
opened_candidate.fs = fs
opened_candidate.parent = target
heapq.heappush(open_list, opened_candidate)
else:
closed_candidate = self._find_candidate(close_list, candidate)
if closed_candidate is not None:
if fs < closed_candidate.fs:
close_list.remove(closed_candidate)
closed_candidate.fs = fs
closed_candidate.parent = target
heapq.heappush(open_list, closed_candidate)
else:
candidate.fs = fs
candidate.parent = target
heapq.heappush(open_list, candidate)
return [li.node for li in reversed(list(target))]
def _cost(self, target: CalcNode, candidate: CalcNode) -> float:
return math.sqrt((candidate.x - target.x)**2 + (candidate.y - target.y)**2)
def _find_candidate(self, li: List[CalcNode], candidate: CalcNode) -> Optional[CalcNode]:
try:
return li[li.index(candidate)]
except ValueError:
return None
@abstractmethod
def _hs(self, node: CalcNode) -> float:
raise NotImplementedError
class Astar(Searcher):
def _hs(self, node: CalcNode) -> float:
return math.sqrt((node.x - self.goal.x)**2 + (node.y - self.goal.y)**2)
class Dijkstra(Searcher):
def _hs(self, node: CalcNode) -> float:
return 0.0
速度改善したA*
WikipediaのA*の解説「実際に使われているOPEN/CLOSEリストの実装」をほぼそのままですが、ノードが持つタグとして OPEN
とCLOSE
以外にNEW
も持たせ、ノードの初期状態では NEW
となるようにしています。open_listとclose_list間でのノードの移動が不必要になり、重複チェックのためのlist内のノード探索が無くなったことが、非常に大きな速度改善効果を及ぼしている気がします。
こちらの実装は、open_listにノードが重複して追加されるため、速度改善の代わりにメモリを消費するはずです。open_listにはノードオブジェクトそのものではなくfsとidのタプルだけ登録するので、そこまで問題にはならないはずですが、計測していません。。。
class NodeType(Enum):
NEW = auto()
OPEN = auto()
CLOSE = auto()
class FastAstar:
def __init__(self, start: Node, goal: Node):
self.start = start
self.goal = goal
def calculate(self) -> List[Node]:
target: CalcNode
table: Dict[int, CalcNode] = {}
open_list: List[Tuple[float, int]] = []
heapq.heapify(open_list)
start = CalcNode.new(self.start)
start.fs = self._hs(start)
table[start.id] = start
heapq.heappush(open_list, (start.fs, start.id))
while True:
if len(open_list) == 0:
print('No route found')
return []
target = table[heapq.heappop(open_list)[1]]
if target.nodetype == NodeType.CLOSE:
continue
if target.node == self.goal:
break
target.nodetype = NodeType.CLOSE
target.gs = target.fs - self._hs(target)
for candidate in target.nexts:
fs = target.gs + self._cost(target, candidate) + self._hs(candidate)
if candidate.nodetype == NodeType.NEW:
candidate.fs = fs
candidate.parent = target
candidate.nodetype = NodeType.OPEN
table[candidate.id] = candidate
heapq.heappush(open_list, (candidate.fs, candidate.id))
else:
if fs < candidate.fs:
candidate.fs = fs
candidate.parent = target
candidate.nodetype = NodeType.OPEN
table[candidate.id] = candidate
heapq.heappush(open_list, (candidate.fs, candidate.id))
return [li.node for li in reversed(list(target))]
def _cost(self, target: CalcNode, candidate: CalcNode) -> float:
return math.sqrt((candidate.x - target.x)**2 + (candidate.y - target.y)**2)
def _hs(self, node: CalcNode) -> float:
return math.sqrt((node.x - self.goal.x)**2 + (node.y - self.goal.y)**2)
ソースコード
nmatsui/path-searchとしてgithubに公開していますので、よろしければイイカンジに試してみてください。バグがあったら、こっそり教えてもらえると助かります。