8
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Cisco Systems JapanAdvent Calendar 2024

Day 11

pyATSを使ったCiscoデバイスのCDP情報収集+簡易構成図を作成してみた

Last updated at Posted at 2024-12-10

この記事はシスコの有志による Cisco Systems Japan Advent Calendar 2024 シリーズ 2 の 11 日目として投稿しております。

Cisco Systems Japan Advent Calendar
2017年版: https://qiita.com/advent-calendar/2017/cisco
2018年版: https://qiita.com/advent-calendar/2018/cisco
2019年版: https://qiita.com/advent-calendar/2019/cisco
2020年版 1枚目: https://qiita.com/advent-calendar/2020/cisco
2020年版 2枚目: https://qiita.com/advent-calendar/2020/cisco2
2021年版 1枚目https://qiita.com/advent-calendar/2021/cisco
2021年版 2枚目https://qiita.com/advent-calendar/2021/cisco2
2022年版(1,2): https://qiita.com/advent-calendar/2022/cisco
2023年版: https://qiita.com/advent-calendar/2023/cisco
2024年版: https://qiita.com/advent-calendar/2024/cisco

はじめに

ネットワーク管理や運用において、最近は、自動化による効率化が求められる場面が増えてきています。
手作業で行っている資料作成をもっと自動化したい
本当は構成図作るのめんどくさい
ので、コードを使って取り組みました。この記事では、Ciscoデバイスに接続して必要な情報を収集し、効率よく資料作成することにフォーカスを当てた記事となってます。
この記事を通じて、日常の作業効率を向上させるヒントになれば幸いです。

実行環境

Python 3.10.12
pyATS Version: 24.11
Catalyst 3850 V16.06.04s
Catalyst 9300 V17.15.01

ファイル構成

project/
├── job.py                 
├── testbed.yaml           
├── task/
│   ├── cdp.py                
│   ├── network_topology.py   
├── logs/                     
│   ├── <デバイス名>_show_version.txt
│   ├── <デバイス名>_show_cdp_neighbors.txt

実装内容

各ファイルの詳細を記載します。

1.ジョブスクリプト(job.py)

テストケースの実行順序と各スクリプトの呼び出しを定義します。
cdp.pyとnetwork_topology.pyのタスクを順番に実行します。

job.py
    from pyats.easypy import run
    
    def main(runtime):
    """
    Main function required for pyats run job.
    This function is the entry point for running the job.
    """
    print("Starting the pyATS job...")

    # CDP テストの実行
    print("Running CDP test script...")
    run(testscript='./task/cdp.py', runtime=runtime)
    print("CDP test script completed.")

    # Network Topology テストの実行
    print("Running Network Topology test script...")
    run(testscript='./task/network_topology.py', runtime=runtime)
    print("Network Topology test script completed.")

    print("Job completed successfully.")

2.テストヘッド定義(testbed.yaml)

testbed.yaml
testbed:
  name: MyTestbed
  credentials:
    default:
      username: "username"
      password: "password"

devices:
  Cat3850:
    alias: "switch1"
    os: iosxe
    connections:
      cli:
        protocol: telnet
        ip: X.X.X.X
devices:
  Cat9300:
    alias: "switch2"
    os: iosxe
    connections:
      cli:
        protocol: telnet
        ip: X.X.X.X        

3.CDP情報取得(cdp.py)

各機器に接続してshow versionとshow cdp neighborsコマンドを実行します。

cdp.py
import logging
import os
from pyats.topology import loader
from pyats.aetest import Testcase, test, setup, cleanup

# ログ設定
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

