LoginSignup
0
0

More than 1 year has passed since last update.

Redshift Spectrumを試してみた

Last updated at Posted at 2022-03-19

背景・目的

Redshiftのドキュメントの手順に倣い、Redshift Spectrum(以降、Spectrumと言う。)を利用する。

内容

概要

特徴

  • Redshiftクラスタとは別に、専用のRedshiftサーバがある。
    • AWS管理のリソースで実行される。VPCの外にある。
  • 以下のような大量の演算を行うタスクをSpectrumレイヤにプッシュする。その結果、クラスタの処理容量は少なくなる。
    • 集計処理
    • 述語フィルタリング
  • クエリの需要に基づいてインテリジェントに拡張する。数千のインスタンスを用いて超並列処理ができる。

カタログとテーブル

  • Spectrumはファイル構造を定義して、外部データカタログ内のテーブルとして登録する。
  • 以下のカタログが利用可能。
    • GlueやAthenaで使用しているカタログ
    • Apache Hiveメタストア
  • オプションで、外部テーブルを1つ以上の列でパーティション化ができ、パフォーマンスが向上する。
  • Glueのカタログを利用する場合に、暗号化されている可能性がある。その場合、AWS所有(AWS Glue)のKMSが必要になる。
  • CREATE EXTERNAL TABLEにより外部テーブルの作成が可能。

クエリ

  • SpectrumテーブルとRedshiftテーブルと同じように結合できる。
  • 外部テーブル(Spectrumテーブル)に対する更新や削除はサポートされていない。
    • ただし、insertによる挿入は可能。

システムテーブルの注意点

  • PG_TABLE_DEF、STV_TBL_PERM、PG_CLASS、または information_schema など、標準の Amazon Redshift テーブルに使用したものと同じリソースを使用して Redshift Spectrum テーブルの詳細を表示することはできない。

考慮事項

  • RedshiftクラスタとS3バケットは同一リージョンの必要がある。
  • SpectrumはS3のアクセスポイントをサポートしている。ただし、Spectrumでは、S3アクセスポイントのエイリアスを持つVPCをサポートしてない。(※後ほど要確認)
  • Lake Formationに対し有効になっている AWS Glue Data Catalog を使用する場合を除き、外部テーブルのユーザー権限を制御できない。
    • 外部スキーマに対してアクセス権限の付与および取り消しが実行可能。
  • Spectrum クエリを実行するには、データベースユーザーがデータベースに一時テーブルを作成するアクセス権限を持っている必要がある。
    • 以下は、spectrumdbデータベースの一時アクセス権限をspectrumusersユーザグループに付与している。
grant temp on database spectrumdb to group spectrumusers;
  • SpectrumでカタログにGlueを利用する場合、Glueのサービスクォーターに依存するため注意が必要。
  • Spectrumは、Kerberosを使用するEMRをサポートしてない。

実践

  • 以下を参考に、試してみる。

ステップ 1. Amazon Redshift 用の IAM ロールを作成する

  • ドキュメントの例のようにGetとList操作を付与しておく。
  • すでに、対応済みのため省略。
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:Get*",
                "s3:List*"
            ],
            "Resource": "arn:aws:s3:::awssampledbuswest2/*"
        }
    ]
}

ステップ 2: IAM ロールをクラスターと関連付ける

  • すでに、対応済みのため省略。

ステップ 3: 外部スキーマと外部テーブルを作成する

0.前提

  • 以前、作成したtickitデータベースを使用する。
  • 以下のコマンドでサンプルデータをコピーしておく。
aws s3 cp s3://awssampledbuswest2/tickit/spectrum/sales/ s3://bucket-name/tickit/spectrum/sales/ --recursive 
  • このようなデータが格納されている。
