大量データをストリーミングインサートすると時間もお金もかかるので、ロードジョブによるデータの読み込みをするのがおすすめ。
BigQueryへのデータの読み込みの概要
https://cloud.google.com/bigquery/docs/loading-data
BigQueryにデータを読み込むための準備
Laravelで作ってますが、storageの部分を書き換えれば他でも使えると思います。
$ composer require google/cloud-bigquery
'bigquery' => [
'driver' => 'local',
'root' => storage_path('bigquery'),
],
※ログの出力先はBigQuery専用のチャンネルを何か作るとよいかも。

BigQueryクラス(長いので折り畳み)
<?php
use Carbon\Carbon;
use \Exception;
use Google\Cloud\BigQuery\BigQueryClient;
use Google\Cloud\BigQuery\QueryResults;
use Google\Cloud\BigQuery\Table;
use Google\Cloud\Core\ExponentialBackoff;
use Illuminate\Support\Facades\Storage;
/**
 * Thin wrapper around the Google Cloud BigQuery client.
 *
 * Responsibilities:
 *  - run queries with exponential-backoff polling until completion,
 *  - load CSV / newline-delimited-JSON data via files on the local
 *    "bigquery" storage disk,
 *  - optionally log job statistics (bytes billed, estimated cost, timing).
 *
 * NOTE(review): depends on Laravel helpers being available in the host
 * application: config(), storage_path(), and the Storage / Log facades.
 */
class BigQuery
{
    /**
     * Authenticated BigQuery API client.
     *
     * @var BigQueryClient
     */
    protected $bq;

    /**
     * Dataset ID that all table operations are scoped to.
     *
     * @var string
     */
    protected $dataset;

    /**
     * Maximum retries for ExponentialBackoff while polling a job.
     *
     * @var int
     */
    protected $backoff_retries = 10;

    /**
     * Exception code thrown inside backoff closures to trigger a retry.
     *
     * @var int
     */
    protected $backoff_exception_code = 500;

    /**
     * On-demand query price in USD per pricing unit (per TB scanned).
     *
     * @var int
     */
    protected $bigquery_printing_unit_price = 5; // US$ per TB

    /**
     * Pricing unit in bytes (1 TB).
     *
     * @var int
     */
    protected $bigquery_printing_unit_byte = 1 * 1024 * 1024 * 1024 * 1024;

    /**
     * Whether job statistics are written to the debug log.
     *
     * @var bool
     */
    protected $log = false;

    /**
     * BigQuery constructor.
     *
     * Builds the API client from the service-account key file configured at
     * database.bigquery_json_file and selects the default dataset.
     */
    public function __construct()
    {
        $this->bq = new BigQueryClient([
            'keyFilePath' => config('database.bigquery_json_file')
        ]);
        $this->setDefaultDataset();
    }

    /**
     * Select the dataset used by subsequent table operations.
     *
     * @param string $dataset
     * @return $this
     */
    public function setDataset($dataset)
    {
        if ($this->dataset !== $dataset) {
            $this->dataset = $dataset;
        }
        return $this;
    }

    /**
     * Reset the dataset to the configured default (database.bigquery_dataset).
     *
     * @return $this
     */
    public function setDefaultDataset()
    {
        return $this->setDataset(config('database.bigquery_dataset'));
    }

    /**
     * Enable or disable job-statistics logging.
     *
     * @param bool $flag
     * @return $this
     */
    public function setOutputLog($flag = true)
    {
        $this->log = $flag;
        // Returning $this makes the setter chainable; callers that ignored
        // the previous void return are unaffected.
        return $this;
    }

    /**
     * @return string Currently selected dataset ID.
     */
    public function getDataset()
    {
        return $this->dataset;
    }

