1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

PostgreSQL 便利なネットワークアドレス型を活用する

Posted at

過去の投稿で地域インターネットレジストリ(RIR) で公開されているデータから不正アクセスしたホストの国コードとネットワークアドレスを取得する方法を紹介いたしました。

そのときの出力を再掲します。

(py_psycopg2) $ python TestDetectCountryCode.py --target-ip 103.77.241.34
INFO Find 103.77.241.34 in (network: "103.77.240.0/23", country_code: "VN")

上記投稿で紹介した地域インターネットレジストリ(RIR) で公開されているデータはデータベースで管理しており開始IPアドレス列に関しては文字列として格納していました。

そのときに紹介した方法ではネットワークアドレス (CIDR形式) と国コードを取得するまでに2段階の処理を実行していました。

  • 地域インターネットレジストリデータ (テーブル) の検索
    LIKE検索を繰返してIPアドレスを含むと思われるIPv4の開始アドレスの候補を見つける
    ※候補となるIPv4の開始アドレスは数十件〜数万件になる
  • python の ipaddress モジュールを使いネットワークアドレス計算する
    開始IPアドレスとIPアドレスの個数からブロードキャストアドレスを計算して比較処理を繰り返してIPアドレスを含むネットワークアドレスを特定

【処理概要】

qiita_post_No38_overview.png

ただ前回紹介した方法ではせっかく取得したネットワークアドレスが使い捨てになっていました。

ネットワークアドレス型の活用

今回の記事はネットワークアドレスと国コードを新たなテーブルに登録して再利用しようとするもので、最初に紹介した記事の拡張版の位置づけとなります。

ネットワークアドレス(CIDR形式) 103.77.240.0/23PostgreSQLのネットワークアドレス型 (cidr型) で保存すると、IPアドレスを含むネットワークアドレスの検索が非常に簡単になります。

IPアドレスがネットワークアドレスに含まれるかの検索用SQLの例を示します。
cidr_addr は PostgreSQL の CIDR型とします

SELECT cidr_addr FROM match_network WHERE inet '107.172.133.181' << cidr_addr;

1. 実行環境

  • OS: Ubuntu Desktop 22.04
    • データベース
      Dockerコンテナ内で稼働する PostgreSQL-16
    • python 3.10.12
      スクリプト専用の仮想環境を作成し下記ライブラリをインストール
      pip install psycopg2-binary

2. 参考サイト

下記日本語サイトのドキュメントのサンプルを参考にしています。

3. データベース

以降 本投稿用のデータベースを qiita_exampledb 、ユーザ developer として説明します

3-1. テーブル定義

この記事用に作成したテーブル定義

create_match_network.sql
-- Qiita投稿用のテストスキーマ
CREATE SCHEMA mainte2;

-- RIR_ipv4_allocated 検索で見つかったネットワークIP(開始アドレス)と国コードを保持
CREATE TABLE mainte2.match_network(
   id INTEGER NOT NULL,
   cidr_addr CIDR NOT NULL,
   country_code CHAR(2) NOT NULL
);
-- シーケンス定義
CREATE SEQUENCE mainte2.seq_match_network_id OWNED BY mainte2.match_network.id;
-- カラムidのデフォルト値をシーケンスに変更
ALTER TABLE mainte2.match_network ALTER id SET DEFAULT nextval('mainte2.seq_match_network_id');
ALTER TABLE mainte2.match_network ADD CONSTRAINT pk_match_network PRIMARY KEY (id);
-- IPネットワークはユニーク
CREATE UNIQUE INDEX idx_match_network ON mainte2.match_network(cidr_addr);

ALTER SCHEMA mainte2 OWNER TO developer;
ALTER TABLE mainte2.match_network OWNER TO developer;

4. スクリプト

4-1. psqlコマンドで検証

4-1-1. ネットワークアドレス関数と演算子の使い方

dockerコンテナ内の psql コマンドでパイプを通してSQLを実行します。

参考サイト② に掲載されている [表9.38] IPアドレス演算子の中から、「サブネットが完全にサブネットに含まれているか?」で示されているサンプルを実行し使い方を確認します。

1e2b0f7bc9c8:/# echo "SELECT (inet '192.168.1.5' << inet '192.168.1/24')
as include;" | psql -Udeveloper qiita_exampledb
 include 
---------
 t
(1 row)

1e2b0f7bc9c8:/# echo "SELECT (inet '192.168.0.5' << inet '192.168.1/24')
as include;" | psql -Udeveloper qiita_exampledb
 include 
---------
 f
(1 row)

(2) 左辺のIPアドレスがネットワークIP='103.77.240.0/23' に含まれるか検証

  • 左辺のIPアドレス例
    • (1) 含まれるIPアドレス: '103.77.241.34'
    • (2) 含まれるブロードキャストアドレス: '103.77.241.255'
    • (3) 範囲外のブロードキャストアドレス: '103.77.239.255'
# (1) 含まれるIPアドレス: '103.77.241.34' 
1e2b0f7bc9c8:/# echo "SELECT (inet '103.77.241.34' << inet '103.77.240.0/23')
as include;" | psql -Udeveloper qiita_exampledb
 include 
---------
 t
(1 row)

# (2) ブロードキャストアドレス: '103.77.241.255' 
1e2b0f7bc9c8:/# echo "SELECT (inet '103.77.241.255' << inet '103.77.240.0/23')
as include;" | psql -Udeveloper qiita_exampledb
 include 
---------
 t
(1 row)

# (3) 範囲外のブロードキャストアドレス: '103.77.239.255'
1e2b0f7bc9c8:/# /# echo "SELECT (inet '103.77.239.255' << inet '103.77.240.0/23')
 as include;" | psql -Udeveloper qiita_exampledb
 include 
---------
 f
(1 row)

4-1-2. 検索SQL

確認用レコードをネットワークアドレス国コードテーブルに登録

