4
5

More than 1 year has passed since last update.

【Python/AWS】第4回 AWS Glueを使ってS3のデータを成形する【データ分析】

Last updated at Posted at 2022-10-14

1. はじめに

本記事は【Python/AWS】の第4回として前回に引き続き、
AWSのサービスを利用した一連のデータ分析の続きとなります。

▼前回の記事はこちら▼

私自身、AWSのサービスを触るのは初めてだったので、
細かい説明など不足する点はあるかと思いますが、
大まかな流れをこちらで解説していきたいと思います。

本分析の全体像は以下のようになります。
AWSアーキテクチャ.PNG
全体の分析目的は以前の分析から継続/発展して、
「Splatoon3」におけるコンテンツのユーザー満足度調査 です。

収集したデータから項目ごとに感情分析を行い、
ポジティブ/ネガティブの値を可視化することが最終目的です。

▼以前の記事はこちら▼

1-1. 本記事の概要と目標

今回のテーマは 「AWS Glueによるデータの加工」 ということで、
前回取り扱った感情分析を含んだ内容となっています。

初回、第2回ではデータの収集などにLambdaを使いましたが、
今回はとある理由から "AWS Glue" を使っていきます。

前回に引き続き、今回作成するパートのイメージはこのような形です。
Glueアーキテクチャ.png
使用するサービスとそれぞれの役割は以下の通りです。
Amazon S3: 様々な形式のデータを保管できるストレージ
Amazon Comprehend: テキストから洞察を見つける自然言語処理サービス
AWS Glue: データの検出/結合を簡単に行えるサーバーレスデータ統合サービス

なお本記事ではAWSの登録方法や初期設定、
Glue以外のサービス詳細については取り上げていません。
必要に応じて前回以前の記事、または以下を参考にして下さい。

1-2. 全体の流れ

自動実行の順番、流れとしては以下の通りです。
1.Glueのトリガーによる朝10時に実行
2.S3からJSONファイルをCSVとして読み込む
3.ツイートを感情分析して値を追加
4.新しいデータフレームをCSVとしてS3に出力

2. AWS Glueについて

2-1. AWS Glueとは

AWS GlueはAmazonが提供するサーバーレスで
データを検出、準備、移動、統合できるサービスです。

今回は様々ある機能の中から、 AWS Glue Studio という
機能を使って多量のデータの加工をしていきます。

今回は使用しませんが、保管している多種多様なデータを
効率的に管理するような機能も持っているようです。

▼AWS Glueの公式紹介▼

2-2. Lambdaとの違い

下の参考ページのリンク先でも解説されていますが、
Lambdaとの違いの一つとして稼働時間があります。

Lambdaの稼働限界が15分であるのに対して、
Glueは48時間という長い時間稼働することが可能 です。

今回行う作業は1万を超えるツイートのデータに対して、
データ読み込み、加工、CSVの出力を行うため、
Lambdaでは稼働時間を超えてエラーが出てしまいました。

性能やできることに関して大きな差はないようですが、
・時間のかかる作業は膨大な作業はGlue
・すぐに終わる軽量な作業はLambda

といった認識で良さそうです。

▼Lambdaとの違いに関する参考ページ▼

2-3. Glue使用の準備

上記のリンクからGlueのホーム画面に移動したら、
左側のメニューから [Jobs] を選択します。
Glue HOME.png
画像のように [Python Shell script editor] を選択し、
今回は [Create a new Script] の方を選んでいきます。
(初回同様にpyファイルを準備する場合は下を選ぶ)
Glue jobs.png
直接打ち込む場合は画像の赤枠に書きます。
こまめに右上の [Save] をしておきます。
scriptを打つ.png
[Job details] で詳細を設定します。
名前、説明、使用するロール、バージョンを選択します。
detail.png
今回使用するGlueのロールには少なくとも
以下の2つの権限が必要になるので付与しておきます。
権限の追加についてはこちらで解説しています。
for Glue Role.PNG

2-3. パッケージの使用

スクリプト内でパッケージを使用する際に、
外部から準備する方法もLambdaとは違いがあります。

今回はGlueで元から準備されているもの以外の
パッケージも使用するため、その準備をしていきます。

[Job details] ページの下部にある
[Advanced properties] を開きます。
property.PNG
さらにページ下部の [Add new parameter] を開きます。
パラメーター.PNG
[Job parameters]"Key"「--additional-python-modules」
"Value"「datetime,neologdn」 のようにカンマ区切りで
importしたいパッケージを入力しておきます。
add_package.PNG
Glueで元から使用できるパッケージについては、
以下の参考ページで一覧を確認することができます。

