0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

地域インターネットレジストリデータをネットワークアドレスに変換する

Last updated at Posted at 2024-10-17

ネットワークアドレスへの変換方法

前回投稿で PostgreSQLのネットワークアドレス型を活用すると不正アクセスしたホストの国コード検索が簡単になることを紹介いたしました。

また下記の投稿では、地域インターネットレジストリ(RIR)データをダウンロードしテーブルインポート用の CSV ファイルを作成するシェルスクリプト も紹介しました。

本記事では地域インターネットレジストリデータから作成したデータベースインポート用のCSVファイルの開始IPアドレスとIP件数からネットワークアドレス (cidr形式) に変換した CSV ファイルを出力する python スクリプトを紹介いたします。

地域インターネットレジストリ (APNIC) よりダウンロードしたファイルの中から IPv4 アドレスの割り当て済み (allocated) のものを抽出したデータの抜粋は以下のようになっていました。※2024-08-30日時点

delegated-apnic-latest.txt
apnic|CN|ipv4|1.0.1.0|256|20110414|allocated
apnic|CN|ipv4|1.0.2.0|512|20110414|allocated
apnic|AU|ipv4|1.0.4.0|1024|20110412|allocated
apnic|CN|ipv4|1.0.8.0|2048|20110412|allocated
apnic|JP|ipv4|1.0.16.0|4096|20110412|allocated
...以下省略...

記事「不正アクセスしてきたホストの国コードを知ってセキュリィティに活用する」で紹介した データベースへのインポート用に変換した CSV ファイルの抜粋を下記に示します。

ipv4-all-2024-08-30.csv
"ip_start","ip_count","country_code","allocated_date","registry_id"
"1.0.1.0",256,"CN","20110414",1
"1.0.2.0",512,"CN","20110414",1
"1.0.4.0",1024,"AU","20110412",1
"1.0.8.0",2048,"CN","20110412",1
"1.0.16.0",4096,"JP","20110412",1
...以下省略...

今回紹介するpythonスクリプトが出力するCSVファイルの内容は以下の通りで、先頭列 ("network_addr") のように変換することで、PostgreSQLのネットワークアドレス型 (CIDR型) としてインポート可能になります。

ipv4-all-2024-08-30_cidr.csv
"network_addr","country_code","allocated_date","registry_id"
"1.0.1.0/24","CN","20110414",1
"1.0.2.0/23","CN","20110414",1
"1.0.4.0/22","AU","20110412",1
"1.0.8.0/21","CN","20110412",1
"1.0.16.0/20","JP","20110412",1
...以下省略...

1. 実行環境

  • OS: Ubuntu Desktop 22.04
    • データベース
      Dockerコンテナ内で稼働する PostgreSQL-16
    • python 3.10.12
      スクリプト専用の仮想環境を作成し下記ライブラリをインストール
      pip install psycopg2-binary

2. データベース

以降 本投稿用のデータベースを qiita_exampledb 、ユーザ developer として説明します。

2-1. テーブル定義

テーブル名 RIR_ipv4_allocated_cidr, network_addr 列を PostgreSQL の CIDR型で保持する。

CREATE TABLE mainte2.RIR_ipv4_allocated_cidr(
   network_addr CIDR NOT NULL,
   country_code CHAR(2) NOT NULL,
   allocated_date DATE NOT NULL,
   registry_id SMALLINT NOT NULL
);

記事「不正アクセスしてきたホストの国コードを知ってセキュリィティに活用する」で紹介したテーブル定義は以下のようになっていました。

CREATE TABLE mainte2.RIR_ipv4_allocated(
   ip_start VARCHAR(15) NOT NULL,
   ip_count INTEGER NOT NULL,
   country_code CHAR(2) NOT NULL,
   allocated_date DATE NOT NULL,
   registry_id SMALLINT NOT NULL
);

上記 RIR_ipv4_allocated テーブルで、2024-08-30日時点の 地域インターネットレジストリデータの ipv4 アドレスの IP 個数を集計したものを下記に示します。

1e2b0f7bc9c8:/# echo "SELECT
  ip_count, COUNT(ip_count)
FROM
  mainte2.RIR_ipv4_allocated
GROUP BY ip_count ORDER BY ip_count DESC;" | psql -Udeveloper qiita_exampledb
 ip_count | count 
----------+-------
 16777216 |    19
  8388608 |    12
  4194304 |    58
  2097152 |   140
  1048576 |   350
   524288 |   622
   393216 |     1
   262144 |  1331
   131072 |  2019
    65536 | 10520
    33792 |     1
    32768 |  3742
    16384 |  5861
    13312 |     1
     8192 | 11972
     7168 |     1
     4096 | 15326
     2560 |     1
     2304 |     1
     2048 | 15611
     1536 |     1
     1280 |     1
     1024 | 62612
      512 | 21099
      256 | 50495
      128 |     5
       64 |     2
