Help us understand the problem. What is going on with this article?

DMSを使った再現性のあるRDBのレプリケーション

More than 1 year has passed since last update.

概要

機械学習でRDSのデータ使いたいんだけどUpdateすると前の状態消えちゃうからなんとかして欲しいという依頼をもらったので、AWSを使ってなんとかしたお話です。

依頼の詳細

機械学習チームでRDSに保存しているデータを利用したいという要望があったのですが、RDBのデータはUpdateやDeleteのような変更処理を実行すると前の状態を再現することが出来ないので、再現性を担保することが出来ずに、モデルの評価や改善がし辛いからなんとかしたい!という要望を受けました。

少しわかりにくいと思うので、例を上げて説明してみます。

ある人がある商品をお気に入りした情報が、以下の3つのテーブルの中に保管されているとします。

Person

id name age created_at updated_at
1 山田 19 2018/12/2 2018/12/2
2 山井 20 2018/12/4 2018/12/4
3 山野 21 2018/12/5 2018/12/5

Item

id name price created_at updated_at
1 加湿器 18000 2017/10/2 2017/10/2
2 ストーブ 6000 2017/10/3 2017/10/3
3 ドライヤー 3000 2017/10/4 2017/10/4

Favorite

id person_id item_id created_at updated_at
1 1 1 2018/12/10 2018/12/10
2 2 1 2018/12/11 2018/12/11
3 1 3 2018/12/12 2018/12/12

2018/12/13 にこの3つのテーブルのデータを利用して特徴量を抽出し、モデルを生成したとします。
この時のモデルの精度は50%だったとしましょう。

次に Item テーブルの加湿器の値段がモデルを生成した次の日(2018/12/14)に値下がりしたとしましょう。

Item

id name price created_at updated_at
1 加湿器 10000 2017/10/2 2018/12/14
2 ストーブ 6000 2017/10/3 2017/10/3
3 ドライヤー 3000 2017/10/4 2017/10/4

すると、テーブルは↑のようになります。

次に、2018/12/15 にモデル生成アルゴリズムを変更し、再度評価した結果、モデルの精度は40%になってしまいました。
しかし、 2018/12/15 時点で 2018/12/13 に生成してモデルが使用しているデータと異なるデータを利用してモデルを生成しているため、精度に影響を与えたのが、データなのかアルゴリズムなのかはわからないと思います。
こういう状態を再現性のない状態と呼ぶらしいです。

ということで、前説が長くなりましたが、RDBでも再現性のある状態を作って欲しいというのが機械学習チームからの依頼内容でした。

作ったもの

再現性のない状態を回避するために、モデル生成時に使ったDBはスナップショットをとって保管しておくとか、S3に保管しておくとか、色々考えたんですが、コスパ的に微妙だったのと、管理しづらい、利用しづらそうだったので、別の方法を考えました。

image.png

最終的に、↑のようなものを構築しました。

各コンポーネント毎に簡単に説明していきます。

RDS -> DMS

RDBで再現性のあるデータを作るためには、変更ログをとって過去の状態を保持しておく必要があると考えました。
ただ、今回の要件的にただログを取っていれば良いという訳でもなく、機械学習で利用しやすいようにしておく必要がありました。

そこで、AWSのデータベース移行サービスであるDMSを用いて、機械学習で利用しやすい形で変更ログを保持する構成にしました。

DMS -> S3

DMSは、一度きりのバッチ処理的なデータ移行も可能ですが、継続的なレプリケーション(CDC)も可能です。
継続的レプリケーションの機能を利用すると、移行元のDBを可動させながら、移行先と同期させることで、最小のダウンタイムでデータベースを移行することが可能となります。

この継続的レプリケーションですが、移行先がRDSのようなDBであれば移行元の状態と同期を取るため変更も同じように上書きされてしまいますが、移行先をS3にすると面白いことに、変更前のレコードを残したまま、変更後のデータを保管しておくことが可能となります。

具体的には、↓のように先頭カラムに Operation コードが挿入され、変更がログのようにインサートされていくので、前の状態が上書きされることはありません。

I,101,Smith,Bob,4-Jun-14,New York
U,101,Smith,Bob,8-Oct-15,Los Angeles
U,101,Smith,Bob,13-Mar-17,Dallas
D,101,Smith,Bob,13-Mar-17,Dallas

↑の例からわかるように、このデータをクエリすることさえできればRDBのデータでも再現性をもたせて参照することができそうです。

S3 -> Glue -> Glue Data catalog

