3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

DENSOAdvent Calendar 2021

Day 5

PythonからSnowflakeを操作する際のケースインセンシティブな注意点

Last updated at Posted at 2021-12-04

はじめに

データウェアハウスとしてクラウドサービスであるSnowflake、近年注目株で業務に使用している方もいると思います。今回はこのSnowflakeにPythonからクエリを実行させる際、ケースインセンシティブの扱いで当時私がハマった内容と対応方法について記録しておきます。

前提

使用した言語やライブラリ

  • python 3.8
  • pandas 1.1.5
  • sqlalchemy 1.3.21
  • snowflake-sqlalchemy 1.2.4

Snowflakeのケースインセンシティブな仕様

ドキュメント曰く、Snowflakeではテーブルやスキーマなどのオブジェクト名は大文字がデフォルトであり、小文字として認識させるためには"でオブジェクト名を囲わないといけません。

つまりデータベース名・スキーマ名・テーブル名をhoge.fuga.piyoと書いてクエリを投げると、SnowflakeはHOGE.FUGA.PIYOに変換してオブジェクトを探しに行きます。これを防ぐためには"hoge"."fuga"."piyo"と書く必要があります。

問題になったケースの詳細

Snowflakeだけを使う場合は上記仕様は特に気にしなくていいのですが、実際には外部からデータを継続的にインポートしたりすると思います。

今回私はSnowflakeが提供するデータパイプライン機能を使い、AWSのS3バケット上にアップロードしたファイルのデータを自動でインポートするようにしていました。この際インポート先のスキーマとテーブルを、S3上のフォルダ名からコピーし自動で生成するLambdaを組んでいます。ちなみにテーブル名は自動的に大文字のサフィックスが付きます。
image.png

するとS3のオブジェクト名はケースセンシティブなので、同じ名前で大文字と小文字が違うだけのフォルダを作られると一緒のスキーマで扱われてしまいます。稀なケースではありますが、これをケアするためにはSnowflakeもケースセンシティブで扱いたくなります。

ということで、公式がツールキット(Snowflake SQLAlchemy)を用意してくれていますので、"を付けてPandasでread_csvからデータファイルの中身を引っこ抜いてto_sqlすれば自動的にCREATE TABLEしてINSERTして無事終わり…とはなりません (´・ω・`)ドウシテ

SQL compilation error:
Object does not exist, or operation cannot be performed.