(27 rows)

上記の ip_count列で 2^X (X=6 から ) になっていない IP 個数が割り当てられているために単純に変換することができません。

3. python スクリプト

3-0. 単純に変換できないネットワークアドレス

IP個数が 2^X にならないデータを2つ示します。

"164.146.0.0",393216,"ZA","19930312",2
"87.116.83.0",2304,"BG","20050913",5

ネットワークアドレスを python コンソール上で計算してみます。

(1) 開始IP="164.146.0.0", IP個数=393216

>>> from ipaddress import ip_address, summarize_address_range
>>> ip_start="164.146.0.0"
>>> ip_count=393216
>>> addr_start=ip_address(ip_start)
>>> addr_start
IPv4Address('164.146.0.0')
>>> addr_bcast=addr_start+ip_count-1
>>> addr_bcast
IPv4Address('164.151.255.255')
>>> cidr_iter=summarize_address_range(addr_start, addr_bcast)
>>> cidr_data=[cidr for cidr in cidr_iter]
>>> cidr_data
[IPv4Network('164.146.0.0/15'), IPv4Network('164.148.0.0/14')]

(2) 開始IP="87.116.83.0", IP個数=2304

>>> ip_start="87.116.83.0"
>>> ip_count=2304
>>> addr_start=ip_address(ip_start)
>>> addr_start
IPv4Address('87.116.83.0')
>>> addr_bcast=addr_start + ip_count -1
>>> addr_bcast
IPv4Address('87.116.91.255')
>>> cidr_iter=summarize_address_range(addr_start, addr_bcast)
>>> cidr_data = [cidr for cidr in cidr_iter]
>>> cidr_data
[IPv4Network('87.116.83.0/24'), IPv4Network('87.116.84.0/22'), IPv4Network('87.116.88.0/22')]

IP個数が 2^X にならない開始IPアドレスのネットワークアドレスは複数になります。

※ IP個数が 2^X になる場合は、ネットワークアドレスは常に1つです。
[例] 64 (/26), 128 (/25), 256 (/24), 512 (/23), ...,

3-1. 関数定義

モジュールのインポート

import argparse
import csv
import logging
import os
import typing
from typing import Iterator, List, Optional, Tuple

from ipaddress import (
    ip_address, summarize_address_range, IPv4Address, IPv4Network
)

3-1-1. ページング

地域インターネットレジストリデータはレコード件数が20万件を超えるため下記のようなジェネレーター関数で CSVファイルをページ単位で読み込みします。

デフォルトのページサイズは 5000 で、yield pages で CSVファイル 5000 行分のリストを生成します。

def paginate(reader: Iterator, page_size: int = 5000):
    pages: List = []
    it: Iterator = iter(reader)
    while True:
        try:
            for i in range(page_size):
                pages.append(next(it))
            yield pages
            pages = []
        except StopIteration:
            if pages:
                yield pages
            return

3-1-2. ファイル操作処理

3-1-2 (1) 読み込み用ファイルオブジェクト取得

読み込みモードでファイルをオープンし、ファイルオブジェクトを取得します。

def get_file_reader(file_name: str):
    fp = open(file_name, mode='r')
    return fp
3-1-2 (2) 書き込み用ファイルオブジェクト取得

書き込みモードでファイルをオープンし、ファイルオブジェクトを取得します。

def get_file_writer(file_name: str):
    fp = open(file_name, mode='w')
    return fp

3-1-3. ネットワークアドレス変換処理

ジェネレーターが生成するリスト (ページ単位) の変換処理を実行します。

内部関数 get_cidr_list は開始IPアドレスとIP個数からネットワークアドレスのリストを生成します。

def ip_start_to_cidr_network(csv_lines: List[List]) -> List[str]:
    @typing.no_type_check
    def get_cidr_list(ip_start: str,
                      ip_count: int) -> List[str]:
        addr_first: IPv4Address = ip_address(ip_start)
        addr_last: IPv4Address = addr_first + ip_count - 1
        cidr_ite: Iterator[IPv4Network] = summarize_address_range(addr_first, addr_last)
        return [str(network) for network in cidr_ite]

    result: List[str] = []
    for csv_line in csv_lines:
        cc: str = csv_line[2]
        alloc_date: str = csv_line[3]
        registry_id: int = int(csv_line[4])
        cidr_list: List[str] = get_cidr_list(csv_line[0], int(csv_line[1]))
        for cidr in cidr_list:
            out_line: str = f'"{cidr}","{cc}","{alloc_date}",{registry_id}\n'
            result.append(out_line)
    return result

