Edited at

PrestoとHiveでのORDER BYでの評価の仕方を見てみる(on TreasureData)

More than 3 years have passed since last update.


はじめに

Hive 0.13/Presto0.144 on TreasureDataでのメモ。

Order byで下記のケースでHiveではエラーになり、Prestoでは成功したので、実行計画を眺めてみた。

あくまで眺めただけなので、なぜ実行計画がそのようになるかについては触れないです。

(TD_TIME_FORMATはunixtime(bigint/long/int)を指定のフォーマットの文字列に変換するUDF)

原因としては、HiveのOrder byでのTD_TIME_FORMATの評価がSELECT内でtimeを変換した後に行われるために、文字列に対してTD_TIME_FORMATをかけるためにエラーになってしまう。

ORDER BYの評価順としてはこれが正しそうで、Prestoはなぜか成功する。

SELECT 

TD_TIME_FORMAT(time,
'yyyy‐MM‐dd',
'JST') AS time,
COUNT(*) AS cnt
FROM
adjust
WHERE
network = 'Organic'
AND TD_TIME_RANGE(time,
TD_TIME_ADD(TD_SCHEDULED_TIME(),
'-1d',
'JST'))
GROUP BY
TD_TIME_FORMAT(time,
'yyyy‐MM‐dd',
'JST')
ORDER BY TD_TIME_FORMAT(time,
'yyyy‐MM‐dd',
'JST') DESC LIMIT 100

結論としては、Prestoはクエリの実行計画のオプティマイザが賢いので、うまくやってくれてるっぽい。

が、SELECTの前にEXPLAINをしてそれぞれの実行計画をみてみた。


Hiveの実行計画

上記のままだとエラーになるので、order byのtd_time_formatはtimeに直している。

実行計画をみると、Stage-1のsort orderの際には既に_col0がstringになっている。

stringになるのは、その前段階で実行されるSelect Operatorの中でTD_TIME_FORMATが適用されているからで、単純な処理の流れで考えるとそうなりそうです。

                  Reduce Output Operator

key expressions: _col0 (type: string)
sort order: +
Map-reduce partition columns: _col0 (type: string)


STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-2 depends on stages: Stage-1
Stage-0 is a root stage

STAGE PLANS:
Stage: Stage-1
Map Reduce
Map Operator Tree:
TableScan
alias: adjust
Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
Filter Operator
predicate: ((network = 'Organic') and TD_TIME_RANGE(time, td_time_add(TD_SCHEDULED_TIME(), '-1d', 'JST'))) (type: boolean)
Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
Select Operator
expressions: time (type: int)
outputColumnNames: time
Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
Group By Operator
aggregations: count()
keys: td_time_format(time, 'yyyy‐MM‐dd', 'JST') (type: string)
mode: hash
outputColumnNames: _col0, _col1
Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
Reduce Output Operator
key expressions: _col0 (type: string)
sort order: +
Map-reduce partition columns: _col0 (type: string)
Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
value expressions: _col1 (type: bigint)
Reduce Operator Tree:
Group By Operator
aggregations: count(VALUE._col0)
keys: KEY._col0 (type: string)
mode: mergepartial
outputColumnNames: _col0, _col1
Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
Select Operator
expressions: _col0 (type: string), _col1 (type: bigint)
outputColumnNames: _col0, _col1
Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
File Output Operator
compressed: true
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe

Stage: Stage-2
Map Reduce
Map Operator Tree:
TableScan
Reduce Output Operator
key expressions: _col0 (type: string)
sort order: -
Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
value expressions: _col0 (type: string), _col1 (type: bigint)
Reduce Operator Tree:
Extract
Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
Limit
Number of rows: 100
Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
File Output Operator
compressed: false
Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

Stage: Stage-0
Fetch Operator
limit: 100


Prestoの実行計画

ORDER BY TD_~でもtimeでも1にしても全部下の実行計画になる。

下記の段階でlimitも含めて良きように計らってくれているみたい。

- TopN[100 by (td_time_format DESC_NULLS_LAST)] => [td_time_format:varchar, count:bigint]

- Output[time, cnt] => [td_time_format:varchar, count:bigint]

time := td_time_format
cnt := count
- TopN[100 by (td_time_format DESC_NULLS_LAST)] => [td_time_format:varchar, count:bigint]
- Exchange[GATHER] => td_time_format:varchar, count:bigint
- TopN[100 by (td_time_format DESC_NULLS_LAST)] => [td_time_format:varchar, count:bigint]
- Project => [td_time_format:varchar, count:bigint]
- Aggregate(FINAL)[td_time_format] => [td_time_format:varchar, $hashvalue:bigint, count:bigint]
count := ""count""(""count_8"")
- Exchange[REPARTITION] => td_time_format:varchar, count_8:bigint, $hashvalue:bigint
- Aggregate(PARTIAL)[td_time_format] => [td_time_format:varchar, $hashvalue_9:bigint, count_8:bigint]
count_8 := ""count""(*)
- Project => [td_time_format:varchar, $hashvalue_9:bigint]
$hashvalue_9 := ""combine_hash""(0, COALESCE(""$operator$hash_code""(""td_time_format""), 0))
- Project => [td_time_format:varchar]
td_time_format := ""td_time_format""(""time"", CAST('yyyy‐MM‐dd' AS VARCHAR), CAST('JST' AS VARCHAR))
- Filter[((""time"" >= 1460446740) AND (""network"" = CAST('Organic' AS VARCHAR)))] => [network:varchar, time:bigint]
- TableScan[td-presto:td:support.adjust, originalConstraint = ((""network"" = 'Organic') AND (""time"" >= 1460446740))] => [network:varchar, time:bigint]
LAYOUT: com.treasure_data.presto.connector.TDTableLayoutHandle@57531b8
network := td:network
time := td:time

そういえば、Prestoの- TableScan[td-presto:td:support.adjust, originalConstraint = ((""network"" = 'Organic') AND (""time"" >= 1460446740))] => [network:varchar, time:bigint]

を見ると、TableScan時にtimeカラムのパーティショニングとカラムナデータベースによるnetworkのカラムだけスキャン対象になっていることがわかったりする。


結論

ただの確認。

ORDER BY 1って使えば間違いが少ないかも。っておもったけど、hiveだとorder by 1ってやると1番目のカラムをそのまま利用するから同じくエラーになりますね。カラム名は変えていくのが良さそうです。