Qiita Teams that are logged in
You are not logged in to any team

Log in to Qiita Team
Community
OrganizationEventAdvent CalendarQiitadon (β)
Service
Qiita JobsQiita ZineQiita Blog
1
Help us understand the problem. What are the problem?

More than 1 year has passed since last update.

posted at

Organization

グラフの最短経路探索アルゴリズムの速度計測

二次元のメトリックな地図における最短経路を動的に探索する必要があったため、重み付きグラフの最短経路探索アルゴリズムをpython3で実装し、速度を計測してみました。

今回実装したアルゴリズム

今回は、下記のような状況下で最短経路を動的探索する必要がありました。

  • 最短経路探索の要件
    • 各ノードはメトリックな地図上の「座標 (x, y)」によって特徴づけられる
    • スタートノードとゴールノードが一つ与えられる
    • エッジの重みはノード間のユークリッド距離で良い

上記要件から、今回はWikipediaの解説を参考にA*アルゴリズムダイクストラアルゴリズムを実装しました。なおA*のヒューリスティック関数は、当該ノードとゴールノード間のユークリッド距離としました。

計測結果

50 x 50 のグリッド上に配置したノードを少しだけランダムに位置をずらし、ある一定の確率でグリッドの辺を消去して生成したグラフを使って、各アルゴリズムの計算速度を求めてみました。驚くなかれ、なんと100倍もの速度差が!ほんとに実装大丈夫なのかな・・・

アルゴリズム 計算時間の例(秒)
(普通の)A* 4.088
(普通の)ダイクストラ 5.747
(速度改善した)A* 0.037
  • 探索したグラフの例(各実装において算出された最短経路は、3つとも同一)
    グラフ

  • 計測した環境

    • iMac (Retina 5K, 27-inch, 2017), 4.2 GHz Intel Core i7, 32 GB 2400 MHz DDR4
    • macOS Mojave, 10.14.6
    • python, 3.8.1

実装

(普通の)A*とダイクストラ

WikipediaのA*の解説「アルゴリズムの流れ」をほぼそのまま。mの状態判定の順序が違う程度。
_find_candidateが(たぶん)O(n)なので、ノード数が多くなると遅そう。またremoveもたぶんO(n)で遅そう。
なおWikipediaに「優先度付きキュー使え」と書いてあったので heapq を使ってみましたが、生のpython listに対して min(open_list) 等の操作をする実装でも、計算速度が数%遅くなる程度の影響しかありませんでした。あれー?

searcher.py
class Searcher(metaclass=ABCMeta):
    def __init__(self, start: Node, goal: Node):
        self.start = start
        self.goal = goal

    def calculate(self) -> List[Node]:
        target: CalcNode

        open_list: List[CalcNode] = []
        close_list: List[CalcNode] = []
        heapq.heapify(open_list)

        start = CalcNode.new(self.start)
        start.fs = self._hs(start)

        heapq.heappush(open_list, start)

        while True:
            if len(open_list) == 0:
                print('No route found')
                return []

            target = heapq.heappop(open_list)
            if target.node == self.goal:
                break

            close_list.append(target)

            target.gs = target.fs - self._hs(target)

            for candidate in target.nexts:
                fs = target.gs + self._cost(target, candidate) + self._hs(candidate)

                opened_candidate = self._find_candidate(open_list, candidate)
                if opened_candidate is not None:
                    if fs < opened_candidate.fs:
                        open_list.remove(opened_candidate)
                        opened_candidate.fs = fs
                        opened_candidate.parent = target
                        heapq.heappush(open_list, opened_candidate)
                else:
                    closed_candidate = self._find_candidate(close_list, candidate)
                    if closed_candidate is not None:
                        if fs < closed_candidate.fs:
                            close_list.remove(closed_candidate)
                            closed_candidate.fs = fs
                            closed_candidate.parent = target
                            heapq.heappush(open_list, closed_candidate)
                    else:
                        candidate.fs = fs
                        candidate.parent = target
                        heapq.heappush(open_list, candidate)

        return [li.node for li in reversed(list(target))]

    def _cost(self, target: CalcNode, candidate: CalcNode) -> float:
        return math.sqrt((candidate.x - target.x)**2 + (candidate.y - target.y)**2)

    def _find_candidate(self, li: List[CalcNode], candidate: CalcNode) -> Optional[CalcNode]:
        try:
            return li[li.index(candidate)]
        except ValueError:
            return None

    @abstractmethod
    def _hs(self, node: CalcNode) -> float:
        raise NotImplementedError


class Astar(Searcher):
    def _hs(self, node: CalcNode) -> float:
        return math.sqrt((node.x - self.goal.x)**2 + (node.y - self.goal.y)**2)


class Dijkstra(Searcher):
    def _hs(self, node: CalcNode) -> float:
        return 0.0

速度改善したA*

WikipediaのA*の解説「実際に使われているOPEN/CLOSEリストの実装」をほぼそのままですが、ノードが持つタグとして OPENCLOSE以外にNEWも持たせ、ノードの初期状態では NEW となるようにしています。open_listとclose_list間でのノードの移動が不必要になり、重複チェックのためのlist内のノード探索が無くなったことが、非常に大きな速度改善効果を及ぼしている気がします。
こちらの実装は、open_listにノードが重複して追加されるため、速度改善の代わりにメモリを消費するはずです。open_listにはノードオブジェクトそのものではなくfsとidのタプルだけ登録するので、そこまで問題にはならないはずですが、計測していません。。。

fast_astar.py
class NodeType(Enum):
    NEW = auto()
    OPEN = auto()
    CLOSE = auto()
fast_astar.py
class FastAstar:
    def __init__(self, start: Node, goal: Node):
        self.start = start
        self.goal = goal

    def calculate(self) -> List[Node]:
        target: CalcNode
        table: Dict[int, CalcNode] = {}
        open_list: List[Tuple[float, int]] = []
        heapq.heapify(open_list)

        start = CalcNode.new(self.start)
        start.fs = self._hs(start)

        table[start.id] = start
        heapq.heappush(open_list, (start.fs, start.id))

        while True:
            if len(open_list) == 0:
                print('No route found')
                return []

            target = table[heapq.heappop(open_list)[1]]
            if target.nodetype == NodeType.CLOSE:
                continue

            if target.node == self.goal:
                break

            target.nodetype = NodeType.CLOSE

            target.gs = target.fs - self._hs(target)

            for candidate in target.nexts:
                fs = target.gs + self._cost(target, candidate) + self._hs(candidate)

                if candidate.nodetype == NodeType.NEW:
                    candidate.fs = fs
                    candidate.parent = target
                    candidate.nodetype = NodeType.OPEN
                    table[candidate.id] = candidate
                    heapq.heappush(open_list, (candidate.fs, candidate.id))
                else:
                    if fs < candidate.fs:
                        candidate.fs = fs
                        candidate.parent = target
                        candidate.nodetype = NodeType.OPEN
                        table[candidate.id] = candidate
                        heapq.heappush(open_list, (candidate.fs, candidate.id))

        return [li.node for li in reversed(list(target))]

    def _cost(self, target: CalcNode, candidate: CalcNode) -> float:
        return math.sqrt((candidate.x - target.x)**2 + (candidate.y - target.y)**2)

    def _hs(self, node: CalcNode) -> float:
        return math.sqrt((node.x - self.goal.x)**2 + (node.y - self.goal.y)**2)

ソースコード

nmatsui/path-searchとしてgithubに公開していますので、よろしければイイカンジに試してみてください。バグがあったら、こっそり教えてもらえると助かります。

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
1
Help us understand the problem. What are the problem?