上記関数の引数 csv_lines と変換結果 result のDEUBG時の内容を下記に示します。

csv_lines = {list: 5} [['1.0.1.0', '256', 'CN', '20110414', '1'], ['1.0.2.0', '512', 'CN', '20110414', '1'], ['1.0.4.0', '1024', 'AU', '20110412', '1'], ['1.0.8.0', '2048', 'CN', '20110412', '1'], ['1.0.16.0', '4096', 'JP', '20110412', '1']]
 0 = {list: 5} ['1.0.1.0', '256', 'CN', '20110414', '1']
 1 = {list: 5} ['1.0.2.0', '512', 'CN', '20110414', '1']
 2 = {list: 5} ['1.0.4.0', '1024', 'AU', '20110412', '1']
 3 = {list: 5} ['1.0.8.0', '2048', 'CN', '20110412', '1']
 4 = {list: 5} ['1.0.16.0', '4096', 'JP', '20110412', '1']
 __len__ = {int} 5

 result = {list: 5} ['"1.0.1.0/24","CN","20110414",1\n', '"1.0.2.0/23","CN","20110414",1\n', '"1.0.4.0/22","AU","20110412",1\n', '"1.0.8.0/21","CN","20110412",1\n', '"1.0.16.0/20","JP","20110412",1\n']
 0 = {str} '"1.0.1.0/24","CN","20110414",1\n'
 1 = {str} '"1.0.2.0/23","CN","20110414",1\n'
 2 = {str} '"1.0.4.0/22","AU","20110412",1\n'
 3 = {str} '"1.0.8.0/21","CN","20110412",1\n'
 4 = {str} '"1.0.16.0/20","JP","20110412",1\n'
 __len__ = {int} 5

3-1-4. 変換処理メイン関数

3-1-4. (1) 入力パラメータ取得処理
  • オリジナルのCSVファイルパス: --csv-file ※必須
  • 変換処理後のCSVファイル出力ディレクトリ: --output-dir ※任意
    ※未指定の場合は、オリジナルのCSVファイルのディレクトリ
  • ページサイズ: : ----page-size ※任意
    ※ 1回の処理で読み込む CSVファイルの行数
def batch_main():
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
                        datefmt='%Y-%m-%d %H:%M:%S')
    app_logger = logging.getLogger(__name__)
    app_logger.setLevel(level=logging.DEBUG)

    parser = argparse.ArgumentParser()
    parser.add_argument("--csv-file", type=str, required=True,
                        help="Source csv file path.")
    # 変換後のCSVファイル保存先 ※未指定なら --csv-file のディレクトリ
    parser.add_argument("--output-dir", type=str,
                        help="Output csv file directory.")
    parser.add_argument("--page-size", type=int,
                        default=5000,
                        help="CSV row count size.")
    args: argparse.Namespace = parser.parse_args()
    csv_file: str = args.csv_file
    output_dir: Optional[str] = args.output_dir
    app_logger.info(f"csv_file: {csv_file}")
3-1-4. (2) CSVファイルパスチェック処理
  • CSVファイルパスの存在チェック
    ※存在しない場合はエラー終了
  • 変換処理後のCSVファイ名を生成する
  • 出力先ディレクトリの設定
    ※未指定ならCSVファイルパスのディレクトリを設定する
    # 元のCSVファイルチェック
    f_path: str
    if csv_file.find("~/") == 0:
        f_path = os.path.expanduser(csv_file)
    else:
        f_path = csv_file
    if not os.path.exists(f_path):
        app_logger.error(f"FileNotFound: {f_path}")
        exit(1)

    # 出力ファイル名: オリジナル名にアンダースコア修飾する(xxx_cidr.csv)
    file_name: str = os.path.basename(f_path)
    names: Tuple[str, str] = os.path.splitext(file_name)
    out_name: str = f"{names[0]}_cidr{names[1]}"
    # 出力先
    output_file: str
    if output_dir is None:
        # 出力先が未指定ならソースCSVのディレクトリ
        dir_name: str = os.path.dirname(f_path)
        output_file = os.path.join(dir_name, out_name)
    else:
        # 指定ディレクトリ
        output_file = os.path.join(os.path.expanduser(output_dir), out_name)
3-1-4.(3) ネットワークアドレス変換処理とファイル出力

