業務上GlueとS3の連携がよくやっていますので、連携する方法をメモしました。
AWS GlueでS3に保存しているParquetファイルの読み取りと書き込み
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from pyspark import SparkContext
glueContext = GlueContext(SparkContext.getOrCreate())
# ファイルの読み取り
readPath = "s3://bucket/path/file"
inputDF = glueContext.create_dynamic_frame_from_options(
connection_type = "s3",
connection_options = {"paths": [readPath]},
format="parquet"
)
type(inputDF)
# ...データ処理...
# S3にファイルのアップロード
writePath = "s3://bucker/path2"
inputDF.repartition(1).write.option("parquet.block.size", 128 * 1024 * 1024)
.mode('overwrite')
.parquet(writePath)
S3へのアップロードができましたら、writePathの下にParguetファイルが配置されます。
AWS Glueでカタログのデータの読み込み
AWS Glueにあるカタログのデータを読み込む。
from pyspark.context import SparkContext
from awsglue.context import GlueContext
glueContext = GlueContext(SparkContext.getOrCreate())
floorDF = glueContext.create_dynamic_frame.from_catalog(
database="test_database",
table_name="floor")
print(floorDF.printSchema())
参考:
DynamicFrameWriter クラス - AWS Glue
boto3でS3との連携
S3にファイルのアップロード。
import boto3
s3 = boto3.resource(
service_name='s3'
)
bucket = "bucket"
localFile = "./tmp/upload.txt"
targetDirectory = "path/filename.text"
s3.Bucket(bucket).upload_file(
localFile,
targetDirectory
)
S3のファイルを読み取る。
import boto3
import io
import csv
def get_s3file(bucket_name, key):
s3 = boto3.resource('s3')
s3obj = s3.Object(bucket_name, key).get()
return io.TextIOWrapper(io.BytesIO(s3obj['Body'].read()))
for rec in csv.DictReader(get_s3file('test', 'file/id=310/a.csv')):
print(rec)
PandasでS3との連携
PandasでS3のCSVデータを読み取る。
import pandas as pd
df = pd.read_csv('s3n://dev/test.csv')
PandasからS3にデータをアップロードする
import pandas as pd
import s3fs
key = "your-aws-access-key"
secret = "your-aws-secret-access-key"
outpath='s3://test'
bytes_to_write = df.to_csv(None, index=False).encode()
fs = s3fs.S3FileSystem(key=key, secret=secret)
with fs.open(outpath, 'wb') as f:
f.write(bytes_to_write)
S3のデータをpyarrow.parquet テーブルとして読み込む。
import pyarrow.parquet as pq
import s3fs
fs = s3fs.S3FileSystem()
dataset = pq.ParquetDataset('s3://test/20190401_100.pq', filesystem=fs)
table = dataset.read()
SparkSQL DynamicFrame, Apache Spark DataFrame, Pandas DataFrameの変換
GlueContextのcreate_dynamic_frame_from_rdd
, create_dynamic_frame_from_catalog
, create_dynamic_frame_from_options
関数で作成したDynamicFrameをApache Spark DataFrameやPandas DataFrameに変換する方法。
DynamicFrame <-> Apache Spark DataFrame
DynamicFrame.toDF() -> Apache Spark DataFrame
DynamicFrame.from(Apache Spark DataFrame, GlueContext, tableName) -> DynamicFrame
Apache Spark DataFrame <-> Pandas DataFrame
Apache Spark DataFrame.toPandas() -> Pandas DataFrame
GlueContext.createDataFrame(Pandas DataFrame) -> Apache Spark DataFrame.toPandas()