0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

コンテキスト: web からダウンロードした CSV を Big Query に Load したいが、事前にちょっとした加工(Transform ほどではない)をしておきたい

TL;DR

  • 「CSV 更新を検知するたび Load」自体はできたので一定の学びはあった
  • 一部のデータのロードには失敗した
    • 理由: 1 次クリーニングが必要なデータだったため

敗因

始めて触る(クリーニングに必要な工数を把握していない)データであるにも関わらず、いきなりクラウド上で作業を始めてしまったこと

要件

  • ダウンロードした CSV を Big Query に Load したい
  • しかし下記の問題があり、そのままでは正常に Load できなかった
    • エンコーディング形式が Shift_JIS だった → UTF-8 にしておきたい
    • カラム名が日本語だった → Load 時にスキーマを指定する必要がある
    • {数字}{数字};{数字} が混在した列が INT64 と認識されてエラーになる -> STRING でロードする必要がある
  • csv は定期更新されるので上記の処理は自動化しておきたい
  • ファイルは Google Cloud Storage に置きたい
    • ローカルストレージから Big Query へのファイルアップロードにはサイズ上限があるため
  • データについて

ダウンロードした zip の内容

❯ tree
.
├── demo202402.csv
├── drug202402.csv
├── hist202402.csv
├── reac202402.csv
├── 利用規約.txt
└── 初めにお読みください.txt

設計

  • Big Query への CSV アップロード処理を Cloud Functions で書く
    • エンコーディング変換処理にはストリームと一時ファイルを使う
  • ファイルの更新によって Cloud Functions をトリガーする

実装手順

  1. まず 1 つの CSV に着目して実装し、ファイル更新の度に BigQuery へのロードが成功するか確認する
  2. ファイル名とスキーマのマップを作り、それを利用して四種類の CSV 全てのロードに対応させる

事前準備

エンコーディングが Shift_JIS だと作業もままならないので UTF-8 版を作っておく(あくまでも手元の作業用)

❯ iconv -f CP932 -t UTF-8 demo202402.csv > demo202402_utf8.csv
❯ head demo202402_utf8.csv
識別番号,報告回数,性別,年齢,体重,身長,報告年度・四半期,状況,報告の種類,報告者の資格,E2B
AB-04000003,01,女性,60歳代,,,2004・第一,調査完了,自発報告,消費者等,R2
AB-04000005,02,男性,20歳代,,,2004・第一,調査完了,試験,医師,R2
AB-04000006,04,女性,40歳代,,,2004・第一,調査完了,自発報告,医師,R2
AB-04000008,03,男性,70歳代,,,2004・第一,調査完了,自発報告,医師,R2
AB-04000010,02,女性,20歳代,50kg台,,2004・第一,調査完了,試験,,R2
AB-04000011,03,女性,60歳代,50kg台,,2004・第一,調査完了,試験,医師,R2
AB-04000012,01,男性,80歳代,50kg台,150cm台,2004・第一,調査完了,自発報告,,R2
AB-04000013,01,女性,50歳代,,,2004・第一,調査完了,自発報告,医師,R2
AB-04000014,01,女性,30歳代,40kg台,150cm台,2004・第一,調査完了,自発報告,,R2

このようなファイルをロードする

実装

まず一つのファイルをロードできるようにする

とりあえず demo データをロードできるようにした。

コード全体
// Function to convert file encoding from Shift_JIS to UTF8
const {Storage} = require('@google-cloud/storage');
const iconv = require('iconv-lite');
const storage = new Storage();

async function convertFileEncoding(bucketName, filePath, tempBucket, tempFilePath) {
  const readStream = storage.bucket(bucketName).file(filePath).createReadStream();
  const convertStream = readStream.pipe(iconv.decodeStream('cp932')).pipe(iconv.encodeStream('utf8'));

  const writeStream = storage.bucket(tempBucket).file(tempFilePath).createWriteStream();

  await new Promise((resolve, reject) => {
    convertStream.pipe(writeStream).on('finish', resolve).on('error', reject);
  });
}

// Function to delete tempfile from Cloud Storage
async function deleteTempFile(tempBucket, tempFilePath) {
  await storage.bucket(tempBucket).file(tempFilePath).delete();
}

// Function to load file to BigQuery
const {BigQuery} = require('@google-cloud/bigquery');
const bigquery = new BigQuery();
const fs = require('fs');

async function loadFileToBigQuery(datasetId, tableId, bucketName, filePath, jobConfig) {
  await bigquery.dataset(datasetId).table(tableId).load(storage.bucket(bucketName).file(filePath), jobConfig);
}

// Entrypoint
const functions = require('@google-cloud/functions-framework');
const tmp = require('tmp');
const path = require('path');