処理手順

  • CSVファイルの1回あたりのページサイズ (読み込み行数) を設定する
  • CSVファイルの読み込みファイルオブジェクトの取得
  • 出力ファイルオブジェクトの取得
  • ヘッダー処理
    • CSVファイルのヘッダーを1行分読み飛ばす
    • 変換後CSVファイルにヘッダーを1行出力
  • [ループ処理] CSVファイルのデータをページサイズ分読み込む
    • ネットワークアドレス変換後のCSV出力リストを取得する
    • CSV出力リストをCSV出力ファイルオブジェクトに出力
      CSV出力ファイルに書き出す (flush)
  • ファイルオブジェクトのクリーンアップ
    • 入力ファイルオブジェクトのクローズ
    • 出力ファイルオブジェクトのクローズ
    # ページサイズ
    page_size: int = args.page_size

    # ファイル操作オブジェクトを開く
    fp_reader = get_file_reader(f_path)
    app_logger.debug(fp_reader)
    fp_writer = get_file_writer(output_file)
    app_logger.debug(fp_writer)
    try:
        csv_reader = csv.reader(fp_reader, delimiter=',', dialect='unix')
        # ソースCSVのヘッダースキップ
        next(csv_reader)
        # 出力CSVのヘッダー出力
        fp_writer.write(OUT_CSV_HEADER)
        fp_writer.flush()

        # データ読み込み
        input_total: int = 0
        output_total: int = 0
        csv_lines: List[List]
        # RIRデータのCSVは約20万レコード以上あるので指定さりたページサイズ(行数)でファイルに保存
        for csv_lines in paginate(csv_reader, page_size=page_size):
            input_total += len(csv_lines)
            out_lines: List[str] = ip_start_to_cidr_network(csv_lines)
            output_total += len(out_lines)
            app_logger.info(
                f"{input_total}, input_lines: {input_total}, output_lines:{output_total}"
            )
            fp_writer.writelines(out_lines)
            fp_writer.flush()

    except Exception as ex:
        app_logger.error(ex)
        exit(1)
    finally:
        if fp_reader is not None:
            fp_reader.close()
        if fp_writer is not None:
            fp_writer.close()

    app_logger.info(f"Saved {output_file}")


if __name__ == '__main__':
    batch_main()

3-2. python ソース

ソースコード全体を再掲します。

RirIpv4Allocated_to_cidr_csv.py
import argparse
import csv
import logging
import os
import typing
from typing import Iterator, List, Optional, Tuple

from ipaddress import (
    ip_address, summarize_address_range, IPv4Address, IPv4Network
)


"""
RIR ipv4 allocated CSV file to cidr network.
"""

OUT_CSV_HEADER: str = '"network_addr","country_code","allocated_date","registry_id"\n'


def paginate(reader: Iterator, page_size: int = 5000):
    pages: List = []
    it: Iterator = iter(reader)
    while True:
        try:
            for i in range(page_size):
                pages.append(next(it))
            yield pages
            pages = []
        except StopIteration:
            if pages:
                yield pages
            return


def get_file_reader(file_name: str):
    fp = open(file_name, mode='r')
    return fp


def get_file_writer(file_name: str):
    fp = open(file_name, mode='w')
    return fp


def ip_start_to_cidr_network(csv_lines: List[List]) -> List[str]:
    @typing.no_type_check
    def get_cidr_list(ip_start: str,
                      ip_count: int) -> List[str]:
        addr_first: IPv4Address = ip_address(ip_start)
        addr_last: IPv4Address = addr_first + ip_count - 1
        cidr_ite: Iterator[IPv4Network] = summarize_address_range(addr_first, addr_last)
        return [str(network) for network in cidr_ite]

    result: List[str] = []
    for csv_line in csv_lines:
        cc: str = csv_line[2]
        alloc_date: str = csv_line[3]
        registry_id: int = int(csv_line[4])
        cidr_list: List[str] = get_cidr_list(csv_line[0], int(csv_line[1]))
        for cidr in cidr_list:
            out_line: str = f'"{cidr}","{cc}","{alloc_date}",{registry_id}\n'
            result.append(out_line)
    return result


