5
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

BigQuery と Cloud DataFlow でデータ分析基盤を作る練習(準備編)

Last updated at Posted at 2018-07-29

こんな感じのことがしたい

bi.png

サービスで利用しているデータが mongo に入っていて、それを KPI などで利用したい。
ほぼこれと同じカタチ

練習として Twitter のデータで試す

以下、目次(※ 準備編では手順2まで)

  1. Twitter API で検索したツイートデータをローカルの mongo に入れる
  2. ローカルの mongo のデータを BigQuery にそのまま上げる
  3. BigQuery に上げたデータを Cloud Dataflow を使って加工して、新たに BigQuery にテーブルを作成
  4. 加工したテーブルをダッシュボードで見れるようにしたり、メールやslackに飛ばす。できたらつい先日でた BigQuery ML も試す

環境

ローカルマシン : Mac Pro (Late 2013)
OS : Mac OS High Sierra 10.13.6

仮想環境

1, 2 の手順は仮想環境の Ubuntu で行っている
VirtualBox : 5.2.12
Vagrant : 2.1.1
OS : Ubuntu 16.04
mongodb : v3.6.5
digdag : 0.9.25
embulk : 0.9.7
python : 3.6.5

1. Twitter API で検索したツイートデータを mongo に入れる

digdag + embulk で入れてます。
ただ自分の環境でやりやすいものがあったので利用しただけで、なんでもいいです。
ローカルの mongodb に突っ込むので、mongo は入れておいてください。
検索するハッシュタグは #SiroTalk でリツイートを除きます。
twitter という DB の search_tweet というコレクションに入れます。

dig ファイルにスケジュールを指定していませんが、毎回これを叩けば最新のツイートが無駄なく取れるようになっています。
ただし Twitter API の仕様上 検索は1週間分しかとれないので、それ以上間隔をあけると漏れます。
久しぶりに Twitter API を触ったら tweet_mode=extended これを入れておかないと全文取れないことに気づきました。

twitter.dig
timezone: Asia/Tokyo

_export:
  plugin:
    repositories:
      - https://jitpack.io
    dependencies:
      - com.github.szyn:digdag-slack:0.1.4
  webhook_url: 自分の入れてね
  workflow_name: tweet_search
  ENV: dev

+get_tweet_data:
  py>: api.search_tweets_all
  _error:
    slack>: danger-template.yml

+finish:
  slack>: good-template.yml
twitter.py
# coding: utf-8
from __future__ import print_function
import os
import requests
import json
import time
import calendar
import math
from datetime import datetime
from pymongo import MongoClient
from requests_oauthlib import OAuth1
import digdag

CONSUMER_KEY = "自分のいれてね"
CONSUMER_SECRET = "自分のいれてね"
ACCESS_TOKEN = "自分のいれてね"
ACCESS_TOKEN_SECRET = "自分のいれてね"

SEARCH_API_URL = 'https://api.twitter.com/1.1/search/tweets.json?tweet_mode=extended'


# Twitter検索
def search_tweets_all():
    # Twitter 認証
    oauth = OAuth1(CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN, ACCESS_TOKEN_SECRET)

    # db に入っている最新のツイートの tweet_id を取得
    # 重複しないように + 1
    since_id = get_last_tweet_id_from_db()
    since_id = since_id + 1

    # 検索クエリ(#SiroTalk をリツイートを除いて検索)
    query_form = {'q': u'#SiroTalk exclude:retweets', 'count': 100, 'result_type': 'recent', 'since_id': since_id}

    tweet_array = []

    while True:
        # Tweet データ取得
        response = requests.get(SEARCH_API_URL, auth = oauth, params = query_form)
        response_json = response.json()
        response_tweet_array = response_json['statuses']
        
        # 検索結果がなかったら終了
        if len(response_tweet_array) == 0:
            break

        # 検索結果を結合
        tweet_array.extend(response_tweet_array)

        # 次の検索のために tweet_id を今とってきた tweet の一番古い id - 1 に指定
        last_tweet = response_tweet_array[len(response_tweet_array) - 1]
        last_tweet_id = last_tweet['id']
        max_id = last_tweet_id - 1
        query_form['max_id'] = max_id

    # mongo に insert
    for tweet in tweet_array:
        # BigQuery でのパーティションのために、unixtime を ct として追加
        created_at_time_utc = time.strptime(tweet['created_at'], '%a %b %d %H:%M:%S +0000 %Y')
        unix_time = calendar.timegm(created_at_time_utc)
        tweet['ct'] = unix_time
        insert_tweet(tweet)


