LoginSignup
14
7

More than 5 years have passed since last update.

PHPでBigQueryにデータを読み込む(jobs-load)

Last updated at Posted at 2018-10-04

大量データをストリーミングインサートすると時間もお金もかかるのでデータの読み込みをするのがおすすめ。

BigQueryへのデータの読み込みの概要
https://cloud.google.com/bigquery/docs/loading-data

BigQueryにデータを読み込むための準備

Laravelで作ってますが、storageの部分を書き換えれば他でも使えると思います。

BigQueryクライアントライブラリをインストール
$ composer require google/cloud-bigquery
config/filesystems.php(Laravelのstorageにbigqueryを追加)
// Local scratch disk used to stage BigQuery load files (storage/bigquery).
// Swap 'driver'/'root' to use a different storage backend.
'bigquery' => [
    'driver' => 'local',
    'root' => storage_path('bigquery'),
],

BigQueryクラス(長いので折り畳み)
BigQuery.php
<?php
use Carbon\Carbon;
use \Exception;
use Google\Cloud\BigQuery\BigQueryClient;
use Google\Cloud\BigQuery\QueryResults;
use Google\Cloud\BigQuery\Table;
use Google\Cloud\Core\ExponentialBackoff;
use Illuminate\Support\Facades\Storage;

/**
 * Class BigQuery
 *
 * Thin wrapper around the Google Cloud BigQuery client for running queries
 * and bulk-loading data (jobs.load) from files staged on the Laravel
 * "bigquery" storage disk.
 *
 * Configuration keys read:
 *  - database.bigquery_json_file : path to the service-account key file
 *  - database.bigquery_dataset   : default dataset name
 */
class BigQuery
{
    /**
     * Authenticated BigQuery API client.
     *
     * @var BigQueryClient
     */
    protected $bq;
    /**
     * Dataset that queries and load jobs run against.
     *
     * @var string
     */
    protected $dataset;
    /**
     * Maximum retries while polling a job for completion.
     *
     * @var int
     */
    protected $backoff_retries = 10;
    /**
     * Exception code thrown inside the polling closure to signal
     * "job not complete yet, retry" to ExponentialBackoff.
     *
     * @var int
     */
    protected $backoff_exception_code = 500;
    /**
     * On-demand query price per pricing unit, in USD (US region pricing).
     *
     * @var int
     */
    protected $bigquery_printing_unit_price = 5;    // US
    /**
     * Pricing unit in bytes (1 TiB).
     *
     * @var int
     */
    protected $bigquery_printing_unit_byte = 1 * 1024 * 1024 * 1024 * 1024;
    /**
     * Whether job statistics are written to the debug log.
     *
     * @var bool
     */
    protected $log = false;

    /**
     * BigQuery constructor.
     *
     * Builds the API client from the configured key file and selects the
     * configured default dataset.
     */
    public function __construct()
    {
        $this->bq = new BigQueryClient([
            'keyFilePath' => config('database.bigquery_json_file')
        ]);
        $this->setDefaultDataset();
    }

    /**
     * Select the dataset used by subsequent queries and load jobs.
     *
     * @param string $dataset
     * @return $this
     */
    public function setDataset($dataset)
    {
        if ($this->dataset !== $dataset) {
            $this->dataset = $dataset;
        }
        return $this;
    }

    /**
     * Reset the dataset to the configured default.
     *
     * @return $this
     */
    public function setDefaultDataset()
    {
        return $this->setDataset(config('database.bigquery_dataset'));
    }

    /**
     * Enable or disable debug logging of job statistics.
     *
     * @param boolean $flag
     * @return void
     */
    public function setOutputLog($flag = true)
    {
        $this->log = $flag;
    }

    /**
     * @return string Currently selected dataset name.
     */
    public function getDataset()
    {
        return $this->dataset;
    }

