過去の投稿で地域インターネットレジストリ(RIR) で公開されているデータから不正アクセスしたホストの国コードとネットワークアドレスを取得する方法を紹介いたしました。
そのときの出力を再掲します。
(py_psycopg2) $ python TestDetectCountryCode.py --target-ip 103.77.241.34
INFO Find 103.77.241.34 in (network: "103.77.240.0/23", country_code: "VN")
上記投稿で紹介した地域インターネットレジストリ(RIR) で公開されているデータはデータベースで管理しており開始IPアドレス列に関しては文字列として格納していました。
そのときに紹介した方法ではネットワークアドレス (CIDR形式) と国コードを取得するまでに2段階の処理を実行していました。
-
地域インターネットレジストリデータ (テーブル) の検索
LIKE検索を繰返してIPアドレスを含むと思われるIPv4の開始アドレスの候補を見つける
※候補となるIPv4の開始アドレスは数十件〜数万件になる -
python の ipaddress モジュールを使いネットワークアドレス計算する
開始IPアドレスとIPアドレスの個数からブロードキャストアドレスを計算して比較処理を繰り返してIPアドレスを含むネットワークアドレスを特定
【処理概要】
ただ前回紹介した方法ではせっかく取得したネットワークアドレスが使い捨てになっていました。
ネットワークアドレス型の活用
今回の記事はネットワークアドレスと国コードを新たなテーブルに登録して再利用しようとするもので、最初に紹介した記事の拡張版の位置づけとなります。
ネットワークアドレス(CIDR形式) 103.77.240.0/23
を PostgreSQLのネットワークアドレス型 (cidr型) で保存すると、IPアドレスを含むネットワークアドレスの検索が非常に簡単になります。
IPアドレスがネットワークアドレスに含まれるかの検索用SQLの例を示します。
※cidr_addr は PostgreSQL の CIDR型とします
SELECT cidr_addr FROM match_network WHERE inet '107.172.133.181' << cidr_addr;
1. 実行環境
- OS: Ubuntu Desktop 22.04
- データベース
Dockerコンテナ内で稼働する PostgreSQL-16 - python 3.10.12
スクリプト専用の仮想環境を作成し下記ライブラリをインストール
pip install psycopg2-binary
- データベース
2. 参考サイト
下記日本語サイトのドキュメントのサンプルを参考にしています。
3. データベース
以降 本投稿用のデータベースを qiita_exampledb 、ユーザ developer として説明します
3-1. テーブル定義
この記事用に作成したテーブル定義
-- Qiita投稿用のテストスキーマ
CREATE SCHEMA mainte2;
-- RIR_ipv4_allocated 検索で見つかったネットワークIP(開始アドレス)と国コードを保持
CREATE TABLE mainte2.match_network(
id INTEGER NOT NULL,
cidr_addr CIDR NOT NULL,
country_code CHAR(2) NOT NULL
);
-- シーケンス定義
CREATE SEQUENCE mainte2.seq_match_network_id OWNED BY mainte2.match_network.id;
-- カラムidのデフォルト値をシーケンスに変更
ALTER TABLE mainte2.match_network ALTER id SET DEFAULT nextval('mainte2.seq_match_network_id');
ALTER TABLE mainte2.match_network ADD CONSTRAINT pk_match_network PRIMARY KEY (id);
-- IPネットワークはユニーク
CREATE UNIQUE INDEX idx_match_network ON mainte2.match_network(cidr_addr);
ALTER SCHEMA mainte2 OWNER TO developer;
ALTER TABLE mainte2.match_network OWNER TO developer;
4. スクリプト
4-1. psqlコマンドで検証
4-1-1. ネットワークアドレス関数と演算子の使い方
dockerコンテナ内の psql コマンドでパイプを通してSQLを実行します。
参考サイト② に掲載されている [表9.38] IPアドレス演算子の中から、「サブネットが完全にサブネットに含まれているか?」で示されているサンプルを実行し使い方を確認します。
1e2b0f7bc9c8:/# echo "SELECT (inet '192.168.1.5' << inet '192.168.1/24')
as include;" | psql -Udeveloper qiita_exampledb
include
---------
t
(1 row)
1e2b0f7bc9c8:/# echo "SELECT (inet '192.168.0.5' << inet '192.168.1/24')
as include;" | psql -Udeveloper qiita_exampledb
include
---------
f
(1 row)
(2) 左辺のIPアドレスがネットワークIP='103.77.240.0/23' に含まれるか検証
- 左辺のIPアドレス例
- (1) 含まれるIPアドレス: '103.77.241.34'
- (2) 含まれるブロードキャストアドレス: '103.77.241.255'
- (3) 範囲外のブロードキャストアドレス: '103.77.239.255'
# (1) 含まれるIPアドレス: '103.77.241.34'
1e2b0f7bc9c8:/# echo "SELECT (inet '103.77.241.34' << inet '103.77.240.0/23')
as include;" | psql -Udeveloper qiita_exampledb
include
---------
t
(1 row)
# (2) ブロードキャストアドレス: '103.77.241.255'
1e2b0f7bc9c8:/# echo "SELECT (inet '103.77.241.255' << inet '103.77.240.0/23')
as include;" | psql -Udeveloper qiita_exampledb
include
---------
t
(1 row)
# (3) 範囲外のブロードキャストアドレス: '103.77.239.255'
1e2b0f7bc9c8:/# /# echo "SELECT (inet '103.77.239.255' << inet '103.77.240.0/23')
as include;" | psql -Udeveloper qiita_exampledb
include
---------
f
(1 row)
4-1-2. 検索SQL
確認用レコードをネットワークアドレス国コードテーブルに登録
d52acdd248a7:/#echo "INSERT INTO mainte2.match_network(cidr_addr,country_code)
VALUES
('103.77.240.0/23','VN'),
('177.240.0.0/13','MX'),
('125.112.0.0/12','CN'),
('92.118.36.0/22','RO') RETURNING id,cidr_addr,country_code;" \
> | psql -Udeveloper qiita_exampledb
id | cidr_addr | country_code
----+-----------------+--------------
1 | 103.77.240.0/23 | VN
2 | 177.240.0.0/13 | MX
3 | 125.112.0.0/12 | CN
4 | 92.118.36.0/22 | RO
(4 rows)
INSERT 0 4
(1) レコード有り: IPアドレス('103.77.241.34')
d52acdd248a7:/# echo "SELECT * FROM mainte2.match_network
WHERE inet '103.77.241.34' << cidr_addr;" \
> | psql -Udeveloper qiita_exampledb
id | cidr_addr | country_code
----+-----------------+--------------
1 | 103.77.240.0/23 | VN
(1 row)
(2) レコード無し: IPアドレス('103.77.242.1')
d52acdd248a7:/# echo "SELECT * FROM mainte2.match_network
WHERE inet '103.77.242.1' << cidr_addr;" \
> | psql -Udeveloper qiita_exampledb
id | cidr_addr | country_code
----+-----------+--------------
(0 rows)
(3) 検索シェルスクリプト作成
#!/bin/bash
cat <<-EOF | psql -Udeveloper qiita_exampedb
SELECT
id, cidr_addr, country_code
FROM
mainte2.match_network
WHERE
inet '${1}' << cidr_addr;
EOF
スクリプトの実行
※コンテナ実行時のパスは環境依存のため省略しています。
d52acdd248a7:# ./find_ip_include_match_network.sh 92.118.39.133
id | cidr_addr | country_code
----+----------------+--------------
1 | 92.118.36.0/22 | RO
(1 row)
4-2. pythonスクリプト
4-2-0. 処理概要
ネットワークアドレス検索処理
- 入力されたIPアドレスに対し以下の2つの検索処理を実行
-
① ネットワークアドレス国コードテーブル検索
- IPアドレスを含むネットワークアドレス (cidr型) のレコード有り
ネットワークアドレスと国コードを出力して終了 - レコード無し
② RIR ipv4 割当済みテーブル検索へ
- IPアドレスを含むネットワークアドレス (cidr型) のレコード有り
-
② RIR ipv4 割当済みテーブル検索
- IPアドレスを含むネットワークアドレス (開始IP, IP個数) レコード有り
(1) ネットワークアドレスと国コードを出力
(2) 検索結果をネットワークアドレス国コードテーブルに登録する - レコード無し
国コード不明を出力
- IPアドレスを含むネットワークアドレス (開始IP, IP個数) レコード有り
-
① ネットワークアドレス国コードテーブル検索
①の検索で該当するレコードが見つかった場合
②の検索処理で該当するレコードが見つかった場合
4-2-1. クエリー実行関数
テーブル操作関数の共通インポート
from logging import Logger
from typing import Dict, List, Optional, Tuple
import psycopg2
from psycopg2.extensions import connection, cursor
4-2-1 (1) ネットワークアドレス国コード検索
IPアドレスを含むネットワークアドレスがネットワークアドレス国コードテーブルに存在するか検索する
- 引数
- DB接続オブジェクト
- IPアドレス (文字列)
- 戻り値
- ターゲットIPを含むネットワークアドレスのレコード有り
レコード(ID, ネットワークアドレス, 国コード) をタプルで返却 - 該当レコード無し
None 返却
- ターゲットIPを含むネットワークアドレスのレコード有り
def target_ip_include_match_network(
conn: connection,
target_ip: str,
logger: Optional[Logger] = None) -> Optional[Tuple[str, str]]:
if logger is not None:
logger.debug(f"target_ip: {target_ip}")
result: Optional[Tuple[str, str]]
try:
cur: cursor
with conn.cursor() as cur:
cur.execute("""
SELECT
cidr_addr, country_code
FROM
mainte2.match_network
WHERE
inet %(target_ip)s << cidr_addr""",
({'target_ip': target_ip}))
row: Optional[Tuple[str, str]] = cur.fetchone()
if logger is not None:
if cur.query is not None:
logger.debug(f"{cur.query.decode('utf-8')}")
logger.debug(f"row: {row}")
result = row
return result
except (Exception, psycopg2.DatabaseError) as err:
raise err
(1) 該当レコード有りの場合のログ出力
DEBUG target_ip: 125.124.213.77
DEBUG
SELECT
cidr_addr, country_code
FROM
mainte2.match_network
WHERE
inet '125.124.213.77' << cidr_addr
DEBUG row: ('125.112.0.0/12', 'CN')
(2) 該当レコード無しの場合のログ出力
DEBUG target_ip: 119.28.156.59
DEBUG
SELECT
cidr_addr, country_code
FROM
mainte2.match_network
WHERE
inet '119.28.156.59' << cidr_addr
DEBUG row: None
4-2-1 (2) ネットワークアドレス国コードテーブル登録
RIR ipv4 割当て済みテーブルから取得したネットワークと国コードをネットワークアドレス国コードテーブルに登録する
- 引数
- DB接続オブジェクト
- 登録用辞書オブジェクト
{"cidr_addr": ネットワークアドレス(CIDR形式),"country_code": 国コード}
- 戻り値
- 登録時に採番されたIDとネットワークアドレスのタプル
def insert_match_network(
conn: connection,
params: Dict[str, str],
logger: Optional[Logger] = None) -> Optional[Tuple[int, str]]:
if logger is not None:
logger.debug(f"params: {params}")
try:
cur: cursor
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO mainte2.match_network(cidr_addr, country_code)
VALUES (%(cidr_addr)s, %(country_code)s) RETURNING id,cidr_addr""",
params
)
if logger is not None:
if cur.query is not None:
logger.debug(f"{cur.query.decode('utf-8')}")
# 戻り値: 登録ID, ネットワークアドレス
row: Optional[Tuple[int, str]] = cur.fetchone()
if logger is not None:
logger.debug(row)
return row
except (Exception, psycopg2.DatabaseError) as err:
raise err
関数実行時のDEBUG出力
DEBUG params: {'cidr_addr': '119.28.0.0/15', 'country_code': 'CN'}
DEBUG
INSERT INTO mainte2.match_network(cidr_addr, country_code)
VALUES ('119.28.0.0/15', 'CN') RETURNING id,cidr_addr
DEBUG (5, '119.28.0.0/15')
4-2-2. RIR ipv4 割当て済みテーブル検索
先頭の記事で紹介した関数を再掲します。処理内容については先頭記事をご覧ください。
def get_rir_table_matches(
conn: connection,
like_ip: str,
logger: Optional[Logger] = None) -> List[Tuple[str, int, str]]:
if logger is not None:
logger.debug(f"like_ip: {like_ip}")
result: List[Tuple[str, int, str]]
try:
cur: cursor
# 前ゼロ埋めしたIPアドレスの昇順にソートする
with conn.cursor() as cur:
cur.execute("""
SELECT
ip_start,ip_count,country_code
FROM
mainte2.RIR_ipv4_allocated
WHERE
ip_start LIKE %(partial_match)s
ORDER BY
LPAD(SPLIT_PART(ip_start,'.',1), 3, '0') || '.' ||
LPAD(SPLIT_PART(ip_start,'.',2), 3, '0') || '.' ||
LPAD(SPLIT_PART(ip_start,'.',3), 3, '0') || '.' ||
LPAD(SPLIT_PART(ip_start,'.',4), 3, '0')""",
({'partial_match': like_ip}))
# レコード取得件数チェック
if cur.rowcount > 0:
rows: List[Tuple[str, int, str]] = cur.fetchall()
if logger is not None:
logger.debug(f"rows.size: {len(rows)}")
for row in rows:
logger.debug(f"{row}")
result = rows
else:
# マッチしなかったら空のリスト
result = []
return result
except (Exception, psycopg2.DatabaseError) as err:
raise err
4-3. 処理メイン処理
4-3-1. RIRテーブル検索メイン処理
複数のクエリー検索実行関数を呼び出しネットワークアドレスと国コードを取得する
- 検索処理手順
-
① ネットワークアドレス国コードテーブル検索処理を実行
※「# ■ 今回追加した検索処理 ■」で囲った部分 -
② RIR ipv4 割当済み検索処理実行
対象となるネットワークアドレスのリストを取得する -
③ ネットワークアドレス特定処理を実行
※1「# ■ ネットワークアドレス特定処理 ■」で囲った部分
※2 前回記事のスクリプトでは、この処理を抜けてから実行していました
-
① ネットワークアドレス国コードテーブル検索処理を実行
- 戻り値
- 該当レコードが有り
ネットワークアドレス・国コード、検索テーブル名を保持するデータクラス - 該当レコードが無し
None
- 該当レコードが有り
def get_matches_main(
conn: connection,
target_ip: str,
logger: Optional[logging.Logger] = None) -> Optional[MatchNetworkData]:
def make_like_ip(like_old: str) -> Optional[str]:
# 末尾の likeプレースホルダを削除する
raw_ip: str = like_old.replace(".%", "")
fields: List[str] = raw_ip.split(".")
# コンマで区切って残りが1つなら終了
field_size: int = len(fields)
if field_size == 1:
return None
# フィールドを1つ減らす
del fields[field_size - 1]
# 末尾にlikeプレースホルダ(".%")を付加して終了
return ".".join(fields) + ".%"
# ■ 今回追加した検索処理 ■
# ネットワークアドレス国コードテーブル検索
find_rec: Optional[Tuple[str, str]] = target_ip_include_match_network(
conn, target_ip, logger=logger
)
if find_rec is not None:
# レコードが見つかったら処理終了
return MatchNetworkData(
network_ip=find_rec[0], country_code=find_rec[1], from_table="match_network"
)
# ■ 今回追加した検索処理 ■
# RIR_ipv4_allocated テーブル検索処理
target_ip_addr: IPv4Address = ip_address(target_ip) # type: ignore
like_ip: Optional[str] = make_like_ip(target_ip)
matches: Optional[List[Tuple[str, int, str]]] = None
while like_ip is not None:
matches = get_rir_table_matches(conn, like_ip, logger=logger)
if len(matches) > 0:
# 先頭レコードの開始IPアドレス
first_ip: str = matches[0][0]
first_ip_addr: IPv4Address = ip_address(first_ip) # type: ignore
# 最終レコードの開始IPアドレス
last: Tuple[str, int, str] = matches[-1]
last_ip: str = last[0]
ip_cnt: int = int(last[1])
last_ip_addr: IPv4Address = ip_address(last_ip) # type: ignore
# 最終レコードのブロードキャストアドレス計算
broadcast_addr: IPv4Address = last_ip_addr + ip_cnt - 1 # type: ignore
if logger is not None:
logger.info(f"match_first: {first_ip}, match_last: {last_ip}")
if first_ip_addr < target_ip_addr < broadcast_addr:
# ターゲットIPが先頭レコードの開始IPと最終レコードのブロードキャストの範囲内なら終了
if logger is not None:
logger.debug(
f"Range in ({first_ip} < {target_ip} < {str(broadcast_addr)})"
f", break"
)
break
else:
# 範囲外: 次のlike検索文字列を生成して検索処理に戻る
like_ip = make_like_ip(like_ip)
if logger is not None:
logger.info(f"next {like_ip} continue.")
else:
# レコード無し: 次のlike検索文字列を生成して検索処理に戻る
if logger is not None:
logger.info(f"{like_ip} is no match.")
like_ip = make_like_ip(like_ip)
# ■ ネットワークアドレス特定処理 ■
# ターケットIPが属するネットワークアドレスと国コードを取得する
upd_cc: Optional[str]
# ターゲットIPのネットワーク(CIDR表記)と国コードを取得する
if matches is not None and len(matches) > 0:
network: Optional[str]
cc: Optional[str]
network, cc = detect_cc_in_matches(target_ip, matches, logger=logger)
if network is not None and cc is not None:
return MatchNetworkData(
network_ip=network, country_code=cc, from_table="RIR_ipv4_allocated"
)
else:
# RIR_ipv4_allocated テーブルにターゲットIPを含むネットワークアドレスなし
return None
else:
# RIR_ipv4_allocated テーブルに該当レコードなし
return None
# ■ ネットワークアドレス特定処理 ■
4-3-2. バッチメイン処理
- 処理手順
- 入力パラメータ処理
検索対象のIPアドレスを取得する - PostgreSQL接続オブジェクト取得
- RIRテーブル検索メイン処理を実行
- 検索結果出力
-
ネットワークアドレス国コードテーブル登録処理
※1 RIR ipv4 割当済みテーブルから取得した場合に登録する
※2 「# ■ 今回追加した登録処理 ■」に挟まれた部分
- 入力パラメータ処理
def batch_main():
logging.basicConfig(format='%(levelname)s %(message)s')
app_logger = logging.getLogger(__name__)
app_logger.setLevel(level=logging.DEBUG)
parser = argparse.ArgumentParser()
parser.add_argument("--target-ip", required=True, type=str,
help="IP address.")
parser.add_argument("--enable-debug", action="store_true",
help="Enable logger debug out.")
args: argparse.Namespace = parser.parse_args()
target_ip: str = args.target_ip
enable_debug: bool = args.enable_debug
db: Optional[pgdatabase.PgDatabase] = None
try:
db = pgdatabase.PgDatabase(DB_CONF_FILE, logger=None)
conn: connection = db.get_connection()
match_data: Optional[MatchNetworkData] = get_matches_main(
conn, target_ip,
logger=app_logger if enable_debug else None
)
if match_data is not None:
network: str = match_data.network_ip
cc: str = match_data.country_code
from_table: str = match_data.from_table
app_logger.info(
f'Find {target_ip} in {from_table}("{network}", country_code: "{cc}")')
# ■ 今回追加した登録処理 ■
# 見つかった国コードとネットワークアドレスがRIRレコードなら登録する
if from_table == "RIR_ipv4_allocated":
params: Dict[str, str] = {"cidr_addr": network, "country_code": cc}
inserted: Optional[Tuple[int, str]] = insert_match_network(
conn, params, logger=app_logger if enable_debug else None
)
conn.commit()
# 登録結果
if inserted is not None:
reg_id: int = inserted[0]
cidr_addr: str = inserted[1]
app_logger.info(
f'Inserted match_network(id={reg_id}, cidr_addr="{cidr_addr}")')
# ■ 今回追加した登録処理 ■
else:
app_logger.info(f"{target_ip} is not match in tables.")
except psycopg2.Error as db_err:
app_logger.error(db_err)
exit(1)
except Exception as err:
app_logger.error(err)
exit(1)
finally:
if db is not None:
db.close()
if __name__ == '__main__':
batch_main()
4-4. pythonスクリプトソース
以下 (1)と(2)は最初に紹介した投稿記事のソースを再掲します。
(1) PostgreSQL接続情報 (JSON)
{
"host": "{hostname}",
"port": "5432",
"database": "qiita_exampledb",
"user": "developer",
"password": "developerpassword"
}
(2) PostgreSQL接続オブジェクト取得モジュール
import json
import logging
import socket
from typing import Optional
import psycopg2
from psycopg2.extensions import connection
"""
PostgreSQL Database接続生成クラス
"""
class PgDatabase(object):
def __init__(self, configfile,
hostname: Optional[str] = None,
logger: Optional[logging.Logger] = None):
self.logger = logger
with open(configfile, 'r') as fp:
db_conf = json.load(fp)
if hostname is None:
hostname = socket.gethostname()
db_conf["host"] = db_conf["host"].format(hostname=hostname)
# default connection is itarable curosr
self.conn = psycopg2.connect(**db_conf)
# Dictinaly-like cursor connection.
# self.conn = psycopg2.connect(**db_conf, connection_factory=DictConnection)
if self.logger is not None:
self.logger.debug(self.conn)
def get_connection(self) -> connection:
return self.conn
def rollback(self) -> None:
if self.conn is not None:
self.conn.rollback()
def commit(self) -> None:
if self.conn is not None:
self.conn.commit()
def close(self) -> None:
if self.conn is not None:
if self.logger is not None:
self.logger.debug(f"Close {self.conn}")
self.conn.close()
(3) バッチ処理のソース全体を下記に示します。
import argparse
import csv
import json
import logging
import os
from dataclasses import asdict, dataclass
from ipaddress import (
ip_address, summarize_address_range, IPv4Address, IPv4Network
)
from logging import Logger
import typing
from typing import Any, List, Dict, Iterator, Optional, Tuple
import psycopg2
from psycopg2.extensions import connection, cursor
from db import pgdatabase
# データベース接続情報
DB_CONF_FILE: str = os.path.join("conf", "db_conn.json")
# RIR データクラス
@dataclass(frozen=True)
class RirRecord:
ip_start: str
ip_count: int
country_code: str
# RIRテーブル検索メイン処理の戻り値
@dataclass(frozen=True)
class MatchNetworkData:
network_ip: str
country_code: str
from_table: str
def read_json(file_name: str) -> Dict[str, Any]:
with open(file_name, 'r') as fp:
data = json.load(fp)
return data
def read_csv(file_name: str,
skip_header=True, header_cnt=1) -> List[str]:
with open(file_name, 'r') as fp:
reader = csv.reader(fp, dialect='unix')
if skip_header:
for skip in range(header_cnt):
next(reader)
# リストをカンマ区切りで連結する
lines = [",".join(rec) for rec in reader]
return lines
@typing.no_type_check
def get_cidr_cc_list(ip_start: str,
ip_count: int,
country_code: str) -> List[Tuple[IPv4Network, str]]:
addr_first: IPv4Address = ip_address(ip_start)
addr_last: IPv4Address = addr_first + ip_count - 1
cidr_ite: Iterator[IPv4Network] = summarize_address_range(addr_first, addr_last)
return [(cidr, country_code) for cidr in cidr_ite]
@typing.no_type_check
def detect_cc_in_cidr_cc_list(
target_ip: str,
cidr_cc_list: List[Tuple[IPv4Network, str]]
) -> Tuple[Optional[str], Optional[str]]:
target_ip_addr: IPv4Address = ip_address(target_ip)
match_network: Optional[str] = None
match_cc: Optional[str] = None
for cidr_cc in cidr_cc_list:
if target_ip_addr in cidr_cc[0]:
match_network = str(cidr_cc[0])
match_cc = cidr_cc[1]
break
return match_network, match_cc
# RIR_ipv4_allocated のみ mainte2スキーマのものを参照する
def get_rir_table_matches(
conn: connection,
like_ip: str,
logger: Optional[Logger] = None) -> List[Tuple[str, int, str]]:
if logger is not None:
logger.debug(f"like_ip: {like_ip}")
result: List[Tuple[str, int, str]]
try:
cur: cursor
# 前ゼロ埋めしたIPアドレスの昇順にソートする
with conn.cursor() as cur:
cur.execute("""
SELECT
ip_start,ip_count,country_code
FROM
mainte2.RIR_ipv4_allocated
WHERE
ip_start LIKE %(partial_match)s
ORDER BY
LPAD(SPLIT_PART(ip_start,'.',1), 3, '0') || '.' ||
LPAD(SPLIT_PART(ip_start,'.',2), 3, '0') || '.' ||
LPAD(SPLIT_PART(ip_start,'.',3), 3, '0') || '.' ||
LPAD(SPLIT_PART(ip_start,'.',4), 3, '0')""",
({'partial_match': like_ip}))
# レコード取得件数チェック
if cur.rowcount > 0:
rows: List[Tuple[str, int, str]] = cur.fetchall()
if logger is not None:
logger.debug(f"rows.size: {len(rows)}")
for row in rows:
logger.debug(f"{row}")
result = rows
else:
# マッチしなかったら空のリスト
result = []
return result
except (Exception, psycopg2.DatabaseError) as err:
raise err
# ターゲットIPのネットワークがネットワークアドレス国コードテーブルに存在するかチェック
def target_ip_include_match_network(
conn: connection,
target_ip: str,
logger: Optional[Logger] = None) -> Optional[Tuple[str, str]]:
if logger is not None:
logger.debug(f"target_ip: {target_ip}")
result: Optional[Tuple[str, str]]
try:
cur: cursor
with conn.cursor() as cur:
cur.execute("""
SELECT
cidr_addr, country_code
FROM
mainte2.match_network
WHERE
inet %(target_ip)s << cidr_addr""",
({'target_ip': target_ip}))
row: Optional[Tuple[str, str]] = cur.fetchone()
if logger is not None:
if cur.query is not None:
logger.debug(f"{cur.query.decode('utf-8')}")
logger.debug(f"row: {row}")
result = row
return result
except (Exception, psycopg2.DatabaseError) as err:
raise err
# ネットワークアドレス国コードテーブルに一括登録
def insert_match_network(
conn: connection,
params: Dict[str, str],
logger: Optional[Logger] = None) -> Optional[Tuple[int, str]]:
if logger is not None:
logger.debug(f"params: {params}")
try:
cur: cursor
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO mainte2.match_network(cidr_addr, country_code)
VALUES (%(cidr_addr)s, %(country_code)s) RETURNING id,cidr_addr""",
params
)
if logger is not None:
if cur.query is not None:
logger.debug(f"{cur.query.decode('utf-8')}")
# 戻り値: 登録ID, ネットワークアドレス
row: Optional[Tuple[int, str]] = cur.fetchone()
if logger is not None:
logger.debug(row)
return row
except (Exception, psycopg2.DatabaseError) as err:
raise err
def detect_cc_in_matches(
target_ip: str,
matches: List[Tuple[str, int, str]],
logger: Optional[logging.Logger] = None) -> Tuple[Optional[str], Optional[str]]:
def next_record(rows: List[Tuple[str, int, str]]) -> Iterator[RirRecord]:
for (ip_sta, ip_cnt, cc) in rows:
yield RirRecord(ip_start=ip_sta, ip_count=ip_cnt, country_code=cc)
target_ip_addr: IPv4Address = ip_address(target_ip) # type: ignore
match_network: Optional[str] = None
match_cc: Optional[str] = None
rec: RirRecord
for rec in next_record(matches):
# ターゲットIP が ネットワークIPアドレスより大きい場合は範囲外のため処理終了
if ip_address(rec.ip_start) > target_ip_addr: # type: ignore
if logger is not None:
logger.debug(
f"{target_ip} < {rec.ip_start} break. No more match."
)
# マッチするデータなし
break
# 開始ネットワークIPのブロードキャストアドレスがターゲットIPより小さければ次のレコードへ
broadcast_addr: IPv4Address = (
ip_address(rec.ip_start) + rec.ip_count - 1) # type: ignore
if broadcast_addr < target_ip_addr:
if logger is not None:
logger.debug(f"({str(broadcast_addr)} < {target_ip}) -> continue")
continue
cidr_cc_list: List[Tuple[IPv4Network, str]] = get_cidr_cc_list(
**asdict(rec)
)
if logger is not None:
logger.debug(cidr_cc_list)
match_network, match_cc = detect_cc_in_cidr_cc_list(target_ip, cidr_cc_list)
break
return match_network, match_cc
def get_matches_main(
conn: connection,
target_ip: str,
logger: Optional[logging.Logger] = None) -> Optional[MatchNetworkData]:
def make_like_ip(like_old: str) -> Optional[str]:
# 末尾の likeプレースホルダを削除する
raw_ip: str = like_old.replace(".%", "")
fields: List[str] = raw_ip.split(".")
# コンマで区切って残りが1つなら終了
field_size: int = len(fields)
if field_size == 1:
return None
# フィールドを1つ減らす
del fields[field_size - 1]
# 末尾にlikeプレースホルダ(".%")を付加して終了
return ".".join(fields) + ".%"
# ネットワークアドレス国コードテーブル検索
find_rec: Optional[Tuple[str, str]] = target_ip_include_match_network(
conn, target_ip, logger=logger
)
if find_rec is not None:
# レコードが見つかったら処理終了
return MatchNetworkData(
network_ip=find_rec[0], country_code=find_rec[1], from_table="match_network"
)
# RIR_ipv4_allocated テーブル検索処理
target_ip_addr: IPv4Address = ip_address(target_ip) # type: ignore
like_ip: Optional[str] = make_like_ip(target_ip)
matches: Optional[List[Tuple[str, int, str]]] = None
while like_ip is not None:
matches = get_rir_table_matches(conn, like_ip, logger=logger)
if len(matches) > 0:
# 先頭レコードの開始IPアドレス
first_ip: str = matches[0][0]
first_ip_addr: IPv4Address = ip_address(first_ip) # type: ignore
# 最終レコードの開始IPアドレス
last: Tuple[str, int, str] = matches[-1]
last_ip: str = last[0]
ip_cnt: int = int(last[1])
last_ip_addr: IPv4Address = ip_address(last_ip) # type: ignore
# 最終レコードのブロードキャストアドレス計算
broadcast_addr: IPv4Address = last_ip_addr + ip_cnt - 1 # type: ignore
if logger is not None:
logger.info(f"match_first: {first_ip}, match_last: {last_ip}")
if first_ip_addr < target_ip_addr < broadcast_addr:
# ターゲットIPが先頭レコードの開始IPと最終レコードのブロードキャストの範囲内なら終了
if logger is not None:
logger.debug(
f"Range in ({first_ip} < {target_ip} < {str(broadcast_addr)})"
f", break"
)
break
else:
# 範囲外: 次のlike検索文字列を生成して検索処理に戻る
like_ip = make_like_ip(like_ip)
if logger is not None:
logger.info(f"next {like_ip} continue.")
else:
# レコード無し: 次のlike検索文字列を生成して検索処理に戻る
if logger is not None:
logger.info(f"{like_ip} is no match.")
like_ip = make_like_ip(like_ip)
# ターケットIPが属するネットワークアドレスと国コードを取得する
upd_cc: Optional[str]
# ターゲットIPのネットワーク(CIDR表記)と国コードを取得する
if matches is not None and len(matches) > 0:
network: Optional[str]
cc: Optional[str]
network, cc = detect_cc_in_matches(target_ip, matches, logger=logger)
if network is not None and cc is not None:
return MatchNetworkData(
network_ip=network, country_code=cc, from_table="RIR_ipv4_allocated"
)
else:
# RIR_ipv4_allocated テーブルにターゲットIPを含むネットワークアドレスなし
return None
else:
# RIR_ipv4_allocated テーブルに該当レコードなし
return None
def batch_main():
logging.basicConfig(format='%(levelname)s %(message)s')
app_logger = logging.getLogger(__name__)
app_logger.setLevel(level=logging.DEBUG)
parser = argparse.ArgumentParser()
parser.add_argument("--target-ip", required=True, type=str,
help="IP address.")
parser.add_argument("--enable-debug", action="store_true",
help="Enable logger debug out.")
args: argparse.Namespace = parser.parse_args()
target_ip: str = args.target_ip
enable_debug: bool = args.enable_debug
db: Optional[pgdatabase.PgDatabase] = None
try:
db = pgdatabase.PgDatabase(DB_CONF_FILE, logger=None)
conn: connection = db.get_connection()
match_data: Optional[MatchNetworkData] = get_matches_main(
conn, target_ip,
logger=app_logger if enable_debug else None
)
if match_data is not None:
network: str = match_data.network_ip
cc: str = match_data.country_code
from_table: str = match_data.from_table
app_logger.info(
f'Find {target_ip} in {from_table}("{network}", country_code: "{cc}")')
# 見つかった国コードとネットワークアドレスがRIRレコードなら登録する
if from_table == "RIR_ipv4_allocated":
params: Dict[str, str] = {"cidr_addr": network, "country_code": cc}
inserted: Optional[Tuple[int, str]] = insert_match_network(
conn, params, logger=app_logger if enable_debug else None
)
conn.commit()
# 登録結果
if inserted is not None:
reg_id: int = inserted[0]
cidr_addr: str = inserted[1]
app_logger.info(
f'Inserted match_network(id={reg_id}, cidr_addr="{cidr_addr}")')
else:
app_logger.info(f"{target_ip} is not match in tables.")
except psycopg2.Error as db_err:
app_logger.error(db_err)
exit(1)
except Exception as err:
app_logger.error(err)
exit(1)
finally:
if db is not None:
db.close()
if __name__ == '__main__':
batch_main()
5. pythonスクリプトの実行
5-1. テストレコード
「4-1-2. 検索SQL」で登録したレコードを再掲します
21d653972513:/# echo "SELECT * FROM mainte2.match_network ORDER BY id;" \
> | psql -Udeveloper qiita_exampledb
id | cidr_addr | country_code
----+-----------------+--------------
1 | 103.77.240.0/23 | VN
2 | 177.240.0.0/13 | MX
3 | 125.112.0.0/12 | CN
4 | 92.118.36.0/22 | RO
(4 rows)
5-2. スクリプト実行
python 仮想環境 (py_psycopg2) に入る
$ . py_venv/py_psycopg2/bin/activate
(py_psycopg2) $
(1) ネットワークアドレス国コードテーブルに該当レード有り
(py_psycopg2) $ python TestDetectCountryCode_in_match_network.py \
> --target-ip 125.124.213.77 --enable-debug
DEBUG target_ip: 125.124.213.77
DEBUG
SELECT
cidr_addr, country_code
FROM
mainte2.match_network
WHERE
inet '125.124.213.77' << cidr_addr
DEBUG row: ('125.112.0.0/12', 'CN')
INFO Find 125.124.213.77 in match_network("125.112.0.0/12", country_code: "CN")
(2) RIR ipv4 割当済みテーブルに該当レード有り
※ネットワークアドレス国コードテーブル該当レード無し
(py_psycopg2) $ python TestDetectCountryCode_in_match_network.py \
> --target-ip 119.28.156.59 --enable-debug
INFO target_ip: 119.28.156.59, enable_debug: True
DEBUG target_ip: 119.28.156.59
DEBUG
SELECT
cidr_addr, country_code
FROM
mainte2.match_network
WHERE
inet '119.28.156.59' << cidr_addr
DEBUG row: None
DEBUG like_ip: 119.28.156.%
INFO 119.28.156.% is no match.
DEBUG like_ip: 119.28.%
DEBUG rows.size: 1
DEBUG ('119.28.0.0', 131072, 'CN')
INFO match_first: 119.28.0.0, match_last: 119.28.0.0
DEBUG Range in (119.28.0.0 < 119.28.156.59 < 119.29.255.255), break
DEBUG [(IPv4Network('119.28.0.0/15'), 'CN')]
INFO Find 119.28.156.59 in RIR_ipv4_allocated("119.28.0.0/15", country_code: "CN")
DEBUG network_addr: 119.28.0.0/15
DEBUG
SELECT id FROM mainte2.match_network WHERE cidr_addr = '119.28.0.0/15'
DEBUG row : None
DEBUG params: {'cidr_addr': '119.28.0.0/15', 'country_code': 'CN'}
DEBUG
INSERT INTO mainte2.match_network(cidr_addr, country_code)
VALUES ('119.28.0.0/15', 'CN') RETURNING id,cidr_addr
DEBUG (5, '119.28.0.0/15')
INFO Inserted match_network(id=5, cidr_addr="119.28.0.0/15")
(3) ネットワークアドレス不明
(py_psycopg2) $ python TestDetectCountryCode_in_match_network.py \
> --target-ip 103.253.175.77 --enable-debug
DEBUG target_ip: 103.253.175.77
DEBUG
SELECT
cidr_addr, country_code
FROM
mainte2.match_network
WHERE
inet '103.253.175.77' << cidr_addr
DEBUG row: None
DEBUG like_ip: 103.253.175.%
INFO 103.253.175.% is no match.
DEBUG like_ip: 103.253.%
DEBUG rows.size: 52
DEBUG ('103.253.0.0', 1024, 'ID')
DEBUG ('103.253.4.0', 1024, 'CN')
DEBUG ('103.253.8.0', 1024, 'HK')
DEBUG ('103.253.12.0', 1024, 'MY')
DEBUG ('103.253.16.0', 512, 'VN')
DEBUG ('103.253.18.0', 512, 'PK')
DEBUG ('103.253.20.0', 512, 'VN')
DEBUG ('103.253.22.0', 512, 'VN')
DEBUG ('103.253.24.0', 1024, 'SG')
DEBUG ('103.253.32.0', 1024, 'IN')
DEBUG ('103.253.36.0', 512, 'LT')
DEBUG ('103.253.40.0', 1024, 'HK')
DEBUG ('103.253.44.0', 1024, 'BD')
DEBUG ('103.253.48.0', 1024, 'NZ')
DEBUG ('103.253.60.0', 1024, 'CN')
DEBUG ('103.253.64.0', 1024, 'AU')
DEBUG ('103.253.71.0', 256, 'IN')
DEBUG ('103.253.72.0', 1024, 'TH')
DEBUG ('103.253.76.0', 1024, 'JP')
DEBUG ('103.253.84.0', 512, 'JP')
DEBUG ('103.253.92.0', 1024, 'AU')
DEBUG ('103.253.96.0', 1024, 'CA')
DEBUG ('103.253.100.0', 512, 'MY')
DEBUG ('103.253.102.0', 512, 'BD')
DEBUG ('103.253.106.0', 512, 'ID')
DEBUG ('103.253.108.0', 1024, 'MY')
DEBUG ('103.253.128.0', 1024, 'IN')
DEBUG ('103.253.132.0', 1024, 'TH')
DEBUG ('103.253.136.0', 1024, 'NZ')
DEBUG ('103.253.140.0', 1024, 'ES')
DEBUG ('103.253.144.0', 1024, 'SG')
DEBUG ('103.253.156.0', 1024, 'PK')
DEBUG ('103.253.160.0', 1024, 'BD')
DEBUG ('103.253.164.0', 512, 'ID')
DEBUG ('103.253.168.0', 1024, 'IN')
DEBUG ('103.253.176.0', 512, 'BD')
DEBUG ('103.253.180.0', 1024, 'KH')
DEBUG ('103.253.188.0', 1024, 'JP')
DEBUG ('103.253.192.0', 256, 'NZ')
DEBUG ('103.253.193.0', 256, 'AU')
DEBUG ('103.253.194.0', 512, 'NZ')
DEBUG ('103.253.200.0', 1024, 'IN')
DEBUG ('103.253.204.0', 1024, 'CN')
DEBUG ('103.253.208.0', 1024, 'IN')
DEBUG ('103.253.216.0', 1024, 'JP')
DEBUG ('103.253.220.0', 1024, 'CN')
DEBUG ('103.253.224.0', 1024, 'CN')
DEBUG ('103.253.228.0', 256, 'TH')
DEBUG ('103.253.232.0', 1024, 'CN')
DEBUG ('103.253.236.0', 1024, 'HK')
DEBUG ('103.253.240.0', 1024, 'KR')
DEBUG ('103.253.248.0', 1024, 'HK')
INFO match_first: 103.253.0.0, match_last: 103.253.248.0
DEBUG Range in (103.253.0.0 < 103.253.175.77 < 103.253.251.255), break
DEBUG (103.253.3.255 < 103.253.175.77) -> continue
DEBUG (103.253.7.255 < 103.253.175.77) -> continue
DEBUG (103.253.11.255 < 103.253.175.77) -> continue
DEBUG (103.253.15.255 < 103.253.175.77) -> continue
DEBUG (103.253.17.255 < 103.253.175.77) -> continue
DEBUG (103.253.19.255 < 103.253.175.77) -> continue
DEBUG (103.253.21.255 < 103.253.175.77) -> continue
DEBUG (103.253.23.255 < 103.253.175.77) -> continue
DEBUG (103.253.27.255 < 103.253.175.77) -> continue
DEBUG (103.253.35.255 < 103.253.175.77) -> continue
DEBUG (103.253.37.255 < 103.253.175.77) -> continue
DEBUG (103.253.43.255 < 103.253.175.77) -> continue
DEBUG (103.253.47.255 < 103.253.175.77) -> continue
DEBUG (103.253.51.255 < 103.253.175.77) -> continue
DEBUG (103.253.63.255 < 103.253.175.77) -> continue
DEBUG (103.253.67.255 < 103.253.175.77) -> continue
DEBUG (103.253.71.255 < 103.253.175.77) -> continue
DEBUG (103.253.75.255 < 103.253.175.77) -> continue
DEBUG (103.253.79.255 < 103.253.175.77) -> continue
DEBUG (103.253.85.255 < 103.253.175.77) -> continue
DEBUG (103.253.95.255 < 103.253.175.77) -> continue
DEBUG (103.253.99.255 < 103.253.175.77) -> continue
DEBUG (103.253.101.255 < 103.253.175.77) -> continue
DEBUG (103.253.103.255 < 103.253.175.77) -> continue
DEBUG (103.253.107.255 < 103.253.175.77) -> continue
DEBUG (103.253.111.255 < 103.253.175.77) -> continue
DEBUG (103.253.131.255 < 103.253.175.77) -> continue
DEBUG (103.253.135.255 < 103.253.175.77) -> continue
DEBUG (103.253.139.255 < 103.253.175.77) -> continue
DEBUG (103.253.143.255 < 103.253.175.77) -> continue
DEBUG (103.253.147.255 < 103.253.175.77) -> continue
DEBUG (103.253.159.255 < 103.253.175.77) -> continue
DEBUG (103.253.163.255 < 103.253.175.77) -> continue
DEBUG (103.253.165.255 < 103.253.175.77) -> continue
DEBUG (103.253.171.255 < 103.253.175.77) -> continue
DEBUG 103.253.175.77 < 103.253.176.0 break. No more match.
INFO 103.253.175.77 is not match in tables.
6. 最後に
この記事の先頭で紹介した投稿記事では「RIR ipv4 割当済みテーブル」にIPアドレスを文字列で格納していました。
今となってわかったのですがネットワークアドレス (CIDR形式) がわかれば CIDR型で登録したほうが検索ロジックは非常に簡単になることがわかりました。
「RIR ipv4 割当済みテーブル」にインポートしたCSVファイルは以下のようになっていました。
"ip_start","ip_count","country_code","allocated_date","registry_id"
"1.0.1.0",256,"CN","20110414",1
"1.0.2.0",512,"CN","20110414",1
"1.0.4.0",1024,"AU","20110412",1
"1.0.8.0",2048,"CN","20110412",1
"1.0.16.0",4096,"JP","20110412",1
...以下省略...
"ip_start" と "ip_count" 列の値から、下記 "network_addr" 列のように cidr 形式に変換することでCDIR型でのインポートが可能になります。
"network_addr","country_code","allocated_date","registry_id"
"1.0.1.0/24","CN","20110414",1
"1.0.2.0/23","CN","20110414",1
"1.0.4.0/22","AU","20110412",1
"1.0.8.0/21","CN","20110412",1
"1.0.16.0/20","JP","20110412",1
...以下省略...
但し、検索処理としては簡単なるもののRIRデータは日々更新されているためテーブルのメンテナンスの難易度が上がってしまう可能性があるのが玉に瑕。
上記のようなCSVファイルに変換する方法については次回以降に投稿する予定です。