bigquery
gcp
Embulk
CloudDataflow

BigQuery と Cloud DataFlow でデータ分析基盤を作る練習(準備編)


こんな感じのことがしたい

bi.png

サービスで利用しているデータが mongo に入っていて、それを KPI などで利用したい。

ほぼこれと同じカタチ


練習として Twitter のデータで試す

以下、目次(※ 準備編では手順2まで)

1. Twitter API で検索したツイートデータをローカルの mongo に入れる

2. ローカルの mongo のデータを BigQuery にそのまま上げる

3. BigQuery に上げたデータを Cloud Dataflow を使って加工して、新たに BigQuery にテーブルを作成

4. 加工したテーブルをダッシュボードで見れるようにしたり、メールやslackに飛ばす。できたらつい先日でた BigQuery ML も試す


環境

ローカルマシン : Mac Pro (Late 2013)

OS : Mac OS High Sierra 10.13.6


仮想環境

1, 2 の手順は仮想環境の Ubuntu で行っている

VirtualBox : 5.2.12

Vagrant : 2.1.1

OS : Ubuntu 16.04

mongodb : v3.6.5

digdag : 0.9.25

embulk : 0.9.7

python : 3.6.5


1. Twitter API で検索したツイートデータを mongo に入れる

digdag + embulk で入れてます。

ただ自分の環境でやりやすいものがあったので利用しただけで、なんでもいいです。

ローカルの mongodb に突っ込むので、mongo は入れておいてください。

検索するハッシュタグは #SiroTalk でリツイートを除きます。

twitter という DB の search_tweet というコレクションに入れます。

dig ファイルにスケジュールを指定していませんが、毎回これを叩けば最新のツイートが無駄なく取れるようになっています。

ただし Twitter API の仕様上 検索は1週間分しかとれないので、それ以上間隔をあけると漏れます。

久しぶりに Twitter API を触ったら tweet_mode=extended これを入れておかないと全文取れないことに気づきました。


twitter.dig

timezone: Asia/Tokyo

_export:
plugin:
repositories:
- https://jitpack.io
dependencies:
- com.github.szyn:digdag-slack:0.1.4
webhook_url: 自分の入れてね
workflow_name: tweet_search
ENV: dev

+get_tweet_data:
py>: api.search_tweets_all
_error:
slack>: danger-template.yml

+finish:
slack>: good-template.yml



twitter.py

# coding: utf-8

from __future__ import print_function
import os
import requests
import json
import time
import calendar
import math
from datetime import datetime
from pymongo import MongoClient
from requests_oauthlib import OAuth1
import digdag

CONSUMER_KEY = "自分のいれてね"
CONSUMER_SECRET = "自分のいれてね"
ACCESS_TOKEN = "自分のいれてね"
ACCESS_TOKEN_SECRET = "自分のいれてね"

SEARCH_API_URL = 'https://api.twitter.com/1.1/search/tweets.json?tweet_mode=extended'

# Twitter検索
def search_tweets_all():
# Twitter 認証
oauth = OAuth1(CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN, ACCESS_TOKEN_SECRET)

# db に入っている最新のツイートの tweet_id を取得
# 重複しないように + 1
since_id = get_last_tweet_id_from_db()
since_id = since_id + 1

# 検索クエリ(#SiroTalk をリツイートを除いて検索)
query_form = {'q': u'#SiroTalk exclude:retweets', 'count': 100, 'result_type': 'recent', 'since_id': since_id}

tweet_array = []

while True:
# Tweet データ取得
response = requests.get(SEARCH_API_URL, auth = oauth, params = query_form)
response_json = response.json()
response_tweet_array = response_json['statuses']

# 検索結果がなかったら終了
if len(response_tweet_array) == 0:
break

# 検索結果を結合
tweet_array.extend(response_tweet_array)

# 次の検索のために tweet_id を今とってきた tweet の一番古い id - 1 に指定
last_tweet = response_tweet_array[len(response_tweet_array) - 1]
last_tweet_id = last_tweet['id']
max_id = last_tweet_id - 1
query_form['max_id'] = max_id

# mongo に insert
for tweet in tweet_array:
# BigQuery でのパーティションのために、unixtime を ct として追加
created_at_time_utc = time.strptime(tweet['created_at'], '%a %b %d %H:%M:%S +0000 %Y')
unix_time = calendar.timegm(created_at_time_utc)
tweet['ct'] = unix_time
insert_tweet(tweet)

