5
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

AthenaからSnowflakeへ移行するときの躓きポイント集

Last updated at Posted at 2024-10-24

に一部書いたのですが、検討の末にデータ基盤をAmazon AthenaからSnowflakeへ移行しました。

構成の紹介は別途行うのですが、涙なし(?)では語れない移行の躓きポイントを紹介させてください。
(総合的には苦戦した部分もありましたが、将来のデータ分析に幅が広がり満足しているのでご安心を)

また回避方法等あればコメント待ってます!

躓きポイント集

都度S3へ置き換え配置されるcsv,paraquatファイルは、snowpipeの場合、全件追加になってしまう

当社の環境ではAthenaの構成を活かしたこともあり、RDSのparaquatデータは「Amazon RDS Snapshot Export to S3」を用いてS3に毎時 or 日次で置き換え配置されます。

ユーザーメンテナンスのデータ基盤で扱いたいcsvはGoogle スプレッドシートからGASでS3にアップロードされます。

snowpipeを使った場合、ファイルが置き換え配置された場合、増分取り込みではなく全件取り込まれてしまうようです。(サポートより)

一時テーブルを組んで増分転送を組むという手段もありますが、複雑になるため、外部テーブル or truncate & copy intoを使うことにしました。

ゼロから作るなら2024年7月にプレビューされた「Snowflake Connector for PostgreSQLとSnowflake Connector for MySQL」を検討するのもありかと思います。

create external table でcsvファイルにINFER_SCHEMAを使うことができない

parquetの外部テーブルの場合、以下のようにINFER_SCHEMAを使うことでparquetの定義を参照していい感じに外部テーブルもしくはテーブルを作成することができます。

create or replace external table <table_name>
  USING TEMPLATE (
    SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*))
      FROM TABLE(
        INFER_SCHEMA(
          LOCATION=>'@<table_name>/',
          FILE_FORMAT=>'my_parquet_format'
        )
      ))
      with location = @<table_name>/
      FILE_FORMAT= (type = parquet)
      AUTO_REFRESH = TRUE;
     

結論としては外部テーブルを作成する際にカラムの定義をちゃんと書く形にしました。
不定期に更新されるcsvの件数としては10件ちょっとだったのでAthenaで使っていた定義からいい感じにクエリを作るという形でなんとかなりました。

CREATE OR REPLACE FILE FORMAT my_csv_format
  TYPE = CSV
  SKIP_HEADER = 1
  EMPTY_FIELD_AS_NULL = true;

create or replace external table status_code(
code int as (value:c1::int),
status string as (value:c2::string)
)
  with location = @status_code/
  FILE_FORMAT= my_csv_format
  AUTO_REFRESH = TRUE
;
  • 先述したようにcreate table & copy intoの場合はINFER_SCHEMAが使えるが、全件追加取り込みされるので選定できなかった
  • MATCH_BY_COLUMN_NAMEとかを駆使してもできない
  • FILE FORMATを2種類用意して無理やりやるパターンもだめ。
-- NG例
CREATE OR REPLACE FILE FORMAT my_csv_format
TYPE = 'CSV'
PARSE_HEADER = TRUE
-- SKIP_HEADER = 1
;

create or replace external table status_code
 USING TEMPLATE (
 SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*))
 FROM TABLE(
 INFER_SCHEMA(
 LOCATION=>'@status_code/',
 FILE_FORMAT=>'my_csv_format'
 )
 ))
 with location = @status_code/
 FILE_FORMAT ='my_csv_format'
 AUTO_REFRESH = TRUE
 --MATCH_BY_COLUMN_NAME=CASE_INSENSITIVE
;

select * from status_code;
--
無効なファイル形式 "PARSE_HEADER" is only allowed for CSV INFER_SCHEMA and MATCH_BY_COLUMN_NAME
--

Amazon S3に対するファイルが置き換え配置される外部テーブルでS3通知による自動更新オプションを使っている場合、更新回数毎に読み込みが遅くなっていく

冒頭に出てきたRDSのExport to S3で配置されるケースになります。ファイルの増分の場合は問題ないです。

