0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

SPFフラット化と変更チェックスクリプト(2)

Posted at 2025-09-01

先日ご紹介したSPFチェックスクリプトはPythonとシェルスクリプトで構成されていました。

普段の作業環境のMacではBSDのsort、Linuxで実行する時はGNUのsortなので、checkspf.shの方はsortの実装が違っても同じ結果になるように注意していました。

ただ、formatspf.shについてはその辺の配慮が漏れていることに気づき、Gemini CLIを使ってPythonに書き換えてもらいました。
sortコマンドではなくPythonのsortを使うことで実装の違いの影響を回避するという目論見です。

formatspf.py
#!/usr/bin/env python3

import os
import subprocess
import sys

def main():
    """
    Run SPFlatten.py for the configured host and print a sorted SPF record.

    The first stdout line of SPFlatten.py that starts with ``v=spf1`` is
    split on whitespace. Its ``ip4:``/``ip6:`` mechanisms are sorted with
    Python's string sort (so the result does not depend on the platform's
    ``sort`` command), every other term such as ``~all`` is kept after them
    in its original order, and the result is printed as a single
    ``txt @ v=spf1 include:_spf.google.com ...`` line.

    Exits with status 1 when the venv interpreter or SPFlatten.py is
    missing, when SPFlatten.py fails, or when its output contains no line
    starting with ``v=spf1``.
    """
    # ----- Replace it with your domain -----
    spf_host = '_spf.your-domain.com'
    # ---------------------------------------
    script_dir = os.path.dirname(os.path.abspath(__file__))
    spflatten_script = os.path.join(script_dir, 'SPFlatten.py')
    python_executable = os.path.join(script_dir, 'venv/bin/python3')

    if not os.path.exists(python_executable):
        print(f"Error: Python executable not found at {python_executable}", file=sys.stderr)
        sys.exit(1)

    if not os.path.exists(spflatten_script):
        print(f"Error: SPFlatten.py not found at {spflatten_script}", file=sys.stderr)
        sys.exit(1)

    # Execute SPFlatten.py and capture the output
    try:
        process = subprocess.run(
            [python_executable, spflatten_script, spf_host],
            capture_output=True,
            text=True,
            check=True
        )
        output = process.stdout
    except subprocess.CalledProcessError as e:
        print(f"Error running SPFlatten.py: {e}", file=sys.stderr)
        print(f"Stderr: {e.stderr}", file=sys.stderr)
        sys.exit(1)

    ip_prefixes = ('ip4:', 'ip6:')

    # Only the first SPF line in the output is formatted
    for line in output.splitlines():
        if not line.startswith('v=spf1'):
            continue

        parts = line.split()
        ip_mechanisms = sorted(p for p in parts if p.startswith(ip_prefixes))

        # Preserve other mechanisms, like '~all'; the version tag is
        # re-added by the output template below
        other_mechanisms = [
            p for p in parts
            if not p.startswith(ip_prefixes) and p != 'v=spf1'
        ]

        # Reconstruct the record
        sorted_spf = " ".join(ip_mechanisms + other_mechanisms)

        # Format the final output string
        print(f"txt @ v=spf1 include:_spf.google.com {sorted_spf}")
        break
    else:
        # Without this, a run that produced no SPF record would end
        # silently with exit status 0 and empty output.
        print("Error: No SPF record (v=spf1) found in SPFlatten.py output", file=sys.stderr)
        sys.exit(1)

# Run main() only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()

ついでに、checkspf.shもGemini CLIでPythonに書き換えました。

checkspf.py
#!/usr/bin/env python3

"""
This script checks for changes in SPF records and sends notifications.
"""

import argparse
import os
import sys
import subprocess
import json
import difflib
import ipaddress
import time
from urllib import request, error
from datetime import datetime
import dns.resolver
import re

# --- Configuration ---
# Webhook URLs for Google Chat
# ALERT_URL is notified only when the SPF values changed; STATUS_URL is
# notified on every real (non dry-run) run, changed or not.
ALERT_URL = "your_URL"
STATUS_URL = "your_URL"

