4
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

グラフの最短経路探索アルゴリズムの速度計測

Posted at

二次元のメトリックな地図における最短経路を動的に探索する必要があったため、重み付きグラフの最短経路探索アルゴリズムをpython3で実装し、速度を計測してみました。

今回実装したアルゴリズム

今回は、下記のような状況下で最短経路を動的探索する必要がありました。

  • 最短経路探索の要件
    • 各ノードはメトリックな地図上の「座標 (x, y)」によって特徴づけられる
    • スタートノードとゴールノードが一つ与えられる
    • エッジの重みはノード間のユークリッド距離で良い

上記要件から、今回はWikipediaの解説を参考にA*アルゴリズムダイクストラアルゴリズムを実装しました。なおA*のヒューリスティック関数は、当該ノードとゴールノード間のユークリッド距離としました。

計測結果

50 x 50 のグリッド上に配置したノードを少しだけランダムに位置をずらし、ある一定の確率でグリッドの辺を消去して生成したグラフを使って、各アルゴリズムの計算速度を求めてみました。驚くなかれ、なんと100倍もの速度差が!ほんとに実装大丈夫なのかな・・・

アルゴリズム 計算時間の例(秒)
(普通の)A* 4.088
(普通の)ダイクストラ 5.747
(速度改善した)A* 0.037
  • 探索したグラフの例(各実装において算出された最短経路は、3つとも同一)
    グラフ

  • 計測した環境

    • iMac (Retina 5K, 27-inch, 2017), 4.2 GHz Intel Core i7, 32 GB 2400 MHz DDR4
    • macOS Mojave, 10.14.6
    • python, 3.8.1

実装

(普通の)A*とダイクストラ

WikipediaのA*の解説「アルゴリズムの流れ」をほぼそのまま。mの状態判定の順序が違う程度。
_find_candidateが(たぶん)O(n)なので、ノード数が多くなると遅そう。またremoveもたぶんO(n)で遅そう。
なおWikipediaに「優先度付きキュー使え」と書いてあったので heapq を使ってみましたが、生のpython listに対して min(open_list) 等の操作をする実装でも、計算速度が数%遅くなる程度の影響しかありませんでした。あれー?

searcher.py
class Searcher(metaclass=ABCMeta):
    def __init__(self, start: Node, goal: Node):
        self.start = start
        self.goal = goal

    def calculate(self) -> List[Node]:
        target: CalcNode

        open_list: List[CalcNode] = []
        close_list: List[CalcNode] = []
        heapq.heapify(open_list)

        start = CalcNode.new(self.start)
        start.fs = self._hs(start)

        heapq.heappush(open_list, start)

        while True:
            if len(open_list) == 0:
                print('No route found')
                return []

            target = heapq.heappop(open_list)
            if target.node == self.goal:
                break

            close_list.append(target)

            target.gs = target.fs - self._hs(target)

            for candidate in target.nexts:
                fs = target.gs + self._cost(target, candidate) + self._hs(candidate)

                opened_candidate = self._find_candidate(open_list, candidate)
                if opened_candidate is not None:
                    if fs < opened_candidate.fs:
                        open_list.remove(opened_candidate)
                        opened_candidate.fs = fs
                        opened_candidate.parent = target
                        heapq.heappush(open_list, opened_candidate)
                else:
                    closed_candidate = self._find_candidate(close_list, candidate)
                    if closed_candidate is not None:
                        if fs < closed_candidate.fs:
                            close_list.remove(closed_candidate)
                            closed_candidate.fs = fs
                            closed_candidate.parent = target
                            heapq.heappush(open_list, closed_candidate)
                    else:
                        candidate.fs = fs
                        candidate.parent = target
                        heapq.heappush(open_list, candidate)

        return [li.node for li in reversed(list(target))]

    def _cost(self, target: CalcNode, candidate: CalcNode) -> float:
        return math.sqrt((candidate.x - target.x)**2 + (candidate.y - target.y)**2)

    def _find_candidate(self, li: List[CalcNode], candidate: CalcNode) -> Optional[CalcNode]:
        try:
            return li[li.index(candidate)]
        except ValueError:
            return None

    @abstractmethod
    def _hs(self, node: CalcNode) -> float:
        raise NotImplementedError


class Astar(Searcher):
    def _hs(self, node: CalcNode) -> float:
        return math.sqrt((node.x - self.goal.x)**2 + (node.y - self.goal.y)**2)


class Dijkstra(Searcher):
    def _hs(self, node: CalcNode) -> float:
        return 0.0

速度改善したA*

WikipediaのA*の解説「実際に使われているOPEN/CLOSEリストの実装」をほぼそのままですが、ノードが持つタグとして OPENCLOSE以外にNEWも持たせ、ノードの初期状態では NEW となるようにしています。open_listとclose_list間でのノードの移動が不必要になり、重複チェックのためのlist内のノード探索が無くなったことが、非常に大きな速度改善効果を及ぼしている気がします。
こちらの実装は、open_listにノードが重複して追加されるため、速度改善の代わりにメモリを消費するはずです。open_listにはノードオブジェクトそのものではなくfsとidのタプルだけ登録するので、そこまで問題にはならないはずですが、計測していません。。。

fast_astar.py
class NodeType(Enum):
    NEW = auto()
    OPEN = auto()
    CLOSE = auto()
fast_astar.py
class FastAstar:
    def __init__(self, start: Node, goal: Node):
        self.start = start
        self.goal = goal

    def calculate(self) -> List[Node]:
        target: CalcNode
        table: Dict[int, CalcNode] = {}
        open_list: List[Tuple[float, int]] = []
        heapq.heapify(open_list)

        start = CalcNode.new(self.start)
        start.fs = self._hs(start)

        table[start.id] = start
        heapq.heappush(open_list, (start.fs, start.id))

        while True:
            if len(open_list) == 0:
                print('No route found')
                return []

            target = table[heapq.heappop(open_list)[1]]
            if target.nodetype == NodeType.CLOSE:
                continue

            if target.node == self.goal:
                break

            target.nodetype = NodeType.CLOSE

            target.gs = target.fs - self._hs(target)

            for candidate in target.nexts:
                fs = target.gs + self._cost(target, candidate) + self._hs(candidate)

                if candidate.nodetype == NodeType.NEW:
                    candidate.fs = fs
                    candidate.parent = target
                    candidate.nodetype = NodeType.OPEN
                    table[candidate.id] = candidate
                    heapq.heappush(open_list, (candidate.fs, candidate.id))
                else:
                    if fs < candidate.fs:
                        candidate.fs = fs
                        candidate.parent = target
                        candidate.nodetype = NodeType.OPEN
                        table[candidate.id] = candidate
                        heapq.heappush(open_list, (candidate.fs, candidate.id))

        return [li.node for li in reversed(list(target))]

    def _cost(self, target: CalcNode, candidate: CalcNode) -> float:
        return math.sqrt((candidate.x - target.x)**2 + (candidate.y - target.y)**2)

    def _hs(self, node: CalcNode) -> float:
        return math.sqrt((node.x - self.goal.x)**2 + (node.y - self.goal.y)**2)

ソースコード

nmatsui/path-searchとしてgithubに公開していますので、よろしければイイカンジに試してみてください。バグがあったら、こっそり教えてもらえると助かります。

4
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?