1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

HiveQLで時系列データを等間隔サンプリングする

Last updated at Posted at 2019-12-06

良い先例がなかなか見つからなかったので、試行錯誤して編み出した方法を記録しておきます。

やりたいこと

Hiveテーブル化された時系列データを、時系列に等間隔にサンプリングしたい。
例えば、こういう形で0.1秒間隔で計測されているHiveテーブルのデータを、

id time sensor1
aaa 0.0 1.0
aaa 0.1 1.2
aaa 0.2 1.4
aaa 0.3 1.1
aaa 0.4 1.5
aaa 0.5 1.4
... ... ...
bbb 0.0 1.0
bbb 0.1 1.2
bbb 0.2 1.4
bbb 0.3 1.1
bbb 0.4 1.5
bbb 0.5 1.4
... ... ...

特定のID(aaa)を抽出しつつ1/10サンプリングして、こういう形で取り出したり、

id time sensor1
aaa 0.0 1.0
aaa 1.0 1.4
aaa 2.0 1.9
aaa 3.0 2.1
... ... ...

はたまた1/50サンプリングしてこういう形で取り出したいのです。

id time sensor1
aaa 0.0 1.0
aaa 5.0 1.3
aaa 10.0 2.0
aaa 15.0 1.4
... ... ...

TABLESAMPLEを使うと、ブロックごとのサンプリングが可能のようですが、今回やりたいこととはちょっと違いました。
他にも、ランダムサンプリングする方法はいくつか見つかりましたが、なかなか今回やりたいことが見つかりません。

考えた方法

row_number()で行番号を振り、pmodで行番号の剰余を取って、等間隔にレコードを抽出していくことにします。
具体的には以下のようなHiveQL文です。

hive> select * from (select row_number() over (order by time) as rn,* from テーブル名 where id = 'aaa') tbl1 where pmod(tbl1.rn,間隔数) = 1;

間隔数には数字が入ります。
1/10サンプリングなら10、1/50サンプリングなら50です。

実際に動かしてみた結果

テスト用のテーブルを作ります。ここは本筋ではないので、折りたたんで記載します。