    /**
     * Build _TABLE_SUFFIX range parameters for querying date-sharded tables.
     *
     * Compares the two dates component by component (year, month, day) and
     * accumulates the common leading components into 'suffix_table_name';
     * from the first differing component onward, components go into the
     * start/end suffix names instead.
     *
     * @param string $start_date Any Carbon-parsable date string.
     * @param string $end_date   Any Carbon-parsable date string.
     * @return array Keys: suffix_flag (bool, true when a suffix BETWEEN
     *               range is needed), suffix_table_name (string, shared
     *               prefix), suffix_start_table_name / suffix_end_table_name
     *               (string, range bounds; empty when dates are identical).
     */
    public function createTableSuffixParam($start_date, $end_date)
    {
        $s_date = Carbon::parse($start_date);
        $t_date = Carbon::parse($end_date);
        $tmp_s_date = explode('-', $s_date->toDateString());
        $tmp_e_date = explode('-', $t_date->toDateString());
        $suffix_check = true;
        $suffix_table_name = '';
        $suffix_start_table_name = '';
        $suffix_end_table_name = '';
        foreach ($tmp_s_date as $k => $s) {
            if ($tmp_e_date[$k] == $s && $suffix_check) {
                // Components still equal: extend the shared prefix.
                $suffix_table_name .= $s;
            } else {
                // First mismatch (and everything after) defines the range.
                $suffix_check = false;
                $suffix_start_table_name .= $s;
                $suffix_end_table_name .= $tmp_e_date[$k];
            }
        }
        return [
            'suffix_flag' => ($suffix_check ? false : true),
            'suffix_table_name' => $suffix_table_name,
            'suffix_start_table_name' => $suffix_start_table_name,
            'suffix_end_table_name' => $suffix_end_table_name,
        ];
    }

    /**
     * Run a standard-SQL query and wait for completion.
     *
     * Polls the job with exponential backoff until it completes (a dry run
     * is never started, so it is not polled).
     *
     * @param string $query Standard SQL query text.
     * @param boolean $dry_run When true, only validates/estimates the query.
     * @return QueryResults|null Results, or null for a dry run.
     * @throws Exception When the job does not complete within the retries.
     */
    public function execQuery($query, $dry_run = false)
    {
        $start_time = microtime(true);
        $query_job_config = $this->bq->query($query)
            ->useLegacySql(false)
            ->dryRun($dry_run);
        $query_job = $this->bq->startQuery($query_job_config);
        if ($dry_run === false) {
            $bf = new ExponentialBackoff($this->backoff_retries);
            $bf->execute(function () use (&$query_job) {
                $query_job->reload();
                if ($query_job->isComplete() === false) {
                    // Code 500 is retryable: backoff polls again.
                    throw new Exception('', $this->backoff_exception_code);
                }
            });
        }
        $query_info = $query_job->info();
        $this->outputLog($query_info, [
            'start_time' => $start_time,
        ]);
        return $dry_run ? null : $query_job->queryResults();
    }

    /**
     * Map a short format name to the BigQuery sourceFormat value.
     *
     * @param string $format 'CSV' or 'JSON' (case-insensitive).
     * @return string 'CSV' or 'NEWLINE_DELIMITED_JSON'.
     * @throws \InvalidArgumentException For any other format.
     */
    public function getSourceFormat($format = 'CSV')
    {
        if (strtoupper($format) === 'CSV') {
            $source_format = 'CSV';
        } elseif (strtoupper($format) === 'JSON') {
            $source_format = 'NEWLINE_DELIMITED_JSON';
        } else {
            throw new \InvalidArgumentException('Data format unknown. JSON or CSV');
        }
        return $source_format;
    }

    /**
     * Write raw data to a temporary file on the bigquery disk and load it.
     *
     * The temporary file is removed after the load job finishes (or if the
     * initial write fails).
     *
     * @param string $table_id Destination table.
     * @param string $data Raw CSV or newline-delimited JSON payload.
     * @param string $format 'CSV' or 'JSON'.
     * @param string|null $schema Schema file name (see importFile()), JSON only.
     * @return boolean Always true on success.
     * @throws Exception When the file cannot be written or the job fails.
     */
    public function importData($table_id, $data, $format = 'CSV', $schema = null)
    {
        // uniqid() guards against collisions when called twice in one second.
        $file = 'import_' . time() . '_' . uniqid() . '.' . strtolower($format);
        if (!Storage::disk('bigquery')->put($file, $data)) {
            Storage::disk('bigquery')->delete($file);
            throw new Exception('Error create ' . strtolower($format) . ' file.');
        }
        $this->importFile($table_id, $file, $format, $schema);
        Storage::disk('bigquery')->delete($file);
        return true;
    }

