0
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

【NestJS】Cloud Taskで500件ずつData StoreにDBのデータを登録する

Posted at

概要

RDBのデータを、Cloud Tasksを利用してDatastoreへ移し替えていくプロセスをまとめる。

必要なSDKをセットアップする

gcp-worker.service.ts
import {Injectable} from "@nestjs/common";
import {Datastore} from "@google-cloud/datastore";
import {CloudTasksClient} from "@google-cloud/tasks";

/**
 * Injectable service that holds the Google Cloud clients used by this
 * application: one Datastore client and one Cloud Tasks client.
 */
@Injectable()
export class GcpWorkerService {
    private readonly dataStore: Datastore
    private readonly cloudTasksClient: CloudTasksClient

    /**
     * @param configService source of the 'gcp.projectId' setting
     */
    constructor(
        private configService: ConfigService
    ) {
        // The Datastore client is bound to the configured project id.
        const projectId = this.configService.get('gcp.projectId')
        this.dataStore = new Datastore({ projectId })

        // The Cloud Tasks client is created with its default options.
        this.cloudTasksClient = new CloudTasksClient()
    }
}

バッチ用のエンドポイントを叩かれないように対策する

Cloud Runに定義した環境変数（SERVER_TYPE）をもとに、バッチ用のCloud Runとして実行されている場合のみバッチ用モジュール（QueueModule）を読み込むようにする。

app.module.ts
import { MiddlewareConsumer, Module } from '@nestjs/common'
import { ConfigModule, ConfigService } from '@nestjs/config'
import { TypeOrmModule } from '@nestjs/typeorm'

/**
 * Picks the feature modules to register for a given server type.
 *
 * @param serverType value of the SERVER_TYPE environment variable
 * @returns the batch-only modules for 'batch', the app-only modules for 'app',
 *          and both sets for any other (or missing) value
 */
function selectModules(serverType: string | undefined) {
  const batchOnlyModules = [QueueModule]
  const appOnlyModules = [HogeModule]

  switch (serverType) {
    case 'batch':
      return batchOnlyModules
    case 'app':
      // Must be an array: the result is spread into the @Module imports list,
      // and spreading a bare class throws at load time.
      return appOnlyModules
    default:
      return [...batchOnlyModules, ...appOnlyModules]
  }
}

/** Feature modules for this process, chosen once at load time from SERVER_TYPE. */
const add_modules = selectModules(process.env.SERVER_TYPE)

/**
 * Root application module.
 *
 * Registers the global configuration, the TypeORM connection and the
 * feature modules held in `add_modules`, and attaches LoggerMiddleware.
 */
