Help us understand the problem. What is going on with this article?

【会社の信頼性エンジニアリング】Digdag + Embulk によるデータマスキング

こんにちは。

今年の10月に Web エンジニアから SRE エンジニアになった hassan です。
SRE チームに入って大きく変わったことと言えば、新機能の開発や機能の改修はしなくなり、よりインフラに近い仕事が増えたことです。

SRE エンジニアとしての最初の業務は Digdag と Embulk を用いて分析基盤を作ることでした。
意外とハマりポイントが多く、ネット上に知見もそこまで多くなかったため Tips や解決策を書きます。

背景

社内では Redash を用いて様々な部署の方がデータの分析をしています。

Redash からアクセス可能なデータの中には秘匿情報も含まれるため、権限管理で分析基盤へのアクセスを厳しく制限する形をとっていました。そこで、社内でより分析をしやすい状態を目指し、秘匿情報をマスキングすることで社員なら誰でもアクセスして問題ない状態を作ることにしました。

このような仕組みを作ると何かしらの情報漏洩が発生するリスクを限りなく小さくできるので、サイトのみならず、会社の信頼性に繋がります。やったね。

今回はワークフローエンジンの Digdag とバルクデータローダーの Embulk を使いました。
それぞれどういうものかはネット上に記事が多いのでここでは割愛します。

説明はこちらの記事が分かりやすいです。

構成

スクリーンショット 2019-12-07 14.06.07.png

CloudWatchEvents から ECS (Fargate) のタスクを定期実行するようにしています。
こうすると、バッチの実行時のみ課金されるため利用料金を節約できます。

Embulk の INPUT には読み込み権限のみを持った DB、 OUTPUT には新規に作成した分析用の DB を指定します。

乗り越えたこと

この構成を作る上で乗り越えることがいくつかありました。
細かい話ですが、同じ状況に遭遇した際の解決に役立ちそうなので書いていきます。

コンテナ内の環境変数を Embulk で使いたい

production 環境では Docker コンテナ内に定義した環境変数を使いたいです。
以下は調べると良く出てくるサンプルです。

main.dig
timezone: UTC

_export:
  INPUT_DATABASE_HOST: ${database.input_host}
  INPUT_DATABASE_USER: ${database.input_user}
  INPUT_DATABASE_PASSWORD: ${database.input_password}
  INPUT_DATABASE_NAME: ${database.input_name}
  OUTPUT_DATABASE_HOST: ${database.output_host}
  OUTPUT_DATABASE_USER: ${database.output_user}
  OUTPUT_DATABASE_PASSWORD: ${database.output_password}
  OUTPUT_DATABASE_NAME: ${database.output_name}
config/development.yml
database:
  input_host: docker.for.mac.localhost
  input_user: root
  input_password: ""
  input_name: development
  output_host: docker.for.mac.localhost
  output_user: root
  output_password: ""
  output_name: development_copy
# 起動
digdag run main.dig -P config/development.yml

良くあるサンプルなのですが、production の時はどうするのかという問題に当たります。
config/production.yml に直接 DB 情報をベタがきする訳にもいかないので、この方法は使えません。

そこで、Embulk のタスクファイル内で直接環境変数を読むようにしました。

tasks/hoge_task.yml.liquid
in:
  type: mysql
  host: {{env.INPUT_DATABASE_HOST}}
  user: {{env.INPUT_DATABASE_USER}}
  {% if env.INPUT_DATABASE_PASSWORD != '' %}
  password: {{env.INPUT_DATABASE_PASSWORD}}
  {% endif %}
  database: {{env.INPUT_DATABASE_NAME}}
  table: sample
  select: "*"

out:
  type: mysql
  host: {{env.OUTPUT_DATABASE_HOST}}
  port: {{env.OUTPUT_DATABASE_PORT}}
  user: {{env.OUTPUT_DATABASE_USER}}
  {% if env.OUTPUT_DATABASE_PASSWORD != '' %}
  password: {{env.OUTPUT_DATABASE_PASSWORD}}
  {% endif %}
  database: {{env.OUTPUT_DATABASE_NAME}}
  table: sample
  select: "*"
  mode: merge

このようにして書くと Embulk が Docker コンテナ内の環境変数を使ってくれます。

プラグインが対応していないカラムの型があった