    /**
     * Load a staged file into a table via a jobs.load job (WRITE_APPEND).
     *
     * For JSON input, the schema is either autodetected ($schema null) or
     * read from storage/bigquery/schema/<schema>.json.
     *
     * @param string $table_id Destination table.
     * @param string $file File name relative to the bigquery disk root.
     * @param string $format 'CSV' or 'JSON'.
     * @param string|null $schema Schema file base name, JSON only.
     * @return boolean Always true on success.
     * @throws Exception When the file cannot be opened, the job does not
     *                   complete, or the job reports an errorResult.
     */
    public function importFile($table_id, $file, $format = 'CSV', $schema = null)
    {
        $start_time = microtime(true);
        $options = [];
        $source_format = $this->getSourceFormat($format);
        $dataset = $this->bq->dataset($this->dataset);
        $table = $dataset->table($table_id);
        // fopen() returns false on failure; fail fast instead of handing
        // false to the client library.
        $handle = fopen(storage_path('bigquery') . '/' . $file, 'r');
        if ($handle === false) {
            throw new Exception('Error opening import file: ' . $file);
        }
        $loadConfig = $table->load($handle, $options);
        $loadConfig->sourceFormat($source_format);
        if (strtoupper($format) === 'JSON') {
            if (is_null($schema)) {
                $loadConfig->autodetect(true);
            } else {
                $str_schema = json_decode(Storage::disk('bigquery')->get('schema/' . $schema . '.json'), true);
                $loadConfig->schema(['fields' => $str_schema]);
            }
        }
        $loadConfig->writeDisposition('WRITE_APPEND');
        $job = $table->runJob($loadConfig);
        $bf = new ExponentialBackoff($this->backoff_retries);
        $bf->execute(function () use ($job) {
            $job->reload();
            if (!$job->isComplete()) {
                // Code 500 is retryable: backoff polls again.
                throw new Exception('', $this->backoff_exception_code);
            }
        });
        $info = $job->info();
        $this->outputLog($info, [
            'start_time' => $start_time,
        ]);
        $error = $info['status']['errorResult']['message'] ?? '';
        if (!empty($error)) {
            throw new Exception(sprintf('Error running job: %s' . PHP_EOL, $error));
        }
        return true;
    }

    /**
     * Create a table in the current dataset with the given schema.
     *
     * @param string $table_id New table name.
     * @param array $schema BigQuery schema fields array.
     * @return Table
     * @throws Exception Wrapping the underlying failure, with the schema
     *                   serialized into the message.
     */
    public function createTable($table_id, $schema)
    {
        $options = ['schema' => ['fields' => $schema]];
        try {
            // The try block now also covers createTable(): dataset() alone
            // does not perform the API call that can fail.
            $dataset = $this->bq->dataset($this->dataset);
            return $dataset->createTable($table_id, $options);
        } catch (Exception $e) {
            // json_encode(): $schema is an array, so the original
            // str_replace() returned an array and sprintf('%s') printed
            // "Array" instead of the schema.
            throw new Exception(sprintf('Error create table: table_id=%s schema=%s', $table_id, json_encode($schema)), 0, $e);
        }
    }

    /**
     * Write job statistics (and an estimated cost) to the debug log.
     *
     * No-op unless setOutputLog(true) has been called.
     *
     * @param array $info Job info array from Job::info().
     * @param array $options Supports 'start_time' (microtime(true) float)
     *                       for wall-clock duration.
     * @return void
     */
    private function outputLog($info, $options)
    {
        if ($this->log) {
            $query = $info['configuration']['query']['query'] ?? '';
            $info_statistics = $info['statistics'] ?? [];
            $info_query = $info_statistics['query'] ?? [];
            // Estimated cost in USD: billed bytes / 1 TiB * tier * unit price.
            $printing = (($info_query['totalBytesBilled'] ?? 0) > 0 ? round($info_query['totalBytesBilled'] / $this->bigquery_printing_unit_byte * $info_query['billingTier'] * $this->bigquery_printing_unit_price, 4) : 0);
            $exec_time = (($options['start_time'] ?? 0) > 0 ? round(microtime(true) - $options['start_time'], 3) : null);
            \Log::debug($query, [
                'backoff_retries' => $this->backoff_retries,
                'statistics' => [
                    'creationTime(ms)' => $info_statistics['creationTime'] ?? null,
                    'startTime(ms)' => $info_statistics['startTime'] ?? null,
                    'endTime(ms)' => $info_statistics['endTime'] ?? null,
                    'timeline' => $info_query['timeline'] ?? null,
                    'totalBytesProcessed' => $info_query['totalBytesProcessed'] ?? null,
                    'totalBytesBilled' => $info_query['totalBytesBilled'] ?? null,
                    'totalSlotMs' => $info_query['totalSlotMs'] ?? null,
                    'billingTier' => $info_query['billingTier'] ?? null,
                    'printing($.4f)' => $printing,
                    'cacheHit' => $info_query['cacheHit'] ?? null,
                ],
                // end/start times come from the API in epoch milliseconds.
                'query_time(ms)' => round(($info_statistics['endTime'] ?? 0) - ($info_statistics['startTime'] ?? 0), 3),
                'exec_time(s)' => $exec_time,
            ]);
        }
    }
}

※ログの出力先はBigQuery専用で何か作るとよいかも。