def batch_main():
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
                        datefmt='%Y-%m-%d %H:%M:%S')
    app_logger = logging.getLogger(__name__)
    app_logger.setLevel(level=logging.DEBUG)

    parser = argparse.ArgumentParser()
    parser.add_argument("--csv-file", type=str, required=True,
                        help="Source csv file path.")
    # 変換後のCSVファイル保存先 ※未指定なら --csv-file のディレクトリ
    parser.add_argument("--output-dir", type=str,
                        help="Output csv file directory.")
    parser.add_argument("--page-size", type=int,
                        default=5000,
                        help="CSV row count size.")
    args: argparse.Namespace = parser.parse_args()
    csv_file: str = args.csv_file
    output_dir: Optional[str] = args.output_dir
    app_logger.info(f"csv_file: {csv_file}")
    # 元のCSVファイルチェック
    f_path: str
    if csv_file.find("~/") == 0:
        f_path = os.path.expanduser(csv_file)
    else:
        f_path = csv_file
    if not os.path.exists(f_path):
        app_logger.error(f"FileNotFound: {f_path}")
        exit(1)

    # 出力ファイル名: オリジナル名にアンダースコア修飾する(xxx_cidr.csv)
    file_name: str = os.path.basename(f_path)
    names: Tuple[str, str] = os.path.splitext(file_name)
    out_name: str = f"{names[0]}_cidr{names[1]}"
    # 出力先
    output_file: str
    if output_dir is None:
        # 出力先が未指定ならソースCSVのディレクトリ
        dir_name: str = os.path.dirname(f_path)
        output_file = os.path.join(dir_name, out_name)
    else:
        # 指定ディレクトリ
        output_file = os.path.join(os.path.expanduser(output_dir), out_name)

    # ページサイズ
    page_size: int = args.page_size

    # ファイル操作オブジェクトを開く
    fp_reader = get_file_reader(f_path)
    app_logger.debug(fp_reader)
    fp_writer = get_file_writer(output_file)
    app_logger.debug(fp_writer)
    try:
        csv_reader = csv.reader(fp_reader, delimiter=',', dialect='unix')
        # ソースCSVのヘッダースキップ
        next(csv_reader)
        # 出力CSVのヘッダー出力
        fp_writer.write(OUT_CSV_HEADER)
        fp_writer.flush()

        # データ読み込み
        input_total: int = 0
        output_total: int = 0
        csv_lines: List[List]
        # RIRデータのCSVは約20万レコード以上あるので指定さりたページサイズ(行数)でファイルに保存
        for csv_lines in paginate(csv_reader, page_size=page_size):
            input_total += len(csv_lines)
            out_lines: List[str] = ip_start_to_cidr_network(csv_lines)
            output_total += len(out_lines)
            app_logger.info(
                f"{input_total}, input_lines: {input_total}, output_lines:{output_total}"
            )
            fp_writer.writelines(out_lines)
            fp_writer.flush()

    except Exception as ex:
        app_logger.error(ex)
        exit(1)
    finally:
        if fp_reader is not None:
            fp_reader.close()
        if fp_writer is not None:
            fp_writer.close()

    app_logger.info(f"Saved {output_file}")


if __name__ == '__main__':
    batch_main()

4. スクリプト実行

オリジナルのCSVファイルの内容 (抜粋)

~/Documents/public/RIR/csv/ipv4-all-2024-08-30.csv
"ip_start","ip_count","country_code","allocated_date","registry_id"
"1.0.1.0",256,"CN","20110414",1
"1.0.2.0",512,"CN","20110414",1
"1.0.4.0",1024,"AU","20110412",1
"1.0.8.0",2048,"CN","20110412",1
"1.0.16.0",4096,"JP","20110412",1
"1.0.32.0",8192,"CN","20110412",1
"1.0.64.0",16384,"JP","20110412",1
"1.0.128.0",32768,"TH","20110408",1
"1.1.0.0",256,"CN","20110414",1
"1.1.2.0",512,"CN","20110414",1
"1.1.4.0",1024,"CN","20110414",1
"1.1.8.0",256,"CN","20110412",1
"1.1.9.0",256,"CN","20110412",1
"1.1.10.0",512,"CN","20110412",1
"1.1.12.0",1024,"CN","20110412",1
"1.1.16.0",4096,"CN","20110412",1
"1.1.32.0",8192,"CN","20110412",1
"1.1.64.0",16384,"JP","20110412",1
"1.1.128.0",32768,"TH","20110408",1
"1.2.0.0",512,"CN","20110414",1
...以下省略...

4-1. pythonスクリプト実行

ページサイズ 20000 (行) で実行
※パスはユーザーホームのプロジェクトディレクトリですが省略しています。

$ . py_venv/py_psycopg2/bin/activate
(py_psycopg2) $ python RirIpv4Allocated_to_cidr_csv.py \
> --csv-file ~/Documents/public/RIR/csv/ipv4-all-2024-10-16.csv \
> --output-dir ~/data/sql/qiita_example/mainte2/csv/RIR \
> --page-size 20000
2024-10-17 15:08:08 INFO csv_file: /home/qiita/Documents/public/RIR/csv/ipv4-all-2024-08-30.csv
2024-10-17 15:08:08 DEBUG <_io.TextIOWrapper name='/home/qiita/Documents/public/RIR/csv/ipv4-all-2024-08-30.csv' mode='r' encoding='UTF-8'>
2024-10-17 15:08:08 DEBUG <_io.TextIOWrapper name='/home/qiita/data/sql/qiita_example/mainte2/csv/RIR/ipv4-all-2024-08-30_cidr.csv' mode='w' encoding='UTF-8'>
2024-10-17 15:08:08 INFO 20000, input_lines: 20000, output_lines:20000
2024-10-17 15:08:09 INFO 40000, input_lines: 40000, output_lines:40001
2024-10-17 15:08:09 INFO 60000, input_lines: 60000, output_lines:60008
2024-10-17 15:08:10 INFO 80000, input_lines: 80000, output_lines:80008
2024-10-17 15:08:10 INFO 100000, input_lines: 100000, output_lines:100008
2024-10-17 15:08:10 INFO 120000, input_lines: 120000, output_lines:120008
2024-10-17 15:08:11 INFO 140000, input_lines: 140000, output_lines:140008
2024-10-17 15:08:11 INFO 160000, input_lines: 160000, output_lines:160015
2024-10-17 15:08:12 INFO 180000, input_lines: 180000, output_lines:180017
2024-10-17 15:08:12 INFO 200000, input_lines: 200000, output_lines:200018
2024-10-17 15:08:12 INFO 201804, input_lines: 201804, output_lines:201822
2024-10-17 15:08:12 INFO Saved /home/qiita/data/sql/qiita_example/mainte2/csv/RIR/ipv4-all-2024-08-30_cidr.csv

