LoginSignup
8

More than 3 years have passed since last update.

Redshift における データロード それと slow query 対策の実践

Last updated at Posted at 2019-12-03

NewsPicks の VPoEの戸辺と申します。普段は、データ分析基盤の構築、機械学習、NewsPicks におけるSRE的なチームリード、チームビルディング、予算管理、QA、問い合わせ対応、採用、などを主に担当しています。それらの業務の中で、今年の Adevent Caledar では、採用とデータ分析まわりについて担当いたします。採用については、別途「採用アドベントカレンダー2019」に記事を書いております。

=> NewsPicks流エンジニア採用の極意

エンジニア採用に興味のある方が是非そちらも御覧ください。

こちらでは、今回、データ分析基盤に関する記事を書かせていただきました。普段使っている AWS Redshift の Tips をいくつかご紹介したいと思います。Tips というか現場で運用しているノウハウ及び、その実践例です。

もっと網羅的にご紹介させていただきたかったのですが、今日は今年よくやっていた業務から2つ紹介させていただきます。それ以外については、また別の機会でご紹介できればと思います。

1. データロード

今年は他のエントリでもあったように、Data Lake を作るプロジェクトをやっていたため、ETL 処理はたくさんつくりました。特に、データロードはベストプラクティスをしっかりと踏襲していかないと、かなり時間的なロスが発生します。

Redshift へのデータロードについては公式ドキュメントが充実しているのでまずはそれを見ましょう。これは必見です!
Amazon Redshift のデータロードのベストプラクティス

これをご覧いただければわかるのですが、とにかく COPY が最速です。COPY 以外でデータロードをしていたら、COPY に置き換えることを検討してください。最も遅いのは行ごとの insert です。

行ごとの insert < バルクでの insert < COPY

まず、これを叩き込んでください。

行単位の insert

では、insert から改善していきます。

(擬似コードです)