▼パッケージ仕様に関する参考ページ▼

3. 実装

大まかなと説明の流れとして、
1.データの準備
2.データ取得/データフレーム作成
3.データの加工
4.感情分析の値を追加
5.csvでデータを出力

の5段階に分けて確認をしていきます。

以下がコードの全体像です。

json_to_csv.py
import json
import boto3
import pandas
import datetime
import re
import neologdn

#--------------------------------------準備--------------------------------------

#クライアント準備
s3_client = boto3.client("s3", "ap-northeast-1")

#テキストクレンジングの関数
def cleansing_text(text):
    tmp = re.sub(r"#[^\s]*", "", text, 10)
    tmp = re.sub(r"52", "ごーにー", tmp)
    tmp = re.sub(r"96", "きゅーろく", tmp)
    tmp = neologdn.normalize(tmp.lower())
    cleansed_text = re.sub(r"[^ぁ-んァ-ン一-龥ー]+", "", tmp)
    return cleansed_text

#テキスト内のスプラワード(検索ワード)の個数をカウントする関数
def count_key_word(text):
    num = 0
    for word in words:
        if word in text:
            num += 1
    return num

#"created_at"を日本時間に変更する関数
def UTC_to_JTS(date):
    delta_9 = datetime.timedelta(hours=9)
    str_UTC = re.sub("T", " ", date[:19])
    JTS = datetime.datetime.strptime(str_UTC, "%Y-%m-%d %H:%M:%S") + delta_9
    str_JTS = datetime.datetime.strftime(JTS, "%Y-%m-%d %H:%M:%S")
    return str_JTS

#ツイート検索の際にキーワードを格納したjsonファイルを読み込み
words_response = s3_client.get_object(Key="tweets_data_json/search_words.json", Bucket="for-twitter-api-bucket")
words_body = words_response["Body"].read()
search_words = json.loads(words_body.decode())["words"]

#「前日の日付文字列」+「ファイル名」をkeyに指定
t_delta = datetime.timedelta(hours=15)
yesterday = str(datetime.datetime.now() - t_delta)[:10]
wepsys = "tweets_data_json/" + yesterday + "_wepsys.json"
ssgoth = "tweets_data_json/" + yesterday + "_ssgoth.json"
bucket = "for-twitter-api-bucket"
#for文用にリストへ格納
key_list = [wepsys, ssgoth]

#-------------------------------------jsonデータをdataframeへ------------------------------------

#data_frame作成用のリスト
user_id = []
tweet_id = []
created_at = []
search_word = []
text = []
rt_count = []
rp_count = []
lk_count = []
qt_count = []

for i in range(2):

    #前日のjsonファイル(tweet_data)を取得
    json_response = s3_client.get_object(Key=key_list[i], Bucket=bucket)
    json_body = json_response["Body"].read()
    json_data = json.loads(json_body.decode())

    #取得したtweet_dataの中身をリストに全て格納
    for word in search_words[i]:
        try:
            for d in json_data[word]:
                user_id.append(d["author_id"])
                tweet_id.append(d["id"])
                created_at.append(d["created_at"])
                search_word.append(word)
                text.append(d["text"])
                rt_count.append(d["public_metrics"]["retweet_count"])
                rp_count.append(d["public_metrics"]["reply_count"])
                lk_count.append(d["public_metrics"]["like_count"])
                qt_count.append(d["public_metrics"]["quote_count"])
        except:
            pass

#data_frameを作成
df_list=list(zip(user_id, tweet_id, created_at, search_word, text, rt_count, rp_count, lk_count, qt_count))
df = pandas.DataFrame(data=df_list, columns=["user_id", "tweet_id", "created_at","search_word", "text", "rt_count", "rp_count", "lk_count", "qt_count"])

#--------------------------------データの加工(時間変更、不要ツイート削除)-------------------------------------

#日本時間に変更
df["created_at"] = df["created_at"].map(lambda x : UTC_to_JTS(x))

#ツイート文章をクレンジング
df["text"] = df["text"].map(lambda x : cleansing_text(x))
#クレンジング後に空白になってしまった行、重複している行を削除
df = df[df["text"] != '']
df = df.drop_duplicates(subset=['text'])

#テキスト内でカウントしたいワードリストの準備
words = search_words[0] + search_words[1]
words += ["ごーにー", "きゅーろく", "ガロン"]