d52acdd248a7:/#echo "INSERT INTO mainte2.match_network(cidr_addr,country_code)
 VALUES 
 ('103.77.240.0/23','VN'),
 ('177.240.0.0/13','MX'),
 ('125.112.0.0/12','CN'),
 ('92.118.36.0/22','RO') RETURNING id,cidr_addr,country_code;" \
 > | psql -Udeveloper qiita_exampledb
 id |    cidr_addr    | country_code 
----+-----------------+--------------
  1 | 103.77.240.0/23 | VN
  2 | 177.240.0.0/13  | MX
  3 | 125.112.0.0/12  | CN
  4 | 92.118.36.0/22  | RO
(4 rows)

INSERT 0 4

(1) レコード有り: IPアドレス('103.77.241.34')

d52acdd248a7:/# echo "SELECT * FROM mainte2.match_network
 WHERE inet '103.77.241.34' << cidr_addr;" \
> | psql -Udeveloper qiita_exampledb
 id |    cidr_addr    | country_code 
----+-----------------+--------------
  1 | 103.77.240.0/23 | VN
(1 row)

(2) レコード無し: IPアドレス('103.77.242.1')

d52acdd248a7:/# echo "SELECT * FROM mainte2.match_network
 WHERE inet '103.77.242.1' << cidr_addr;" \
> | psql -Udeveloper qiita_exampledb
 id | cidr_addr | country_code 
----+-----------+--------------
(0 rows)

(3) 検索シェルスクリプト作成

find_ip_include_match_network.sh
#!/bin/bash

cat <<-EOF | psql -Udeveloper qiita_exampedb
SELECT
  id, cidr_addr, country_code
FROM
  mainte2.match_network
WHERE
  inet '${1}' << cidr_addr;
EOF

スクリプトの実行

※コンテナ実行時のパスは環境依存のため省略しています。

d52acdd248a7:# ./find_ip_include_match_network.sh 92.118.39.133
 id |   cidr_addr    | country_code 
----+----------------+--------------
  1 | 92.118.36.0/22 | RO
(1 row)

4-2. pythonスクリプト

4-2-0. 処理概要

ネットワークアドレス検索処理

  • 入力されたIPアドレスに対し以下の2つの検索処理を実行
    • ① ネットワークアドレス国コードテーブル検索
      • IPアドレスを含むネットワークアドレス (cidr型) のレコード有り
        ネットワークアドレスと国コードを出力して終了
      • レコード無し
        ② RIR ipv4 割当済みテーブル検索へ
    • ② RIR ipv4 割当済みテーブル検索
      • IPアドレスを含むネットワークアドレス (開始IP, IP個数) レコード有り
        (1) ネットワークアドレスと国コードを出力
        (2) 検索結果をネットワークアドレス国コードテーブルに登録する
      • レコード無し
        国コード不明を出力

①の検索で該当するレコードが見つかった場合

TestDetectNetwork_1_find_match_network.png

②の検索処理で該当するレコードが見つかった場合

TestDetectNetwork_2_find_rir_ipv4_allocated.png

4-2-1. クエリー実行関数

テーブル操作関数の共通インポート

from logging import Logger
from typing import Dict, List, Optional, Tuple

import psycopg2
from psycopg2.extensions import connection, cursor
4-2-1 (1) ネットワークアドレス国コード検索

IPアドレスを含むネットワークアドレスがネットワークアドレス国コードテーブルに存在するか検索する

  • 引数
    • DB接続オブジェクト
    • IPアドレス (文字列)
  • 戻り値
    • ターゲットIPを含むネットワークアドレスのレコード有り
      レコード(ID, ネットワークアドレス, 国コード) をタプルで返却
    • 該当レコード無し
      None 返却
def target_ip_include_match_network(
        conn: connection,
        target_ip: str,
        logger: Optional[Logger] = None) -> Optional[Tuple[str, str]]:
    if logger is not None:
        logger.debug(f"target_ip: {target_ip}")

    result: Optional[Tuple[str, str]]
    try:
        cur: cursor
        with conn.cursor() as cur:
            cur.execute("""
SELECT
  cidr_addr, country_code
FROM
  mainte2.match_network
WHERE
  inet %(target_ip)s << cidr_addr""",
                        ({'target_ip': target_ip}))
            row: Optional[Tuple[str, str]] = cur.fetchone()
            if logger is not None:
                if cur.query is not None:
                    logger.debug(f"{cur.query.decode('utf-8')}")
                logger.debug(f"row: {row}")
            result = row
        return result
    except (Exception, psycopg2.DatabaseError) as err:
        raise err

(1) 該当レコード有りの場合のログ出力

DEBUG target_ip: 125.124.213.77
DEBUG 
SELECT
  cidr_addr, country_code
FROM
  mainte2.match_network
WHERE
  inet '125.124.213.77' << cidr_addr
DEBUG row: ('125.112.0.0/12', 'CN')

(2) 該当レコード無しの場合のログ出力

DEBUG target_ip: 119.28.156.59
DEBUG 
SELECT
  cidr_addr, country_code
FROM
  mainte2.match_network
WHERE
  inet '119.28.156.59' << cidr_addr
DEBUG row: None
4-2-1 (2) ネットワークアドレス国コードテーブル登録

RIR ipv4 割当て済みテーブルから取得したネットワークと国コードをネットワークアドレス国コードテーブルに登録する

  • 引数
    • DB接続オブジェクト
    • 登録用辞書オブジェクト
      {"cidr_addr": ネットワークアドレス(CIDR形式),"country_code": 国コード}
  • 戻り値
    • 登録時に採番されたIDとネットワークアドレスのタプル
def insert_match_network(
        conn: connection,
        params: Dict[str, str],
        logger: Optional[Logger] = None) -> Optional[Tuple[int, str]]:
    if logger is not None:
        logger.debug(f"params: {params}")
    try:
        cur: cursor
        with conn.cursor() as cur:
            cur.execute(
                """
INSERT INTO mainte2.match_network(cidr_addr, country_code)
 VALUES (%(cidr_addr)s, %(country_code)s) RETURNING id,cidr_addr""",
                params
            )
            if logger is not None:
                if cur.query is not None:
                    logger.debug(f"{cur.query.decode('utf-8')}")

            # 戻り値: 登録ID, ネットワークアドレス
            row: Optional[Tuple[int, str]] = cur.fetchone()
            if logger is not None:
                logger.debug(row)
            return row
    except (Exception, psycopg2.DatabaseError) as err:
        raise err

