先日ご紹介したSPFチェックスクリプトはPythonとシェルスクリプトで構成されていました。
普段の作業環境のMacではBSDのsort、Linuxで実行する時はGNUのsortなので、checkspf.shの方はsortの実装が違っても同じ結果になるように注意していました。
ただ、formatspf.shについてはその辺の配慮が漏れていることに気づき、Gemini CLIを使ってPythonに書き換えてもらいました。
sortコマンドではなくPythonのsortを使うことで実装の違いの影響を回避するという目論見です。
formatspf.py
#!/usr/bin/env python3
import os
import subprocess
import sys
def main():
"""
Runs SPFlatten.py, sorts the IP mechanisms in the output SPF record,
and formats it.
"""
# ----- Replace it with your domain -----
spf_host = '_spf.your-domain.com'
# ---------------------------------------
script_dir = os.path.dirname(os.path.abspath(__file__))
spflatten_script = os.path.join(script_dir, 'SPFlatten.py')
python_executable = os.path.join(script_dir, 'venv/bin/python3')
if not os.path.exists(python_executable):
print(f"Error: Python executable not found at {python_executable}", file=sys.stderr)
sys.exit(1)
if not os.path.exists(spflatten_script):
print(f"Error: SPFlatten.py not found at {spflatten_script}", file=sys.stderr)
sys.exit(1)
# Execute SPFlatten.py and capture the output
try:
process = subprocess.run(
[python_executable, spflatten_script, spf_host],
capture_output=True,
text=True,
check=True
)
output = process.stdout
except subprocess.CalledProcessError as e:
print(f"Error running SPFlatten.py: {e}", file=sys.stderr)
print(f"Stderr: {e.stderr}", file=sys.stderr)
sys.exit(1)
# Process the output
for line in output.splitlines():
if line.startswith('v=spf1'):
parts = line.split()
ip_mechanisms = sorted([p for p in parts if p.startswith('ip4:') or p.startswith('ip6:')])
# Preserve other mechanisms, like '~all'
other_mechanisms = [p for p in parts if not p.startswith('ip4:') and not p.startswith('ip6:') and p != 'v=spf1']
# Reconstruct the record
sorted_spf = " ".join(ip_mechanisms + other_mechanisms)
# Format the final output string
print(f"txt @ v=spf1 include:_spf.google.com {sorted_spf}")
break
if __name__ == "__main__":
main()
ついでに、checkspf.shもGemini CLIでPythonに書き換え。
checkspf.py
#!/usr/bin/env python3
"""
This script checks for changes in SPF records and sends notifications.
"""
import argparse
import os
import sys
import subprocess
import json
import difflib
import ipaddress
import time
from urllib import request, error
from datetime import datetime
import dns.resolver
import re
# --- Configuration ---
# Webhook URLs for Google Chat
ALERT_URL = "your_URL"
STATUS_URL = "your_URL"
# Domain to check and the host with the unflattened SPF record
MYDOMAIN = 'your-domain.com'
SPFHOST = '_spf.your-domain.com'
# --- Paths to Executables ---
PYTHON_EXEC = "./venv/bin/python3"
SPFLATTEN_PY = "./SPFlatten.py"
def send_notification(payload, url, channel_name, dry_run=False):
"""
Sends a JSON payload to a URL, respecting dry_run.
Provides error handling for the request.
"""
if dry_run:
return
print(f"Attempting to send notification to {channel_name}...")
data = json.dumps(payload).encode('utf-8')
headers = {'Content-Type': 'application/json'}
req = request.Request(url, data=data, headers=headers)
try:
with request.urlopen(req) as response:
if 200 <= response.status < 300:
print(f"Notification sent successfully to {channel_name}.")
else:
print(f"Error: Failed to send notification to {channel_name}. Status: {response.status}", file=sys.stderr)
sys.exit(1)
except error.URLError as e:
print(f"Error: Failed to send notification to {channel_name}. Reason: {e.reason}", file=sys.stderr)
sys.exit(1)
def get_spf_record(domain, max_retries=3, delay=5):
"""
Fetches the SPF record for a domain, with retries on failure.
"""
for attempt in range(max_retries):
try:
result = subprocess.run(
[PYTHON_EXEC, SPFLATTEN_PY, domain],
capture_output=True, text=True, check=True
)
spf_raw = result.stdout.strip()
if 'v=spf1' in spf_raw:
return spf_raw
else:
print(f"Warning: Attempt {attempt + 1} for {domain} returned invalid SPF: {spf_raw}", file=sys.stderr)
except subprocess.CalledProcessError as e:
print(f"Warning: Attempt {attempt + 1} for {domain} failed: {e.stderr}", file=sys.stderr)
if attempt < max_retries - 1:
time.sleep(delay)
sys.exit(f"Error: Failed to get a valid SPF record for {domain} after {max_retries} attempts.")
def get_and_flatten_spf(domain):
"""
Gets and flattens an SPF record for a domain, working around issues
with multi-string TXT records that SPFlatten.py cannot handle.
This is a partial re-implementation of SPFlatten.py's logic that
delegates to SPFlatten.py for included domains.
"""
try:
txt_records = dns.resolver.resolve(domain, "TXT")
except dns.exception.DNSException:
sys.exit(f"Error: No TXT records found for {domain}")
spf_string = ""
for record in txt_records:
record_text = b"".join(record.strings).decode("utf-8")
if record_text.startswith("v=spf1"):
spf_string = record_text
break
if not spf_string:
sys.exit(f"Error: No SPF record found for {domain}")
all_parts = []
# Extract IPs from the main record
ips = re.findall(r'ip[46]:\S+', spf_string)
all_parts.extend(ips)
# Handle includes
includes = re.findall(r'include:(\S+)', spf_string)
for included_domain in includes:
flattened_include = get_spf_record(included_domain)
if flattened_include:
included_ips = re.findall(r'ip[46]:\S+', flattened_include)
all_parts.extend(included_ips)
# Extract the "all" mechanism
all_mechanism_match = re.search(r'([?~+-]all)', spf_string)
all_mechanism = all_mechanism_match.group(1) if all_mechanism_match else "~all"
# Deduplicate while preserving order to mimic SPFlatten.py
unique_parts = []
seen = set()
for part in all_parts:
if part not in seen:
unique_parts.append(part)
seen.add(part)
final_spf = "v=spf1 " + " ".join(unique_parts) + " " + all_mechanism
return final_spf
def spf_sort_key(part):
"""
Custom sort key for SPF parts to ensure consistent ordering.
Sorts v=spf1 first, then ip4, then ip6, then other mechanisms, and ~all last.
"""
if part == 'v=spf1':
return (0, 0, '') # type, version, value
if part.startswith('ip4:'):
try:
net = ipaddress.ip_network(part.split(':', 1)[1], strict=False)
return (1, 4, net.network_address)
except ValueError:
return (3, 0, part) # invalid
if part.startswith('ip6:'):
try:
net = ipaddress.ip_network(part.split(':', 1)[1], strict=False)
return (1, 6, net.network_address)
except ValueError:
return (3, 0, part) # invalid
if part == '~all':
return (4, 0, '')
# Default sort for other mechanisms
return (2, 0, part)
def process_spf(spf_string):
"""
Processes a raw SPF string into a sorted, newline-separated list of mechanisms.
"""
if not spf_string:
return []
# Filters for mechanisms starting with v, i, or ~.
# This is to match the logic of the original shell script's `grep '^[vi~]'`.
parts = [part for part in spf_string.split() if part.startswith(('v', 'i', '~'))]
return sorted(parts, key=spf_sort_key)
def main():
"""Main execution of the script."""
parser = argparse.ArgumentParser(description="This script checks for changes in SPF records.")
parser.add_argument(
'-r', '--run',
action='store_true',
help="Run the SPF check and send notifications on changes."
)
parser.add_argument(
'-t', '--test', '--dry-run',
action='store_true',
dest='dry_run',
help="Perform a dry run. Fetches and compares SPF records but does not send notifications."
)
parser.add_argument(
'-x', '--debug',
action='store_true',
help="Enable debug mode for verbose output."
)
args = parser.parse_args()
if not args.run and not args.dry_run:
parser.print_help()
sys.exit("Error: You must specify an operation, either --run or --test.")
# --- Script Setup ---
script_dir = os.path.dirname(os.path.abspath(__file__))
os.chdir(script_dir)
# --- Sanity Checks ---
if not os.access(PYTHON_EXEC, os.X_OK):
sys.exit(f"Error: Python executable not found or not executable at {PYTHON_EXEC}")
if not os.path.isfile(SPFLATTEN_PY):
sys.exit(f"Error: SPFlatten.py not found at {SPFLATTEN_PY}")
print(f"--- Running checkspf.py at {datetime.now()} ---")
# --- Main Execution ---
current_spf_raw = get_and_flatten_spf(MYDOMAIN)
new_spf_raw = get_spf_record(SPFHOST)
current_spf_parts = process_spf(current_spf_raw)
new_spf_parts = process_spf(new_spf_raw)
diff_lines = difflib.unified_diff(
current_spf_parts,
new_spf_parts,
fromfile=f'current_{MYDOMAIN}',
tofile=f'new_{SPFHOST}',
)
diff_text = '\n'.join(diff_lines)
# --- Reporting ---
if diff_text:
message = f"SPF values changed for {MYDOMAIN}\n\n{diff_text}"
payload = {"text": f"```{message}```"}
if args.dry_run:
print(f"SPF values changed for {MYDOMAIN} (Dry Run):")
print(diff_text)
else:
print("SPF values changed. Sending notifications.")
send_notification(payload, ALERT_URL, "IT: Alert", args.dry_run)
send_notification(payload, STATUS_URL, "IT: Update Status", args.dry_run)
else:
message = f"SPF values for {MYDOMAIN} are unchanged."
payload = {"text": f"```{message}```"}
if args.dry_run:
print(f"SPF values for {MYDOMAIN} are unchanged. (Dry Run)")
else:
print("SPF values are unchanged. Sending status update.")
send_notification(payload, STATUS_URL, "IT: Update Status", args.dry_run)
print(f"--- checkspf.py finished at {datetime.now()} ---")
if __name__ == "__main__":
main()
これでsortコマンドの実装の違いを気にする必要がなくなり、さらにdiffコマンドへの依存もなくなりました。
Gemini CLIを使うと、こういったコードの書き換えも簡単にできるのが嬉しいですね。