まず、こんな感じでサンプルCSVファイルを作成します。 ヘッダ行なしのCSVです。 [root@quickstart ~]# cat test.csv aaa,0.0,10.0 aaa,0.1,10.1 aaa,0.2,10.2 aaa,0.3,10.3 aaa,0.4,10.4 aaa,0.5,10.5 aaa,0.6,10.6 aaa,0.7,10.7 aaa,0.8,10.8 aaa,0.9,10.9 aaa,1.0,11.0 aaa,1.1,11.1 aaa,1.2,11.2 aaa,1.3,11.3 aaa,1.4,11.4 aaa,1.5,11.5 aaa,1.6,11.6 aaa,1.7,11.7 aaa,1.8,11.8 aaa,1.9,11.9 aaa,2.0,12.0 bbb,0.0,10.0 bbb,0.1,10.1 bbb,0.2,10.2 bbb,0.3,10.3 bbb,0.4,10.4 bbb,0.5,10.5 bbb,0.6,10.6 bbb,0.7,10.7 bbb,0.8,10.8 bbb,0.9,10.9 bbb,1.0,11.0 bbb,1.1,11.1 bbb,1.2,11.2 bbb,1.3,11.3 bbb,1.4,11.4 bbb,1.5,11.5 bbb,1.6,11.6 bbb,1.7,11.7 bbb,1.8,11.8 bbb,1.9,11.9 bbb,2.0,12.0 [root@quickstart ~]#
次にHDFSにCSVファイルを格納します。 ユーザーディレクトリ配下に`work`ディレクトリを作り、ここに先ほどの`test.csv`を`put`します。 [root@quickstart ~]# hdfs dfs -mkdir work [root@quickstart ~]# hdfs dfs -ls /user/root Found 1 items drwxr-xr-x - root supergroup 0 2019-12-06 09:58 /user/root/work [root@quickstart ~]# hdfs dfs -put test.csv /user/root/work [root@quickstart ~]# hdfs dfs -ls /user/root/work Found 1 items -rw-r--r-- 1 root supergroup 546 2019-12-06 09:58 /user/root/work/test.csv [root@quickstart ~]#
Hiveコンソールを起動しExternalテーブルとして作成します。 スキーマ`schema1`を作り、テーブル`table1`を作成しました。`location`には先程CSVを置いたhdfsディレクトリ`/user/root/work`を指定します。 [root@quickstart ~]# hive Logging initialized using configuration in file:/etc/hive/conf.dist/hive-log4j.properties WARNING: Hive CLI is deprecated and migration to Beeline is recommended. hive> create schema if not exists schema1; OK Time taken: 2.688 seconds hive> create external table if not exists schema1.table1 (id string,time double,sensor1 double) row format delimited fields terminated by ',' stored as textfile location 'hdfs://quickstart.cloudera:8020/user/root/work'; OK Time taken: 1.753 seconds hive>
テーブルができたかどうか確認します。 `show schemas`でスキーマを、`show tables`でテーブルを、`desc`で項目名を確認した後、`select count (*)`で件数をチェック。`select count(*)`ではMapReduceジョブが1つ動きました。 hive> show schemas; OK default schema1 Time taken: 0.348 seconds, Fetched: 2 row(s) hive> use schema1; OK Time taken: 0.035 seconds hive> show tables; OK table1 Time taken: 0.147 seconds, Fetched: 1 row(s) hive> desc table1; OK id string time double sensor1 double Time taken: 0.322 seconds, Fetched: 3 row(s) hive> select count (*) from schema1.table1; Query ID = root_20191206100909_3265e881-4c9c-485d-a894-884546c1672e Total jobs = 1 Launching Job 1 out of 1 Number of reduce tasks determined at compile time: 1 In order to change the average load for a reducer (in bytes): set hive.exec.reducers.bytes.per.reducer= In order to limit the maximum number of reducers: set hive.exec.reducers.max= In order to set a constant number of reducers: set mapreduce.job.reduces= Starting Job = job_1575617686036_0001, Tracking URL = http://quickstart.cloudera:8088/proxy/application_1575617686036_0001/ Kill Command = /usr/lib/hadoop/bin/hadoop job -kill job_1575617686036_0001 Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1 2019-12-06 10:10:29,154 Stage-1 map = 0%, reduce = 0% 2019-12-06 10:10:45,313 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 2.82 sec 2019-12-06 10:11:02,598 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 9.04 sec MapReduce Total cumulative CPU time: 9 seconds 40 msec Ended Job = job_1575617686036_0001 MapReduce Jobs Launched: Stage-Stage-1: Map: 1 Reduce: 1 Cumulative CPU: 9.04 sec HDFS Read: 8067 HDFS Write: 3 SUCCESS Total MapReduce CPU Time Spent: 9 seconds 40 msec OK 42 Time taken: 75.443 seconds, Fetched: 1 row(s) hive>

42行、OKです。
準備ができました。クエリーしてみます。

サンプリングなし全件取得

まずサンプリングせず、id = 'aaa'に絞って全件抽出します。

hive> select * from schema1.table1 where id = 'aaa';
OK
aaa	0.0	10.0
aaa	0.1	10.1
aaa	0.2	10.2
aaa	0.3	10.3
aaa	0.4	10.4
aaa	0.5	10.5
aaa	0.6	10.6
aaa	0.7	10.7
aaa	0.8	10.8
aaa	0.9	10.9
aaa	1.0	11.0
aaa	1.1	11.1
aaa	1.2	11.2
aaa	1.3	11.3
aaa	1.4	11.4
aaa	1.5	11.5
aaa	1.6	11.6
aaa	1.7	11.7
aaa	1.8	11.8
aaa	1.9	11.9
aaa	2.0	12.0
Time taken: 0.454 seconds, Fetched: 21 row(s)
hive>

実行時間は一瞬です。MapReduceジョブ化されないようです。

1/3サンプリング

pmod(tbl1.rn,3) = 1として、1/3サンプリングしてみます。

