LoginSignup
0
2

More than 1 year has passed since last update.

Databricks上でGeoLite2を使ってIPアドレスから地理情報を取得する

Last updated at Posted at 2022-01-27

Max Mind社が提供しているIP GeoLocationデータ(IPアドレスと地理情報のマッピング)であるGeoLite2を使って、Databricks上でIPアドレスから地理情報をマッピングする方法について見ていきます。

IPアドレスから地理情報が取得できれば、ログ解析やウェブマーケティグなどで幅広く活用できる情報源になります。一方で、IPアドレスは非常にカーディナリティ(値の種類)が高く、照合(Lookup)する際のコストが高くなります。DatabricksのDelta LakeおよびRange Joinを使うことで非常に高速・高効率にIPから地理情報の照合が実現できます。リアルタイムな地理情報が必要なアプリケーションなどでも使用できます。

worldmap2.png

GeoLite2を使用することで、IPアドレスから以下の情報をLookupすることができます。

 |-- ipaddr: string 
 |-- asn_network: string 
 |-- autonomous_system_number: integer 
 |-- autonomous_system_organization: string 
 |-- city_network: string 
 |-- registered_country_geoname_id: integer 
 |-- represented_country_geoname_id: integer 
 |-- is_anonymous_proxy: integer 
 |-- is_satellite_provider: integer 
 |-- postal_code: string 
 |-- latitude: double 
 |-- longitude: double 
 |-- accuracy_radius: integer 
 |-- locale_code: string 
 |-- continent_code: string 
 |-- continent_name: string 
 |-- country_iso_code: string 
 |-- country_name: string 
 |-- subdivision_1_iso_code: string 
 |-- subdivision_1_name: string 
 |-- subdivision_2_iso_code: string 
 |-- subdivision_2_name: string 
 |-- city_name: string 
 |-- metro_code: integer 
 |-- time_zone: string 
 |-- is_in_european_union: integer 

Max Mind - GeoLite2について

GeoLite2は、有償サービスであるGeoIPに比べて精度が低いものの、無償で利用できるIPアドレス-地理情報のデータベースになります。利用規約にあるとおり、商用で利用する場合にはアトリビューションの提示が必要になりますので、利用の際にはご確認ください。

GeoLite2はWeb上でIPから地理情報を取得できるWebサービスもありますが、データベースがCSVファイルでも提供されています。Databricks上で使用する際は、このCSVファイルからDelta Lakeを作成し、SQLのテーブルとして利用します。

GeoLite2のウェブサイトからユーザー登録をすると、ダウンロードのためのライセンスキーが発行できますので、そちらを用意します。

環境準備

以下の2点を事前準備しておきます。

  1. GeoLite2のライセンスキー
  2. データブリックスからアクセス可能(Read/Write)なオブジェクトストレージ

今回の例では、オブジェクトストレージとしてAWS S3を使用します。Azure Blog StorageやGoogle Cloud Storageでも同様に可能です。

GeoLite2のCSVのダウンロード・S3への配置

まずは、GeoLite2のCSVデータをダウンロードしてオブジェクトストレージ上に配置します。

GeoLite2はアップデートがあった場合でも機械的にファイルをダウンロードできるように静的なURL(Permalink)でCSVデータがダウンロードできるようになっています。

Databricksクラスタのドライバノード上にCSVファイルをダウンロードして、boto3を使ってS3にアップロードしています。

params={}
params['geolite2']={}
params['geolite2']['s3_bucket'] = S3_BUCKET_NAME
params['geolite2']['license_key'] = GEOLITE2_LICENSE_KEY
params['geolite2']['asn_csv_url'] = f"https://download.maxmind.com/app/geoip_download?edition_id=GeoLite2-ASN-CSV&license_key={params['geolite2']['license_key']}&suffix=zip"
params['geolite2']['city_csv_url'] =f"https://download.maxmind.com/app/geoip_download?edition_id=GeoLite2-City-CSV&license_key={params['geolite2']['license_key']}&suffix=zip"
params['geolite2']['country_csv_url'] = f"https://download.maxmind.com/app/geoip_download?edition_id=GeoLite2-Country-CSV&license_key={params['geolite2']['license_key']}&suffix=zip"
params['geolite2']['s3_base_dir'] = S3_DIRECTORY
params['geolite2']['asn_dirname']='ASN'
params['geolite2']['country_dirname']='Country'
params['geolite2']['city_dirname']='City'

def upload_to_s3(local_filepath, bucket, upload_path):
  import boto3
  from botocore.exceptions import ClientError
  client = boto3.client('s3')
  try:
    ret=client.upload_file(local_filepath, bucket, upload_path)
  except ClientError as e:
    print(e)
    return False
  return True


