2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Frosty Friday Live ChallengeのためにStreamlitで動くXMLデータローダーを作成した

Posted at

Frosy Fridayとは

Frosty Fridayとは「Snowflakeユーザが、Snowflakeユーザのために作成し、Snowflakeスキルの練習と開発に役立つ」チャレンジです。
原文:
to help you practice and develop your Snowflake skills, created by Snowflake users, for Snowflake users.

こちらのサイトで毎週新しいチャレンジが公開されます。簡単な問題もあるのでSnowflake初心者が練習するときにちょうどいいですよ!

また、このFrosty Fridayに関するYouTube動画シリーズが「Frosty Friday Live Challenge」です。
このYouTube動画シリーズはJapan Snowflake User Group "SnowVillage"の有志が定期更新しており、毎回、出演者が自分なりの解法を解説しています。

XML ParsingとStreamlit

先日、Frosty Friday Live Challengeにゲスト出演させていただき、Week 49 - Intermediate - XML Parsingの解法を解説させていただきました。

SnowflakeではJSONやXMLのような半構造化データをSQLベースの命令文でデータ操作できます。
これにより、半構造化データを対象にデータ検索する、半構造化データを構造化する(つまり、スプレッドシートのような形にする)などなどが可能です。

これはこれでXMLファイルが大量にあってもスケールするいい機能なのですが、XMLファイルの構造を把握していることや独特の演算子を知っていることが必要となります。
また、手元のXMLファイルに対して試行錯誤しながら構造化したいシーンにも不向きなので、そのようなシーンにフォーカスしたStreamlitでアプリを試作してみました。

application screenshot

ポイント解説

findall()

今回はElementTreeという有名なライブラリを使っています。
このライブラリでは、findall()関数を使うとタグ名をキーとして簡単に要素(Element)をリストアップできます。繰り返し構造が含まれているXMLファイルに対しては強烈に有効です。

elements = root.findall(_path)

ET.tostring()

ElementTreeで取得した要素が実際にどのようなXMLになっているか確認するときはET.tostring()関数を使うと簡単です。
これを使うことで、タグ名をキーとした要素のリストアップと、その要素が実際にどのようなXMLなのかをインタラクティブに確認することができます。
なお、この関数はそのままだとバイト列が戻ってくるようなので、decode()関数を組み合わせています。

parted_xml = ET.tostring(element, encoding="utf-8").decode("utf-8")

コンポーネントを動的に増やす

Streamlitではテキストボックスなどのコンポーネントを動的に増やすことができます。これが簡単にできるのはStreamlitの特徴の一つです!
今回は最終的な構造化データを何列にするかをユーザに指定してもらうのですが、コンポーネントが動的に増やせることを活かして、それぞれの列の抽出条件を入力するテキストボックスを作成しました。

for i in range(columns_num):
    with tabs[i]:
        _path = st.text_input("XPath", typical_tag, key=f"path_{i}")

DataFrameをSnowflakeに保存する

DataFrameはPandas DataFrameが有名ですが、SnowflakeにはSnowflake専用のSnowPark DataFrameというDataFrameが用意されています。
Spark DataFrameにインタフェースが近い、データ操作がSnowflake上で行われるなどの特徴があります。
また、save_as_table()関数でSnowflake上のテーブルとしてDataFrameを保存することができます。

from snowflake.snowpark.context import get_active_session
session = get_active_session()
snowpark_df = session.create_dataframe(df)
result = snowpark_df.write.mode(mode).save_as_table(table_name)

ソースコード

ソースコードは以下の通りです。
通常のStreamlit環境でもある程度は動きますが、Streamlit in Snowflake上で動かすとSnowflakeに構造化したデータを書き込むことができます。

xml_loader.py
import streamlit as st
import pandas as pd
import xml.etree.ElementTree as ET

# 文字列をキャストする関数
def my_cast(s: str, data_type: str):
    if data_type == "int":
        return int(s)
    elif data_type == "float":
        return float(s)
    else:
        return s

# 子要素のタグを取得する関数。なくてもいい
def get_child_tags(element):
    return [child.tag for child in element]