関数実行時のDEBUG出力

DEBUG params: {'cidr_addr': '119.28.0.0/15', 'country_code': 'CN'}
DEBUG 
INSERT INTO mainte2.match_network(cidr_addr, country_code)
 VALUES ('119.28.0.0/15', 'CN') RETURNING id,cidr_addr
DEBUG (5, '119.28.0.0/15')

4-2-2. RIR ipv4 割当て済みテーブル検索

先頭の記事で紹介した関数を再掲します。処理内容については先頭記事をご覧ください。

def get_rir_table_matches(
        conn: connection,
        like_ip: str,
        logger: Optional[Logger] = None) -> List[Tuple[str, int, str]]:
    if logger is not None:
        logger.debug(f"like_ip: {like_ip}")
    result: List[Tuple[str, int, str]]
    try:
        cur: cursor
        # 前ゼロ埋めしたIPアドレスの昇順にソートする
        with conn.cursor() as cur:
            cur.execute("""
SELECT
   ip_start,ip_count,country_code
FROM
   mainte2.RIR_ipv4_allocated
WHERE
   ip_start LIKE %(partial_match)s
ORDER BY
 LPAD(SPLIT_PART(ip_start,'.',1), 3, '0') || '.' ||
 LPAD(SPLIT_PART(ip_start,'.',2), 3, '0') || '.' ||
 LPAD(SPLIT_PART(ip_start,'.',3), 3, '0') || '.' ||
 LPAD(SPLIT_PART(ip_start,'.',4), 3, '0')""",
                        ({'partial_match': like_ip}))
            # レコード取得件数チェック
            if cur.rowcount > 0:
                rows: List[Tuple[str, int, str]] = cur.fetchall()
                if logger is not None:
                    logger.debug(f"rows.size: {len(rows)}")
                    for row in rows:
                        logger.debug(f"{row}")
                result = rows
            else:
                # マッチしなかったら空のリスト
                result = []
        return result
    except (Exception, psycopg2.DatabaseError) as err:
        raise err

4-3. 処理メイン処理

4-3-1. RIRテーブル検索メイン処理

複数のクエリー検索実行関数を呼び出しネットワークアドレスと国コードを取得する

  • 検索処理手順
    • ① ネットワークアドレス国コードテーブル検索処理を実行
      ※「# ■ 今回追加した検索処理 ■」で囲った部分
    • ② RIR ipv4 割当済み検索処理実行
      対象となるネットワークアドレスのリストを取得する
    • ③ ネットワークアドレス特定処理を実行
      ※1「# ■ ネットワークアドレス特定処理 ■」で囲った部分
      ※2 前回記事のスクリプトでは、この処理を抜けてから実行していました
  • 戻り値
    • 該当レコードが有り
      ネットワークアドレス・国コード、検索テーブル名を保持するデータクラス
    • 該当レコードが無し
      None
def get_matches_main(
        conn: connection,
        target_ip: str,
        logger: Optional[logging.Logger] = None) -> Optional[MatchNetworkData]:
    def make_like_ip(like_old: str) -> Optional[str]:
        # 末尾の likeプレースホルダを削除する
        raw_ip: str = like_old.replace(".%", "")
        fields: List[str] = raw_ip.split(".")
        # コンマで区切って残りが1つなら終了
        field_size: int = len(fields)
        if field_size == 1:
            return None

        # フィールドを1つ減らす
        del fields[field_size - 1]
        # 末尾にlikeプレースホルダ(".%")を付加して終了
        return ".".join(fields) + ".%"

    # ■ 今回追加した検索処理 ■
    # ネットワークアドレス国コードテーブル検索
    find_rec: Optional[Tuple[str, str]] = target_ip_include_match_network(
        conn, target_ip, logger=logger
    )
    if find_rec is not None:
        # レコードが見つかったら処理終了
        return MatchNetworkData(
            network_ip=find_rec[0], country_code=find_rec[1], from_table="match_network"
        )
    # ■ 今回追加した検索処理 ■

    # RIR_ipv4_allocated テーブル検索処理
    target_ip_addr: IPv4Address = ip_address(target_ip)  # type: ignore
    like_ip: Optional[str] = make_like_ip(target_ip)
    matches: Optional[List[Tuple[str, int, str]]] = None
    while like_ip is not None:
        matches = get_rir_table_matches(conn, like_ip, logger=logger)
        if len(matches) > 0:
            # 先頭レコードの開始IPアドレス
            first_ip: str = matches[0][0]
            first_ip_addr: IPv4Address = ip_address(first_ip)  # type: ignore
            # 最終レコードの開始IPアドレス
            last: Tuple[str, int, str] = matches[-1]
            last_ip: str = last[0]
            ip_cnt: int = int(last[1])
            last_ip_addr: IPv4Address = ip_address(last_ip)  # type: ignore
            # 最終レコードのブロードキャストアドレス計算
            broadcast_addr: IPv4Address = last_ip_addr + ip_cnt - 1  # type: ignore
            if logger is not None:
                logger.info(f"match_first: {first_ip}, match_last: {last_ip}")

            if first_ip_addr < target_ip_addr < broadcast_addr:
                # ターゲットIPが先頭レコードの開始IPと最終レコードのブロードキャストの範囲内なら終了
                if logger is not None:
                    logger.debug(
                        f"Range in ({first_ip} < {target_ip} < {str(broadcast_addr)})"
                        f", break"
                    )
                break
            else:
                # 範囲外: 次のlike検索文字列を生成して検索処理に戻る
                like_ip = make_like_ip(like_ip)
                if logger is not None:
                    logger.info(f"next {like_ip} continue.")
        else:
            # レコード無し: 次のlike検索文字列を生成して検索処理に戻る
            if logger is not None:
                logger.info(f"{like_ip} is no match.")
            like_ip = make_like_ip(like_ip)

    # ■ ネットワークアドレス特定処理 ■
    # ターケットIPが属するネットワークアドレスと国コードを取得する
    upd_cc: Optional[str]
    # ターゲットIPのネットワーク(CIDR表記)と国コードを取得する
    if matches is not None and len(matches) > 0:
        network: Optional[str]
        cc: Optional[str]
        network, cc = detect_cc_in_matches(target_ip, matches, logger=logger)
        if network is not None and cc is not None:
            return MatchNetworkData(
                network_ip=network, country_code=cc, from_table="RIR_ipv4_allocated"
            )
        else:
            # RIR_ipv4_allocated テーブルにターゲットIPを含むネットワークアドレスなし
            return None
    else:
        # RIR_ipv4_allocated テーブルに該当レコードなし
        return None
    # ■ ネットワークアドレス特定処理 ■