    /**
     * Build _TABLE_SUFFIX parameters for a sharded-table date range.
     *
     * Compares the Y-m-d parts of both dates left to right: the shared
     * prefix becomes suffix_table_name; once the parts diverge, the
     * remaining parts accumulate into the start/end suffixes and
     * suffix_flag is set (a BETWEEN-style suffix filter is needed).
     *
     * @param string $start_date Any Carbon-parseable date string.
     * @param string $end_date   Any Carbon-parseable date string.
     * @return array{suffix_flag: bool, suffix_table_name: string, suffix_start_table_name: string, suffix_end_table_name: string}
     */
    public function createTableSuffixParam($start_date, $end_date)
    {
        $s_date = Carbon::parse($start_date);
        $t_date = Carbon::parse($end_date);
        $tmp_s_date = explode('-', $s_date->toDateString());
        $tmp_e_date = explode('-', $t_date->toDateString());
        $suffix_check = true;
        $suffix_table_name = '';
        $suffix_start_table_name = '';
        $suffix_end_table_name = '';
        foreach ($tmp_s_date as $k => $s) {
            if ($tmp_e_date[$k] == $s && $suffix_check) {
                // Still inside the common prefix (e.g. same year/month).
                $suffix_table_name .= $s;
            } else {
                $suffix_check = false;
                $suffix_start_table_name .= $s;
                $suffix_end_table_name .= $tmp_e_date[$k];
            }
        }
        return [
            'suffix_flag' => ($suffix_check ? false : true),
            'suffix_table_name' => $suffix_table_name,
            'suffix_start_table_name' => $suffix_start_table_name,
            'suffix_end_table_name' => $suffix_end_table_name,
        ];
    }

    /**
     * Run a standard-SQL query and wait for it to complete.
     *
     * @param string $query
     * @param bool $dry_run When true, only validates / estimates the query.
     * @return QueryResults|null Results, or null for a dry run.
     * @throws Exception When the job does not complete within the retries.
     */
    public function execQuery($query, $dry_run = false)
    {
        $start_time = microtime(true);
        $query_job_config = $this->bq->query($query)
            ->useLegacySql(false)
            ->dryRun($dry_run);
        $query_job = $this->bq->startQuery($query_job_config);
        if ($dry_run === false) {
            // Poll until the job finishes; throwing code 500 makes
            // ExponentialBackoff retry up to $backoff_retries times.
            $bf = new ExponentialBackoff($this->backoff_retries);
            $bf->execute(function () use ($query_job) {
                $query_job->reload();
                if ($query_job->isComplete() === false) {
                    throw new Exception('', $this->backoff_exception_code);
                }
            });
        }
        $query_info = $query_job->info();
        $this->outputLog($query_info, [
            'start_time' => $start_time,
        ]);
        return $dry_run ? null : $query_job->queryResults();
    }

    /**
     * Map a user-facing format name to a BigQuery sourceFormat value.
     *
     * @param string $format 'CSV' or 'JSON' (case-insensitive).
     * @return string
     * @throws \InvalidArgumentException For any other format.
     */
    public function getSourceFormat($format = 'CSV')
    {
        $format = strtoupper($format);
        if ($format === 'CSV') {
            return 'CSV';
        }
        if ($format === 'JSON') {
            // BigQuery's name for JSON-lines input.
            return 'NEWLINE_DELIMITED_JSON';
        }
        throw new \InvalidArgumentException('Data format unknown. JSON or CSV');
    }

    /**
     * Write an in-memory payload to a temp file and load it into a table.
     *
     * The temp file is removed whether or not the load succeeds.
     *
     * @param string $table_id
     * @param string $data Raw CSV or newline-delimited JSON content.
     * @param string $format 'CSV' or 'JSON'.
     * @param string|null $schema Schema file name (see importFile()).
     * @return bool Always true on success.
     * @throws Exception When the temp file cannot be written or the load fails.
     */
    public function importData($table_id, $data, $format = 'CSV', $schema = null)
    {
        $file = 'import_' . time() . '.' . strtolower($format);
        if (!Storage::disk('bigquery')->put($file, $data)) {
            Storage::disk('bigquery')->delete($file);
            throw new Exception('Error create ' . strtolower($format) . ' file.');
        }
        try {
            $this->importFile($table_id, $file, $format, $schema);
        } finally {
            // Clean up the temp file even when the load job throws.
            Storage::disk('bigquery')->delete($file);
        }
        return true;
    }

