こんにちは。株式会社ウフルの廣田です。
この記事は「2025 Japan AWS Jr. Champions 夏のQiitaリレー」の14日目です。
過去の投稿(リンク集)はこちらからご覧ください!
1. はじめに
-
AWS AppSyncとは
サーバーレスで高性能なGraphQLおよびPub/Sub APIsを使用して、アプリケーションとサービスをイベントに接続できるサービスです。[1]
元々はGraphQLのイメージが強かったですが、2024年10月にAppSync Event APIがリリースされました。
こちらはあらゆる規模のリアルタイムウェブ体験およびモバイル体験を強化する、安全で高性能なサーバーレス WebSocket API を構築するためのソリューションを提供しているようです。[2] -
課題、願望
Amazon Connect の文字起こしをリアルタイムで画面に表示させたら便利だよな〜と思っていました! -
今回やってみること!
「AppSync Event API のWebSocket APIを使用して、Amazon Connect Contact Lensによってリアルタイムに文字起こしされた情報をブラウザに表示する」ことを実装してみました!
ブラウザに表示することができるということは、他のSaaSなどに文字起こし内容をリアルタイムで表示することが可能になると考えています。
この記事の内容が誰かの助けになれば幸いです。
2. 全体アーキテクチャ
今回はAmazon Connect 内のストリーミング情報をKinesisで受け取り、LambdaにEvent情報として渡します。
Lambda側ではブラウザ側で通話者間のやりとりがわかりやすいように整形して、AWS AppSyncへ渡し、そのままブラウザに表示する流れです。
3. リソース一覧
No. | リソース名 | 備考 | 参考記事 |
---|---|---|---|
1 | Amazon Connect インスタンス 作成 | ストリームにKinesisを紐づける | Amazon Connect インスタンスを作成する |
2 | Kinesis Data Streams 作成 | AssociateInstanceStorageConfig API を使用して、音声コンタクトのリソースタイプを関連付ける | データストリームを作成する |
3 | AppSync Event API 作成 | APIキーとエンドポイントURLを取得 | [新機能] AppSync Event APIが発表されました! |
4 | Lambda 作成 | AWS AppSyncへPublish | - |
5 | HTML 作成 | AWS AppSyncからサブスクライブ | - |
4. AppSync Event API 作成
5. Lambda Publisher 実装
"""
Kinesis(Contact Lens Streams) → Lambda → AppSync Event API へ API キー認証でイベントを Publish する関数
環境変数
HTTP_ENDPOINT = "<api-id>.appsync-api.<region>.amazonaws.com"
API_KEY = "da2-xxxxxxxxxxxxxxxxxxxx" # AWS AppSync で発行したキー
CHANNEL = "/default/message" # チャンネル名
"""
import botocore.awsrequest, botocore.httpsession, json, os, base64
HTTP_ENDPOINT = os.environ["HTTP_ENDPOINT"].rstrip("/")
API_KEY = os.environ["API_KEY"]
CHANNEL = os.environ["CHANNEL"]
session = botocore.httpsession.URLLib3Session()
def lambda_handler(event, context):
try:
print(f"Received event: {event}")
segments = []
for record in event.get("Records", []):
payload = record["kinesis"]["data"]
decoded = base64.b64decode(payload)
data = json.loads(decoded)
print(f"Decoded data: {data}")
if "Segments" in data:
segments.extend(data["Segments"])
else:
print("No Segments found in the record.")
transcript = correct_transcript(segments)
events = [
json.dumps({"role": t["role"], "content": t["content"]})
for t in transcript
]
print(f"AppSync Post Event: ", events)
body = _http_post({"channel": CHANNEL, "events": events})
return {"statusCode": 200, "body": body}
except Exception as e:
print(f"Error processing event: {e}")
return {"statusCode": 500, "body": str(e)}
def correct_transcript(segments: list):
corrected_segments = []
role_mapping = {
"AGENT": "エージェント",
"CUSTOMER": "顧客",
}
for s in segments:
if "Transcript" in s:
transcript = s["Transcript"]
content = transcript.get("Content", "")
participant_role = transcript.get("ParticipantRole", "")
role = role_mapping.get(participant_role)
begin_offset = transcript.get("BeginOffsetMillis", 0)
if role and content:
corrected_segments.append({
"role": role,
"content": content,
"beginOffsetMillis": begin_offset
})
corrected_segments.sort(key=lambda x: x["beginOffsetMillis"])
return corrected_segments
def _http_post(payload: dict) -> str:
req = botocore.awsrequest.AWSRequest(
method="POST",
url=f"https://{HTTP_ENDPOINT}/event",
data=json.dumps(payload).encode(),
headers={
"content-type": "application/json",
"x-api-key": API_KEY
}
)
resp = session.send(req.prepare())
if resp.status_code >= 400:
raise RuntimeError(f"Publish failed: {resp.status_code} {resp.text[:200]}")
return resp.text or "OK"
6. ブラウザ Subscriber 実装
<!DOCTYPE html>
<html lang="ja">
<body>
<h3>リアルタイムの文字起こし</h3>
<ul id="log"></ul>
</body>
</html>
<script>
// ハードコーティングになってしまってすみません。こちらは適宜環境変数に置き換えてください。
const HTTP_ENDPOINT = "<api-id>.appsync-api.<region>.amazonaws.com/event";
const WS_ENDPOINT = "<api-id>.appsync-api.<region>.amazonaws.com";
const API_KEY = "da2-xxxxxxxxxxxxxxxxxxxx";
const CHANNEL = "/default/message";
// util
function b64url(str){return btoa(str).replace(/\+/g,'-').replace(/\//g,'_').replace(/=+$/,'');}
function addLog(text){document.querySelector("#log").insertAdjacentHTML("beforeend",`<li>${text}</li>`);}
// --------------------------------------------------
// WebSocket 接続
const header = b64url(JSON.stringify({host: HTTP_ENDPOINT.replace("/event",""), "x-api-key": API_KEY}));
const ws = new WebSocket(`wss://${WS_ENDPOINT}/event/realtime`,
['aws-appsync-event-ws', `header-${header}`]);
ws.onopen = () => { console.log("🟢 OPEN"); ws.send(JSON.stringify({type:"connection_init"})); };
ws.onerror = e => console.error("🔴 WS error", e);
ws.onclose = e => console.warn(`🟠 WS closed: ${e.code}`);
ws.onmessage = ev=>{
console.log("📩", ev.data);
const m = JSON.parse(ev.data);
if (m.type === "connection_ack") {
// 接続ACK → 購読
ws.send(JSON.stringify({
type:"subscribe",
id:crypto.randomUUID(),
channel:CHANNEL,
authorization:{host:HTTP_ENDPOINT.replace("/event",""), "x-api-key":API_KEY}
}));
}
if (m.type === "subscribe_success") console.log("✅ subscribed");
// ★ Event API は "data" フレームにセットされてくる
if (m.type === "data" && m.event) {
const payload = JSON.parse(m.event);
addLog(`[${payload.role}] ${payload.content}`);
}
};
</script>
</body></html>
7. 動作確認
GIFだと音声がないので分かりにくいですが、実際は発話後 5秒程度 遅れて表示されています。
8. まとめ
最後までご覧いただき、誠にありがとうございます!
AppSync Event API という機能を使用して、Amazon Connectでの通話内容をリアルタイムで文字起こしすることをやってみました。
AWS AppSyncについて、これまでいまいちわからなかったという方もいらっしゃったのではないでしょうか?
「AppSyncを使ってこんなこともできるんだ!」とわかっていただけたら嬉しいです!
自分もAWS AppSyncというサービスの一端を知ることができたと思っています。
これからも「こんなことできないかな?」というアイディアを形にしていき、技術について知っていきたいと思いました。