時系列に紹介

  • 外部テーブルにおいて、ファイルが置き換えられるとselectがエラーになる
    • ファイルハッシュレベルで外部テーブルを定義している。Athenaだと置き換わったり追加されてもよしなに使えていて便利だった
  • 上記を回避するために、Amazon S3に対する外部テーブルの自動更新 を設定する
  • これで安心と思いきや、ファイルを置き換えるたびに、selectが遅くなっていく。
    • 数秒だったのが数分になっていきます
  • ALTER EXTERNAL TABLE REFRESH; をするとselectが遅いのが解消される
  • サポートに確認したところ、自動更新だとファイルの差分情報が蓄積されて過去のファイルも読みに行こうとしてしまう。REFRESHを行うと解消される。
    • 自動更新によるREFRESHとコマンドによるREFRESHの挙動が違うという罠
  • これによって移行期間中に実行しているタスクがじわじわ遅くなり、クレジットを消費しまくりました…悲しい

自動更新 + taskで適切な間隔で手動でREFRESHを実行する運用となっています。

external table周りでいくつか

  • デフォルトでvalue列を非表示にするオプションは無い(excludeオプションを使ったviewを別途定義する必要がある)
  • 分析環境で使っているRedashでは標準のままだとextanal tableがtable一覧に表示されない
    • →2点を加味してすべての外部テーブルはvalueを除いたviewを作成
  • 大文字小文字を区別しない設定(ALTER ACCOUNT SET QUOTED_IDENTIFIERS_IGNORE_CASE = TRUE;)の場合、selectの結果のカラム大文字になり、Redashで移行する際に影響が出る
    • クエリをidそのままで移行(書き換え)する際に、グラフ等々の定義をし直す必要があり(RedashのDBを無理やり更新する手もありますが…)気合で移行しました。
    • Redashの <カラム名>::multi-filter が使えない。Query Based Dropdown Listでなんとか回避しました

カラムを途中に追加できない

末尾に追加するしか無い
どうしても順番にこだわりたいなら一時テーブルに対ししてinsertすれば実現はできる

タスクや通知

  • 通知に関しては、最終的にSlack通知を組みました
  • 成功通知はtaskやプロシージャ内にうまくタスクを組み込む必要がある
  • 理想はAWSの後続のタスクをキックするために何かしらの連携を組みたかったが、組むのが面倒で諦めて終わっているであろう時間を定期実行する形にした。
    • 組むとしたら、LambdaとかのAPI Gatewayを用意して、Snowflakeの外部連携でAPIを叩く感じ
  • 失敗通知で、AWSのSNS経由のやつを設定しようとしたがなぜか飛ばなかった(設定ミスと思われるが解決できず)
    • 最終的にSlack通知組んだので、直近1時間のタスクの失敗数をカウントするタスクを作って、カウントがあれば通知するようにした。
-- taskの失敗監視
CREATE OR REPLACE PROCEDURE CHECK_TASK_FAILURE()
RETURNS STRING
LANGUAGE SQL
EXECUTE AS CALLER
AS
$$
BEGIN
    -- タスク失敗のカウントを保持する変数
    LET task_failed_count INTEGER DEFAULT 0;

    -- 直近1時間のタスク失敗数を取得
    SELECT 
        COUNT(*) INTO :task_failed_count
    FROM 
        SNOWFLAKE.ACCOUNT_USAGE.TASK_HISTORY
    WHERE 
        STATE = 'FAILED'
        AND COMPLETED_TIME >= DATEADD(hour, -1, CURRENT_TIMESTAMP);
    
    -- タスクが失敗しているか確認
    IF (task_failed_count > 0) THEN
        -- タスクが失敗している場合の処理
        SELECT
        'channelID' as channel
        ,'snowflakeのタスクが失敗しました' as message
        ,post_to_slack(channel ,message) as slack_notify;
        RETURN 'Task failed, action executed.';
    ELSE
        -- タスクが失敗していない場合の処理
        RETURN 'No task failure detected.';
    END IF;