4-3-2. バッチメイン処理

  • 処理手順
    • 入力パラメータ処理
      検索対象のIPアドレスを取得する
    • PostgreSQL接続オブジェクト取得
    • RIRテーブル検索メイン処理を実行
    • 検索結果出力
    • ネットワークアドレス国コードテーブル登録処理
      ※1 RIR ipv4 割当済みテーブルから取得した場合に登録する
      ※2 「# ■ 今回追加した登録処理 ■」に挟まれた部分
def batch_main():
    logging.basicConfig(format='%(levelname)s %(message)s')
    app_logger = logging.getLogger(__name__)
    app_logger.setLevel(level=logging.DEBUG)

    parser = argparse.ArgumentParser()
    parser.add_argument("--target-ip", required=True, type=str,
                        help="IP address.")
    parser.add_argument("--enable-debug", action="store_true",
                        help="Enable logger debug out.")
    args: argparse.Namespace = parser.parse_args()
    target_ip: str = args.target_ip
    enable_debug: bool = args.enable_debug

    db: Optional[pgdatabase.PgDatabase] = None
    try:
        db = pgdatabase.PgDatabase(DB_CONF_FILE, logger=None)
        conn: connection = db.get_connection()
        match_data: Optional[MatchNetworkData] = get_matches_main(
            conn, target_ip,
            logger=app_logger if enable_debug else None
        )

        if match_data is not None:
            network: str = match_data.network_ip
            cc: str = match_data.country_code
            from_table: str = match_data.from_table
            app_logger.info(
                f'Find {target_ip} in {from_table}("{network}", country_code: "{cc}")')

            # ■ 今回追加した登録処理 ■
            # 見つかった国コードとネットワークアドレスがRIRレコードなら登録する
            if from_table == "RIR_ipv4_allocated":
                params: Dict[str, str] = {"cidr_addr": network, "country_code": cc}
                inserted: Optional[Tuple[int, str]] = insert_match_network(
                    conn, params, logger=app_logger if enable_debug else None
                )
                conn.commit()
                # 登録結果
                if inserted is not None:
                    reg_id: int = inserted[0]
                    cidr_addr: str = inserted[1]
                    app_logger.info(
                        f'Inserted match_network(id={reg_id}, cidr_addr="{cidr_addr}")')
            # ■ 今回追加した登録処理 ■
        else:
            app_logger.info(f"{target_ip} is not match in tables.")
    except psycopg2.Error as db_err:
        app_logger.error(db_err)
        exit(1)
    except Exception as err:
        app_logger.error(err)
        exit(1)
    finally:
        if db is not None:
            db.close()


if __name__ == '__main__':
    batch_main()

4-4. pythonスクリプトソース

以下 (1)と(2)は最初に紹介した投稿記事のソースを再掲します。

(1) PostgreSQL接続情報 (JSON)

conf/db_conn.json
{
  "host": "{hostname}",
  "port": "5432",
  "database": "qiita_exampledb",
  "user": "developer",
  "password": "developerpassword"
}

(2) PostgreSQL接続オブジェクト取得モジュール

db/pgdatabase.py
import json
import logging
import socket
from typing import Optional
import psycopg2
from psycopg2.extensions import connection

"""
PostgreSQL Database接続生成クラス
"""


class PgDatabase(object):
    def __init__(self, configfile,
                 hostname: Optional[str] = None,
                 logger: Optional[logging.Logger] = None):
        self.logger = logger
        with open(configfile, 'r') as fp:
            db_conf = json.load(fp)
            if hostname is None:
                hostname = socket.gethostname()
            db_conf["host"] = db_conf["host"].format(hostname=hostname)
        # default connection is itarable curosr
        self.conn = psycopg2.connect(**db_conf)
        # Dictinaly-like cursor connection.
        # self.conn = psycopg2.connect(**db_conf, connection_factory=DictConnection)
        if self.logger is not None:
            self.logger.debug(self.conn)

    def get_connection(self) -> connection:
        return self.conn

    def rollback(self) -> None:
        if self.conn is not None:
            self.conn.rollback()

    def commit(self) -> None:
        if self.conn is not None:
            self.conn.commit()

    def close(self) -> None:
        if self.conn is not None:
            if self.logger is not None:
                self.logger.debug(f"Close {self.conn}")
            self.conn.close()

(3) バッチ処理のソース全体を下記に示します。

TestDetectCountryCode_in_match_network.py
import argparse
import csv
import json
import logging
import os
from dataclasses import asdict, dataclass
from ipaddress import (
    ip_address, summarize_address_range, IPv4Address, IPv4Network
)
from logging import Logger

import typing
from typing import Any, List, Dict, Iterator, Optional, Tuple

import psycopg2
from psycopg2.extensions import connection, cursor

from db import pgdatabase


# データベース接続情報
DB_CONF_FILE: str = os.path.join("conf", "db_conn.json")


