12
19

不正アクセスしてきたホストの国コードを知ってセキュリティ対策に活用する

Last updated at Posted at 2024-08-29

現在 Ubuntu server (24.04) で自宅サーバーを稼働させていますが、日々悩まされているのがSSHで不正ログインしてくるホストの増加です。一般的なセキュリィティ対策は行っていますがサーバーとして使っているPC(ノートパソコン)が10年以上前の古い機種なので日々のセキュリティ対応が悩みの種になっています。

こちらがその不正アクセスしてきたホストに関する情報(抜粋)です。

2024-06-12T20:59:45+09:00 example.host sshd[102647]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=103.77.241.34  user=root
2024-06-12T21:00:12+09:00 example.host sshd[102649]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=103.77.241.34  user=root

以下は上記のログを1日当たり30回数以上不正アクセスしたホストのアクセス回数を降順に集計したものになります。

ssh_auth_error_2024-06-14.csv
"log_date","ip_addr","appear_count"
"2024-06-14","112.65.98.231",5992
"2024-06-14","103.77.241.34",3382
"2024-06-14","77.93.248.247",467
"2024-06-14","64.227.161.202",412
...一部省略...
"2024-06-14","36.133.106.218",34
"2024-06-14","23.146.184.79",33

さらに4日以上連続して不正アクセスしてきたホストのアクセス回数を集計したものも以下に示します。

2024-06-12 | 103.77.241.34 |          392
2024-06-13 | 103.77.241.34 |         3269
2024-06-14 | 103.77.241.34 |         3382
2024-06-15 | 103.77.241.34 |         3345
2024-06-16 | 103.77.241.34 |          959

この記事で紹介するPythonスクリプトは、地域インターネットレジストリ(RIR)で公開されているデータの内から割り当て済みIPv4アドレスを元に不正アクセスしてきたホストが属するネットワークアドレス(CIDR表記)と国コードを取得するもので、パケットフィルタリングの参考にすることを想定しています。

割り当て済みIPv4アドレスのデータはデータベースに登録することを前提にしているのて、公開されているデータからインポート用CSVファイルの作成方法についても解説します。

この記事で紹介するpythonスクリプトの出力結果を下記に示します。

(py_psycopg2) $ python TestDetectCountryCode.py --target-ip 103.77.241.34
INFO Find 103.77.241.34 in (network: "103.77.240.0/23", country_code: "VN")

不正アクセスしてきたホストをひとつひとつをパケットフィルタリングの対象にするのではなく、ホストの属するネットワークをひとまとめにしてパケットフィルタリングの対象することで作業を効率することも可能になります。

$ sudo firewall-cmd --zone=drop --add-source=103.77.240.0/23

1. 環境

1-1. 自宅サーバー

  • OS: Ubuntu server 24.04
    • メールサーバー: Postfix
    • リバースプロキシサーバー

1-2. スクリプト実行環境

  • OS: Ubuntu Desktop 22.04
    • データベース
      Dockerコンテナ内で稼働する PostgreSQL-16
    • python 3.10.12
      スクリプト専用の仮想環境を作成し下記ライブラリをインストール
      pip install psycopg2-binary

2. 参考サイト

3.RIRデータの取得と加工

5地域のRIRデータの最新版のダウンロードリンクは以下の通りです。
※拡張子"txt"をつけて保存

※1 ファイル名に "-extended-" が含まれるデータはカラムに拡張列があるデータです
※2 APNIC, AFRINIC, LACNIC にも "-extended-" が含まれるファイルがあります

3-1.各レジストリのデータ

下記は、ダウンロードしたデータからIPアドレスがIpv4で割り当て済みデータ (allocated) を抽出した行の抜粋です。

(1) APNIC: Asia Pacific

delegated-apnic-latest.txt
apnic|CN|ipv4|1.0.1.0|256|20110414|allocated
apnic|CN|ipv4|1.0.2.0|512|20110414|allocated

(2) AFRINIC: Africa

delegated-afrinic-latest.txt
afrinic|ZA|ipv4|41.0.0.0|2097152|20071126|allocated
afrinic|EG|ipv4|41.32.0.0|1048576|20091105|allocated

(3) ARIN: Canada, many Caribbean and North Atlantic islands, and the United States

delegated-arin-extended-latest.txt
arin|US|ipv4|3.0.0.0|8388608|20171220|allocated|4a8a91b5b89d3f900098ebf73ca0b118
arin|US|ipv4|3.128.0.0|8388608|20180625|allocated|4a8a91b5b89d3f900098ebf73ca0b118

(4) LACNIC: Latin America and the Caribbean

delegated-lacnic-latest.txt
lacnic|BR|ipv4|24.152.0.0|1024|20200310|allocated
lacnic|BR|ipv4|24.152.4.0|1024|20200312|allocated

(5) RIPE NCC: Europe, the Middle East, and parts of Central Asia

delegated-ripencc-extended-latest.txt
ripencc|PS|ipv4|1.178.112.0|4096|20071126|allocated|23a7b676-99bb-4c16-85d6-3e8246f0b2a7
ripencc|PS|ipv4|1.178.128.0|4096|20071126|allocated|23a7b676-99bb-4c16-85d6-3e8246f0b2a7

3-2. 割当済みIPv4アドレスデータ抽出 (前処理)

対象となる行にマッチする正規表現

[1列目] [a-z]+    ※apnic,afrinic,arin,lacnic,ripencc
[2列目] [A-Z]{2}  ※国コード(大文字2文字)
[3列目] ipv4
[4列目] [0-9]{1-3}\.[0-9]{1-3}\.[0-9]{1-3}\.[0-9]{1-3}  ※IPv4アドレス
[5列目] [1-9][0-9]+    ※IP個数は整数(先頭は0以外)
[6列目] [1|2][0-9]{7}  ※19xxMMDD | 20xxMMDD
[7列目] allocated
[8列目] *.*$ ※8列目があるのはファイル名に '-extended-'が含まれるファイルのみ

grepコマンドで正規表現にマッチする行を抽出し、縦棒区切り("|") を TAB区切りに変換して別名でファイル保存します。
※TAB区切りに変換するのはテーブルインポート用CSV作成処理を容易にするため。

下記に APNICからダウンロードしたファイルの前処理例を示します。

$ grep -E "^[a-z]+\|[A-Z]{2}\|ipv4\|[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\|[1-9][0-9]+\|[1|2][0-9]{7}\|allocated\|*.*$" \
> delegated-apnic-latest | tr '|' '\t' > ipv4-allocated-apnic.txt

前処理したデータの抜粋を下記に示します。

$ head -n3 ipv4-allocated-apnic.txt 
apnic	CN	ipv4	1.0.1.0	256	20110414	allocated
apnic	CN	ipv4	1.0.2.0	512	20110414	allocated
apnic	AU	ipv4	1.0.4.0	1024	20110412	allocated

他の地域のデータも同様に前処理してそれぞれ別名でファイル保存します。

3-3. RIRデータのテーブル定義

スクリプトによるバッチ処理に使用するため各レジストリのデータをマージしたCSVフアイルをテーブルにインポートします。
※1 RIRデータは20万件以上になるため各レジストリIDを割り振ったマスターテーブルを作成します
※2 テーブル生成後に一括でデータをインポートするため制約を分離して定義