END;
$$;
  • 毎時のワークフローの中で日次のみ実行されるものを作るときは、プロシージャ内にifを作るしか無い
    • タスクの定義内でやったほうがメンテナンス性が高いが、サポートよりできないとのこと
CREATE OR REPLACE PROCEDURE ex()
RETURNS STRING
LANGUAGE SQL
AS
$$
BEGIN
  IF (TO_CHAR(CURRENT_TIMESTAMP(), 'HH24') = '20') THEN
    -- 処理を記載
    RETURN 'Completed';
  ELSE
    -- 該当時間以外はスキップ
    RETURN 'not run';
  END IF;
END;
$$;

Redashの接続設定

Snowflakeのユーザー設定 or Redashの接続設定の何かが間違えると接続エラーになってしまいます。
アカウントログを見ながら、そもそもログインが実行されたのかどうか見ながらトラブルシューティングします。

  • Account
    • xxxxxx-xxxxxxx のやつをいれる。フルのURLいれるとエラーになる
  • Region
    • 入れない
  • User
    • デフォルトロールを指定しておく必要がある。ウェアハウス、データベースも指定しておいたほうがベター
  • Password
  • Warehouse
    • 入れておく。Redashのクエリ画面でウェアハウスを変更しても、ここで指定したものになる。早いやつと普通のやつを使い分けるときは2つ接続先を作る必要がある。
  • Database
    • 入れる

Embulk

Embulkは常になにかハマりポイントがあるのですっかり忘れていましたが、書いておきます。(追記)
Snowflakeをインプット(移行時)、アウトプット(リバースETL用)両方に使いました。

Docker.file
FROM openjdk:8-jre-alpine

RUN wget -q https://dl.embulk.org/embulk-0.10.50.jar -O /bin/embulk \
  && chmod +x /bin/embulk

RUN apk add --no-cache libc6-compat

RUN mkdir -p /opt/snowflake \
  && wget --no-check-certificate -O /opt/snowflake/snowflake-jdbc.jar "https://repo1.maven.org/maven2/net/snowflake/snowflake-jdbc/3.16.0/snowflake-jdbc-3.16.0.jar"

RUN mkdir -p /bin/jruby \
&& wget --no-check-certificate -O /bin/jruby/jruby-complete-9.4.5.0.jar "https://repo1.maven.org/maven2/org/jruby/jruby-complete/9.4.5.0/jruby-complete-9.4.5.0.jar"
RUN chmod +x /bin/jruby/jruby-complete-9.4.5.0.jar

COPY ./embulk.properties /root/.embulk/embulk.properties
RUN embulk gem install embulk -v 0.10.50
RUN embulk gem install msgpack -v 1.7.2
RUN embulk gem install liquid
RUN embulk gem install bundler
RUN embulk gem install embulk-input-jdbc
RUN embulk gem install embulk-output-jdbc
RUN embulk gem install embulk-input-mysql
RUN embulk gem install embulk-output-mysql
RUN apk add --no-cache mysql-client

WORKDIR /work
COPY entrypoint.sh /usr/bin/
RUN chmod +x /usr/bin/entrypoint.sh

COPY config /work

そもそもどのプラグイン使うのかと言うのが迷子になる
ドライバとembulkのバージョンの組み合わせを新しくしすぎるとうまく動かなかった気がする
ともかくjdbcプラグインでSnowflakeに接続するまでがかなり大変だった気がする。

docker-compose.yml
services:
  embulk:
    build: .
    env_file:
      - .env
    depends_on:
      - mysql
  mysql:
    image: mysql:8.0
    command:
      --default-authentication-plugin=mysql_native_password
      --character-set-server=utf8mb4
      --collation-server=utf8mb4_unicode_ci
    ports:
      - 3306:3306
    restart: always
    environment:
      - MYSQL_ROOT_PASSWORD
      - MYSQL_DATABASE=${DB_DATABASE}

既存で使っていたものが開発用にmysqlコンテナ立てるやつだったので、そのまま。
(単体のコンテナにしようとしたけどエラー出ていいやってなった)

