背景
こちらの記事の続きになります。
claudeで話題を取得
前回の記事で、(100, 1536)行列を(100, 24)行列へ次元削減して、hdbscanでクラスタリングを行いました。今回は、クラスタリング結果を踏まえて、クラスター毎に文章をclaudeへ投げて、クラスター毎の話題を取得してみます。
まずは、claudeを使うための変数を設定します。今回はclaude v3.5 sonnetを使います。
MODELID_CLAUDE_v3_5 = "anthropic.claude-3-5-sonnet-20240620-v1:0"
claudeを呼び出す関数になります。
コードはこちらを参考にしました。
def call_bedrock_claude(bedrock, model_id, prompt_text):
body = json.dumps({"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 1000,
"messages": [{"role": "user",
"content": [{"type": "text",
"text": prompt_text}]
}]
})
response = bedrock.invoke_model(body=body,
modelId=model_id,
accept="application/json",
contentType="application/json")
response_body = json.loads(response.get("body").read())
print(response_body)
print(response_body["content"][0]["text"])
response_body_categorize = response_body["content"][0]["text"]
return response_body_categorize
続いて、先程作った関数を使う形で、クラスター毎に所属している文章を丸っとclaudeに「これらの文章の共通の話題は何?」というプロンプトを添えて投げる関数を作ります。
claudeのAPIリクエスト制限に抵触しないように、「クラスター毎に所属している文章」の文字数が多い場合は、途中で文章を切る形にします。
今回は、入力データの100文章の1文章当たりの平均文字数を算出して、その平均文字数の定数倍を超えていたら、そこで文章を切る形にしています。その定数倍は、引数letter_num_threshold_timesで指定します。
def get_insight_in_text(clustering_data, doc_list, letter_num_threshold_times):
cluster_label_list = [label for label in np.unique(clustering_data.labels_) if label != -1]
if len(cluster_label_list) == 0:
print("クラスターは外れ値のみです")
return None
same_cluster_doc_list_dict = {}
same_cluster_doc_index_dict = {}
same_cluster_prompt_dict = {}
same_cluster_insight_dict = {}
for cluster_label in cluster_label_list:
same_cluster_doc = []
same_cluster_index = []
for i in range(len(clustering_data.labels_)):
if clustering_data.labels_[i] == cluster_label:
same_cluster_doc.append(doc_list[i])
same_cluster_index.append(i)
prompt_list = ["**依頼文**\n\n日本語のみで結果を出力してください。重複、コメント、説明は不要です。以下のデータはあるクラスタリングアルゴリズムによって同じクラスタに分類された文章群です。次のステップに沿って情報を整理し、JSON形式で結果を出力してください。出力するJSONは一つのみとし、JSON以外の内容を出力しないでください。\n\n**ステップ**\n\n1. 文章群を読み、話題ごとのカテゴリに分けて整理してください。\n2. 各カテゴリに共通する話題を見つけ、その話題に基づいてクラスタ全体に名前を付けてください。\n\n最終的に、以下の条件に従ってJSON形式で結果を出力してください。\n- カテゴリ、話題、クラスタ名を含むJSONを生成してください。\n- 各キー名は「category」、「topic」、「cluster_name」としてください。\n- キーの値は文字列型としてください。\n- JSONオブジェクトの先頭と末尾には「***」を付け、JSONを「***」で囲んでください。\n- 「***」の前後に余分な文字や記号を加えないでください。\n- もし情報を整理できない場合は、最も多い話題を特定してクラスタ名を付けてください。\n\n**データ**\n\n"]
for doc in same_cluster_doc:
prompt_list.append(doc)
prompt_list.append("\n")
prompt = "".join(prompt_list)
doc_letter_num_list = [len(doc_list[i]) for i in range(len(doc_list))]
ave_doc_letter_num = int(sum(doc_letter_num_list) / len(doc_letter_num_list))
if len(prompt) >= ave_doc_letter_num * letter_num_threshold_times: # プロンプトの文字数が文章データの平均文字数の指定倍以上の場合、
prompt = prompt[:ave_doc_letter_num*letter_num_threshold_times] # プロンプトの文字数を文章データの平均文字数の指定倍に留める
try:
text_insight = call_bedrock_claude(bedrock=BEDROCK,
model_id=MODELID_CLAUDE_v3_5,
prompt_text=prompt)
same_cluster_prompt_dict[cluster_label] = prompt
same_cluster_doc_list_dict[cluster_label] = same_cluster_doc
same_cluster_doc_index_dict[cluster_label] = same_cluster_index
same_cluster_insight_dict[cluster_label] = text_insight
except Exception as e:
print("---クラスター{a}はプロンプトに放り込むトークン数が多いので、さらにクラスタリングして文章数を減らす事をお勧めします---".format(a=cluster_label))
same_cluster_prompt_dict[cluster_label] = None
same_cluster_doc_list_dict[cluster_label] = same_cluster_doc
same_cluster_insight_dict[cluster_label] = None
continue
time.sleep(30)
return same_cluster_doc_list_dict, same_cluster_doc_index_dict, same_cluster_prompt_dict, same_cluster_insight_dict
それでは、claudeに文章を投げてみます。今回は文章を切るかどうかの判断の定数倍は25にしました。
doc_list_dict, doc_list_index_dict, prompt_dict, insight_dict = get_insight_in_text(clustering_data=clustering_data,
doc_list=list(text_df["text"]),
letter_num_threshold_times=25)
{'id': 'msg_bdrk_01CTQLDCpPVG2sRijVkBvcWf', 'type': 'message', 'role': 'assistant', 'model': 'claude-3-5-sonnet-20240620', 'content': [{'type': 'text', 'text': '以下は日本語で出力したJSONデータです:\n\n***\n{\n "category": "スポーツ",\n "topic": "野球とサッカー",\n "cluster_name": "プロスポーツに関するニュース・分析"\n}\n***'}], 'stop_reason': 'end_turn', 'stop_sequence': None, 'usage': {'input_tokens': 16097, 'output_tokens': 77}}
以下は日本語で出力したJSONデータです:
***
{
"category": "スポーツ",
"topic": "野球とサッカー",
"cluster_name": "プロスポーツに関するニュース・分析"
}
***
{'id': 'msg_bdrk_01GTDg7A9QQgt3yrqyFazPny', 'type': 'message', 'role': 'assistant', 'model': 'claude-3-5-sonnet-20240620', 'content': [{'type': 'text', 'text': '以下のJSONが要求された結果の出力です:\n\n***\n{\n "category": "料理レシピと食品情報",\n "topic": "食べ物に関する情報や調理方法、健康効果など",\n "cluster_name": "食品と料理に関する情報"\n}\n***'}], 'stop_reason': 'end_turn', 'stop_sequence': None, 'usage': {'input_tokens': 17546, 'output_tokens': 91}}
以下のJSONが要求された結果の出力です:
***
{
"category": "料理レシピと食品情報",
"topic": "食べ物に関する情報や調理方法、健康効果など",
"cluster_name": "食品と料理に関する情報"
}
***
JSON形式で出力するようにプロンプトで指示していましたので、想定通りの挙動をしてくれました。
どうせなので、claudeで話題を取得したら、その結果をcsvファイルにしてS3へ保存する形にしてみます。JSON形式は***で囲っているので、正規表現を使って、うまくJSON形式の部分を抜き出します。
import re
def get_result_as_dataframe(result_index_dict, result_insight_dict):
result_cluster_number_list = [int(list(result_insight_dict.keys())[i]) for i in range(len(result_insight_dict.keys()))]
result_cluster_doc_index_list = [str(result_index_dict[i]) for i in result_cluster_number_list]
regex = r"\*\*\*(.*)\*\*\*"
result_cluster_name_list = []
for i in result_cluster_number_list:
claude_answer = result_insight_dict[i]
claude_answer_temp1 = claude_answer.replace("\n", "")
claude_answer_temp2 = claude_answer_temp1.replace(" ", "")
regex_result = re.search(pattern=regex,
string=claude_answer_temp2)
claude_answer_temp3 = regex_result.group(1)
claude_answer_temp4 = json.loads(s=claude_answer_temp3)
cluster_insight_name = claude_answer_temp4["topic"]
result_cluster_name_list.append(cluster_insight_name)
result_as_df = pd.DataFrame(data={"cluster_number": result_cluster_number_list,
"topic": result_cluster_name_list,
"doc_id": result_cluster_doc_index_list})
result_as_csv = result_as_df.to_csv(index=False,
encoding="utf-8_sig")
S3_CLIENT.put_object(Body=result_as_csv,
Bucket="sagemaker-*****",
Key="*****/result_hogehoge.csv")
return
claudeに文章を投げる関数と、先程作った関数を使います。
claudeは時々JSON形式の結果を出してくれない時があり、その時は先程作った関数がエラーになってしまうので、try-except文とfor文を使って、3回はリトライする形にしてみました。
for i in range(3):
doc_list_dict, doc_list_index_dict, prompt_dict, insight_dict = get_insight_in_text(clustering_data=clustering_data,
doc_list=list(text_df["text"]),
letter_num_threshold_times=25)
try:
get_result_as_dataframe(result_index_dict=doc_list_index_dict,
result_insight_dict=insight_dict)
break
except Exception as e:
print("リトライ{a}回目".format(a=i))
continue
こちらを行った後、対象のS3バケットのフォルダを確認してみます。
csvファイルを確認してみると、
想定通りの形で、claudeの結果をcsvファイルで出力出来ました。
(話題(topic)の内容が変わっていて、すみません。別日に改めてcsvファイルを取得し直した形でして。)
doc_idの数値は、入力のcsvファイルのid列の数値と紐付く形になりますので、該当のidの文章がその話題のものかどうかを確認する事が出来ます。
以上になります。
その1からお付き合い頂きまして、ありがとうございました。
まとめ
色々とブラッシュアップする余地はありそうですが、一旦はそれっぽいものが出来ました(笑)
「S3にcsvファイルがアップロードされた事をトリガーにして、AWS Lambdaでこの処理を行って、結果をS3にcsvファイルで保存する。」
という流れを行おうと思いましたが、ライブラリーをインストールする所でAWS Lambdaのレイヤーの50MB以内という制約に抵触してしまい、AWS Lambdaでは実装出来ずでした。
そのため、AWS Fargateで試そうと思います。AWS Fargateでの実装は、また別の記事で書いてみようと思います。