概要
僕はいま業務でRedashを使っています。
定期的にクエリが実行されるようスケジューリングしているにも関わらず、なぜかクエリが実行されなくなる事象が発生しました。
「どうせスケジュールが設定されていないだけでしょ」と思ったんですが、画面を見るとたしかに3時間おきに実行されるようセットされている。。
「じゃあクエリが間違っているんだ!」と思ってクエリを手動で実行してみたら、きちんと狙い通りのデータが取得できる。。
Redashの公式ドキュメントにも目を通してみましたが、やはり「たとえ失敗して再実行される」としか書かれておらず、今回の事象に当てはまる記述はありませんでした。
そこでRedashのコードを読んで仕様を調べたので、その内容をまとめます。
結論
- スケジュール設定したクエリの実行時にエラーが生じると、そのクエリは次回から実行されなくなる
- いったんスケジュールをNeverに変更し、スケジュール設定し直すと再び正常に動作するようになる
Redashのコードを読んで深掘りしてみた
Queryモデルには「schedule」というJSON型のカラムが存在します。
{
"interval": 3600,
"time": null,
"day_of_week": null,
"until": "2023-06-30",
"disabled": true,
}
この中に「disabled」という要素があり、ここに「true」が設定されてしまうと、たとえ画面上では定期実行が設定されているように見えても、クエリのスケジュール予約が無効になってしまいます。
クエリ自体に問題があり、実行に失敗してしまうと、無効化されてしまうようです。
「query.schedule["disabled"] = True」が設定されるのは次の箇所です。
※Pythonでは、boolean値を表すTrueとFalseは大文字で始まるのが正しい表記方法
@classmethod
def outdated_queries(cls):
# 全てのクエリを取得
queries = (
Query.query.options(
joinedload(Query.latest_query_data).load_only("retrieved_at")
)
.filter(Query.schedule.isnot(None))
.order_by(Query.id)
.all()
)
now = utils.utcnow()
outdated_queries = {}
scheduled_queries_executions.refresh()
for query in queries:
try:
# スケジュール設定が無効化されているクエリはスルー
if query.schedule.get("disabled"):
continue
# スケジュールに終了日が設定されている場合、その日時を過ぎているクエリはスルー
if query.schedule["until"]:
schedule_until = pytz.utc.localize(
datetime.datetime.strptime(query.schedule["until"], "%Y-%m-%d")
)
if schedule_until <= now:
continue
# クエリが最後にスケジュール実行された時間を取得
# 存在しない場合はスケジュール設定されずに実行された時間を取得
retrieved_at = scheduled_queries_executions.get(query.id) or (
query.latest_query_data and query.latest_query_data.retrieved_at
)
# 実行すべきクエリか判定
if should_schedule_next(
retrieved_at or now,
now,
query.schedule["interval"],
query.schedule["time"],
query.schedule["day_of_week"],
query.schedule_failures,
):
key = "{}:{}".format(query.query_hash, query.data_source_id)
outdated_queries[key] = query
except Exception as e:
# 上記処理で例外が発生した場合、そのクエリのスケジュールを無効化し、エラー情報をログに出力
query.schedule["disabled"] = True
db.session.commit()
message = (
"Could not determine if query %d is outdated due to %s. The schedule for this query has been disabled."
% (query.id, repr(e))
)
logging.info(message)
sentry.capture_exception(
type(e)(message).with_traceback(e.__traceback__)
)
return list(outdated_queries.values())
上記のoutdated_queriesメソッドはredash/redash/task/queries/maintenance.pyのrefresh_queriesメソッドの中で呼び出されます。
refresh_queriesメソッドはスケジュール予約されているクエリをジョブキューに登録し実行を待つメソッドです。
# スケジュール予約されているクエリをジョブキューに登録し実行を待つメソッド
def refresh_queries():
logger.info("Refreshing queries..")
enqueued = []
# outdated_queriesメソッドでスケジュール予約されているクエリのうち最新結果の取得が必要なクエリを取得
for query in models.Query.outdated_queries():
if not _should_refresh_query(query):
continue
try:
query_text = _apply_default_parameters(query)
query_text = _apply_auto_limit(query_text, query)
# クエリをジョブキューに登録し実行を待つメソッド
enqueue_query(
query_text,
query.data_source,
query.user_id,
scheduled_query=query,
metadata={"query_id": query.id, "Username": "Scheduled"},
)
enqueued.append(query)
except Exception as e:
message = "Could not enqueue query %d due to %s" % (query.id, repr(e))
logging.info(message)
error = RefreshQueriesError(message).with_traceback(e.__traceback__)
sentry.capture_exception(error)
status = {
"outdated_queries_count": len(enqueued),
"last_refresh_at": time.time(),
"query_ids": json_dumps([q.id for q in enqueued]),
}
redis_connection.hmset("redash:status", status)
logger.info("Done refreshing queries: %s" % status)
今回わからなかったこと
- 業務ではRedashのログを閲覧する権限が僕にはないため、スケジュール設定を無効化する処理の場所は特定できたが、そもそもなぜ無効化されてしまったのかは特定できなかった。