2
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

位置情報をSnowflakeに取り込んで簡単な地図表示アプリをStreamlitで作ってみた

Posted at

はじめに

こちらの記事で記載しましたが、PDFから都道府県の位置情報をDBに取り込むことができました。

その続き物として、せっかく取り込んだからには何かに活かさないと、思いました。

とりあえず可視化かな〜ということで、今回位置情報を取り込だのでそれを活用できる可視化といえば...地図表示。

地図に表示する非常にシンプルなアプリをStreamlit入門的に作成してみました。

できたもの

取り込んだデータがデータだけにシンプル!
取り込んだデータだけでとりあえずできそうなことをやってみました。

localhost_8501_.png

気象庁のページから天気予報等で用いられる区分を参考に、selectboxで地域選択ができるようにしてあります。
その他、以下を取り入れています。

  • Snowparkを使用
  • Snowflakeの認証情報にPrivate Keyを使用(よくある実装例はパスワード方式っぽかったので)
  • 地図表示とデータプロットにfoliumを使用
  • 緯度経度の変換処理(度分秒表記<->ドット表記)を実装

特に緯度経度の変換処理では今回使っていない処理もあり、今後データを増やしたりした時のためにやや過剰実装チックになっている部分もあります。

実行環境

今回は、自端末 (mac) で実行・表示しました。
最初、Streamlit in Snowflakeでやろうとしましたが、folium周りの地図表示で何かに反しているようで以下のエラーが出ました。

Unsupported component error: We removed the component, st.components.v1.html, that appeared here in keeping with our security policy.

データはSnowflake上にあるので、Snowparkでデータを取得してきてローカルで表示させるような感じにシフトしました。

コードの実装

コード全体はこちらです。
デフォルト値として指定しているオブジェクト(データベース名など)は、こちらで作成したものをそのまま活かす想定で指定しています。

コード全体
import streamlit as st
from snowflake.snowpark import Session
# from snowflake.snowpark.context import get_active_session  # for Snowsight
import folium
# from streamlit_folium import st_folium  # for Snowsight
from streamlit_folium import components
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric import dsa
import re
import os


center_lat = 35.697917
center_lon = 139.7075
database_name = "test_db"
schema_name = "test_schema"
table_name = "test_table"
role_name = "SYSADMIN"
warehouse_name = "compute_wh"
# 参考 : https://www.jma.go.jp/jma/kishou/know/kisetsu_riyou/division/kubun.html
japan_regions = {
    "北海道": ["北海道"],
    "東北": ["青森県", "岩手県", "秋田県", "宮城県", "山形県", "福島県"],
    "関東甲信": ["茨城県", "栃木県", "群馬県", "埼玉県", "千葉県", "東京都", "神奈川県", "山梨県", "長野県"],
    "北陸": ["新潟県", "富山県", "石川県", "福井県"],
    "東海": ["岐阜県", "静岡県", "愛知県", "三重県"],
    "近畿": ["滋賀県", "京都府", "大阪府", "兵庫県", "奈良県", "和歌山県"],
    "中国": ["鳥取県", "島根県", "岡山県", "広島県"],
    "四国": ["徳島県", "香川県", "愛媛県", "高知県"],
    "九州北部": ["山口県", "福岡県", "佐賀県", "長崎県", "熊本県", "大分県"],
    "九州南部・奄美": ["宮崎県", "鹿児島県"],
    "沖縄": ["沖縄県"]
}