    /**
     * Load a file from the "bigquery" storage disk into a table (WRITE_APPEND).
     *
     * For JSON input, either autodetects the schema or reads it from
     * storage/bigquery/schema/<$schema>.json.
     *
     * @param string $table_id
     * @param string $file File name relative to the bigquery disk root.
     * @param string $format 'CSV' or 'JSON'.
     * @param string|null $schema Schema file name (without .json), JSON only.
     * @return bool Always true on success.
     * @throws Exception When the file cannot be opened or the job errors.
     */
    public function importFile($table_id, $file, $format = 'CSV', $schema = null)
    {
        $start_time = microtime(true);
        $options = [];
        $source_format = $this->getSourceFormat($format);
        $path = storage_path('bigquery') . '/' . $file;
        $handle = fopen($path, 'r');
        if ($handle === false) {
            // fopen() only raises a warning on failure; fail loudly instead
            // of passing `false` to the load job.
            throw new Exception('Error opening import file: ' . $path);
        }
        $dataset = $this->bq->dataset($this->dataset);
        $table = $dataset->table($table_id);
        $loadConfig = $table->load($handle, $options);
        $loadConfig->sourceFormat($source_format);
        if (strtoupper($format) === 'JSON') {
            if (is_null($schema)) {
                // No schema supplied: let BigQuery infer it from the data.
                $loadConfig->autodetect(true);
            } else {
                $str_schema = json_decode(Storage::disk('bigquery')->get('schema/' . $schema . '.json'), true);
                $loadConfig->schema(['fields' => $str_schema]);
            }
        }
        $loadConfig->writeDisposition('WRITE_APPEND');
        $job = $table->runJob($loadConfig);
        // Poll until the load job completes (same retry scheme as execQuery).
        $bf = new ExponentialBackoff($this->backoff_retries);
        $bf->execute(function () use ($job) {
            $job->reload();
            if (!$job->isComplete()) {
                throw new Exception('', $this->backoff_exception_code);
            }
        });
        $info = $job->info();
        $this->outputLog($info, [
            'start_time' => $start_time,
        ]);
        $error = $info['status']['errorResult']['message'] ?? '';
        if (!empty($error)) {
            throw new Exception(sprintf('Error running job: %s' . PHP_EOL, $error));
        }
        return true;
    }

    /**
     * Create a table in the current dataset with the given schema fields.
     *
     * @param string $table_id
     * @param array $schema BigQuery field definitions (name/type/mode).
     * @return Table
     * @throws Exception Wraps any API failure with table/schema context.
     */
    public function createTable($table_id, $schema)
    {
        $options = ['schema' => ['fields' => $schema]];
        try {
            // The createTable() API call is the operation that can actually
            // fail, so it belongs inside the try block (the original only
            // wrapped the local dataset() accessor).
            $dataset = $this->bq->dataset($this->dataset);
            return $dataset->createTable($table_id, $options);
        } catch (Exception $e) {
            // json_encode: $schema is an array, so the previous
            // str_replace()-into-sprintf('%s') only ever printed "Array".
            throw new Exception(sprintf('Error create table: table_id=%s schema=%s', $table_id, json_encode($schema)), 0, $e);
        }
    }