# mongo に insert
def insert_tweet(tweet):
    client = MongoClient('localhost', 27017)
    db = client.twitter
    collection = db.search_tweet

    # もし既に同じ tweet_id が db に入っていたら、update するようにする
    tweet_id = tweet['id']
    result = collection.update(
        {'tweet_id': tweet_id}
        , {'$set': tweet}
        , upsert=True
    )
    return result


def get_last_tweet_id_from_db():
    client = MongoClient('localhost', 27017)
    db = client.twitter
    collection = db.search_tweet

    last_tweet_array = collection.find().sort('tweet_id', -1).limit(1)
    last_tweet = last_tweet_array[0]
    last_tweet_id = last_tweet['tweet_id']
    return last_tweet_id

mongo でデータが入っているか確認

$mongo

> use twitter
> db.search_tweet.findOne()

2. ローカルの mongo のデータを BigQuery にそのまま上げる

取得した1週間分のツイートデータを BigQuery に上げます。

embulk gem install で予め以下が必要
embulk-input-mongodb
embulk-filter-expand_json
embulk-output-bigquery

mongo_to_bq.dig
timezone: Asia/Tokyo

_export:
  plugin:
    repositories:
      - https://jitpack.io
    dependencies:
      - com.github.szyn:digdag-slack:0.1.4
  webhook_url: 自分の入れてね
  workflow_name: mongo_to_bq
  ENV: dev

+mongo_to_bq:
  _export:
    COLLECTION_NAME: search_tweet
    START_DAY: 2018-07-21
    END_DAY: 2018-07-29

  +insert:
    loop>: ${moment(END_DAY).diff(moment(START_DAY), 'day')}
    _parallel: true
    _do:
      _export:
        START_TIME: ${moment(START_DAY).add(i, 'days').startOf('day').unix()}
        END_TIME: ${moment(START_DAY).add(i + 1, 'days').startOf('day').unix()}
        PARTITION: ${moment(START_DAY).add(i, 'days').format("YYYYMMDD")}

      +embulk:
        sh>: embulk run mongo_to_bq.yml.liquid
  
  _error:
    slack>: danger-template.yml
  
+finish:
  slack>: good-template.yml
mongo_to_bq.yml.liquid
in:
  type: mongodb
  uri: mongodb://localhost:27017/twitter
  collection: {{ env.COLLECTION_NAME }}
  query: '{ "ct": { $gte: {{ env.START_TIME }}, $lt: {{ env.END_TIME }} } }'

filters:
- type: expand_json
  json_column_name: record
  expanded_columns:
    - {name: "tweet_id", type: long}
    - {name: "created_at", type: string}
    - {name: "ct", type: long}
    - {name: "full_text", type: string}
    - {name: "favorite_count", type: long}
    - {name: "retweet_count", type: long}
    - {name: "source", type: string}
    - {name: "lang", type: string}
    - {name: "place", type: json}
    - {name: "geo", type: json}
    - {name: "user", type: json}
    - {name: "contributors", type: json}
    - {name: "coordinates", type: json}
    - {name: "favorited", type: boolean}
    - {name: "retweeted", type: boolean}
    - {name: "truncated", type: boolean}
    - {name: "in_reply_to_screen_name", type: string}
    - {name: "in_reply_to_status_id", type: long}
    - {name: "in_reply_to_user_id", type: long}
    - {name: "is_quote_status", type: json}
    - {name: "metadata", type: json}
    - {name: "possibly_sensitive", type: boolean}
    - {name: "display_text_range", type: json}
    - {name: "entities", type: json}
    - {name: "extended_entities", type: json}
  
out:
  type: bigquery
  auth_method: json_key
  # GCPからダウンロードしてきたjsonファイルのパスを書きます
  json_keyfile: 自分のいれてね.json
  project: 自分のいれてね
  dataset: 自分のいれてね
  table: 自分のいれてね${{ env.PARTITION }}
  auto_create_table: true
  mode: append
  prevent_duplicate_insert: true
  allow_quoted_newlines: true

  # gcs_bucket: 自分のいれてね
  # auto_create_gcs_bucket: true
  # compression: GZIP

gcs_bucket を使った方が効率的と見て、使いたかったのですが以下のエラーが解決できず断念

undefined local variable or method `path bigquery gcs

BigQuery にデータが入っていることを確認

スクリーンショット 2018-07-29 17.25.22.png

3. Cloud Dataflow を使って加工

また今度

ハマっててまだできていません。
Dataflow は java と python が用意されているのですが
python は足りていない機能があるようなのと、 java はコードの書き方が独特すぎて難しかったので Scio を使おうと思っています。
このあたりの理由はこちらとかを参考に

5
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?