class Coordinate(object):
    """
    参考 : https://github.com/frgmt/coordinate
    """
    def __init__(self, latitude, longitude):
        self.dms_pattern = re.compile('[°′″]')
        self.__latitude = self.convert_to_decimal(latitude)
        self.__longitude = self.convert_to_decimal(longitude)

    def convert_to_decimal(self, coordinate):
        # 度分秒表記の場合
        if "°" in str(coordinate):
            coord_list = self.dms_pattern.split(coordinate)[:-1]
            return float(coord_list[0]) + float(coord_list[1]) / 60 + float(coord_list[2]) / 3600
        # 点形式の場合
        elif str(coordinate).count('.') > 1:
            temp_list = str(coordinate).split('.')
            coord_list = temp_list[:2] + ['.'.join(temp_list[2:])]
            return float(coord_list[0]) + float(coord_list[1]) / 60 + float(coord_list[2]) / 3600
        # その他の形式の場合
        else:
            return float(coordinate)

    def get_deg_latitude(self):
        """
        10進法 表記 (Degree)
        :return: 緯度
        """
        return self.__latitude

    def get_deg_longitude(self):
        """
        10進法 表記 (Degree)
        :return: 経度
        """
        return self.__longitude

    def get_dms_latitude(self, separator="dot"):
        """
        度/分/秒 表記 (Degree Minute Second)
        :param separator: 'dms' は「°′″」区切り, 'dot' は「.」区切り
        :return: 緯度
        """
        return self._convert_to_dms(self.__latitude, separator)

    def get_dms_longitude(self, separator="dot"):
        """
        度/分/秒 表記 (Degree Minute Second)
        :param separator: 'dms' は「°′″」区切り, 'dot' は「.」区切り
        :return: 経度
        """
        return self._convert_to_dms(self.__longitude, separator)

    def _convert_to_dms(self, decimal_degree, separator):
        """
        10進法の度表記を度/分/秒表記に変換
        :param decimal_degree: 10進法での度
        :param separator: 'dms' は「°′″」区切り, 'dot' は「.」区切り
        """
        degrees = int(decimal_degree)
        minutes = int((decimal_degree - degrees) * 60)
        seconds = round((decimal_degree - degrees - minutes / 60) * 3600, 2)

        if separator == "dms":
            return f"{degrees}°{minutes}{seconds}"
        else:  # デフォルトは「.」区切り
            return f"{degrees}.{minutes}.{seconds}"

    def get_milli_latitude(self):
        """
        ミリ秒 表記
        :return: 緯度
        """
        return self.__latitude * 3600000

    def get_milli_longitude(self):
        """
        ミリ秒 表記
        :return: 経度
        """
        return self.__longitude * 3600000

    def tokyo_to_wgs84(self):
        """
        日本測地系から世界測地系へ変換
        """
        lat = self.__latitude - self.__latitude * 0.00010695 + self.__longitude * 0.000017464 + 0.0046017
        lon = self.__longitude - self.__latitude * 0.000046038 - self.__longitude * 0.000083043 + 0.010040
        self.__latitude, self.__longitude = lat, lon

    def wgs84_to_tokyo(self):
        """
        世界測地系から日本測地系へ変換
        """
        lat = self.__latitude + self.__latitude * 0.00010696 - self.__longitude * 0.000017467 - 0.0046020
        lon = self.__longitude + self.__latitude * 0.000046047 + self.__longitude * 0.000083049 - 0.010041
        self.__latitude, self.__longitude = lat, lon


def create_session():
  """
  参考 : https://community.snowflake.com/s/article/How-to-create-a-session-via-Snowpark-python-using-key-pair-authentication-in-jupyter

  以下の環境変数を設定しておくこと。

  export SNOWFLAKE_ACCOUNT=<アカウント識別子>
  export SNOWFLAKE_USER=<ユーザー名>
  export SNOWFLAKE_PRIVATE_KEY_PATH=<キー(〜.p8)のフルパス>
  export SNOWFLAKE_PRIVATE_KEY_PASSPHRASE=<キーのパスワードがあれば設定>

  以下はオプション。

  export SNOWFLAKE_ROLE=<ロール名>
  export SNOWFLAKE_WAREHOUSE=<ウェアハウス名>
  export SNOWFLAKE_DATABASE=<DB名>
  export SNOWFLAKE_SCHEMA=<スキーマ名>
  """
  
  pass_phrase = os.environ.get('SNOWFLAKE_PRIVATE_KEY_PASSPHRASE')
  with open(os.environ.get("SNOWFLAKE_PRIVATE_KEY_PATH"), "rb") as key:
    tmp_key = serialization.load_pem_private_key(
        key.read(),
        password=pass_phrase.encode() if pass_phrase else None,
        backend=default_backend()
    )

  private_key = tmp_key.private_bytes(
      encoding=serialization.Encoding.DER,
      format=serialization.PrivateFormat.PKCS8,
      encryption_algorithm=serialization.NoEncryption()
  )

  connection_parameters = {
    "account": os.environ.get("SNOWFLAKE_ACCOUNT"),
    "user": os.environ.get("SNOWFLAKE_USER"),
    "private_key": private_key,
    "role": os.environ.get("SNOWFLAKE_ROLE", role_name),
    "warehouse": os.environ.get("SNOWFLAKE_WAREHOUSE", warehouse_name),
    "database": os.environ.get("SNOWFLAKE_DATABASE", database_name),
    "schema": os.environ.get("SNOWFLAKE_SCHEMA", schema_name),
  }  

  return Session.builder.configs(connection_parameters).create() 


