概要
RDB から Datastore へ、Cloud Tasks を利用してデータを移し替えていくプロセスをまとめる。
必要なSDKをセットアップする
gcp-worker.service.ts
import {Injectable} from "@nestjs/common";
import {Datastore} from "@google-cloud/datastore";
import {CloudTasksClient} from "@google-cloud/tasks";
/**
 * Injectable service that owns the Google Cloud SDK clients used by the
 * migration: a Datastore client and a Cloud Tasks client.
 *
 * Both clients are created once, in the constructor.
 */
@Injectable()
export class GcpWorkerService {
  private readonly dataStore: Datastore
  private readonly cloudTasksClient: CloudTasksClient

  /**
   * @param configService configuration source; `gcp.projectId` is read from it
   */
  constructor(private configService: ConfigService) {
    // The Datastore client is bound to the configured project id.
    const projectId = this.configService.get('gcp.projectId')
    this.dataStore = new Datastore({ projectId })

    // The Cloud Tasks client is created without explicit options.
    this.cloudTasksClient = new CloudTasksClient()
  }
}
バッチ用のエンドポイントを叩かれないように対策する
Cloud Run に定義した環境変数をもとに、バッチ用の Cloud Run 契機で実行されている場合のみバッチ用モジュールを読み込むようにする。
app.module.ts
import { MiddlewareConsumer, Module } from '@nestjs/common'
import { ConfigModule, ConfigService } from '@nestjs/config'
import { TypeOrmModule } from '@nestjs/typeorm'
/**
 * Extra feature modules to register, chosen once at module-load time from the
 * `SERVER_TYPE` environment variable:
 *
 * - `'batch'` -> batch-only modules (the queue endpoints)
 * - `'app'`   -> app-only modules
 * - anything else (including unset) -> both sets
 *
 * Every branch returns an array, because the result is spread into the
 * `imports` list of the root module.
 */
const add_modules = (() => {
  const batchOnlyModules = [QueueModule]
  const appOnlyModules = [HogeModule]
  switch (process.env.SERVER_TYPE) {
    case 'batch':
      return batchOnlyModules
    case 'app':
      // Return the array, not the bare module class: a class is not iterable,
      // so spreading it would throw a TypeError while the module is loading.
      return appOnlyModules
    default:
      return [...batchOnlyModules, ...appOnlyModules]
  }
})()
/** Global configuration module, populated by the `configuration` loader. */
const globalConfigModule = ConfigModule.forRoot({
  isGlobal: true,
  load: [configuration],
})

/**
 * TypeORM root module. Its options come from a factory that is injected with
 * ConfigService; the factory shown here returns an empty options object.
 */
const typeOrmRootModule = TypeOrmModule.forRootAsync({
  imports: [ConfigModule],
  useFactory: (configService: ConfigService) => ({}),
  inject: [ConfigService],
})

/**
 * Root application module.
 *
 * Always imports the configuration and TypeORM modules, then appends the
 * modules selected in `add_modules`.
 */
