
Integrating AWS Glue with S3

Posted at 2020-10-15

I often integrate Glue with S3 at work, so these are my notes on how to connect the two.

Reading and writing Parquet files stored in S3 with AWS Glue


from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from pyspark import SparkContext

glueContext = GlueContext(SparkContext.getOrCreate())

# Read Parquet files from S3 into a DynamicFrame
readPath = "s3://bucket/path/file"
inputDF = glueContext.create_dynamic_frame_from_options(
    connection_type="s3",
    connection_options={"paths": [readPath]},
    format="parquet"
)

print(type(inputDF))  # <class 'awsglue.dynamicframe.DynamicFrame'>

# ...data processing...

# Write the result back to S3 as Parquet.
# Convert to a Spark DataFrame first: a DynamicFrame has no .write attribute.
writePath = "s3://bucket/path2"
(inputDF.toDF()
    .repartition(1)
    .write
    .option("parquet.block.size", 128 * 1024 * 1024)
    .mode("overwrite")
    .parquet(writePath))

Once the upload to S3 completes, the Parquet files are placed under writePath.
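To check the result, you can list the objects under writePath. A minimal sketch with boto3, assuming the bucket and prefix from the example above:

import boto3

# List the Parquet objects written under writePath
s3 = boto3.client('s3')
response = s3.list_objects_v2(Bucket='bucket', Prefix='path2/')
for obj in response.get('Contents', []):
    print(obj['Key'], obj['Size'])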

Reading data from the AWS Glue Data Catalog

Read data that is registered in the AWS Glue Data Catalog.


from pyspark.context import SparkContext
from awsglue.context import GlueContext

glueContext = GlueContext(SparkContext.getOrCreate())

# Load a catalog table into a DynamicFrame
floorDF = glueContext.create_dynamic_frame.from_catalog(
             database="test_database",
             table_name="floor")

# printSchema() prints the schema itself and returns None, so don't wrap it in print()
floorDF.printSchema()

Reference:
DynamicFrameWriter class - AWS Glue
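The DynamicFrameWriter referenced above can also write a DynamicFrame to S3 directly, without converting it to a Spark DataFrame first. A minimal sketch, reusing glueContext and floorDF from the catalog example (the output path is an assumption):

# Write the DynamicFrame to S3 as Parquet via the DynamicFrameWriter
glueContext.write_dynamic_frame.from_options(
    frame=floorDF,
    connection_type="s3",
    connection_options={"path": "s3://bucket/output/"},  # hypothetical output path
    format="parquet"
)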

Working with S3 using boto3

Uploading a file to S3.

import boto3

s3 = boto3.resource(
    service_name='s3'
)
bucket = "bucket"
localFile = "./tmp/upload.txt"
targetKey = "path/filename.text"  # destination object key, including the file name
s3.Bucket(bucket).upload_file(
    localFile,
    targetKey
)

Reading a file from S3.

import boto3
import io
import csv

def get_s3file(bucket_name, key):
    # Fetch the object and wrap its bytes in a text stream for the csv module
    s3 = boto3.resource('s3')
    s3obj = s3.Object(bucket_name, key).get()

    return io.TextIOWrapper(io.BytesIO(s3obj['Body'].read()))


for rec in csv.DictReader(get_s3file('test', 'file/id=310/a.csv')):
    print(rec)

Working with S3 using pandas

Read CSV data from S3 with pandas (s3fs must be installed).

import pandas as pd

# Use the s3:// scheme; s3n:// is a legacy Hadoop scheme that pandas does not understand
df = pd.read_csv('s3://dev/test.csv')
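If the default AWS credential chain is not available, newer pandas versions (1.2 and later) can pass credentials through to s3fs via storage_options. A sketch with placeholder credentials:

import pandas as pd

# Pass explicit credentials through to s3fs (requires pandas >= 1.2)
df = pd.read_csv(
    's3://dev/test.csv',
    storage_options={'key': 'your-aws-access-key', 'secret': 'your-aws-secret-access-key'}
)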

Uploading data from pandas to S3.

import pandas as pd
import s3fs

key = "your-aws-access-key"
secret = "your-aws-secret-access-key"
# Write to a full object path, not just the bucket name
outpath = 's3://test/test.csv'
# df is the pandas DataFrame to upload (e.g., from the previous example)
bytes_to_write = df.to_csv(None, index=False).encode()
fs = s3fs.S3FileSystem(key=key, secret=secret)
with fs.open(outpath, 'wb') as f:
    f.write(bytes_to_write)

Read S3 data as a pyarrow.parquet table.

import pyarrow.parquet as pq
import s3fs

# Read a Parquet dataset from S3 through s3fs
fs = s3fs.S3FileSystem()
dataset = pq.ParquetDataset('s3://test/20190401_100.pq', filesystem=fs)
table = dataset.read()
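If you need a pandas DataFrame afterwards, the Arrow table converts directly:

# Convert the Arrow table to a pandas DataFrame
df = table.to_pandas()
print(df.head())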

Converting between Glue DynamicFrame, Apache Spark DataFrame, and pandas DataFrame

How to convert a DynamicFrame created with GlueContext's create_dynamic_frame_from_rdd, create_dynamic_frame_from_catalog, or create_dynamic_frame_from_options functions into an Apache Spark DataFrame or a pandas DataFrame, and back.

DynamicFrame <-> Apache Spark DataFrame

DynamicFrame.toDF() -> Apache Spark DataFrame

DynamicFrame.fromDF(Apache Spark DataFrame, GlueContext, name) -> DynamicFrame

Apache Spark DataFrame <-> Pandas DataFrame

Apache Spark DataFrame.toPandas() -> Pandas DataFrame
SparkSession.createDataFrame(Pandas DataFrame) -> Apache Spark DataFrame
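A minimal round-trip sketch, assuming glueContext and the inputDF DynamicFrame from the first example:

from awsglue.dynamicframe import DynamicFrame

# DynamicFrame -> Apache Spark DataFrame
sparkDF = inputDF.toDF()

# Apache Spark DataFrame -> pandas DataFrame (collects all rows to the driver)
pandasDF = sparkDF.toPandas()

# pandas DataFrame -> Apache Spark DataFrame, via the SparkSession held by GlueContext
sparkDF2 = glueContext.spark_session.createDataFrame(pandasDF)

# Apache Spark DataFrame -> DynamicFrame (the last argument is a name for the frame)
dynamicDF = DynamicFrame.fromDF(sparkDF2, glueContext, "dynamicDF")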