def folium_static(fig, width=700, height=500):
  """
  参考 : https://qiita.com/sentencebird/items/478e7151e952798c2bb8
  """
  if isinstance(fig, folium.Map):
      fig = folium.Figure().add_child(fig)
      return components.html(
          fig.render(), height=(fig.height or height) + 10, width=width
          )
  elif isinstance(fig, folium.plugins.DualMap):
      return components.html(
          fig._repr_html_(), height=height + 10, width=width
      )


def plot_marker(row):
  c = Coordinate(row["LATITUDE"], row["LONGITUDE"])

  pop=f'{row["CITY_NAME"]}<br>{row["MESH_CODE"]}<br>{row["LATITUDE"]}<br>{row["LONGITUDE"]}'

  folium.Marker(
      location=[
          c.get_deg_latitude(),
          c.get_deg_longitude()
      ],
      tooltip=row["PREFECTURE_NAME"],
      popup=folium.Popup(pop, max_width=300),
      icon=folium.Icon(icon_color="white", color="blue")
  ).add_to(folium_map)

# 参考 : https://welovepython.net/streamlit-folium/

st.header("Streamlit App Test")
st.subheader("各都道府県の位置情報")

# folium_figure = folium.Figure(width=700, height=600)  # for Snowsight
folium_figure = folium.Figure()

folium_map = folium.Map(
  location = [center_lat,center_lon],
  # 地理院タイル一覧 : https://maps.gsi.go.jp/development/ichiran.html
  tiles='https://cyberjapandata.gsi.go.jp/xyz/pale/{z}/{x}/{y}.png',
  attr='都道府県庁所在地',
  zoom_start=5
).add_to(folium_figure)

# session = get_active_session()  # for Snowsight
session = create_session()

table = session.table(table_name)

selected_region = st.selectbox(
  label="表示する地域",
  options=list(japan_regions.keys()),
  index=None,
  placeholder="表示する地域を選択してください。",
)

if selected_region is None:
  st.text(f"表示する地域:全国")
else:
  st.text(f"表示する地域:{selected_region}")
  region_list = "'" + "', '".join(japan_regions[selected_region]) + "'"
  query = f"select * from {table_name} where prefecture_name in ({region_list});"
  table = session.sql(query)

queried_data = table.to_pandas()
st.dataframe(queried_data, use_container_width=True)

queried_data.apply(plot_marker, axis=1)

# st_data = st_folium(folium_map)  # for Snowsight
st_data = folium_static(folium_map, width=700, height=600)

実装内容

部分的にみていきます。

Snowparkとstreamlit、地図表示とデータのプロットにfoliumを使っています。

import streamlit as st
from snowflake.snowpark import Session
import folium
from streamlit_folium import components

以下はSnowsight上で実行しようとした名残です。今回はローカル実行なので使っていません。

# from snowflake.snowpark.context import get_active_session  # for Snowsight
# from streamlit_folium import st_folium  # for Snowsight

# ... 中略 ...

# session = get_active_session()  # for Snowsight

# ... 中略 ...

# st_data = st_folium(folium_map)  # for Snowsight

Snowflakeの認証(SnowparkのSession)に、Private Keyを用いる処理を実装するために使いました。(参考)

from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric import dsa

緯度経度変換処理は、Snowsight上でそれっぽいPackagesが選択できなかったこともあり、実装例を探しました。

こちらにあったclassの実装を大いに参考にさせていただき、カスタマイズしました。(感謝...!)