functions.cloudEvent('uploadCsvToBigQuery', async (cloudEvent) => {
  const file = cloudEvent.data;
  const bucketName = file.bucket;
  const tempBucket = 'jader_temp'; // use another bucket to avoid infinite loop
  const filePath = file.name;
  const fileName = path.basename(filePath);
  const tempFileName = path.basename(tmp.tmpNameSync()) + '.csv';
  const tempFilePath = `temp/${tempFileName}`;

  if (!fileName.endsWith('.csv')) {
    console.log('not csv');
    return;
  }
  if (!fileName.startsWith('demo')) {
    console.log("not 'demo' file");
    return;
  }

  const datasetId  = 'jader';
  const tableId = fileName.replace(/[0-9]{6}\.csv/, '');

  const jobConfig = {
    sourceFormat: 'CSV',
    schema: {
        fields: [
        {name: 'ID', type: 'STRING'},
        {name: 'TIMES_REPORTED', type: 'INT64'},
        {name: 'SEX', type: 'STRING'},
        {name: 'AGE', type: 'STRING'},
        {name: 'WEIGHT', type: 'STRING'},
        {name: 'HEIGHT', type: 'STRING'},
        {name: 'FYEAR_QUARTER_REPORTED', type: 'STRING'},
        {name: 'SURVEY_STATUS', type: 'STRING'},
        {name: 'REPORT_CATEGORY', type: 'STRING'},
        {name: 'REPORTER_LICENSE', type: 'STRING'},
        {name: 'E2B', type: 'STRING'}
      ],
    },
    skipLeadingRows: 1,
    autodetect: false,
  };

  try {
    console.log('Converting file encoding...');
    await convertFileEncoding(bucketName, filePath, tempBucket, tempFilePath);
    
    console.log('Loading file to BigQuery...');
    await loadFileToBigQuery(datasetId, tableId, tempBucket, tempFilePath, jobConfig);
    
    console.log('Cleaning up...');
    await deleteTempFile(tempBucket, tempFilePath);
    
    console.log('Process completed successfully.');
  } catch (error) {
    console.error(`Process failed: ${error.message}`);
    await deleteTempFile(tempBucket, tempFilePath); // Ensure temp file is deleted even on error
    throw error;
  }
});

ファイルの更新ごとに実行したかったちょっとした処理は「エンコーディング変換」と「スキーマ指定」なので、下にメモ書きを残しておく

エンコーディング変換

ポイント:

  • ストリームを使っている: サイズが大きめのファイルを扱うので
  • UTF-8 変換後のファイルも CloudStorage に保存している: BigQuery にロードするときのファイルサイズ制限を回避するため
// Function to convert file encoding from Shift_JIS to UTF8
const {Storage} = require('@google-cloud/storage');
const iconv = require('iconv-lite');
const storage = new Storage();

async function convertFileEncoding(bucketName, filePath, tempBucket, tempFilePath) {
  const readStream = storage.bucket(bucketName).file(filePath).createReadStream();
  const convertStream = readStream.pipe(iconv.decodeStream('cp932')).pipe(iconv.encodeStream('utf8'));

  const writeStream = storage.bucket(tempBucket).file(tempFilePath).createWriteStream();

  await new Promise((resolve, reject) => {
    convertStream.pipe(writeStream).on('finish', resolve).on('error', reject);
  });
}

スキーマ指定部

データを見てみたら、ほぼ全ての列を STRING にするしかなかった。

 const jobConfig = {
    sourceFormat: 'CSV',
    schema: {
        fields: [
        {name: 'ID', type: 'STRING'},
        {name: 'TIMES_REPORTED', type: 'INT64'},
        {name: 'SEX', type: 'STRING'},
        {name: 'AGE', type: 'STRING'},
        {name: 'WEIGHT', type: 'STRING'},
        {name: 'HEIGHT', type: 'STRING'},
        {name: 'FYEAR_QUARTER_REPORTED', type: 'STRING'},
        {name: 'SURVEY_STATUS', type: 'STRING'},
        {name: 'REPORT_CATEGORY', type: 'STRING'},
        {name: 'REPORTER_LICENSE', type: 'STRING'},
        {name: 'E2B', type: 'STRING'}
      ],
    },
    skipLeadingRows: 1,
    autodetect: false,
  };
//(中略)
async function loadFileToBigQuery(datasetId, tableId, bucketName, filePath, jobConfig) {
  await bigquery.dataset(datasetId).table(tableId).load(storage.bucket(bucketName).file(filePath), jobConfig);
}

四種類の CSV 全てのアップロードに対応させる

下記のコードはロジック的には問題ないのだが、データのクリーニングをしていないため一部のデータではロードエラーが出る状態。

差分
 async function loadFileToBigQuery(datasetId, tableId, bucketName, filePath, jobConfig) {
     await bigquery.dataset(datasetId).table(tableId).load(storage.bucket(bucketName).file(filePath), jobConfig);
 }