class CDPTelnetTest(Testcase):
    @setup
    def setup(self):
        """
        Testbed をロードして利用可能なデバイスを確認。
        """
        try:
            # testbed.yaml の"絶対"パスを取得
            testbed_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '../testbed.yaml'))
            self.testbed = loader.load(testbed_path)
            logging.info("Testbed loaded successfully.")

            # 利用可能なデバイスをフィルタリング
            self.devices = [device for device in self.testbed.devices.values() if device.os == 'iosxe']
            if not self.devices:
                self.failed("No devices available in the testbed!")
            logging.info(f"Available devices: {[device.name for device in self.devices]}")
        except Exception as e:
            logging.error(f"Error during setup: {str(e)}")
            self.failed("Failed to load testbed or find devices.")

    @test
    def test_telnet_and_collect_cdp(self):
        """
        各デバイスに Telnet 接続し、CDP 情報と 'show version' の出力を収集。
        """
        for device in self.devices:
            logging.info(f"Attempting to connect to {device.name} at {device.connections.cli.ip}")

            try:
                # デバイスに接続
                device.connect(via='cli')
                logging.info(f"Connected to {device.name}")

                # 'show version' コマンドを実行し、結果を保存
                output = device.execute('show version')
                self._save_output(device.name, "show_version", output)

                # 'show cdp neighbors' コマンドを実行し、結果を保存
                cdp_output = device.execute('show cdp neighbors')
                self._save_output(device.name, "show_cdp_neighbors", cdp_output)

            except Exception as e:
                logging.error(f"Error with {device.name}: {str(e)}")
                self.failed(f"Failed to connect to or execute commands on {device.name}")

            finally:
                device.disconnect()
                logging.info(f"Disconnected from {device.name}")

    @cleanup
    def cleanup(self):
        """
        テスト後のクリーンアップ処理。
        """
        logging.info("Cleanup completed.")

    def _save_output(self, device_name, command_name, output):
        """
        コマンドの出力をファイルに保存するヘルパーメソッド。
        """
        try:
            # logs ディレクトリの絶対パスを取得
            base_dir = os.path.dirname(os.path.abspath(__file__))  # 現在のスクリプトのディレクトリ
            logs_dir = os.path.join(base_dir, "../logs")  # logs フォルダの絶対パス
            os.makedirs(logs_dir, exist_ok=True)  # logs ディレクトリが存在しない場合は作成

            # 保存先ファイルパスを生成
            file_path = os.path.join(logs_dir, f"{device_name}_{command_name}.txt")
            
            # ファイルに出力を保存
            with open(file_path, "w") as f:
                f.write(output)
            logging.info(f"Output of '{command_name}' saved for {device_name} at {file_path}.")
        except Exception as e:
            logging.error(f"Failed to save output for {device_name}: {str(e)}")
            self.failed(f"Failed to save output for {device_name}.")

if __name__ == "__main__":
    from pyats.aetest import main
    main()

4.ネットワークトポロジ収集と描画(network_topology.py)

トポロジ情報をもとにネットワーク構成図を描画します。

network_topology.py
from pyats import aetest
from pyats.topology import loader
from genie.conf import Genie
import networkx as nx
import matplotlib.pyplot as plt
import os
from matplotlib.patches import FancyBboxPatch

def draw_fixed_rectangle_nodes(graph, pos, ax, fontsize=10):
    rect_width = 0.3  # 図形の横幅
    rect_height = 0.15  # 図形の縦幅

    for node, (x, y) in pos.items():
        # ホスト名から「.」以降を削除
        text = node.split('.')[0] if '.' in node else node
        rect = FancyBboxPatch(
            (x - rect_width / 2, y - rect_height / 2),  # 左下座標
            rect_width, rect_height,  # 幅と高さ
            boxstyle="round,pad=0.02",
            linewidth=1,
            edgecolor="black",
            facecolor="lightblue"
        )
        ax.add_patch(rect)
        ax.text(
            x, y,
            text,
            ha="center", va="center",
            fontsize=max(fontsize, 8),  # 最小フォントサイズを8に制限
            fontweight="bold"
        )