#関数を使って新しい列を追加
df["num_key_word"] = df["text"].map(lambda x : count_key_word(x))

#検索ワードの出現回数が6回以上のツイートデータ
n = 6
n_words = df[df["num_key_word"] >= n][["user_id","text"]]
#ワードの出現回数6回以上のツイートを期間内に3回以上しているuser_id
n_appearance = n_words.value_counts("user_id")
n_appearance = n_appearance[n_appearance >= 3]
del_id = list(n_appearance.index)

#定型文を流すbotなどの可能性が高いためアカウントごと除外する
df = df[~df["user_id"].map(lambda x : x in del_id)]

#ワードの出現回数が0回のツイート
del_tweet = df[df["num_key_word"] == 0]["tweet_id"].values

#こちらはツイート単位で削除する
df = df[~df["tweet_id"].map(lambda x : x in del_tweet)]

#----------------------------------------感情分析パート-----------------------------------------

#comprehendのインスタンスを作成
comprehend = boto3.client("comprehend", "ap-northeast-1")

#ツイートデータの感情分析の結果を取得
sentiment = df["text"].map(lambda x : comprehend.detect_sentiment(Text=x, LanguageCode="ja"))

#必要な値を一度リストに格納
sm = []
ps = []
ng = []
nt = []
mx = []
for s in sentiment:
    sm.append(s["Sentiment"])
    ps.append(s["SentimentScore"]["Positive"])
    ng.append(s["SentimentScore"]["Negative"])
    nt.append(s["SentimentScore"]["Neutral"])
    mx.append(s["SentimentScore"]["Mixed"])

#dataframeに列として追加
df["sentiment"] = sm
df["positive"] = ps
df["negative"] = ng
df["neutral"] = nt
df["mixed"] = mx

#--------------------------------csv出力-------------------------------------

#csvに変換
csv = df.to_csv(index=False, line_terminator="\n")

#csvデータをs3へ格納
csv_key = "csv_data/" + yesterday + ".csv"
s3_client.put_object(Key=csv_key, Bucket=bucket, Body=csv)

3-1. データの準備

Preparation.py
s3_client = boto3.client("s3", "ap-northeast-1")

words_response = s3_client.get_object(Key="tweets_data_json/search_words.json", Bucket="for-twitter-api-bucket")
words_body = words_response["Body"].read()
search_words = json.loads(words_body.decode())["words"]

t_delta = datetime.timedelta(hours=15)
yesterday = str(datetime.datetime.now() - t_delta)[:10]
wepsys = "tweets_data_json/" + yesterday + "_wepsys.json"
ssgoth = "tweets_data_json/" + yesterday + "_ssgoth.json"
bucket = "for-twitter-api-bucket"

key_list = [wepsys, ssgoth]

まずは準備として検索ワードの整理をします。

検索ワードを格納したJSONファイルを読み込み、
この後for文で回すkey_listを作成します。

このGlueは日本時間の10:00に動きますが、
Lambdaと同様にUTC基準で時間を取得してしまうため、
前日のデータを取得できるように設定しています。

3-2. データ取得/データフレーム作成

json_to_df.py
user_id = []
tweet_id = []
created_at = []
search_word = []
text = []
rt_count = []
rp_count = []
lk_count = []
qt_count = []

for i in range(2):

    json_response = s3_client.get_object(Key=key_list[i], Bucket=bucket)
    json_body = json_response["Body"].read()
    json_data = json.loads(json_body.decode())

    for word in search_words[i]:
        try:
            for d in json_data[word]:
                user_id.append(d["author_id"])
                tweet_id.append(d["id"])
                created_at.append(d["created_at"])
                search_word.append(word)
                text.append(d["text"])
                rt_count.append(d["public_metrics"]["retweet_count"])
                rp_count.append(d["public_metrics"]["reply_count"])
                lk_count.append(d["public_metrics"]["like_count"])
                qt_count.append(d["public_metrics"]["quote_count"])
        except:
            pass

df_list=list(zip(user_id, tweet_id, created_at, search_word, text, rt_count, rp_count, lk_count, qt_count))
df = pandas.DataFrame(data=df_list, columns=["user_id", "tweet_id", "created_at","search_word", "text", "rt_count", "rp_count", "lk_count", "qt_count"])

ここでツイートデータのJSONファイルを読み込み、
必要な項目をデータフレームに変換しています。

