はじめに
前回の記事「Amazon S3 / Google Cloud Storageから、イベントドリブンでTROCCOを実行する(Terraform Module付き)」で、オブジェクトストレージ起点でイベントドリブンにTROCCOを実行する方法について整理してみました。
ただ、手動アップロードが絡むのはビジネスユーザー起点のものも多く、Google DriveやBoxなどはアップロード先の1つになるでしょう。
Boxの方はイベントトリガーのWebhookがあるので、おそらくこれを使えば実現できそうですが、検証できる環境が手元にありません。
そこで、まずはGoogle Driveについてやり方を考えてみました。
こんな方におすすめ
- TROCCOをファイル格納のイベントドリブンで実行したい方
先に結論
- Google Driveと聞いてまず(?)頭に浮かぶGoogle Apps Scriptでは、スクリプト実行のトリガーにファイルのアップロードはありません。
- 正確にイベントドリブンで実装をしようとすると、Google Workspace Events APIというものを使うしかないようです。
- さすがにこれを気軽に利用する権限は私にはないので、こちらの利用は諦めました・・・。おそらくPub/Subに飛ばせば、その後は前回の記事同様の実装で利用できそうです。
- ということで、以下の内容はGoogle Apps Scriptを利用したファイル変更の定期確認→変更検知時のAPIによるジョブ実行の仕組みになっています。
できるようになったこと
- 指定の時間間隔(分)でスクリプトを実行
- ファイルの変更があるときに、API経由で転送ジョブを実行
処理の流れ
- スプレッドシートに転送設定のID、フォルダのURL、ファイルサフィックスのマッピングを一覧にしておく
- マッピングのフォルダで取得したファイルの一覧と、新たに取得したファイルの一覧を取得して、差分があるときにTROCCO APIを実行する
実装したコード
処理としては非常にシンプルです。トリガーの部分だけやや特殊で、実行のたびに既存のトリガーを削除&新規トリガーを作成しています。一応1, 5, 10 15, 30分おきの実行はできますが、この方が処理を柔軟に設定できるからです。
(そもそもN分おきという設定があることを全く認知しておらず、無駄に書いてしまったのはそれはそれとして。)
/**
* Google Drive Event-Driven Ingestion for TROCCO
*/
const TROCCO_BASE_URL = "https://trocco.io";
/**
* 定期実行のメイン関数
* 指定間隔でファイルの差分をチェック
*/
function main() {
try {
console.log("Started file check process");
const intervalMinutes = 5;
setupTrigger(intervalMinutes);
const spreadsheetUrl =
PropertiesService.getScriptProperties().getProperty("SPREADSHEET_URL");
const spreadsheet = SpreadsheetApp.openByUrl(spreadsheetUrl);
const mappings = getJobMappings(spreadsheet);
console.log(`Found ${mappings.length} job mappings`);
for (const mapping of mappings) {
try {
processFolderMapping(mapping, spreadsheet);
} catch (error) {
console.error(
`Error processing mapping ${mapping.job_definition_id}:`,
error,
);
sendSlackNotification(
`マッピング処理エラー: ${mapping.job_definition_id} - ${error.message}`,
);
}
}
console.log("File check process completed");
} catch (error) {
console.error("Main process error:", error);
sendSlackNotification(`システムエラー: ${error.message}`);
}
}
/**
* トリガーを設定
*/
function setupTrigger(intervalMinutes) {
ScriptApp.getProjectTriggers().forEach((trigger) => {
if (trigger.getHandlerFunction() === "main") {
ScriptApp.deleteTrigger(trigger);
}
});
const currentMinute = new Date().getMinutes();
const nextTriggerMinute =
Math.ceil((currentMinute + 1) / intervalMinutes) * intervalMinutes;
const nextTriggerTime = new Date();
nextTriggerTime.setMinutes(nextTriggerMinute % 60);
nextTriggerTime.setSeconds(0);
nextTriggerTime.setMilliseconds(0);
if (nextTriggerTime <= new Date()) {
nextTriggerTime.setHours(nextTriggerTime.getHours() + 1);
}
ScriptApp.newTrigger("main").timeBased().at(nextTriggerTime).create();
console.log(
`Next trigger scheduled at: ${nextTriggerTime.toLocaleString()} (${intervalMinutes}min interval)`,
);
}
/**
* ジョブマッピング設定を取得
*/
function getJobMappings(spreadsheet) {
const sheet = spreadsheet.getSheetByName("job_mappings");
const data = sheet.getDataRange().getValues();
const mappings = [];
for (let i = 1; i < data.length; i++) {
if (data[i][0] && data[i][1] && data[i][2]) {
mappings.push({
job_definition_id: data[i][0],
folder_url: data[i][1],
file_suffix: data[i][2],
});
}
}
return mappings;
}
/**
* 個別フォルダマッピングの処理
*/
function processFolderMapping(mapping, spreadsheet) {
console.log(`Processing folder: ${mapping.folder_url}`);
const folderId = mapping.folder_url.split("/")[7];
const currentFiles = getCurrentFilesFromFolder(folderId, mapping.file_suffix);
const processedFiles = getProcessedFiles(folderId, spreadsheet);
const newFiles = findNewFiles(currentFiles, processedFiles);
console.log(`Found ${newFiles.length} new files in folder ${folderId}`);
for (const file of newFiles) {
try {
processNewFile(file, mapping);
recordProcessedFile(file, folderId, spreadsheet);
} catch (error) {
console.error(`Error processing file ${file.name}:`, error);
sendSlackNotification(
`ファイル処理エラー: ${file.name} - ${error.message}`,
);
}
}
}
/**
* 指定フォルダから指定サフィックスのファイル情報を取得
*/
function getCurrentFilesFromFolder(folderId, fileSuffix) {
const folder = DriveApp.getFolderById(folderId);
const files = [];
const iterator = folder.getFiles();
while (iterator.hasNext()) {
const file = iterator.next();
const fileName = file.getName();
if (fileSuffix && !fileName.endsWith(fileSuffix)) {
continue;
}
files.push({
id: file.getId(),
name: fileName,
url: file.getUrl(),
lastUpdated: file.getLastUpdated().toISOString(),
size: file.getSize(),
});
}
return files;
}
/**
* 既に処理済みのファイル情報を取得
*/
function getProcessedFiles(folderId, spreadsheet) {
const sheetName = `${folderId}_files`;
let sheet = spreadsheet.getSheetByName(sheetName);
if (!sheet) {
sheet = spreadsheet.insertSheet(sheetName);
sheet
.getRange(1, 1, 1, 6)
.setValues([
[
"file_id",
"file_name",
"file_url",
"last_updated",
"file_size",
"processed_at",
],
]);
return new Set();
}
const data = sheet.getDataRange().getValues();
const processedFiles = new Set();
for (let i = 1; i < data.length; i++) {
if (data[i][0]) {
processedFiles.add(data[i][0]);
}
}
return processedFiles;
}
/**
* 新しいファイルを特定
*/
function findNewFiles(currentFiles, processedFiles) {
return currentFiles.filter((file) => !processedFiles.has(file.id));
}
/**
* 新しいファイルを処理
*/
function processNewFile(file, mapping) {
console.log(
`Processing file: ${file.name} with job_definition_id: ${mapping.job_definition_id}`,
);
const result = executeTroccoJob(mapping.job_definition_id, file);
console.log(`TROCCO job started: ${JSON.stringify(result)}`);
}
/**
* TROCCO転送ジョブを実行
*/
function executeTroccoJob(jobDefinitionId, file) {
const apiKey =
PropertiesService.getScriptProperties().getProperty("TROCCO_API_KEY");
const url = `${TROCCO_BASE_URL}/api/jobs?job_definition_id=${jobDefinitionId}`;
const payload = {
custom_variables: [
{ name: "$file_name$", value: file.name },
{ name: "$file_url$", value: file.url.split('?')[0] },
{ name: "$file_size$", value: file.size.toString() },
{ name: "$file_last_updated$", value: file.lastUpdated },
],
};
console.log(
"TROCCO API execution variables:",
JSON.stringify(payload.custom_variables, null, 2),
);
const options = {
method: "POST",
headers: {
Authorization: `Token ${apiKey}`,
"Content-Type": "application/json",
},
payload: JSON.stringify(payload),
};
const response = UrlFetchApp.fetch(url, options);
if (response.getResponseCode() !== 200) {
throw new Error(
`TROCCO API error: ${response.getResponseCode()} - ${response.getContentText()}`,
);
}
return JSON.parse(response.getContentText());
}
/**
* 処理済みファイル情報をスプレッドシートに記録
*/
function recordProcessedFile(file, folderId, spreadsheet) {
const sheetName = `${folderId}_files`;
const sheet = spreadsheet.getSheetByName(sheetName);
sheet.appendRow([
file.id,
file.name,
file.url,
file.lastUpdated,
file.size,
new Date().toISOString(),
]);
console.log(`Recorded processed file: ${file.name} in ${sheetName}`);
}
/**
* Slack通知
*/
function sendSlackNotification(message) {
try {
const webhookUrl =
PropertiesService.getScriptProperties().getProperty("SLACK_WEBHOOK_URL");
if (!webhookUrl) {
console.log("SLACK_WEBHOOK_URL not set, skipping notification");
return;
}
const payload = {
text: `[Google Drive Ingestion] ${message}`,
};
UrlFetchApp.fetch(webhookUrl, {
method: "POST",
headers: { "Content-Type": "application/json" },
payload: JSON.stringify(payload),
});
} catch (error) {
console.error("Slack notification failed:", error);
}
}
注意点
- 同一のタイミングで複数ファイルの格納が検知されると、ジョブの同時実行が発生する可能性があります。その場合、転送先で適切に処理できるかは処理設計次第になるのでお気をつけください。
- 大規模に扱おうとすると、これで管理しきるのもなかなか難しい気もしています。
おわりに
Claude Codeにコードを書かせたら全体として2時間ほどで実装できました。こういうのをサクッと作れるの便利だなーと改めて。