はじめに
某treasuredataユーザーです。えらく久しぶりの投稿になってしまった。下書きにいっぱいあるんだけど・・・
Adobe Analytics(以下AA)からレポーティングで出力したcsv(AAのFTPにある)をTDに何とかして格納したい物語。
取り込むファイル
レポーティングの結果出力であるためこんなことになっている。
######################################################################
# Company:,xxxx
# URL:,.
# Site:,myApp
# Range:,Mon. 11 Jun. 2018
# Report:,xxxxx Report
# Description:,"分析用)"
######################################################################
# Report Options:
# Report Type: ,"Ranked"
# Selected Metrics: ,"Instances"
# Correlation Filter: ,"None"
# Data Filter: ,"(advanced filter...)"
# Compare to Report Suite: ,"None"
# Compare to Segment: ,"None"
# Percent Shown as: ,"Number"
# Segment: ,"ログ:iOS & Android"
######################################################################
#
# Copyright 2018 Adobe Systems Incorporated. All rights reserved.
# Use of this document signifies your agreement to the Terms of Use (http://marketing.adobe.com/resources/help/terms.html?type=prod&locale=en_US) and Online Privacy Policy (http://my.omniture.com/x/privacy).
# Adobe Systems Incorporated products and services are licensed under the following Netratings patents: 5675510 5796952 6115680 6108637 6138155 6643696 and 6763386.
#
######################################################################
,contents,Instances,
1.,aaa|a011|push,10131,7.8%
2.,aaa|a011|none,7733,5.8%
3.,bbb|b011|push,3258,2.7%
4.,ccc|c011|push,1292,2.2%
5.,ddd|d011|push,1711,2.0%
6.,aaaa|aa0011|none,1034,1.5%
7.,bbbb|bb011|push,27,1.3%
…
…
いらねーーーなにこのヘッダー・・・
まぁでもヘッダーskipする設定ができたとおもうから26行目まで飛ばしちゃえばいいのね。
取り込み手段その1 GUI編
TD connector(GUI)
トレジャーデータのコンソールにあるconnectionsから行けるコネクター
(ここにAAがあれば苦労しないわな)
今回はADOBE ANALYTICS側の指定エリアにレポーティングバッチで吐き出してもらいそれを取りにいくのでFTP。
FTPを選んでNewConnectionをこんな感じで作成
こんな感じの画面が続き、エラーになります。
JSON.parse: unexpected character at line 1 column 1 of the JSON data
どうやら、transferを設定した瞬間にfetchしにいってしまうので上記のヘッダー(26行目)までを読み込もうとしてエラーになってしまうようです。(skip lineの設定はこの次の画面でしかできない)
ということで
結果:GUIからはできませんでした。
取り込み手段その2 CLI
画面の仕様の問題っぽいな。じゃTD client(コマンドライン)を使ってならいけるんじゃね
この通りやりました。
https://docs.treasuredata.com/articles/data-connector-ftp#a-modes-for-out-plugin
こんなymlつくって
in:
type: ftp
host: xxxxxxxxxxx.com
port: xx
user: xxxxxxxxxxxxxxxx
password: xxxxxxxxxxxxxxx
path_prefix: /
parser:
charset: UTF-8
newline: LF
type: csv
delimiter: ','
escape: null
skip_header_lines: 26
columns:
- name: seq
type: string
- name: contents
type: string
- name: instances
type: string
- name: ratio
type: string
out:
mode: append
filters:
- type: add_time
to_column:
name: time
type: timestamp
from_value:
mode: upload_time # insert a scheduled time into the `time` column
(timeカラム追加忘れずに)
$ td connector:issue load.yml --database td_sample_db --table td_sample_table
入った!!!わーい。
しかし問題発生。
ftpコネクターの仕様でこんなものがあった。
For the scheduled import, the Data Connector for FTP imports all files that match with the specified prefix (e.g. path_prefix: path/to/sample_ –> path/to>sample_201501.csv.gz, path/to/sample_201502.csv.gz, …, path/to/sample_201505.csv.gz) at first and remembers the last path (path/to/sample_201505.csv.gz) for the next execution.
スケジュールされたインポートの場合、FTP用データコネクタは指定されたプレフィックス(たとえば、path_prefix:path / to / sample_ - > path / to / sample_201501.csv.gz、path / to / sample_201502.csv.gzなど)に一致するすべてのファイルをインポートします。
...、path / to / sample_201505.csv.gz)を保存し、次の実行の最終パス(path / to / sample_201505.csv.gz)を覚えています。
ほう。最新のものしか取らないということね。
On the second and on subsequent runs, the connector imports only files that comes after the last path in alphabetical (lexicographic) order. (path/to/sample_201506.csv.gz, …)
2回目以降の実行では、コネクターは最後のパスの後ろにあるファイルのみをアルファベット順(辞書順)でインポートします。
(path / to / sample_201506.csv.gz、...)
アルファベット順?????ファイル作成日時とかにできないの???→できない。
じゃ覚えなくていいから毎回全件入れてよ。→それもできない。
だって今回のファイル名は
(例)1 「today_report_Wed. 13 Jun. 2018.csv」
(例)2 「today_report_Thu. 14 Jun. 2018.csv」
こんな感じ。アルファベット順だと14日のファイルが13日より前になっちゃう・・・。
adobe側の設定でもどうにもできない。どうすんだこのありがた迷惑。
結果:CLIでもダメでした
取り込み手段その3 td workflow
画面もCLIも結局はworkflowを生成してくれてるんでしょ(多分)じゃwf書くんならdigdagでプログラムチックに色々できるだろ
なんだあるじゃんあるじゃんサンプル。色々調べていたら結構この方とかコネクターからwfに書き換えてたりする。
https://qiita.com/oqrusk/items/e47913c0de271859afe0
(↑だいぶ参考になりましたありがとうございました)
wfで日付の形式をプログラムっぽく変えてしまえばいいのでは。
moment.jsのライブラリを呼ぶことができるのでこんな感じでいけるかな。
moment(session_time).format("ddd. DD MMM. YYYY")}
いけました。
出来上がったdigはこんな感じ
timezone: Asia/Tokyo
schedule:
daily>: 07:30:00
_export:
td:
database: xxxxxxxxxx
tmp_table: tmp_xxxxx
load_table: xxxxx
engine: presto
fd:
filename: "/today_report_"
extension: ".csv"
# 最初こんな感じでやろうとしたら変数のoverwriteに不具合があるとかでできなかった・・・。
# yml側に書けばいけるらしいよ。
# dates: ${moment(session_time).add(-1, 'days').format("ddd. DD MMM. YYYY")}
ftp:
path_prefix: ${fd.filename}${moment(session_time).add(-1, 'days').format("ddd. DD MMM. YYYY")}${fd.extension}
ssl: false
ssl_verify: false
+echo2:
echo>: 'I take this file: ${ftp.path_prefix}'
+prepare_table:
+create_tmp_table_if_not_exists:
td_ddl>:
database: ${td.database}
create_tables: ["${td.tmp_table}"]
+create_load_table_if_not_exists:
td_ddl>:
database: ${td.database}
create_tables: ["${td.load_table}"]
+load_step:
td_load>: config/daily_load.yml
database: ${td.database}
table: ${td.tmp_table}
# 1回tmpに入れてtmpから読込先のTBLの同じ日付のレコードを削除してから入れるように一応した。
+delete_for_rerun_step:
td>: queries/00_del_rerun.sql
delete_for: ${td.load_table}
input_tbl: ${td.tmp_table}
engine: presto
+insert_step:
td>: queries/01_tmp_to.sql
insert_into: ${td.load_table}
input_tbl: ${td.tmp_table}
engine: presto
ってことでこのdigファイルとできました。load用のymlファイル
in:
type: ftp
host: ${secret:ftp.host}
port: ${secret:ftp.port}
user: ${secret:ftp.user}
password: ${secret:ftp.password}
path_prefix: ${ftp.path_prefix}
parser:
charset: UTF-8
newline: LF
type: csv
delimiter: ','
escape: null
skip_header_lines: 26
columns:
- name: seq
type: string
- name: contents
type: string
- name: instances
type: string
- name: per
type: string
out:
mode: replace
filters:
- type: add_time
to_column:
name: time
type: timestamp
from_value:
mode: upload_time # insert a scheduled time into the `time` column
Delete from ${delete_for} where ${delete_for}.create_time IN
( SELECT TD_TIME_FORMAT(time,'yyyy-MM-dd','JST')
from ${input_tbl} );
とtmpからオリジナルに読み込む用のSQLを用意してtd wf push!!!!
-- DIGDAG_INSERT_LINE
select
TD_TIME_FORMAT(td_time_add(time,'-1d'),'yyyy-MM-dd','JST') as create_time,
replace(seq,'.','') as seq,
split_part(contents,'|',1) as category,
split_part(contents,'|',2) as sub_category,
split_part(contents,'|',3) as itemid,
split_part(contents,'|',4) as pushed,
instances,
replace(per,'%','') as ratio
from
${input_tbl};
同じような問題にぶつかった方のためにご参考まで記します。
追記:RUNやpushの仕方、環境変数的なユーザー名やパスワードの埋め込み方(secret)の設定などはこちらを御覧ください
https://github.com/treasure-data/workflow-examples/tree/master/td_load/ftp
以上です。お疲れ様でした。なんか他にもpartial_deleteが上手くいかないとかありましたけどそれもまた書きます。
ぶつかる度に聞いてしまったトレジャーデータのサポートのnodaさんkamikasedaさんtoruさん皆様、ありがとうございました。
追記:7月になったらデータが取り込めていないという事象が発覚。なんでやとおもったら
2018-07-02 22:31:29.419 +0000 [INFO] (0001:transaction): Listing ftp files at directory '/' filtering filename by prefix 'xxxx_report_Mon. 02 Jul. 2018.csv'
2018-07-02 22:31:31.414 +0000 [INFO] (0001:transaction): No records to be inserted
ファイル名は1桁日付ZERO埋めなし:xxxx_report_Mon. 2 Jul. 2018.csv'
修正中。