ここで言う「私でもできる!」は筆者でもできるという意味であり、これを読んだ全ての人ができるという意味ではないので注意
目的
deadmanという死活監視ツールがありますが、これの中身はpingコマンドを使用してホストの監視を行っています。
実運用ではこれ以上無いくらい安定していますが、これを自作したいなと思ったので今回はicmpパケットを自作して死活監視を行えるツールを作成します。
出来上がったもの
ICMP編
まずはICMPの実装を行います
ICMPってどういう形
pingはicmpを利用していますが、その中身はとても簡単で
wikipediaより引用させていただきますが、

この形でデータをセットしてあげれば大丈夫です。
タイプ
タイプは疎通監視時に限定すれば大きく分けて3種類あります。
- 送信時 (8)
- 応答時 (0)
- エラー時 (3, 11)
これらの場合について考慮してあげれば良くすべてのパターンに対応させる必要は(死活監視だけにおいては)ありません。
コード
コードは番号によってエラーの種類が分類されているエラーコードのような認識で大丈夫です。
wikipedia にあるように送信時は0、リプライ時にタイプがエラーだった際に見ると幸せなくらいです。
シーケンス番号
何番目に送信したパケットなのかを判断するものです。
for文などで順番に生成して問題ないです。
識別子
監視先毎に分けることでどの監視先から返答されたものなのかを判断します。
コード中では(id)で記載されていることが多いです。
チェックサム
パケットが壊れていないことを保証する部分で、後述するビット演算を行います。
実際にコーディングをしてみる
基本的な実装は
Pythonでpingの実装してみる
というサイトを参考に進めます。
基本的に型定義なので同じように進めますが、エラーの型定義を追加したり、deadman用に
idやseqの判別を追加しています。
class ICMPType(Enum):
# 正常系
ECHOREPLY = 0
ECHO = 8
# エラー系
DESTINATION_UNREACHABLE = 3
TIME_EXCEEDED = 11
def __int__(self):
return self.value
@contextmanager
def raw_socket():
sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
try:
yield sock
finally:
sock.close()
def ping(
host: str, seq: int, id: int, timeout: int = 5
) -> Optional[Tuple[IPHeader, ICMPEcho, float]]:
with raw_socket() as sock:
sock.settimeout(timeout)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_TTL, TTL)
# パケットの作成
# ここのid/seqを動的に渡す
packet = ICMPEcho(ICMPType.ECHO, 0, id, seq, b"\xff").to_bytes()
# パケットを送信
sock.sendto(packet, (host, 0))
send_time = time.time
# パケットを受信
while True:
try:
# 受信したパケットの解析
data, addr = sock.recvfrom(4096)
response_time = time.time()
ip_header, payload = parse_ip_datagram(data)
echo_reply = ICMPEcho.from_bytes(payload)
# idが送信したものと同一ならreturn
if echo_reply.id == id:
if echo_reply.type == ICMPType.ECHOREPLY:
rtt = (response_time - send_time) * 1000
return (ip_header, echo_reply, rtt)
except socket.timeout:
print("Ping timed out")
return None, None, None
TCP編(HTTP)
次にTCPです。今回はHTTPの死活監視について作成しましたが、少し変更すればSSH等の他のプロトコルについても対応可能だと思います。
icmpの際はSOCK_RAWを使用して実装を行いましたが、TCPのパケットを実装する際にはSOCK_STREAMを使用します。
SOCK_RAWを用いても、図のようなパケット構造を作成すればパケットの送受信が可能となるのですが、自作3wayhandshakeはRSTされてしまいます。(理由については作成中です)
これを対策するために、素直にSOCK_STREAMを使用してTCPのヘッダーをいい感じ™に作成してもらいます
TCPの形
ネットワークエンジニアとしてより引用させていただくこのフォーマットの上位20byteのヘッダー部分が前述に該当します。
このデータ部分に各プロトコルのデータが入りますが、今回作成するHTTPでは、ここへHTTPリクエストを挿入します。
コード
基本的な実装はping関数を参考にして実装しています。前述の通りSOCK_STREAMで実装を行います。
リクエスト送信後、受信したデータの中からレスポンスコードだけを抽出して上位関数へ返します。
リクエストメッセージは
"GET / HTTP/1.1\r\nHost: {host}\r\nConnection: close\r\n\r\n"
と簡単なもので作成しており、指定したURLのWEBページが取得できるかどうかのみを要求しています。({host}についてはping(略)関数の引数と同様のstr型です。)
@contextmanager
def stream_socket():
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
yield sock
finally:
sock.close()
def tcp(host: str, port: int, timeout: int = TIMEOUT) -> Optional[bytes]:
try:
remote_ip = socket.gethostbyname(host)
with stream_socket() as sock:
sock.settimeout(timeout)
sock.connect((remote_ip, port))
request = f"GET / HTTP/1.1\r\nHost: {host}\r\nConnection: close\r\n\r\n"
sock.send(request.encode())
send_time = time.time()
response = b""
while True:
data = sock.recv(4096)
if not data:
break # Connection closed by the server
response += data
response_time = time.time()
rtt = (response_time - send_time) * 1000
match = re.search(r"HTTP/\d\.\d (\d{3})", response.decode(errors="ignore"))
return [match.group(1), rtt]
except socket.timeout:
print("Socket timed out.")
return None
except Exception as e:
print(f"An error occurred: {e}")
return None
全てをまとめる上位関数編
これで、単体で動作するプログラムを作成することができました。
ただ、これでは本家のように複数台管理することができないため、これを並列に動作できる上位関数を作り複数台の死活監視を行います。
async def main(sleep_interval: int = 1):
queue = asyncio.Queue()
with Live(generate_table(monitor_table), refresh_per_second=4, screen=True) as live:
# UIを更新する単一タスクを生成
consumer_task = asyncio.create_task(consumer(queue, live))
# 各ホストのpingを実行するタスクを生成
producer_tasks = [
asyncio.create_task(producer(queue, host, id, sleep_interval))
for id, host in enumerate(toml_data["targets"])
]
all_tasks = producer_tasks + [consumer_task]
try:
await asyncio.gather(*all_tasks)
except KeyboardInterrupt:
console.print("Exiting...")
finally:
# 終了時にすべてのタスクを安全にキャンセル
for task in all_tasks:
task.cancel()
await asyncio.gather(*all_tasks, return_exceptions=True)
今回はasyncioで複数プロセスを作成して実装を行いました。あとはGUIやCLIの設定を書き込んでwhileとtimeで適度にpingを送信してあげればいい感じにできます。