def downlaod_file(url, local_filepath):
  import subprocess
  try:
    ret=subprocess.Popen(args=['curl', '-o', local_filepath, url],
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE )
    print( ret.communicate() )
  except Exception as e:
    print(f'error=>{e}')


def get_and_upload_csv_to_s3(url, tagname, s3_bucket, upload_dir, tmpdir='/tmp'):
  '''
  url: geolite2 permalink
  tagname: fike name for temp use
  upload_dir: s3 directory name the file is uploded to
  '''
  import zipfile
  import os.path

  # get the files
  local_filepath = os.path.join(tmpdir, tagname)
  print(f'local_filepath => {local_filepath}')
  downlaod_file(url, local_filepath)

  # unzip
  z = zipfile.ZipFile(local_filepath)
  files = z.namelist()

  with z as f:
    f.extractall(tmpdir)

  # uploading to s3
  for n in files:
    if n.lower().endswith('.csv'):
      local=os.path.join(tmpdir, n)
      base=os.path.basename(n)
      s3_path=os.path.join(upload_dir, tagname, base)
      print(f'''uploading files to s3: {local} => s3://{params['geolite2']['s3_bucket']}/{s3_path}''')
      upload_to_s3(local, s3_bucket, s3_path)


# ASN
get_and_upload_csv_to_s3(
  params['geolite2']['asn_csv_url'],
  params['geolite2']['asn_dirname'],
  params['geolite2']['s3_bucket'],
  params['geolite2']['s3_base_dir']
)

# Country
get_and_upload_csv_to_s3(
  params['geolite2']['country_csv_url'],
  params['geolite2']['country_dirname'],
  params['geolite2']['s3_bucket'],
  params['geolite2']['s3_base_dir']
)


# City
get_and_upload_csv_to_s3(
  params['geolite2']['city_csv_url'],
  params['geolite2']['city_dirname'],
  params['geolite2']['s3_bucket'],
  params['geolite2']['s3_base_dir']
)

CSVからDeltaテーブルを作成

CSVをSparkのデータフレームとしてロードして、そのままDeltaフォーマットで書き出します。
その後、SQLからも参照できるように、テーブルを作成します(CREATE TABLE)。

# CSVからDeltaテーブルを作成
def createTableDelta(csv_path, table_name, dbname='db_geolite2'):
  delta_path = csv_path + '.delta'
  (
    spark
    .read
    .format('csv')
    .option('Header', True)
    .option('inferSchema', True)
    .load(csv_path)
    .write
    .format('delta')
    .mode('overwrite')
    .save(delta_path)
  )

  sql(f'''
    CREATE DATABASE IF NOT EXISTS {dbname}
  ''')

  sql(f'''
    CREATE TABLE IF NOT EXISTS {dbname}.{table_name}
    USING delta
    LOCATION '{delta_path}'
  ''')



s3_base = f'''s3a://{params['geolite2']['s3_bucket']}/{params['geolite2']['s3_base_dir']}'''

# ASN
createTableDelta(f'{s3_base}/ASN/GeoLite2-ASN-Blocks-IPv4.csv', 'geolite2_asn_blocks_ipv4_delta', DELTA_DATABASE_NAME)
createTableDelta(f'{s3_base}/ASN/GeoLite2-ASN-Blocks-IPv6.csv', 'geolite2_asn_blocks_ipv6_delta', DELTA_DATABASE_NAME)

# country
createTableDelta(f'{s3_base}/Country/GeoLite2-Country-Blocks-IPv4.csv', 'geolite2_country_blocks_ipv4_delta', DELTA_DATABASE_NAME)
createTableDelta(f'{s3_base}/Country/GeoLite2-Country-Blocks-IPv6.csv', 'geolite2_country_blocks_ipv6_delta', DELTA_DATABASE_NAME)
createTableDelta(f'{s3_base}/Country/GeoLite2-Country-Locations-en.csv', 'geolite2_country_locations_en_delta', DELTA_DATABASE_NAME)

# city
createTableDelta(f'{s3_base}/City/GeoLite2-City-Blocks-IPv4.csv', 'geolite2_city_blocks_ipv4_delta', DELTA_DATABASE_NAME)
createTableDelta(f'{s3_base}/City/GeoLite2-City-Blocks-IPv6.csv', 'geolite2_city_blocks_ipv6_delta', DELTA_DATABASE_NAME)
createTableDelta(f'{s3_base}/City/GeoLite2-City-Locations-en.csv', 'geolite2_city_locations_en_delta', DELTA_DATABASE_NAME)

(Optional) データ確認

Deltaテーブル化されたGeoLite2データをいくつか確認してみます。

%sql

SELECT * FROM geolite2_asn_blocks_ipv4_delta LIMIT 5

asn.png

%sql