class NetworkTopology(aetest.Testcase):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.graph = None
        self.fig = None
        self.ax = None
        self.pos = None
        self.fontsize = 10  # 初期フォントサイズ
        self.dragging = False
        self.last_mouse_position = None

    @aetest.setup
    def setup(self):
        try:
            print("Loading testbed.yaml...")
            testbed_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '../testbed.yaml'))
            self.testbed = loader.load(testbed_path)
            print("Testbed loaded successfully.")
            self.genie_testbed = Genie.init(self.testbed)
            self.graph = nx.Graph()
            print("Setup completed successfully.")
        except Exception as e:
            print(f"Error during setup: {e}")
            self.failed(f"Testbed loading or initialization failed: {e}")

    @aetest.test
    def collect_topology(self):
        try:
            for device_name, device in self.genie_testbed.devices.items():
                print(f"Connecting to device: {device_name}")
                device.connect()
                cdp_info = device.parse('show cdp neighbors detail')
                for entry in cdp_info['index'].values():
                    local_device = device_name
                    local_intf = entry['local_interface']
                    remote_device = entry['device_id']
                    remote_intf = entry['port_id']
                    self.graph.add_edge(
                        local_device,
                        remote_device,
                        label=f"{local_intf}{remote_intf}"
                    )
                device.disconnect()
                print(f"Disconnected from {device_name}.")
        except Exception as e:
            print(f"Error during topology collection: {e}")
            self.failed(f"Failed to collect topology information: {e}")

    @aetest.cleanup
    def draw_topology(self):
        try:
            print("Drawing network topology...")
            self.fig, self.ax = plt.subplots(figsize=(12, 10))
            self.pos = nx.spring_layout(self.graph, k=2, iterations=100)

            self.redraw()

            self.fig.canvas.mpl_connect('scroll_event', self.on_scroll)
            self.fig.canvas.mpl_connect('button_press_event', self.on_mouse_press)
            self.fig.canvas.mpl_connect('button_release_event', self.on_mouse_release)
            self.fig.canvas.mpl_connect('motion_notify_event', self.on_mouse_move)
            plt.show()
            print("Network topology drawn successfully.")
        except Exception as e:
            print(f"Error during topology drawing: {e}")
            self.failed(f"Failed to draw topology: {e}")

    def redraw(self):
        self.ax.clear()

        # エッジ描画
        for u, v, data in self.graph.edges(data=True):
            x1, y1 = self.pos[u]
            x2, y2 = self.pos[v]
            self.ax.plot([x1, x2], [y1, y2], color="black", lw=1, zorder=1)

        # ノード描画
        draw_fixed_rectangle_nodes(self.graph, self.pos, self.ax, fontsize=self.fontsize)

        # ラベル描画 (動的オフセット調整)
        for i, (u, v, data) in enumerate(self.graph.edges(data=True)):
            x1, y1 = self.pos[u]
            x2, y2 = self.pos[v]
            mid_x, mid_y = (x1 + x2) / 2, (y1 + y2) / 2

            # 動的オフセットをエッジごとに調整
            offset_x, offset_y = (i % 3 - 1) * 0.03, (i % 3 - 1) * 0.03
            self.ax.text(
                mid_x + offset_x, mid_y + offset_y,
                data["label"],
                fontsize=8, ha="center", va="center", color="red", zorder=3
            )

        self.ax.set_title("Network Topology (Responsive Nodes)", fontsize=16)
        self.ax.axis("off")
        self.fig.canvas.draw_idle()

    def on_scroll(self, event):
        base_scale = 1.2
        if event.button == 'up':
            scale_factor = 1 / base_scale
            self.fontsize = max(self.fontsize * base_scale, 8)  # 最小フォントサイズを8に設定
        elif event.button == 'down':
            scale_factor = base_scale
            self.fontsize *= base_scale
        else:
            return

        cur_xlim = self.ax.get_xlim()
        cur_ylim = self.ax.get_ylim()

        xdata = event.xdata
        ydata = event.ydata

        if xdata is None or ydata is None:
            return

        new_width = (cur_xlim[1] - cur_xlim[0]) * scale_factor
        new_height = (cur_ylim[1] - cur_ylim[0]) * scale_factor

        relx = (xdata - cur_xlim[0]) / (cur_xlim[1] - cur_xlim[0])
        rely = (ydata - cur_ylim[0]) / (cur_ylim[1] - cur_ylim[0])

        self.ax.set_xlim([xdata - relx * new_width, xdata + (1 - relx) * new_width])
        self.ax.set_ylim([ydata - rely * new_height, ydata + (1 - rely) * new_height])

        self.fig.canvas.draw_idle()
       #オプション:右クリック長押しでトポロジ移動&マウススクロールでのズームとズームアウト
    def on_mouse_press(self, event):
        if event.button == 3:  # 右クリック
            self.dragging = True
            self.last_mouse_position = (event.x, event.y)

    def on_mouse_release(self, event):
        if event.button == 3:  # 右クリック
            self.dragging = False
            self.last_mouse_position = None

    def on_mouse_move(self, event):
        if self.dragging and self.last_mouse_position:
            dx = event.x - self.last_mouse_position[0]
            dy = event.y - self.last_mouse_position[1]

            cur_xlim = self.ax.get_xlim()
            cur_ylim = self.ax.get_ylim()

            self.ax.set_xlim(cur_xlim[0] - dx * 0.01, cur_xlim[1] - dx * 0.01)
            self.ax.set_ylim(cur_ylim[0] - dy * 0.01, cur_ylim[1] - dy * 0.01)

            self.last_mouse_position = (event.x, event.y)
            self.fig.canvas.draw_idle()

def main():
    aetest.main()

if __name__ == '__main__':
    main()

5.ログの出力(<デバイス名>_show_cdp_neighbors.txt)

Cat3850_show_cdp_neighbors.txt
Capability Codes: R - Router, T - Trans Bridge, B - Source Route Bridge
                  S - Switch, H - Host, I - IGMP, r - Repeater, P - Phone, 
                  D - Remote, C - CVTA, M - Two-port Mac Relay 

