はじめに
gungnir などのツールでCT(Certificate Transparency)ログをストリーミング監視していると、こんな文字列が大量に流れてきます。
p0nlwhm.gfedcbedcbupdate.6543294321432trxyndev.ymfahkcelkay.74.208.78.205.nip.io
1780556601966040952-3f3unzzsw0xra.tls-alpn-01.production.haplorrhini.com
update.sryupdate.qpojihgjihgbupdate.6543654321076543210admin.en.104-199-114-12.sslip.io
これらは監視対象の資産ではありません。Let's Encryptの検証ノイズ、スキャナーのトレース用マーカー、DGA(Domain Generation Algorithm)が吐き出したランダムドメインです。
本記事では、これらノイズの正体と発生原理を解説し、攻撃面管理(ASM)やOSINT収集の現場で実際に使えるPythonフィルタリングスクリプトを紹介します。
1. CTログを汚す「デジタルゴミ」の4分類
① Let's Encryptの多視点検証(Multi-Perspective Validation)ノイズ
頻出する *.production.haplorrhini.com はLet's Encryptのインフラです。
Haplorrhini は、サルやメガネザルを含む生物分類群。Let's Encryptの開発チームは、検証ノードのドメインにこうした生物分類名(他に
bignat.coなど)を使う慣習があります。
DNSインジェクションや経路ハイジャックによる不正証明書発行を防ぐため、Let's Encryptは世界中のマルチクラウド拠点(AWS / GCP / Azure 等)から同時に対象ドメインへ接続し、所有権を確認する Multi-Perspective Validation を実施しています。この検証フェーズ(TLS-ALPN-01 / DNS-01 チャレンジ)で使い捨てサブドメインが動的生成され、すべてCTログに記録されるため、大量のノイズとして観測されます。
② スキャナー・調査機関の「埋標トークン(Scan Token)」
gfedcbedcb(e-d-c-b-a の逆順)や qpojih(i-h-g-f-e の逆順)という文字列に気づいたことはありませんか。
これはCensys、Shodan、あるいは広域スキャンを行うグループが、自身の証明書発行リクエストがCTログのストリームにどう伝播するかを追跡するためのマーカーです。逆順アルファベット列は人間が意図的に埋め込んだ識別子であり、自動生成パターンの特徴的なシグネチャになります。
③ マルウェア・C2サーバーのDGA(Domain Generation Algorithm)
WAFやDNSブロックによるIPブロッキングを回避するため、C2サーバーやマルウェアはDGAで時間単位にドメインを大量生成します。HTTPS通信を行うためにこれらが自動で証明書を申請した結果、CTログに無数のランダムドメインが残ります。
④ クラウドインフラの内部ヘルスチェック
Kubernetesのイングレスコントローラーや各種ゲートウェイが、ワイルドカード証明書の動作検証・ヘルスチェック用に発行する証明書です。wildcardprobe-* のようなプレフィックスが特徴的です。
2. Pythonによる高精度フィルタリング
これらノイズはASM・バグバウンティ・OSINT調査において 99%不要なデータです。以下のスクリプトで実業務資産のみを抽出します。
ct_cleaner.py
#!/usr/bin/env python3
"""
ct_cleaner.py
CTログ(gungnir等のJSONストリーム出力)からノイズを除去し、
クリーンなドメインリストを生成・出力する最適化スクリプト。
"""
from __future__ import annotations
import json
import re
import sys
import argparse
from pathlib import Path
# ── フィルタリング用定数 ──────────────────────────────────────────────────────
BANNED_SUFFIXES: tuple[str, ...] = (
"haplorrhini.com", # Let's Encrypt 多視点検証ノード
"bignat.co", # Let's Encrypt 検証関連
"sslip.io", # IP ↔ ドメイン変換サービス
"nip.io", # IP ↔ ドメイン変換サービス
"viam.cloud", # IoT クラウド自動化
"myvolumio.org", # デバイス向け動的ドメイン
"workers.dev", # Cloudflare Workers
"pages.dev", # Cloudflare Pages
"plesk.page", # Plesk 一時ドメイン
"trafficmanager.net",# Azure Traffic Manager
"azurewebsites.net", # Azure App Service
)
# スキャナートークン(逆順アルファベット列等)のDGAパターン
SCANNER_RE = re.compile(
r"(?:abcdef|fedcba|edcba|ihgfed|defghi|qpojih|yxwvut|srqpon)"
)
NOISE_KEYWORDS: tuple[str, ...] = (
"wildcardprobe",
"tls-alpn-01",
"dns-01",
"acme-challenge",
)
MAX_DOMAIN_LEN = 80
# ── フィルタリングロジック ─────────────────────────────────────────────────────
def is_noise(domain: str) -> bool:
"""True を返すとそのドメインを除外する。"""
if domain.endswith(BANNED_SUFFIXES):
return True
if SCANNER_RE.search(domain):
return True
if len(domain) > MAX_DOMAIN_LEN:
return True
if any(kw in domain for kw in NOISE_KEYWORDS):
return True
# ラベルに数字のみのセグメントが3つ以上(IP埋め込みパターン)
labels = domain.split(".")
numeric_streak = sum(1 for label in labels if label.isdigit())
if numeric_streak >= 3:
return True
return False
def extract_domains(record: dict) -> list[str]:
"""JSON レコードからドメイン候補を全て取り出す。"""
candidates: list[str] = []
for key in ("commonName", "san", "domains"):
value = record.get(key)
if isinstance(value, str):
candidates.append(value)
elif isinstance(value, list):
candidates.extend(v for v in value if isinstance(v, str))
return candidates
# ── メイン処理 ────────────────────────────────────────────────────────────────
def process_stream(input_stream, output_stream, live_mode: bool) -> tuple[int, int, int]:
"""ストリーム単位でデータを処理する(メモリ肥大化を防止)"""
unique_domains: set[str] = set()
total_lines = 0
skipped_json = 0
for line in input_stream:
line = line.strip()
if not line:
continue
total_lines += 1
try:
record = json.loads(line)
except json.JSONDecodeError:
skipped_json += 1
continue
for raw in extract_domains(record):
domain = raw.lower().strip("*. ")
if not domain:
continue
if not is_noise(domain):
# リアルタイム(パイプライン)モードの場合は即時出力
if live_mode:
if domain not in unique_domains:
unique_domains.add(domain)
output_stream.write(f"{domain}\n")
output_stream.flush()
else:
# ファイル保存用のプール
unique_domains.add(domain)
# 一括ファイル出力モードの場合、ソートして書き出し
if not live_mode:
for d in sorted(unique_domains):
output_stream.write(f"{d}\n")
return total_lines, skipped_json, len(unique_domains)
def main() -> None:
parser = argparse.ArgumentParser(
description="Gungnir等のJSONストリームからノイズドメインを高度にフィルタリングするスクリプト。"
)
parser.add_argument(
"-i", "--input",
type=str,
help="入力JSONファイルのパス(指定しない場合は標準入力 STDIN を使用)"
)
parser.add_argument(
"-o", "--output",
type=str,
help="出力クレンジングドメインファイルのパス(指定しない場合は標準出力 STDOUT を使用)"
)
parser.add_argument(
"--live",
action="store_true",
help="リアルタイムストリーム出力モード(パイプライン連携用:ソートを無効化し即時出力)"
)
args = parser.parse_args()
# 入力ソースの決定
if args.input:
input_path = Path(args.input)
if not input_path.exists():
print(f"[!] エラー: 入力ファイルが見つかりません: {args.input}", file=sys.stderr)
sys.exit(1)
inf = input_path.open(encoding="utf-8")
else:
# 画面が詰まらないよう、標準入力が空の場合はヘルプを表示
if sys.stdin.isatty():
parser.print_help()
sys.exit(0)
inf = sys.stdin
# 出力先の決定
# --live モードが指定されているか、または出力先が指定されずSTDOUTに流す場合は、実質的にライブモードとして動かす
live_mode = args.live or (not args.output)
if args.output:
outf = Path(args.output).open("w", encoding="utf-8")
if not live_mode:
print(f"[*] 入力 : {args.input or 'STDIN'}", file=sys.stderr)
print(f"[*] 出力 : {args.output}", file=sys.stderr)
print(f"[*] クレンジング開始 ...", file=sys.stderr)
else:
outf = sys.stdout
try:
total, skipped, cleaned = process_stream(inf, outf, live_mode)
# ファイル出力時のみ統計ログを標準エラー(STDOUTのパイプを汚さないため)に出力
if args.output:
print(f"\n[+] クレンジング完了", file=sys.stderr)
print(f" 解析行数 : {total:,}", file=sys.stderr)
print(f" JSONパースエラー : {skipped:,}", file=sys.stderr)
print(f" 有効ドメイン数 : {cleaned:,}", file=sys.stderr)
finally:
if args.input:
inf.close()
if args.output:
outf.close()
if __name__ == "__main__":
main()
3. 実行例と後続パイプライン
実行
python3 ct_cleaner.py -i results.txt -o clean_domains.txt
[*] 入力ファイル : results.txt
[*] クレンジング開始 ...
[+] 完了
解析行数 : 48,203
JSONパースエラー : 12
クリーンドメイン数 : 3,847 → clean_domains.txt
後続ツールへの連携
# ① CTログからドメインを抽出・クレンジング
python3 ct_cleaner.py
# ② 生存確認(HTTP/HTTPS レスポンス・タイトル・技術スタック検出)
httpx -l clean_domains.txt -title -tech-detect -status-code -o alive_subdomains.txt
# ③ サブドメイン列挙・追加偵察
subfinder -dL clean_domains.txt -o subdomains_expanded.txt
4. フィルタルールの拡張指針
スクリプトは設定ベースで拡張できます。現場に応じて以下を追加してください。
BANNED_SUFFIXES に追記する例
"fly.dev", # Fly.io
"onrender.com", # Render
"herokuapp.com", # Heroku
"vercel.app", # Vercel
"netlify.app", # Netlify
NOISE_KEYWORDS に追記する例
"test.",
"staging.",
"sandbox.",
SCANNER_RE を強化する例
# 8文字以上の連続数字ブロックをDGA疑いとして検出
LONG_DIGIT_RE = re.compile(r"\d{8,}")
まとめ
データ収集ツールを流すだけでは「ノイズの海」に溺れます。CTログが汚く見えるとき、問題はツールではなく背後の構造にあります。
- Let's Encryptのインフラはわかりやすい固有ドメインを使う
- スキャナーのトークンには逆順アルファベットという規則性がある
- DGAは「長すぎる・数字だらけ」という統計的特徴を持つ
これらの「ノイズの文法」を理解すれば、フィルタリングは精度の高いルールベースで十分対応できます。クリーンなドメインリストは、後続の偵察精度を根本から改善します。