SELECT * FROM geolite2_country_locations_en_delta LIMIT 5

Screen Shot 2022-01-27 at 19.21.28.png

IPアドレスから地理情報を調べる(Lookup)

IPアドレスは文字列になっている一方で、GeoLite2のロケーションはCIDRブロックで与えられています。そのため、IPアドレスとCIDRブロックのマッピングで使う関数を定義します(UDF)。

# IP addressとCIDRブロックのマッピングで使うUDF
import pandas as pd
from pyspark.sql.functions import pandas_udf, col

@pandas_udf('long')
def to_network_address(cidr: pd.Series) -> pd.DataFrame:
  import ipaddress as ip
  return cidr.apply(lambda x: int(ip.IPv4Network(x).network_address) )

spark.udf.register('to_network_address', to_network_address)


@pandas_udf('long')
def to_broadcast_address(cidr: pd.Series) -> pd.DataFrame:
  import ipaddress as ip
  return cidr.apply(lambda x: int(ip.IPv4Network(x).broadcast_address) )

spark.udf.register('to_broadcast_address', to_broadcast_address)


@pandas_udf('long')
def to_address_int(cidr: pd.Series) -> pd.DataFrame:
  import ipaddress as ip
  return cidr.apply(lambda x: int(ip.IPv4Address(x)) )

spark.udf.register('to_address_int', to_address_int)

準備ができましたので、ランダムなIPアドレスを使って、その地理情報を調べてみましょう。

# テスト用のランダムIPのDataFrameを準備
ips='''
78.111.216.254
218.108.247.146
28.111.231.220
38.122.132.153
32.103.10.172
207.134.12.84
61.167.199.90
57.131.76.227
31.87.240.21
159.215.114.209
111.5.31.50
98.48.65.209
168.157.24.255
42.161.173.238
23.213.73.177
233.23.214.180
102.100.40.45
246.60.187.154
71.209.81.8
115.250.101.47
'''

ipaddr = ips.splitlines()[1:]
df_ipaddr = spark.createDataFrame([{'ipaddr':ip} for ip in ipaddr])
df_ipaddr.createOrReplaceTempView('ipaddr_list')

display(sql(' select * from ipaddr_list'))

Screen Shot 2022-01-27 at 19.25.50.png

IPアドレスとCIDRのマッピングはRange Joinになるため、range joinの最適化(binSize指定)を実施するとクエリが速くなります(必須ではない)。

%sql
-- [Optional]
-- range joinのため、bin=65536に設定しておく
SET spark.databricks.optimizer.rangeJoin.binSize=65536

1) AS NumberのLook-up

まず初めにAS番号からLookupしてみます。

%sql

SELECT *
FROM ipaddr_list ip
LEFT JOIN geolite2_asn_blocks_ipv4_delta asn
ON to_address_int(ip.ipaddr) BETWEEN to_network_address(asn.network) AND to_broadcast_address(asn.network)

Screen Shot 2022-01-27 at 19.29.54.png

スキーマ

 |-- ipaddr: string 
 |-- network: string 
 |-- autonomous_system_number: integer 
 |-- autonomous_system_organization: string 

どのネットワーク(ISPなどのAS単位)からのアクセスかが分かります。
例えば、CloudTrailのログ解析などの場合には、そのアクセスがAWS内からだったのか、AWS外部からだったのかなどの判定でも使用できます。

2) CountryのLook-up

国レベルのLookupの例です。国コードなどもありますので、国レベルの地図のマップも可能です。

%sql

SELECT 
  * 
FROM (
  SELECT *
  FROM ipaddr_list ip
  LEFT JOIN geolite2_country_blocks_ipv4_delta c
  ON to_address_int(ip.ipaddr) BETWEEN to_network_address(c.network) AND to_broadcast_address(c.network) ) fact
LEFT JOIN geolite2_country_locations_en_delta loc
ON fact.geoname_id = loc.geoname_id

Screen Shot 2022-01-27 at 19.37.34.png

スキーマ

 |-- ipaddr: string 
 |-- network: string 
 |-- geoname_id: integer 
 |-- registered_country_geoname_id: integer 
 |-- represented_country_geoname_id: integer 
 |-- is_anonymous_proxy: integer 
 |-- is_satellite_provider: integer 
 |-- geoname_id: integer 
 |-- locale_code: string 
 |-- continent_code: string 
 |-- continent_name: string 
 |-- country_iso_code: string 
 |-- country_name: string 
 |-- is_in_european_union: integer 

is_in_european_unionによって、EUの国の判定などもできます。

3) CityのLook-up

CityレベルのLookupです。一番細かいレベルです。緯度経度情報も載ってきますので、このテーブルを使用して地図へのマップなども可能です。

%sql

SELECT 
  * 