data = [(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')]
for value in data
    do_sql("insert into table(column) value({}, '{}');".format(value[0], value[1]))

要はこういうことですね

insert into table(id, column) value(1, 'a');
insert into table(id, column) value(2, 'b');
insert into table(id, column) value(3, 'c');
insert into table(id, column) value(4, 'd');

このような行単位の insert は本当に遅い!これはよっぽど件数の少ない状況で、頻度も少ないときにしか使いません。1行 insert を for で回して何回も実行しているようなところがあったら今すぐ辞めましょう。

bulk insert

上記のは insert がデータの個数分実行されていますが、これはまとめて insert にすれば少し改善されます。

(擬似コードです)

data = [(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')]
values=",".join(data)
do_sql("insert into table(id, column) value{};".format(values))

要はこういうことですね

insert into table(id, column) value (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd');

これはだいぶマシです。

COPY

これが最終形。最速です。(データが少ないときは上のほうが速いこともあります)

(擬似コードです)

data = [(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')]
csv = "\n".join(data)
to_file(csv, "data.csv")
do_shell("aws s3 cp data.csv s3://data-bucket/")
do_sql("COPY table from s3://data-bucket/data.csv") 

要はこういうことです。

csv (s3://data-bucket/data.csv)

1, 'a'
2, 'b'
3, 'c'
4, 'd'
copy table from s3://data-bucket/data.csv

前述の bulk insert とことなり、途中で csv を使うため、データの不備などには弱いという弱点があります。速度とデータの不備対策にかかるコストとのトレードオフになることもあります。次に、データロード時のエラー対策もあわせて説明します。

データロード時のエラー対策

copy コマンドで失敗したものは、 stl_load_errors というテーブルにエラー内容が格納されます。 stl_load_error には以下のような情報が格納されています。

     Column      |            Type             | Collation | Nullable | Default 
-----------------+-----------------------------+-----------+----------+---------
 userid          | integer                     |           | not null | 
 slice           | integer                     |           | not null | 
 tbl             | integer                     |           | not null | 
 starttime       | timestamp without time zone |           | not null | 
 session         | integer                     |           | not null | 
 query           | integer                     |           | not null | 
 filename        | character(256)              |           | not null | 
 line_number     | bigint                      |           | not null | 
 colname         | character(127)              |           | not null | 
 type            | character(10)               |           | not null | 
 col_length      | character(10)               |           | not null | 
 position        | integer                     |           | not null | 
 raw_line        | character(1024)             |           | not null | 
 raw_field_value | character(1024)             |           | not null | 
 err_code        | integer                     |           | not null | 
 err_reason      | character(100)              |           | not null | 

この中で、有用な情報だけをそれなりにフォーマットされた状態で確認できるクエリがこちらです。

select
    substring(starttime, 1, 19) as tm
    ,sle.line_number as li
--    ,filename
    ,substring(sle.colname, 1, 30) as col
    ,substring(sle."type", 1, 8) as t
    ,substring(sle.col_length, 1, 5) as len
    ,sle."position" as p
    ,substring(sle."raw_line", 1, 20) as raw_l
    ,substring(sle."raw_field_value", 1, 20) as raw_f
    ,substring(sle.err_reason, 1, 35) as error
from stl_load_errors sle
where query = (select max(query) from stl_load_errors)
order by starttime desc
limit 100
;

where 句は適宜調整してください。この例は、一番最後の load error だけを出力している例です。他にも、テーブル名を指定したり、いつ失敗したかを期間で絞り込んだりという用途が考えられます。 filename はそのエラーになったデータの s3 bucket が表示されます。長いので、見やすさのために普段はコメントアウトしていますが、実データを確認したい場合、出力して、そのデータをダウンロードして確認したりします。

エラーのデバッグはエラー表をみながらやるとはかどります。

ロードエラー参照

よく遭遇するエラーというのはありますが、エラー内容を鵜呑みにしてはいけません。csv というのが罠で、それの parse というか区切りの失敗などが原因の場合、想定していないカラムに想定していないデータがマッピングされることになり、その結果 入力データがデータ型で受入可能な範囲を超過していました。 というエラーになることが多くあります。これはデータが長すぎるわけではなく、区切りに失敗していることも疑わなければなりません。

まず、区切りに失敗するデータを先にチェックするとデバッグがはかどります。 csv でエラーになりやすいのは、データの中に ,, \, ", あたりが入っているケースです。, に関しては、オプション等を利用して、全データを " で囲うようにすると対処できます。次に、この場合データの中に " があるとそこで区切れてしまうので、それを "" としてエスケープする必要があります。このようにして、必要なデータ変換をほどこすと、 COPY もすんなり通ると思います。

おまけですが、RDS から Redshift にデータをロードしたい場合、RDS から csv を出力したくなると思います。ところが RDS には csv フォーマットで出力する機能がありません。そこで、先人たちは、SELECT 文を駆使して、csv フォーマットで出力することを考えたようです。(弊社ではほとんど採用していませんw)

Exporting table from Amazon RDS into a csv file - Stack Overflow

2. slow query 対策

Work Load Management(WLM)を使っていない場合、誰かが重いクエリを投げてしまうと、他の人の作業が止まってしまいます。システムがバッチで実行する予定だったものが正常に終えられないなどの悲劇につながることもあります。そこで、NewsPicks では、slow query を監視し、production user 以外が発行したクエリは、一定時間経過しても終わらない場合、自動 cancel するようにしています。

slow query 検知 query

select
    pid
    ,substring(user_name, 1, 8) as user
    ,substring(status, 1, 8) as status
    ,duration
    ,starttime
    ,substring(regexp_replace(query, '\n', ' '), 1, 100)
from stv_recents
where duration > 600000000
and status = 'Running'
;

これで、10分以上時間がかかっているクエリが見つけられます。これをバッチサーバかなにかに仕込んでおいて、検知し次第 Slack に通知して、production 以外が発行しているものは、自動で cancel してしまいます。

コードは少し長い(& 弊社作成のモジュールなども使っているためこのまま実行はできない)ですが、雰囲気は伝わりますし、少し書き換えるだけでそのまま使えるかと思います。

# -*- coding: utf-8 -*-

import sys
import requests
import json
import datetime
import os
from db.redshift import Redshift # この辺は弊社ライブラリ
from config.authentication import Authentication #この辺も...
# (各種設定情報や DB への接続を扱うクラスです)

class SlowQueryChecker:

    def __init__(self):
        conf = Authentication()
        self.IMPORTANT_USER_SET = (
            '[slow query でも自動キャンセルしてはいけないユーザー一覧]',
            'important_user_name',
            'production_batch',
            'production_ai',
        )
        self.slack_url = "https://hooks.slack.com/services/{}/{}/{}".format(
            '[slack組織ID]', '[slackチームID]', '[webhookID]'
        ) # ちょっと変数は忘れてしまったけど、要は webhook のURI/いまなら Slack APIが better
        self.slack_channel = "#[通知先のチャンネル名]"
        self.redshift = Redshift(conf.getConnectInfo('redshift'))

    def __call__(self):
        results = self.getSlowQuery()
        if len(results) is 0:
            sys.exit()
        attachements = self.attachements(results)
        self.postToSlack(attachements)
        self.cancelQuery(results)

    def getSlowQuery(self):
        cursor = self.redshift.getCursor()
        cursor.execute(self.checkQuery())
        return cursor.fetchall()

    def cancelQuery(self, results):
        cursor = self.redshift.getCursor()
        [cursor.execute(self.cancelQueryQuery(result[0]))
         for result in results if result[2].rstrip() not in self.IMPORTANT_USER_SET]

    def checkQuery(self):
        elapsed_threshold = 600000000
        return "select pid , userid , user_name , status , duration , starttime, query  \
               from stv_recents  \
               where duration > " + str(elapsed_threshold) + "  \
               and status = 'Running'"

    def cancelQueryQuery(self, queryId):
        return "cancel {}".format(queryId)

    def postToSlack(self, attachments):
        payload = {
            "channel": self.slack_channel,
            "username": "slow-query",
            "icon_emoji": ":scream:",
            "text": "redshift slow query has occurred within 10 minutes.",
            "title": "",
            "attachments": attachments}
        requests.post(self.slack_url, json.dumps(payload))

    def attachements(self, results):
        attachments = []
        for result in results:
            user = result[2].rstrip()
            userName = "*user name* : `{}`".format(
                user
            )
            cancelCommand = "*cancel command* : `cancel {};`".format(
                str(result[0])
            )
            query = "*query* ```{}```".format(
                result[6]
            )
            text = "\n".join([userName, cancelCommand, query])
            fields = []
            field_duration = {
                "title": "duration",
                "value": self.displayFormatDuration(result[4]),
                "short": "true"
            }
            field_starttime = {
                "title": "starttime",
                "value": result[5].strftime('%Y-%m-%d %H:%M:%S'),
                "short": "true"
            }
            field_auto_killed = {
                "title": "process",
                "value": "** So sorry, your query was killed by system because it is not good. **",
                "short": "false"
            }
            field_only_alert = {
                "title": "process",
                "value": "This query spend long time, but it was not killed because it is important.",
                "short": "false"
            }
            fields += [field_duration, field_starttime]
            if user not in self.IMPORTANT_USER_SET:
                fields += [field_auto_killed]
            else:
                fields += [field_only_alert]
            attachment = {
                "fallback": "slow query attachment.",
                "color": "danger",
                "text": text,
                "mrkdwn_in": ["text"],
                "fields": fields
            }
            attachments.append(attachment)
        return attachments

    def displayFormatDuration(self, microseconds):
        td = datetime.timedelta(microseconds=microseconds)
        minutes, seconds = divmod(td.seconds, 60)
        return str(minutes) + "分" + str(seconds) + "秒 実行中"


if __name__ == '__main__':
    SlowQueryChecker()()

これを Redshift に接続できるバッチサーバかなにかにおいて、10分に1度実行しておけば、slow query を検知 & 自動クエリキャンセルできます。

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
What you can do with signing up
8