A script that sends a request to an NTP server, parses the returned packet, and displays it in a readable form

Posted at 2025-07-15

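Introduction

When trying out various NTP servers, I occasionally run into one that behaves oddly. Analyzing packet dumps by eye had become tedious, so I had a generative AI write a script that makes these checks easy.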

Example run (normal case)

$ ./ntp-query.py ntp.nict.jp
Querying NTP server: ntp.nict.jp:123 (NTPv4)

Accessed Server IP Address: 2001:df0:232:eea0::fff3

--- NTP Packet Details ---
  Leap Indicator (LI): 0 (No warning)
  Version Number (VN): 4
  Mode: 4 (Server)
  Stratum: 1 (Primary Reference)
  Poll Interval: 0 (2^0 seconds)
  Precision: 236 (2^236 seconds)
  Root Delay: 0.000000 seconds
  Root Dispersion: 0.000000 seconds
  Reference ID: 'NICT'
  Reference Timestamp: 2025-07-15 07:42:37.000000
  Originate Timestamp (T1 - Client Tx): 0 (Unspecified)
  Receive Timestamp (T2 - Server Rx): 2025-07-15 07:42:37.651567
  Transmit Timestamp (T3 - Server Tx): 2025-07-15 07:42:37.651568
  Client Receive Time (T4 - Client Rx): 2025-07-15 07:42:37.659879

  Calculated Round-trip Delay: 0.000 ms
  Calculated Clock Offset: 0.000 ms (Synchronized)

$
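
The Precision value of 236 in the output above is the raw byte read as an unsigned integer. RFC 5905 defines Precision as a signed 8-bit log2 value in seconds, so the same byte can be reinterpreted as sketched below. The helper name `decode_precision` is made up for illustration and is not part of the script.

```python
import struct

def decode_precision(raw_byte):
    """Reinterpret an unsigned byte (0-255) as a signed log2 exponent in seconds."""
    # 'b' is a signed 8-bit integer, so 236 becomes 236 - 256 = -20
    (exponent,) = struct.unpack('b', bytes([raw_byte]))
    return exponent, 2.0 ** exponent

exponent, seconds = decode_precision(236)
print(f"Precision: {exponent} (2^{exponent} s = {seconds * 1e6:.3f} microseconds)")
# prints: Precision: -20 (2^-20 s = 0.954 microseconds)
```

Read this way, the server reports a clock precision of roughly one microsecond, which is plausible for a stratum 1 server.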

Example run (abnormal case)

The request was sent as version 4, but the reply came back as version 3:

$ ./ntp-query.py ntp3
Querying NTP server: ntp3:123 (NTPv4)

Accessed Server IP Address: 192.168.***.***

--- NTP Packet Details ---
  Leap Indicator (LI): 0 (No warning)
  Version Number (VN): 3 (Requested 4, received 3!)
  Mode: 4 (Server)
  Stratum: 1 (Primary Reference)
  Poll Interval: 0 (2^0 seconds)
  Precision: 250 (2^250 seconds)
  Root Delay: 0.000000 seconds
  Root Dispersion: 0.000000 seconds
  Reference ID: 'pps'
  Reference Timestamp: 2025-07-15 07:45:47.230000
  Originate Timestamp (T1 - Client Tx): 0 (Unspecified)
  Receive Timestamp (T2 - Server Rx): 2025-07-15 07:45:47.230000
  Transmit Timestamp (T3 - Server Tx): 2025-07-15 07:45:47.230000
  Client Receive Time (T4 - Client Rx): 2025-07-15 07:45:47.226787

  Calculated Round-trip Delay: 0.000 ms
  Calculated Clock Offset: 0.000 ms (Synchronized)

$
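
The mismatch above is detected from the first byte of the reply, which packs LI (2 bits), VN (3 bits), and Mode (3 bits). A minimal sketch of that decoding follows, using the same bit layout as the script. The function name `decode_first_byte` and the two sample bytes are illustrative assumptions, not captured traffic.

```python
def decode_first_byte(first_byte):
    """Split the first NTP header byte into (LI, VN, Mode)."""
    li = (first_byte >> 6) & 0x03    # top 2 bits: Leap Indicator
    vn = (first_byte >> 3) & 0x07    # middle 3 bits: Version Number
    mode = first_byte & 0x07         # low 3 bits: Mode
    return li, vn, mode

# Hypothetical bytes: a v4 client request and a v3 server reply
request_byte = (0 << 6) | (4 << 3) | 3   # 0x23
reply_byte = (0 << 6) | (3 << 3) | 4     # 0x1C

print(decode_first_byte(request_byte))   # (0, 4, 3)
print(decode_first_byte(reply_byte))     # (0, 3, 4)

requested_vn = decode_first_byte(request_byte)[1]
received_vn = decode_first_byte(reply_byte)[1]
if requested_vn != received_vn:
    print(f"Requested {requested_vn}, received {received_vn}!")
```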

Options

$ ./ntp-query.py -h
usage: ntp-query.py [-h] [-p PORT] [-v {1,2,3,4}] [-t TIMEOUT] [-4 | -6] server

Send an NTP request and display the parsed NTP response packet details.

positional arguments:
  server                The NTP server hostname or IP address to query.

options:
  -h, --help            show this help message and exit
  -p PORT, --port PORT  The NTP server port (default: 123).
  -v {1,2,3,4}, --version {1,2,3,4}
                        NTP version to use for the request (default: 4).
                          1: RFC 1059 (NTPv1)
                          2: RFC 1119 (NTPv2)
                          3: RFC 1305 (NTPv3)
                          4: RFC 5905 (NTPv4)
  -t TIMEOUT, --timeout TIMEOUT
                        Timeout in seconds for receiving the NTP response (default: 5).
  -4, --ipv4            Force IPv4 connection.
  -6, --ipv6            Force IPv6 connection.
$

Script

#!/usr/bin/env python3
#
# NTP Query Tool
# Version: 1.0.4
#
# Description:
# This script sends an NTP request packet to a specified NTP server and
# parses the returned NTP reply packet, displaying its contents in a human-readable format.
# It defaults to NTPv4 but allows specifying other NTP versions via command-line options.
#
# Author: Gemini
#
# Change Log:
# -----------------------------------------------------------------------------------
# 2025-07-15 v1.0.0: Initial release. Basic NTP client request/response parsing.
#                    Supports NTPv1, v2, v3, v4. Calculates Round-trip Delay and Offset.
# 2025-07-15 v1.0.1: Fixed SyntaxError in f-string related to backslashes in
#                    Stratum description by pre-calculating the description string.
# 2025-07-15 v1.0.2: Shortened the display of the 'Precision' field for clarity.
# 2025-07-15 v1.0.3: Code review adjustments: Removed unused 'sys' module and 'precision_s' variable.
#                    Clarified Stratum 0 (Kiss-o'-Death) handling and removed misleading comments.
# 2025-07-15 v1.0.4: Added display of the resolved server IP address.
#                    Added options (-4, -6) to force IPv4 or IPv6 communication.
# -----------------------------------------------------------------------------------

import socket
import struct
import argparse
import datetime
import time
from colorama import Fore, Style, init

# Initialize colorama
init(autoreset=True)

# Number of seconds from the NTP epoch (1900-01-01) to the Unix epoch (1970-01-01)
NTP_DELTA = (datetime.datetime(1970, 1, 1) - datetime.datetime(1900, 1, 1)).total_seconds()

def get_ntp_packet_data(server, port=123, version=4, timeout=5, af=socket.AF_UNSPEC):
    """
    Send a request to an NTP server and return the raw NTP response packet.
    :param server: hostname or IP address of the NTP server
    :param port: NTP server port
    :param version: NTP version
    :param timeout: timeout in seconds
    :param af: address family (socket.AF_INET for IPv4, socket.AF_INET6 for IPv6, socket.AF_UNSPEC for auto)
    :return: (data, request_time, receive_time, resolved_ip), or (None, None, None, None) on failure
    """
    resolved_ip = None
    try:
        # Resolve address information for the hostname and address family
        # type=socket.SOCK_DGRAM selects a UDP socket
        # proto=socket.IPPROTO_UDP selects the UDP protocol
        addr_info = socket.getaddrinfo(server, port, af, socket.SOCK_DGRAM, socket.IPPROTO_UDP)

        if not addr_info:
            print(f"{Fore.RED}Error: Could not resolve hostname {server} for specified address family.{Style.RESET_ALL}")
            return None, None, None, None

        # Use the first address returned
        # Elements of addr_info: (family, socktype, proto, canonname, sockaddr)
        # sockaddr: (ip_address, port) for IPv4, (ip_address, port, flowinfo, scopeid) for IPv6
        actual_af, _, _, _, sockaddr = addr_info[0]
        resolved_ip = sockaddr[0] # resolved IP address

        client = socket.socket(actual_af, socket.SOCK_DGRAM)
        client.settimeout(timeout)

        # LI (Leap Indicator): 0 (no warning)
        # VN (Version Number): the requested version
        # Mode: 3 (client)
        mode_vn_li = (0 << 6) | (version << 3) | 3

        # Convert the current Unix timestamp to an NTP timestamp
        current_time = time.time()
        ntp_time = current_time + NTP_DELTA

        # Transmit Timestamp (64-bit fixed-point: high 32 bits integer, low 32 bits fraction)
        tx_timestamp_int = int(ntp_time)
        tx_timestamp_frac = int((ntp_time - tx_timestamp_int) * (2**32))

        # Build the NTP request packet (48 bytes)
        # In a client request the send time goes into the Transmit Timestamp (bytes 40-47);
        # the server echoes it back in the Originate Timestamp of its reply
        packet = bytearray(48)
        packet[0] = mode_vn_li # LI | VN | Mode
        struct.pack_into('>II', packet, 40, tx_timestamp_int, tx_timestamp_frac) # Transmit Timestamp

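        # Send the request (passing sockaddr keeps the IPv6 flowinfo and scope ID)
        client.sendto(packet, sockaddr)

        # Record the time right after the packet was sent (Unix timestamp)
        request_time = time.time()

        # Receive the response
        data, address = client.recvfrom(1024)
        receive_time = time.time() # time at which the server's response arrived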

        return data, request_time, receive_time, resolved_ip

    except socket.timeout:
        print(f"{Fore.RED}Error: Request to {server}:{port} ({resolved_ip if resolved_ip else 'resolving...'}) timed out after {timeout} seconds.{Style.RESET_ALL}")
        return None, None, None, None
    except socket.gaierror:
        print(f"{Fore.RED}Error: Could not resolve hostname {server}. Check the server address or DNS settings.{Style.RESET_ALL}")
        return None, None, None, None
    except Exception as e:
        print(f"{Fore.RED}An error occurred during NTP request to {server}:{port} ({resolved_ip if resolved_ip else 'unknown IP'}): {e}{Style.RESET_ALL}")
        return None, None, None, None
    finally:
        if 'client' in locals() and client:
            client.close()

def parse_ntp_packet(data, request_time, receive_time, version):
    """
    Parse the NTP response packet and format it for readable display.
    """
    if not data or len(data) < 48:
        print(f"{Fore.RED}Error: Invalid NTP packet data received. Length: {len(data) if data else 0} (expected at least 48 bytes).{Style.RESET_ALL}")
        return None

    # Unpack the NTP packet header
    # (Poll and Precision are signed 8-bit integers, hence 'b' rather than 'B')
    li_vn_mode, stratum, poll, precision, root_delay_int, root_disp_int, ref_id_val = struct.unpack_from('>BBbbIII', data, 0)

    # Unpack the timestamps (8 bytes each = two 4-byte integers)
    ref_ts_int, ref_ts_frac = struct.unpack_from('>II', data, 16)
    orig_ts_int, orig_ts_frac = struct.unpack_from('>II', data, 24)
    recv_ts_int, recv_ts_frac = struct.unpack_from('>II', data, 32)
    tx_ts_int, tx_ts_frac = struct.unpack_from('>II', data, 40)

    # Decode each field
    li = (li_vn_mode >> 6) & 0x03
    vn = (li_vn_mode >> 3) & 0x07
    mode = li_vn_mode & 0x07

    # Convert Root Delay / Root Dispersion to seconds (16.16 fixed-point format)
    root_delay_s = float(root_delay_int) / (2**16)
    root_dispersion_s = float(root_disp_int) / (2**16)

    # Interpret the Reference ID
    # Decode the four bytes as ASCII once, dropping NUL padding and spaces
    ref_id_text = struct.pack('>I', ref_id_val).decode('ascii', errors='ignore').strip('\x00 ')
    reference_id = ""
    if stratum == 0:
        # Kiss-o'-Death (KoD) code or unspecified
        if ref_id_text:
            reference_id = f"KOD: '{ref_id_text}'"
        else:
            reference_id = "N/A" # no KoD code present
    elif stratum == 1:
        # Stratum 1 (primary reference): four-character ASCII string (e.g., 'GPS', 'LOCL', 'PPS')
        reference_id = f"'{ref_id_text}'"
    elif stratum >= 2:
        # Stratum 2+ (secondary reference): IPv4 address of the reference server
        # (for an IPv6 reference, the first four octets of the MD5 hash of its address)
        try:
            reference_id = socket.inet_ntoa(struct.pack('>I', ref_id_val))
        except OSError: # defensive; a packed 32-bit value should always convert
            reference_id = f"0x{ref_id_val:08x} (Invalid IP)" if ref_id_val else "N/A"
    else:
        reference_id = "N/A" # unknown Stratum value

    # Convert an NTP 64-bit fixed-point timestamp to a Unix timestamp
    def ntp_to_unix_time(integer, fraction):
        if integer == 0 and fraction == 0:
            return 0.0 # a zero timestamp means "unspecified"
        return (integer + fraction / (2**32)) - NTP_DELTA

    ref_timestamp = ntp_to_unix_time(ref_ts_int, ref_ts_frac)
    orig_timestamp = ntp_to_unix_time(orig_ts_int, orig_ts_frac)
    recv_timestamp = ntp_to_unix_time(recv_ts_int, recv_ts_frac)
    tx_timestamp = ntp_to_unix_time(tx_ts_int, tx_ts_frac)

    # Convert a Unix timestamp to a readable datetime string
    def format_timestamp(unix_time):
        if unix_time == 0:
            return "0 (Unspecified)"
        try:
            # Display in UTC with microsecond precision
            dt_object = datetime.datetime.fromtimestamp(unix_time, tz=datetime.timezone.utc)
            return dt_object.strftime('%Y-%m-%d %H:%M:%S.%f')
        except (ValueError, OverflowError, OSError):
            return f"{unix_time:.6f} (Invalid timestamp)"


    # Compute the round-trip delay and clock offset (RFC 5905)
    # T1 = client transmit time (server response: Originate Timestamp)
    # T2 = server receive time (server response: Receive Timestamp)
    # T3 = server transmit time (server response: Transmit Timestamp)
    # T4 = client receive time (client script: receive_time)

    # Delay (delta): (T2 - T1) + (T4 - T3), equivalent to RFC 5905's (T4 - T1) - (T3 - T2)
    # Offset (theta): ((T2 - T1) + (T3 - T4)) / 2
    delay = ((recv_timestamp - orig_timestamp) + (receive_time - tx_timestamp)) if orig_timestamp != 0 and tx_timestamp != 0 else 0.0
    offset = ((recv_timestamp - orig_timestamp) + (tx_timestamp - receive_time)) / 2.0 if orig_timestamp != 0 and tx_timestamp != 0 else 0.0

    output = f"{Fore.CYAN}--- NTP Packet Details ---{Style.RESET_ALL}\n"
    output += f"  {Fore.YELLOW}Leap Indicator (LI):{Style.RESET_ALL} {li} ({['No warning', 'Last minute has 61 seconds', 'Last minute has 59 seconds', 'Alarm condition (clock not synchronized)'][li]})\n"
    output += f"  {Fore.YELLOW}Version Number (VN):{Style.RESET_ALL} {vn}"
    if vn != version:
        output += f" {Fore.RED}(Requested {version}, received {vn}!) {Style.RESET_ALL}"
    output += "\n"
    output += f"  {Fore.YELLOW}Mode:{Style.RESET_ALL} {mode} ({['Reserved', 'Symmetric Active', 'Symmetric Passive', 'Client', 'Server', 'Broadcast', 'NTP Control Message', 'Reserved for private use'][mode]})\n"

    # Store the Stratum description in a variable beforehand
    stratum_desc = ""
    if stratum == 0:
        if "KOD:" in reference_id: # check whether the Reference ID carries a KoD code
            stratum_desc = "Kiss-o'-Death"
        else:
            stratum_desc = "Unspecified"
    elif stratum == 1:
        stratum_desc = "Primary Reference"
    elif 2 <= stratum <= 15:
        stratum_desc = "Secondary Reference"
    elif stratum == 16:
        stratum_desc = "Unsynchronized"
    else:
        stratum_desc = "Reserved" # 17-255

    output += f"  {Fore.YELLOW}Stratum:{Style.RESET_ALL} {stratum} ({stratum_desc})\n"

    output += f"  {Fore.YELLOW}Poll Interval:{Style.RESET_ALL} {poll} (2^{poll} seconds)\n"
    output += f"  {Fore.YELLOW}Precision:{Style.RESET_ALL} {precision} (2^{precision} seconds)\n"

    output += f"  {Fore.YELLOW}Root Delay:{Style.RESET_ALL} {root_delay_s:.6f} seconds\n"
    output += f"  {Fore.YELLOW}Root Dispersion:{Style.RESET_ALL} {root_dispersion_s:.6f} seconds\n"
    output += f"  {Fore.YELLOW}Reference ID:{Style.RESET_ALL} {reference_id}\n"
    output += f"  {Fore.YELLOW}Reference Timestamp:{Style.RESET_ALL} {format_timestamp(ref_timestamp)}\n"
    output += f"  {Fore.YELLOW}Originate Timestamp (T1 - Client Tx):{Style.RESET_ALL} {format_timestamp(orig_timestamp)}\n"
    output += f"  {Fore.YELLOW}Receive Timestamp (T2 - Server Rx):{Style.RESET_ALL} {format_timestamp(recv_timestamp)}\n"
    output += f"  {Fore.YELLOW}Transmit Timestamp (T3 - Server Tx):{Style.RESET_ALL} {format_timestamp(tx_timestamp)}\n"
    output += f"  {Fore.YELLOW}Client Receive Time (T4 - Client Rx):{Style.RESET_ALL} {format_timestamp(receive_time)}\n"
    output += f"\n  {Fore.GREEN}Calculated Round-trip Delay:{Style.RESET_ALL} {delay * 1000:.3f} ms\n"
    # A positive offset means the server clock is ahead, i.e. the client clock is behind (slow)
    output += f"  {Fore.GREEN}Calculated Clock Offset:{Style.RESET_ALL} {offset * 1000:.3f} ms ({'Client is slow' if offset > 0 else 'Client is fast' if offset < 0 else 'Synchronized'})\n"

    return output

def main():
    parser = argparse.ArgumentParser(
        description="Send an NTP request and display the parsed NTP response packet details.",
        formatter_class=argparse.RawTextHelpFormatter
    )
    parser.add_argument(
        'server', type=str,
        help="The NTP server hostname or IP address to query."
    )
    parser.add_argument(
        '-p', '--port', type=int, default=123,
        help="The NTP server port (default: 123)."
    )
    parser.add_argument(
        '-v', '--version', type=int, default=4, choices=[1, 2, 3, 4],
        help="NTP version to use for the request (default: 4).\n"
             "  1: RFC 1059 (NTPv1)\n"
             "  2: RFC 1119 (NTPv2)\n"
             "  3: RFC 1305 (NTPv3)\n"
             "  4: RFC 5905 (NTPv4)"
    )
    parser.add_argument(
        '-t', '--timeout', type=int, default=5,
        help="Timeout in seconds for receiving the NTP response (default: 5)."
    )
    # Add options to force IPv4 or IPv6
    group = parser.add_mutually_exclusive_group()
    group.add_argument(
        '-4', '--ipv4', action='store_true',
        help="Force IPv4 connection."
    )
    group.add_argument(
        '-6', '--ipv6', action='store_true',
        help="Force IPv6 connection."
    )

    args = parser.parse_args()

    server_address = args.server
    server_port = args.port
    ntp_version = args.version
    timeout_seconds = args.timeout

    # Select the address family
    addr_family = socket.AF_UNSPEC # default: choose automatically
    if args.ipv4:
        addr_family = socket.AF_INET
    elif args.ipv6:
        addr_family = socket.AF_INET6

    print(f"{Fore.CYAN}Querying NTP server: {server_address}:{server_port} (NTPv{ntp_version}){Style.RESET_ALL}\n")

    # Send the request to the NTP server and get the raw data, timestamps, and resolved IP address
    packet_data, request_time, receive_time, resolved_ip = get_ntp_packet_data(
        server_address, server_port, ntp_version, timeout_seconds, addr_family
    )

    if packet_data:
        if resolved_ip:
            print(f"{Fore.GREEN}Accessed Server IP Address:{Style.RESET_ALL} {resolved_ip}\n")
        parsed_output = parse_ntp_packet(packet_data, request_time, receive_time, ntp_version)
        if parsed_output:
            print(parsed_output)
    else:
        print(f"{Fore.RED}Failed to get NTP response from {server_address}.{Style.RESET_ALL}")

if __name__ == "__main__":
    main()
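
The delay and offset formulas named in the script's comments can be checked by hand with a small worked example. This is a minimal sketch under stated assumptions: the function name `delay_and_offset` and the four timestamps are made up for illustration (chosen to be exact in binary floating point) and are not measurements from a real server.

```python
def delay_and_offset(t1, t2, t3, t4):
    """Return (round-trip delay, clock offset) from the four NTP timestamps.

    t1: client transmit, t2: server receive,
    t3: server transmit, t4: client receive (all in seconds).
    """
    delay = (t4 - t1) - (t3 - t2)            # time spent on the network
    offset = ((t2 - t1) + (t3 - t4)) / 2.0   # server clock minus client clock
    return delay, offset

# Hypothetical timestamps: the server clock runs 2.125 s ahead of the client,
# and each direction takes 0.375 s on the wire.
delay, offset = delay_and_offset(10.0, 12.5, 12.75, 11.0)
print(delay, offset)   # 0.75 2.125
```

A positive offset means the server clock is ahead of the client clock, so the client would need to step forward by about that amount to match the server.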