FROM (
  SELECT *
  FROM ipaddr_list ip
  LEFT JOIN geolite2_city_blocks_ipv4_delta c
  ON to_address_int(ip.ipaddr) BETWEEN to_network_address(c.network) AND to_broadcast_address(c.network) ) fact
LEFT JOIN geolite2_city_locations_en_delta loc
ON fact.geoname_id = loc.geoname_id

Screen Shot 2022-01-27 at 19.39.15.png

スキーマ

 |-- ipaddr: string 
 |-- network: string 
 |-- geoname_id: integer 
 |-- registered_country_geoname_id: integer 
 |-- represented_country_geoname_id: integer 
 |-- is_anonymous_proxy: integer 
 |-- is_satellite_provider: integer 
 |-- postal_code: string 
 |-- latitude: double 
 |-- longitude: double 
 |-- accuracy_radius: integer 
 |-- geoname_id: integer 
 |-- locale_code: string 
 |-- continent_code: string 
 |-- continent_name: string 
 |-- country_iso_code: string 
 |-- country_name: string 
 |-- subdivision_1_iso_code: string 
 |-- subdivision_1_name: string 
 |-- subdivision_2_iso_code: string 
 |-- subdivision_2_name: string 
 |-- city_name: string 
 |-- metro_code: integer 
 |-- time_zone: string 
 |-- is_in_european_union: integer 

4) 全部入り (AS Number, Country, CityのLook-up)

IPアドレスから全ての地理データをLookupします。

%sql
-- (CityテーブルがCountryテーブルを内包している)

SELECT
  ret_asn.ipaddr,
  ret_asn.network as asn_network,
  ret_asn.autonomous_system_number,
  ret_asn.autonomous_system_organization,
  ret_city.network as city_network,
  --ret_city.geoname_id,
  ret_city.registered_country_geoname_id,
  ret_city.represented_country_geoname_id,
  ret_city.is_anonymous_proxy,
  ret_city.is_satellite_provider,
  ret_city.postal_code,
  ret_city.latitude,
  ret_city.longitude,
  ret_city.accuracy_radius,
  ret_city.locale_code,
  ret_city.continent_code,
  ret_city.continent_name,
  ret_city.country_iso_code,
  ret_city.country_name,
  ret_city.subdivision_1_iso_code,
  ret_city.subdivision_1_name,
  ret_city.subdivision_2_iso_code,
  ret_city.subdivision_2_name,
  ret_city.city_name,
  ret_city.metro_code,
  ret_city.time_zone,
  ret_city.is_in_european_union
FROM
  (
    SELECT
      *
    FROM
      ipaddr_list ip
      LEFT JOIN geolite2_asn_blocks_ipv4_delta asn ON to_address_int(ip.ipaddr) BETWEEN to_network_address(asn.network)
      AND to_broadcast_address(asn.network)
  ) AS ret_asn
  LEFT JOIN (
    SELECT
      *
    FROM
      (
        SELECT
          *
        FROM
          ipaddr_list ip
          LEFT JOIN geolite2_city_blocks_ipv4_delta c ON to_address_int(ip.ipaddr) BETWEEN to_network_address(c.network)
          AND to_broadcast_address(c.network)
      ) fact
      LEFT JOIN geolite2_city_locations_en_delta loc ON fact.geoname_id = loc.geoname_id
  ) AS ret_city ON ret_asn.ipaddr = ret_city.ipaddr

Screen Shot 2022-01-27 at 19.41.50.png

Databricks SQL(Redash)で地図上にマップしてみる

Lookupした情報からRedashの世界地図上にプロットする例です。

国コード別にプロット
worldmap.png

緯度経度でプロット
worldmap2.png

サンプルNotebookの使い方

サンプルNotebookでは以下のヘッダ部分の変数設定だけで、テーブルが作成が可能です。

#例) s3://foo-bucket/bar/ 配下をデータ配置場所として使用する場合
S3_BUCKET_NAME       = 'foo-bucket'
S3_DIRECTORY         = 'bar'
GEOLITE2_LICENSE_KEY = 'GeoLite2のライセンスキーに置き換えてください' #(安全のためSecretを使用してください)
DELTA_DATABASE_NAME  = 'test_geolite2'

GeoLite2のCSVダウンロードも含めたコードになっていますので、このNotebookを定期実行するだけで、最新のGeoLocation情報のテーブルに更新できます。

また、Delta Lakeのタイムトラベル機能がありますので、過去のテーブルの状態を復元・参照することも可能です。

まとめ

IP-GeoLocationのDeltaテーブルを常備しておくことでDatabricks上の様々なデータと掛け合わせて、地理情報を利用することができます。サンプルNotebookもすぐに使用できるコードになっていますので、併せて参照ください。

0
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
2