@Module({
  imports: [
    // Configuration is loaded from `configuration` and exposed globally.
    ConfigModule.forRoot({
      load: [configuration],
      isGlobal: true,
    }),
    // The TypeORM options factory receives ConfigService and returns an empty object here.
    TypeOrmModule.forRootAsync({
      inject: [ConfigService],
      imports: [ConfigModule],
      useFactory: (configService: ConfigService) => {
        return {}
      },
    }),
    ...add_modules,
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {
  /**
   * Applies LoggerMiddleware with the route pattern ''.
   * @param consumer middleware consumer supplied by the framework
   */
  configure(consumer: MiddlewareConsumer) {
    const loggerBinding = consumer.apply(LoggerMiddleware)
    loggerBinding.forRoutes('')
  }
}

RDBからデータをfetchする処理を作成

500件ずつ取得してCloudTaskに別プロセスとして処理を行なってもらう

queue.controller.ts
/**
 * Endpoints that copy User rows from the RDB to Datastore by way of Cloud Tasks.
 */
@Controller('queue')
export class QueueController {
  // NOTE(review): the service class shown elsewhere in this article is named
  // GcpWorkerService — confirm GcpClientService is the intended provider.
  constructor(
    private readonly queuesService: QueueService,
    private readonly gcpClientService: GcpClientService
  ) {}

  /**
   * Reads users in pages of 500 and hands each page to
   * `gcpClientService.createHttpTask`. Stops at the first empty page.
   *
   * @param skip number of rows to skip before the first page
   */
  @Get()
  async findAll(skip = 0) {
    const take = 500

    // NOTE(review): no ORDER BY is applied, so page boundaries rely on the
    // database returning rows in a stable order — consider ordering by a key.
    for (let offset = skip; ; offset += take) {
      const users = await getRepository(User)
        .createQueryBuilder('h')
        .skip(offset)
        .take(take)
        .getMany()

      if (!users.length) {
        return
      }

      await this.gcpClientService.createHttpTask(users)
    }
  }

  /**
   * Stores every user found in the request body in Datastore.
   * This is the endpoint the Cloud Task calls.
   *
   * @param req incoming request; `req.body` is read as an array of users
   * @throws InternalServerErrorException when saving any user fails
   */
  @Post('/save-data-store')
  async saveDataStore(@Req() req) {
    const users: User[] = req.body

    // Nothing to do for a missing, non-array or empty body.
    if (!Array.isArray(users) || !users.length) {
      return
    }

    try {
      // Wait for every save so the response reflects the real outcome and a
      // failure is reported to the caller instead of being lost.
      await Promise.all(
        users.map((user: User) => {
          const dto: UserDto = {
            ...user,
            is_latest: user.is_latest ?? true,
          }

          return this.gcpClientService.saveGcd(user.id, dto)
        })
      )
    } catch (e) {
      throw new InternalServerErrorException(
        e instanceof Error ? e.message : String(e)
      )
    }
  }
}

Cloud Taskにジョブを積んでいく処理を追加

gcp-worker.service.ts
  /**
   * Registers one Cloud Task whose payload is the given users.
   *
   * The returned promise settles only after the task has been created, so a
   * caller that awaits it sees creation failures as a rejection.
   *
   * @param users users to send in the task body
   * @returns a promise that resolves once the task is created and logged
   */
  createHttpTask = async (users): Promise<void> => {
    const { parent, task } = this.cloudTasksClientQueueInit(
      this.configService.get('gcp.queueName'),
      HttpMethod.POST,
      users
    )

    const response = await this.cloudTasksClient.createTask(
      { parent, task },
      { timeout: 10000 }
    )
    this.logger.log(response)
  }

/**
   * Builds the queue resource name and the HTTP task definition for a Cloud Task.
   *
   * @param queueName name of the target queue
   * @param method HTTP method value placed in the task's request
   * @param users users serialized as JSON into the request body
   * @returns an object with the queue path (`parent`) and the task definition (`task`)
   */
  cloudTasksClientQueueInit = (
    queueName: string,
    method: number,
    users: User[]
  ) => {
    // Queue resource name for the configured project in 'asia-northeast1'.
    const projectId = this.configService.get('gcp.projectId')
    const queuePath = this.cloudTasksClient.queuePath(
      projectId,
      'asia-northeast1',
      queueName
    )

    // The task calls the save endpoint on the configured container URL.
    const baseUrl = this.configService.get('gcp.batchContainerUrl')
    const targetUrl = `${baseUrl}/queue/save-data-store`

    const serviceAccountEmail = this.configService.get(
      'gcp.cronServiceAccountEmail'
    )

    // The body is the JSON payload, base64-encoded.
    const encodedBody = Buffer.from(JSON.stringify(users)).toString('base64')

    return {
      parent: queuePath,
      task: {
        httpRequest: {
          httpMethod: method,
          url: targetUrl,
          oidcToken: { serviceAccountEmail },
          body: encodedBody,
          headers: { 'Content-Type': 'application/json' },
        },
      },
    }
  }

DataStoreに登録する処理を追加

gcp-worker.service.ts
/**
   * Registers a file path in Datastore, one entity per path level.
   *
   * `dto.path` is split on '/', a root level is prepended, and each level is
   * identified by the SHA-1 hash of its cumulative path. A level that has no
   * latest entity yet is saved. When the final level (the file) already has
   * one, that entity is marked as not latest and the new one is saved.
   * Levels are processed in order, parents first, and the returned promise
   * resolves only after every write has finished.
   *
   * @param terminalUuid Datastore kind used for the keys and the lookups
   * @param dto data stored on the file entity; `dto.path` is the file path
   * @throws InternalServerErrorException when any Datastore call fails
   */
  async saveGcd(
    terminalUuid: string,
    dto: UserDto
  ): Promise<void> {
    const datastore = this.getDataStore()

    const segments = ['/', ...dto.path.split('/')]
    const lastIndex = segments.length - 1

    // Cumulative path of each level; the root level becomes ''.
    const basePaths = segments.map((_segment: string, index: number) =>
      segments
        .slice(0, index + 1)
        .join('/')
        .slice(1)
    )

    // The root level is hashed as '/'.
    const hashPaths = basePaths.map((base: string) =>
      crypto
        .createHash('sha1')
        .update(base === '' ? '/' : base)
        .digest('hex')
    )

    try {
      for (let index = 0; index < hashPaths.length; index++) {
        const hashPath = hashPaths[index]
        const isLast = index === lastIndex
        const parentPath = basePaths[index - 1]

        const hashObject = {
          parent_hash: hashPaths[index - 1] ?? null,
          display_name: segments[index],
          hash: hashPath,
          is_latest: dto.is_latest ?? true,
          parent_tree_path: parentPath === '' ? '/' : parentPath,
          is_file: isLast,
        }

        // A fresh incomplete key per entity, so that one save can never
        // reuse an id assigned by an earlier save.
        const datastoreKey: entity.Key = datastore.key(terminalUuid)
        const record = {
          key: datastoreKey,
          // Copy instead of Object.assign so the caller's dto is not mutated.
          data: isLast ? { ...dto, ...hashObject } : hashObject,
        }

        const latestQuery = datastore
          .createQuery(terminalUuid)
          .filter('hash', '=', hashPath)
          .filter('is_latest', '=', true)
        const [existing] = await datastore.runQuery(latestQuery)

        if (!existing.length) {
          // First registration of this directory or file.
          await datastore.save(record)
        } else if (isLast) {
          // The file already exists: retire the current latest entity first.
          existing[0].is_latest = false
          await datastore.update(existing)
          await datastore.save(record)
        }
        // An already registered directory needs no write.
      }
    } catch (e) {
      throw new InternalServerErrorException(e)
    }
  }
0
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?