ネットワークアドレスへの変換方法
前回投稿で PostgreSQLのネットワークアドレス型を活用すると不正アクセスしたホストの国コード検索が簡単になることを紹介いたしました。
また下記の投稿では、地域インターネットレジストリ(RIR)データをダウンロードしテーブルインポート用の CSV ファイルを作成するシェルスクリプト も紹介しました。
本記事では地域インターネットレジストリデータから作成したデータベースインポート用のCSVファイルの開始IPアドレスとIP件数からネットワークアドレス (cidr形式) に変換した CSV ファイルを出力する python スクリプトを紹介いたします。
地域インターネットレジストリ (APNIC) よりダウンロードしたファイルの中から IPv4 アドレスの割り当て済み (allocated) のものを抽出したデータの抜粋は以下のようになっていました。※2024-08-30日時点
apnic|CN|ipv4|1.0.1.0|256|20110414|allocated
apnic|CN|ipv4|1.0.2.0|512|20110414|allocated
apnic|AU|ipv4|1.0.4.0|1024|20110412|allocated
apnic|CN|ipv4|1.0.8.0|2048|20110412|allocated
apnic|JP|ipv4|1.0.16.0|4096|20110412|allocated
...以下省略...
記事「不正アクセスしてきたホストの国コードを知ってセキュリィティに活用する」で紹介した データベースへのインポート用に変換した CSV ファイルの抜粋を下記に示します。
"ip_start","ip_count","country_code","allocated_date","registry_id"
"1.0.1.0",256,"CN","20110414",1
"1.0.2.0",512,"CN","20110414",1
"1.0.4.0",1024,"AU","20110412",1
"1.0.8.0",2048,"CN","20110412",1
"1.0.16.0",4096,"JP","20110412",1
...以下省略...
今回紹介するpythonスクリプトが出力するCSVファイルの内容は以下の通りで、先頭列 ("network_addr") のように変換することで、PostgreSQLのネットワークアドレス型 (CIDR型) としてインポート可能になります。
"network_addr","country_code","allocated_date","registry_id"
"1.0.1.0/24","CN","20110414",1
"1.0.2.0/23","CN","20110414",1
"1.0.4.0/22","AU","20110412",1
"1.0.8.0/21","CN","20110412",1
"1.0.16.0/20","JP","20110412",1
...以下省略...
1. 実行環境
- OS: Ubuntu Desktop 22.04
- データベース
Dockerコンテナ内で稼働する PostgreSQL-16 - python 3.10.12
スクリプト専用の仮想環境を作成し下記ライブラリをインストール
pip install psycopg2-binary
- データベース
2. データベース
以降 本投稿用のデータベースを qiita_exampledb 、ユーザ developer として説明します。
2-1. テーブル定義
テーブル名 RIR_ipv4_allocated_cidr
, network_addr
列を PostgreSQL の CIDR型で保持する。
CREATE TABLE mainte2.RIR_ipv4_allocated_cidr(
network_addr CIDR NOT NULL,
country_code CHAR(2) NOT NULL,
allocated_date DATE NOT NULL,
registry_id SMALLINT NOT NULL
);
記事「不正アクセスしてきたホストの国コードを知ってセキュリィティに活用する」で紹介したテーブル定義は以下のようになっていました。
CREATE TABLE mainte2.RIR_ipv4_allocated(
ip_start VARCHAR(15) NOT NULL,
ip_count INTEGER NOT NULL,
country_code CHAR(2) NOT NULL,
allocated_date DATE NOT NULL,
registry_id SMALLINT NOT NULL
);
上記 RIR_ipv4_allocated
テーブルで、2024-08-30日時点の 地域インターネットレジストリデータの ipv4 アドレスの IP 個数を集計したものを下記に示します。
1e2b0f7bc9c8:/# echo "SELECT
ip_count, COUNT(ip_count)
FROM
mainte2.RIR_ipv4_allocated
GROUP BY ip_count ORDER BY ip_count DESC;" | psql -Udeveloper qiita_exampledb
ip_count | count
----------+-------
16777216 | 19
8388608 | 12
4194304 | 58
2097152 | 140
1048576 | 350
524288 | 622
393216 | 1
262144 | 1331
131072 | 2019
65536 | 10520
33792 | 1
32768 | 3742
16384 | 5861
13312 | 1
8192 | 11972
7168 | 1
4096 | 15326
2560 | 1
2304 | 1
2048 | 15611
1536 | 1
1280 | 1
1024 | 62612
512 | 21099
256 | 50495
128 | 5
64 | 2
(27 rows)
上記の ip_count列で 2^X (X=6 から ) になっていない IP 個数が割り当てられているために単純に変換することができません。
3. python スクリプト
3-0. 単純に変換できないネットワークアドレス
IP個数が 2^X にならないデータを2つ示します。
"164.146.0.0",393216,"ZA","19930312",2
"87.116.83.0",2304,"BG","20050913",5
ネットワークアドレスを python コンソール上で計算してみます。
(1) 開始IP="164.146.0.0", IP個数=393216
>>> from ipaddress import ip_address, summarize_address_range
>>> ip_start="164.146.0.0"
>>> ip_count=393216
>>> addr_start=ip_address(ip_start)
>>> addr_start
IPv4Address('164.146.0.0')
>>> addr_bcast=addr_start+ip_count-1
>>> addr_bcast
IPv4Address('164.151.255.255')
>>> cidr_iter=summarize_address_range(addr_start, addr_bcast)
>>> cidr_data=[cidr for cidr in cidr_iter]
>>> cidr_data
[IPv4Network('164.146.0.0/15'), IPv4Network('164.148.0.0/14')]
(2) 開始IP="87.116.83.0", IP個数=2304
>>> ip_start="87.116.83.0"
>>> ip_count=2304
>>> addr_start=ip_address(ip_start)
>>> addr_start
IPv4Address('87.116.83.0')
>>> addr_bcast=addr_start + ip_count -1
>>> addr_bcast
IPv4Address('87.116.91.255')
>>> cidr_iter=summarize_address_range(addr_start, addr_bcast)
>>> cidr_data = [cidr for cidr in cidr_iter]
>>> cidr_data
[IPv4Network('87.116.83.0/24'), IPv4Network('87.116.84.0/22'), IPv4Network('87.116.88.0/22')]
IP個数が 2^X にならない開始IPアドレスのネットワークアドレスは複数になります。
※ IP個数が 2^X になる場合は、ネットワークアドレスは常に1つです。
[例] 64 (/26), 128 (/25), 256 (/24), 512 (/23), ...,
3-1. 関数定義
モジュールのインポート
import argparse
import csv
import logging
import os
import typing
from typing import Iterator, List, Optional, Tuple
from ipaddress import (
ip_address, summarize_address_range, IPv4Address, IPv4Network
)
3-1-1. ページング
地域インターネットレジストリデータはレコード件数が20万件を超えるため下記のようなジェネレーター関数で CSVファイルをページ単位で読み込みします。
デフォルトのページサイズは 5000 で、yield pages
で CSVファイル 5000 行分のリストを生成します。
def paginate(reader: Iterator, page_size: int = 5000):
pages: List = []
it: Iterator = iter(reader)
while True:
try:
for i in range(page_size):
pages.append(next(it))
yield pages
pages = []
except StopIteration:
if pages:
yield pages
return
3-1-2. ファイル操作処理
3-1-2 (1) 読み込み用ファイルオブジェクト取得
読み込みモードでファイルをオープンし、ファイルオブジェクトを取得します。
def get_file_reader(file_name: str):
fp = open(file_name, mode='r')
return fp
3-1-2 (2) 書き込み用ファイルオブジェクト取得
書き込みモードでファイルをオープンし、ファイルオブジェクトを取得します。
def get_file_writer(file_name: str):
fp = open(file_name, mode='w')
return fp
3-1-3. ネットワークアドレス変換処理
ジェネレーターが生成するリスト (ページ単位) の変換処理を実行します。
内部関数 get_cidr_list
は開始IPアドレスとIP個数からネットワークアドレスのリストを生成します。
def ip_start_to_cidr_network(csv_lines: List[List]) -> List[str]:
@typing.no_type_check
def get_cidr_list(ip_start: str,
ip_count: int) -> List[str]:
addr_first: IPv4Address = ip_address(ip_start)
addr_last: IPv4Address = addr_first + ip_count - 1
cidr_ite: Iterator[IPv4Network] = summarize_address_range(addr_first, addr_last)
return [str(network) for network in cidr_ite]
result: List[str] = []
for csv_line in csv_lines:
cc: str = csv_line[2]
alloc_date: str = csv_line[3]
registry_id: int = int(csv_line[4])
cidr_list: List[str] = get_cidr_list(csv_line[0], int(csv_line[1]))
for cidr in cidr_list:
out_line: str = f'"{cidr}","{cc}","{alloc_date}",{registry_id}\n'
result.append(out_line)
return result
上記関数の引数 csv_lines
と変換結果 result
のDEUBG時の内容を下記に示します。
csv_lines = {list: 5} [['1.0.1.0', '256', 'CN', '20110414', '1'], ['1.0.2.0', '512', 'CN', '20110414', '1'], ['1.0.4.0', '1024', 'AU', '20110412', '1'], ['1.0.8.0', '2048', 'CN', '20110412', '1'], ['1.0.16.0', '4096', 'JP', '20110412', '1']]
0 = {list: 5} ['1.0.1.0', '256', 'CN', '20110414', '1']
1 = {list: 5} ['1.0.2.0', '512', 'CN', '20110414', '1']
2 = {list: 5} ['1.0.4.0', '1024', 'AU', '20110412', '1']
3 = {list: 5} ['1.0.8.0', '2048', 'CN', '20110412', '1']
4 = {list: 5} ['1.0.16.0', '4096', 'JP', '20110412', '1']
__len__ = {int} 5
result = {list: 5} ['"1.0.1.0/24","CN","20110414",1\n', '"1.0.2.0/23","CN","20110414",1\n', '"1.0.4.0/22","AU","20110412",1\n', '"1.0.8.0/21","CN","20110412",1\n', '"1.0.16.0/20","JP","20110412",1\n']
0 = {str} '"1.0.1.0/24","CN","20110414",1\n'
1 = {str} '"1.0.2.0/23","CN","20110414",1\n'
2 = {str} '"1.0.4.0/22","AU","20110412",1\n'
3 = {str} '"1.0.8.0/21","CN","20110412",1\n'
4 = {str} '"1.0.16.0/20","JP","20110412",1\n'
__len__ = {int} 5
3-1-4. 変換処理メイン関数
3-1-4. (1) 入力パラメータ取得処理
- オリジナルのCSVファイルパス:
--csv-file
※必須 - 変換処理後のCSVファイル出力ディレクトリ:
--output-dir
※任意
※未指定の場合は、オリジナルのCSVファイルのディレクトリ - ページサイズ: :
----page-size
※任意
※ 1回の処理で読み込む CSVファイルの行数
def batch_main():
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
app_logger = logging.getLogger(__name__)
app_logger.setLevel(level=logging.DEBUG)
parser = argparse.ArgumentParser()
parser.add_argument("--csv-file", type=str, required=True,
help="Source csv file path.")
# 変換後のCSVファイル保存先 ※未指定なら --csv-file のディレクトリ
parser.add_argument("--output-dir", type=str,
help="Output csv file directory.")
parser.add_argument("--page-size", type=int,
default=5000,
help="CSV row count size.")
args: argparse.Namespace = parser.parse_args()
csv_file: str = args.csv_file
output_dir: Optional[str] = args.output_dir
app_logger.info(f"csv_file: {csv_file}")
3-1-4. (2) CSVファイルパスチェック処理
- CSVファイルパスの存在チェック
※存在しない場合はエラー終了 - 変換処理後のCSVファイ名を生成する
- 出力先ディレクトリの設定
※未指定ならCSVファイルパスのディレクトリを設定する
# 元のCSVファイルチェック
f_path: str
if csv_file.find("~/") == 0:
f_path = os.path.expanduser(csv_file)
else:
f_path = csv_file
if not os.path.exists(f_path):
app_logger.error(f"FileNotFound: {f_path}")
exit(1)
# 出力ファイル名: オリジナル名にアンダースコア修飾する(xxx_cidr.csv)
file_name: str = os.path.basename(f_path)
names: Tuple[str, str] = os.path.splitext(file_name)
out_name: str = f"{names[0]}_cidr{names[1]}"
# 出力先
output_file: str
if output_dir is None:
# 出力先が未指定ならソースCSVのディレクトリ
dir_name: str = os.path.dirname(f_path)
output_file = os.path.join(dir_name, out_name)
else:
# 指定ディレクトリ
output_file = os.path.join(os.path.expanduser(output_dir), out_name)
3-1-4.(3) ネットワークアドレス変換処理とファイル出力
処理手順
- CSVファイルの1回あたりのページサイズ (読み込み行数) を設定する
- CSVファイルの読み込みファイルオブジェクトの取得
- 出力ファイルオブジェクトの取得
- ヘッダー処理
- CSVファイルのヘッダーを1行分読み飛ばす
- 変換後CSVファイルにヘッダーを1行出力
- [ループ処理] CSVファイルのデータをページサイズ分読み込む
- ネットワークアドレス変換後のCSV出力リストを取得する
- CSV出力リストをCSV出力ファイルオブジェクトに出力
CSV出力ファイルに書き出す (flush)
- ファイルオブジェクトのクリーンアップ
- 入力ファイルオブジェクトのクローズ
- 出力ファイルオブジェクトのクローズ
# ページサイズ
page_size: int = args.page_size
# ファイル操作オブジェクトを開く
fp_reader = get_file_reader(f_path)
app_logger.debug(fp_reader)
fp_writer = get_file_writer(output_file)
app_logger.debug(fp_writer)
try:
csv_reader = csv.reader(fp_reader, delimiter=',', dialect='unix')
# ソースCSVのヘッダースキップ
next(csv_reader)
# 出力CSVのヘッダー出力
fp_writer.write(OUT_CSV_HEADER)
fp_writer.flush()
# データ読み込み
input_total: int = 0
output_total: int = 0
csv_lines: List[List]
# RIRデータのCSVは約20万レコード以上あるので指定さりたページサイズ(行数)でファイルに保存
for csv_lines in paginate(csv_reader, page_size=page_size):
input_total += len(csv_lines)
out_lines: List[str] = ip_start_to_cidr_network(csv_lines)
output_total += len(out_lines)
app_logger.info(
f"{input_total}, input_lines: {input_total}, output_lines:{output_total}"
)
fp_writer.writelines(out_lines)
fp_writer.flush()
except Exception as ex:
app_logger.error(ex)
exit(1)
finally:
if fp_reader is not None:
fp_reader.close()
if fp_writer is not None:
fp_writer.close()
app_logger.info(f"Saved {output_file}")
if __name__ == '__main__':
batch_main()
3-2. python ソース
ソースコード全体を再掲します。
import argparse
import csv
import logging
import os
import typing
from typing import Iterator, List, Optional, Tuple
from ipaddress import (
ip_address, summarize_address_range, IPv4Address, IPv4Network
)
"""
RIR ipv4 allocated CSV file to cidr network.
"""
OUT_CSV_HEADER: str = '"network_addr","country_code","allocated_date","registry_id"\n'
def paginate(reader: Iterator, page_size: int = 5000):
pages: List = []
it: Iterator = iter(reader)
while True:
try:
for i in range(page_size):
pages.append(next(it))
yield pages
pages = []
except StopIteration:
if pages:
yield pages
return
def get_file_reader(file_name: str):
fp = open(file_name, mode='r')
return fp
def get_file_writer(file_name: str):
fp = open(file_name, mode='w')
return fp
def ip_start_to_cidr_network(csv_lines: List[List]) -> List[str]:
@typing.no_type_check
def get_cidr_list(ip_start: str,
ip_count: int) -> List[str]:
addr_first: IPv4Address = ip_address(ip_start)
addr_last: IPv4Address = addr_first + ip_count - 1
cidr_ite: Iterator[IPv4Network] = summarize_address_range(addr_first, addr_last)
return [str(network) for network in cidr_ite]
result: List[str] = []
for csv_line in csv_lines:
cc: str = csv_line[2]
alloc_date: str = csv_line[3]
registry_id: int = int(csv_line[4])
cidr_list: List[str] = get_cidr_list(csv_line[0], int(csv_line[1]))
for cidr in cidr_list:
out_line: str = f'"{cidr}","{cc}","{alloc_date}",{registry_id}\n'
result.append(out_line)
return result
def batch_main():
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
app_logger = logging.getLogger(__name__)
app_logger.setLevel(level=logging.DEBUG)
parser = argparse.ArgumentParser()
parser.add_argument("--csv-file", type=str, required=True,
help="Source csv file path.")
# 変換後のCSVファイル保存先 ※未指定なら --csv-file のディレクトリ
parser.add_argument("--output-dir", type=str,
help="Output csv file directory.")
parser.add_argument("--page-size", type=int,
default=5000,
help="CSV row count size.")
args: argparse.Namespace = parser.parse_args()
csv_file: str = args.csv_file
output_dir: Optional[str] = args.output_dir
app_logger.info(f"csv_file: {csv_file}")
# 元のCSVファイルチェック
f_path: str
if csv_file.find("~/") == 0:
f_path = os.path.expanduser(csv_file)
else:
f_path = csv_file
if not os.path.exists(f_path):
app_logger.error(f"FileNotFound: {f_path}")
exit(1)
# 出力ファイル名: オリジナル名にアンダースコア修飾する(xxx_cidr.csv)
file_name: str = os.path.basename(f_path)
names: Tuple[str, str] = os.path.splitext(file_name)
out_name: str = f"{names[0]}_cidr{names[1]}"
# 出力先
output_file: str
if output_dir is None:
# 出力先が未指定ならソースCSVのディレクトリ
dir_name: str = os.path.dirname(f_path)
output_file = os.path.join(dir_name, out_name)
else:
# 指定ディレクトリ
output_file = os.path.join(os.path.expanduser(output_dir), out_name)
# ページサイズ
page_size: int = args.page_size
# ファイル操作オブジェクトを開く
fp_reader = get_file_reader(f_path)
app_logger.debug(fp_reader)
fp_writer = get_file_writer(output_file)
app_logger.debug(fp_writer)
try:
csv_reader = csv.reader(fp_reader, delimiter=',', dialect='unix')
# ソースCSVのヘッダースキップ
next(csv_reader)
# 出力CSVのヘッダー出力
fp_writer.write(OUT_CSV_HEADER)
fp_writer.flush()
# データ読み込み
input_total: int = 0
output_total: int = 0
csv_lines: List[List]
# RIRデータのCSVは約20万レコード以上あるので指定さりたページサイズ(行数)でファイルに保存
for csv_lines in paginate(csv_reader, page_size=page_size):
input_total += len(csv_lines)
out_lines: List[str] = ip_start_to_cidr_network(csv_lines)
output_total += len(out_lines)
app_logger.info(
f"{input_total}, input_lines: {input_total}, output_lines:{output_total}"
)
fp_writer.writelines(out_lines)
fp_writer.flush()
except Exception as ex:
app_logger.error(ex)
exit(1)
finally:
if fp_reader is not None:
fp_reader.close()
if fp_writer is not None:
fp_writer.close()
app_logger.info(f"Saved {output_file}")
if __name__ == '__main__':
batch_main()
4. スクリプト実行
オリジナルのCSVファイルの内容 (抜粋)
"ip_start","ip_count","country_code","allocated_date","registry_id"
"1.0.1.0",256,"CN","20110414",1
"1.0.2.0",512,"CN","20110414",1
"1.0.4.0",1024,"AU","20110412",1
"1.0.8.0",2048,"CN","20110412",1
"1.0.16.0",4096,"JP","20110412",1
"1.0.32.0",8192,"CN","20110412",1
"1.0.64.0",16384,"JP","20110412",1
"1.0.128.0",32768,"TH","20110408",1
"1.1.0.0",256,"CN","20110414",1
"1.1.2.0",512,"CN","20110414",1
"1.1.4.0",1024,"CN","20110414",1
"1.1.8.0",256,"CN","20110412",1
"1.1.9.0",256,"CN","20110412",1
"1.1.10.0",512,"CN","20110412",1
"1.1.12.0",1024,"CN","20110412",1
"1.1.16.0",4096,"CN","20110412",1
"1.1.32.0",8192,"CN","20110412",1
"1.1.64.0",16384,"JP","20110412",1
"1.1.128.0",32768,"TH","20110408",1
"1.2.0.0",512,"CN","20110414",1
...以下省略...
4-1. pythonスクリプト実行
ページサイズ 20000 (行) で実行
※パスはユーザーホームのプロジェクトディレクトリですが省略しています。
$ . py_venv/py_psycopg2/bin/activate
(py_psycopg2) $ python RirIpv4Allocated_to_cidr_csv.py \
> --csv-file ~/Documents/public/RIR/csv/ipv4-all-2024-10-16.csv \
> --output-dir ~/data/sql/qiita_example/mainte2/csv/RIR \
> --page-size 20000
2024-10-17 15:08:08 INFO csv_file: /home/qiita/Documents/public/RIR/csv/ipv4-all-2024-08-30.csv
2024-10-17 15:08:08 DEBUG <_io.TextIOWrapper name='/home/qiita/Documents/public/RIR/csv/ipv4-all-2024-08-30.csv' mode='r' encoding='UTF-8'>
2024-10-17 15:08:08 DEBUG <_io.TextIOWrapper name='/home/qiita/data/sql/qiita_example/mainte2/csv/RIR/ipv4-all-2024-08-30_cidr.csv' mode='w' encoding='UTF-8'>
2024-10-17 15:08:08 INFO 20000, input_lines: 20000, output_lines:20000
2024-10-17 15:08:09 INFO 40000, input_lines: 40000, output_lines:40001
2024-10-17 15:08:09 INFO 60000, input_lines: 60000, output_lines:60008
2024-10-17 15:08:10 INFO 80000, input_lines: 80000, output_lines:80008
2024-10-17 15:08:10 INFO 100000, input_lines: 100000, output_lines:100008
2024-10-17 15:08:10 INFO 120000, input_lines: 120000, output_lines:120008
2024-10-17 15:08:11 INFO 140000, input_lines: 140000, output_lines:140008
2024-10-17 15:08:11 INFO 160000, input_lines: 160000, output_lines:160015
2024-10-17 15:08:12 INFO 180000, input_lines: 180000, output_lines:180017
2024-10-17 15:08:12 INFO 200000, input_lines: 200000, output_lines:200018
2024-10-17 15:08:12 INFO 201804, input_lines: 201804, output_lines:201822
2024-10-17 15:08:12 INFO Saved /home/qiita/data/sql/qiita_example/mainte2/csv/RIR/ipv4-all-2024-08-30_cidr.csv
変換後のCSVファイルの内容 (抜粋)
"network_addr","country_code","allocated_date","registry_id"
"1.0.1.0/24","CN","20110414",1
"1.0.2.0/23","CN","20110414",1
"1.0.4.0/22","AU","20110412",1
"1.0.8.0/21","CN","20110412",1
"1.0.16.0/20","JP","20110412",1
"1.0.32.0/19","CN","20110412",1
"1.0.64.0/18","JP","20110412",1
"1.0.128.0/17","TH","20110408",1
"1.1.0.0/24","CN","20110414",1
"1.1.2.0/23","CN","20110414",1
"1.1.4.0/22","CN","20110414",1
"1.1.8.0/24","CN","20110412",1
"1.1.9.0/24","CN","20110412",1
"1.1.10.0/23","CN","20110412",1
"1.1.12.0/22","CN","20110412",1
"1.1.16.0/20","CN","20110412",1
"1.1.32.0/19","CN","20110412",1
"1.1.64.0/18","JP","20110412",1
"1.1.128.0/17","TH","20110408",1
"1.2.0.0/23","CN","20110414",1
...以下省略...
4-2. csvインポート
4-2-1. テーブル生成クエリ
DROP TABLE IF EXISTS mainte2.RIR_ipv4_allocated_cidr CASCADE;
DROP TABLE IF EXISTS mainte2.RIR_registory_mst;
-- レジストリ名テーブル
-- name: {afrinic,apnic,arin,iana,lacnic,ripencc}
-- https://www.apnic.net/about-apnic/corporate-documents/documents/
-- resource-guidelines/rir-statistics-exchange-format/
CREATE TABLE mainte2.RIR_registory_mst(
id SMALLINT PRIMARY KEY,
name VARCHAR(8) NOT NULL
);
INSERT INTO mainte2.RIR_registory_mst(id, name) VALUES
(1,'apnic')
,(2,'afrinic')
,(3,'arin')
,(4,'lacnic')
,(5,'ripencc')
,(6,'iana');
-- APNICで公開している各国に割り当てているIPアドレス情報からipv4アドレスのみを抽出したマスタテーブル
-- (変更) region -> registry
CREATE TABLE mainte2.RIR_ipv4_allocated_cidr(
network_addr CIDR NOT NULL,
country_code CHAR(2) NOT NULL,
allocated_date DATE NOT NULL,
registry_id SMALLINT NOT NULL
);
ALTER TABLE mainte2.RIR_ipv4_allocated_cidr ADD CONSTRAINT pk_RIR_ipv4_allocated_cidr
PRIMARY KEY (network_addr);
ALTER TABLE mainte2.RIR_ipv4_allocated_cidr ADD CONSTRAINT fk_RIR_ipv4_allocated_cidr_registry
FOREIGN KEY (registry_id) REFERENCES mainte2.RIR_registory_mst (id);
ALTER TABLE mainte2.RIR_registory_mst OWNER TO developer;
ALTER TABLE mainte2.RIR_ipv4_allocated_cidr OWNER TO developer;
dockerコンテナ内の psql コマンドでテーブル生成クエリーを実行する
※パスはコンテナ環境に依存するため省略していますが通常は絶対パスを指定します。
$ docker exec -it postgres-qiita bash
c1d55b8abc42:/# psql -Udeveloper qiita_exampledb < 14_create_rir_ipv4_allocated_cidr.sql
NOTICE: table "RIR_ipv4_allocated_cidr" does not exist, skipping
DROP TABLE
NOTICE: table "rir_registory_mst" does not exist, skipping
DROP TABLE
CREATE TABLE
INSERT 0 6
CREATE TABLE
ALTER TABLE
ALTER TABLE
ALTER TABLE
ALTER TABLE
4-2-2. CSVファイルインポートスクリプト
実行中のdockerコンテナ内の psql コマンドでクエリー実行
CSVファイルのレコードが20万件を超えるためインポート前に PK 制約をドロップします。
- PK 制約ドロップ
- CSVファイルのインポート
- PK 制約を追加
#!/bin/bash
# https://stackoverflow.com/questions/34736762/script-to-automat-import-of-csv-into-postgresql
# Script to automat import of CSV into PostgreSQL
# PK制約をドロップ
psql -Udeveloper -d qiita_exampledb -c \
"ALTER TABLE mainte2.RIR_ipv4_allocated_cidr
DROP CONSTRAINT pk_RIR_ipv4_allocated_cidr;"
sleep 1
# データインポート
psql -Udeveloper -d qiita_exampledb -c \
"\copy mainte2.RIR_ipv4_allocated_cidr FROM
'/home/qiita/data/sql/qiita_example/mainte2/csv/RIR/${1}'
DELIMITER ',' CSV HEADER;"
sleep 2
# PK制約を戻す
psql -Udeveloper -d qiita_exampledb -c \
"ALTER TABLE mainte2.RIR_ipv4_allocated_cidr
ADD CONSTRAINT pk_RIR_ipv4_allocated_cidr PRIMARY KEY (network_addr);"
インポートシェルスクリプトを実行
※シェルスクリプトパスはコンテナ環境に依存するため省略していま。
c1d55b8abc42:/# ./import_from_2_rir_ipv4_cidr_csv.sh ipv4-all-2024-08-30_cidr.csv
ALTER TABLE
COPY 201822
ALTER TABLE
4-3. 国コード・ネットワークアドレス検索
4-3-1. PostgreSQL接続情報
{
"host": "{hostname}",
"port": "5432",
"database": "qiita_exampledb",
"user": "developer",
"password": "developerpassword"
}
4-3-2. python スクリプト
投稿記事「PostgreSQL 便利なネットワークアドレス型を活用する」で、検索クエリーについて解説しているのでここではソースのみを示します。
import argparse
import json
import logging
import os
import socket
from logging import Logger
from typing import Optional, Tuple
import psycopg2
from psycopg2.extensions import connection, cursor
"""
Qiita投稿用スクリプト
IPアドレスのネットワークアドレスと国コードをRIRデータテーブルから検索する
[テーブル名] mainte2.RIR_ipv4_allocated_cidr
"""
# データベース接続情報
DB_CONF_FILE: str = os.path.join("conf", "db_conn.json")
# PostgreSQLデータベース接続生成クラス
class PgDatabase(object):
def __init__(self, configfile,
hostname: Optional[str] = None,
logger: Optional[logging.Logger] = None):
self.logger = logger
with open(configfile, 'r') as fp:
db_conf = json.load(fp)
if hostname is None:
hostname = socket.gethostname()
db_conf["host"] = db_conf["host"].format(hostname=hostname)
self.conn = psycopg2.connect(**db_conf)
if self.logger is not None:
self.logger.debug(self.conn)
def get_connection(self) -> connection:
return self.conn
def rollback(self) -> None:
if self.conn is not None:
self.conn.rollback()
def commit(self) -> None:
if self.conn is not None:
self.conn.commit()
def close(self) -> None:
if self.conn is not None:
if self.logger is not None:
self.logger.debug(f"Close {self.conn}")
self.conn.close()
# ターゲットIPのネットワークがネットワークアドレス国コードテーブルに存在するかチェック
def target_ip_include_rir_table(
conn: connection,
target_ip: str,
logger: Optional[Logger] = None) -> Optional[Tuple[str, str]]:
if logger is not None:
logger.debug(f"target_ip: {target_ip}")
result: Optional[Tuple[str, str]]
try:
cur: cursor
with conn.cursor() as cur:
cur.execute("""
SELECT
network_addr, country_code
FROM
mainte2.RIR_ipv4_allocated_cidr
WHERE
inet %(target_ip)s << network_addr""",
({'target_ip': target_ip}))
row: Optional[Tuple[str, str]] = cur.fetchone()
if logger is not None:
if cur.query is not None:
logger.debug(f"{cur.query.decode('utf-8')}")
logger.debug(f"row: {row}")
result = row
return result
except (Exception, psycopg2.DatabaseError) as err:
raise err
def batch_main():
logging.basicConfig(format='%(levelname)s %(message)s')
app_logger = logging.getLogger(__name__)
app_logger.setLevel(level=logging.DEBUG)
parser = argparse.ArgumentParser()
parser.add_argument("--target-ip", required=True, type=str,
help="IP address.")
parser.add_argument("--enable-debug", action="store_true",
help="Enable logger debug out.")
args: argparse.Namespace = parser.parse_args()
target_ip: str = args.target_ip
enable_debug: bool = args.enable_debug
db: Optional[PgDatabase] = None
try:
db = PgDatabase(DB_CONF_FILE, logger=None)
conn: connection = db.get_connection()
# ネットワークアドレス国コードテーブル検索
find_rec: Optional[Tuple[str, str]] = target_ip_include_rir_table(
conn, target_ip, logger=app_logger if enable_debug else None
)
if find_rec is not None:
network: str = find_rec[0]
cc: str = find_rec[1]
app_logger.info(
f'Find {target_ip} in RIR_ipv4_allocated_cidr("{network}"'
f', country_code: "{cc}")'
)
else:
app_logger.info(f"{target_ip} is not match in tables.")
except psycopg2.Error as db_err:
app_logger.error(db_err)
exit(1)
except Exception as err:
app_logger.error(err)
exit(1)
finally:
if db is not None:
db.close()
if __name__ == '__main__':
batch_main()
4-3-3. python スクリプト実行
スクリプトパスは省略しています。
(1) ターゲットIPアドレスを含むネットワークアドレスレコード有り
--target-ip 125.124.213.77 --enable-debug
(py_psycopg2) $ python TestDetectCountryCode_in_rir_cidr.py \
> --target-ip 125.124.213.77 --enable-debug
DEBUG target_ip: 125.124.213.77
DEBUG
SELECT
network_addr, country_code
FROM
mainte2.RIR_ipv4_allocated_cidr
WHERE
inet '125.124.213.77' << network_addr
DEBUG row: ('125.112.0.0/12', 'CN')
INFO Find 125.124.213.77 in RIR_ipv4_allocated_cidr("125.112.0.0/12", country_code: "CN")
(2) ターゲットIPアドレスを含むネットワークアドレスレコード無し
--target-ip 103.253.175.77 --enable-debug
(py_psycopg2) $ python TestDetectCountryCode_in_rir_cidr.py --target-ip 103.253.175.77 --enable-debug
DEBUG target_ip: 103.253.175.77
DEBUG
SELECT
network_addr, country_code
FROM
mainte2.RIR_ipv4_allocated_cidr
WHERE
inet '103.253.175.77' << network_addr
DEBUG row: None
INFO 103.253.175.77 is not match in tables.
5. 最後に
過去の投稿「不正アクセスしてきたホストの国コードを知ってセキュリティ対策に活用する」で紹介した検索処理は地域インターネットレジストリデータのIPv4アドレス情報 (開始IPアドレス、IP個数) をそのままテーブルに登録していたため国コード・ネットワークアドレス検索処理がかなり複雑になっていました。
今回紹介した 変換用の python スクリプトで IPv4アドレス情報を CIDR 形式のネットワークアドレスに変換したCSVファイルをテーブルにインポートすることで検索処理が簡単になりました。
記事で紹介したスクリプトとサンプルCSVフアイルを下記GihtHubで公開しています。
GitHub pipito-yuko/ qiita-posts / python / RirIpv4Allocated_cidr
RirIpv4Allocated_cidr/
└── src
├── docker
│ ├── Dockerfile
│ ├── docker-compose.yml
│ └── initdb
│ ├── 10_createdb.sql
│ └── 14_create_rir_ipv4_allocated_cidr.sql
├── python_project
│ ├── RirIpv4Allocated_to_cidr_csv.py # IPアドレス情報をネットワークアドレス(CIDR形式)に変換
│ ├── TestDetectCountryCode_in_rir_cidr.py # ネットワークアドレス・国コード検索
│ ├── conf
│ │ └── db_conn.json
│ ├── csv
│ │ └── ipv4-all-2024-10-16.csv # 2024-10-16時点のRIRデータのオリジナルCSVファイル (変換前)
│ └── mypy.ini
├── requirements.txt
└── sql
└── scripts
└── import_from_2_rir_ipv4_cidr_csv.sh # 変換後のCSVインポートシェルスクリプト