今回は Digdag と Embulk を用いて分析基盤を作ったことについて書きます。
意外とハマりポイントが多く、ネット上に知見もそこまで多くなかったため Tips や解決策を書きます。
実行時の環境は以下になります。
Embulk v0.9.22
embulk-input-mysql 0.10.1
embulk-output-mysql 0.8.7
MySQL 5.7.21
背景
社内では Redash を用いて様々な部署の方がデータの分析をしています。
Redash からアクセス可能なデータの中には秘匿情報も含まれるため、権限管理で分析基盤へのアクセスを厳しく制限する形をとっていました。そこで、社内でより分析をしやすい状態を目指し、秘匿情報をマスキングすることで社員なら誰でもアクセスして問題ない状態を作ることにしました。
このような仕組みを作ると何かしらの情報漏洩が発生するリスクを限りなく小さくできるので、サイトのみならず、会社の信頼性に繋がります。やったね。
今回はワークフローエンジンの Digdag とバルクデータローダーの Embulk を使いました。
それぞれどういうものかはネット上に記事が多いのでここでは割愛します。
説明はこちらの記事が分かりやすいです。
構成
CloudWatchEvents から ECS (Fargate) のタスクを定期実行するようにしています。
こうすると、バッチの実行時のみ課金されるため利用料金を節約できます。
Embulk の INPUT には読み込み権限のみを持った DB、 OUTPUT には新規に作成した分析用の DB を指定します。
Tips
この構成を作る上で乗り越えることがいくつかありました。
細かい話ですが、同じ状況に遭遇した際の解決に役立ちそうなので書いていきます。
コンテナ内の環境変数を Embulk で使いたい
production 環境では Docker コンテナ内に定義した環境変数を使いたいです。
以下は調べると良く出てくるサンプルです。
timezone: UTC
_export:
INPUT_DATABASE_HOST: ${database.input_host}
INPUT_DATABASE_USER: ${database.input_user}
INPUT_DATABASE_PASSWORD: ${database.input_password}
INPUT_DATABASE_NAME: ${database.input_name}
OUTPUT_DATABASE_HOST: ${database.output_host}
OUTPUT_DATABASE_USER: ${database.output_user}
OUTPUT_DATABASE_PASSWORD: ${database.output_password}
OUTPUT_DATABASE_NAME: ${database.output_name}
database:
input_host: docker.for.mac.localhost
input_user: root
input_password: ""
input_name: development
output_host: docker.for.mac.localhost
output_user: root
output_password: ""
output_name: development_copy
# 起動
digdag run main.dig -P config/development.yml
良くあるサンプルなのですが、production の時はどうするのかという問題に当たります。
config/production.yml
に直接 DB 情報をベタがきする訳にもいかないので、この方法は使えません。
そこで、Embulk のタスクファイル内で直接環境変数を読むようにしました。
in:
type: mysql
host: {{env.INPUT_DATABASE_HOST}}
user: {{env.INPUT_DATABASE_USER}}
{% if env.INPUT_DATABASE_PASSWORD != '' %}
password: {{env.INPUT_DATABASE_PASSWORD}}
{% endif %}
database: {{env.INPUT_DATABASE_NAME}}
table: sample
select: "*"
out:
type: mysql
host: {{env.OUTPUT_DATABASE_HOST}}
port: {{env.OUTPUT_DATABASE_PORT}}
user: {{env.OUTPUT_DATABASE_USER}}
{% if env.OUTPUT_DATABASE_PASSWORD != '' %}
password: {{env.OUTPUT_DATABASE_PASSWORD}}
{% endif %}
database: {{env.OUTPUT_DATABASE_NAME}}
table: sample
select: "*"
mode: merge
このようにして書くと Embulk が Docker コンテナ内の環境変数を使ってくれます。
プラグインが対応していないカラムの型があった
embulk のプラグインで embulk-output-mysql を使っていました。
ログは CloudWatch に流れていくのですが、ログを見ると以下のようなエラーが出ていました。
Data truncation: Data too long for column 'body' at row 21
とあるテーブルの body 21行目のカラムを見ても値は NULL ... おかしいなと思い、再度 README を読んでみると Supported types の項目が。
ここでエラーが出ていた body のカラムの型を見てみると mediumtext 型でした。
単純にサポートされてなかったようです。PR 投げて直せたら良いですが、一旦の退避策としては以下のようになります。
out:
...
column_options:
body: { type: TEXT }
is_sample: { type: TINYINT }
mode: merge
filters:
- type: mask
columns:
- { name: body, type: substring, length: 65535 }
サポートされている TEXT 型で書き出すように設定します。また、embulk-filter-mask というプラグインを使っていたため、合わせて最大文字サイズも調整します。
boolean 型のサポートも今後対応との事で、TYNYINT で指定します。
mode: merge でマージされない
mode: merge
は既存のデータがあったら上書き、無ければ新規追加する挙動と認識していましたが、私の環境では上書きがされず、実行する度にレコードが増えていく挙動に遭遇しました。
原因は、OUTPUT の DB を0の状態から作ろうとしていたのですが、初回の実行時に、上書きに必要な情報である Primary key が設定されていなかったからでした。
Embulk の input, output は別プラグインで情報の共有はできないため、出力先は事前に Primary key 属性をつけてテーブルを作っておくか、column_options で primary keyを指定する必要がありました。
out:
type: mysql
column_options:
id: { type: 'bigint primary key' }
初回の実行で primary key が反映されていることを確認後、2回目の実行からはちゃんと merge されました。
index が貼られない
Embulk はデフォルトで index を貼ってくれません。
create_table_constraint
を使うと、テーブルの作成時に index を貼ることができます。
out:
...
create_table_constraint: '
KEY `index_users_on_user_column_1` (`user_column_1`),
KEY `index_users_on_user_column_2` (`user_column_2`),
'
DB のタイムアウト
Communications link failure
The last packet successfully received from the server was 114,983 milliseconds ago. The last packet sent successfully to the server was 1 milliseconds ago.
CloudWatch のログをみると、上記エラーが発生していました。
調べると MySQL の timeout が関係しているようなので調査をし、結果的には wait_timeout
が原因だったので値を増やして解決しました。
参考: https://qiita.com/uchiko/items/b27537c0a100b6537a4a
もし数値がデフォルトの socketTimeout (1800000) に近しい場合、
options で socketTimeout を伸ばすことで解決する場合もあります。
out:
type: mysql
options: { socketTimeout: 7200000 }
RDS のメモリ不足
# CloudWatch のエラーログ
No operations allowed after statement closed
# RDS のエラーログ
vailable memory is low.
...
<jemalloc>: Error in mmap(): err: 12, msg: Cannot allocate memory
<jemalloc>: Error in malloc(): out of memory
その次に CloudWatch に別のエラーが出ており原因がいまいち掴めないなと困っていた所、RDS のエラーログを見ると明確に OOME が出ていました。インスタンスタイプを上げて回避しましたが、バッチサイズを指定するなど、調整どころになります。
Slack 通知
タスクが終了した後の結果を通知したいときは、便利なライブラリがあったので紹介します。
このサンプルで ENVIRONMENT の値を動的に取りたかったのですが、調べる限り dig ファイルの中で直接環境を取得する方法は見当たらず、以下のような追記が必要そうです。
_export:
..
+step1:
rb>: SetEnv.run
require: 'config/set_env'
+load:
..
class SetEnv
def run
Digdag.env.store(ENVIRONMENT: ENV['ENVIRONMENT'])
end
end
もちろん Dockerfile にも ruby をインストールする記述が必要です。
RUN apk --update --no-cache add ... ruby ruby-bundler ruby-json && \
...
通知結果
時間がずれる
embulk-input-jdbcのMySQLプラグインで9時間時間がずれる
https://qiita.com/katsuyan/items/f22dc5a86522ba3c5652
その他
その他やったこととして Terraform でこの構成を作る事や、Redash と DB の間に HAProxy がいたので、一台の HAProxy でマスク前/後のDBに正しく Redash からリクエストを割振れるようにしたりと、やる事がたくさんありました。
(おまけ) Digdag 全体のコード
timezone: UTC
_export:
plugin:
repositories:
- https://jitpack.io
dependencies:
- com.github.szyn:digdag-slack:0.1.4
webhook_url: xxx
workflow_name: DB Masking
+step1:
rb>: SetEnv.run
require: 'config/set_env'
+load:
_parallel: true
+load1:
for_each>:
table: [
hoge,
...
]
_do:
sh>: embulk run ./tasks/${table}.yml.liquid
_check:
slack>: notify_templates/success.yml
_error:
slack>: notify_templates/failed.yml
+load2:
for_each>:
table: [
huga,
...
]
_do:
sh>: embulk run ./tasks/${table}.yml.liquid
_check:
slack>: notify_templates/success.yml
_error:
slack>: notify_templates/failed.yml
まとめ
今回は didag と embulk を使った分析基盤の作成(データのマスキング処理)と Redash からアクセス可能にするまでを行いました。このようなところでも技術を用いて広義の信頼性を高められるので、面白いなと思います。
Twitter のフォローもどうぞよろしくです! (@hassasa3)