rir_ipv4_table.sql
-- データベース: qiita-exampledb
-- 古いテーブルを削除
DROP TABLE IF EXISTS mainte2.RIR_ipv4_allocated CASCADE;
DROP TABLE IF EXISTS mainte2.RIR_registory_mst;

-- レジストリ名テーブル
-- name: {afrinic,apnic,arin,iana,lacnic,ripencc}
-- https://www.apnic.net/about-apnic/corporate-documents/documents/
--     resource-guidelines/rir-statistics-exchange-format/
CREATE TABLE mainte2.RIR_registory_mst(
   id SMALLINT PRIMARY KEY,
   name VARCHAR(8) NOT NULL
);

INSERT INTO mainte2.RIR_registory_mst(id, name) VALUES 
   (1,'apnic')
  ,(2,'afrinic')
  ,(3,'arin')
  ,(4,'lacnic')
  ,(5,'ripencc')
  ,(6,'iana');

-- APNICで公開している各国に割り当てているIPアドレス情報からipv4アドレスのみを抽出したマスタテーブル
CREATE TABLE mainte2.RIR_ipv4_allocated(
   ip_start VARCHAR(15) NOT NULL,
   ip_count INTEGER NOT NULL,
   country_code CHAR(2) NOT NULL,
   allocated_date DATE NOT NULL,
   registry_id SMALLINT NOT NULL
);

ALTER TABLE mainte2.RIR_ipv4_allocated ADD CONSTRAINT pk_RIR_ipv4_allocated
  PRIMARY KEY (ip_start);
ALTER TABLE mainte2.RIR_ipv4_allocated ADD CONSTRAINT fk_RIR_ipv4_allocated_registry
  FOREIGN KEY (registry_id) REFERENCES mainte2.RIR_registory_mst (id);

ALTER TABLE mainte2.RIR_registory_mst OWNER TO developer;
ALTER TABLE mainte2.RIR_ipv4_allocated OWNER TO developer;

3-4. テーブルインポート用CSVファイルの作成

CSVファイルのカラム定義は以下の通りです。

[1列目] 開始IPアドレス  ※ "ip_start"
[2列目] IPの個数       ※ "ip_count"
[3列目] 国コード       ※ "country_code"
[4列目] 割り当て日付    ※ "allocated_date"
[5列目] レジストリID   ※ "registry_id"

(1) awkコマンドを使い前処理したファイルからインポート用のCSVファイルを生成します

$ awk '{ printf("\"%s\",%d,\"%s\",\"%s\",1\n", $4,$5,$2,$6) }' ipv4-allocated-apnic.txt > ipv4-1-apnic.csv

生成したファイルの抜粋を以下に示します。

ipv4-1-apnic.csv
"1.0.1.0",256,"CN","20110414",1
"1.0.2.0",512,"CN","20110414",1
"1.0.4.0",1024,"AU","20110412",1

(2) 残りのRIRデータも末尾のレジストリID番号を固定(2〜5)にしてCSVファイルを生成します

$ awk '{ printf("\"%s\",%d,\"%s\",\"%s\",2\n", $4,$5,$2,$6) }' ipv4-allocated-afrinic.txt > ipv4-2-afrinic.csv
$ awk '{ printf("\"%s\",%d,\"%s\",\"%s\",3\n", $4,$5,$2,$6) }' ipv4-allocated-arin.txt > ipv4-3-arin.csv
$ awk '{ printf("\"%s\",%d,\"%s\",\"%s\",4\n", $4,$5,$2,$6) }' ipv4-allocated-lacnic.txt > ipv4-4-lacnic.csv
$ awk '{ printf("\"%s\",%d,\"%s\",\"%s\",5\n", $4,$5,$2,$6) }' ipv4-allocated-ripencc.txt > ipv4-5-ripencc.csv

(3) ヘッダーをCSVファイルに出力

$ echo "\"ip_start\",\"ip_count\",\"country_code\",\"allocated_date\",\"registry_id\"" > ipv4-all.csv

(4) 各RIRデータをマージしますCSVファイルに追記

# CSVファイルに各RIRデータを追記する
$ cat ipv4-1-apnic.csv ipv4-2-afrinic.csv ipv4-3-arin.csv \
> ipv4-4-lacnic.csv ipv4-5-ripencc.csv >>ipv4-all.csv
# 行数確認 (ヘッダー行含む)
$ cat ipv4-all.csv | wc -l
201556

5地域分をマージしたCSVファイル (ヘッダー付き) の抜粋を以下に示します。

ipv4-all.csv
"ip_start","ip_count","country_code","allocated_date","registry_id"
"1.0.1.0",256,"CN","20110414",1
"1.0.2.0",512,"CN","20110414",1
...一部省略...
"223.255.248.0",1024,"HK","20151105",1
"223.255.252.0",512,"CN","20110414",1
"41.0.0.0",2097152,"ZA","20071126",2
"41.32.0.0",1048576,"EG","20091105",2
...一部省略...
"217.170.144.0",4096,"ML","20040817",2
"217.199.144.0",4096,"KE","20030725",2
"2.57.164.0",1024,"US","20230828",3
"3.0.0.0",8388608,"US","20171220",3
...一部省略...
"223.165.96.0",4096,"US","20200707",3
"223.165.112.0",4096,"US","20200817",3
"5.183.80.0",1024,"DO","20240711",4
"24.152.0.0",1024,"BR","20200310",4
...一部省略...
"216.245.133.0",256,"BR","20230802",4
"223.27.115.0",256,"CO","20230213",4
"1.178.112.0",4096,"PS","20071126",5
"1.178.128.0",4096,"PS","20071126",5
...一部省略...
"223.27.112.0",512,"NL","20100824",5
"223.27.114.0",256,"NL","20100824",5

3-5. テーブル作成とCSVインポート

dockerコンテナの作成等については Qiita投稿サイトの下記コンテンツをご覧ください。
Qiita@pipito-yukio「psycopg2 バッチ処理に適したクエリーを作成する」

実行中の dockerコンテナ環境に入ります。

$ docker exec -it postgres-qiita bin/bash
3-5-1. テーブル作成

psqlコマンドでテーブル作成SQLを実行します。

※パスはdockerコンテナの環境依存のため省略しています。

34ecfe33bb94:# psql -Udeveloper qiita_exampledb < rir_ipv4_table.sql
DROP TABLE
DROP TABLE
CREATE TABLE
INSERT 0 6
CREATE TABLE
ALTER TABLE
ALTER TABLE
ALTER TABLE
ALTER TABLE
3-5-2. CSVファイルのインポート

(1) dockerコンテナ内で実行するシェルスクリプト作成

  • インポート手順
    • 主キー制約の解除
    • メタコマンド\copy でCSVファイルを一括インポート
    • 主キー制約を元に戻す

メタコマンド\copy を使ったインポート方法については下記サイトの内容を参考にしました
(stackoverflow) Script to automat import of CSV into PostgreSQL

import_from_allocated_ipv4_csv.sh
#!/bin/bash

# Qiita投稿用データベース: qiita_exampledb
# ./import_from_allocated_ipv4_csv.sh [CSVファイル名]

# PK制約をドロップ
psql -Udeveloper -d qiita_exampledb -c "ALTER TABLE mainte2.RIR_ipv4_allocated DROP CONSTRAINT pk_RIR_ipv4_allocated;"