class Coordinate(object):
    def __init__(self, latitude, longitude):
        self.dms_pattern = re.compile('[°′″]')
        self.__latitude = self.convert_to_decimal(latitude)
        self.__longitude = self.convert_to_decimal(longitude)
    # ... 以下略 ...

今回PDFから取り込んだ緯度経度の情報は度分秒の60進法表記(例:43°03′30″)で、このままだとfoliumで使えないことから変換処理を実装しました。

    def convert_to_decimal(self, coordinate):
        # 度分秒表記の場合
        if "°" in str(coordinate):
            coord_list = self.dms_pattern.split(coordinate)[:-1]
            return float(coord_list[0]) + float(coord_list[1]) / 60 + float(coord_list[2]) / 3600
    # ... 以下略 ...

foliumで使用する時は、10進法の値が欲しいので以下を使って取り出します。

    def get_deg_latitude(self):
        return self.__latitude
    def get_deg_longitude(self):
        return self.__longitude

その他、参考にしたコードに諸々変換の考慮が盛り込まれていたこともあり、捨てるのはもったいないなぁということで残りも移植しました。(詳細は割愛)

次に、SnowparkのSessionを使ってSnowflakeの認証を行う部分です。
パスワードを使う場合はもっと簡単な実装になりますが、せっかくなのでこちらを参考に、Private Keyで認証できるように実装しました。

def create_session():
  pass_phrase = os.environ.get('SNOWFLAKE_PRIVATE_KEY_PASSPHRASE')
  with open(os.environ.get("SNOWFLAKE_PRIVATE_KEY_PATH"), "rb") as key:
    tmp_key = serialization.load_pem_private_key(
        key.read(),
        password=pass_phrase.encode() if pass_phrase else None,
        backend=default_backend()
    )

  private_key = tmp_key.private_bytes(
      encoding=serialization.Encoding.DER,
      format=serialization.PrivateFormat.PKCS8,
      encryption_algorithm=serialization.NoEncryption()
  )

  connection_parameters = {
    "account": os.environ.get("SNOWFLAKE_ACCOUNT"),
    "user": os.environ.get("SNOWFLAKE_USER"),
    "private_key": private_key,
    "role": os.environ.get("SNOWFLAKE_ROLE", role_name),
    "warehouse": os.environ.get("SNOWFLAKE_WAREHOUSE", warehouse_name),
    "database": os.environ.get("SNOWFLAKE_DATABASE", database_name),
    "schema": os.environ.get("SNOWFLAKE_SCHEMA", schema_name),
  }  

予めSnwoflake上で対応するPublic Keyの設定はしておきます。

ALTER USER <ユーザー名> SET RSA_PUBLIC_KEY = 'MIICI...';

認証に使用する各種情報は環境変数経由で取れるようにしています。
何気にTerraformのSnowflake Providerで使用する環境変数名と合わせているので、Terraformも使っている人には使い勝手がいいかもしれません。

export SNOWFLAKE_ACCOUNT=<アカウント識別子>
export SNOWFLAKE_USER=<ユーザー名>
export SNOWFLAKE_PRIVATE_KEY_PATH=<キー(〜.p8)のフルパス>
export SNOWFLAKE_PRIVATE_KEY_PASSPHRASE=<キーのパスワードがあれば設定>

# 以下はコード記載のデフォルト値から変えたい場合は設定。
export SNOWFLAKE_ROLE=<ロール名>
export SNOWFLAKE_WAREHOUSE=<ウェアハウス名>
export SNOWFLAKE_DATABASE=<DB名>
export SNOWFLAKE_SCHEMA=<スキーマ名>

環境変数設定後、コードからsession = create_session()を実行するとSnowflakeアカウントにクエリが実行できるようになります。

foliumをローカルのstreamlitで表示させる際の考慮に関して、こちらを参考に盛り込みました。

def folium_static(fig, width=700, height=500):
  if isinstance(fig, folium.Map):
      fig = folium.Figure().add_child(fig)
      return components.html(
          fig.render(), height=(fig.height or height) + 10, width=width
      )
# ... 以下略 ...

いよいよfoliumを使って地図上に位置情報をプロットする部分の実装です。
関数化させておいて、dataframeのapplyを使って一気に適用することで地図表示の高速化を図っているつもりです。