# RIR データクラス
@dataclass(frozen=True)
class RirRecord:
    ip_start: str
    ip_count: int
    country_code: str


# RIRテーブル検索メイン処理の戻り値
@dataclass(frozen=True)
class MatchNetworkData:
    network_ip: str
    country_code: str
    from_table: str


def read_json(file_name: str) -> Dict[str, Any]:
    with open(file_name, 'r') as fp:
        data = json.load(fp)
    return data


def read_csv(file_name: str,
             skip_header=True, header_cnt=1) -> List[str]:
    with open(file_name, 'r') as fp:
        reader = csv.reader(fp, dialect='unix')
        if skip_header:
            for skip in range(header_cnt):
                next(reader)
        # リストをカンマ区切りで連結する
        lines = [",".join(rec) for rec in reader]
    return lines


@typing.no_type_check
def get_cidr_cc_list(ip_start: str,
                     ip_count: int,
                     country_code: str) -> List[Tuple[IPv4Network, str]]:
    addr_first: IPv4Address = ip_address(ip_start)
    addr_last: IPv4Address = addr_first + ip_count - 1
    cidr_ite: Iterator[IPv4Network] = summarize_address_range(addr_first, addr_last)
    return [(cidr, country_code) for cidr in cidr_ite]


@typing.no_type_check
def detect_cc_in_cidr_cc_list(
        target_ip: str,
        cidr_cc_list: List[Tuple[IPv4Network, str]]
) -> Tuple[Optional[str], Optional[str]]:
    target_ip_addr: IPv4Address = ip_address(target_ip)
    match_network: Optional[str] = None
    match_cc: Optional[str] = None
    for cidr_cc in cidr_cc_list:
        if target_ip_addr in cidr_cc[0]:
            match_network = str(cidr_cc[0])
            match_cc = cidr_cc[1]
            break

    return match_network, match_cc


# RIR_ipv4_allocated のみ mainte2スキーマのものを参照する
def get_rir_table_matches(
        conn: connection,
        like_ip: str,
        logger: Optional[Logger] = None) -> List[Tuple[str, int, str]]:
    if logger is not None:
        logger.debug(f"like_ip: {like_ip}")
    result: List[Tuple[str, int, str]]
    try:
        cur: cursor
        # 前ゼロ埋めしたIPアドレスの昇順にソートする
        with conn.cursor() as cur:
            cur.execute("""
SELECT
   ip_start,ip_count,country_code
FROM
   mainte2.RIR_ipv4_allocated
WHERE
   ip_start LIKE %(partial_match)s
ORDER BY
 LPAD(SPLIT_PART(ip_start,'.',1), 3, '0') || '.' ||
 LPAD(SPLIT_PART(ip_start,'.',2), 3, '0') || '.' ||
 LPAD(SPLIT_PART(ip_start,'.',3), 3, '0') || '.' ||
 LPAD(SPLIT_PART(ip_start,'.',4), 3, '0')""",
                        ({'partial_match': like_ip}))
            # レコード取得件数チェック
            if cur.rowcount > 0:
                rows: List[Tuple[str, int, str]] = cur.fetchall()
                if logger is not None:
                    logger.debug(f"rows.size: {len(rows)}")
                    for row in rows:
                        logger.debug(f"{row}")
                result = rows
            else:
                # マッチしなかったら空のリスト
                result = []
        return result
    except (Exception, psycopg2.DatabaseError) as err:
        raise err


# ターゲットIPのネットワークがネットワークアドレス国コードテーブルに存在するかチェック
def target_ip_include_match_network(
        conn: connection,
        target_ip: str,
        logger: Optional[Logger] = None) -> Optional[Tuple[str, str]]:
    if logger is not None:
        logger.debug(f"target_ip: {target_ip}")

    result: Optional[Tuple[str, str]]
    try:
        cur: cursor
        with conn.cursor() as cur:
            cur.execute("""
SELECT
  cidr_addr, country_code
FROM
  mainte2.match_network
WHERE
  inet %(target_ip)s << cidr_addr""",
                        ({'target_ip': target_ip}))
            row: Optional[Tuple[str, str]] = cur.fetchone()
            if logger is not None:
                if cur.query is not None:
                    logger.debug(f"{cur.query.decode('utf-8')}")
                logger.debug(f"row: {row}")
            result = row
        return result
    except (Exception, psycopg2.DatabaseError) as err:
        raise err


# ネットワークアドレス国コードテーブルに一括登録
def insert_match_network(
        conn: connection,
        params: Dict[str, str],
        logger: Optional[Logger] = None) -> Optional[Tuple[int, str]]:
    if logger is not None:
        logger.debug(f"params: {params}")
    try:
        cur: cursor
        with conn.cursor() as cur:
            cur.execute(
                """
INSERT INTO mainte2.match_network(cidr_addr, country_code)
 VALUES (%(cidr_addr)s, %(country_code)s) RETURNING id,cidr_addr""",
                params
            )
            if logger is not None:
                if cur.query is not None:
                    logger.debug(f"{cur.query.decode('utf-8')}")

            # 戻り値: 登録ID, ネットワークアドレス
            row: Optional[Tuple[int, str]] = cur.fetchone()
            if logger is not None:
                logger.debug(row)
            return row
    except (Exception, psycopg2.DatabaseError) as err:
        raise err