変換後のCSVファイルの内容 (抜粋)

~/data/sql/qiita_example/mainte2/csv/RIR/ipv4-all-2024-08-30_cidr.csv
"network_addr","country_code","allocated_date","registry_id"
"1.0.1.0/24","CN","20110414",1
"1.0.2.0/23","CN","20110414",1
"1.0.4.0/22","AU","20110412",1
"1.0.8.0/21","CN","20110412",1
"1.0.16.0/20","JP","20110412",1
"1.0.32.0/19","CN","20110412",1
"1.0.64.0/18","JP","20110412",1
"1.0.128.0/17","TH","20110408",1
"1.1.0.0/24","CN","20110414",1
"1.1.2.0/23","CN","20110414",1
"1.1.4.0/22","CN","20110414",1
"1.1.8.0/24","CN","20110412",1
"1.1.9.0/24","CN","20110412",1
"1.1.10.0/23","CN","20110412",1
"1.1.12.0/22","CN","20110412",1
"1.1.16.0/20","CN","20110412",1
"1.1.32.0/19","CN","20110412",1
"1.1.64.0/18","JP","20110412",1
"1.1.128.0/17","TH","20110408",1
"1.2.0.0/23","CN","20110414",1
...以下省略...

4-2. csvインポート

4-2-1. テーブル生成クエリ

14_create_rir_ipv4_allocated_cidr.sql
DROP TABLE IF EXISTS mainte2.RIR_ipv4_allocated_cidr CASCADE;
DROP TABLE IF EXISTS mainte2.RIR_registory_mst;
-- レジストリ名テーブル
-- name: {afrinic,apnic,arin,iana,lacnic,ripencc}
-- https://www.apnic.net/about-apnic/corporate-documents/documents/
--     resource-guidelines/rir-statistics-exchange-format/
CREATE TABLE mainte2.RIR_registory_mst(
   id SMALLINT PRIMARY KEY,
   name VARCHAR(8) NOT NULL
);

INSERT INTO mainte2.RIR_registory_mst(id, name) VALUES 
   (1,'apnic')
  ,(2,'afrinic')
  ,(3,'arin')
  ,(4,'lacnic')
  ,(5,'ripencc')
  ,(6,'iana');

-- APNICで公開している各国に割り当てているIPアドレス情報からipv4アドレスのみを抽出したマスタテーブル
-- (変更) region -> registry
CREATE TABLE mainte2.RIR_ipv4_allocated_cidr(
   network_addr CIDR NOT NULL,
   country_code CHAR(2) NOT NULL,
   allocated_date DATE NOT NULL,
   registry_id SMALLINT NOT NULL
);

ALTER TABLE mainte2.RIR_ipv4_allocated_cidr ADD CONSTRAINT pk_RIR_ipv4_allocated_cidr
  PRIMARY KEY (network_addr);
ALTER TABLE mainte2.RIR_ipv4_allocated_cidr ADD CONSTRAINT fk_RIR_ipv4_allocated_cidr_registry
  FOREIGN KEY (registry_id) REFERENCES mainte2.RIR_registory_mst (id);

ALTER TABLE mainte2.RIR_registory_mst OWNER TO developer;
ALTER TABLE mainte2.RIR_ipv4_allocated_cidr OWNER TO developer;

dockerコンテナ内の psql コマンドでテーブル生成クエリーを実行する
※パスはコンテナ環境に依存するため省略していますが通常は絶対パスを指定します。

$ docker exec -it postgres-qiita bash
c1d55b8abc42:/# psql -Udeveloper qiita_exampledb < 14_create_rir_ipv4_allocated_cidr.sql
NOTICE:  table "RIR_ipv4_allocated_cidr" does not exist, skipping
DROP TABLE
NOTICE:  table "rir_registory_mst" does not exist, skipping
DROP TABLE
CREATE TABLE
INSERT 0 6
CREATE TABLE
ALTER TABLE
ALTER TABLE
ALTER TABLE
ALTER TABLE