# Domain to check and the host with the unflattened SPF record
# MYDOMAIN's TXT record is read directly over DNS; SPFHOST is passed to
# SPFlatten.py to produce the record it is compared against.
MYDOMAIN = 'your-domain.com'
SPFHOST = '_spf.your-domain.com'

# --- Paths to Executables ---
# Relative paths: main() changes the working directory to this script's
# directory before they are used.
PYTHON_EXEC = "./venv/bin/python3"
SPFLATTEN_PY = "./SPFlatten.py"

def send_notification(payload, url, channel_name, dry_run=False, timeout=30):
    """
    POST ``payload`` as JSON to a webhook URL.

    Args:
        payload: JSON-serializable object sent as the request body.
        url: Webhook URL to post to.
        channel_name: Label used only in the progress and error messages.
        dry_run: When true, return immediately without sending anything.
        timeout: Seconds to wait on the network before giving up, so a
            stalled connection cannot hang the script indefinitely.

    On failure (malformed URL, network error, timeout, or a non-2xx
    response) an error is printed to stderr and the process exits with
    status 1.
    """
    if dry_run:
        return

    print(f"Attempting to send notification to {channel_name}...")
    data = json.dumps(payload).encode('utf-8')
    headers = {'Content-Type': 'application/json'}

    failure = None
    try:
        # Request() itself raises ValueError for a malformed URL (for
        # example an unedited placeholder), so it belongs inside the try.
        req = request.Request(url, data=data, headers=headers)
        with request.urlopen(req, timeout=timeout) as response:
            if not 200 <= response.status < 300:
                failure = f"Status: {response.status}"
    except error.URLError as e:
        # Includes HTTPError, which urlopen raises for HTTP error statuses
        failure = f"Reason: {e.reason}"
    except (OSError, ValueError) as e:
        # Read timeouts and other socket errors may surface unwrapped
        failure = f"Reason: {e}"

    if failure is not None:
        print(f"Error: Failed to send notification to {channel_name}. {failure}", file=sys.stderr)
        sys.exit(1)

    print(f"Notification sent successfully to {channel_name}.")

def get_spf_record(domain, max_retries=3, delay=5):
    """
    Run SPFlatten.py for ``domain`` and return its stripped stdout.

    The command is tried up to ``max_retries`` times, sleeping ``delay``
    seconds between attempts. An attempt counts as failed when the
    subprocess exits non-zero or its output does not contain ``v=spf1``;
    each failure is reported as a warning on stderr. If every attempt
    fails, the process exits with an error message.
    """
    command = [PYTHON_EXEC, SPFLATTEN_PY, domain]
    final_attempt = max_retries - 1

    for attempt in range(max_retries):
        try:
            completed = subprocess.run(
                command, capture_output=True, text=True, check=True
            )
        except subprocess.CalledProcessError as e:
            print(f"Warning: Attempt {attempt + 1} for {domain} failed: {e.stderr}", file=sys.stderr)
        else:
            flattened = completed.stdout.strip()
            if 'v=spf1' in flattened:
                return flattened
            print(f"Warning: Attempt {attempt + 1} for {domain} returned invalid SPF: {flattened}", file=sys.stderr)

        # No point in waiting after the last attempt
        if attempt < final_attempt:
            time.sleep(delay)

    sys.exit(f"Error: Failed to get a valid SPF record for {domain} after {max_retries} attempts.")

