概要
RDBからDataStoreにCloud Taskを利用してデータを移し替えていくプロセスをまとめる
必要なSDKをセットアップする
gcp-worker.service.ts
import {Injectable} from "@nestjs/common";
import {Datastore} from "@google-cloud/datastore";
import {CloudTasksClient} from "@google-cloud/tasks";
@Injectable()
export class GcpWorkerService {
    private readonly dataStore: Datastore
    private readonly cloudTasksClient: CloudTasksClient
    constructor(
        private configService: ConfigService
    ) {
        this.dataStore = new Datastore({
            projectId: this.configService.get('gcp.projectId'),
        })
        this.cloudTasksClient = new CloudTasksClient()
    }
}
バッチ用のエンドポイントを叩かれないように対策する
Cloud Runに定義した環境変数をもとにBatch用のCloud Run契機で実行されている場合のみ読み込むようにする
app.module.ts
import { MiddlewareConsumer, Module } from '@nestjs/common'
import { ConfigModule, ConfigService } from '@nestjs/config'
import { TypeOrmModule } from '@nestjs/typeorm'
const add_modules = (() => {
  const batch__only_modules = [QueueModule]
  const app_only_modules = [HogeModule]
  switch (process.env.SERVER_TYPE) {
    case 'batch':
      return batch__only_modules
    case 'app':
      return HogeModule
    default:
      return [...batch__only_modules, ...app_only_modules]
  }
})()
@Module({
  imports: [
    ConfigModule.forRoot({
      isGlobal: true,
      load: [configuration],
    }),
    TypeOrmModule.forRootAsync({
      imports: [ConfigModule],
      useFactory: (configService: ConfigService) => ({}),
      inject: [ConfigService],
    }),
    ...add_modules
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {
  configure(consumer: MiddlewareConsumer) {
    consumer.apply(LoggerMiddleware).forRoutes('')
  }
}
RDBからデータをfetchする処理を作成
500件ずつ取得してCloudTaskに別プロセスとして処理を行なってもらう
queue.controller.ts
@Controller('queue')
export class QueueController {
  constructor(
    private readonly queuesService: QueueService,
    private readonly gcpClientService: GcpClientService
  ) {}
  @Get()
  async findAll(skip = 0) {
    const take = 500
    const users = await getRepository(User)
      .createQueryBuilder('h')
      .skip(skip)
      .take(take)
      .getMany()
    if (users.length) {
      await this.gcpClientService.createHttpTask(users)
      await this.findAll(skip + take)
    }
  }
  /**
   * Cloud TaskでDataStoreに登録する処理
   * @param req
   */
  @Post('/save-data-store')
  async saveDataStore(@Req() req) {
    const users: User[] = req.body
    if (users.length) {
      users.map(async (user: User) => {
        const dto: UserDto = {
          ...backupFileMap,
          is_latest: backupFileMap.is_latest ?? true,
        }
        try {
          await this.gcpClientService.saveGcd(
            users.id,
            dto
          )
        } catch (e) {
          throw new InternalServerErrorException(e.message)
        }
      })
    }
  }
}
Cloud Taskにジョブを積んでいく処理を追加
gcp-worker.service.ts
  /**
   * Cloud Taskにタスクを登録
   * @param users
   */
  createHttpTask = (users) => {
    const queueData = this.cloudTasksClientQueueInit(
      this.configService.get('gcp.queueName'),
      HttpMethod.POST,
      users
    )
    const request = {
      parent: queueData.parent,
      task: queueData.task,
    }
    this.cloudTasksClient.createTask(request, { timeout: 10000 }).then((r) => {
      this.logger.log(r)
    })
  }
/**
   * Cloud taskをイニシャライズ
   * @param queueName
   * @param method
   * @param users
   */
  cloudTasksClientQueueInit = (
    queueName: string,
    method: number,
    users: User[]
  ) => {
    // キュー・リソース名を返す
    const parent = this.cloudTasksClient.queuePath(
      this.configService.get('gcp.projectId'),
      'asia-northeast1',
      queueName
    )
    const taskUrl = `${this.configService.get(
      'gcp.batchContainerUrl'
    )}/queue/save-data-store`
    const task = {
      httpRequest: {
        httpMethod: method,
        url: taskUrl,
        oidcToken: {
          serviceAccountEmail: this.configService.get(
            'gcp.cronServiceAccountEmail'
          ),
        },
        body: Buffer.from(JSON.stringify(users)).toString('base64'),
        headers: {
          'Content-Type': 'application/json',
        },
      },
    }
    return {
      parent: parent,
      task: task,
    }
  }
DataStoreに登録する処理を追加
gcp-worker.service.ts
/**
   * @function DataStoreへの登録
   * @param terminalUuid
   * @param users
   */
  async saveGcd(
    terminalUuid: string,
    dto: UserDto
  ): Promise<void> {
    const datastore = this.getDataStore()
    const { path } = dto
    const paths = path.split('/')
    paths.unshift('/')
    const datastoreKey: entity.Key = datastore.key(terminalUuid)
    // 各階層単位でパスをハッシュ化する
    const basePaths = paths.map((path: string, index: number) => {
      return paths
        .slice(0, index + 1)
        .join('/')
        .slice(1)
    })
    const hashPaths = basePaths.map((base: string) => {
      return crypto
        .createHash('sha1')
        .update(base === '' ? '/' : base)
        .digest('hex')
    })
    hashPaths.map(async (hashPath: string, index: number) => {
      try {
        const isLast = paths.length - 1 === index
        const hashObject = {
          parent_hash: hashPaths[index - 1] ?? null,
          display_name: paths[index],
          hash: hashPath,
          is_latest: backupFileMapDto.is_latest ?? true,
          parent_tree_path:
            basePaths[index - 1] == '' ? '/' : basePaths[index - 1],
          is_file: isLast,
        }
        const entity = {
          key: datastoreKey,
          data: isLast
            ? Object.assign(dto, hashObject)
            : hashObject,
        }
        const [data] = await datastore.runQuery(
          await datastore
            .createQuery(terminalUuid)
            .filter('hash', '=', hashPath)
            .filter('is_latest', '=', true)
        )
        // ディレクトリが初回登録なら登録
        if (!data.length && !isLast) {
          await datastore.save(entity)
        } else if (!data.length && isLast) {
          // データが無くてファイル登録の場合
          await datastore.save(entity)
        } else if (data.length && isLast) {
          data[0].is_latest = false
          await datastore.update(data)
          await datastore.save(entity)
        }
      } catch (e) {
        throw new InternalServerErrorException(e)
      }
    })
  }