LoginSignup
1
3

More than 5 years have passed since last update.

golangでGoogle Cloud StorageへのUploadからBigQueryへLoadする

Posted at

golangでGoogle Cloud StorageへのUploadからBigQueryへLoadする

GCP認証

func Auth() error {
    // 環境変数にセットしたGoogleApplicationCredentialsを指定
    fn := os.Getenv("GOOGLE_APPLICATION_CREDENTIALS")
    b, err := ioutil.ReadFile(fn)
    if err != nil {
        return err
    }
    _, err = google.JWTConfigFromJSON(b)
    if err != nil {
        return err
    }
    return nil
}

CloudStorege Upload

func GcsPut(bktName string, objName string, path string) error {

    err := Auth()
    if err != nil {
        return err
    }

    ctx := context.Background()

    // Creates a client.
    client, err := storage.NewClient(ctx)
    if err != nil {
        return err
    }
    defer client.Close()

    w := client.Bucket(bktName).Object(objName).NewWriter(ctx)
    // option
    w.ObjectAttrs.ContentType = "application/octet-stream"
    w.ChunkSize = 1024

    defer w.Close()

    f, err := os.Open(path)
    if err != nil {
        return err
    }

    defer f.Close()
    if _, err = io.Copy(w, f); err != nil {
        log.Fatal(err)
    }

    return nil
}
ContentType ChunkSize
型:String
ContentTypeを指定します。
型:Int
指定したサイズを上限にオブジェクトを分割してアップロードする指定。
1回のリクエストでアップしたい場合はCunkSizeを指定。
上限を指定しない場合は0または無指定。

オプションの詳細はこちら

BigQuery Load

func BigQueryLoad(config Config, gcspath string, tableid string) error {

    err := Auth()
    if err != nil {
        return err
    }

    projectid := config.BigQuery.Projectid
    gcsuri := config.BigQuery.Gcsuri + gcspath
    datasetid := config.BigQuery.Datasetid

    ctx := context.Background()

    // Creates a client.
    client, err := bigquery.NewClient(ctx, projectid)
    if err != nil {
        return err
    }

    gcsRef := bigquery.NewGCSReference(gcsuri)

    // option
    gcsRef.AllowJaggedRows = true
    gcsRef.Compression = bigquery.Gzip
    gcsRef.MaxBadRecords = 1000
    gcsRef.FieldDelimiter = "n"

    schema := bigquery.Schema{
        &bigquery.FieldSchema{Name: "col1", Required: true, Type: bigquery.IntegerFieldType},
        &bigquery.FieldSchema{Name: "col2", Required: false, Type: bigquery.StringFieldType},
        &bigquery.FieldSchema{Name: "col3", Required: true, Type: bigquery.StringFieldType},
    }

    dataset := client.Dataset(datasetid)

    err = dataset.Table(tableid).Create(ctx, &bigquery.TableMetadata{
        Schema: schema,
    })
    if err != nil {
        return err
    }

    loader := dataset.Table(tableid).LoaderFrom(gcsRef)
    loader.CreateDisposition = bigquery.CreateNever

    job, err := loader.Run(ctx)
    if err != nil {
        return err
    }
    status, err := job.Wait(ctx)
    if err != nil {
        return err
    }
    if err := status.Err(); err != nil {
        return err
    }
    return nil
}
AllowJaggedRows MaxBadRecords Compression FileDelimiter
型:bool
不正なレコードがあった場合、NULLとして扱うかエラーとして扱うかを選択します。
型:Int
不正なレコード数を許容するレコード数を指定します。
型:String
Google Cloud StorageにuploadされたLoadするObjectの圧縮タイプを指定します。
型:String
Loadされるファイルのdelimiterを指定します。

オプションの詳細はこちら
圧縮されたファイルをLoadをする場合はCompression、CSV形式ではないファイルをLoadする場合はFileDelimiterを指定しましょう。

1
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
3