概要
Treasure Data CDPでは他ツールからデータを取得する方法が様々ございます。
今回は、データコネクターとWorkflowを使ってデータ取得する方法を紹介します。
インポートの種類
Data Connectorによるインポート処理の強み
「WEB UI」
Sourceであれば簡易的な設定で取り込みがスムーズですが
Workflowを利用することで簡単に依存関係を設けることができます。
pipelineのようなイメージですね。
※余談:「ワークフロー」は実際に現場で行われる作業フローのことで、その中でデザイナーの画力が発揮されます。 「パイプライン」はワークフローの様々な箇所で利用されるツールなどをまとめたもの
設定方法
手順
大まかな流れ!
① ワークフロー実行dig & yamlファイルを作成
② テーブルを作成
③ ワークフローを実施
詳細
① yamlファイルを作成
# 「in」は取り込み元のデータソースを指定します
in:
# 取り込み元は「s3」
type: s3
# 「s3(AWS)」のアクセスID
access_key_id: XXX
# 「s3(AWS)」のアクセスキー
secret_access_key: XXX
# 「s3(AWS)」のバケット
bucket: XXX
# 「s3(AWS)」のパス
path_prefix: XXX
# 新規登録として指定
mode: insert
# 取り込み対象ファイルのパースを設定
parser:
# 文字コード
charset: UTF-8
# CSVの改行コード
newline: LF
# ファイル・タイプ
type: csv
# CSVの区切り文字
delimiter: ','
# CSVの文字列区切り文字
quote: '"'
# CSVのエスケープ文字
escape: '"'
# ‘’をnullに変換
null_string: ''
# クォートされていないと文字列前後の空白を削除、初期値false
trim_if_not_quoted: false
# CSVの1行目を取り込まない
skip_header_lines: 1
# 「true」の場合は過多なカラムを取り込まない
allow_extra_columns: false
# 「true」の場合、足りていないカラムを無視
allow_optional_columns: false
# 「true」の場合でパースエラーが発生した場合停止させる
stop_on_invalid_record: true
# デフォルトのタイムゾーン
default_timezone: 'Asia/Tokyo'
# カラムを定義
columns:
- {name: id, type: string}
- {name: text, type: string}
filters:
- type: add_time
to_column:
name: time
type: timestamp
from_value:
value: ${session_unixtime}
ワークフロー実行
digファイル
(余談;Digdagのワークフロー定義について)
Digdagのワークフローは拡張子*.digを持つファイルとして作成
# リージョンとスケジュール設定
timezone: Asia/Tokyo
schedule:
daily>: 00:00:00
_export:
# S3取り込み先データベース
to_database: xxxx
# ↑!includeを使用して子ファイルの呼び出しも可能です。
# S3取り込み先テーブル
to_table: xxxx
+import:
call>: digファイル
# 上記のS3取り込み設定ファイルを指定
td_load>: s3.yml
database: データベース名
table: テーブル名
② テーブルを作成
先にテーブルを作成する
データをインポートするための箱を先に作るので
yamlファイルに記載した型に合わせたテーブルを先に作成
create table データセット名.テーブル名 as
select
'' as column_1 --文字列
1 as column_1 --数値型
from
データセット.テーブル -- 仮テーブル/なんでもよし
limit 1 -- 1行のみ
③ ワークフローを実施
成功を確認!
参照資料