LoginSignup
1
1

More than 5 years have passed since last update.

PrestoとHiveでのORDER BYでの評価の仕方を見てみる(on TreasureData)

Last updated at Posted at 2016-04-13

はじめに

Hive 0.13/Presto0.144 on TreasureDataでのメモ。
Order byで下記のケースでHiveではエラーになり、Prestoでは成功したので、実行計画を眺めてみた。
あくまで眺めただけなので、なぜ実行計画がそのようになるかについては触れないです。
(TD_TIME_FORMATはunixtime(bigint/long/int)を指定のフォーマットの文字列に変換するUDF)

原因としては、HiveのOrder byでのTD_TIME_FORMATの評価がSELECT内でtimeを変換した後に行われるために、文字列に対してTD_TIME_FORMATをかけるためにエラーになってしまう。
ORDER BYの評価順としてはこれが正しそうで、Prestoはなぜか成功する。

SELECT 
  TD_TIME_FORMAT(time,
    'yyyy‐MM‐dd',
    'JST') AS time,
  COUNT(*) AS cnt
FROM
  adjust
WHERE
  network = 'Organic'
  AND TD_TIME_RANGE(time,
    TD_TIME_ADD(TD_SCHEDULED_TIME(),
      '-1d',
      'JST'))
GROUP BY
  TD_TIME_FORMAT(time,
    'yyyy‐MM‐dd',
    'JST')
ORDER BY TD_TIME_FORMAT(time,
    'yyyy‐MM‐dd',
    'JST') DESC LIMIT 100

結論としては、Prestoはクエリの実行計画のオプティマイザが賢いので、うまくやってくれてるっぽい。
が、SELECTの前にEXPLAINをしてそれぞれの実行計画をみてみた。

Hiveの実行計画

上記のままだとエラーになるので、order byのtd_time_formatはtimeに直している。
実行計画をみると、Stage-1のsort orderの際には既に_col0がstringになっている。
stringになるのは、その前段階で実行されるSelect Operatorの中でTD_TIME_FORMATが適用されているからで、単純な処理の流れで考えるとそうなりそうです。

                  Reduce Output Operator
                    key expressions: _col0 (type: string)
                    sort order: +
                    Map-reduce partition columns: _col0 (type: string)

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-2 depends on stages: Stage-1
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: adjust
            Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
            Filter Operator
              predicate: ((network = 'Organic') and TD_TIME_RANGE(time, td_time_add(TD_SCHEDULED_TIME(), '-1d', 'JST'))) (type: boolean)
              Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
              Select Operator
                expressions: time (type: int)
                outputColumnNames: time
                Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                Group By Operator
                  aggregations: count()
                  keys: td_time_format(time, 'yyyy‐MM‐dd', 'JST') (type: string)
                  mode: hash
                  outputColumnNames: _col0, _col1
                  Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                  Reduce Output Operator
                    key expressions: _col0 (type: string)
                    sort order: +
                    Map-reduce partition columns: _col0 (type: string)
                    Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                    value expressions: _col1 (type: bigint)
      Reduce Operator Tree:
        Group By Operator
          aggregations: count(VALUE._col0)
          keys: KEY._col0 (type: string)
          mode: mergepartial
          outputColumnNames: _col0, _col1
          Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
          Select Operator
            expressions: _col0 (type: string), _col1 (type: bigint)
            outputColumnNames: _col0, _col1
            Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
            File Output Operator
              compressed: true
              table:
                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                  serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe

  Stage: Stage-2
    Map Reduce
      Map Operator Tree:
          TableScan
            Reduce Output Operator
              key expressions: _col0 (type: string)
              sort order: -
              Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
              value expressions: _col0 (type: string), _col1 (type: bigint)
      Reduce Operator Tree:
        Extract
          Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
          Limit
            Number of rows: 100
            Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
            File Output Operator
              compressed: false
              Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
              table:
                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: 100

Prestoの実行計画

ORDER BY TD_~でもtimeでも1にしても全部下の実行計画になる。
下記の段階でlimitも含めて良きように計らってくれているみたい。
- TopN[100 by (td_time_format DESC_NULLS_LAST)] => [td_time_format:varchar, count:bigint]

- Output[time, cnt] => [td_time_format:varchar, count:bigint]
        time := td_time_format
        cnt := count
    - TopN[100 by (td_time_format DESC_NULLS_LAST)] => [td_time_format:varchar, count:bigint]
        - Exchange[GATHER] => td_time_format:varchar, count:bigint
            - TopN[100 by (td_time_format DESC_NULLS_LAST)] => [td_time_format:varchar, count:bigint]
                - Project => [td_time_format:varchar, count:bigint]
                    - Aggregate(FINAL)[td_time_format] => [td_time_format:varchar, $hashvalue:bigint, count:bigint]
                            count := ""count""(""count_8"")
                        - Exchange[REPARTITION] => td_time_format:varchar, count_8:bigint, $hashvalue:bigint
                            - Aggregate(PARTIAL)[td_time_format] => [td_time_format:varchar, $hashvalue_9:bigint, count_8:bigint]
                                    count_8 := ""count""(*)
                                - Project => [td_time_format:varchar, $hashvalue_9:bigint]
                                        $hashvalue_9 := ""combine_hash""(0, COALESCE(""$operator$hash_code""(""td_time_format""), 0))
                                    - Project => [td_time_format:varchar]
                                            td_time_format := ""td_time_format""(""time"", CAST('yyyy‐MM‐dd' AS VARCHAR), CAST('JST' AS VARCHAR))
                                        - Filter[((""time"" >= 1460446740) AND (""network"" = CAST('Organic' AS VARCHAR)))] => [network:varchar, time:bigint]
                                            - TableScan[td-presto:td:support.adjust, originalConstraint = ((""network"" = 'Organic') AND (""time"" >= 1460446740))] => [network:varchar, time:bigint]
                                                    LAYOUT: com.treasure_data.presto.connector.TDTableLayoutHandle@57531b8
                                                    network := td:network
                                                    time := td:time

そういえば、Prestoの- TableScan[td-presto:td:support.adjust, originalConstraint = ((""network"" = 'Organic') AND (""time"" >= 1460446740))] => [network:varchar, time:bigint]
を見ると、TableScan時にtimeカラムのパーティショニングとカラムナデータベースによるnetworkのカラムだけスキャン対象になっていることがわかったりする。

結論

ただの確認。
ORDER BY 1って使えば間違いが少ないかも。っておもったけど、hiveだとorder by 1ってやると1番目のカラムをそのまま利用するから同じくエラーになりますね。カラム名は変えていくのが良さそうです。

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1