S3のデータをクエリするためには、現状2通りの方法があります。
Redshift SpectrumとAmazon Athenaです。

この2つの違いに関してはこちらの記事を参照していただければと思います。
RedShift SpectrumはRedShiftクラスタを用意する必要があり、コスパが悪かったので、Athenaを使ってS3に対してクエリをすることにしました。

Athenaを使ってS3にクエリするためには、Athena側でテーブルを作成する必要があり、Athenaでテーブルを作るためには、AWS ConsoleやAthenaのAPIを利用して create table ステートメントを実行する必要があります(公式ドキュメントにテーブル作成プロセスがあります)。
AthenaでクエリしたいS3の対象となるテーブル数が少なければ、この方法でも問題ないのですが、今回は500テーブルを超えるテーブルが対象であったため、この方法では骨が折れそうでした。

そこで、Athenaと統合されているAWS Glueのクローラーを用いてデータカタログを作成することにしました。
詳しい方法については、クラスメソッドさんの記事を参考に頂けると良いかと思います。

ただ、ここで2つほど注意点があります。

1. CSVファイルにカラムの情報がないと、Glueのクローラは認識できない

GlueのクローラはS3のファイルをクローリングしてスキーマを自動生成してくれます。
なので、S3にカラムの情報がない場合には、当然生成されるスキーマにはカラム名は付けられません。

DMSでターゲットをS3にした場合には、デフォルトでカラム名を出力しないような設定になっているため、この設定を変える必要があります。
DMSでS3のターゲットを作る時に、追加の接続属性というフォームがあるので、そこに addColumnName=true; を追加しておくと、DMSのタスクで生成されるファイルには、先頭にカラム名が出力されるようになります。

2. 初回ロードのデータには Operation カラムがない

DMSを使う場合、初回のロード時だけは Operationコードのカラムが追加されません。
そのため、Athenaでクエリを書いても、初回ロードのデータだけは一行ずれて認識されてしまいます。

Op,id,name,age,created_at,updated_at
1,山田,19,2018/10/10,2018/10/10 // 初回ロードのデータ
U,1,山田,20,2018/10/10,2018/12/02 // 継続的レプリケーションのデータ

幸いにも、初回ロードのファイルS3上では↓のような形で保存されるので、継続的なレプリケーションで作られたファイルと区別するのは簡単そうです。

<schema_name>/<table_name>/LOAD001.csv // 初回ロード
<schema_name>/<table_name>/LOAD002.csv // 初回ロード
<schema_name>/<table_name>/<time-stamp>.csv // 継続的なレプリケーション

自分は、一度初回ロードのファイルだけダウンロードし、CSVファイルの最初のカラムに Operation コード I を挿入し、再度アップロードすることで対処しました。

Local PC -> Athena -> Glue Data Catalog

ここまでくれば、AthenaでGlueのData Catalogに対するクエリを書くだけです。
しかし、Athenaには、「クエリ結果がCSVファイルで出力されてしまう」という制約があります。

そのため、クエリ結果のデータ数が多い場合には、ダウンロードに時間がかかってしまう可能性もあります。
また、一度にローカルPCにダウンロードしてもメモリに乗り切らないかもしれません。

そこで、Pythonのaws-sdkである boto3 を使ってクエリ結果のページネーションを可能にしました。
しかし、ここにもいくつか注意点があります。

1. クエリ結果のレスポンスがちょっと利用しづらい

AthenaのAPIを使ってクエリ結果を取得しようとした場合、↓のような形式でレスポンスが返ってきます(参照)。

{
    'UpdateCount': 123,
    'ResultSet': {
        'Rows': [
            {
                'Data': [
                    {
                        'VarCharValue': 'string'
                    },
                ]
            },
        ],
        'ResultSetMetadata': {
            'ColumnInfo': [
                {
                    'CatalogName': 'string',
                    'SchemaName': 'string',
                    'TableName': 'string',
                    'Name': 'string',
                    'Label': 'string',
                    'Type': 'string',
                    'Precision': 123,
                    'Scale': 123,
                    'Nullable': 'NOT_NULL'|'NULLABLE'|'UNKNOWN',
                    'CaseSensitive': True|False
                },
            ]
        }
    },
    'NextToken': 'string'
}

恐らく、クエリ結果は tuple型のようにどの順番で入っているのか知っている前提で利用する事を想定しているためこのような形になっていると思います。
が、今回の機械学習チームの要望的にカラム名で参照するのは必須要件でした。