DBデータをBigQueryに読み込む例

データ読み込みを管理するDBテーブル(MySQL-DDL)
-- Tracks which (date, dataset, table) combinations have already been loaded
-- into BigQuery so a daily import runs at most once.
CREATE TABLE `batch_bigquery_import_log` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `date` date NOT NULL COMMENT '日付',
  `dataset` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT 'データセット名',
  `table_name` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT 'テーブル名',
  `created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `updated_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  -- The unique key is what makes the "import once per day" check reliable.
  UNIQUE KEY `batch_bigquery_import_log_date_dataset_table_name_unique` (`date`,`dataset`,`table_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
データ読み込み対象DBテーブル(MySQL-DDL)
-- Source table for the BigQuery import example (one geo row per user).
CREATE TABLE `user_geo` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `user_id` bigint(20) unsigned NOT NULL COMMENT 'ユーザーID',
  `ip` int(10) unsigned NOT NULL COMMENT 'IPアドレス',
  `country_id` int(10) unsigned NOT NULL COMMENT '国ID',
  `country_code` varchar(3) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '国コード',
  PRIMARY KEY (`id`),
  -- Trailing comma after this line removed: it is a MySQL syntax error.
  UNIQUE KEY `unique_id_index` (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
データ読み込み先のスキーマ構成(user_geo)
[
    {
        "name": "date",
        "type": "TIMESTAMP",
        "mode": "REQUIRED"
    },
    {
        "name": "user_id",
        "type": "INTEGER",
        "mode": "REQUIRED"
    },
    {
        "name": "ip_addr",
        "type": "STRING",
        "mode": "REQUIRED"
    },
    {
        "name": "country_id",
        "type": "INTEGER",
        "mode": "REQUIRED"
    },
    {
        "name": "country_code",
        "type": "STRING",
        "mode": "REQUIRED"
    }
]
DBデータからCSVファイルを作成して、データ読み込み(jobs-load)する。※EloquentORMで必要なものは解説省略
// Load today's user_geo rows into BigQuery, at most once per
// (date, dataset, table) — tracked via batch_bigquery_import_log.
$date = Carbon::now()->toDateString();
// Midnight UTC of $date, used as the BigQuery TIMESTAMP for every row.
$bigquery_timestamp = Carbon::parse($date, 'UTC')->hour(0)->minute(0)->second(0)->getTimestamp();

$bq = new BigQuery();
$bq->setOutputLog();
$bq_table = 'user_geo';
if (BatchBigqueryImportLog::where('date', $date)
        ->where('dataset', $bq->getDataset())
        ->where('table_name', $bq_table)
        ->exists() === false
) {
    $user_geo = UserGeo::get();
    // get() returns an Eloquent Collection object, and empty() on any object
    // instance is always false — so the original !empty() guard never
    // skipped the import. count() on the Countable collection is correct.
    if (count($user_geo) > 0) {
        $import_csv = '';
        foreach ($user_geo as $user) {
            // Column order must match the BigQuery schema:
            // date, user_id, ip_addr, country_id, country_code.
            // NOTE(review): values are not CSV-quoted — assumes none contain
            // commas/newlines (country_code is varchar(3)); TODO confirm.
            // NOTE(review): the DDL column is `ip` (int), not `ip_addr` —
            // presumably the model exposes an ip_addr accessor; verify.
            $row = [
                'date' => $bigquery_timestamp,
                'user_id' => $user['user_id'],
                'ip_addr' => $user['ip_addr'],
                'country_id' => $user['country_id'],
                'country_code' => $user['country_code'],
            ];
            $import_csv .= implode(',', $row) . PHP_EOL;
            unset($row);
        }
        $bq->importData($bq_table, $import_csv);
        // Record the completed import so reruns on the same day are no-ops.
        $data = [
            'date' => $date,
            'dataset' => $bq->getDataset(),
            'table_name' => $bq_table,
        ];
        BatchBigqueryImportLog::updateOrInsert($data, $data);
        unset($import_csv, $data, $user_geo);
    }
}

この構成で500万行のデータを読み込むのに3分ぐらいでした。
今回はCSVでの例でしたがスキーマ構成が変わった時に面倒なのでデータ形式はJSONをおすすめします。
(BigQueryクラスはJSONでも読み込みできるようにしてある)

BigQueryにデータを読み込んでから集計するみたいなバッチも実際の運用で使えると思います。
MySQL上でいろいろJOINしながら集計するのが重くてメモリも足りないという状況で
この方法を使ったら快適になりました。
(次は最初からBigQueryを導入したい!)

14
7
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
14
7