    /**
     * Log job statistics when logging is enabled.
     *
     * Estimates the on-demand cost from totalBytesBilled, billingTier and
     * the per-TB unit price, and records timing from both the job's own
     * timestamps and the caller-supplied wall-clock start time.
     *
     * @param array $info Job info array from Job::info().
     * @param array $options ['start_time' => float microtime(true) value].
     * @return void
     */
    private function outputLog($info, $options)
    {
        if ($this->log) {
            $query = $info['configuration']['query']['query'] ?? '';
            $info_statistics = $info['statistics'] ?? [];
            $info_query = $info_statistics['query'] ?? [];
            // Default the billing tier to 1: the key can be absent on load
            // jobs, and the unguarded access previously raised an
            // undefined-index warning and zeroed the estimate.
            $billing_tier = $info_query['billingTier'] ?? 1;
            $printing = (($info_query['totalBytesBilled'] ?? 0) > 0 ? round($info_query['totalBytesBilled'] / $this->bigquery_printing_unit_byte * $billing_tier * $this->bigquery_printing_unit_price, 4) : 0);
            $exec_time = (($options['start_time'] ?? 0) > 0 ? round(microtime(true) - $options['start_time'], 3) : null);
            \Log::debug($query, [
                'backoff_retries' => $this->backoff_retries,
                'statistics' => [
                    'creationTime(ms)' => $info_statistics['creationTime'] ?? null,
                    'startTime(ms)' => $info_statistics['startTime'] ?? null,
                    'endTime(ms)' => $info_statistics['endTime'] ?? null,
                    'timeline' => $info_query['timeline'] ?? null,
                    'totalBytesProcessed' => $info_query['totalBytesProcessed'] ?? null,
                    'totalBytesBilled' => $info_query['totalBytesBilled'] ?? null,
                    'totalSlotMs' => $info_query['totalSlotMs'] ?? null,
                    'billingTier' => $info_query['billingTier'] ?? null,
                    'printing($.4f)' => $printing,
                    'cacheHit' => $info_query['cacheHit'] ?? null,
                ],
                'query_time(ms)' => round(($info_statistics['endTime'] ?? 0) - ($info_statistics['startTime'] ?? 0), 3),
                'exec_time(s)' => $exec_time,
            ]);
        }
    }
}
DBのデータをBigQueryに読み込む例
-- Tracks which (date, dataset, table) combinations have already been
-- imported, so the batch job below runs at most once per day per table.
CREATE TABLE `batch_bigquery_import_log` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
`date` date NOT NULL COMMENT '日付',
`dataset` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT 'データセット名',
`table_name` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT 'テーブル名',
`created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
`updated_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
-- The unique key is what makes duplicate imports for the same day a no-op.
UNIQUE KEY `batch_bigquery_import_log_date_dataset_table_name_unique` (`date`,`dataset`,`table_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
-- Source table for the BigQuery import example (one row per user).
-- NOTE(review): the import script reads `ip_addr`, but this DDL defines
-- `ip` — presumably a model accessor bridges the two; verify.
CREATE TABLE `user_geo` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
`user_id` bigint(20) unsigned NOT NULL COMMENT 'ユーザーID',
`ip` int(10) unsigned NOT NULL COMMENT 'IPアドレス',
`country_id` int(10) unsigned NOT NULL COMMENT '国ID',
`country_code` varchar(3) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '国コード',
PRIMARY KEY (`id`),
-- Trailing comma removed: a comma after the last key definition is a
-- MySQL syntax error.
UNIQUE KEY `unique_id_index` (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
[
{
"name": "date",
"type": "TIMESTAMP",
"mode": "REQUIRED"
},
{
"name": "user_id",
"type": "INTEGER",
"mode": "REQUIRED"
},
{
"name": "ip_addr",
"type": "STRING",
"mode": "REQUIRED"
},
{
"name": "country_id",
"type": "INTEGER",
"mode": "REQUIRED"
},
{
"name": "country_code",
"type": "STRING",
"mode": "REQUIRED"
}
]
// Import today's user_geo snapshot into BigQuery, at most once per
// (date, dataset, table) thanks to the batch_bigquery_import_log guard.
$date = Carbon::now()->toDateString();
// Midnight UTC of today, used as the row's BigQuery TIMESTAMP value.
$bigquery_timestamp = Carbon::parse($date, 'UTC')->hour(0)->minute(0)->second(0)->getTimestamp();
$bq = new BigQuery();
$bq->setOutputLog();
$bq_table = 'user_geo';
$already_imported = BatchBigqueryImportLog::where('date', $date)
    ->where('dataset', $bq->getDataset())
    ->where('table_name', $bq_table)
    ->exists();
if (!$already_imported) {
    $user_geo = UserGeo::get();
    // BUG FIX: get() returns an Eloquent Collection, and empty() on any
    // object is always false, so the original !empty() guard could never
    // skip an empty result set. Ask the Collection itself.
    if ($user_geo->isNotEmpty()) {
        $import_csv = '';
        foreach ($user_geo as $user) {
            // NOTE(review): the user_geo DDL defines `ip`, not `ip_addr`;
            // presumably a model accessor provides `ip_addr` — verify.
            $row = [
                'date' => $bigquery_timestamp,
                'user_id' => $user['user_id'],
                'ip_addr' => $user['ip_addr'],
                'country_id' => $user['country_id'],
                'country_code' => $user['country_code'],
            ];
            // Values are numeric or short codes, so a plain implode() is an
            // acceptable CSV row here (no quoting/escaping needed).
            $import_csv .= implode(',', $row) . PHP_EOL;
            unset($row);
        }
        $bq->importData($bq_table, $import_csv);
        // Record the import so re-runs on the same day are skipped.
        $data = [
            'date' => $date,
            'dataset' => $bq->getDataset(),
            'table_name' => $bq_table,
        ];
        BatchBigqueryImportLog::updateOrInsert($data, $data);
        unset($import_csv, $data, $user_geo);
    }
}
この構成で500万行のデータを読み込むのに3分ぐらいでした。
今回はCSVでの例でしたがスキーマ構成が変わった時に面倒なのでデータ形式はJSONをおすすめします。
(BigQueryクラスはJSONでも読み込みできるようにしてある)
BigQueryにデータを読み込んでから集計するみたいなバッチも実際の運用で使えると思います。
MySQL上でいろいろJOINしながら集計するのが重くてメモリも足りないという状況で
この方法を使ったら快適になりました。
(次は最初からBigQueryを導入したい!)