4-2-2. CSVファイルインポートスクリプト

実行中のdockerコンテナ内の psql コマンドでクエリー実行

CSVファイルのレコードが20万件を超えるためインポート前に PK 制約をドロップします。

  • PK 制約ドロップ
  • CSVファイルのインポート
  • PK 制約を追加
import_from_2_rir_ipv4_cidr_csv.sh
#!/bin/bash

# https://stackoverflow.com/questions/34736762/script-to-automat-import-of-csv-into-postgresql
#   Script to automat import of CSV into PostgreSQL

# PK制約をドロップ
psql -Udeveloper -d qiita_exampledb -c \
"ALTER TABLE mainte2.RIR_ipv4_allocated_cidr
 DROP CONSTRAINT pk_RIR_ipv4_allocated_cidr;"


sleep 1

# データインポート
psql -Udeveloper -d qiita_exampledb -c \
"\copy mainte2.RIR_ipv4_allocated_cidr FROM 
'/home/qiita/data/sql/qiita_example/mainte2/csv/RIR/${1}' 
DELIMITER ',' CSV HEADER;"

sleep 2

# PK制約を戻す
psql -Udeveloper -d qiita_exampledb -c \
"ALTER TABLE mainte2.RIR_ipv4_allocated_cidr
 ADD CONSTRAINT pk_RIR_ipv4_allocated_cidr PRIMARY KEY (network_addr);"

インポートシェルスクリプトを実行
※シェルスクリプトパスはコンテナ環境に依存するため省略していま。

c1d55b8abc42:/# ./import_from_2_rir_ipv4_cidr_csv.sh ipv4-all-2024-08-30_cidr.csv
ALTER TABLE
COPY 201822
ALTER TABLE

4-3. 国コード・ネットワークアドレス検索

4-3-1. PostgreSQL接続情報

conf/db_conn.json
{
  "host": "{hostname}",
  "port": "5432",
  "database": "qiita_exampledb",
  "user": "developer",
  "password": "developerpassword"
}

4-3-2. python スクリプト

投稿記事「PostgreSQL 便利なネットワークアドレス型を活用する」で、検索クエリーについて解説しているのでここではソースのみを示します。

TestDetectCountryCode_in_rir_cidr.py
import argparse
import json
import logging
import os
import socket

from logging import Logger
from typing import Optional, Tuple

import psycopg2
from psycopg2.extensions import connection, cursor

"""
Qiita投稿用スクリプト
IPアドレスのネットワークアドレスと国コードをRIRデータテーブルから検索する

[テーブル名] mainte2.RIR_ipv4_allocated_cidr
"""

# データベース接続情報
DB_CONF_FILE: str = os.path.join("conf", "db_conn.json")


# PostgreSQLデータベース接続生成クラス
class PgDatabase(object):
    def __init__(self, configfile,
                 hostname: Optional[str] = None,
                 logger: Optional[logging.Logger] = None):
        self.logger = logger
        with open(configfile, 'r') as fp:
            db_conf = json.load(fp)
            if hostname is None:
                hostname = socket.gethostname()
            db_conf["host"] = db_conf["host"].format(hostname=hostname)
        self.conn = psycopg2.connect(**db_conf)
        if self.logger is not None:
            self.logger.debug(self.conn)

    def get_connection(self) -> connection:
        return self.conn

    def rollback(self) -> None:
        if self.conn is not None:
            self.conn.rollback()

    def commit(self) -> None:
        if self.conn is not None:
            self.conn.commit()

    def close(self) -> None:
        if self.conn is not None:
            if self.logger is not None:
                self.logger.debug(f"Close {self.conn}")
            self.conn.close()


# ターゲットIPのネットワークがネットワークアドレス国コードテーブルに存在するかチェック
def target_ip_include_rir_table(
        conn: connection,
        target_ip: str,
        logger: Optional[Logger] = None) -> Optional[Tuple[str, str]]:
    if logger is not None:
        logger.debug(f"target_ip: {target_ip}")

    result: Optional[Tuple[str, str]]
    try:
        cur: cursor
        with conn.cursor() as cur:
            cur.execute("""
SELECT
  network_addr, country_code
FROM
  mainte2.RIR_ipv4_allocated_cidr
WHERE
  inet %(target_ip)s << network_addr""",
                        ({'target_ip': target_ip}))
            row: Optional[Tuple[str, str]] = cur.fetchone()
            if logger is not None:
                if cur.query is not None:
                    logger.debug(f"{cur.query.decode('utf-8')}")
                logger.debug(f"row: {row}")
            result = row
        return result
    except (Exception, psycopg2.DatabaseError) as err:
        raise err