ということで、 boto3 のAthenaのラッパークラスを作成しました。

import json
import boto3
from time import sleep


class Athena:

    def __init__(self, database = None, bucket = None):
        self.database = database
        self.bucket = bucket
        self.client = boto3.client('athena')

    def query(self, sql, batch = 1000):
        query_execution = self.client.start_query_execution(
            QueryString = sql,
            QueryExecutionContext={
                'Database': self.database
            },
            ResultConfiguration={
                'OutputLocation': 's3://%s/%s/' % (self.bucket, self.database)
            }
        )

        self.wait(query_execution)

        return Cursor(
            paginator = self.client.get_paginator('get_query_results'),
            id = query_execution['QueryExecutionId'],
            batch = batch
        )

    def wait(self, query_execution, duration=1):
        while True:
            result = self.client.get_query_execution(QueryExecutionId = query_execution['QueryExecutionId'])
            if result['QueryExecution']['Status']['State'] == 'SUCCEEDED':
                break
            elif result['QueryExecution']['Status']['State'] == 'FAILED':
                raise BaseException(result['QueryExecution']['Status']['StateChangeReason'])
            else:
                sleep(duration)

class Cursor:

    def __init__(self, paginator, id, batch = 1000):
        self.paginator = paginator
        self.id = id
        self.batch = batch
        self.token = None
        self.end_flag = False
        self.count = 0

    def __iter__(self):
        return self

    def get(self, token = None):
        result_iter = self.paginator.paginate(
            QueryExecutionId = self.id,
            PaginationConfig = {
                'MaxItems': self.batch,
                'PageSize': self.batch,
                'StartingToken': token
            }
        )
        result = next(iter(result_iter))
        self.token = result.get('NextToken')
        self.end_flag = self.token == None
        self.count += 1

        return self.__parse_result(result['ResultSet'])

    def __next__(self):
        if self.is_end():
            raise StopIteration()

        return self.get(token = self.token)

    def is_end(self):
        return self.end_flag

    def __parse_result(self, result_set):
        return [{info['Name']: self.__cast(col.get('VarCharValue'), info['Type'])
            for info, col in zip(result_set['ResultSetMetadata']['ColumnInfo'], row['Data'])}
            for i, row in enumerate(result_set['Rows'])
            if self.count != 1 or i != 0] # 1ページ名の最初のレコードにはカラム名が入るのでスキップする

    def __cast(self, column, type):
        if column is None:
            return column
        elif type == 'varchar':
            return str(column)
        elif type in {'interger', 'bigint'}:
            return int(column)
        elif type == 'json':
            return json.loads(column)
        else:
            return column

簡単ですが、これを使えばカラム名をkeyとして dict 形式でクエリ結果を参照することが出来ます。

2. Map型とRow型は利用できない形で返ってくる

これには結構衝撃を受けたのですが、Athenaでクエリを書いた時に Map型と Row型は↓の形式で返ってきます。

{key1=val1, key2=val2}

json 形式でもないし、Pythonの dictでもない...。
恐らくCSVにする時に、色々合ってこの形式になったんだと思うけど、これではvalueの型もわからない...。
パーサーを書こうとしても、この形式ではvalueに =, が入ってきた瞬間エラーになるし、正解も判別できない...。

ということで、MapRowはこのままでは利用できないので、クエリの段階でjsonにパースして利用してもらうことにしました。
jsonであれば↓のような形で返ってくるので普通にパース可能になります。

{"key1": "val1", "key2": "val2"}

まとめ

RDBのように状態が変化するものを機械学習で利用できるレベルに持ってくるために色々なAWSサービスを利用してみました。
個人的にはDMSで変更ログをとったところが、裏技的で好きだったんですが、結構DMSにもハマりポイントがあったりと、実際に構築する際にはかなり詰まりながら進んでいました。

結局コスパ的には、DMSのレプリケーションインスタンスの費用が少し高いくらいで、S3, Glue, Athenaの費用は思った以上に少なかったです。
管理コストについては、ほとんどかかっておらず、機械学習チームのメンバーにAthenaのクエリ実行権現をお渡しするだけで済んでいます。

このタスクを通して、機械学習周りの開発に関わる基本的なことは大体わかった気になりました。

re:Invent 2018でも機械学習系のサービスが沢山発表されていたので、その辺は追いつつ、次はSageMakerあたりをちゃんと触ってみたいなーと思っています。

Why do not you register as a user and use Qiita more conveniently?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away