aws s3 cp s3://{バケット名}/tickit/spectrum/sales/sales_ts.000 - | head
2       4       8117    11498   4337    1983    2       76.00   11.40   2008-06-06 05:00:16
6       10      24858   24888   3375    2023    2       394.00  59.10   2008-07-16 11:59:24
7       10      24858   7952    3375    2003    4       788.00  118.20  2008-06-26 00:56:06
8       10      24858   19715   3375    2017    1       197.00  29.55   2008-07-10 02:12:36
9       10      24858   29891   3375    2029    3       591.00  88.65   2008-07-22 02:23:17
28      29      34152   10978   7622    2133    2       58.00   8.70    2008-11-03 03:35:47
29      29      34152   9876    7622    2131    1       29.00   4.35    2008-11-01 03:46:01
45      52      27110   11635   2300    2129    3       378.00  56.70   2008-10-30 10:33:47
62      72      11258   10729   3398    2089    2       328.00  49.20   2008-09-20 05:18:34
63      72      11258   2671    3398    2080    1       164.00  24.60   2008-09-11 05:28:14

1.外部スキーマを作成する。

  • 以下のクエリで外部スキーマを作成する。
create external schema myspectrum_schema 
from data catalog 
database 'tickit'
iam_role 'arn:aws:iam::{アカウントID}:role/{ロール名}'
create external database if not exists;

2.外部テーブルを作成する。

  • 以下のクエリで外部テーブルを作成する。
create external table myspectrum_schema.sales(
salesid integer,
listid integer,
sellerid integer,
buyerid integer,
eventid integer,
dateid smallint,
qtysold smallint,
pricepaid decimal(8,2),
commission decimal(8,2),
saletime timestamp)
row format delimited
fields terminated by '\t'
stored as textfile
location 's3://{バケット名}/tickit/spectrum/sales/'
table properties ('numRows'='172000');
  • テーブル定義を確認する。
SELECT *
FROM SVV_EXTERNAL_COLUMNS
WHERE schemaname = 'myspectrum_schema'
AND tablename = 'sales';

===
schemaname,tablename,columnname,external_type,columnnum,part_key,is_nullable
myspectrum_schema,sales,salesid,int,1,0,
myspectrum_schema,sales,listid,int,2,0,
myspectrum_schema,sales,sellerid,int,3,0,
myspectrum_schema,sales,buyerid,int,4,0,
myspectrum_schema,sales,eventid,int,5,0,
myspectrum_schema,sales,dateid,smallint,6,0,
myspectrum_schema,sales,qtysold,smallint,7,0,
myspectrum_schema,sales,pricepaid,"decimal(8,2)",8,0,
myspectrum_schema,sales,commission,"decimal(8,2)",9,0,
myspectrum_schema,sales,saletime,timestamp,10,0,

ステップ 4: Amazon S3 のデータにクエリを実行する

行数を取得

  • 以下のクエリで7秒程度だった。感覚としてはAthenaと同じくらい。
select count(*) from myspectrum_schema.sales;

====
count
172456

経過時間: 00 分 07 秒

Spectrumの外部テーブルとRedshiftのテーブルの結合

  • S3に大きなファクトテーブルと、Redshiftに小さなディメンションテーブルを持つのはベストプラクティスとのこと。
  • ディメンジョンのeventテーブルとファクトのsalesテーブルを結合し、トップ 10 イベントの合計セールスを検出するクエリを実行する。
  • なお、eventテーブルは以前、こちらで作成したものを利用している。
  • 事前にeventテーブルの件数を確認する。
select count(1) from event;

====
count
8798

経過時間: 00 分 02 秒
  • 以下のクエリで、salesテーブルと、eventテーブルのジョインを実行する。
  • 実行時間は8秒程度。Athenaでジョインするより速い気がする。今後、比較してみたい。
select top 10 myspectrum_schema.sales.eventid, sum(myspectrum_schema.sales.pricepaid) 
from myspectrum_schema.sales, event
where myspectrum_schema.sales.eventid = event.eventid
and myspectrum_schema.sales.pricepaid > 30
group by myspectrum_schema.sales.eventid
order by 2 desc;

====
eventid,sum
289,51846.00
7895,51049.00
1602,50301.00
851,49956.00
7315,49823.00
6471,47997.00
2118,47863.00
984,46780.00
7851,46661.00
5638,46280.00

経過時間: 00 分 08 秒

クエリプランの確認

  • 上記クエリのクエリプランを確認する。
explain
select top 10 myspectrum_schema.sales.eventid, sum(myspectrum_schema.sales.pricepaid) 
from myspectrum_schema.sales, event
where myspectrum_schema.sales.eventid = event.eventid
and myspectrum_schema.sales.pricepaid > 30
group by myspectrum_schema.sales.eventid
order by 2 desc;

