ネットワーク機器の状態確認どうしてますか??
突然ですが、皆様は、ネットワーク機器の日々の状態確認はどうやって行っていますか?
イケている企業の皆様なら、NetFlowやTelemetryで取得した情報を基に自動で分析だよ!!とかあるかもしれません。
しかし弊社では、まだ偉大な先人が遺したTera TermマクロやPHPで動くツールが現役です。
それらのツールは、社内のサービス用のCisco社やJuniper社のルータやスイッチなどの機器にログインして、いろいろなコマンドを実行してその実行結果を保存するようなことをしています。
偉大なツールの問題点
偉大な先人が遺したツールには問題点がいくつかあります。
- 実行に時間がかかる
- 1台ずつ実行するため対象ホストが数十台数百台となると、実行終了までに数十分かかることもある
- 対象ホストの追加・削除に手間がかかる
- ツールの対象ホストを追加・削除したい場合は、Tera Termマクロの場合は参照しているファイルの編集が、PHPのツールの場合は、ソースコードの変更が必要(プログラミングが苦手な運用者には心理的ハードルが高い)
ではどうするか?
以下のライブラリなどを使用して作成する方針としました。
Streamlit
JANOG53で以下のような発表がありました。
「StreamlitはNWエンジニアにとって
Web開発の銀の弾丸となり得るか?(仮)」
https://www.janog.gr.jp/meeting/janog53/wp-content/uploads/2024/01/lt02streamlit.pdf
実際に現地で聴講させていただいたのですが、かなり心に残っており今回採用しようと思い至りました。
Streamlitとは、Pythonの数十行のコードのみで簡単なGUIが作れるという代物です。
データサイエンスの分野でよく使われているようです。
https://streamlit.io/
これなら私でも、GUIを簡単に作れるのでは!?
netmiko
sshやtelnetを使用して、機器にログインしてコマンドを打てるライブラリです。
https://github.com/ktbyers/netmiko
multiprocessing
Pythonのライブラリで、プログラムの並列処理を行うライブラリです。
https://docs.python.org/ja/3/library/multiprocessing.html
これをうまく使えば、状態取得を同時並列的にたくさんできるのは!?
実装
今回はJuniper社のルータやスイッチに対して実行することにします。
以下プログラムのツリーです。
全て同じ階層に置いているのは許してください
- main.py
- base.py
- ssh.py
- commands_list.py
main.pyが置いてあるディレクトリで以下を実行
streamlit run main.py --server.port 8080
main.pyを実行し、base.pyをその中で呼び出す形としています。
また、コマンドは、command_list.pyに格納して呼び出しています。
import sqlite3
from base import base
# 機器ログインに使用するusernameとpasswordを記載
# .envに書いて読み込む方式も可
SSH_USERNAME = "testuser"
SSH_PASSWORD = "testtest"
# データベースのセットアップ
conn = sqlite3.connect("devices.db")
c = conn.cursor()
c.execute(
"""
CREATE TABLE IF NOT EXISTS devices (
id INTEGER PRIMARY KEY AUTOINCREMENT,
hostname TEXT,
ip_address TEXT,
device_type TEXT,
service_type TEXT,
register_date TIMESTAMP
)
"""
)
conn.commit()
db = "sqlite"
def main(SSH_USERNAME, SSH_PASSWORD, c, conn, db):
base(SSH_USERNAME, SSH_PASSWORD, c, conn, db)
if __name__ == "__main__":
main(SSH_USERNAME, SSH_PASSWORD, c, conn, db)
import streamlit as st
import pandas as pd
import multiprocessing
from datetime import datetime, timedelta
import time
import ipaddress
import re
import commands_list
from ssh import ssh_login
from device_delete import delete_device
def execute_state_check(
selected_devices, df, devices, SSH_USERNAME, SSH_PASSWORD, start_date, end_date
):
# 実行時に再度日付を設定
commands_list.set_dates(start_date, end_date)
# print(selected_devices)
selected_rows = df.loc[selected_devices]
with multiprocessing.Pool(processes=30) as pool:
results = pool.starmap(
ssh_login,
[
(devices[device_id], SSH_USERNAME, SSH_PASSWORD)
for device_id in selected_rows.index
],
)
# ローカルにファイルを保存
# output_file = f"results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
# with open(output_file, 'w') as f:
# for result in results:
# f.write(result + '\n')
output_data = "\n\n".join(results)
st.download_button(
label="結果をダウンロード",
data=output_data,
file_name=f"results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt",
mime="text/plain",
)
def extract_suffix_number(hostname):
match = re.search(r"t.*(\d{2})$", hostname)
return int(match.group(1)) if match else None
def base(SSH_USERNAME, SSH_PASSWORD, c, conn, db):
st.title("State Check")
st.markdown("---")
with st.sidebar:
st.write("MX用のコマンド", commands_list.mx_commands_explain)
st.write("EX,QFX用のコマンド", commands_list.l2sw_commands_explain)
# サービス名がTestCheckの物のみ取得
if db == "postgres":
c.execute("SELECT * FROM devices WHERE service_type = %s", ("TestCheck",))
else:
c.execute('SELECT * FROM devices WHERE service_type = ?', ("TestCheck",))
devices = c.fetchall()
# 機種でソートしつつ、その次にIPアドレスでソート。MXが先に来るようにカスタムソートしている
df = pd.DataFrame(
devices, columns=["ID", "ホスト名", "IPアドレス", "機種", "サービス名", "登録日"]
)
custom_sort = {"MX": 0, "QFX": 1, "EX": 2}
df["機種ソート"] = df["機種"].map(custom_sort)
# sorted_df = df.sort_values(by=['機種ソート', 'IPアドレス']).drop(columns=['機種ソート'])
# IPアドレスを数値に変換してソート
df["IPアドレスソート"] = df["IPアドレス"].apply(
lambda x: int(ipaddress.IPv4Address(x))
)
# ホスト名の末尾2桁を数値に変換
df["ホスト名末尾数字"] = df["ホスト名"].apply(extract_suffix_number)
# 機種ごとに異なるソート基準を適用
# mxのソート処理。
sorted_df = df.copy()
sorted_df_mx = df[df["機種"] == "MX"].sort_values(by=["IPアドレスソート"])
# EXのソート処理。ホスト名がtで始まる場合は、ホスト名の末尾2桁でソートし、それ以外の場合はホスト名でソート
ex_df = df[df["機種"] == "EX"]
ex_cc1 = ex_df[ex_df["ホスト名"].str.startswith("t")].sort_values(
by=["ホスト名末尾数字"]
)
ex_non_cc1 = ex_df[~ex_df["ホスト名"].str.startswith("t")].sort_values(
by=["ホスト名"]
)
sorted_df_ex = pd.concat([ex_cc1, ex_non_cc1])
# QFXのソート処理。ホスト名がtで始まる場合は、ホスト名の末尾2桁でソートし、それ以外の場合はホスト名でソート
qfx_df = df[df["機種"] == "QFX"]
qfx_cc1 = qfx_df[qfx_df["ホスト名"].str.startswith("t")].sort_values(
by=["ホスト名末尾数字"]
)
qfx_non_cc1 = qfx_df[~qfx_df["ホスト名"].str.startswith("t")].sort_values(
by=["ホスト名"]
)
sorted_df_qfx = pd.concat([qfx_cc1, qfx_non_cc1])
sorted_df = pd.concat([sorted_df_mx, sorted_df_qfx, sorted_df_ex]).drop(
columns=["機種ソート", "IPアドレスソート", "ホスト名末尾数字"]
)
if "checkbox_states" not in st.session_state:
st.session_state.checkbox_states = {
row["ID"]: True for index, row in df.iterrows()
}
if "delete_confirm" not in st.session_state:
st.session_state.delete_confirm = {}
if "selected_devices" not in st.session_state:
st.session_state.selected_devices = []
# 日付入力欄を追加
today = datetime.now().date()
yesterday = today - timedelta(days=1)
st.write("### ログ取得日時設定")
col1, col2 = st.columns(2)
with col1:
start_date = st.date_input(
"開始日",
value=yesterday,
on_change=lambda: commands_list.set_dates(
st.session_state["start_date"], st.session_state["end_date"]
),
key="start_date",
)
with col2:
end_date = st.date_input(
"終了日",
value=today,
on_change=lambda: commands_list.set_dates(
st.session_state["start_date"], st.session_state["end_date"]
),
key="end_date",
)
if st.button("State Check 実行"):
execute_state_check(
st.session_state.selected_devices,
df,
devices,
SSH_USERNAME,
SSH_PASSWORD,
start_date,
end_date,
)
st.markdown("---")
st.write("## デバイス一覧(機種でソート)")
# 最新の登録日を取得して表示
st.write("### 更新履歴")
if db == "postgres":
# 最新のregister_dateを取得
c.execute("SELECT MAX(register_date::date) FROM devices WHERE service_type = %s", ("TestCheck",))
else:
c.execute('SELECT MAX(DATE(register_date)) FROM devices WHERE service_type = ?', ("TestCheck",))
latest_register_date = c.fetchone()[0]
if db == "postgres":
# 最新のregister_dateの「日付」と一致するデバイスを取得
c.execute("SELECT hostname FROM devices WHERE service_type = %s AND register_date::date = %s", ("TestCheck", latest_register_date))
else:
c.execute('SELECT hostname FROM devices WHERE service_type = ? AND DATE(register_date) = ?', ("TestCheck", latest_register_date,))
latest_devices = c.fetchall()
if latest_register_date and latest_devices:
st.write(f"最新の登録日: {latest_register_date}")
st.write(f"デバイス名: {latest_devices}")
else:
st.write("直近で登録されたデバイスはありません。")
st.write("### チェックボックスを外すと実行対象から外れます")
if st.button("全選択"):
for key in st.session_state.checkbox_states.keys():
st.session_state.checkbox_states[key] = True
st.rerun()
if st.button("選択全解除"):
for key in st.session_state.checkbox_states.keys():
st.session_state.checkbox_states[key] = False
st.rerun()
st.markdown("---")
# カラムのヘッダーを追加
col1, col2, col3, col4, col5 = st.columns([0.3, 0.3, 0.3, 0.2, 0.3])
with col1:
st.write("対象機器選択")
with col2:
st.write("ホスト名")
with col3:
st.write("IPアドレス")
with col4:
st.write("機種")
with col5:
st.write("システム登録削除")
st.warning("削除と入力後、ボタンを押す")
st.markdown("---")
for index, row in sorted_df.iterrows():
col1, col2, col3, col4, col5 = st.columns([0.3, 0.3, 0.3, 0.2, 0.3])
with col1:
state = st.session_state.checkbox_states.get(row["ID"], True)
if st.checkbox(
label=str(row["ID"]),
value=state,
key=str(row["ID"]),
label_visibility="collapsed",
):
if row.name not in st.session_state.selected_devices:
st.session_state.selected_devices.append(row.name)
st.session_state.checkbox_states[row["ID"]] = True
else:
if row.name in st.session_state.selected_devices:
st.session_state.selected_devices.remove(row.name)
st.session_state.checkbox_states[row["ID"]] = False
with col2:
st.write(row["ホスト名"])
with col3:
st.write(row["IPアドレス"])
with col4:
st.write(row["機種"])
with col5:
confirm_key = f"confirm_{row['ID']}"
if confirm_key not in st.session_state:
st.session_state[confirm_key] = ""
confirm_input = st.text_input(
"確認",
value=st.session_state[confirm_key],
key=confirm_key,
placeholder="削除",
label_visibility="collapsed",
)
if confirm_input == "削除":
if st.button("削除する", key=f"delete_{row['ID']}"):
if st.session_state.delete_confirm.get(row["ID"]):
delete_device(row["ID"], row["ホスト名"], c, conn, db)
# 確認状態をリセット
st.session_state.delete_confirm.pop(row["ID"], None)
else:
st.session_state.delete_confirm[row["ID"]] = True
st.warning(
f"本当に{row['ホスト名']}を削除しますか?もう一度削除ボタンを押してください。"
)
st.markdown("---")
if "form_key" not in st.session_state:
st.session_state.form_key = "device_form"
st.write("## 機器情報の新規登録")
st.subheader("登録された情報は上のリストに表示されます")
with st.form(key=st.session_state.form_key):
hostname = st.text_input("ホスト名")
ip_address = st.text_input("IPアドレス")
device_type = st.radio("機種", ("MX", "QFX", "EX"))
submit_button = st.form_submit_button(label="登録")
if submit_button:
# 登録する際は、入力された情報に加えて、サービスタイプも追加している(この場合は、TestCheck)
now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
try:
if db == "postgres":
c.execute(
"INSERT INTO devices (hostname, ip_address, device_type, service_type, register_date) VALUES (%s, %s, %s, %s, %s)",
(hostname, ip_address, device_type, "TestCheck", now),
)
else:
c.execute(
"INSERT INTO devices (hostname, ip_address, device_type, service_type, register_date) VALUES (?, ?, ?, ?, ?)",
(hostname, ip_address, device_type, "TestCheck", now),
)
conn.commit()
st.success("データベースに登録されました")
st.session_state.checkbox_states[
f"{hostname} ({ip_address}) ({device_type})"
] = True
st.session_state["data_changed"] = True
time.sleep(1.5)
st.rerun()
except Exception as e:
print(f"データ挿入中にエラーが発生しました: {e}")
conn.rollback()
import time
import streamlit as st
# デバイス削除関数
def delete_device(device_id, device_name, c, conn, db):
if db == "postgres":
c.execute("DELETE FROM devices WHERE id = %s", (device_id,))
else:
c.execute("DELETE FROM devices WHERE id = ?", (device_id,))
conn.commit()
st.success("データベースから削除されました")
if device_name in st.session_state.checkbox_states:
del st.session_state.checkbox_states[device_name]
st.session_state["data_changed"] = True
time.sleep(1.5)
st.rerun()
from netmiko import (
ConnectHandler,
SSHDetect,
NetmikoTimeoutException,
NetmikoAuthenticationException,
)
from commands_list import get_commands
def ssh_login(device, SSH_USERNAME, SSH_PASSWORD):
l2sw_commands, mx_commands = get_commands()
hostname, ip, device_type = device[1], device[2], device[3]
device_params = {
"device_type": "autodetect",
"host": ip,
"username": SSH_USERNAME,
"password": SSH_PASSWORD,
}
commands = mx_commands if device_type == "MX" else l2sw_commands
try:
guesser = SSHDetect(**device_params)
best_match = guesser.autodetect()
device_params["device_type"] = best_match
connection = ConnectHandler(**device_params)
output = f"------------------------{hostname} begin------------------------\n"
output += f"{hostname} ({ip}) ({device_type})\n"
for command in commands:
output += f"\n> {command}\n"
output += connection.send_command(command, read_timeout=10)
output += f"\n------------------------{hostname} end------------------------\n\n"
connection.disconnect()
return output
except (NetmikoTimeoutException, NetmikoAuthenticationException) as e:
return f"Error connecting to {hostname} ({ip}) ({device_type}): {e}"
finally:
if 'connection' in locals():
connection.disconnect()
from datetime import datetime, timedelta, date
import time
# グローバル変数を使用してコマンドを格納する
l2sw_commands = []
mx_commands = []
def set_dates(start, end):
global l2sw_commands, mx_commands
start_date = start
end_date = end
# 日付の範囲内のすべての日付を取得
current_date = start_date
date_list = []
while current_date <= end_date:
# 8/1ならばAug 1, 8/31ならばAug 18で日付をフォーマット
formatted_date = current_date.strftime("%b %d").replace(" 0", " ")
date_list.append(formatted_date)
current_date += timedelta(days=1)
# 日付をパイプで結合してday_strに格納
day_str = "|".join(date_list)
# print("日付範囲:", day_str)
l2sw_commands = [
"set cli screen-length 0",
"set cli screen-width 1024",
f'show log messages.0.gz | match "{day_str}" | no-more',
f'show log messages | match "{day_str}" | no-more',
"show chassis routing-engine",
"show chassis alarms",
"show system alarms",
"show interfaces terse",
"show vlans",
]
mx_commands = [
"set cli screen-length 0",
"set cli screen-width 1024",
f'show log messages.0.gz | match "{day_str}" | no-more',
f'show log messages| match "{day_str}" | no-more',
"show chassis routing-engine",
"show chassis alarms",
"show system alarms",
"show interfaces terse",
"show rsvp session",
"show mpls lsp",
"show vpls connections",
]
# print(l2sw_commands, mx_commands)
def get_commands():
return l2sw_commands, mx_commands
l2sw_commands_explain = [
"set cli screen-length 0",
"set cli screen-width 1024",
"show log messages.0.gz | no-more\n⇒ 異常なログが出力されていないか",
"show log messages| no-more\n⇒ 異常なログが出力されていないか",
"show chassis routing-engine",
"show chassis alarms",
"show system alarms",
"show interfaces terse",
]
mx_commands_explain = [
"set cli screen-length 0\n⇒ CLIの画面の長さを設定",
"set cli screen-width 1024\n⇒ CLIの画面の幅を設定",
'show log messages.0.gz | no-more\n⇒ 異常なログが出力されていないか',
'show log messages | no-more\n⇒ 異常なログが出力されていないか',
"show chassis routing-engine\n⇒ CPU使用率が高負荷となっていないか",
"show chassis alarms\n⇒ アラームが出力されていないか",
"show system alarms\n⇒ アラームが出力されていないか",
"show interfaces terse\n⇒ interfaceがdownとなっていないか",
]
# テスト用
if __name__ == "__main__":
start_date = date(2023, 8, 28)
end_date = date(2023, 9, 2)
set_dates(start_date, end_date)
動かしてみた
必要なライブラリは以下です。
今回はパッケージ管理にpoetryを使用しています
pythonは、3.12.0を使用しています
※poetryのインストール時にpythonのバージョンは必ず指定してあげてください
poetryのインストール方法は割愛していますが、こちらの記事で解説してます
https://qiita.com/amanem/items/e83e414efd8f04bbd2c6
[tool.poetry]
name = "statecheck"
version = "0.1.0"
description = ""
authors = ["None"]
readme = "README.md"
[tool.poetry.dependencies]
python = "3.12.0"
streamlit = "1.37.1"
python-dotenv = "1.0.1"
netmiko = "4.4.0"
pandas = "2.2.2"
setuptools = "75.6.0"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
main.pyのあるディレクトリ上で、以下を実行します
poetry install
poetry shell
streamlit run main.py --server.port 8080
# port番号はお好きな番号に変えて下さい
そうすると、ブラウザが起動して以下の画面が表示されるはずです。
機器の登録
機器を何台か登録してみましょう
機器は、画面下の機器情報の新規登録の部分から登録できます。
必要事項を入力して、登録ボタンを押してください。
成功すれば、以下のダイアログが表示されるはずです。
機器の削除
先程登録した機器を削除してみましょう
デバイス一覧に表示されているはずです。
テキスト入力欄に「削除」と入力すると削除ボタンが表示されます。
削除ボタンを押すと、最終の確認ダイアログが表示されるので、問題なければもう一度削除ボタンを押すと削除完了です。
ログを取ってみよう
「全選択」と「選択全解除」ボタンを押すと、ログ取得の対象を全選択と選択全解除ができます。
もちろん1台ずつチェックを外すこともできます
ログ取得日時は、自由に選択可能です。
※あまり長すぎるとログが消えている可能性があります
いよいよ実行ボタンを押してみましょう。
StateCheck実行ボタンを押すと状態確認が実行されます
実行中は以下のように右上にトライアスロンしている人が表示されます(笑)
ここからダウンロードして完了となります。
実際に、このツールを導入して、状態確認スクリプトの実行時間は、
20分⇒30秒に短縮されました。
実環境では、このツールをdockerコンテナの中で動かしていたりするので、時間があればそのあたりの解説もできればと思います。