def get_and_flatten_spf(domain):
    """
    Build a flattened SPF record for ``domain`` from its TXT record.

    This works around SPFlatten.py's inability to handle multi-string TXT
    records: the TXT lookup and string joining are done here, and only the
    ``include:`` targets are delegated to SPFlatten.py (via
    ``get_spf_record``).

    The result is ``v=spf1``, followed by the ``ip4:``/``ip6:`` terms of
    the record itself and of every included domain (duplicates removed,
    first occurrence kept), followed by the record's ``all`` mechanism
    (``~all`` when the record has none).

    Exits with an error message when the DNS lookup fails or no TXT record
    starts with ``v=spf1``.
    """
    try:
        txt_records = dns.resolver.resolve(domain, "TXT")
    except dns.exception.DNSException:
        sys.exit(f"Error: No TXT records found for {domain}")

    spf_string = ""
    for record in txt_records:
        # A TXT record can be split into several strings; join them first.
        # Undecodable bytes in an unrelated TXT record must not abort the
        # search for the SPF record, hence errors="replace".
        record_text = b"".join(record.strings).decode("utf-8", errors="replace")
        if record_text.startswith("v=spf1"):
            spf_string = record_text
            break

    if not spf_string:
        sys.exit(f"Error: No SPF record found for {domain}")

    ip_pattern = r'ip[46]:\S+'

    # IPs listed directly in the main record come first
    all_parts = re.findall(ip_pattern, spf_string)

    # Then the IPs of each included domain, flattened by SPFlatten.py
    for included_domain in re.findall(r'include:(\S+)', spf_string):
        flattened_include = get_spf_record(included_domain)
        if flattened_include:
            all_parts.extend(re.findall(ip_pattern, flattened_include))

    # Match "all" only as a whole whitespace-delimited term; otherwise a
    # hostname such as "include:mail-all.example.com" would be mistaken
    # for the record's "-all" mechanism.
    all_mechanism_match = re.search(r'(?<!\S)([?~+-]all)(?!\S)', spf_string)
    all_mechanism = all_mechanism_match.group(1) if all_mechanism_match else "~all"

    # Deduplicate while preserving order to mimic SPFlatten.py
    unique_parts = list(dict.fromkeys(all_parts))

    return "v=spf1 " + " ".join(unique_parts) + " " + all_mechanism

def spf_sort_key(part):
    """
    Return a sort key that gives SPF terms one input-independent order.

    Resulting order: ``v=spf1`` first, then valid ``ip4:`` networks, then
    valid ``ip6:`` networks, then any other term, then malformed ``ip4:``/
    ``ip6:`` terms, and ``~all`` last.

    IP networks are ordered numerically by network address, then by prefix
    length, then by their original text. The tie-breakers matter: two
    networks sharing a network address (e.g. ``10.0.0.0/8`` and
    ``10.0.0.0/16``) would otherwise compare equal and keep whatever
    relative order the input had, which defeats the purpose of sorting
    before diffing.

    An ``ip4:`` term holding an IPv6 literal (or vice versa) is ranked with
    the malformed terms, because addresses of different families cannot be
    compared with each other.

    The key is always a 3-tuple ``(rank, version, value)``.
    """
    if part == 'v=spf1':
        return (0, 0, '')
    if part == '~all':
        return (4, 0, '')

    for prefix, version in (('ip4:', 4), ('ip6:', 6)):
        if not part.startswith(prefix):
            continue
        try:
            # strict=False accepts values with host bits set, e.g. 10.0.0.1/8
            net = ipaddress.ip_network(part[len(prefix):], strict=False)
        except ValueError:
            return (3, 0, part)  # invalid
        if net.version != version:
            return (3, 0, part)  # address family does not match the prefix
        return (1, version, (net.network_address, net.prefixlen, part))

    # Default sort for other mechanisms
    return (2, 0, part)

def process_spf(spf_string):
    """
    Split a raw SPF string into a sorted list of selected terms.

    Only whitespace-separated terms whose first character is ``v``, ``i``
    or ``~`` are kept, mirroring the ``grep '^[vi~]'`` filter of the
    original shell script. The kept terms are ordered with
    ``spf_sort_key``. An empty or ``None`` input gives an empty list.
    """
    if not spf_string:
        return []

    wanted_initials = ('v', 'i', '~')
    kept = []
    for term in spf_string.split():
        # split() never yields empty strings, so term[0] always exists
        if term[0] in wanted_initials:
            kept.append(term)

    kept.sort(key=spf_sort_key)
    return kept