# ファイルアップローダ
uploaded_xml = st.file_uploader("XMLファイルを選択", type=["xml"])

# ファイルがアップロードされていれば
if uploaded_xml:
    # XMLファイルを文字列として読み込む
    xml_content = uploaded_xml.read().decode("utf-8")
    # &を含むXMLファイルを正しく表示するために、&をエスケープする
    xml_content = xml_content.replace("&", "&")

    # 読み込んだXMLファイルの内容を表示
    with st.expander("XMLファイルの内容"):
        st.code(xml_content, language="xml")

    # ElementTreeでXMLファイルを解析
    root = ET.fromstring(xml_content)

    # 代表的なタグを取得
    typical_tag = ""
    child_tags = get_child_tags(root)
    if len(child_tags) > 0:
        typical_tag = child_tags[0]

    # 抽出するカラム数を指定してもらう
    columns_num = st.number_input("カラム数", min_value=1, max_value=10, value=3, step=1)

    # 指定されたカラム数だけタブを作成
    tab_list = [f"[{i+1}] XPath" for i in range(columns_num)]
    tabs = st.tabs(tab_list)

    # 各タブで抽出するカラムのXPathを入力してもらう
    for i in range(columns_num):
        with tabs[i]:
            # ここからは各タブの内容
            # Streamlitにおいて各入力コンポーネントの値はst.session_state.{key}に保存される。後でこのkeyを使って値を取得する
            _path = st.text_input("XPath", typical_tag, key=f"path_{i}")
            _type = st.selectbox("", ["text", "int", "float"], key=f"type_{i}")
            # XPathを使って要素を取得
            if _path:
                # findall()メソッドにXPathを渡すと、それに一致する要素がリストで得られる
                elements = root.findall(_path)
                if len(elements) > 0:
                    # 最初の要素のみをプレビュー表示する
                    element = elements[0]
                    # 一致した要素をXMLで表示する
                    parted_xml = ET.tostring(element, encoding="utf-8").decode("utf-8")
                    st.code(parted_xml, language="xml")
                    # 一致した要素の値を取得して型変換を試す
                    text = element.text.strip()
                    try:
                        _ = my_cast(text, _type)
                        st.success(f"変換成功: {_type}(\"{text}\")")
                    except ValueError:
                        st.error(f"変換エラー: {_type}(\"{text}\")")

# ここまでの設定に基づいてデータを抽出する
if st.button("データ抽出"):
    # このDataFrameに抽出したデータを格納する
    df = pd.DataFrame()

    for i in range(columns_num):
        _path = st.session_state[f"path_{i}"]
        _type = st.session_state[f"type_{i}"]
        data_array = []
        # XPathを使って要素を取得
        elements = root.findall(_path)
        for element in elements:
            # 要素の値を取得して型変換する
            try:
                d = my_cast(element.text.strip(), _type)
                data_array.append(d)
            except ValueError:
                pass
        # データをDataFrameに追加する。カラム名はタグから決定する
        if len(data_array) > 0:
            column_name = _path.split("/")[-1]
            df[column_name] = data_array

    # データを格納したDataFrameをst.session_stateに保存する
    st.session_state["df"] = df

# データが格納されたDataFrameを表示する
if "df" in st.session_state:
    # データを格納したDataFrameを取得する
    df = st.session_state["df"]
    st.dataframe(df, use_container_width=True)

    # Snowflakeに書き込む処理
    st.divider()
    st.caption("以降はStreamlit in Snowflakeのみで動作します")

    # Snowflakeに書き込むための設定を入力してもらう
    table_name = st.text_input("テーブル名", "")
    mode = st.selectbox("書き込みモード", ["append", "overwrite", "errorifexists", "ignore"])
    if table_name:
        if st.button("Snowflakeに保存"):
            # Snowflakeに書き込む処理
            try:
                from snowflake.snowpark.context import get_active_session
                session = get_active_session()
                snowpark_df = session.create_dataframe(df)
                result = snowpark_df.write.mode(mode).save_as_table(table_name)
                st.info("データを保存しました")
            except Exception as e:
                st.exception(e)
2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?