はじめに
NTPサーバーをいろいろ試していると、たまに挙動がおかしいサーバーに出会うことがあるが、パケットダンプを眺めながら解析するのがめんどくさくなってきたので、簡単に試せるスクリプトを生成AIに書かせました。
実行例(正常な場合)
$ ./ntp-query.py ntp.nict.jp
Querying NTP server: ntp.nict.jp:123 (NTPv4)
Accessed Server IP Address: 2001:df0:232:eea0::fff3
--- NTP Packet Details ---
Leap Indicator (LI): 0 (No warning)
Version Number (VN): 4
Mode: 4 (Server)
Stratum: 1 (Primary Reference)
Poll Interval: 0 (2^0 seconds)
Precision: 236 (2^236 seconds)
Root Delay: 0.000000 seconds
Root Dispersion: 0.000000 seconds
Reference ID: 'NICT'
Reference Timestamp: 2025-07-15 07:42:37.000000
Originate Timestamp (T1 - Client Tx): 0 (Unspecified)
Receive Timestamp (T2 - Server Rx): 2025-07-15 07:42:37.651567
Transmit Timestamp (T3 - Server Tx): 2025-07-15 07:42:37.651568
Client Receive Time (T4 - Client Rx): 2025-07-15 07:42:37.659879
Calculated Round-trip Delay: 0.000 ms
Calculated Clock Offset: 0.000 ms (Synchronized)
$
実行例(異常な場合)
verion 4でリクエストを投げたのに、返事がversion 3だった場合
$ ./ntp-query.py ntp3
Querying NTP server: ntp3:123 (NTPv4)
Accessed Server IP Address: 192.168.***.***
--- NTP Packet Details ---
Leap Indicator (LI): 0 (No warning)
Version Number (VN): 3 (Requested 4, received 3!)
Mode: 4 (Server)
Stratum: 1 (Primary Reference)
Poll Interval: 0 (2^0 seconds)
Precision: 250 (2^250 seconds)
Root Delay: 0.000000 seconds
Root Dispersion: 0.000000 seconds
Reference ID: 'pps'
Reference Timestamp: 2025-07-15 07:45:47.230000
Originate Timestamp (T1 - Client Tx): 0 (Unspecified)
Receive Timestamp (T2 - Server Rx): 2025-07-15 07:45:47.230000
Transmit Timestamp (T3 - Server Tx): 2025-07-15 07:45:47.230000
Client Receive Time (T4 - Client Rx): 2025-07-15 07:45:47.226787
Calculated Round-trip Delay: 0.000 ms
Calculated Clock Offset: 0.000 ms (Synchronized)
$
オプション
$ ./ntp-query.py -h
usage: ntp-query.py [-h] [-p PORT] [-v {1,2,3,4}] [-t TIMEOUT] [-4 | -6] server
Send an NTP request and display the parsed NTP response packet details.
positional arguments:
server The NTP server hostname or IP address to query.
options:
-h, --help show this help message and exit
-p PORT, --port PORT The NTP server port (default: 123).
-v {1,2,3,4}, --version {1,2,3,4}
NTP version to use for the request (default: 4).
1: RFC 1059 (NTPv1)
2: RFC 1119 (NTPv2)
3: RFC 1305 (NTPv3)
4: RFC 5905 (NTPv4)
-t TIMEOUT, --timeout TIMEOUT
Timeout in seconds for receiving the NTP response (default: 5).
-4, --ipv4 Force IPv4 connection.
-6, --ipv6 Force IPv6 connection.
$
スクリプト
#!/usr/bin/env python3
#
# NTP Query Tool
# Version: 1.0.4
#
# Description:
# This script sends an NTP request packet to a specified NTP server and
# parses the returned NTP reply packet, displaying its contents in a human-readable format.
# It defaults to NTPv4 but allows specifying other NTP versions via command-line options.
#
# Author: Gemini
#
# Change Log:
# -----------------------------------------------------------------------------------
# 2025-07-15 v1.0.0: Initial release. Basic NTP client request/response parsing.
# Supports NTPv1, v2, v3, v4. Calculates Round-trip Delay and Offset.
# 2025-07-15 v1.0.1: Fixed SyntaxError in f-string related to backslashes in
# Stratum description by pre-calculating the description string.
# 2025-07-15 v1.0.2: Shortened the display of the 'Precision' field for clarity.
# 2025-07-15 v1.0.3: Code review adjustments: Removed unused 'sys' module and 'precision_s' variable.
# Clarified Stratum 0 (Kiss-o'-Death) handling and removed misleading comments.
# 2025-07-15 v1.0.4: Added display of the resolved server IP address.
# Added options (-4, -6) to force IPv4 or IPv6 communication.
# -----------------------------------------------------------------------------------
import socket
import struct
import argparse
import datetime
import time
from colorama import Fore, Style, init
# colorama の初期化
init(autoreset=True)
# NTP epoch (1900年1月1日) から Unix epoch (1970年1月1日) までの秒数
NTP_DELTA = (datetime.datetime(1970, 1, 1) - datetime.datetime(1900, 1, 1)).total_seconds()
def get_ntp_packet_data(server, port=123, version=4, timeout=5, af=socket.AF_UNSPEC):
"""
NTPサーバーにリクエストを送信し、NTP応答パケットの生データを取得します。
:param server: NTPサーバーのホスト名またはIPアドレス
:param port: NTPサーバーのポート
:param version: NTPバージョン
:param timeout: タイムアウト秒数
:param af: アドレスファミリー (socket.AF_INET for IPv4, socket.AF_INET6 for IPv6, socket.AF_UNSPEC for auto)
:return: (data, request_time, receive_time, resolved_ip) または (None, None, None, None)
"""
resolved_ip = None
try:
# ホスト名とアドレスファミリーに基づいてアドレス情報を取得
# type=socket.SOCK_DGRAM は UDP ソケット用
# proto=socket.IPPROTO_UDP は UDP プロトコル用
# flags=socket.AI_NUMERICHOST はサーバーが数値IPアドレスの場合に名前解決をスキップ
addr_info = socket.getaddrinfo(server, port, af, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
if not addr_info:
print(f"{Fore.RED}Error: Could not resolve hostname {server} for specified address family.{Style.RESET_ALL}")
return None, None, None, None
# 最初の有効なアドレスを使用
# addr_info の要素: (family, socktype, proto, canonname, sockaddr)
# sockaddr: (ip_address, port) for IPv4, (ip_address, port, flowinfo, scopeid) for IPv6
actual_af, _, _, _, sockaddr = addr_info[0]
resolved_ip = sockaddr[0] # 解決されたIPアドレス
client = socket.socket(actual_af, socket.SOCK_DGRAM)
client.settimeout(timeout)
# LI (Leap Indicator): 0 (no warning)
# VN (Version Number): 指定されたバージョン
# Mode: 3 (client)
mode_vn_li = (0 << 6) | (version << 3) | 3
# 現在のUnixタイムスタンプからNTPタイムスタンプへの変換
current_time = time.time()
ntp_time = current_time + NTP_DELTA
# Transmit Timestamp (64-bit fixed-point: high 32 bits integer, low 32 bits fraction)
tx_timestamp_int = int(ntp_time)
tx_timestamp_frac = int((ntp_time - tx_timestamp_int) * (2**32))
# NTP リクエストパケット (48バイト) を作成
# クライアントのリクエストでは、Originate Timestamp (バイト24-31) に送信時刻を設定
packet = bytearray(48)
packet[0] = mode_vn_li # LI | VN | Mode
struct.pack_into('>II', packet, 24, tx_timestamp_int, tx_timestamp_frac) # Originate Timestamp
# リクエストを送信
client.sendto(packet, (resolved_ip, port))
# クライアントがパケットを送信した正確な時刻を記録 (Unix timestamp)
request_time = time.time()
# 応答を受信
data, address = client.recvfrom(1024)
receive_time = time.time() # サーバーからの応答を受信した時刻を記録
return data, request_time, receive_time, resolved_ip
except socket.timeout:
print(f"{Fore.RED}Error: Request to {server}:{port} ({resolved_ip if resolved_ip else 'resolving...'}) timed out after {timeout} seconds.{Style.RESET_ALL}")
return None, None, None, None
except socket.gaierror:
print(f"{Fore.RED}Error: Could not resolve hostname {server}. Check the server address or DNS settings.{Style.RESET_ALL}")
return None, None, None, None
except Exception as e:
print(f"{Fore.RED}An error occurred during NTP request to {server}:{port} ({resolved_ip if resolved_ip else 'unknown IP'}): {e}{Style.RESET_ALL}")
return None, None, None, None
finally:
if 'client' in locals() and client:
client.close()
def parse_ntp_packet(data, request_time, receive_time, version):
"""
NTP応答パケットのデータを解析し、分かりやすく表示します。
"""
if not data or len(data) < 48:
print(f"{Fore.RED}Error: Invalid NTP packet data received. Length: {len(data)} (expected at least 48 bytes).{Style.RESET_ALL}")
return None
# NTPパケットの構造をアンパック (ヘッダー部分)
li_vn_mode, stratum, poll, precision, root_delay_int, root_disp_int, ref_id_val = struct.unpack_from('>BBBBIII', data, 0)
# タイムスタンプをアンパック (各8バイト = 2つの4バイト整数)
ref_ts_int, ref_ts_frac = struct.unpack_from('>II', data, 16)
orig_ts_int, orig_ts_frac = struct.unpack_from('>II', data, 24)
recv_ts_int, recv_ts_frac = struct.unpack_from('>II', data, 32)
tx_ts_int, tx_ts_frac = struct.unpack_from('>II', data, 40)
# 各フィールドをデコード
li = (li_vn_mode >> 6) & 0x03
vn = (li_vn_mode >> 3) & 0x07
mode = li_vn_mode & 0x07
# Root Delay / Root Dispersion を秒に変換 (16.16 固定小数点形式)
root_delay_s = float(root_delay_int) / (2**16)
root_dispersion_s = float(root_disp_int) / (2**16)
# Reference IDの解釈
reference_id = ""
if stratum == 0:
# Kiss-o'-death code (KOD) または Unspecified
try:
# Reference IDをASCII文字列として解釈を試みる
kod_string = bytes.fromhex(f'{ref_id_val:08x}').decode('ascii', errors='ignore').strip()
if kod_string:
reference_id = f"KOD: '{kod_string}'"
else:
reference_id = "N/A" # KODだが文字列なし
except Exception: # 例外発生時は変換不可として扱う
reference_id = "N/A"
elif stratum == 1:
# Stratum 1 (Primary reference) は4バイトの文字列 (e.g., 'GPS', 'LOCL', 'PPS')
reference_id = f"'{bytes.fromhex(f'{ref_id_val:08x}').decode('ascii', errors='ignore').strip()}'"
elif stratum >= 2:
# Stratum 2+ (Secondary reference) は参照元サーバーのIPv4アドレス
try:
reference_id = socket.inet_ntoa(struct.pack('>I', ref_id_val))
except OSError: # 例: ref_id_val が有効な IPv4 アドレスではない場合
reference_id = f"0x{ref_id_val:08x} (Invalid IP)" if ref_id_val else "N/A"
else:
reference_id = "N/A" # 不明なStratum値の場合
# NTP 64-bit 固定小数点タイムスタンプを Unix タイムスタンプに変換
def ntp_to_unix_time(integer, fraction):
if integer == 0 and fraction == 0:
return 0.0 # ゼロタイムスタンプは特定されていないことを示す
return (integer + fraction / (2**32)) - NTP_DELTA
ref_timestamp = ntp_to_unix_time(ref_ts_int, ref_ts_frac)
orig_timestamp = ntp_to_unix_time(orig_ts_int, orig_ts_frac)
recv_timestamp = ntp_to_unix_time(recv_ts_int, recv_ts_frac)
tx_timestamp = ntp_to_unix_time(tx_ts_int, tx_ts_frac)
# Unix タイムスタンプを読みやすい datetime 形式に変換
def format_timestamp(unix_time):
if unix_time == 0:
return "0 (Unspecified)"
try:
# UTCで表示
dt_object = datetime.datetime.fromtimestamp(unix_time, tz=datetime.timezone.utc)
return dt_object.strftime('%Y-%m-%d %H:%M:%S.%f UTC')[:-3] # ミリ秒精度
except ValueError:
return f"{unix_time:.6f} (Invalid timestamp)"
# ラウンドトリップ遅延とクロックオフセットの計算 (RFC 5905)
# T1 = クライアント送信時刻 (server response: Originate Timestamp)
# T2 = サーバー受信時刻 (server response: Receive Timestamp)
# T3 = サーバー送信時刻 (server response: Transmit Timestamp)
# T4 = クライアント受信時刻 (client script: receive_time)
# 遅延 (Delta): (T2 - T1) + (T4 - T3)
# オフセット (Theta): ((T2 - T1) + (T3 - T4)) / 2
delay = ((recv_timestamp - orig_timestamp) + (receive_time - tx_timestamp)) if orig_timestamp != 0 and tx_timestamp != 0 else 0.0
offset = ((recv_timestamp - orig_timestamp) + (tx_timestamp - receive_time)) / 2.0 if orig_timestamp != 0 and tx_timestamp != 0 else 0.0
output = f"{Fore.CYAN}--- NTP Packet Details ---{Style.RESET_ALL}\n"
output += f" {Fore.YELLOW}Leap Indicator (LI):{Style.RESET_ALL} {li} ({['No warning', 'Last minute has 61 seconds', 'Last minute has 59 seconds', 'Alarm condition (clock not synchronized)'][li]})\n"
output += f" {Fore.YELLOW}Version Number (VN):{Style.RESET_ALL} {vn}"
if vn != version:
output += f" {Fore.RED}(Requested {version}, received {vn}!) {Style.RESET_ALL}"
output += "\n"
output += f" {Fore.YELLOW}Mode:{Style.RESET_ALL} {mode} ({['Reserved', 'Symmetric Active', 'Symmetric Passive', 'Client', 'Server', 'Broadcast', 'NTP Control Message', 'Reserved for private use'][mode]})\n"
# Stratumの説明を事前に変数に格納する
stratum_desc = ""
if stratum == 0:
if "KOD:" in reference_id: # Reference IDにKOD情報があるかチェック
stratum_desc = "Kiss-o'-Death"
else:
stratum_desc = "Unspecified"
elif stratum == 1:
stratum_desc = "Primary Reference"
elif 2 <= stratum <= 16:
stratum_desc = "Secondary Reference"
elif stratum >= 17:
stratum_desc = "Reserved"
else:
stratum_desc = "Invalid"
output += f" {Fore.YELLOW}Stratum:{Style.RESET_ALL} {stratum} ({stratum_desc})\n"
output += f" {Fore.YELLOW}Poll Interval:{Style.RESET_ALL} {poll} (2^{poll} seconds)\n"
output += f" {Fore.YELLOW}Precision:{Style.RESET_ALL} {precision} (2^{precision} seconds)\n"
output += f" {Fore.YELLOW}Root Delay:{Style.RESET_ALL} {root_delay_s:.6f} seconds\n"
output += f" {Fore.YELLOW}Root Dispersion:{Style.RESET_ALL} {root_dispersion_s:.6f} seconds\n"
output += f" {Fore.YELLOW}Reference ID:{Style.RESET_ALL} {reference_id}\n"
output += f" {Fore.YELLOW}Reference Timestamp:{Style.RESET_ALL} {format_timestamp(ref_timestamp)}\n"
output += f" {Fore.YELLOW}Originate Timestamp (T1 - Client Tx):{Style.RESET_ALL} {format_timestamp(orig_timestamp)}\n"
output += f" {Fore.YELLOW}Receive Timestamp (T2 - Server Rx):{Style.RESET_ALL} {format_timestamp(recv_timestamp)}\n"
output += f" {Fore.YELLOW}Transmit Timestamp (T3 - Server Tx):{Style.RESET_ALL} {format_timestamp(tx_timestamp)}\n"
output += f" {Fore.YELLOW}Client Receive Time (T4 - Client Rx):{Style.RESET_ALL} {format_timestamp(receive_time)}\n"
output += f"\n {Fore.GREEN}Calculated Round-trip Delay:{Style.RESET_ALL} {delay * 1000:.3f} ms\n"
output += f" {Fore.GREEN}Calculated Clock Offset:{Style.RESET_ALL} {offset * 1000:.3f} ms ({'Client is fast' if offset > 0 else 'Client is slow' if offset < 0 else 'Synchronized'})\n"
return output
def main():
parser = argparse.ArgumentParser(
description="Send an NTP request and display the parsed NTP response packet details.",
formatter_class=argparse.RawTextHelpFormatter
)
parser.add_argument(
'server', type=str,
help="The NTP server hostname or IP address to query."
)
parser.add_argument(
'-p', '--port', type=int, default=123,
help="The NTP server port (default: 123)."
)
parser.add_argument(
'-v', '--version', type=int, default=4, choices=[1, 2, 3, 4],
help="NTP version to use for the request (default: 4).\n"
" 1: RFC 1059 (NTPv1)\n"
" 2: RFC 1119 (NTPv2)\n"
" 3: RFC 1305 (NTPv3)\n"
" 4: RFC 5905 (NTPv4)"
)
parser.add_argument(
'-t', '--timeout', type=int, default=5,
help="Timeout in seconds for receiving the NTP response (default: 5)."
)
# IPv4/IPv6 固定オプションを追加
group = parser.add_mutually_exclusive_group()
group.add_argument(
'-4', '--ipv4', action='store_true',
help="Force IPv4 connection."
)
group.add_argument(
'-6', '--ipv6', action='store_true',
help="Force IPv6 connection."
)
args = parser.parse_args()
server_address = args.server
server_port = args.port
ntp_version = args.version
timeout_seconds = args.timeout
# アドレスファミリーの選択
addr_family = socket.AF_UNSPEC # デフォルトは自動選択
if args.ipv4:
addr_family = socket.AF_INET
elif args.ipv6:
addr_family = socket.AF_INET6
print(f"{Fore.CYAN}Querying NTP server: {server_address}:{server_port} (NTPv{ntp_version}){Style.RESET_ALL}\n")
# NTPサーバーにリクエストを送信し、生データ、タイムスタンプ、解決されたIPアドレスを取得
packet_data, request_time, receive_time, resolved_ip = get_ntp_packet_data(
server_address, server_port, ntp_version, timeout_seconds, addr_family
)
if packet_data:
if resolved_ip:
print(f"{Fore.GREEN}Accessed Server IP Address:{Style.RESET_ALL} {resolved_ip}\n")
parsed_output = parse_ntp_packet(packet_data, request_time, receive_time, ntp_version)
print(parsed_output)
else:
print(f"{Fore.RED}Failed to get NTP response from {server_address}.{Style.RESET_ALL}")
if __name__ == "__main__":
main()