sleep 1

# データインポート: 呼び出し元でCSVファイル名を指定します。
# qiita_exampledbのcsvを参照する ※ユーザーホーム: qiita
psql -Udeveloper -d qiita_exampledb -c "\copy mainte2.RIR_ipv4_allocated FROM '/your_home/csv/${1}' DELIMITER ',' CSV HEADER;"

sleep 2

# PK制約を戻す
psql -Udeveloper -d qiita_exampledb -c "ALTER TABLE mainte2.RIR_ipv4_allocated ADD CONSTRAINT pk_RIR_ipv4_allocated PRIMARY KEY (ip_start);"

(2) シェルスクリプトを実行してCSVファイルを一括インポート

34ecfe33bb94:# ./import_from_allocated_ipv4_csv.sh ipv4-all.csv
ALTER TABLE
COPY 201555
ALTER TABLE

(3) インポート結果の確認
インポート件数、IP数の最小値と最大値の確認


34ecfe33bb94:/# echo "SELECT COUNT(*), MIN(ip_count), MAX(ip_count) FROM mainte2.RIR_ipv4_allocated;" \
> | psql -Udeveloper qiita_exampledb
 count  | min |   max    
--------+-----+----------
 201555 |  64 | 16777216
(1 row)

以上で RIRデータの準備が整いました。

4. pythonスクリプト

4-1. PostgreSQLデータベース接続

(1) 接続設定ファイル

conf/db_conn.json
{
  "host": "{hostname}",
  "port": "5432",
  "database": "qiita_exampledb",
  "user": "developer",
  "password": "developerpassword"
}

(2) pythonモジュール

db/pgdatabase.py
import json
import logging
import socket
from typing import Optional
import psycopg2
from psycopg2.extensions import connection

"""
PostgreSQL Database接続生成クラス
"""


class PgDatabase(object):
    def __init__(self, configfile,
                 hostname: Optional[str] = None,
                 logger: Optional[logging.Logger] = None):
        self.logger = logger
        with open(configfile, 'r') as fp:
            db_conf = json.load(fp)
            if hostname is None:
                hostname = socket.gethostname()
            db_conf["host"] = db_conf["host"].format(hostname=hostname)
        # default connection is itarable curosr
        self.conn = psycopg2.connect(**db_conf)
        if self.logger is not None:
            self.logger.debug(self.conn)

    def get_connection(self) -> connection:
        return self.conn

    def rollback(self) -> None:
        if self.conn is not None:
            self.conn.rollback()

    def commit(self) -> None:
        if self.conn is not None:
            self.conn.commit()

    def close(self) -> None:
        if self.conn is not None:
            if self.logger is not None:
                self.logger.debug(f"Close {self.conn}")
            self.conn.close()

4-2. 国コード取得スクリプト

コマンドラインから指定したIPアドレスの国コードを取得するスクリプトを作成します。

4-2-1. インポート、データクラス定義

(1) インポート

TestDetectCountryCode.py
import argparse
import logging
import os
from dataclasses import dataclass, asdict
from ipaddress import (
    ip_address, summarize_address_range,
    IPv4Network, IPv4Address
)
import typing
from typing import List, Iterator, Optional, Tuple

import psycopg2
from psycopg2.extensions import connection, cursor

from db import pgdatabase

(2) RIRレコードを格納するデータクラス

@dataclass(frozen=True)
class RirRecord:
    ip_start: str
    ip_count: int
    country_code: str
4-2-2. IPアドレスのネットワークと国コード取得処理

(1) 開始IPアドレスとIP個数から国コード判定用のCIDRリストを生成する関数

開始IPアドレスにIPの個数を加算して計算したブロードキャストアドレスからネットワークアドレス(CIDR表記)と国コード (タプル) のリストを取得する

@typing.no_type_check
def get_cidr_cc_list(ip_start: str,
                     ip_count: int,
                     country_code: str) -> List[Tuple[IPv4Network, str]]:
    addr_first: IPv4Address = ip_address(ip_start)
    # Broadcast address
    addr_last: IPv4Address = addr_first + ip_count - 1
    cidr_ite: Iterator[IPv4Network] = summarize_address_range(addr_first, addr_last)
    return [(cidr, country_code) for cidr in cidr_ite]

上記関数は下記の処理を関数としてまとめたものになります。

