概要
前回の①システム構成編ではMetabaseの直接データベース接続によるアクセス権の問題を解決するシステム構成、②ビュー作成処理編では項目の表示名による分析しやすいビューの作成について説明しました。
今回は、本番環境からデータ分析用環境への継続的なデータ同期の仕組みを解説します。バックグラウンドサーバスクリプトとデータ同期管理テーブルを活用した実装方法をご紹介します。
シリーズ構成
- ①システム構成編:全体構成とアーキテクチャの紹介
- ②ビュー作成処理編:PowerShellとタスクスケジューラによるビュー作成処理
- ③データ同期処理編(本記事):データ同期処理の仕組みとバックグラウンドサーバスクリプト
データ同期が必要な理由
第1回で説明したように、セキュリティを確保するために本番環境とデータ分析用環境を分離しましたが、継続的なデータ同期という新たな課題が生まれました。
データ同期の要件
セキュリティを確保しながら効率的なデータ同期を実現するため、以下の要件を満たす必要があります。
- セキュリティ:初回データの手動移行でアクセス権を考慮
- 自動化:日々の同期は自動で実行
- 効率性:差分同期による負荷軽減
- 監視:エラー検知と通知の仕組み
データ移行/同期の全体像
データ分析用環境へのデータ移行は、初回の手動移行と継続的な自動同期の2段階で構成されています。
- 初回データ移行:サイトパッケージとエクスポート/インポート機能による手動移行
- 更新データ同期:バックグラウンドサーバスクリプトによる自動同期
データ同期管理テーブルの構築
データ同期管理テーブルへの登録
初回データ移行完了後、継続的なデータ同期のため以下の設定を行います。
- インポートしたテーブルを同期対象としてデータ同期管理テーブルへ登録
- 本番環境の対象テーブルに更新データ同期用APIユーザの読取権限を追加
データ同期管理テーブルの設計
データ同期管理テーブルでは、本番環境からの同期対象テーブルを一元管理し、各テーブルの同期設定、スケジュール、オプション項目のマッピングなどを管理できる仕組みを提供しています。
データ同期管理テーブル:一覧画面
データ同期管理テーブル:編集画面
基本的にはバックグラウンドサーバスクリプトで指定したタイミングでの更新データ同期処理が行われますが、一覧画面と編集画面の「データ同期」ボタンを押すことで即時にデータ同期ができる手動実行機能もサーバスクリプトで実装しています。
基本設定
データ同期管理テーブルでは、各同期対象テーブルに対して以下の設定を行います。これらの設定により、テーブルごとに最適な同期方法を選択できます。
- 同期元サイトID:本番環境側(青)の対象サイトID
- 同期先サイトID:データ分析環境側(緑)の対象サイトID
- 前回同期日時:初回データを手動で移行した日時(この日時以降の更新データが同期対象)
- 同期方法:「毎日」「毎週」「毎月」から選択
- レコード:「Upsert」「Create」から選択
- Upsertのキー項目:レコード更新時のキー項目(複数指定可能)
- 項目値の種別:「表示名」「値」から選択
編集画面のガイドの設定内容
ご参考までに、編集画面のガイドに設定している内容です。
状況:「未着手」「準備」「運用中」「運用終了」「保留」から選択します。(既定値「運用中」から変更不要です)
対象サイト名:対象サイト名を入力してください。
同期元サイトID:同期元となる [本番環境] 側(青)の対象サイトIDを入力してください。
同期先サイトID:同期先となるデータ分析環境側(緑)の対象サイトIDを入力してください。
前回同期日時:初回データを手動で移行した日時を入力してください。(前回同期日時以降の更新データを対象に同期処理が行われます)
担当者:担当者(ご自身)のユーザ名を選択してください。
無効:同期を停止したい場合はチェックをオンにしてください。
MetabaseのURL:Metabase側のビューのURLを入力してください。
同期方法:
タイミング:「毎日」「毎週」「毎月」から同期したいタイミングを選択してください。
レコード:「[Upsert](https://pleasanter.org/ja/manual/server-script-items-upsert)」「[Create](https://pleasanter.org/ja/manual/server-script-items-create)」から選択してください。(詳細はユーザマニュアルをご参照ください)
Upsertのキー項目:レコード更新時のキーとなる項目を指定してください。(複数の項目を指定可能です)
項目値の種別:「表示名」「値」から選択してください。(詳細はユーザマニュアル「[ApiColumnValueDisplayTypeの指定方法](https://pleasanter.org/ja/manual/api-view)」をご参照ください)
同期オプション:
同期したい場合はチェックを入れて同期先で使用する項目(空き項目)を選択してください。
備考:必要に応じて入力してください。
レコードIDによるUpsert処理の課題と解決策
プリザンターの環境間データ同期において、レコードIDを直接Upsertのキー項目として使用することはできません。これは、本番環境とデータ分析用環境でレコードIDが自動採番により異なる値が割り当てられるためです。
課題
- 本番環境のレコードID「123」がデータ分析用環境では「456」として作成される
- レコードIDをキーとしたUpsert処理では正しい更新対象を特定できない
- 結果として重複レコードが作成されてしまう
解決策
この課題を解決するため、構築したシステムでは以下の工夫をしています。
- オプション項目への本番環境レコードID保存
- 本番環境のレコードIDを数値Zなどの空き項目に保存
- この項目をUpsertのキー項目として設定
- Upsertキー項目の活用
- 数値Zなどに保存された本番環境レコードIDをキーとしてUpsert実行
- 同一レコードの正確な特定と更新が可能
- 柔軟なキー設定
- 複数項目の組み合わせによるキー設定も可能
- テーブルの特性に応じた最適なキー選択
この仕組みにより、環境間でのレコードIDの相違問題を解決し、正確なデータ同期を実現しています。
初回データ移行時に、本番環境のレコードIDを数値Zなどの空き項目にインポートしておく必要があります。
セキュリティ要件
安全なデータ同期を実現するため、以下のセキュリティ要件を設定しています。
- 本番環境の対象テーブルに更新データ同期用APIユーザの読取権限が必要
- 1回あたりの同期レコード件数は1,000件に制限
- 処理タイムアウトは10分に設定
同期スケジュール
バックグラウンドサーバスクリプトにより、以下のスケジュールで自動実行されます。テーブルごとに更新頻度を設定できるため、データの特性に応じた最適な同期が可能です。
- 毎日:01:00
- 毎週:日曜日 02:00
- 毎月:1日 03:00
エラー監視と通知
同期処理の安定性を確保するため、包括的な監視・通知機能を実装しています。同期処理でエラーが発生した場合の対応は以下のとおりです。
- 実行ログテーブルに詳細なログが記録
- メール通知で担当者に自動送信
- 1,000件制限を超える場合は処理を中断して通知
スクリプト関連の設定
テーブルの管理画面より、以下の設定を行います。
スタイル
ボタンや項目の表示スタイルを設定します。
対象サイトIDの背景色を変更する.css(出力先:一覧)
.my-blue {
background-color: #deecf9;
}
.my-green {
background-color: #e8f9f3;
}
一覧画面のボタン色を設定する.css(出力先:一覧)
#Grid > tbody .ui-button {
border: 1px solid #d19405;
background: #fece2f;
}
URLの短縮表示.css(出力先:一覧)
#Grid > tbody > tr > td:nth-child(12) {
max-width: 150px; /* 適当な幅を設定 */
white-space: nowrap; /* 改行を防ぐ */
overflow: hidden; /* はみ出した部分を隠す */
text-overflow: ellipsis; /* 省略記号 (...) を表示 */
}
スクリプト
クライアントサイドでの入力項目の動的制御やユーザーインターフェース処理を設定します。
ユーザ入力操作により項目を読取専用にする.js(出力先:新規作成、編集)
// オプション項目のチェックボックスと同期先の項目
const checkMappings = {
'CheckB': 'ClassF', // レコードID
'CheckC': 'ClassG', // バージョン
'CheckD': 'ClassH', // 状況
'CheckE': 'ClassI', // 管理者
'CheckF': 'ClassJ', // 担当者
'CheckG': 'ClassK', // コメント
'CheckH': 'ClassL', // 作成者
'CheckI': 'ClassM', // 更新者
'CheckJ': 'ClassN', // 作成日時
'CheckK': 'ClassO' // 更新日時
};
// 同期方法:Upsertのキー項目を読取専用にする関数
function controlReadonlyClassD() {
const isUpsert = $p.getControl('ClassE').val() === 'Upsert'; // 同期方法
$p.getControl('ClassD').multiselect(isUpsert ? 'enable' : 'disable'); // 同期方法:Upsertのキー項目
}
// オプション項目を読取専用にする関数
function controlReadonlyOption(controlName, targetName) {
const isChecked = $p.getControl(controlName)[0].checked;
$p.getControl(targetName).prop('disabled', !isChecked)
.css('background', isChecked ? '#ffffff' : '#f5f5f5');
}
// 同期方法を変更したときのイベントを登録
$(document).on('change', '#Results_ClassE', function () {
controlReadonlyClassD();
});
// 一括でイベントを登録
Object.keys(checkMappings).forEach(checkId => {
$(document).on('change', `#Results_${checkId}`, function () {
controlReadonlyOption(checkId, checkMappings[checkId]);
});
});
// 編集画面を読み込んだときに実行するメソッド
$p.events.on_editor_load = function () {
setTimeout(() => {
controlReadonlyClassD();
// チェックボックスの状態に応じて対象を設定
Object.keys(checkMappings).forEach(checkId => {
controlReadonlyOption(checkId, checkMappings[checkId]);
});
}, 100); // 100ms 待つ(適宜調整)
};
サーバスクリプト
手動実行処理、ボタン表示、色分け表示のサーバサイド処理を設定します。
共通処理:インプリムテナントからのデータ同期.js(条件:画面表示の前)
// バックグラウンドサーバスクリプトの同名処理と同じ内容
対象サイトIDの背景色を変更する.js(条件:行表示の前)
columns.ClassA.ExtendedCellCss = 'my-blue'; // 対象サイトID(同期元)
columns.ClassB.ExtendedCellCss = 'my-green'; // 対象サイトID(同期先)
一覧画面の行単位のボタン表示.js(条件:行表示の前)
columns.ClassZ.RawText = `<button
id="MySync-${model.ResultId}"
class="button-icon my-row-button"
data-icon="ui-icon-circle-triangle-e"
data-method="post"
onclick="event.stopImmediatePropagation();$p.send($(this));">
データ同期
</button>`;
一覧画面のデータ同期ボタンから実行された場合の処理.js(条件:画面表示の前)
try {
context.Log('context.ControlId:' + context.ControlId);
// 編集画面の「データ同期」ボタンから実行された場合
if (context.ControlId === 'Process_1') {
CommonSyncFromProduction.executeScript('Manual', model.ResultId);
}
// 一覧画面の「データ同期」ボタンから実行された場合
else if (context.ControlId.startsWith("MySync")) {
CommonSyncFromProduction.executeScript('Manual', context.ControlId.split("-")[1]);
}
} catch (e) {
context.Log(e.stack);
}
バックグラウンドサーバスクリプトによる実装
同期処理の流れ
バックグラウンドサーバスクリプトによるデータ同期は、以下の流れで実行されます。このプロセスにより、差分データのみを効率的に同期し、システム負荷を最小限に抑えています。
- 同期対象の確認:データ同期管理テーブルから有効な設定を取得
- データ取得:本番環境から前回同期日時以降の更新データを取得
- データ処理:UpsertまたはCreateで同期先へ反映
- ログ記録:同期結果とエラー情報を記録
- 日時更新:最終同期日時を更新
スクリプト構成
データ同期処理は、バックグラウンドサーバスクリプトとデータ同期管理テーブルの設定により実装されています。この組み合わせにより、自動化された安定したデータ同期を実現しています。
バックグラウンドサーバスクリプト(テナント全体設定)
各同期スケジュールに対応したバックグラウンドサーバスクリプトです。
共通処理:本番環境からのデータ同期.js(共有にチェック)
class CommonSyncFromProduction {
static writeLog(logType, scheduleType, message) {
const logSiteId = 9999; //ログ記録用テーブルのサイトID
let item = items.NewResult();
item.Title = 'SyncFromProduction-' + scheduleType;
item.ClassA = logType;
item.Body = message;
items.Create(logSiteId, item);
}
static executeScript(scheduleType, RecordId) {
this.writeLog('Info', scheduleType, '同期処理を開始しました。');
const syncManageSiteId = 9999; //データ同期管理テーブルのサイトID
try {
// 1. 本環境のデータ同期管理テーブルからレコード情報を取得
let data1 = {
"View": {
"ColumnFilterHash": {
"CheckA": false // 無効チェック:オフ
}
}
};
if (scheduleType === 'Manual') {
data1.View.ColumnFilterHash.ResultId = RecordId; // ボタンをクリックした行のレコード
} else {
data1.View.ColumnFilterHash.ClassC = `["${scheduleType}"]`; // 同期方法:タイミング
}
let targetSites = items.Get(syncManageSiteId, JSON.stringify(data1));
// 2. 本番環境から対象サイトのレコード情報を取得
let response = '';
for (let targetSite of targetSites) {
let offset = 0;
const pageSize = 200;
let ymdhms = formatDateTime(targetSite.DateA);
let data2 = {
"ApiVersion": 1.1,
"ApiKey": '****',
"Offset": offset,
"View": {
"ApiDataType": "KeyValues",
"ApiColumnKeyDisplayType": "ColumnName",
"ApiColumnValueDisplayType": targetSite.ClassQ, // 同期方法:項目値の種別
"GridColumns": getGridColumns(),
"ColumnFilterHash": {
"UpdatedTime": "[\"" + ymdhms + ",\"]" // 前回同期日時
}
}
}
let mySiteId = targetSite.ClassA; // 対象サイトID(同期元)
httpClient.RequestUri = 'https://xxxx/api/items/' + mySiteId + '/get';
// 更新結果フラグ
let isUpdateSuccess = true;
// データを1000件まで取得するためのループ処理
while (true) {
httpClient.Content = JSON.stringify(data2);
response = httpClient.Post();
// 3. 本環境の対象サイトにレコード情報を更新
let syncRecords = JSON.parse(response).Response.Data;
// 結果が空であれば処理を終了
if (syncRecords.length === 0) {
break;
}
let formatSyncRecords = formatDataToHash(syncRecords);
syncRecords.forEach(function (record) {
record.Keys = JSON.parse(targetSite.ClassD); // 同期方法:Upsertのキー項目
// オプション項目:同期先の項目へ設定
setOptionValue(record, targetSite.CheckB, targetSite.ClassF, "NumHash", record.IssueId ?? record.ResultId); // レコードID
setOptionValue(record, targetSite.CheckC, targetSite.ClassG, "NumHash", record.Ver); // バージョン
setOptionValue(record, targetSite.CheckD, targetSite.ClassH, "ClassHash", record.Status); // 状況
setOptionValue(record, targetSite.CheckE, targetSite.ClassI, "ClassHash", record.Manager); // 管理者
setOptionValue(record, targetSite.CheckF, targetSite.ClassJ, "ClassHash", record.Owner); // 担当者
setOptionValue(record, targetSite.CheckG, targetSite.ClassK, "DescriptionHash", record.Comments); // コメント
setOptionValue(record, targetSite.CheckH, targetSite.ClassL, "ClassHash", record.Creator); // 作成者
setOptionValue(record, targetSite.CheckI, targetSite.ClassM, "ClassHash", record.Updator); // 更新者
setOptionValue(record, targetSite.CheckJ, targetSite.ClassN, "DateHash", record.CreatedTime); // 作成日時
setOptionValue(record, targetSite.CheckK, targetSite.ClassO, "DateHash", record.UpdatedTime); // 更新日時
// オプション項目:不要な項目を削除
delete record.IssueId; // レコードID(期限付きテーブル)
delete record.ResultId; // レコードID(記録テーブル)
delete record.Ver; // バージョン
delete record.Status; // 状況
delete record.Manager; // 管理者
delete record.Owner; // 担当者
delete record.Comments; // コメント
delete record.Creator; // 作成者
delete record.Updator; // 更新者
delete record.CreatedTime; // 作成日時
delete record.UpdatedTime; // 更新日時
});
for (let formatSyncRecord of formatSyncRecords) {
// 同期方法:レコードにより処理を振り分け
switch (targetSite.ClassE) {
case 'Upsert':
items.Upsert(targetSite.ClassB, JSON.stringify(formatSyncRecord)); // 対象サイトID(同期先)
break;
case 'Create':
items.Create(targetSite.ClassB, JSON.stringify(formatSyncRecord)); // 対象サイトID(同期先)
break;
default:
break;
}
}
// Offsetを更新
offset += pageSize;
// 1000件に到達した場合は処理を終了
if (offset === 1000) {
let limitOverMesage = '';
limitOverMesage += '1回あたりの同期対象のレコード数が1000件を超えています。超過分のレコードは同期されませんでした。\n';
limitOverMesage += 'http://xxxx/items/' + targetSite.ResultId + '/edit';
// 実行ログ出力
this.writeLog('Warn', scheduleType, limitOverMesage);
// 更新失敗をフラグに設定
isUpdateSuccess = false;
// メール通知
let notification = notifications.New();
notification.Address = `[User${targetSite.Owner}]`;
notification.CcAddress = `[Group99]`; // 運用担当チーム
notification.Title = '社内データ分析環境:同期レコード数が超過しました。';
notification.Body = limitOverMesage;
notification.Send();
break;
}
data2.Offset = offset;
}
if (httpClient.IsSuccess) {
let httpClientInfoMesage = '';
httpClientInfoMesage += 'httpClientによる同期処理が正常に完了しました。\n';
httpClientInfoMesage += 'http://xxxx/items/' + targetSite.ResultId + '/edit';
this.writeLog('Info', scheduleType, httpClientInfoMesage);
} else {
let httpClientErrMesage = '';
httpClientErrMesage += 'httpClientによる同期処理が失敗しました。\n';
httpClientErrMesage += 'Error: (' + httpClient.StatusCode + ')' + response + '\n';
httpClientErrMesage += 'http://xxxx/items/' + targetSite.ResultId + '/edit';
this.writeLog('Error', scheduleType, httpClientErrMesage);
// 更新失敗をフラグに設定
isUpdateSuccess = false;
// メール通知
let notification = notifications.New();
notification.Address = `[User${targetSite.Owner}]`;
notification.CcAddress = `[Group99]`; // 運用担当チーム
notification.Title = '社内データ分析環境:httpClientによる同期処理が失敗しました。';
notification.Body = httpClientErrMesage;
notification.Send();
}
// 4. 本環境のデータ同期管理テーブルの「前回同期日時」を更新する
if (isUpdateSuccess) {
var lastSyncDateTime = formatDateTime(new Date());
let data3 = {
DateHash: {
DateA: lastSyncDateTime
}
}
items.Update(targetSite.ResultId, JSON.stringify(data3))
}
}
this.writeLog('Info', scheduleType, '同期処理を終了しました。');
} catch (e) {
this.writeLog('Error', scheduleType, e.stack);
// メール通知
let notification = notifications.New();
notification.Address = `[Group99]`; // 運用担当チーム
notification.Title = '社内データ分析環境:同期処理が失敗しました。';
notification.Body = e.stack;
notification.Send();
}
}
}
// 日時をYYYY/MM/DD hh:mm:ss形式にフォーマットする関数
function formatDateTime(date) {
let time = new Date(date);
let year = time.getFullYear();
let month = time.getMonth() + 1;
let day = time.getDate();
let hours = time.getHours();
let minutes = time.getMinutes();
let seconds = time.getSeconds();
return `${year}/${pad(month)}/${pad(day)} ${pad(hours)}:${pad(minutes)}:${pad(seconds)}`;
}
// 数値を先頭ゼロ埋めの2桁に変換する関数
function pad(num) {
return num.toString().padStart(2, '0');
}
// GridColumnsを取得する関数
function getGridColumns() {
const prefixes = [
'Class',
'Num',
'Date',
'Description',
'Check'
];
const letters = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'.split('');
const columns = [
'IssueId',
'ResultId',
'Ver',
'Title',
'Body',
'StartTime',
'CompletionTime',
'WorkValue',
'ProgressRate',
'Status',
'Manager',
'Owner',
'Comments',
'Creator',
'Updator',
'CreatedTime',
'UpdatedTime'
];
prefixes.forEach(function (prefix) {
letters.forEach(function (letter) {
columns.push(prefix + letter);
});
});
return columns;
}
// Data内の各オブジェクトをHashに変換する関数
function formatDataToHash(data) {
var keys = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'.split(''); // A-Zのキー
// オブジェクトを作成する関数
function createHash(prefix, record) {
var hash = {};
keys.forEach(function (key) {
var field = prefix + key; // 'ClassA', 'ClassB' など
if (record[field] !== undefined && record[field] !== null && record[field] !== '') {
hash[field] = record[field]; // 値が存在する場合だけ追加
}
});
return hash;
}
// data配列を変更して新しいプロパティを追加
data.forEach(function (record) {
record.ClassHash = createHash('Class', record);
record.NumHash = createHash('Num', record);
record.DateHash = createHash('Date', record);
record.DescriptionHash = createHash('Description', record);
record.CheckHash = createHash('Check', record);
});
return data;
}
// オプション項目:同期対象かチェックする関数
function setOptionValue(record, syncCheck, syncToColumn, syncToHashType, syncFromColumn) {
if (syncCheck) {
record[syncToHashType][syncToColumn] = syncFromColumn;
}
}
本番環境からのデータ同期(毎日).js(毎日01:00)
CommonSyncFromProduction.executeScript('Daily');
本番環境からのデータ同期(毎週).js(毎週日曜/02:00)
CommonSyncFromProduction.executeScript('Weekly');
本番環境からのデータ同期(毎月).js(毎月1日/03:00)
CommonSyncFromProduction.executeScript('Monthly');
共通処理クラスの実装
実際のデータ同期処理は、CommonSyncFromProduction
クラスに集約されています。このクラスでは以下の処理を実装しています。
- 同期対象の取得:「無効」チェックがオフの有効なレコードを抽出
- データ取得:API経由で「前回同期日時」以降の更新データを取得(200件ずつ、最大1,000件)
- データ処理:「レコードID」「バージョン」「状況」などのオプション項目を設定し、UpsertまたはCreateで反映
- ログ記録:実行ログテーブルに処理結果を記録(Debug、Info、Warn、Error)
- エラー処理:HTTP通信エラーや処理例外をキャッチし、担当者と運用担当チームにメール通知
- データ変換:プリザンターの汎用項目(ClassA-Z、NumA-Z、DateA-Z等)をHashオブジェクトに変換
共通処理クラスCommonSyncFromProduction
により、1,000件制限の監視、エラーハンドリング、メール通知機能を実装しています。
運用とメンテナンス
制限事項への対応
安全で効率的なデータ同期を実現するための注意事項と制限事項は以下のとおりです。
注意事項
正確なデータ同期を実現するため、以下の注意事項があります。
- サイト設定の統一:移行元と移行先のエディタ設定を同一にする
- 項目マッピング:表示名での一致などによる自動マッピングは行われない
制限事項
システムの安全性と処理効率を確保するため、以下の制限事項があります。
- 添付ファイルと画像は同期対象外
- 別テーブルへのリンク項目(分類項目)は「?」が表示される場合あり
- 1,000件制限を超える場合は手動でのエクスポート・インポートが必要
監視とトラブルシューティング
継続的で安定したデータ同期を実現するには、適切な監視とメンテナンスが重要です。
実行ログの活用
同期処理の結果は、実行ログテーブルで詳細を確認できるようにしています。
- スクリプト名:実行されたスクリプトの識別
- ログ種別:Debug、Info、Warn、Error
- 実行結果:同期処理の詳細な結果
運用上のポイント
安定した同期処理を維持するため、以下の運用ポイントが重要です。
- 同期元・同期先サイトIDの正確性
- 前回同期日時の初期設定(手動移行が完了した日時)
- Upsertキー項目の適切な設定
- 本番環境の対象テーブルに更新データ同期用APIユーザの読取権限が設定されていること
おわりに
本シリーズでは、プリザンターとMetabaseを組み合わせたデータ分析基盤の構築について、3回にわたって解説しました。セキュリティを確保しながら効率的なデータ分析環境を構築することで、組織のデータ活用を大幅に向上させることができます。
3回シリーズの連載をお読みいただき、ありがとうございました!