# mongo に insert
def insert_tweet(tweet):
client = MongoClient('localhost', 27017)
db = client.twitter
collection = db.search_tweet

# もし既に同じ tweet_id が db に入っていたら、update するようにする
tweet_id = tweet['id']
result = collection.update(
{'tweet_id': tweet_id}
, {'$set': tweet}
, upsert=True
)
return result

def get_last_tweet_id_from_db():
client = MongoClient('localhost', 27017)
db = client.twitter
collection = db.search_tweet

last_tweet_array = collection.find().sort('tweet_id', -1).limit(1)
last_tweet = last_tweet_array[0]
last_tweet_id = last_tweet['tweet_id']
return last_tweet_id


mongo でデータが入っているか確認

$mongo

> use twitter
> db.search_tweet.findOne()


2. ローカルの mongo のデータを BigQuery にそのまま上げる

取得した1週間分のツイートデータを BigQuery に上げます。

embulk gem install で予め以下が必要

embulk-input-mongodb

embulk-filter-expand_json

embulk-output-bigquery


mongo_to_bq.dig

timezone: Asia/Tokyo

_export:
plugin:
repositories:
- https://jitpack.io
dependencies:
- com.github.szyn:digdag-slack:0.1.4
webhook_url: 自分の入れてね
workflow_name: mongo_to_bq
ENV: dev

+mongo_to_bq:
_export:
COLLECTION_NAME: search_tweet
START_DAY: 2018-07-21
END_DAY: 2018-07-29

+insert:
loop>: ${moment(END_DAY).diff(moment(START_DAY), 'day')}
_parallel: true
_do:
_export:
START_TIME: ${moment(START_DAY).add(i, 'days').startOf('day').unix()}
END_TIME: ${moment(START_DAY).add(i + 1, 'days').startOf('day').unix()}
PARTITION: ${moment(START_DAY).add(i, 'days').format("YYYYMMDD")}

+embulk:
sh>: embulk run mongo_to_bq.yml.liquid

_error:
slack>: danger-template.yml

+finish:
slack>: good-template.yml



mongo_to_bq.yml.liquid

in:

type: mongodb
uri: mongodb://localhost:27017/twitter
collection: {{ env.COLLECTION_NAME }}
query: '{ "ct": { $gte: {{ env.START_TIME }}, $lt: {{ env.END_TIME }} } }'

filters:
- type: expand_json
json_column_name: record
expanded_columns:
- {name: "tweet_id", type: long}
- {name: "created_at", type: string}
- {name: "ct", type: long}
- {name: "full_text", type: string}
- {name: "favorite_count", type: long}
- {name: "retweet_count", type: long}
- {name: "source", type: string}
- {name: "lang", type: string}
- {name: "place", type: json}
- {name: "geo", type: json}
- {name: "user", type: json}
- {name: "contributors", type: json}
- {name: "coordinates", type: json}
- {name: "favorited", type: boolean}
- {name: "retweeted", type: boolean}
- {name: "truncated", type: boolean}
- {name: "in_reply_to_screen_name", type: string}
- {name: "in_reply_to_status_id", type: long}
- {name: "in_reply_to_user_id", type: long}
- {name: "is_quote_status", type: json}
- {name: "metadata", type: json}
- {name: "possibly_sensitive", type: boolean}
- {name: "display_text_range", type: json}
- {name: "entities", type: json}
- {name: "extended_entities", type: json}

out:
type: bigquery
auth_method: json_key
# GCPからダウンロードしてきたjsonファイルのパスを書きます
json_keyfile: 自分のいれてね.json
project: 自分のいれてね
dataset: 自分のいれてね
table: 自分のいれてね${{ env.PARTITION }}
auto_create_table: true
mode: append
prevent_duplicate_insert: true
allow_quoted_newlines: true

# gcs_bucket: 自分のいれてね
# auto_create_gcs_bucket: true
# compression: GZIP


gcs_bucket を使った方が効率的と見て、使いたかったのですが以下のエラーが解決できず断念

undefined local variable or method `path bigquery gcs

BigQuery にデータが入っていることを確認

スクリーンショット 2018-07-29 17.25.22.png


3. Cloud Dataflow を使って加工

また今度

ハマっててまだできていません。

Dataflow は java と python が用意されているのですが

python は足りていない機能があるようなのと、 java はコードの書き方が独特すぎて難しかったので Scio を使おうと思っています。

このあたりの理由はこちらとかを参考に