(py_psycopg2) $ python
Python 3.10.12 (main, Jul 29 2024, 16:56:48) [GCC 11.4.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from ipaddress import ip_address, ip_network, summarize_address_range
>>> ip_start = ip_address('103.77.240.0')
>>> ip_start
IPv4Address('103.77.240.0')
>>> ip_count = 512
>>> ip_bcast = ip_start + ip_count - 1
>>> ip_bcast
IPv4Address('103.77.241.255')
>>> cidr_ite = summarize_address_range(ip_start, ip_bcast)
>>> cc = 'VN'
>>> cidr_ite = summarize_address_range(ip_start, ip_bcast)
>>> cidr_data = [(cidr, cc) for cidr in cidr_ite]
>>> cidr_data
[(IPv4Network('103.77.240.0/23'), 'VN')]

(2) IPアドレスの属するネットワークアドレスと国コードを取得する関数

ターゲットIPがCIDRリストのネットワークに含まれる場合、ネットワークアドレス(CIDR表記)と国コードを返却する。

@typing.no_type_check
def detect_cc_in_cidr_cc_list(
        target_ip: str,
        cidr_cc_list: List[Tuple[IPv4Network, str]]
        ) -> Tuple[Optional[str], Optional[str]]:
    target_ip_addr: IPv4Address = ip_address(target_ip)
    match_network: Optional[str] = None
    match_cc: Optional[str] = None
    for cidr_cc in cidr_cc_list:
        if target_ip_addr in cidr_cc[0]:
            match_network = str(cidr_cc[0])
            match_cc = cidr_cc[1]
            break

    return match_network, match_cc

ターゲットIPを含むネットワークがCIDRリストに存在した場合、上記関数が返却する値(タブル)は以下のようになります。

>>> target_ip = ip_address("103.77.241.34")
>>> target_ip
IPv4Address('103.77.241.34')
>>> detect = [(cidr[0], cidr[1]) for cidr in cidr_data if target_ip in cidr[0]]
>>> detect
[(IPv4Network('103.77.240.0/23'), 'VN')]

上記(3)と(4)のpythonによる実装方法については参考サイト[1]に詳しく解説されていますのでそちらをご覧ください。

4-2-3. RIRテーブル検索関数

引数の検索用文字列にLIKEバターンマッチングで一致するレコードを取得する

一致するレコードが存在する場合はタプル("ip_start",ip_count,"country_ocde") のリストを返却し、存在しない場合は空のリストを返却する。
※レコードはIPアドレス(前ゼロ埋め処理済み)の昇順でソート済み

def get_like_ip_start_matches(
        conn: connection,
        like_ip: str,
        logger: Optional[logging.Logger] = None) -> List[Tuple[str, int, str]]:
    result: List[Tuple[str, int, str]]
    try:
        cur: cursor
        # ゼロ埋めしたIPアドレスの昇順にソートする
        with conn.cursor() as cur:
            cur.execute("""
SELECT
   ip_start,ip_count,country_code
FROM
   mainte2.RIR_ipv4_allocated
WHERE
   ip_start LIKE %(partial_match)s
ORDER BY
 LPAD(SPLIT_PART(ip_start,'.',1), 3, '0') || '.' ||
 LPAD(SPLIT_PART(ip_start,'.',2), 3, '0') || '.' ||
 LPAD(SPLIT_PART(ip_start,'.',3), 3, '0') || '.' ||
 LPAD(SPLIT_PART(ip_start,'.',4), 3, '0')""",
                        ({'partial_match': like_ip}))
            # レコード取得件数チェック
            if cur.rowcount > 0:
                rows: List[Tuple[str, int, str]] = cur.fetchall()
                if logger is not None:
                    logger.debug(f"rows.size: {len(rows)}")
                    for row in rows:
                        logger.debug(f"{row}")
                result = rows
            else:
                # マッチしなかったら空のリスト
                result = []
        return result
    except (Exception, psycopg2.DatabaseError) as err:
        raise err

IPアドレスを昇順にソートするための前ゼロ加工イメージは下記図の通りです。
※1 PostgreSQL専用。
※2 ORDER BY 句のSQLは「5 参考図書」が出典です。

RIR_0_record_ip_full_0_padding.png

前ゼロ加工済みの開始IPアドレスの昇順でソートすることにより条件を満たした時点で早期に処理を終了することが可能になります。

参考までに psqlを使って上記関数の検索結果を確認できるシェルスクリプトを下記に示します。

find_target_ip_like_param.sh
#!/bin/bash

# ./find_target_ip_like_param.sh ip_like param
# like-param: (example) 83.222.%

cat<<-EOF | psql -Udeveloper qiita_exmapledb
SELECT
   ip_start,ip_count,country_code
FROM
   mainte2.RIR_ipv4_allocated
WHERE
   ip_start LIKE '${1}'
ORDER BY
 LPAD(SPLIT_PART(ip_start,'.',1), 3, '0') || '.' ||
 LPAD(SPLIT_PART(ip_start,'.',2), 3, '0') || '.' ||
 LPAD(SPLIT_PART(ip_start,'.',3), 3, '0') || '.' ||
 LPAD(SPLIT_PART(ip_start,'.',4), 3, '0');
EOF

PostgreSQLのdockerコンテナ内でスクリプトを実行した結果を下記に示します。

34ecfe33bb94:# ./find_target_ip_like_param.sh 103.77.24%
   ip_start   | ip_count | country_code 
--------------+----------+--------------
 103.77.24.0  |      512 | PG
 103.77.240.0 |      512 | VN
 103.77.242.0 |      512 | VN
 103.77.244.0 |      512 | VN
 103.77.246.0 |      512 | VN
(5 rows)
4-2-4. RIRレコード検索メイン関数

引数のIPアドレスを元にLIKEパターンマッチング用の文字列を生成し(最大3回)、該当するRIRレコードを取得

  • LIKEパターンマッチ用文字列生成関数 (make_like_ip)
    (例) IPアドレス="103.77.241.34" の場合に生成する文字列は以下の通り
(1) "103.77.241.%"   ▲検索結果無し
(2) "103.77.%"       ●検索結果有り: 57 件
 ※IPアドレスが属するネットワークの開始アドレスがここで見つかるとは限らない。
  (例) IPアドレスが属する開始アドレスが "103.76.xxx.0" の場合、次の(3)での検索が必要
(3) "103.%"          ●検索結果有り: 15,544 件

[処理手順]

  • LIKEパターンマッチ用文字列生成関数が None を返却するまで処理を繰り返す
    前回生成した文字列が先頭列のみになった場合は None を返却
    • RIRテーブル検索関数からレコード取得
      • レコードが存在する場合
        先頭レコードの開始IPアドレス first_ip_addr
        最終レコードの開始IPアドレスから計算したブロードキャストアドレス broadcast_addr
        • (条件) first_ip_addr < target_ip_addr < broadcast_addr
          • 条件を満たした場合
            RIRテーブル検索処理終了する
          • 条件を満たさない場合
            新たなLike文字列を生成してRIRテーブル検索関数に戻る
      • レコードが存在しない場合
        新たなLike文字列を生成してRIRテーブル検索関数に戻る
  • 検索レコードを返却
    ※レコードが存在しない場合は空のリスト
def get_matches_main(
        conn: connection,
        target_ip: str,
        logger: Optional[logging.Logger] = None) -> Optional[List[Tuple[str, int, str]]]:
    def make_like_ip(like_old: str) -> Optional[str]:
        # 末尾の likeプレースホルダを削除する
        raw_ip: str = like_old.replace(".%", "")
        fields: List[str] = raw_ip.split(".")
        # コンマで区切って残りが1つなら終了
        field_size: int = len(fields)
        if field_size == 1:
            return None

        # フィールドを1つ減らす
        del fields[field_size - 1]
        # 末尾にlikeプレースホルダ(".%")を付加して終了
        return ".".join(fields) + ".%"

    target_ip_addr: IPv4Address = ip_address(target_ip)  # type: ignore
    like_ip: Optional[str] = make_like_ip(target_ip)
    matches: Optional[List[Tuple[str, int, str]]] = None
    while like_ip is not None:
        matches = get_rir_table_matches(conn, like_ip, logger=logger)
        if len(matches) > 0:
            # 先頭レコードの開始IPアドレス
            first_ip: str = matches[0][0]
            first_ip_addr: IPv4Address = ip_address(first_ip)  # type: ignore
            # 最終レコードの開始IPアドレス
            last: Tuple[str, int, str] = matches[-1]
            last_ip: str = last[0]
            ip_cnt: int = int(last[1])
            last_ip_addr: IPv4Address = ip_address(last_ip)  # type: ignore
            # 最終レコードのブロードキャストアドレス計算
            broadcast_addr: IPv4Address = last_ip_addr + ip_cnt - 1  # type: ignore
            if logger is not None:
                logger.info(f"match_first: {first_ip}, match_last: {last_ip}")

            if first_ip_addr < target_ip_addr < broadcast_addr:
                # ターゲットIPが先頭レコードの開始IPと最終レコードのブロードキャストの範囲内なら終了
                if logger is not None:
                    logger.debug(
                        f"Range in ({first_ip} < {target_ip} < {str(broadcast_addr)})"
                        f", break"
                    )
                break
            else:
                # 範囲外: 次のlike検索文字列を生成して検索処理に戻る
                like_ip = make_like_ip(like_ip)
                if logger is not None:
                    logger.info(f"next {like_ip} continue.")
        else:
            # レコード無し: 次のlike検索文字列を生成して検索処理に戻る
            if logger is not None:
                logger.info(f"{like_ip} is no match.")
            like_ip = make_like_ip(like_ip)
    return matches
4-2-5. ネットワークアドレスと国コード取得処理

RIRレコード検索メイン処理から取得したRIRレコードリストを元にネットワークアドレスと国コードを取得する処理の判定条件 ①〜③

① 開始IPアドレス > ターゲットIP
これ以降ターゲットIPを含むレコードが存在しないので処理終了
※ターゲットIPを含むネットワークアドレス無し
② ブロードキャストアドレス (開始IPアドレス + IP個数 -1) < ターゲットIP
次のレコードへ
③ 開始IPアドレス < ターゲットIP < ブロードキャストアドレス
ターゲットIPが属するネットワークと国コードを取得して処理終了
def detect_cc_in_matches(
        target_ip: str,
        matches: List[Tuple[str, int, str]],
        logger: Optional[logging.Logger] = None) -> Tuple[Optional[str], Optional[str]]:
    def next_record(rows: List[Tuple[str, int, str]]) -> Iterator[RirRecord]:
        for (ip_sta, ip_cnt, cc) in rows:
            yield RirRecord(ip_start=ip_sta, ip_count=ip_cnt, country_code=cc)

    target_ip_addr: IPv4Address = ip_address(target_ip)  # type: ignore
    match_network: Optional[str] = None
    match_cc: Optional[str] = None
    rec: RirRecord
    for rec in next_record(matches):
        # ターゲットIP が 開始IPアドレスより大きい場合は範囲外のため処理終了
        if ip_address(rec.ip_start) > target_ip_addr:  # type: ignore
            if logger is not None:
                logger.debug(
                    f"{target_ip} < {rec.ip_start} break. No more match."
                )
            # マッチするデータなし
            break

        # 開始IPのブロードキャストアドレスがターゲットIP未満の場合は次のレコードへ
        broadcast_addr: IPv4Address = (
                ip_address(rec.ip_start) + rec.ip_count - 1)  # type: ignore
        if broadcast_addr < target_ip_addr:
            if logger is not None:
                logger.debug(f"({str(broadcast_addr)} < {target_ip}) -> continue")
            continue

        cidr_cc_list: List[Tuple[IPv4Network, str]] = get_cidr_cc_list(
            **asdict(rec)
        )
        if logger is not None:
            logger.debug(cidr_cc_list)
        match_network, match_cc = detect_cc_in_cidr_cc_list(target_ip, cidr_cc_list)
        break

    return match_network, match_cc

上記 ③の条件を満たすデータが存在するRIRレコードリストの例を下記に示します。

RIR_1_match_likeIp.png

上記 ③の条件を満たすデータが存在しないRIRリストの例を下記に示します。

RIR_2_no_match_likeIp.png

4-2-5. スクリプトメイン処理
  • ログ出力設定
  • 入力パラメータの設定
    • --target-ip: 国コードを知りたいグローバルIPv4アドレス
    • --enable-debug: 指定された場合、ログレベルDEBUGメッセージを出力
  • PostgreSQLデータベース接続取得
  • RIRレコード検索メイン関数の実行
  • RIRレコードからネットワークアドレスと国コードを取得する
def exec_main():
    logging.basicConfig(format='%(levelname)s %(message)s')
    app_logger = logging.getLogger(__name__)
    app_logger.setLevel(level=logging.DEBUG)
    parser = argparse.ArgumentParser()
    parser.add_argument("--target-ip", required=True, type=str,
                        help="IP address.")
    parser.add_argument("--enable-debug", action="store_true",
                        help="Enable logger debug out.")
    args: argparse.Namespace = parser.parse_args()
    target_ip: str = args.target_ip
    enable_debug: bool = args.enable_debug
    app_logger.info(f"target_ip: {target_ip}, enable_debug: {enable_debug}")

    db: Optional[pgdatabase.PgDatabase] = None
    try:
        db = pgdatabase.PgDatabase(DB_CONF_FILE, logger=None)
        conn: connection = db.get_connection()
        matches: Optional[List[Tuple[str, int, str]]] = get_matches_main(
            conn, target_ip, logger=app_logger if enable_debug else None)
        app_logger.info("Match table Finished.")
    except psycopg2.Error as db_err:
        app_logger.error(db_err)
        exit(1)
    except Exception as exp:
        app_logger.error(exp)
        exit(1)
    finally:
        if db is not None:
            db.close()

    # ターゲットIPのネットワーク(CIDR表記)と国コードを取得する
    if matches is not None and len(matches) > 0:
        network: Optional[str]
        cc: Optional[str]
        network, cc = detect_cc_in_matches(
            target_ip, matches, logger=app_logger if enable_debug else None)
        if network is not None and cc is not None:
            app_logger.info(
                f'Find {target_ip} in (network: "{network}", country_code: "{cc}")'
            )
        else:
            app_logger.info(f"Not match in data.")
    else:
        # このケースは想定しない
        app_logger.warning(f"Not exists in RIR table.")


if __name__ == '__main__':
    exec_main()
4-2-6. スクリプト実行

スクリプトのコード全体を以下に示します。

TestDetectCountryCode.py
import argparse
import logging
import os
from dataclasses import dataclass, asdict
from ipaddress import (
    ip_address, summarize_address_range,
    IPv4Network, IPv4Address
)
import typing
from typing import List, Iterator, Optional, Tuple

import psycopg2
from psycopg2.extensions import connection, cursor

from db import pgdatabase

"""
[Qiita投稿の説明用スクリプト]
指定されたIPアドレスの属するネットワーク(CIDR表記)と国コードをRIRデータから取得する
"""

# データベース接続情報
DB_CONF_FILE: str = os.path.join("conf", "db_conn.json")


@dataclass(frozen=True)
class RirRecord:
    ip_start: str
    ip_count: int
    country_code: str


@typing.no_type_check
def get_cidr_cc_list(ip_start: str,
                     ip_count: int,
                     country_code: str) -> List[Tuple[IPv4Network, str]]:
    # mypy check error IPv?Address to Any.
    # IP address: IPv6Address | IPv4Address
    addr_first: IPv4Address = ip_address(ip_start)
    # Broadcast address
    addr_last: IPv4Address = addr_first + ip_count - 1
    cidr_ite: Iterator[IPv4Network] = summarize_address_range(addr_first, addr_last)
    return [(cidr, country_code) for cidr in cidr_ite]


@typing.no_type_check
def detect_cc_in_cidr_cc_list(
        target_ip: str,
        cidr_cc_list: List[Tuple[IPv4Network, str]]
        ) -> Tuple[Optional[str], Optional[str]]:
    target_ip_addr: IPv4Address = ip_address(target_ip)
    match_network: Optional[str] = None
    match_cc: Optional[str] = None
    for cidr_cc in cidr_cc_list:
        if target_ip_addr in cidr_cc[0]:
            match_network = str(cidr_cc[0])
            match_cc = cidr_cc[1]
            break

    return match_network, match_cc


def get_rir_table_matches(
        conn: connection,
        like_ip: str,
        logger: Optional[logging.Logger] = None) -> List[Tuple[str, int, str]]:
    if logger is not None:
        logger.debug(f"like_ip: {like_ip}")
    result: List[Tuple[str, int, str]]
    try:
        cur: cursor
        # 前ゼロ埋めしたIPアドレスの昇順にソートする
        with conn.cursor() as cur:
            cur.execute("""
SELECT
   ip_start,ip_count,country_code
FROM
   mainte2.RIR_ipv4_allocated
WHERE
   ip_start LIKE %(partial_match)s
ORDER BY
 LPAD(SPLIT_PART(ip_start,'.',1), 3, '0') || '.' ||
 LPAD(SPLIT_PART(ip_start,'.',2), 3, '0') || '.' ||
 LPAD(SPLIT_PART(ip_start,'.',3), 3, '0') || '.' ||
 LPAD(SPLIT_PART(ip_start,'.',4), 3, '0')""",
                        ({'partial_match': like_ip}))
            # レコード取得件数チェック
            if cur.rowcount > 0:
                rows: List[Tuple[str, int, str]] = cur.fetchall()
                if logger is not None:
                    logger.debug(f"rows.size: {len(rows)}")
                    for row in rows:
                        logger.debug(f"{row}")
                result = rows
            else:
                # マッチしなかったら空のリスト
                result = []
        return result
    except (Exception, psycopg2.DatabaseError) as err:
        raise err


def get_matches_main(
        conn: connection,
        target_ip: str,
        logger: Optional[logging.Logger] = None) -> Optional[List[Tuple[str, int, str]]]:
    def make_like_ip(like_old: str) -> Optional[str]:
        # 末尾の likeプレースホルダを削除する
        raw_ip: str = like_old.replace(".%", "")
        fields: List[str] = raw_ip.split(".")
        # コンマで区切って残りが1つなら終了
        field_size: int = len(fields)
        if field_size == 1:
            return None

        # フィールドを1つ減らす
        del fields[field_size - 1]
        # 末尾にlikeプレースホルダ(".%")を付加して終了
        return ".".join(fields) + ".%"

    target_ip_addr: IPv4Address = ip_address(target_ip)  # type: ignore
    like_ip: Optional[str] = make_like_ip(target_ip)
    matches: Optional[List[Tuple[str, int, str]]] = None
    while like_ip is not None:
        matches = get_rir_table_matches(conn, like_ip, logger=logger)
        if len(matches) > 0:
            # 先頭レコードの開始IPアドレス
            first_ip: str = matches[0][0]
            first_ip_addr: IPv4Address = ip_address(first_ip)  # type: ignore
            # 最終レコードの開始IPアドレス
            last: Tuple[str, int, str] = matches[-1]
            last_ip: str = last[0]
            ip_cnt: int = int(last[1])
            last_ip_addr: IPv4Address = ip_address(last_ip)  # type: ignore
            # 最終レコードのブロードキャストアドレス計算
            broadcast_addr: IPv4Address = last_ip_addr + ip_cnt - 1  # type: ignore
            if logger is not None:
                logger.info(f"match_first: {first_ip}, match_last: {last_ip}")

            if first_ip_addr < target_ip_addr < broadcast_addr:
                # ターゲットIPが先頭レコードの開始IPと最終レコードのブロードキャストの範囲内なら終了
                if logger is not None:
                    logger.debug(
                        f"Range in ({first_ip} < {target_ip} < {str(broadcast_addr)})"
                        f", break"
                    )
                break
            else:
                # 範囲外: 次のlike検索文字列を生成して検索処理に戻る
                like_ip = make_like_ip(like_ip)
                if logger is not None:
                    logger.info(f"next {like_ip} continue.")
        else:
            # レコード無し: 次のlike検索文字列を生成して検索処理に戻る
            if logger is not None:
                logger.info(f"{like_ip} is no match.")
            like_ip = make_like_ip(like_ip)
    return matches


def detect_cc_in_matches(
        target_ip: str,
        matches: List[Tuple[str, int, str]],
        logger: Optional[logging.Logger] = None) -> Tuple[Optional[str], Optional[str]]:
    def next_record(rows: List[Tuple[str, int, str]]) -> Iterator[RirRecord]:
        for (ip_sta, ip_cnt, cc) in rows:
            yield RirRecord(ip_start=ip_sta, ip_count=ip_cnt, country_code=cc)

    target_ip_addr: IPv4Address = ip_address(target_ip)  # type: ignore
    match_network: Optional[str] = None
    match_cc: Optional[str] = None
    rec: RirRecord
    for rec in next_record(matches):
        # ターゲットIP が ネットワークIPアドレスより大きい場合は範囲外のため処理終了
        if ip_address(rec.ip_start) > target_ip_addr:  # type: ignore
            if logger is not None:
                logger.debug(
                    f"{target_ip} < {rec.ip_start} break. No more match."
                )
            # マッチするデータなし
            break

        # 開始ネットワークIPのブロードキャストアドレスがターゲットIPより小さければ次のレコードへ
        broadcast_addr: IPv4Address = (
                ip_address(rec.ip_start) + rec.ip_count - 1)  # type: ignore
        if broadcast_addr < target_ip_addr:
            if logger is not None:
                logger.debug(f"({str(broadcast_addr)} < {target_ip}) -> continue")
            continue

        cidr_cc_list: List[Tuple[IPv4Network, str]] = get_cidr_cc_list(
            **asdict(rec)
        )
        if logger is not None:
            logger.debug(cidr_cc_list)
        match_network, match_cc = detect_cc_in_cidr_cc_list(target_ip, cidr_cc_list)
        break

    return match_network, match_cc


def exec_main():
    logging.basicConfig(format='%(levelname)s %(message)s')
    app_logger = logging.getLogger(__name__)
    app_logger.setLevel(level=logging.DEBUG)
    parser = argparse.ArgumentParser()
    parser.add_argument("--target-ip", required=True, type=str,
                        help="IP address.")
    parser.add_argument("--enable-debug", action="store_true",
                        help="Enable logger debug out.")
    args: argparse.Namespace = parser.parse_args()
    target_ip: str = args.target_ip
    enable_debug: bool = args.enable_debug

    db: Optional[pgdatabase.PgDatabase] = None
    try:
        db = pgdatabase.PgDatabase(DB_CONF_FILE, logger=None)
        conn: connection = db.get_connection()
        matches: Optional[List[Tuple[str, int, str]]] = get_matches_main(
            conn, target_ip, logger=app_logger if enable_debug else None)
    except psycopg2.Error as db_err:
        app_logger.error(db_err)
        exit(1)
    except Exception as exp:
        app_logger.error(exp)
        exit(1)
    finally:
        if db is not None:
            db.close()

    # ターゲットIPのネットワーク(CIDR表記)と国コードを取得する
    if matches is not None and len(matches) > 0:
        network: Optional[str]
        cc: Optional[str]
        network, cc = detect_cc_in_matches(
            target_ip, matches, logger=app_logger if enable_debug else None)
        if network is not None and cc is not None:
            app_logger.info(
                f'Find {target_ip} in (network: "{network}", country_code: "{cc}")'
            )
        else:
            app_logger.info(f"Not match in data.")
    else:
        # このケースは想定しない
        app_logger.warning(f"Not exists in RIR table.")


if __name__ == '__main__':
    exec_main()

python仮想環境に入り pythonスクリプトを実行する

$ . py_venv/py_psycopg2/bin/activate

(1) DEBUG出力を有効にして実行

(py_psycopg2) $ python TestDetectCountryCode.py --target-ip 103.77.241.34 --enable-debug
DEBUG like_ip: 103.77.241.%
INFO 103.77.241.% is no match.
DEBUG like_ip: 103.77.%
DEBUG rows.size: 57
DEBUG ('103.77.0.0', 1024, 'IN')
DEBUG ('103.77.4.0', 512, 'NZ')
DEBUG ('103.77.8.0', 1024, 'PK')
DEBUG ('103.77.16.0', 1024, 'BD')
DEBUG ('103.77.20.0', 1024, 'HK')
DEBUG ('103.77.24.0', 512, 'PG')
DEBUG ('103.77.28.0', 1024, 'CN')
DEBUG ('103.77.32.0', 1024, 'FR')
DEBUG ('103.77.48.0', 1024, 'ID')
DEBUG ('103.77.52.0', 1024, 'CN')
DEBUG ('103.77.56.0', 1024, 'CN')
DEBUG ('103.77.60.0', 1024, 'BD')
DEBUG ('103.77.64.0', 1024, 'LK')
DEBUG ('103.77.70.0', 512, 'AU')
DEBUG ('103.77.72.0', 1024, 'CN')
DEBUG ('103.77.76.0', 1024, 'ID')
DEBUG ('103.77.80.0', 512, 'IN')
DEBUG ('103.77.84.0', 1024, 'KR')
DEBUG ('103.77.88.0', 1024, 'CN')
DEBUG ('103.77.92.0', 1024, 'CN')
DEBUG ('103.77.100.0', 1024, 'BD')
DEBUG ('103.77.116.0', 512, 'BD')
DEBUG ('103.77.118.0', 512, 'IN')
DEBUG ('103.77.120.0', 1024, 'AU')
DEBUG ('103.77.132.0', 1024, 'CN')
DEBUG ('103.77.140.0', 1024, 'SG')
DEBUG ('103.77.144.0', 1024, 'AU')
DEBUG ('103.77.148.0', 1024, 'CN')
DEBUG ('103.77.152.0', 1024, 'IN')
DEBUG ('103.77.156.0', 1024, 'ID')
DEBUG ('103.77.160.0', 1024, 'VN')
DEBUG ('103.77.164.0', 1024, 'VN')
DEBUG ('103.77.168.0', 1024, 'VN')
DEBUG ('103.77.172.0', 512, 'VN')
DEBUG ('103.77.174.0', 512, 'VN')
DEBUG ('103.77.178.0', 512, 'KH')
DEBUG ('103.77.182.0', 512, 'VN')
DEBUG ('103.77.186.0', 512, 'IN')
DEBUG ('103.77.188.0', 1024, 'BD')
DEBUG ('103.77.192.0', 1024, 'HK')
DEBUG ('103.77.196.0', 512, 'IN')
DEBUG ('103.77.204.0', 1024, 'ID')
DEBUG ('103.77.208.0', 512, 'VN')
DEBUG ('103.77.210.0', 512, 'HK')
DEBUG ('103.77.212.0', 512, 'NP')
DEBUG ('103.77.214.0', 512, 'VN')
DEBUG ('103.77.216.0', 512, 'MM')
DEBUG ('103.77.218.0', 512, 'BD')
DEBUG ('103.77.220.0', 1024, 'CN')
DEBUG ('103.77.224.0', 256, 'AU')
DEBUG ('103.77.232.0', 1024, 'AU')
DEBUG ('103.77.238.0', 512, 'IN')
DEBUG ('103.77.240.0', 512, 'VN')
DEBUG ('103.77.242.0', 512, 'VN')
DEBUG ('103.77.244.0', 512, 'VN')
DEBUG ('103.77.246.0', 512, 'VN')
DEBUG ('103.77.252.0', 512, 'BD')
INFO match_first: 103.77.0.0, match_last: 103.77.252.0
DEBUG Range in (103.77.0.0 < 103.77.241.34 < 103.77.253.255), break
DEBUG (103.77.3.255 < 103.77.241.34) -> continue
DEBUG (103.77.5.255 < 103.77.241.34) -> continue
DEBUG (103.77.11.255 < 103.77.241.34) -> continue
DEBUG (103.77.19.255 < 103.77.241.34) -> continue
DEBUG (103.77.23.255 < 103.77.241.34) -> continue
DEBUG (103.77.25.255 < 103.77.241.34) -> continue
DEBUG (103.77.31.255 < 103.77.241.34) -> continue
DEBUG (103.77.35.255 < 103.77.241.34) -> continue
DEBUG (103.77.51.255 < 103.77.241.34) -> continue
DEBUG (103.77.55.255 < 103.77.241.34) -> continue
DEBUG (103.77.59.255 < 103.77.241.34) -> continue
DEBUG (103.77.63.255 < 103.77.241.34) -> continue
DEBUG (103.77.67.255 < 103.77.241.34) -> continue
DEBUG (103.77.71.255 < 103.77.241.34) -> continue
DEBUG (103.77.75.255 < 103.77.241.34) -> continue
DEBUG (103.77.79.255 < 103.77.241.34) -> continue
DEBUG (103.77.81.255 < 103.77.241.34) -> continue
DEBUG (103.77.87.255 < 103.77.241.34) -> continue
DEBUG (103.77.91.255 < 103.77.241.34) -> continue
DEBUG (103.77.95.255 < 103.77.241.34) -> continue
DEBUG (103.77.103.255 < 103.77.241.34) -> continue
DEBUG (103.77.117.255 < 103.77.241.34) -> continue
DEBUG (103.77.119.255 < 103.77.241.34) -> continue
DEBUG (103.77.123.255 < 103.77.241.34) -> continue
DEBUG (103.77.135.255 < 103.77.241.34) -> continue
DEBUG (103.77.143.255 < 103.77.241.34) -> continue
DEBUG (103.77.147.255 < 103.77.241.34) -> continue
DEBUG (103.77.151.255 < 103.77.241.34) -> continue
DEBUG (103.77.155.255 < 103.77.241.34) -> continue
DEBUG (103.77.159.255 < 103.77.241.34) -> continue
DEBUG (103.77.163.255 < 103.77.241.34) -> continue
DEBUG (103.77.167.255 < 103.77.241.34) -> continue
DEBUG (103.77.171.255 < 103.77.241.34) -> continue
DEBUG (103.77.173.255 < 103.77.241.34) -> continue
DEBUG (103.77.175.255 < 103.77.241.34) -> continue
DEBUG (103.77.179.255 < 103.77.241.34) -> continue
DEBUG (103.77.183.255 < 103.77.241.34) -> continue
DEBUG (103.77.187.255 < 103.77.241.34) -> continue
DEBUG (103.77.191.255 < 103.77.241.34) -> continue
DEBUG (103.77.195.255 < 103.77.241.34) -> continue
DEBUG (103.77.197.255 < 103.77.241.34) -> continue
DEBUG (103.77.207.255 < 103.77.241.34) -> continue
DEBUG (103.77.209.255 < 103.77.241.34) -> continue
DEBUG (103.77.211.255 < 103.77.241.34) -> continue
DEBUG (103.77.213.255 < 103.77.241.34) -> continue
DEBUG (103.77.215.255 < 103.77.241.34) -> continue
DEBUG (103.77.217.255 < 103.77.241.34) -> continue
DEBUG (103.77.219.255 < 103.77.241.34) -> continue
DEBUG (103.77.223.255 < 103.77.241.34) -> continue
DEBUG (103.77.224.255 < 103.77.241.34) -> continue
DEBUG (103.77.235.255 < 103.77.241.34) -> continue
DEBUG (103.77.239.255 < 103.77.241.34) -> continue
DEBUG [(IPv4Network('103.77.240.0/23'), 'VN')]
INFO Find 103.77.241.34 in (network: "103.77.240.0/23", country_code: "VN")

参考までに APNIC の公開データ(2024-08-11時点)から該当するIPアドレスを含む箇所の抜粋を以下に示します。

delegated-apnic-latest.txt
apnic|IN|ipv4|103.77.238.0|512|20230131|allocated
apnic|VN|ipv4|103.77.240.0|512|20230912|allocated  ★★ 該当するデータ ★★
apnic|VN|ipv4|103.77.242.0|512|20230912|allocated
apnic|VN|ipv4|103.77.244.0|512|20230912|allocated
apnic|VN|ipv4|103.77.246.0|512|20230912|allocated

(2) ターゲットIPのみを指定して実行

(py_psycopg2) $ python TestDetectCountryCode.py --target-ip 103.77.241.34
INFO Find 103.77.241.34 in (network: "103.77.240.0/23", country_code: "VN")

(3) 国コードが取得できないターゲットIPを指定して実行

(py_psycopg2) $ python TestDetectCountryCode.py --target-ip 51.77.58.143
INFO Not match in data.

5. 課題

5-1. RIR公開データの更新頻度

1つ目の課題として、RIRデータは日々更新されているためデータベースに投入して利用するには定期的に更新し続ける必要が有るということです。

下記に2024年08月05日と2024年08月11日のデータの差分の抜粋を示します。

diff -c 2024-08-05/ipv4-allocated-apnic.txt 2024-08-11/ipv4-allocated-apnic.txt
...一部省略...
*** 26793,26798 ****
--- 26793,26811 ----
  apnic	IN	ipv4	160.30.36.0	256	20240730	allocated
  apnic	AU	ipv4	160.30.37.0	256	20240802	allocated
  apnic	CN	ipv4	160.30.40.0	512	20240730	allocated
+ apnic	AU	ipv4	160.30.58.0	512	20240807	allocated
+ apnic	IN	ipv4	160.30.60.0	512	20240807	allocated
+ apnic	BD	ipv4	160.30.62.0	512	20240807	allocated
+ apnic	HK	ipv4	160.30.64.0	512	20240807	allocated
+ apnic	MM	ipv4	160.30.66.0	512	20240808	allocated
+ apnic	PH	ipv4	160.30.68.0	512	20240808	allocated
+ apnic	BD	ipv4	160.30.70.0	256	20240808	allocated
+ apnic	IN	ipv4	160.30.72.0	512	20240808	allocated
+ apnic	IN	ipv4	160.30.74.0	512	20240808	allocated
+ apnic	HK	ipv4	160.30.76.0	512	20240808	allocated
+ apnic	TW	ipv4	160.30.78.0	512	20240808	allocated
+ apnic	IN	ipv4	160.30.82.0	512	20240809	allocated
+ apnic	IN	ipv4	160.30.84.0	512	20240809	allocated
  apnic	SG	ipv4	160.32.240.0	2048	19920420	allocated
  apnic	AU	ipv4	160.64.0.0	65536	19920423	allocated
  apnic	JP	ipv4	160.74.0.0	65536	19920423	allocated
***************
...一部省略...

個人管理なら数週間の間隔で ① CSV形式に変換したファイルを diff で比較し差分のみをテーブルにインポートするか、あるいは思い切って ②テーブルドロップと最新CSV一括インポートも選択肢として有りかなと思います。

5-2. RIRデータに含まれないIPアドレスの存在

2つ目の課題として、RIRデータのみでは判明しないIPアドレスが存在することです。
※ 下記サイト「ネットワークチェックツール」でも不明と回答されたIPアドレスが有りました。

運用開始から約80日間で国コードが判明したホストのIPアドレス数と比率

IP件数 比率(%)
総件数 2,834 100.0
判明 2,650 93.5
不明 184 6.5

6. 最後に

運用開始当初はデータも少なかったので、下記サイトからひとつひとつ手入力でIPアドレスを指定して国コードを取得していました。

ネットワークチェックツール【無料で確認】

ただ運用開始から約2月位経過するとさすがに大変な作業になったため、今回のスクリプトを作る動機になりました。

但しこの記事のスクリプトは説明のために作成したもので、実際の運用環境では使っていません。不正アクセスしたホストのIPアドレスはデータベースのテーブルに日々登録しており、別なスクリプト(下記 IpNetworkCC_in_hosts.py の運用版) を使って取得しています。

これらのスクリプトにより国コードのみならず不正アクセスしたホストのIPアドレスに属するネットワークアドレスも知ることができパケットフィルタリングも効率的に実施することができるようになりました。
※国コード取得できないホストのIPアドレスは、何日もアクセスが続くようなら単独で指定する運用にしています。

実際に運用環境で使っているスクリプトで出力したものの一部を抜粋して示します。

ip_network_cc_with_hosts_2024-08-28.txt
"43.152.0.0/13","SG",["43.152.213.54",
"43.153.0.227",
"43.153.8.10",
"43.153.8.12",
"43.153.14.111",
"43.153.23.100",
"43.153.24.194",
...一部省略...
"43.159.51.7",
"43.159.55.112",
"43.159.59.67",
"43.159.59.118",
"43.159.62.162",
"43.159.137.53"]

"SG"はシンガポール共和国(Republic of Singapore) でホストのIPアドレスは163件ありました。

国名コード表は下記サイトからダウンロードしたものを使いました。
法務省: 国名コード表 (PDF)

今回紹介したスクリプトと運用版スクリプトを投稿用に編集したものを下記 GitHub リポジトリで公開しています。

(GitHub) pipito-yukio / qiita-posts / python / Network_cc_in_target

[リソース一覧]

Network_cc_in_target/
├── README.md
├── bin
│   └── rir_download_to_csv.sh # RIRデータダウンロード、インポート用CSV作成スクリプト
├── output
│   └── match_networks  # IpNetworkCC_in_hosts.pyを実行して出力したファイル
│       ├── ip_network_cc_with_hosts_2024-08-28.txt  # 国コードが取得できたIPアドレス
│       └── unknown_ip_hosts_2024-08-28.txt  # 国コードが取得できなかったIPアドレス
├── requirements.txt
├── scripts
│   ├── find_target_ip_like_param.sh
│   └── import_from_allocated_ipv4_csv.sh
├── sql
│   └── 14_add_ipv4_table.sql
└── src
    ├── IpNetworkCC_in_hosts.py      # 運用版のスクリプトをQiita投稿用に編集したスクリプト
    ├── IpNetworkCC_in_hosts_with_csv.py # CSVファイルのIPアドレスから一括取得するスクリプト
    ├── TestDetectCountryCode.py         # 本記事で紹介したスクリプト
    ├── conf
    │   ├── db_conn.json
    │   └── export_sql_with_ip_country_code.json # IpNetworkCC_in_hosts.py で参照する出力先定義
    ├── csv  # スクリプト IpNetworkCC_in_hosts_with_csv.py 用のサンプルCSVファイル
    │   ├── ssh_auth_error_2024-06-10.csv
    │   └── ssh_auth_error_cc_match.csv
    ├── db
    │   └── pgdatabase.py
    └── mypy.ini

この投稿に関連する記事は Qiita投稿サイトの下記コンテンツをご覧ください。
Qiita@pipito-yukio「psycopg2 バッチ処理に適したクエリーを作成する」

12
19
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
12
19