@Module({
  imports: [globalConfigModule, typeOrmRootModule, ...add_modules],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {
  /**
   * Registers LoggerMiddleware with the route pattern `''`.
   *
   * @param consumer middleware consumer supplied by Nest
   */
  configure(consumer: MiddlewareConsumer) {
    const loggerBinding = consumer.apply(LoggerMiddleware)
    loggerBinding.forRoutes('')
  }
}
RDB からデータを fetch する処理を作成
500件ずつ取得して、Cloud Tasks に別プロセスとして処理を行なってもらう。
queue.controller.ts
/**
 * Batch endpoints that drive the RDB -> Datastore migration.
 *
 * `GET /queue` reads users from the RDB page by page and enqueues one Cloud
 * Task per page; `POST /queue/save-data-store` is the endpoint those tasks
 * call to write one page into Datastore.
 *
 * NOTE(review): the injected type is `GcpClientService`, while the service
 * that defines `createHttpTask` / `saveGcd` in this write-up is named
 * `GcpWorkerService` — confirm which name is the real one.
 */
@Controller('queue')
export class QueueController {
  constructor(
    private readonly queuesService: QueueService,
    private readonly gcpClientService: GcpClientService
  ) {}

  /**
   * Reads users in pages of 500 and hands each non-empty page to
   * `createHttpTask`, stopping at the first empty page.
   *
   * @param skip offset of the first row to read (defaults to 0)
   */
  @Get()
  async findAll(skip = 0) {
    const take = 500
    // Iterate instead of recursing so the promise chain does not grow with
    // the number of pages.
    for (let offset = skip; ; offset += take) {
      const users = await getRepository(User)
        .createQueryBuilder('h')
        // skip/take paging needs a stable order, otherwise rows can be
        // skipped or returned twice between pages.
        // NOTE(review): assumes User has an `id` column — confirm.
        .orderBy('h.id', 'ASC')
        .skip(offset)
        .take(take)
        .getMany()
      if (!users.length) {
        return
      }
      await this.gcpClientService.createHttpTask(users)
    }
  }

  /**
   * Writes the users in the request body to Datastore (called by Cloud Tasks).
   *
   * Resolves only after every user has been saved, so a failure surfaces as
   * an error response instead of being lost after the handler has returned.
   *
   * @param req request whose body is expected to be an array of users
   * @throws InternalServerErrorException if saving any user fails
   */
  @Post('/save-data-store')
  async saveDataStore(@Req() req) {
    const users: User[] = req.body
    // Nothing to do for an empty or malformed payload.
    if (!Array.isArray(users) || !users.length) {
      return
    }
    try {
      await Promise.all(
        users.map((user: User) => {
          const dto: UserDto = {
            ...user,
            // Records without an explicit flag are treated as the latest.
            is_latest: user.is_latest ?? true,
          }
          return this.gcpClientService.saveGcd(user.id, dto)
        })
      )
    } catch (e) {
      throw new InternalServerErrorException(
        e instanceof Error ? e.message : String(e)
      )
    }
  }
}
Cloud Tasks にジョブを積んでいく処理を追加
gcp-worker.service.ts
/**
 * Registers one Cloud Task whose payload is the given users.
 *
 * The returned promise settles only after the `createTask` call has
 * finished, so callers can `await` it and a failed registration rejects
 * instead of becoming an unhandled rejection.
 *
 * @param users records to send as the task's request body
 * @returns a promise that resolves once the task has been created and logged
 */
createHttpTask = async (users: User[]): Promise<void> => {
  const { parent, task } = this.cloudTasksClientQueueInit(
    this.configService.get('gcp.queueName'),
    HttpMethod.POST,
    users
  )
  // 10 s timeout on the createTask call itself.
  const response = await this.cloudTasksClient.createTask(
    { parent, task },
    { timeout: 10000 }
  )
  this.logger.log(response)
}
/**
 * Builds the `parent` and `task` values for a Cloud Tasks `createTask` call.
 *
 * The task is an HTTP request to `<gcp.batchContainerUrl>/queue/save-data-store`
 * carrying the users as a base64-encoded JSON body, with an OIDC token for the
 * configured service account.
 *
 * @param queueName name of the queue
 * @param method HTTP method for the task's request
 * @param users records serialised into the request body
 * @returns an object with the queue resource name (`parent`) and the `task`
 */
cloudTasksClientQueueInit = (
  queueName: string,
  method: number,
  users: User[]
) => {
  // Resource name of the queue (project / location / queue).
  const projectId = this.configService.get('gcp.projectId')
  const parent = this.cloudTasksClient.queuePath(
    projectId,
    'asia-northeast1',
    queueName
  )

  // Endpoint that the task will call.
  const batchContainerUrl = this.configService.get('gcp.batchContainerUrl')
  const url = `${batchContainerUrl}/queue/save-data-store`

  const serviceAccountEmail = this.configService.get(
    'gcp.cronServiceAccountEmail'
  )
  // The body is the JSON payload encoded as base64.
  const body = Buffer.from(JSON.stringify(users)).toString('base64')

  const task = {
    httpRequest: {
      httpMethod: method,
      url,
      oidcToken: { serviceAccountEmail },
      body,
      headers: { 'Content-Type': 'application/json' },
    },
  }
  return { parent, task }
}
Datastore に登録する処理を追加
gcp-worker.service.ts
/**
 * Registers a record in Datastore together with one entity per level of its
 * path (root, each directory, and finally the file itself).
 *
 * Levels are processed in order, root first, and the method resolves only
 * after every write has finished. For each level:
 * - if no entity with the same hash and `is_latest = true` exists, one is saved;
 * - if one exists and the level is the file (last level), the first match is
 *   flagged `is_latest = false` and the new file entity is saved;
 * - if one exists and the level is a directory, nothing is written.
 *
 * @param terminalUuid used as the Datastore kind for keys and queries
 * @param dto record to store; `dto.path` is split on '/' to derive the levels
 * @throws InternalServerErrorException if any Datastore call fails
 */
async saveGcd(
  terminalUuid: string,
  dto: UserDto
): Promise<void> {
  const datastore = this.getDataStore()
  // Prepend a root segment so that the root level gets its own entry.
  const paths = ['/', ...dto.path.split('/')]
  // Cumulative path of each level: '' for the root, then '/a', '/a/b', ...
  const basePaths = paths.map((_segment: string, index: number) =>
    paths
      .slice(0, index + 1)
      .join('/')
      .slice(1)
  )
  // Hash each level's cumulative path (the root is hashed as '/').
  const hashPaths = basePaths.map((base: string) =>
    crypto
      .createHash('sha1')
      .update(base === '' ? '/' : base)
      .digest('hex')
  )
  const lastIndex = paths.length - 1

  try {
    for (let index = 0; index < hashPaths.length; index++) {
      const hashPath = hashPaths[index]
      const isLast = index === lastIndex
      const parentBase = basePaths[index - 1]
      let parentTreePath: string | null = null
      if (parentBase !== undefined) {
        parentTreePath = parentBase === '' ? '/' : parentBase
      }
      const hashObject = {
        parent_hash: hashPaths[index - 1] ?? null,
        display_name: paths[index],
        hash: hashPath,
        is_latest: dto.is_latest ?? true,
        // null (not undefined) for the root level, matching parent_hash.
        parent_tree_path: parentTreePath,
        is_file: isLast,
      }
      const record = {
        // A fresh kind-only key per entity, so saves do not share a key object.
        key: datastore.key(terminalUuid),
        // Copy instead of Object.assign so the caller's dto is not mutated.
        data: isLast ? { ...dto, ...hashObject } : hashObject,
      }

      const latestQuery = datastore
        .createQuery(terminalUuid)
        .filter('hash', '=', hashPath)
        .filter('is_latest', '=', true)
      const [existing] = await datastore.runQuery(latestQuery)

      if (!existing.length) {
        // First registration of this directory or file.
        await datastore.save(record)
      } else if (isLast) {
        // The file already exists: retire the previous latest entry, then
        // store the new one.
        existing[0].is_latest = false
        await datastore.update(existing)
        await datastore.save(record)
      }
    }
  } catch (e) {
    throw new InternalServerErrorException(e)
  }
}