===
QUERYPLAN
XN Limit  (cost=1001055765307.12..1001055765307.15 rows=10 width=31)
"  ->  XN Merge  (cost=1001055765307.12..1001055765307.62 rows=200 width=31)"
"        Merge Key: sum(sales.derived_col2)"
"        ->  XN Network  (cost=1001055765307.12..1001055765307.62 rows=200 width=31)"
"              Send to leader"
"              ->  XN Sort  (cost=1001055765307.12..1001055765307.62 rows=200 width=31)"
"                    Sort Key: sum(sales.derived_col2)"
"                    ->  XN HashAggregate  (cost=1055765298.98..1055765299.48 rows=200 width=31)"
"                          ->  XN Hash Join DS_BCAST_INNER  (cost=2546.64..1055765002.71 rows=59254 width=31)"
"                                Hash Cond: (""outer"".derived_col1 = ""inner"".eventid)"
"                                ->  XN S3 Query Scan sales  (cost=2436.67..3010.18 rows=57334 width=31)"
"                                      ->  S3 HashAggregate  (cost=2436.67..2436.84 rows=57334 width=16)"
"                                            ->  S3 Seq Scan myspectrum_schema.sales location:""s3://{バケット名}/tickit/spectrum/sales"" format:TEXT  (cost=0.00..2150.00 rows=57334 width=16)"
"                                                  Filter: (pricepaid > 30.00)"
"                                ->  XN Hash  (cost=87.98..87.98 rows=8798 width=4)"
"                                      ->  XN Seq Scan on event  (cost=0.00..87.98 rows=8798 width=4)"
1. S3上のデータの読み込み、フィルター処理、HashAggregateを実行する。
  • HashAggregateは、未ソートのグループ化関数のようだ。クエリプランの説明はこちらを参照。
->  S3 HashAggregate  (cost=2436.67..2436.84 rows=57334 width=16)"
   ->  S3 Seq Scan myspectrum_schema.sales location:"s3://{バケット名}/tickit/spectrum/sales" format:TEXT  (cost=0.00..2150.00 rows=172000 width=16)
         Filter: (pricepaid > 30.00) 
2. ディメンションテーブルを読み込む。
  • 上記1と同時に実行されている。
->  XN Hash  (cost=87.98..87.98 rows=8798 width=4)                                                                                                        
    ->  XN Seq Scan on event  (cost=0.00..87.98 rows=8798 width=4)
3.上記の1と2のに対して、ハッシュジョインする。
  • クエリプランはDS_BCAST_INNERを使用している。
  • DS_BCAST_INNERは、内部テーブル全体のコピーがすべてのコンピューティングノードにブロードキャストされるらしい。
  • クエリプランの評価の説明は、こちらに記載されている。クエリプランについて、今後理解を深める予定。
->  XN Hash Join DS_BCAST_INNER  (cost=3119.97..1055769620.49 rows=200000 width=31)                                                                             
     Hash Cond: ("outer".derived_col1 = "inner".eventid)                                                                                                       
4.ソート処理を実行する。
  • 後述するXN Network前に実行していることから、各コンピューティングノードで実行している。
 ->  XN Sort  (cost=1001055765307.12..1001055765307.62 rows=200 width=31)
        Sort Key: sum(sales.derived_col2)
5. リーダーノードに送る。
->  XN Network  (cost=1001055765307.12..1001055765307.62 rows=200 width=31)
     Send to leader
6.マージ処理する。
  • リーダーノードでMerge演算子により、コンピュートノードからの結果をマージしソートが行われる。
->  XN Merge  (cost=1001055765307.12..1001055765307.62 rows=200 width=31)
      Merge Key: sum(sales.derived_col2)
7.結果の絞り込み。
  • select句のtop 10が指定されているので、Limit演算子により10行に絞られる。
XN Limit  (cost=1001055765307.12..1001055765307.15 rows=10 width=31)

考察

  • Spectrumを使うことで、S3上のファイルを直接読めることがわかった。
  • 今後、クエリプランの理解と、Athena上での結合処理との比較をする予定。

参考

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0