LoginSignup
1
4

More than 5 years have passed since last update.

TreasureDataにAdobeAnalyticsのデータを取り込みたい!

Last updated at Posted at 2018-06-19

はじめに

某treasuredataユーザーです。えらく久しぶりの投稿になってしまった。下書きにいっぱいあるんだけど・・・
Adobe Analytics(以下AA)からレポーティングで出力したcsv(AAのFTPにある)をTDに何とかして格納したい物語。

取り込むファイル

レポーティングの結果出力であるためこんなことになっている。

######################################################################
# Company:,xxxx
# URL:,.
# Site:,myApp
# Range:,Mon. 11 Jun. 2018
# Report:,xxxxx Report
# Description:,"分析用)"
######################################################################
# Report Options:
# Report Type: ,"Ranked"
# Selected Metrics: ,"Instances"
# Correlation Filter: ,"None"
# Data Filter: ,"(advanced filter...)"
# Compare to Report Suite: ,"None"
# Compare to Segment: ,"None"
# Percent Shown as: ,"Number"
# Segment: ,"ログ:iOS & Android"
######################################################################
#
# Copyright 2018 Adobe Systems Incorporated. All rights reserved.
# Use of this document signifies your agreement to the Terms of Use (http://marketing.adobe.com/resources/help/terms.html?type=prod&locale=en_US) and Online Privacy Policy (http://my.omniture.com/x/privacy).
# Adobe Systems Incorporated products and services are licensed under the following Netratings patents: 5675510 5796952 6115680 6108637 6138155 6643696 and 6763386.
#
######################################################################

,contents,Instances,
1.,aaa|a011|push,10131,7.8%
2.,aaa|a011|none,7733,5.8%
3.,bbb|b011|push,3258,2.7%
4.,ccc|c011|push,1292,2.2%
5.,ddd|d011|push,1711,2.0%
6.,aaaa|aa0011|none,1034,1.5%
7.,bbbb|bb011|push,27,1.3%
…
…

いらねーーーなにこのヘッダー・・・
まぁでもヘッダーskipする設定ができたとおもうから26行目まで飛ばしちゃえばいいのね。

取り込み手段その1 GUI編

TD connector(GUI)

トレジャーデータのコンソールにあるconnectionsから行けるコネクター
image.png

(ここにAAがあれば苦労しないわな)
今回はADOBE ANALYTICS側の指定エリアにレポーティングバッチで吐き出してもらいそれを取りにいくのでFTP。
FTPを選んでNewConnectionをこんな感じで作成
image.png

こんな感じの画面が続き、エラーになります。

f-05.GIF

JSON.parse: unexpected character at line 1 column 1 of the JSON data

どうやら、transferを設定した瞬間にfetchしにいってしまうので上記のヘッダー(26行目)までを読み込もうとしてエラーになってしまうようです。(skip lineの設定はこの次の画面でしかできない)
ということで
結果:GUIからはできませんでした。

取り込み手段その2 CLI

画面の仕様の問題っぽいな。じゃTD client(コマンドライン)を使ってならいけるんじゃね

この通りやりました。
https://docs.treasuredata.com/articles/data-connector-ftp#a-modes-for-out-plugin
こんなymlつくって

daily_load.yml

in:
  type: ftp
  host: xxxxxxxxxxx.com
  port: xx
  user: xxxxxxxxxxxxxxxx
  password: xxxxxxxxxxxxxxx
  path_prefix: /
  parser:
   charset: UTF-8
   newline: LF
   type: csv
   delimiter: ','
   escape: null
   skip_header_lines: 26
   columns:
   - name: seq
     type: string
   - name: contents
     type: string
   - name: instances
     type: string
   - name: ratio
     type: string
out:
  mode: append
filters:
- type: add_time
  to_column:
    name: time
    type: timestamp
  from_value:
    mode: upload_time # insert a scheduled time into the `time` column

(timeカラム追加忘れずに)

$ td connector:issue load.yml --database td_sample_db --table td_sample_table

入った!!!わーい。

しかし問題発生。
ftpコネクターの仕様でこんなものがあった。

For the scheduled import, the Data Connector for FTP imports all files that match with the specified prefix (e.g. path_prefix: path/to/sample_ –> path/to>sample_201501.csv.gz, path/to/sample_201502.csv.gz, …, path/to/sample_201505.csv.gz) at first and remembers the last path (path/to/sample_201505.csv.gz) for the next execution.

スケジュールされたインポートの場合、FTP用データコネクタは指定されたプレフィックス(たとえば、path_prefix:path / to / sample_ - > path / to / sample_201501.csv.gz、path / to / sample_201502.csv.gzなど)に一致するすべてのファイルをインポートします。
...、path / to / sample_201505.csv.gz)を保存し、次の実行の最終パス(path / to / sample_201505.csv.gz)を覚えています。

ほう。最新のものしか取らないということね。

On the second and on subsequent runs, the connector imports only files that comes after the last path in alphabetical (lexicographic) order. (path/to/sample_201506.csv.gz, …)

2回目以降の実行では、コネクターは最後のパスの後ろにあるファイルのみをアルファベット順(辞書順)でインポートします。
(path / to / sample_201506.csv.gz、...)

