0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

私でもできる! pingコマンドに頼らない死活監視ツール

Last updated at Posted at 2025-11-19

ここで言う「私でもできる!」は筆者でもできるという意味であり、これを読んだ全ての人ができるという意味ではないので注意

目的

deadmanという死活監視ツールがありますが、これの中身はpingコマンドを使用してホストの監視を行っています。
実運用ではこれ以上無いくらい安定していますが、これを自作したいなと思ったので今回はicmpパケットを自作して死活監視を行えるツールを作成します。

出来上がったもの

ICMP編

まずはICMPの実装を行います

ICMPってどういう形

pingはicmpを利用していますが、その中身はとても簡単で

wikipediaより引用させていただきますが、
image.png
この形でデータをセットしてあげれば大丈夫です。

タイプ

タイプは疎通監視時に限定すれば大きく分けて3種類あります。

  • 送信時 (8)
  • 応答時 (0)
  • エラー時 (3, 11)

これらの場合について考慮してあげれば良くすべてのパターンに対応させる必要は(死活監視だけにおいては)ありません。

コード

コードは番号によってエラーの種類が分類されているエラーコードのような認識で大丈夫です。
wikipedia にあるように送信時は0、リプライ時にタイプがエラーだった際に見ると幸せなくらいです。

シーケンス番号

何番目に送信したパケットなのかを判断するものです。
for文などで順番に生成して問題ないです。

識別子

監視先毎に分けることでどの監視先から返答されたものなのかを判断します。
コード中では(id)で記載されていることが多いです。

チェックサム

パケットが壊れていないことを保証する部分で、後述するビット演算を行います。

実際にコーディングをしてみる

基本的な実装は
Pythonでpingの実装してみる
というサイトを参考に進めます。

基本的に型定義なので同じように進めますが、エラーの型定義を追加したり、deadman用に
idseqの判別を追加しています。

class ICMPType(Enum):
    # 正常系
    ECHOREPLY = 0
    ECHO = 8
    
    # エラー系
    DESTINATION_UNREACHABLE = 3
    TIME_EXCEEDED = 11
    def __int__(self):
        return self.value
@contextmanager
def raw_socket():
    sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
    try:
        yield sock
    finally:
        sock.close()
def ping(
    host: str, seq: int, id: int, timeout: int = 5
) -> Optional[Tuple[IPHeader, ICMPEcho, float]]:
    with raw_socket() as sock:
        sock.settimeout(timeout)
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_TTL, TTL)
        # パケットの作成
        # ここのid/seqを動的に渡す
        packet = ICMPEcho(ICMPType.ECHO, 0, id, seq, b"\xff").to_bytes()
        # パケットを送信
        sock.sendto(packet, (host, 0))
        send_time = time.time

        # パケットを受信
        while True:
            try:
                # 受信したパケットの解析
                data, addr = sock.recvfrom(4096)
                response_time = time.time()

                ip_header, payload = parse_ip_datagram(data)
                echo_reply = ICMPEcho.from_bytes(payload)
                # idが送信したものと同一ならreturn
                if echo_reply.id == id:
                    if echo_reply.type == ICMPType.ECHOREPLY:
                        rtt = (response_time - send_time) * 1000
                        return (ip_header, echo_reply, rtt)

            except socket.timeout:
                print("Ping timed out")
                return None, None, None

TCP編(HTTP)

次にTCPです。今回はHTTPの死活監視について作成しましたが、少し変更すればSSH等の他のプロトコルについても対応可能だと思います。

icmpの際はSOCK_RAWを使用して実装を行いましたが、TCPのパケットを実装する際にはSOCK_STREAMを使用します。

SOCK_RAWを用いても、図のようなパケット構造を作成すればパケットの送受信が可能となるのですが、自作3wayhandshakeはRSTされてしまいます。(理由については作成中です)

これを対策するために、素直にSOCK_STREAMを使用してTCPのヘッダーをいい感じ™に作成してもらいます

TCPの形

ネットワークエンジニアとしてより引用させていただくこのフォーマットの上位20byteのヘッダー部分が前述に該当します。

スクリーンショット 2025-09-08 16.45.21.png

このデータ部分に各プロトコルのデータが入りますが、今回作成するHTTPでは、ここへHTTPリクエストを挿入します。

コード

基本的な実装はping関数を参考にして実装しています。前述の通りSOCK_STREAMで実装を行います。
リクエスト送信後、受信したデータの中からレスポンスコードだけを抽出して上位関数へ返します。

リクエストメッセージは
"GET / HTTP/1.1\r\nHost: {host}\r\nConnection: close\r\n\r\n"
と簡単なもので作成しており、指定したURLのWEBページが取得できるかどうかのみを要求しています。({host}についてはping(略)関数の引数と同様のstr型です。)

@contextmanager
def stream_socket():
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        yield sock
    finally:
        sock.close()
def tcp(host: str, port: int, timeout: int = TIMEOUT) -> Optional[bytes]:

    try:
        remote_ip = socket.gethostbyname(host)
        with stream_socket() as sock:
            sock.settimeout(timeout)
            sock.connect((remote_ip, port))

            request = f"GET / HTTP/1.1\r\nHost: {host}\r\nConnection: close\r\n\r\n"
            sock.send(request.encode())
            send_time = time.time()
            response = b""
            while True:
                data = sock.recv(4096)
                if not data:
                    break  # Connection closed by the server
                response += data
            response_time = time.time()
            rtt = (response_time - send_time) * 1000
            match = re.search(r"HTTP/\d\.\d (\d{3})", response.decode(errors="ignore"))
            return [match.group(1), rtt]

    except socket.timeout:
        print("Socket timed out.")
        return None
    except Exception as e:
        print(f"An error occurred: {e}")
        return None

全てをまとめる上位関数編

これで、単体で動作するプログラムを作成することができました。

ただ、これでは本家のように複数台管理することができないため、これを並列に動作できる上位関数を作り複数台の死活監視を行います。

async def main(sleep_interval: int = 1):
    queue = asyncio.Queue()

    with Live(generate_table(monitor_table), refresh_per_second=4, screen=True) as live:
        # UIを更新する単一タスクを生成
        consumer_task = asyncio.create_task(consumer(queue, live))

        # 各ホストのpingを実行するタスクを生成
        producer_tasks = [
            asyncio.create_task(producer(queue, host, id, sleep_interval))
            for id, host in enumerate(toml_data["targets"])
        ]
        all_tasks = producer_tasks + [consumer_task]
        try:
            await asyncio.gather(*all_tasks)
        except KeyboardInterrupt:
            console.print("Exiting...")
        finally:
            # 終了時にすべてのタスクを安全にキャンセル
            for task in all_tasks:
                task.cancel()
            await asyncio.gather(*all_tasks, return_exceptions=True)

今回はasyncioで複数プロセスを作成して実装を行いました。あとはGUIやCLIの設定を書き込んでwhileとtimeで適度にpingを送信してあげればいい感じにできます。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?