1
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

golangでGoogle Cloud StorageへのUploadからBigQueryへLoadする

Posted at

golangでGoogle Cloud StorageへのUploadからBigQueryへLoadする

GCP認証

func Auth() error {
	// 環境変数にセットしたGoogleApplicationCredentialsを指定
	fn := os.Getenv("GOOGLE_APPLICATION_CREDENTIALS")
	b, err := ioutil.ReadFile(fn)
	if err != nil {
		return err
	}
	_, err = google.JWTConfigFromJSON(b)
	if err != nil {
		return err
	}
	return nil
}

CloudStorege Upload

func GcsPut(bktName string, objName string, path string) error {

	err := Auth()
	if err != nil {
		return err
	}

	ctx := context.Background()

	// Creates a client.
	client, err := storage.NewClient(ctx)
	if err != nil {
		return err
	}
	defer client.Close()

	w := client.Bucket(bktName).Object(objName).NewWriter(ctx)
	// option
	w.ObjectAttrs.ContentType = "application/octet-stream"
	w.ChunkSize = 1024

	defer w.Close()

	f, err := os.Open(path)
	if err != nil {
		return err
	}

	defer f.Close()
	if _, err = io.Copy(w, f); err != nil {
		log.Fatal(err)
	}

	return nil
}
ContentType ChunkSize
型:String
ContentTypeを指定します。
型:Int
指定したサイズを上限にオブジェクトを分割してアップロードする指定。
1回のリクエストでアップしたい場合はCunkSizeを指定。
上限を指定しない場合は0または無指定。
オプションの詳細はこちら

BigQuery Load

func BigQueryLoad(config Config, gcspath string, tableid string) error {

	err := Auth()
	if err != nil {
		return err
	}

	projectid := config.BigQuery.Projectid
	gcsuri := config.BigQuery.Gcsuri + gcspath
	datasetid := config.BigQuery.Datasetid

	ctx := context.Background()

	// Creates a client.
	client, err := bigquery.NewClient(ctx, projectid)
	if err != nil {
		return err
	}

	gcsRef := bigquery.NewGCSReference(gcsuri)

	// option
	gcsRef.AllowJaggedRows = true
	gcsRef.Compression = bigquery.Gzip
	gcsRef.MaxBadRecords = 1000
	gcsRef.FieldDelimiter = "n"

	schema := bigquery.Schema{
		&bigquery.FieldSchema{Name: "col1", Required: true, Type: bigquery.IntegerFieldType},
		&bigquery.FieldSchema{Name: "col2", Required: false, Type: bigquery.StringFieldType},
		&bigquery.FieldSchema{Name: "col3", Required: true, Type: bigquery.StringFieldType},
	}

	dataset := client.Dataset(datasetid)

	err = dataset.Table(tableid).Create(ctx, &bigquery.TableMetadata{
		Schema: schema,
	})
	if err != nil {
		return err
	}

	loader := dataset.Table(tableid).LoaderFrom(gcsRef)
	loader.CreateDisposition = bigquery.CreateNever

	job, err := loader.Run(ctx)
	if err != nil {
		return err
	}
	status, err := job.Wait(ctx)
	if err != nil {
		return err
	}
	if err := status.Err(); err != nil {
		return err
	}
	return nil
}
AllowJaggedRows MaxBadRecords Compression FileDelimiter
型:bool
不正なレコードがあった場合、NULLとして扱うかエラーとして扱うかを選択します。
型:Int
不正なレコード数を許容するレコード数を指定します。
型:String
Google Cloud StorageにuploadされたLoadするObjectの圧縮タイプを指定します。
型:String
Loadされるファイルのdelimiterを指定します。
オプションの詳細はこちら
圧縮されたファイルをLoadをする場合はCompression、CSV形式ではないファイルをLoadする場合はFileDelimiterを指定しましょう。
1
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?