アルファベット順?????ファイル作成日時とかにできないの???→できない。
じゃ覚えなくていいから毎回全件入れてよ。→それもできない。
だって今回のファイル名は
(例)1 「today_report_Wed. 13 Jun. 2018.csv」
(例)2 「today_report_Thu. 14 Jun. 2018.csv」
こんな感じ。アルファベット順だと14日のファイルが13日より前になっちゃう・・・。
adobe側の設定でもどうにもできない。どうすんだこのありがた迷惑。

結果:CLIでもダメでした

取り込み手段その3 td workflow

画面もCLIも結局はworkflowを生成してくれてるんでしょ(多分)じゃwf書くんならdigdagでプログラムチックに色々できるだろ

なんだあるじゃんあるじゃんサンプル。色々調べていたら結構この方とかコネクターからwfに書き換えてたりする。
https://qiita.com/oqrusk/items/e47913c0de271859afe0
(↑だいぶ参考になりましたありがとうございました)

wfで日付の形式をプログラムっぽく変えてしまえばいいのでは。
moment.jsのライブラリを呼ぶことができるのでこんな感じでいけるかな。

moment(session_time).format("ddd. DD MMM. YYYY")}

いけました。
出来上がったdigはこんな感じ

daily_load.dig

timezone: Asia/Tokyo

schedule:
  daily>: 07:30:00

_export:
  td:
    database: xxxxxxxxxx
    tmp_table: tmp_xxxxx
    load_table: xxxxx
    engine: presto
  fd:
    filename: "/today_report_"
    extension: ".csv"

# 最初こんな感じでやろうとしたら変数のoverwriteに不具合があるとかでできなかった・・・。
# yml側に書けばいけるらしいよ。
#   dates: ${moment(session_time).add(-1, 'days').format("ddd. DD MMM. YYYY")}

  ftp:
    path_prefix: ${fd.filename}${moment(session_time).add(-1, 'days').format("ddd. DD MMM. YYYY")}${fd.extension}
    ssl: false
    ssl_verify: false

+echo2:
    echo>: 'I take this file: ${ftp.path_prefix}'

+prepare_table:
  +create_tmp_table_if_not_exists:
   td_ddl>:
   database: ${td.database}
   create_tables: ["${td.tmp_table}"]

  +create_load_table_if_not_exists:
   td_ddl>:
   database: ${td.database}
   create_tables: ["${td.load_table}"]

+load_step:
  td_load>: config/daily_load.yml
  database: ${td.database}
  table: ${td.tmp_table}

# 1回tmpに入れてtmpから読込先のTBLの同じ日付のレコードを削除してから入れるように一応した。
+delete_for_rerun_step:
  td>: queries/00_del_rerun.sql
  delete_for: ${td.load_table}
  input_tbl: ${td.tmp_table}
  engine: presto

+insert_step:
  td>: queries/01_tmp_to.sql
  insert_into: ${td.load_table}
  input_tbl: ${td.tmp_table}
  engine: presto

ってことでこのdigファイルとできました。load用のymlファイル

daily_load.yml

in:
  type: ftp
  host: ${secret:ftp.host}
  port: ${secret:ftp.port}
  user: ${secret:ftp.user}
  password: ${secret:ftp.password}
  path_prefix: ${ftp.path_prefix}
  parser:
   charset: UTF-8
   newline: LF
   type: csv
   delimiter: ','
   escape: null
   skip_header_lines: 26
   columns:
   - name: seq
     type: string
   - name: contents
     type: string
   - name: instances
     type: string
   - name: per
     type: string
out:
  mode: replace
filters:
- type: add_time
  to_column:
    name: time
    type: timestamp
  from_value:
    mode: upload_time # insert a scheduled time into the `time` column
00_del_rerun.sql

Delete from ${delete_for} where  ${delete_for}.create_time IN
 (  SELECT TD_TIME_FORMAT(time,'yyyy-MM-dd','JST') 
 from  ${input_tbl} );

とtmpからオリジナルに読み込む用のSQLを用意してtd wf push!!!!

01_tmp_to.sql
-- DIGDAG_INSERT_LINE
select
 TD_TIME_FORMAT(td_time_add(time,'-1d'),'yyyy-MM-dd','JST') as create_time,
 replace(seq,'.','') as seq,
 split_part(contents,'|',1) as category,
 split_part(contents,'|',2) as sub_category,
 split_part(contents,'|',3) as itemid,
 split_part(contents,'|',4) as pushed,
 instances,
 replace(per,'%','') as ratio
from 
 ${input_tbl};

同じような問題にぶつかった方のためにご参考まで記します。

追記:RUNやpushの仕方、環境変数的なユーザー名やパスワードの埋め込み方(secret)の設定などはこちらを御覧ください
https://github.com/treasure-data/workflow-examples/tree/master/td_load/ftp

以上です。お疲れ様でした。なんか他にもpartial_deleteが上手くいかないとかありましたけどそれもまた書きます。
ぶつかる度に聞いてしまったトレジャーデータのサポートのnodaさんkamikasedaさんtoruさん皆様、ありがとうございました。

追記:7月になったらデータが取り込めていないという事象が発覚。なんでやとおもったら

2018-07-02 22:31:29.419 +0000 [INFO] (0001:transaction): Listing ftp files at directory '/' filtering filename by prefix 'xxxx_report_Mon. 02 Jul. 2018.csv'
2018-07-02 22:31:31.414 +0000 [INFO] (0001:transaction): No records to be inserted

ファイル名は1桁日付ZERO埋めなし:xxxx_report_Mon. 2 Jul. 2018.csv'

修正中。

1
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
4