Device ID        Local Intrfce     Holdtme    Capability  Platform      Port ID
Switch1          Gig 2/0/13        169              S I   Generic-SW1  Gig 1/0/48
Switch2          Gig 1/0/18        132             R S I  Generic-RT1  Gig 0/0/0
Switch3          Gig 1/0/24        172              S I   Generic-SW2  Fas 0/9
Switch4          Gig 1/0/21        139             R S I  Generic-SW3  Gig 1/0/24
Switch5          Gig 1/0/15        163             R S I  Generic-SW4  Gig 1/0/1
Switch6          Gig 1/0/17        139             R S I  Generic-SW5  Gig 1/0/24
Switch7          Gig 1/0/7         166               H    Generic-HOST eth0
Switch8          Gig 2/0/2         143              S I   Generic-SW6  Gig 1/0/48
Switch9          Gig 2/0/12        134             R S I  Generic-SW7  Gig 1/0/1
Switch10         Gig 1/0/22        142             R S I  Generic-SW8  Gig 1/0/48
Switch11         Gig 1/0/5         154               H    Generic-HOST eth0

Total cdp entries displayed : 11

実行結果

実行結果ログ
log.txt
(Qiita) user@hostname:~/pyats-project$ pyats run job job.py
~省略~
  The result of section collect_topology is => PASSED
  +------------------------------------------------------------------------------+
  |                        Starting section draw_topology                        |
  +------------------------------------------------------------------------------+
Drawing network topology...
Network topology drawn successfully.
 The result of section draw_topology is => PASSED
 The result of testcase NetworkTopology is => PASSED
Network Topology test script completed.
Job completed successfully.
 Saving Abstract Tree...
 Abstract Tree saved
 WebEx Token not given as argument or in config. No WebEx notification will be sent
 --------------------------------------------------------------------------------
 Job finished. Wrapping up...
 Creating archive file: /path/to/archive/job.2024Dec10_18:35:59.616800.zip
 +------------------------------------------------------------------------------+
 |                                Easypy Report                                 |
 +------------------------------------------------------------------------------+
 pyATS Instance   : /path/to/pyats-instance
 Python Version   : cpython-3.10.12 (64bit)
 CLI Arguments    : /path/to/pyats-instance/bin/pyats run job job.py
 User             : user123
 Host Server      : host123
 Host OS Version  : Ubuntu 22.04 jammy (x86_64)

 Job Information
     Name         : job
     Start time   : 2024-12-10 18:36:03.544921+09:00
     Stop time    : 2024-12-10 18:37:18.372055+09:00
     Elapsed time : 0:01:15
     Archive      : /path/to/archive/job.2024Dec10_18:35:59.616800.zip

 Total Tasks    : 2

 Overall Stats
     Passed     : 2
     Passx      : 0
     Failed     : 0
     Aborted    : 0
     Blocked    : 0
     Skipped    : 0
     Errored    : 0

     TOTAL      : 2

 Success Rate   : 100.00 %

 Section Stats
     Passed     : 6
     Passx      : 0
     Failed     : 0
     Aborted    : 0
     Blocked    : 0
     Skipped    : 0
     Errored    : 0

     TOTAL      : 6

 Section Success Rate   : 100.00 %

 +------------------------------------------------------------------------------+
 |                             Task Result Summary                              |
 +------------------------------------------------------------------------------+
 Task-1: cdp.CDPTelnetTest                                                 PASSED
 Task-2: network_topology.NetworkTopology                                  PASSED

 +------------------------------------------------------------------------------+
 |                             Task Result Details                              |
 +------------------------------------------------------------------------------+
 Task-1: cdp
 `-- CDPTelnetTest                                                         PASSED
     |-- setup                                                             PASSED
     |-- test_telnet_and_collect_cdp                                       PASSED
     `-- cleanup                                                           PASSED
 Task-2: network_topology
 `-- NetworkTopology                                                       PASSED
     |-- setup                                                             PASSED
     |-- collect_topology                                                  PASSED
     `-- draw_topology                                                     PASSED
 Sending report email...
 Missing SMTP server configuration, or failed to reach/authenticate/send mail. Result notification email failed to send.
 Done!

トポロジ図

※ホスト名は黒く塗りつぶしてます。

image.png

最後に

今回はCisco機器からCDPの情報を収集し、簡単な構成図の作成を行いました。
図のクオリティは作りこみ次第で、まだまだあげていけそうです。
図の描写には、Pythonには、外部ツールが必要がないため、networkx,matplotlibを組み合わせて作成いたしました。

参考資料

Networkxについて

Matplotlibについて

NetworkX と Matplotlib の使い方

PyATSについて

8
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?