# pandasのdataframeからapplyを使って呼び出す
# e.g. df.apply(plot_marker, axis=1)
def plot_marker(row):
  c = Coordinate(row["LATITUDE"], row["LONGITUDE"])

  pop=f'{row["CITY_NAME"]}<br>{row["MESH_CODE"]}<br>{row["LATITUDE"]}<br>{row["LONGITUDE"]}'

  folium.Marker(
      location=[
          c.get_deg_latitude(),
          c.get_deg_longitude()
      ],
      tooltip=row["PREFECTURE_NAME"],
      popup=folium.Popup(pop, max_width=300),
      icon=folium.Icon(icon_color="white", color="blue")
  ).add_to(folium_map)

マーカーをクリックするとテーブルに入っている他の情報を表示するようにしています。

image.png

foliumで表示するmapの設定です。
こちらで公開されている地理院タイルからいろんな地図を選べます。

folium_figure = folium.Figure()

folium_map = folium.Map(
  location = [center_lat,center_lon],
  tiles='https://cyberjapandata.gsi.go.jp/xyz/pale/{z}/{x}/{y}.png',
  attr='都道府県庁所在地',
  zoom_start=5
).add_to(folium_figure)

上記の
tiles='https://cyberjapandata.gsi.go.jp/xyz/pale/{z}/{x}/{y}.png'
の場合はこのような地図になります。

image.png

他のものに変えてみると...

tiles=https://cyberjapandata.gsi.go.jp/xyz/seamlessphoto/{z}/{x}/{y}.jpg

image.png

ずいぶん印象が変わりますね。

後はSnowflakeからデータを引っ張ってきて、それを画面に表示する部分ですね。
地域を選択する部分は以下のようにしました。

japan_regions = {
    "北海道": ["北海道"],
    "東北": ["青森県", "岩手県", "秋田県", "宮城県", "山形県", "福島県"],
    "関東甲信": ["茨城県", "栃木県", "群馬県", "埼玉県", "千葉県", "東京都", "神奈川県", "山梨県", "長野県"],
    "北陸": ["新潟県", "富山県", "石川県", "福井県"],
    "東海": ["岐阜県", "静岡県", "愛知県", "三重県"],
    "近畿": ["滋賀県", "京都府", "大阪府", "兵庫県", "奈良県", "和歌山県"],
    "中国": ["鳥取県", "島根県", "岡山県", "広島県"],
    "四国": ["徳島県", "香川県", "愛媛県", "高知県"],
    "九州北部": ["山口県", "福岡県", "佐賀県", "長崎県", "熊本県", "大分県"],
    "九州南部・奄美": ["宮崎県", "鹿児島県"],
    "沖縄": ["沖縄県"]
}

# ... 中略 ...

table = session.table(table_name)

selected_region = st.selectbox(
  label="表示する地域",
  options=list(japan_regions.keys()),
  index=None,
  placeholder="表示する地域を選択してください。",
)

if selected_region is None:
  st.text(f"表示する地域:全国")
else:
  st.text(f"表示する地域:{selected_region}")
  region_list = "'" + "', '".join(japan_regions[selected_region]) + "'"
  query = f"select * from {table_name} where prefecture_name in ({region_list});"
  table = session.sql(query)

queried_data = table.to_pandas()
st.dataframe(queried_data, use_container_width=True)

selectboxはデフォルトでは何も選択していない状態(index=None)にしておいて全国のデータとマーカーを表示します。

image.png

image.png

地域を選択すると、Snowpark経由でクエリを実行して必要なデータに絞ってから画面表示を更新します。

image.png

image.png

このように表と地図の情報が更新されます。
Streamlitだとこの手の実装が非常に楽で素晴らしいですね。

トドメにapplyで地図にプロットして画面表示で終わりです。

queried_data.apply(plot_marker, axis=1)
st_data = folium_static(folium_map, width=700, height=600)

おわりに

Streamlitいいですね。

もっといろんな部品を使いこなして、データも集めたり組み合わせたりして、データ活用をしたいい感じのアプリケーションやダッシュボードを作ってみたいですね。

以上です。

2
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?