LoginSignup
1
1

More than 3 years have passed since last update.

量が多いAmazon S3のファイルをローカルのDBに読み込む

Last updated at Posted at 2020-08-14

前回、Treasure DataからAmazon S3にデータをバルクエクスポートした。

ファイルが分割、圧縮されてエクスポートされているため、

  1. ダウンロードして
  2. 解凍して
  3. MySQLにインポートして
  4. ダウンロードしたファイルを削除する

スクリプトを書いてみた。

※ 事前にデータベース、テーブルは作成しておくこと
load data infile が実行出来るMySQL設定となっていること

// Groovy Version: 2.5.0 JVM: 1.8.0_181 Vendor: Azul Systems, Inc. OS: Mac OS X
@GrabConfig(systemClassLoader = true)
@Grab('mysql:mysql-connector-java:5.1.31')
@Grab('com.amazonaws:aws-java-sdk-s3:1.11.839')
import com.amazonaws.regions.Regions
import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import groovy.sql.Sql
import groovyx.gpars.GParsPool

import java.nio.file.Files
import java.nio.file.StandardCopyOption


AmazonS3 s3Client = AmazonS3ClientBuilder.standard()
        .withRegion(Regions.US_EAST_1)
        .build()

// 適宜パラメータ化
def db = Sql.newInstance("jdbc:mysql://localhost/database_name?useLegacyDatetimeCode=false", "username", "password", "com.mysql.jdbc.Driver")


GParsPool.withPool {
    // 出力されたファイル数分に調整すること
    (0..100).eachParallel {
        def fileNumber = String.format('%03d', it)
        log(fileNumber, "Start")

        s3Client.getObject('bucketName', "table_name/export.$fileNumber.0000.tsv.gz").getObjectContent().withCloseable { objectContent ->

            File file = File.createTempFile('table_name-', '.tsv.gz')
            def downloadPath = file.absolutePath

            def tsvfilePath = downloadPath.replaceAll('\\.gz$', '')
            try {
                log(fileNumber, "Download to $downloadPath")
                Files.copy(objectContent, file.toPath(), StandardCopyOption.REPLACE_EXISTING)

                log(fileNumber, "Decompress")
                "unpigz $downloadPath".execute().waitFor() // parallel gunzip

                log(fileNumber, "Import")
                db.execute("load data infile '" + tsvfilePath + "' into table table_name")

                log(fileNumber, "Done")
            } finally {
                file.delete() // 解凍されてファイル名が変わっているため、実質何も起こらないが念のため削除
                new File(tsvfilePath).delete()
            }
        }
    }
}

def log(String fileNumber, String s) {
    println "$fileNumber: $s"
}
実行ログ例
001: Start
000: Start
002: Start
001: Download to /var/folders/ss/hw79tp5d1v3bwgvs_rnjk7jm0000gn/T/table_name-3124187279161512834.tsv.gz
002: Download to /var/folders/ss/hw79tp5d1v3bwgvs_rnjk7jm0000gn/T/table_name-556558218486495982.tsv.gz
000: Download to /var/folders/ss/hw79tp5d1v3bwgvs_rnjk7jm0000gn/T/table_name-3422206227941372411.tsv.gz
001: Decompress
001: Import
002: Decompress
002: Import
001: Done
000: Decompress
000: Import
002: Done
000: Done

参考

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1