def detect_cc_in_matches(
        target_ip: str,
        matches: List[Tuple[str, int, str]],
        logger: Optional[logging.Logger] = None) -> Tuple[Optional[str], Optional[str]]:
    def next_record(rows: List[Tuple[str, int, str]]) -> Iterator[RirRecord]:
        for (ip_sta, ip_cnt, cc) in rows:
            yield RirRecord(ip_start=ip_sta, ip_count=ip_cnt, country_code=cc)

    target_ip_addr: IPv4Address = ip_address(target_ip)  # type: ignore
    match_network: Optional[str] = None
    match_cc: Optional[str] = None
    rec: RirRecord
    for rec in next_record(matches):
        # ターゲットIP が ネットワークIPアドレスより大きい場合は範囲外のため処理終了
        if ip_address(rec.ip_start) > target_ip_addr:  # type: ignore
            if logger is not None:
                logger.debug(
                    f"{target_ip} < {rec.ip_start} break. No more match."
                )
            # マッチするデータなし
            break

        # 開始ネットワークIPのブロードキャストアドレスがターゲットIPより小さければ次のレコードへ
        broadcast_addr: IPv4Address = (
                ip_address(rec.ip_start) + rec.ip_count - 1)  # type: ignore
        if broadcast_addr < target_ip_addr:
            if logger is not None:
                logger.debug(f"({str(broadcast_addr)} < {target_ip}) -> continue")
            continue

        cidr_cc_list: List[Tuple[IPv4Network, str]] = get_cidr_cc_list(
            **asdict(rec)
        )
        if logger is not None:
            logger.debug(cidr_cc_list)
        match_network, match_cc = detect_cc_in_cidr_cc_list(target_ip, cidr_cc_list)
        break

    return match_network, match_cc


def get_matches_main(
        conn: connection,
        target_ip: str,
        logger: Optional[logging.Logger] = None) -> Optional[MatchNetworkData]:
    def make_like_ip(like_old: str) -> Optional[str]:
        # 末尾の likeプレースホルダを削除する
        raw_ip: str = like_old.replace(".%", "")
        fields: List[str] = raw_ip.split(".")
        # コンマで区切って残りが1つなら終了
        field_size: int = len(fields)
        if field_size == 1:
            return None

        # フィールドを1つ減らす
        del fields[field_size - 1]
        # 末尾にlikeプレースホルダ(".%")を付加して終了
        return ".".join(fields) + ".%"

    # ネットワークアドレス国コードテーブル検索
    find_rec: Optional[Tuple[str, str]] = target_ip_include_match_network(
        conn, target_ip, logger=logger
    )
    if find_rec is not None:
        # レコードが見つかったら処理終了
        return MatchNetworkData(
            network_ip=find_rec[0], country_code=find_rec[1], from_table="match_network"
        )

    # RIR_ipv4_allocated テーブル検索処理
    target_ip_addr: IPv4Address = ip_address(target_ip)  # type: ignore
    like_ip: Optional[str] = make_like_ip(target_ip)
    matches: Optional[List[Tuple[str, int, str]]] = None
    while like_ip is not None:
        matches = get_rir_table_matches(conn, like_ip, logger=logger)
        if len(matches) > 0:
            # 先頭レコードの開始IPアドレス
            first_ip: str = matches[0][0]
            first_ip_addr: IPv4Address = ip_address(first_ip)  # type: ignore
            # 最終レコードの開始IPアドレス
            last: Tuple[str, int, str] = matches[-1]
            last_ip: str = last[0]
            ip_cnt: int = int(last[1])
            last_ip_addr: IPv4Address = ip_address(last_ip)  # type: ignore
            # 最終レコードのブロードキャストアドレス計算
            broadcast_addr: IPv4Address = last_ip_addr + ip_cnt - 1  # type: ignore
            if logger is not None:
                logger.info(f"match_first: {first_ip}, match_last: {last_ip}")

            if first_ip_addr < target_ip_addr < broadcast_addr:
                # ターゲットIPが先頭レコードの開始IPと最終レコードのブロードキャストの範囲内なら終了
                if logger is not None:
                    logger.debug(
                        f"Range in ({first_ip} < {target_ip} < {str(broadcast_addr)})"
                        f", break"
                    )
                break
            else:
                # 範囲外: 次のlike検索文字列を生成して検索処理に戻る
                like_ip = make_like_ip(like_ip)
                if logger is not None:
                    logger.info(f"next {like_ip} continue.")
        else:
            # レコード無し: 次のlike検索文字列を生成して検索処理に戻る
            if logger is not None:
                logger.info(f"{like_ip} is no match.")
            like_ip = make_like_ip(like_ip)

    # ターケットIPが属するネットワークアドレスと国コードを取得する
    upd_cc: Optional[str]
    # ターゲットIPのネットワーク(CIDR表記)と国コードを取得する
    if matches is not None and len(matches) > 0:
        network: Optional[str]
        cc: Optional[str]
        network, cc = detect_cc_in_matches(target_ip, matches, logger=logger)
        if network is not None and cc is not None:
            return MatchNetworkData(
                network_ip=network, country_code=cc, from_table="RIR_ipv4_allocated"
            )
        else:
            # RIR_ipv4_allocated テーブルにターゲットIPを含むネットワークアドレスなし
            return None
    else:
        # RIR_ipv4_allocated テーブルに該当レコードなし
        return None


def batch_main():
    logging.basicConfig(format='%(levelname)s %(message)s')
    app_logger = logging.getLogger(__name__)
    app_logger.setLevel(level=logging.DEBUG)

    parser = argparse.ArgumentParser()
    parser.add_argument("--target-ip", required=True, type=str,
                        help="IP address.")
    parser.add_argument("--enable-debug", action="store_true",
                        help="Enable logger debug out.")
    args: argparse.Namespace = parser.parse_args()
    target_ip: str = args.target_ip
    enable_debug: bool = args.enable_debug

    db: Optional[pgdatabase.PgDatabase] = None
    try:
        db = pgdatabase.PgDatabase(DB_CONF_FILE, logger=None)
        conn: connection = db.get_connection()
        match_data: Optional[MatchNetworkData] = get_matches_main(
            conn, target_ip,
            logger=app_logger if enable_debug else None
        )

        if match_data is not None:
            network: str = match_data.network_ip
            cc: str = match_data.country_code
            from_table: str = match_data.from_table
            app_logger.info(
                f'Find {target_ip} in {from_table}("{network}", country_code: "{cc}")')

            # 見つかった国コードとネットワークアドレスがRIRレコードなら登録する
            if from_table == "RIR_ipv4_allocated":
                params: Dict[str, str] = {"cidr_addr": network, "country_code": cc}
                inserted: Optional[Tuple[int, str]] = insert_match_network(
                    conn, params, logger=app_logger if enable_debug else None
                )
                conn.commit()
                # 登録結果
                if inserted is not None:
                    reg_id: int = inserted[0]
                    cidr_addr: str = inserted[1]
                    app_logger.info(
                        f'Inserted match_network(id={reg_id}, cidr_addr="{cidr_addr}")')
        else:
            app_logger.info(f"{target_ip} is not match in tables.")
    except psycopg2.Error as db_err:
        app_logger.error(db_err)
        exit(1)
    except Exception as err:
        app_logger.error(err)
        exit(1)
    finally:
        if db is not None:
            db.close()