hive> select * from (select row_number() over (order by time) as rn,* from schema1.table1 where id = 'aaa') tbl1 where pmod(tbl1.rn,3) = 1;

Query ID = root_20191206102424_e57cf233-a3bc-4e0c-a999-e6c46a25536d
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1575617686036_0003, Tracking URL = http://quickstart.cloudera:8088/proxy/application_1575617686036_0003/
Kill Command = /usr/lib/hadoop/bin/hadoop job  -kill job_1575617686036_0003
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2019-12-06 10:25:05,553 Stage-1 map = 0%,  reduce = 0%
2019-12-06 10:25:20,414 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 4.15 sec
2019-12-06 10:25:37,678 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 8.42 sec
MapReduce Total cumulative CPU time: 8 seconds 420 msec
Ended Job = job_1575617686036_0003
MapReduce Jobs Launched:
Stage-Stage-1: Map: 1  Reduce: 1   Cumulative CPU: 8.42 sec   HDFS Read: 11149 HDFS Write: 109 SUCCESS
Total MapReduce CPU Time Spent: 8 seconds 420 msec
OK
1	aaa	0.0	10.0
4	aaa	0.3	10.3
7	aaa	0.6	10.6
10	aaa	0.9	10.9
13	aaa	1.2	11.2
16	aaa	1.5	11.5
19	aaa	1.8	11.8
Time taken: 50.75 seconds, Fetched: 7 row(s)
hive>

MapReduceジョブが1つ走り、50秒位かかりました。
結果には一番左に元の列番号がついています。3列目のtimeを見ると、3行間隔でサンプリングするという当初の目的が達成できています。

1/10サンプリング

同様に、pmod(tbl1.rn,10) = 1として、1/10サンプリングしてみます。

hive> select * from (select row_number() over (order by time) as rn,* from schema1.table1 where id = 'aaa') tbl1 where pmod(tbl1.rn,10) = 1;

Query ID = root_20191206102222_9c8926e9-ec56-4e8a-9fca-c3e062098547
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1575617686036_0002, Tracking URL = http://quickstart.cloudera:8088/proxy/application_1575617686036_0002/
Kill Command = /usr/lib/hadoop/bin/hadoop job  -kill job_1575617686036_0002
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2019-12-06 10:22:40,518 Stage-1 map = 0%,  reduce = 0%
2019-12-06 10:23:05,465 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 6.0 sec
2019-12-06 10:23:21,529 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 10.27 sec
MapReduce Total cumulative CPU time: 10 seconds 270 msec
Ended Job = job_1575617686036_0002
MapReduce Jobs Launched:
Stage-Stage-1: Map: 1  Reduce: 1   Cumulative CPU: 10.27 sec   HDFS Read: 11135 HDFS Write: 47 SUCCESS
Total MapReduce CPU Time Spent: 10 seconds 270 msec
OK
1	aaa	0.0	10.0
11	aaa	1.0	11.0
21	aaa	2.0	12.0
Time taken: 64.588 seconds, Fetched: 3 row(s)
hive>

こちらも同様に、MapReduceジョブが1つ走り、結果が得られました。
この調子でpmodの第2引数を変えれば、1/50サンプリングの結果も得られます。

実行時間について

実行環境:
MacBook Pro (Retina, 13-inch, Early 2015)
Intel Core i7 3.1 GHz
16 GB 1867 MHz DDR3
MacOS 10.14.6 (Mojave)
CDH5.13 + hive 1.1.0

この環境で、MapReduceジョブが1つ走るパターンのHiveクエリー実行時間は30〜70秒くらいでした。データ量に依存するというよりは、その時のマシンのリソース状態で時間が変わるように見えました。

参考にしたサイト

row_number () over (order by xxx)の使い方は、こちらを参考にさせていただきました。
https://qiita.com/hoto17296/items/8738b2e63239c0def612

pmodによる剰余計算は、こちらのサイトを参考にしました。
https://support.treasuredata.com/hc/ja/articles/215724527-Hive-%E6%95%B0%E5%AD%A6%E9%96%A2%E6%95%B0

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?