Lambdaの稼働時間の問題で検索ワードを
大きく2つに分けていたので、ここでのfor文も
key_list[i]で2回に分けて取得しています。

3-3. データの加工

processing.py
def cleansing_text(text):
    tmp = re.sub(r"#[^\s]*", "", text, 10)
    tmp = re.sub(r"52", "ごーにー", tmp)
    tmp = re.sub(r"96", "きゅーろく", tmp)
    tmp = neologdn.normalize(tmp.lower())
    cleansed_text = re.sub(r"[^ぁ-んァ-ン一-龥ー]+", "", tmp)
    return cleansed_text

def count_key_word(text):
    num = 0
    for word in words:
        if word in text:
            num += 1
    return num

def UTC_to_JTS(date):
    delta_9 = datetime.timedelta(hours=9)
    str_UTC = re.sub("T", " ", date[:19])
    JTS = datetime.datetime.strptime(str_UTC, "%Y-%m-%d %H:%M:%S") + delta_9
    str_JTS = datetime.datetime.strftime(JTS, "%Y-%m-%d %H:%M:%S")
    return str_JTS

df["created_at"] = df["created_at"].map(lambda x : UTC_to_JTS(x))

df["text"] = df["text"].map(lambda x : cleansing_text(x))
df = df[df["text"] != '']
df = df.drop_duplicates(subset=['text'])

words = search_words[0] + search_words[1]
words += ["ごーにー", "きゅーろく", "ガロン"]

df["num_key_word"] = df["text"].map(lambda x : count_key_word(x))

n = 6
n_words = df[df["num_key_word"] >= n][["user_id","text"]]
n_appearance = n_words.value_counts("user_id")
n_appearance = n_appearance[n_appearance >= 3]

del_id = list(n_appearance.index)
df = df[~df["user_id"].map(lambda x : x in del_id)]

del_tweet = df[df["num_key_word"] == 0]["tweet_id"].values
df = df[~df["tweet_id"].map(lambda x : x in del_tweet)]

ここで行う作業は大きく4つです。
・時間表記の変更
・ツイートのクレンジング
・検索ワードのカウントデータを追加
・不要なツイートの除去

3-3-1. 時間表記の変更

change_time.py
def UTC_to_JTS(date):
    delta_9 = datetime.timedelta(hours=9)
    str_UTC = re.sub("T", " ", date[:19])
    JTS = datetime.datetime.strptime(str_UTC, "%Y-%m-%d %H:%M:%S") + delta_9
    str_JTS = datetime.datetime.strftime(JTS, "%Y-%m-%d %H:%M:%S")
    return str_JTS

df["created_at"] = df["created_at"].map(lambda x : UTC_to_JTS(x))

Twitterではデフォルトの時間表記がUTC基準で
"YYYY-mm-ddTHH:MM:SSZ" のようになっています。

これを日本時間に直しつつ、見やすいように
"YYYY-mm-dd HH:MM:SS" に変換しています。

3-3-2. ツイートのクレンジング

cleansing.py
def cleansing_text(text):
    tmp = re.sub(r"#[^\s]*", "", text, 10)
    tmp = re.sub(r"52", "ごーにー", tmp)
    tmp = re.sub(r"96", "きゅーろく", tmp)
    tmp = neologdn.normalize(tmp.lower())
    cleansed_text = re.sub(r"[^ぁ-んァ-ン一-龥ー]+", "", tmp)
    return cleansed_text

df["text"] = df["text"].map(lambda x : cleansing_text(x))
df = df[df["text"] != '']
df = df.drop_duplicates(subset=['text'])

クレンジングでは主に日本語で使われる
"ひらがな","カタカナ","漢字","ー" のみ取り出します。

不要なテキスト除去のために数字も排除していますが、
検索ワードの中には数字を含むワードもあります。

クレンジングの際に必要なワードが除去されないように
re.sub(r"52", "ごーにー", tmp)のように変換しておきます。

最後に空欄、重複したデータを除去します。

3-3-3. 検索ワードのカウントデータを追加

word_count.py
def count_key_word(text):
    num = 0
    for word in words:
        if word in text:
            num += 1
    return num

words = search_words[0] + search_words[1]
words += ["ごーにー", "きゅーろく", "ガロン"]

df["num_key_word"] = df["text"].map(lambda x : count_key_word(x))

この部分では検索ワードをテキスト内に
いくつ含んでいるかの特徴量を追加しています。

この特徴量はこの後に行うデータの精製に使用します。

