TreasureDataのテーブルにsftpサーバのCSVをワークフローでインポートする方法のメモ
【やりたいこと】
sftpサーバに毎日送られてくる売上CSVを毎日特定のTreasureDataのテーブルにインポート(テーブルは毎日上書きされその日のCSVデータだけが格納)する。
【売上CSVの前提】
売上CSVのファイル名には年月日時分秒が付いてくる。
2019年11月14日の売上CSVの場合
「Sales20191114235959.csv」
となる。
【ワークフロー作成手順】
①とりあえず空のワークフローを作成
・Workflowsの「New Workflow」ボタンを押す
・"Create Workflow"の入力項目を入力する
Workflow Name:ワークフロー名
Project Name:ワークフロー名と同じ
Workflow Template:Blank
・もう一度「New Workflow」を押すと空のワークフローが作成される
②ワークフローの定義(dig)を作成
・①で作成したワークフローを選択
・"Workflow Definition"を選択
・[Edit Workflow]アイコンを押す
・digファイルに以下の内容を記述(毎日8:10に処理を実行する場合)
timezone: "Asia/Tokyo"
schedule:
daily>: 08:10:00
_export:
td:
_db: db_name
_table: table_name
+load_step:
td_load>: yaml/import_csv.yml
database: ${td._db}
table: ${td._table}
<注意点>
>: の後にスペースが無いとエラーになる
>事前にインポート先のテーブルを作っておかないとインポートされない
③インポートの定義(yml)を作成
・[Add Project File]アイコンを押す
・ファイル名に"yaml/import_csv.yml"を入力
・"yaml/import_csv.yml"に以下の内容を記述
---
in:
type: sftp
host: sftpサーバのipアドレス
port: 22
user: sftpLoginID
password: password
user_directory_is_root: false
path_prefix: "/csv_path/Sales"
path_match_pattern: ${moment(session_time).format("YYYYMMDD")}.*csv
parser:
charset: MS932
newline: CRLF
type: csv
delimiter: ","
quote: "\""
escape: "\""
trim_if_not_quoted: false
skip_header_lines: 0
allow_extra_columns: false
allow_optional_columns: false
columns:
- {name: id, type: long}
- {name: description, type: string}
out: {mode: replace}
exec: {}
filters:
- from_value: {mode: upload_time}
to_column: {name: time}
type: add_time
<注意点>
>: のあとにスペースを入れないとエラーになる
>charsetはモビルスーツの型番と間違えてしまいそうな謎の"MS932"、日本語のデータがあるからと"SHIFT-JIS"としてしまうと文字化けする
>columnsはデータに合わせて変えてください
・「Save & Commit」ボタンを押して内容を保存