embulk.yml.liquid
in:
  type: mysql
  host: {{ env.DA_DB_HOST }}
  user: {{ env.DA_DB_USER }}
  password: {{ env.DA_DB_PASSWORD }}
  database: {{ env.DA_DB_DATABASE }}
  port: {{ env.DA_DB_PORT }}
  query: |
    select * from table_name
  columns:
    - {name: as_of_date, type: date}
out:
  type: jdbc
  driver_class: net.snowflake.client.jdbc.SnowflakeDriver
  driver_path: /opt/snowflake/snowflake-jdbc.jar
  url: {{ env.SNOWFLAKE_URL}}
  user: {{ env.SNOWFLAKE_USER}}
  password: {{ env.SNOWFLAKE_PASSWORD}}
  table: table_name
  mode: insert -- mergeにするとエラーになる
  create_table_constraint: 'primary key()'
  column_options:
    as_of_date: {type: 'date NOT NULL'}

-- Snowflakeユーザーにデフォルトスキーマを指定しておかないと、embulkで作る一時テーブルが別の場所に作られてしまいエラーになる
-- 以下内容を.envファイルに持たせている
SNOWFLAKE_URL=jdbc:snowflake://xxxxx-xxxxxx.snowflakecomputing.com
SNOWFLAKE_USER=
SNOWFLAKE_PASSWORD=
DA_DB_HOST=host.docker.internal
DA_DB_USER=
DA_DB_PASSWORD=
DA_DB_DATABASE=
embulk.yml.liquid
in:
  type: jdbc
  driver_class: net.snowflake.client.jdbc.SnowflakeDriver
  driver_path: /opt/snowflake/snowflake-jdbc.jar
  url: {{ env.SNOWFLAKE_URL}}
  user: {{ env.SNOWFLAKE_USER}}
  password: {{ env.SNOWFLAKE_PASSWORD}}
  query: |
    SELECT * FROM table_name
  columns:
    - {name: as_of_date, type: string}
out:
  type: mysql
  host: {{ env.DB_HOST }}
  user: {{ env.DB_USER }}
  password: {{ env.DB_PASSWORD }}
  database: {{ env.DB_DATABASE }}
  port: {{ env.DB_PORT }}
  table: table_name 
  mode: merge
  create_table_constraint: 'primary key()'
  column_options:
    as_of_date     : {type: date, value_type: string}

snowflakeをインプットにするときはそんなにハマりどころはなかった気がする。

終わりに

今回の躓きポイントは、troccoやairflowをはじめとするETLツールを使っていないというのもありますが、移行前も今回も規模も小さめなことから、なんとかなってしまっているのでこのあたりの導入は迷いどころです。

「サポートより」の記載がある部分については、Snowflakeのサポートにケースを起票しご協力いただきながら白黒つけていきました。日本語で対応できる方もそれなりにいらっしゃるようで、だいたい1週間前後でクローズできました。

Japan Snowflake User GroupであるSnowVillageも覗いていましたが、今回挙げた躓いたところを質問するには至りませんでした。込み入った話でサポートに確認したほうが良さそうな内容が多かったり、解決したいことが多すぎて、自己解決 or サポート解決 or コミュニティでの質問の選択肢が並んだときに、質問しすぎるのも良くないなと。
本記事である程度まとめられたかなと思うので聞いてみたいなとは思ってます。

Athenaからの移行の事例はちらほらと見かけるようになりましたが、ETLツールなどを含めた構成の一致となるとより限られてきますし、公開されている情報は概要的なところはあれど、こういった躓きポイントは少ない気がします(観測できてないだけかも)

2024年前半に移行した時点での情報ですので、勢いよく機能追加しているSnowflakeでは刻々と状況が変わっていくかと思います。適宜参考にしつつその時の最適な選択を各々して頂けると幸いです。

冒頭にも上げた通り、移行についての記事は追って記載中ですのでお待ち下さい。
(構成図をどこまで作り込むかが鍵になりそうですw)

↑書きました

5
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?