def main():
    """
    Compare the published SPF record with a freshly flattened one and report.

    Command line: ``--run`` performs the check and sends notifications;
    ``--test``/``--dry-run`` performs the check and only prints the result.
    If neither is given, the help text is printed and the script exits
    with an error. When both are given, the dry run takes precedence.

    Steps:
      1. Change the working directory to this script's directory so the
         relative PYTHON_EXEC and SPFLATTEN_PY paths resolve, then verify
         both exist.
      2. Build the current record with ``get_and_flatten_spf(MYDOMAIN)``
         and the new one with ``get_spf_record(SPFHOST)``.
      3. Normalize both with ``process_spf`` and take a unified diff.
      4. If the diff is non-empty, notify ALERT_URL and STATUS_URL;
         otherwise send an "unchanged" message to STATUS_URL only.
    """
    parser = argparse.ArgumentParser(description="This script checks for changes in SPF records.")
    parser.add_argument(
        '-r', '--run',
        action='store_true',
        help="Run the SPF check and send notifications on changes."
    )
    parser.add_argument(
        '-t', '--test', '--dry-run',
        action='store_true',
        dest='dry_run',
        help="Perform a dry run. Fetches and compares SPF records but does not send notifications."
    )
    parser.add_argument(
        '-x', '--debug',
        action='store_true',
        help="Enable debug mode for verbose output."
    )

    args = parser.parse_args()

    if not args.run and not args.dry_run:
        parser.print_help()
        sys.exit("Error: You must specify an operation, either --run or --test.")

    # --- Script Setup ---
    script_dir = os.path.dirname(os.path.abspath(__file__))
    os.chdir(script_dir)

    # --- Sanity Checks ---
    if not os.access(PYTHON_EXEC, os.X_OK):
        sys.exit(f"Error: Python executable not found or not executable at {PYTHON_EXEC}")
    if not os.path.isfile(SPFLATTEN_PY):
        sys.exit(f"Error: SPFlatten.py not found at {SPFLATTEN_PY}")

    print(f"--- Running checkspf.py at {datetime.now()} ---")

    # --- Main Execution ---
    current_spf_raw = get_and_flatten_spf(MYDOMAIN)
    new_spf_raw = get_spf_record(SPFHOST)

    current_spf_parts = process_spf(current_spf_raw)
    new_spf_parts = process_spf(new_spf_raw)

    # The inputs carry no trailing newlines, so lineterm must be '' too;
    # with the default '\n' the header and hunk lines would end in a
    # newline and the join below would insert stray blank lines.
    diff_lines = difflib.unified_diff(
        current_spf_parts,
        new_spf_parts,
        fromfile=f'current_{MYDOMAIN}',
        tofile=f'new_{SPFHOST}',
        lineterm='',
    )
    diff_text = '\n'.join(diff_lines)

    # --- Reporting ---
    if diff_text:
        message = f"SPF values changed for {MYDOMAIN}\n\n{diff_text}"
        payload = {"text": f"```{message}```"}

        if args.dry_run:
            print(f"SPF values changed for {MYDOMAIN} (Dry Run):")
            print(diff_text)
        else:
            print("SPF values changed. Sending notifications.")
            send_notification(payload, ALERT_URL, "IT: Alert", args.dry_run)
            send_notification(payload, STATUS_URL, "IT: Update Status", args.dry_run)
    else:
        message = f"SPF values for {MYDOMAIN} are unchanged."
        payload = {"text": f"```{message}```"}

        if args.dry_run:
            print(f"SPF values for {MYDOMAIN} are unchanged. (Dry Run)")
        else:
            print("SPF values are unchanged. Sending status update.")
            send_notification(payload, STATUS_URL, "IT: Update Status", args.dry_run)

    print(f"--- checkspf.py finished at {datetime.now()} ---")

# Run main() only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()

これでsortコマンドの実装の違いを気にする必要がなくなり、さらにdiffコマンドへの依存もなくなりました。

Gemini CLIを使うと、こういったコードの書き換えも簡単にできるのが嬉しいですね。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?