3-3-4. 不要なツイートの除去

del_tw.py
n = 6
n_words = df[df["num_key_word"] >= n][["user_id","text"]]
n_appearance = n_words.value_counts("user_id")
n_appearance = n_appearance[n_appearance >= 3]

del_id = list(n_appearance.index)
df = df[~df["user_id"].map(lambda x : x in del_id)]

del_tweet = df[df["num_key_word"] == 0]["tweet_id"].values
df = df[~df["tweet_id"].map(lambda x : x in del_tweet)]

先ほど作成した特徴量を使ってデータを精製します。

ここでの考えは2点あります。

まず1点。
キーワードの出現が0回のツイートについては、
ハッシュタグで抽出された
のちに除去された可能性が高く、
文章の意味の扱いが不安定になるので除去してしまう。

そしてもう1点。
キーワードの出現が6回以上で多すぎるツイート、
かつ1日の出現数が3回以上のユーザー
は、
ゲームに関する情報を定期で配信するbotの可能性が高い。

ユーザーの意見を反映するツイートではないため、
ここでは除去することにしました。

ここの検討は実際にグラフを描画して確認したり、
各ユーザーを確認してbotの割合を見た上で判断しました。
この精製方法についてはいずれ検討する予定です。

3-4. 感情分析の値を追加

add_sentimentscore.py
comprehend = boto3.client("comprehend", "ap-northeast-1")

sentiment = df["text"].map(lambda x : comprehend.detect_sentiment(Text=x, LanguageCode="ja"))

sm = []
ps = []
ng = []
nt = []
mx = []
for s in sentiment:
    sm.append(s["Sentiment"])
    ps.append(s["SentimentScore"]["Positive"])
    ng.append(s["SentimentScore"]["Negative"])
    nt.append(s["SentimentScore"]["Neutral"])
    mx.append(s["SentimentScore"]["Mixed"])

df["sentiment"] = sm
df["positive"] = ps
df["negative"] = ng
df["neutral"] = nt
df["mixed"] = mx

前回の記事で利用したComprehendを使って
テキスト毎の感情分類の確信度 を追加しています。

3-5. csvでデータを出力

to_csv.py
csv = df.to_csv(index=False, line_terminator="\n")

csv_key = "csv_data/" + yesterday + ".csv"
s3_client.put_object(Key=csv_key, Bucket=bucket, Body=csv)

最後にデータフレームをcsvに変換して、
Bodyに格納してs3へ出力しています。

のちにAthenaでクエリをする際に必要になるので、
line_terminator="\n"を指定しています。

これで一連の作業は完了です。

4. デプロイ

上記コードをpyファイルでアップロードするか、
コンソール画面にコピペしたらコードは完了です。

まずはコードが正しく動くかテストをします。

[Run] のボタンを押下するとコードが動きます。
[Runs] タブの [Cloudwatch logs] でログも確認可能で、
成功すると [Run status]success と表示されます。
Glue Run.png
次に稼働スケジュールの設定をします。
[schedules] タブの [Create schedule] を選択します。
Glue schedule.png
名前、頻度、時間、説明の欄を埋めていきます。
UTC基準なので画像の例だと昼の12:30に動きます。
最後に [Create schedule] を押せば設定も完了です。
Glue Schedule 設定.png

5. 所感と今後の展望

5-1. 躓いた点

・Glueを使用する理由
・外部パッケージの利用
・データの精製

今回の記事でも取り上げましたが、
最初は Lambdaとの違いの理解が曖昧 で、
わざわざGlueを採用する必要があるのか迷いました。

結果的に作業時間が15分を超えることが分かったため
今回はGlueを使うこととなりました。

しかしコードの見直しなどによって処理を軽くして、
15分以内に収めるような工夫もあるのかも知れません。

外部パッケージの利用については調べて解決しました。
記事を上げて下さっている先駆者の方に感謝です。

データの精製 については今後の課題です。
今回の処理だけでも数百件のデータは除外していますが、
まだまだ精度は足りていないというのが個人の感想です。

5-2. 今後の展望

今回までで必要なデータを含むcsvファイルを
自動的に出力する仕組みができました。
次回はこのデータを使って可視化を行います。

またデータの精製はずっと課題になりそうです。
結果に正解がある作業ではないため、
有意性の検討に非常に頭を使う作業です。

5-3. さいごに

本記事を読んで頂いてありがとうございました。
コメントや質問、お気軽に宜しくお願い致します。

次回の記事

4
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
5