+const datasetSchemas = {
+    'jader': {
+        fields: [
+            { name: 'ID', type: 'STRING' },
+            { name: 'TIMES_REPORTED', type: 'INT64' },
+            { name: 'SEX', type: 'STRING' },
+            { name: 'AGE', type: 'STRING' },
+            { name: 'WEIGHT', type: 'STRING' },
+            { name: 'HEIGHT', type: 'STRING' },
+            { name: 'FYEAR_QUARTER_REPORTED', type: 'STRING' },
+            { name: 'SURVEY_STATUS', type: 'STRING' },
+            { name: 'REPORT_CATEGORY', type: 'STRING' },
+            { name: 'REPORTER_LICENSE', type: 'STRING' },
+            { name: 'E2B', type: 'STRING' }
+        ],
+    },
+    'hist': {
+        fields: [
+            { name: 'ID', type: 'STRING' },
+            { name: 'TIMES_REPORTED', type: 'INT64' },
+            { name: 'PRIMARY_DESEASE_CODE', type: 'INT64' },
+            { name: 'PRIMARY_DESEASE', type: 'STRING' },
+        ],
+    },
+    'drug': {
+        fields: [
+            { name: 'ID', type: 'STRING' },
+            { name: 'TIMES_REPORTED', type: 'INT64' },
+            { name: 'DRUG_CODE', type: 'INT64' },
+            { name: 'DRUG_INVOLVEMENT', type: 'STRING' },
+            { name: 'DRUG_GENERAL_NAME', type: 'STRING' },
+            { name: 'DRUG_PRODUCT_NAME', type: 'STRING' },
+            { name: 'ADMINISTRATION_ROUTE', type: 'STRING' },
+            { name: 'DATE_ADMINISTRATION_START', type: 'STRING' },
+            { name: 'DATE_ADMINISTRATION_END', type: 'STRING' },
+            { name: 'AMOUNT_ADMINISTRATION', type: 'STRING' },
+            { name: 'UNIT_ADMINISTRATION', type: 'STRING' },
+            { name: 'TIMES_DIVIDED_ADMINISTRATION', type: 'STRING' },
+            { name: 'REASON_ADMINISTRATION', type: 'STRING' },
+            { name: 'DRUG_MANIPURATION', type: 'STRING' },
+            { name: 'RECURRENCED_BY_READMINISTRATION', type: 'STRING' },
+            { name: 'RISK_CLASS', type: 'STRING' },
+        ],
+    },
+    'reac': {
+        fields: [
+            { name: 'ID', type: 'STRING' },
+            { name: 'TIMES_REPORTED', type: 'INT64' },
+            { name: 'ADVERSE_EVENT_CODE', type: 'INT64' },
+            { name: 'ADVERSE_EVENT_NAME', type: 'STRING' },
+            { name: 'OUTCOME', type: 'STRING' },
+            { name: 'DATE_ADVERSE_EVENT_RECURRENCED', type: 'STRING' },
+        ],
+    },
+};
 
 // Entrypoint
 const functions = require('@google-cloud/functions-framework');
@@ -50,33 +105,16 @@ functions.cloudEvent('uploadCsvToBigQuery', async (cloudEvent) => {
         console.log('not csv');
         return;
     }
-    if (!fileName.startsWith('demo')) {
-        console.log("not 'demo' file");
-        return;
-    }
 
     const datasetId = 'jader';
     const tableId = fileName.replace(/[0-9]{6}\.csv/, '');
 
     const jobConfig = {
         sourceFormat: 'CSV',
-        schema: {
-            fields: [
-                { name: 'ID', type: 'STRING' },
-                { name: 'TIMES_REPORTED', type: 'STRING' },
-                { name: 'SEX', type: 'STRING' },
-                { name: 'AGE', type: 'STRING' },
-                { name: 'WEIGHT', type: 'STRING' },
-                { name: 'HEIGHT', type: 'STRING' },
-                { name: 'FYEAR_QUARTER_REPORTED', type: 'STRING' },
-                { name: 'SURVEY_STATUS', type: 'STRING' },
-                { name: 'REPORT_CATEGORY', type: 'STRING' },
-                { name: 'REPORTER_LICENSE', type: 'STRING' },
-                { name: 'E2B', type: 'STRING' }
-            ],
-        },
+        schema: datasetSchemas[fileName.replace(/[0-9]{6}\.csv/, '')],
         skipLeadingRows: 1,
         autodetect: false,
+        quote: '"',
     };
 
     try {

'drug' データはだいぶクリーニングが必要で、ロード時にカラム数不整合のエラーが多数出た。

少なくとも column1,"column2a,column2b",column3 という形式のエントリがあったためquote = '"' を追加したが、まだ他の問題もあるようだった。

方針が悪いので諦める

この方法だと Cloud Functions へのデプロイと Cloud Storage への CSV アップロードを繰り返す必要があるため、いったん中断することにした。

振り返ると、この取り組みの失敗は手順の設計ミスから生じていたと思う。
データクリーニングの方針が見えないうちにクラウドネイティブなアーキテクチャで作業を始めてしまったところにそもそもの間違いがあった。

といいつつも、正しくは BigQuery を「選定」したわけではなく、「使ったことがないので使ってみよう」駆動だったのでまぁ仕方がないと思っている。

Node.js を使ったのも「もうちょっと書き慣れよう」程度の動機だったのだが、やはり少なくとも初手はデータラングリングに向いた言語のほうがよかっただろう。

リベンジするなら

少なくとも一次検証には、ローカルで完結するアーキテクチャを選定する。

クラウド化はその後。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?