golangでGoogle Cloud StorageへのUploadからBigQueryへLoadする
GCP認証
func Auth() error {
// 環境変数にセットしたGoogleApplicationCredentialsを指定
fn := os.Getenv("GOOGLE_APPLICATION_CREDENTIALS")
b, err := ioutil.ReadFile(fn)
if err != nil {
return err
}
_, err = google.JWTConfigFromJSON(b)
if err != nil {
return err
}
return nil
}
CloudStorege Upload
func GcsPut(bktName string, objName string, path string) error {
err := Auth()
if err != nil {
return err
}
ctx := context.Background()
// Creates a client.
client, err := storage.NewClient(ctx)
if err != nil {
return err
}
defer client.Close()
w := client.Bucket(bktName).Object(objName).NewWriter(ctx)
// option
w.ObjectAttrs.ContentType = "application/octet-stream"
w.ChunkSize = 1024
defer w.Close()
f, err := os.Open(path)
if err != nil {
return err
}
defer f.Close()
if _, err = io.Copy(w, f); err != nil {
log.Fatal(err)
}
return nil
}
ContentType | ChunkSize |
---|---|
型:String ContentTypeを指定します。 |
型:Int 指定したサイズを上限にオブジェクトを分割してアップロードする指定。 1回のリクエストでアップしたい場合はCunkSizeを指定。 上限を指定しない場合は0または無指定。 |
オプションの詳細はこちら |
BigQuery Load
func BigQueryLoad(config Config, gcspath string, tableid string) error {
err := Auth()
if err != nil {
return err
}
projectid := config.BigQuery.Projectid
gcsuri := config.BigQuery.Gcsuri + gcspath
datasetid := config.BigQuery.Datasetid
ctx := context.Background()
// Creates a client.
client, err := bigquery.NewClient(ctx, projectid)
if err != nil {
return err
}
gcsRef := bigquery.NewGCSReference(gcsuri)
// option
gcsRef.AllowJaggedRows = true
gcsRef.Compression = bigquery.Gzip
gcsRef.MaxBadRecords = 1000
gcsRef.FieldDelimiter = "n"
schema := bigquery.Schema{
&bigquery.FieldSchema{Name: "col1", Required: true, Type: bigquery.IntegerFieldType},
&bigquery.FieldSchema{Name: "col2", Required: false, Type: bigquery.StringFieldType},
&bigquery.FieldSchema{Name: "col3", Required: true, Type: bigquery.StringFieldType},
}
dataset := client.Dataset(datasetid)
err = dataset.Table(tableid).Create(ctx, &bigquery.TableMetadata{
Schema: schema,
})
if err != nil {
return err
}
loader := dataset.Table(tableid).LoaderFrom(gcsRef)
loader.CreateDisposition = bigquery.CreateNever
job, err := loader.Run(ctx)
if err != nil {
return err
}
status, err := job.Wait(ctx)
if err != nil {
return err
}
if err := status.Err(); err != nil {
return err
}
return nil
}
AllowJaggedRows | MaxBadRecords | Compression | FileDelimiter |
---|---|---|---|
型:bool 不正なレコードがあった場合、NULLとして扱うかエラーとして扱うかを選択します。 |
型:Int 不正なレコード数を許容するレコード数を指定します。 |
型:String Google Cloud StorageにuploadされたLoadするObjectの圧縮タイプを指定します。 |
型:String Loadされるファイルのdelimiterを指定します。 |
オプションの詳細はこちら | |||
圧縮されたファイルをLoadをする場合はCompression、CSV形式ではないファイルをLoadする場合はFileDelimiterを指定しましょう。 |