def batch_main():
    logging.basicConfig(format='%(levelname)s %(message)s')
    app_logger = logging.getLogger(__name__)
    app_logger.setLevel(level=logging.DEBUG)

    parser = argparse.ArgumentParser()
    parser.add_argument("--target-ip", required=True, type=str,
                        help="IP address.")
    parser.add_argument("--enable-debug", action="store_true",
                        help="Enable logger debug out.")
    args: argparse.Namespace = parser.parse_args()
    target_ip: str = args.target_ip
    enable_debug: bool = args.enable_debug

    db: Optional[PgDatabase] = None
    try:
        db = PgDatabase(DB_CONF_FILE, logger=None)
        conn: connection = db.get_connection()
        # ネットワークアドレス国コードテーブル検索
        find_rec: Optional[Tuple[str, str]] = target_ip_include_rir_table(
            conn, target_ip, logger=app_logger if enable_debug else None
        )
        if find_rec is not None:
            network: str = find_rec[0]
            cc: str = find_rec[1]
            app_logger.info(
                f'Find {target_ip} in RIR_ipv4_allocated_cidr("{network}"'
                f', country_code: "{cc}")'
            )
        else:
            app_logger.info(f"{target_ip} is not match in tables.")
    except psycopg2.Error as db_err:
        app_logger.error(db_err)
        exit(1)
    except Exception as err:
        app_logger.error(err)
        exit(1)
    finally:
        if db is not None:
            db.close()


if __name__ == '__main__':
    batch_main()

4-3-3. python スクリプト実行

スクリプトパスは省略しています。

(1) ターゲットIPアドレスを含むネットワークアドレスレコード有り
--target-ip 125.124.213.77 --enable-debug

(py_psycopg2) $ python TestDetectCountryCode_in_rir_cidr.py \
> --target-ip 125.124.213.77 --enable-debug
DEBUG target_ip: 125.124.213.77
DEBUG 
SELECT
  network_addr, country_code
FROM
  mainte2.RIR_ipv4_allocated_cidr
WHERE
  inet '125.124.213.77' << network_addr
DEBUG row: ('125.112.0.0/12', 'CN')
INFO Find 125.124.213.77 in RIR_ipv4_allocated_cidr("125.112.0.0/12", country_code: "CN")

(2) ターゲットIPアドレスを含むネットワークアドレスレコード無し
--target-ip 103.253.175.77 --enable-debug

(py_psycopg2) $ python TestDetectCountryCode_in_rir_cidr.py --target-ip 103.253.175.77 --enable-debug
DEBUG target_ip: 103.253.175.77
DEBUG 
SELECT
  network_addr, country_code
FROM
  mainte2.RIR_ipv4_allocated_cidr
WHERE
  inet '103.253.175.77' << network_addr
DEBUG row: None
INFO 103.253.175.77 is not match in tables.

5. 最後に

過去の投稿「不正アクセスしてきたホストの国コードを知ってセキュリティ対策に活用する」で紹介した検索処理は地域インターネットレジストリデータのIPv4アドレス情報 (開始IPアドレス、IP個数) をそのままテーブルに登録していたため国コード・ネットワークアドレス検索処理がかなり複雑になっていました。

今回紹介した 変換用の python スクリプトで IPv4アドレス情報を CIDR 形式のネットワークアドレスに変換したCSVファイルをテーブルにインポートすることで検索処理が簡単になりました。

記事で紹介したスクリプトとサンプルCSVフアイルを下記GihtHubで公開しています。
GitHub pipito-yuko/ qiita-posts / python / RirIpv4Allocated_cidr

RirIpv4Allocated_cidr/
└── src
    ├── docker
    │   ├── Dockerfile
    │   ├── docker-compose.yml
    │   └── initdb
    │       ├── 10_createdb.sql
    │       └── 14_create_rir_ipv4_allocated_cidr.sql
    ├── python_project
    │   ├── RirIpv4Allocated_to_cidr_csv.py       # IPアドレス情報をネットワークアドレス(CIDR形式)に変換
    │   ├── TestDetectCountryCode_in_rir_cidr.py  # ネットワークアドレス・国コード検索
    │   ├── conf
    │   │   └── db_conn.json
    │   ├── csv
    │   │   └── ipv4-all-2024-10-16.csv           # 2024-10-16時点のRIRデータのオリジナルCSVファイル (変換前)
    │   └── mypy.ini
    ├── requirements.txt
    └── sql
        └── scripts
            └── import_from_2_rir_ipv4_cidr_csv.sh # 変換後のCSVインポートシェルスクリプト
0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?