if __name__ == '__main__':
    batch_main()

5. pythonスクリプトの実行

5-1. テストレコード

「4-1-2. 検索SQL」で登録したレコードを再掲します

21d653972513:/# echo "SELECT * FROM mainte2.match_network ORDER BY id;" \
> | psql -Udeveloper qiita_exampledb
 id |    cidr_addr    | country_code 
----+-----------------+--------------
  1 | 103.77.240.0/23 | VN
  2 | 177.240.0.0/13  | MX
  3 | 125.112.0.0/12  | CN
  4 | 92.118.36.0/22  | RO
(4 rows)

5-2. スクリプト実行

python 仮想環境 (py_psycopg2) に入る

$ . py_venv/py_psycopg2/bin/activate
(py_psycopg2) $ 

(1) ネットワークアドレス国コードテーブルに該当レード有り

(py_psycopg2) $ python TestDetectCountryCode_in_match_network.py \
> --target-ip 125.124.213.77 --enable-debug
DEBUG target_ip: 125.124.213.77
DEBUG 
SELECT
  cidr_addr, country_code
FROM
  mainte2.match_network
WHERE
  inet '125.124.213.77' << cidr_addr
DEBUG row: ('125.112.0.0/12', 'CN')
INFO Find 125.124.213.77 in match_network("125.112.0.0/12", country_code: "CN")

(2) RIR ipv4 割当済みテーブルに該当レード有り
※ネットワークアドレス国コードテーブル該当レード無し

(py_psycopg2) $ python TestDetectCountryCode_in_match_network.py \
> --target-ip 119.28.156.59 --enable-debug
INFO target_ip: 119.28.156.59, enable_debug: True
DEBUG target_ip: 119.28.156.59
DEBUG 
SELECT
  cidr_addr, country_code
FROM
  mainte2.match_network
WHERE
  inet '119.28.156.59' << cidr_addr
DEBUG row: None
DEBUG like_ip: 119.28.156.%
INFO 119.28.156.% is no match.
DEBUG like_ip: 119.28.%
DEBUG rows.size: 1
DEBUG ('119.28.0.0', 131072, 'CN')
INFO match_first: 119.28.0.0, match_last: 119.28.0.0
DEBUG Range in (119.28.0.0 < 119.28.156.59 < 119.29.255.255), break
DEBUG [(IPv4Network('119.28.0.0/15'), 'CN')]
INFO Find 119.28.156.59 in RIR_ipv4_allocated("119.28.0.0/15", country_code: "CN")
DEBUG network_addr: 119.28.0.0/15
DEBUG 
SELECT id FROM mainte2.match_network WHERE cidr_addr = '119.28.0.0/15'
DEBUG row : None
DEBUG params: {'cidr_addr': '119.28.0.0/15', 'country_code': 'CN'}
DEBUG 
INSERT INTO mainte2.match_network(cidr_addr, country_code)
 VALUES ('119.28.0.0/15', 'CN') RETURNING id,cidr_addr
DEBUG (5, '119.28.0.0/15')
INFO Inserted match_network(id=5, cidr_addr="119.28.0.0/15")

(3) ネットワークアドレス不明

(py_psycopg2) $ python TestDetectCountryCode_in_match_network.py \
> --target-ip 103.253.175.77 --enable-debug
DEBUG target_ip: 103.253.175.77
DEBUG 
SELECT
  cidr_addr, country_code
FROM
  mainte2.match_network
WHERE
  inet '103.253.175.77' << cidr_addr