The above exception was the direct cause of the following exception:
 sqlalchemy.exc.ProgrammingError: (snowflake.connector.errors.ProgrammingError) 002043 (02000): 019ace41-008e-12b4-0000-20090001e2f2: SQL compilation error: Object does not exist, or operation cannot be performed. [SQL: SHOW /* sqlalchemy:get_table_names */ TABLES IN fuga] (Background on this error at: http://sqlalche.me/e/13/f405)

具体的には、テーブルは問題なく作成できるのですがエラーメッセージが出力されます。また、エラーになるのはスキーマ名が全て小文字の場合です。

原因

まずはエラーメッセージから問題となった処理を探します。具体的にはpandas内の下記関数の実行時にエラーを吐いていました。

def check_case_sensitive(
        self,
        name,
        schema,
    ):
        """
        Checks table name for issues with case-sensitivity.
        Method is called after data is inserted.
        """
        if not name.isdigit() and not name.islower():
            # check for potentially case sensitivity issues (GH7815)
            # Only check when name is not a number and name is not lower case
            engine = self.connectable.engine
            with self.connectable.connect() as conn:
                if _gt14():
                    from sqlalchemy import inspect

                    insp = inspect(conn)
                    table_names = insp.get_table_names(
                        schema=schema or self.meta.schema
                    )
                else:
                    table_names = engine.table_names(
                        schema=schema or self.meta.schema, connection=conn
                    )
            if name not in table_names:
                msg = (
                    f"The provided table name '{name}' is not found exactly as "
                    "such in the database after writing the table, possibly "
                    "due to case sensitivity issues. Consider using lower "
                    "case table names."
                )
                warnings.warn(msg, UserWarning)

コメント部分にあるように、データがインサートされた後にわざわざテーブル名を確認して、ケースセンシティブな食い違いがあったら警告してくれるようです。

そしてengine.table_namesから更に辿っていくと、Snowflake SQLAlchemyの関数からテーブル名を取得しています。SHOW /* sqlalchemy:get_table_names */ TABLES INの部分がエラーメッセージに出ています。

 def get_table_names(self, connection, schema=None, **kw):
        """
        Gets all table names.
        """
        schema = schema or self.default_schema_name
        current_schema = schema
        if schema:
            cursor = connection.execute(text(
                "SHOW /* sqlalchemy:get_table_names */ TABLES IN {0}".format(
                    self._denormalize_quote_join(schema))))
        else:
            cursor = connection.execute(text(
                "SHOW /* sqlalchemy:get_table_names */ TABLES"))
            _, current_schema = self._current_database_schema(connection)

        ret = [self.normalize_name(row[1]) for row in cursor]

        return ret


    def _denormalize_quote_join(self, *idents):
        ip = self.identifier_preparer
        split_idents = reduce(
            operator.add,
            [ip._split_schema_by_dot(ids) for ids in idents if ids is not None])
        return '.'.join(
            ip._quote_free_identifiers(*split_idents))

denormalize_quote_join(schema)…どうやらケースセンシティブな扱いが必要かを自動で判断して補正してくれる関数のようです。(´・ω・`)ココガアヤシイ
そしてさらに辿っていくと、最終的にケースセンシティブな扱いが必要かどうかを判断するロジックがSQLAlchemy内にありました。

    def _requires_quotes(self, value):
        """Return True if the given identifier requires quoting."""
        lc_value = value.lower()
        return (
            lc_value in self.reserved_words
            or value[0] in self.illegal_initial_characters
            or not self.legal_characters.match(util.text_type(value))
            or (lc_value != value)
        )

これを見るとlc_value != value、つまり与えられたオブジェクト名が全て小文字の場合は"が必要ないと見なしています。(´・ω・`)ギャクジャン
つまりテーブル作成やインサートはできるものの、その後の確認処理でスキーマ名が全て小文字だとケースインセンシティブなままSnowflakeにクエリを投げてしまい、Snowflake側で大文字に変換されるためスキーマが不在でObject does not existと出てきたわけです。(スキーマ自体は小文字で作成されている)

これはSnowflakeのドキュメントにあるように、SQLAlchemyとSnowflakeのケースセンシティブな条件が異なることに起因します。ただドキュメント通りならあらかじめ"で囲ってあげれば問題ないはずですが、残念ながらエラーになります。というのも、ダブルクォーテーションは処理の中でオブジェクトの区切り文字として使われ省かれてしまうからです。
実際スキーマ名を"で囲って実行してログに出しても見事に剝がされます。

 def _split_schema_by_dot(self, schema):
        ret = []
        idx = 0
        pre_idx = 0
        in_quote = False
        while idx < len(schema):
            if not in_quote:
                if schema[idx] == '.' and pre_idx < idx:
                    ret.append(schema[pre_idx:idx])
                    pre_idx = idx + 1
                elif schema[idx] == '"':
                    in_quote = True
                    pre_idx = idx + 1
            else:
                if schema[idx] == '"' and pre_idx < idx:
                    ret.append(schema[pre_idx:idx])
                    in_quote = False
                    pre_idx = idx + 1
            idx += 1
            if pre_idx < len(schema) and schema[pre_idx] == '.':
                pre_idx += 1
        if pre_idx < idx:
            ret.append(schema[pre_idx:idx])
        return ret

この処理をちょっと弄った上でスキーマ名を"で囲ってあげると、無事小文字オンリーのスキーマ名でテーブルを確認しにいってくれるようになりました。

対応方法

ライブラリを弄って自分で管理し続けるのは嫌なのでそれ以外で考えます。ということで自分でCREATE TABLEしてデータ行ごとにINSERTするか、もしくはエラーメッセージでフィルターしてスルーするか。今回は誤作動もなさそうだし後者にしました。

以上

3
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?