本記事はGCP(Google Cloud Platform) Advent Calendar 2022 7日目の記事です。
概要
Shift-JISのCSVファイルをBigQueryにアップロードする場合、UTF-8に変換する必要があります。毎回ローカルで文字コード変換していたのですが、めんどくさくなってきたのでCloud Functionsを使って自動化しようと思います。調べると同じような事に課題を感じている人は多くやり方を記述されたブログも見つけましたが自分としてはサービスアカウントなどの設定で結構詰まったのでその辺も調べて書きました。
今回作るもの
- Google StrageバケットにアップロードされたShift-JISのファイルをUTF-8に変換して、別のバケットに保存するもの。
- トリガーはGoogle Strageにファイルがアップロードされた時とする。
今回対象としないこと
- 対象ファイルが大容量の場合
- Cloud Functionsのメモリは最大32GBなので、それで足りない場合は対象外とします
- (個人的にやってみた感じでは10GB程度までなら、メモリ的にもタイムアウト時間的にもギリ大丈夫そうです。)
手順
01.Cloud Storageの作成
まず文字コード変換対象をアップロードするバケットと変換されたファイルが保存されるバケットをそれぞれ作成します。ロケーションはasia-northeast1(東京)
とします。
- Cloud Storageにアクセスする。
- 「+作成」をクリックする。
- 以下の項目を設定して作成をクリックする。
- 名前:
<バケット名_入力元>
- ロケーションタイプ:
Region, asia-northeast1(東京)
- 他の項目はデフォルトの設定を仕様。
- 名前:
- 同様に
<バケット名_出力先>
のバケットも作成します。
02.ソースコードの作成
コードはpython3.10で記述します。ソースコード全文は記事の最下部に記載しています。
文字コード変換
Cloud Functionsではiconv
が使用できるため、それを使用します。
import os
def convert_cp932_to_utf8(src_cp932_file, dst_utf8_file):
cmd = f'iconv -f cp932 -t utf8 {src_cp932_file} > {dst_utf8_file}'
os.system(cmd)
ファイルのダウンロードとアップロード
Cloud Storage からファイルをダウンロードする関数と、ファイルをアップロードする関数は公式ドキュメントのサンプルをそのまま使用します。
- 変換前のデータをダウンロードする関数:(download_blob)
- 変換後のデータをアップロードする関数:(upload_blob)
イベント処理
指定したCloud Storage のバケットへファイルがアップロードされたというイベント情報を受け取ったら、データをダウンロードして文字コード変換したファイルをまた別の指定したバケットへアップロードするコードを作成します。
import functions_framework
@functions_framework.cloud_event
def storage_iconv(cloud_event):
data = cloud_event.data
event_id = cloud_event["id"]
src_bucket_name = data["bucket"]
dst_bucket_name = '<出力先のバケット名>'
target_blob_name = data["name"]
tmp_cp932 = 'tmp_cp932.csv'
tmp_utf8 = 'tmp_utf8.csv'
download_blob(src_bucket_name, target_blob_name, tmp_cp932)
convert_cp932_to_utf8(tmp_cp932, tmp_utf8)
upload_blob(dst_bucket_name, tmp_utf8, target_blob_name)
03.デプロイ
ここでは作成したコードをデプロイする手順を記述します。
- Cloud Functionsにアクセスする。
- 「+関数を作成」をクリックして、以下の項目を入力する。
- 環境:第2世代
- 関数名:
convert-cp932-to-utf8
(なんでもいいです) - リージョン:
asia-northeast1
- 割り当てられるメモリ:4GB
- タイムアウト:540秒
- 「EVENTARCトリガーを追加」 をクリック
- トリガータイプ:自社
- イベントプロバイダ:
Cloud Storage
- イベント:
google.cloud.storage.object.v1.finalized
- バケット:
<変換元ファイルを保存するバケット名>
- サービスアカウント:「CREATE NEW SERVICE ACCOUNT」をクリックして新しいサービスアカウントを作成します。ロールは「Cloud Functions サービスエージェント、Eventarcイベント受信者、Storageオブジェクト管理者」を付与します。
- 「トリガーを保存」をクリックする。
- 「次へ」をクリックする。
- ランタイムに「Python3.10」を選択して、作成したソースコードをコピペする。
- requestments.txtには「functions-framework==3.*」と「google-cloud-storage==2.6.0」を記述すること。
- 「デプロイ」をクリックする。
- 以上
サービスアカウントについて
個人的に、サービスアカウント周りで結構つまずいたので詳しくメモしておきます。
今回付与するロールは以下の3つです。
-
Cloud Functions サービスエージェント
- とりあえず、Cloud Functionsを使う場合は必要なようです。
- サービスエージェントというのは、サービスがリソースにアクセスできるようにするためのGoogleのマネージドなサービスアカウントのこと。( 参考:サービスエージェント)
-
Eventarc イベント受信者
- 今回のようにCloud Storageにデータがアップロードされたら実行する、イベントをトリガーとする場合に必要なようです。
- 一度このロールを付けずにデプロイしようとしたら、「
eventarc.events.receiveEvent
権限がない」とエラーがでたため、調べてみたらこのページにたどり着き「Eventarc イベント受信者」ロールを付与させれば良いと分かりました。
-
Storage オブジェクト管理者
- 今回はCloud Storageからのファイルのダウンロードとアップロードを行うため「Sotrage オブジェクト管理者」のロールを付与します。
- オブジェクト管理者は、オブジェクトの読み取り、編集、作成ができるようです。
使ってみる
まずは約1GBの容量を持つShift-JISのCSVファイルを作成します。何でも良いのですが、ひたすら同じ住所が記述されたcsvファイルを作成しました。ちなみにこのcsvファイルの容量は1.1GBです。
import numpy as np
import pandas as pd
n = 30_000_000
pd.DataFrame({
'id':np.arange(n),
'郵便番号':'100-0001',
'住所':'東京都千代田区千代田'
}).to_csv('address.csv', encoding='cp932', index=None)
このファイルを入力元のバケットにアップロードします。しばらくすると、出力先のバケットに文字コードが変換されたファイルが保存されていることを確認できます。このときのメモリ使用率を確認すると2.8GB程度使用されていました。
まとめ
というわけで、CloudStorageに保存したファイルをCloud Functionsを使い自動的に文字コード変換することができました。
今回Cloud Functionsを初めて使ってましたがコード作成、権限設定、デプロイなどの中で権限周りの設定で結構つまづきました。
また現在、数10GBの容量を持つShift-JISのCSVファイルが大量にあるため、それをうまくUTF-8へ変換する方法もないかな~とか考えています。
いい方法を思いついたらまとめてみようと思います。
参考
Cloud Storage のチュートリアル(第 2 世代)
BigQueryロードする前の文字コード変換
ソースコード
main.py
import os
import functions_framework
from google.cloud import storage
def convert_cp932_to_utf8(src_cp932_file, dst_utf8_file):
cmd = f'iconv -f cp932 -t utf8 {src_cp932_file} > {dst_utf8_file}'
res = os.system(cmd)
print(res)
def download_blob(bucket_name, source_blob_name, destination_file_name):
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(source_blob_name)
blob.download_to_filename(destination_file_name)
print(
"Downloaded storage object {} from bucket {} to local file {}.".format(
source_blob_name, bucket_name, destination_file_name
)
)
def upload_blob(bucket_name, source_file_name, destination_blob_name):
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)
blob.upload_from_filename(source_file_name)
print(
f"File {source_file_name} uploaded to {destination_blob_name}."
)
@functions_framework.cloud_event
def storage_iconv(cloud_event):
data = cloud_event.data
event_id = cloud_event["id"]
src_bucket_name = data["bucket"]
dst_bucket_name = '<出力先のバケット名>'
target_blob_name = data["name"]
tmp_cp932 = 'tmp_cp932.csv'
tmp_utf8 = 'tmp_utf8.csv'
print('start download')
download_blob(src_bucket_name, target_blob_name, tmp_cp932)
print('start iconv')
convert_cp932_to_utf8(tmp_cp932, tmp_utf8)
print('start upload')
upload_blob(dst_bucket_name, tmp_utf8, target_blob_name)
requestments.txt
functions-framework==3.*
google-cloud-storage==2.6.0