DEBUG row: None
DEBUG like_ip: 103.253.175.%
INFO 103.253.175.% is no match.
DEBUG like_ip: 103.253.%
DEBUG rows.size: 52
DEBUG ('103.253.0.0', 1024, 'ID')
DEBUG ('103.253.4.0', 1024, 'CN')
DEBUG ('103.253.8.0', 1024, 'HK')
DEBUG ('103.253.12.0', 1024, 'MY')
DEBUG ('103.253.16.0', 512, 'VN')
DEBUG ('103.253.18.0', 512, 'PK')
DEBUG ('103.253.20.0', 512, 'VN')
DEBUG ('103.253.22.0', 512, 'VN')
DEBUG ('103.253.24.0', 1024, 'SG')
DEBUG ('103.253.32.0', 1024, 'IN')
DEBUG ('103.253.36.0', 512, 'LT')
DEBUG ('103.253.40.0', 1024, 'HK')
DEBUG ('103.253.44.0', 1024, 'BD')
DEBUG ('103.253.48.0', 1024, 'NZ')
DEBUG ('103.253.60.0', 1024, 'CN')
DEBUG ('103.253.64.0', 1024, 'AU')
DEBUG ('103.253.71.0', 256, 'IN')
DEBUG ('103.253.72.0', 1024, 'TH')
DEBUG ('103.253.76.0', 1024, 'JP')
DEBUG ('103.253.84.0', 512, 'JP')
DEBUG ('103.253.92.0', 1024, 'AU')
DEBUG ('103.253.96.0', 1024, 'CA')
DEBUG ('103.253.100.0', 512, 'MY')
DEBUG ('103.253.102.0', 512, 'BD')
DEBUG ('103.253.106.0', 512, 'ID')
DEBUG ('103.253.108.0', 1024, 'MY')
DEBUG ('103.253.128.0', 1024, 'IN')
DEBUG ('103.253.132.0', 1024, 'TH')
DEBUG ('103.253.136.0', 1024, 'NZ')
DEBUG ('103.253.140.0', 1024, 'ES')
DEBUG ('103.253.144.0', 1024, 'SG')
DEBUG ('103.253.156.0', 1024, 'PK')
DEBUG ('103.253.160.0', 1024, 'BD')
DEBUG ('103.253.164.0', 512, 'ID')
DEBUG ('103.253.168.0', 1024, 'IN')
DEBUG ('103.253.176.0', 512, 'BD')
DEBUG ('103.253.180.0', 1024, 'KH')
DEBUG ('103.253.188.0', 1024, 'JP')
DEBUG ('103.253.192.0', 256, 'NZ')
DEBUG ('103.253.193.0', 256, 'AU')
DEBUG ('103.253.194.0', 512, 'NZ')
DEBUG ('103.253.200.0', 1024, 'IN')
DEBUG ('103.253.204.0', 1024, 'CN')
DEBUG ('103.253.208.0', 1024, 'IN')
DEBUG ('103.253.216.0', 1024, 'JP')
DEBUG ('103.253.220.0', 1024, 'CN')
DEBUG ('103.253.224.0', 1024, 'CN')
DEBUG ('103.253.228.0', 256, 'TH')
DEBUG ('103.253.232.0', 1024, 'CN')
DEBUG ('103.253.236.0', 1024, 'HK')
DEBUG ('103.253.240.0', 1024, 'KR')
DEBUG ('103.253.248.0', 1024, 'HK')
INFO match_first: 103.253.0.0, match_last: 103.253.248.0
DEBUG Range in (103.253.0.0 < 103.253.175.77 < 103.253.251.255), break
DEBUG (103.253.3.255 < 103.253.175.77) -> continue
DEBUG (103.253.7.255 < 103.253.175.77) -> continue
DEBUG (103.253.11.255 < 103.253.175.77) -> continue
DEBUG (103.253.15.255 < 103.253.175.77) -> continue
DEBUG (103.253.17.255 < 103.253.175.77) -> continue
DEBUG (103.253.19.255 < 103.253.175.77) -> continue
DEBUG (103.253.21.255 < 103.253.175.77) -> continue
DEBUG (103.253.23.255 < 103.253.175.77) -> continue
DEBUG (103.253.27.255 < 103.253.175.77) -> continue
DEBUG (103.253.35.255 < 103.253.175.77) -> continue
DEBUG (103.253.37.255 < 103.253.175.77) -> continue
DEBUG (103.253.43.255 < 103.253.175.77) -> continue
DEBUG (103.253.47.255 < 103.253.175.77) -> continue
DEBUG (103.253.51.255 < 103.253.175.77) -> continue
DEBUG (103.253.63.255 < 103.253.175.77) -> continue
DEBUG (103.253.67.255 < 103.253.175.77) -> continue
DEBUG (103.253.71.255 < 103.253.175.77) -> continue
DEBUG (103.253.75.255 < 103.253.175.77) -> continue
DEBUG (103.253.79.255 < 103.253.175.77) -> continue
DEBUG (103.253.85.255 < 103.253.175.77) -> continue
DEBUG (103.253.95.255 < 103.253.175.77) -> continue
DEBUG (103.253.99.255 < 103.253.175.77) -> continue
DEBUG (103.253.101.255 < 103.253.175.77) -> continue
DEBUG (103.253.103.255 < 103.253.175.77) -> continue
DEBUG (103.253.107.255 < 103.253.175.77) -> continue
DEBUG (103.253.111.255 < 103.253.175.77) -> continue
DEBUG (103.253.131.255 < 103.253.175.77) -> continue
DEBUG (103.253.135.255 < 103.253.175.77) -> continue
DEBUG (103.253.139.255 < 103.253.175.77) -> continue
DEBUG (103.253.143.255 < 103.253.175.77) -> continue
DEBUG (103.253.147.255 < 103.253.175.77) -> continue
DEBUG (103.253.159.255 < 103.253.175.77) -> continue
DEBUG (103.253.163.255 < 103.253.175.77) -> continue
DEBUG (103.253.165.255 < 103.253.175.77) -> continue
DEBUG (103.253.171.255 < 103.253.175.77) -> continue
DEBUG 103.253.175.77 < 103.253.176.0 break. No more match.
INFO 103.253.175.77 is not match in tables.

6. 最後に

この記事の先頭で紹介した投稿記事では「RIR ipv4 割当済みテーブル」にIPアドレスを文字列で格納していました。

今となってわかったのですがネットワークアドレス (CIDR形式) がわかれば CIDR型で登録したほうが検索ロジックは非常に簡単になることがわかりました。

「RIR ipv4 割当済みテーブル」にインポートしたCSVファイルは以下のようになっていました。

ipv4-all-2024-08-30.csv
"ip_start","ip_count","country_code","allocated_date","registry_id"
"1.0.1.0",256,"CN","20110414",1
"1.0.2.0",512,"CN","20110414",1
"1.0.4.0",1024,"AU","20110412",1
"1.0.8.0",2048,"CN","20110412",1
"1.0.16.0",4096,"JP","20110412",1
...以下省略...

"ip_start" と "ip_count" 列の値から、下記 "network_addr" 列のように cidr 形式に変換することでCDIR型でのインポートが可能になります。

ipv4-all-cidr2024-08-30.csv
"network_addr","country_code","allocated_date","registry_id"
"1.0.1.0/24","CN","20110414",1
"1.0.2.0/23","CN","20110414",1
"1.0.4.0/22","AU","20110412",1
"1.0.8.0/21","CN","20110412",1
"1.0.16.0/20","JP","20110412",1
...以下省略...

但し、検索処理としては簡単なるもののRIRデータは日々更新されているためテーブルのメンテナンスの難易度が上がってしまう可能性があるのが玉に瑕。

上記のようなCSVファイルに変換する方法については次回以降に投稿する予定です。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?