embulk のプラグインで embulk-output-mysql を使っていました。
ログは CloudWatch に流れていくのですが、ログを見ると以下のようなエラーが出ていました。

Data truncation: Data too long for column 'body' at row 21

とあるテーブルの body 21行目のカラムを見ても値は NULL ... おかしいなと思い、再度 README を読んでみると Supported types の項目が。

スクリーンショット 2019-12-07 15.13.49.png

https://github.com/embulk/embulk-output-jdbc/tree/master/embulk-output-mysql#supported-types

ここでエラーが出ていた body のカラムの型を見てみると mediumtext 型でした。
単純にサポートされてなかったようです。PR 投げて直せたら良いですが、一旦の退避策としては以下のようになります。

out:
  ...
  column_options:
    body: { type: TEXT }
    is_sample: { type: TINYINT }
  mode: merge

filters:
  - type: mask
    columns:
      - { name: body, type: substring, length: 65535 }

サポートされている TEXT 型で書き出すように設定します。また、embulk-filter-mask というプラグインを使っていたため、合わせて最大文字サイズも調整します。

boolean 型のサポートも今後対応との事で、TYNYINT で指定します。

DB のタイムアウト

Communications link failure
The last packet successfully received from the server was 114,983 milliseconds ago. The last packet sent successfully to the server was 1 milliseconds ago.

CloudWatch のログをみると、上記エラーが発生していました。
調べると MySQL の timeout が関係しているようなので調査をし、結果的には wait_timeout が原因だったので値を増やして解決しました。

参考: https://qiita.com/uchiko/items/b27537c0a100b6537a4a

RDS のメモリ不足

# CloudWatch のエラーログ
No operations allowed after statement closed
# RDS のエラーログ

vailable memory is low.
...
<jemalloc>: Error in mmap(): err: 12, msg: Cannot allocate memory
<jemalloc>: Error in malloc(): out of memory

その次に CloudWatch に別のエラーが出ており原因がいまいち掴めないなと困っていた所、RDS のエラーログを見ると明確に OOME が出ていました。インスタンスタイプを上げて回避しましたが、オートスケールするようにしても良さそうです。

Slack 通知

タスクが終了した後の結果を通知したいときは、便利なライブラリがあったので紹介します。

https://github.com/szyn/digdag-slack

このサンプルで ENVIRONMENT の値を動的に取りたかったのですが、調べる限り dig ファイルの中で直接環境を取得する方法は見当たらず、以下のような追記が必要そうです。

main.dig
_export:
  ..

+step1:
  rb>: SetEnv.run
  require: 'config/set_env'

+load:
  .. 
config/set_env.rb
class SetEnv
  def run
    Digdag.env.store(ENVIRONMENT: ENV['ENVIRONMENT'])
  end
end

もちろん Dockerfile にも ruby をインストールする記述が必要です。

RUN apk --update --no-cache add ... ruby ruby-bundler ruby-json && \
...

通知結果

Frame 3.png

その他

その他やったこととして Terraform でこの構成を作る事や、Redash と DB の間に HAProxy がいたので、一台の HAProxy でマスク前/後のDBに正しく Redash からリクエストを割振れるようにしたりと、やる事がたくさんありました。

(おまけ) Digdag 全体のコード

main.dig
timezone: UTC

_export:
  plugin:
    repositories:
      - https://jitpack.io
    dependencies:
      - com.github.szyn:digdag-slack:0.1.4
  webhook_url: xxx
  workflow_name: DB Masking

+step1:
  rb>: SetEnv.run
  require: 'config/set_env'

+load:
  _parallel: true

  +load1:
    for_each>:
      table: [
        hoge,
        ...
      ]
    _do:
      sh>: embulk run ./tasks/${table}.yml.liquid

    _check:
      slack>: notify_templates/success.yml

    _error:
      slack>: notify_templates/failed.yml

  +load2:
    for_each>:
      table: [
        huga,
        ...
      ]
    _do:
      sh>: embulk run ./tasks/${table}.yml.liquid

    _check:
      slack>: notify_templates/success.yml

    _error:
      slack>: notify_templates/failed.yml

まとめ

今回は didag と embulk を使った分析基盤の作成(データのマスキング処理)と Redash からアクセス可能にするまでを行いました。このようなところでも技術を用いて広義の信頼性を高められるので、面白いなと思います。

Twitter のフォローもどうぞよろしくです! (@hassasa3)

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away