👋 はじめに
AWSの新機能や更新情報を追いかけるのって、結構大変ですよね?
AWS What's Newは毎日更新されていて、全部チェックするのも一苦労…。(´・ω・`)
しかも、日本語のサイトは更新が遅れていることも多く、最新情報を得るには英語版を見に行かなきゃいけません。
そこで、AWS What's Newの更新を自動で取得して翻訳し、LINEに通知するシステムをTerraformで作ってみました!
🎯 やりたいこと
- AWS What's Newの更新を自動で取得したい
- 英語の記事を日本語に翻訳したい
- 翻訳した内容をLINEで通知したい
- インフラをコードで管理したい(IaC)
🏗️ システムアーキテクチャ
使うAWSサービス
サービス | 用途 |
---|---|
EventBridge Scheduler | 定期実行のトリガー |
Lambda (3つ) | 情報取得・翻訳・通知処理 |
SQS | 記事をキューとして保持 |
DynamoDB | 記事情報の保存 |
Amazon Bedrock | 英語記事の日本語翻訳 |
LINE Messaging API | 通知配信 |
システム構成図
💡 設計のポイント
1. RSSフィードを使う理由
なぜRSSを選んだの?
- AWS What's Newは公式にRSSフィードを提供していて、
更新情報を簡単にキャッチできるから。 - APIと比べてシンプルに実装できるので、手軽に始められる。
- 更新情報を効率よく取得できる。
2. Lambdaを分けた理由
Lambdaを3つに分けた理由と、それぞれの機能について説明しますね。
① 情報取得Lambda
- AWS What's NewのRSSフィードから新着記事をGet!
- 新しい記事をDynamoDBに保存
- 翻訳するためにSQSにキューを送信
- ②を1回でいいのに同じ記事に対して大量に処理してしまう可能性があるので
可視性タイムアウトはいい感じに設定する必要があります
- ②を1回でいいのに同じ記事に対して大量に処理してしまう可能性があるので
② 翻訳Lambda
- SQSから翻訳待ちの記事を受け取る
- Amazon Bedrockで日本語に翻訳&要約
- 翻訳&要約した記事をDynamoDBに保存
🔍 重要なポイント: Bedrockの連続呼び出しによるThrottlingExceptionを避けるために:
- SQSでキュー管理をして
- Lambdaの同時実行数を1に制限しています
これをやらないと以下のようなエラーが出ちゃいます
An error occurred (ThrottlingException) when calling the InvokeModel operation (reached max retries: 4): Too many requests, please wait before trying again. You have sent too many requests. Wait before trying again.
③ 通知Lambda
- 翻訳した記事をDynamoDBから取得
- LINE Messaging APIを使って通知を送信
3. 二段階スケジューリング
スケジュール | 目的 | 実行間隔 |
---|---|---|
情報取得 | 新規記事のチェック | 1時間ごと |
通知処理 | 翻訳済み記事の通知 | 5分ごと |
📱 LINE通知の実装
LINE Messaging APIの採用理由
❌ LINE Notify
- 以前投稿した記事ではLINE Notifyを利用していましたがサービス終了が決定したため代替手段が必要になりました。
- そこで公式からも紹介があった、LINE Messaging APIを使うことにしました。
✅ LINE Messaging API
LINE Messaging APIの利点は以下の通りです
- 公式サポート継続
- 柔軟な通知カスタマイズ
しかし、以下の注意点があります。
利用時の注意点
- 無料プラン: 月200通まで
- それ以上は課金プランが必要
- LINE公式アカウントの料金プランを確認
セットアップ手順
- LINE公式アカウント作成
- Messaging API有効化
- チャネルアクセストークン取得
- Lambda内でAPI呼び出し
詳細はLINE Messaging APIの設定方法を参照
💻 実装コード
こちらは参考になるかは分かりませんがおいておきます
情報取得Lambda
import json
import re
import boto3
import feedparser
import pytz
import time
from datetime import datetime, timedelta
from typing import List, Dict, Any
from botocore.exceptions import ClientError
# 定数定義
AWS_REGION = 'ap-northeast-1'
RSS_FEED_URL = 'https://aws.amazon.com/new/feed/'
JST_TIMEZONE = pytz.timezone('Asia/Tokyo')
UTC_TIMEZONE = pytz.UTC
DEFAULT_TTL_DAYS = 60
# AWS リソース初期化
def init_aws_resources():
dynamodb = boto3.resource('dynamodb')
sqs = boto3.client('sqs')
ssm = boto3.client('ssm')
table_name = ssm.get_parameter(Name='/what-aws-news/dynamodb/article/table-name', WithDecryption=True)['Parameter']['Value']
queue_url = ssm.get_parameter(Name='/what-aws-news/sqs/translate/url', WithDecryption=True)['Parameter']['Value']
return dynamodb.Table(table_name), sqs, queue_url
news_table, sqs_client, QUEUE_URL = init_aws_resources()
def convert_to_jst(utc_time_str: str) -> datetime:
"""UTC時間文字列をJSTのdatetimeオブジェクトに変換"""
utc_time = datetime.strptime(utc_time_str, "%a, %d %b %Y %H:%M:%S %Z")
return utc_time.replace(tzinfo=UTC_TIMEZONE).astimezone(JST_TIMEZONE)
def clean_content(text: str) -> str:
"""HTMLタグとTo learn more以降のテキストを削除"""
text = re.sub('<.*?>', '', text)
return text.split("To learn more")[0].strip() if "To learn more" in text else text.strip()
def create_article(entry: Dict[str, Any], fetch_date: str) -> Dict[str, Any]:
"""フィードエントリーから記事データを作成"""
published_jst = convert_to_jst(entry.get('published', datetime.now().strftime("%a, %d %b %Y %H:%M:%S %Z")))
return {
'タイトル': entry.get('title', 'タイトルなし'),
'link': entry.get('link', ''),
'公開日': published_jst.strftime('%Y/%m/%d'),
'取得日': fetch_date,
'本文': clean_content(entry.get('description', '')),
'TimeToExist': int((datetime.now() + timedelta(days=DEFAULT_TTL_DAYS)).timestamp())
}
def get_rss_feed() -> List[Dict[str, Any]]:
"""RSSフィードから記事を取得"""
try:
feed = feedparser.parse(RSS_FEED_URL)
fetch_date = datetime.now(JST_TIMEZONE).strftime('%Y/%m/%d')
return [create_article(entry, fetch_date) for entry in feed.entries]
except Exception as e:
print(f"RSSフィード取得エラー: {e}")
return []
def save_and_queue_article(article: Dict[str, Any]) -> bool:
"""記事をDynamoDBに保存しSQSに送信"""
try:
news_table.put_item(
Item=article,
ConditionExpression='attribute_not_exists(link)'
)
sqs_client.send_message(
QueueUrl=QUEUE_URL,
MessageBody=json.dumps(article, ensure_ascii=False)
)
return True
except ClientError as e:
if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
return False
print(f"記事の保存/送信エラー: {e}")
return False
except Exception as e:
print(f"記事の保存/送信エラー: {e}")
return False
def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
try:
articles = get_rss_feed()
new_articles = [article for article in articles if save_and_queue_article(article)]
print(f"新規記事数: {len(new_articles)}/{len(articles)}")
if new_articles:
print("取得した記事:", json.dumps(new_articles, ensure_ascii=False, indent=2))
return {
'statusCode': 200,
'body': json.dumps({'message': f'{len(articles)}件の記事を処理'})
}
except Exception as e:
print(f"Lambda実行エラー: {e}")
return {
'statusCode': 500,
'body': json.dumps({'error': str(e)})
}
翻訳Lambda
import json
import boto3
import time
from datetime import datetime
from typing import Dict, Any, Optional, Set
# AWS クライアントの初期化
def init_aws_clients():
"""AWS クライアントの初期化"""
return {
'bedrock': boto3.client('bedrock-runtime'),
'dynamodb': boto3.resource('dynamodb'),
'sqs': boto3.client('sqs'),
'article_table': boto3.resource('dynamodb').Table(get_parameter('/what-aws-news/dynamodb/article/table-name'))
}
def get_parameter(parameter_name: str) -> str:
"""Parameter Storeからパラメータを取得"""
return boto3.client('ssm').get_parameter(Name=parameter_name, WithDecryption=True)['Parameter']['Value']
# AWS リソースの初期化
aws = init_aws_clients()
QUEUE_URL = get_parameter('/what-aws-news/sqs/translate/url')
def translate_with_bedrock(text: str, content_type: str = "不明") -> Optional[str]:
"""Bedrockを使用してテキストを翻訳"""
print(f"\n=== {content_type}の処理を開始 ===")
prompts = {
"タイトル": f"""Please translate the following English text to Japanese and make it concise like a news headline.
Follow these rules:
1. Remove any unnecessary words at the end (e.g., 'になりました', 'しました', etc.)
2. End with shorter phrases like 'に対応', '利用可能に', '開始', etc.
3. Keep it concise while maintaining the key information
4. Only return the translated text without any explanations
Text to translate:
{text}""",
"本文": f"""Please follow these steps:
1. Translate the following English text to Japanese
2. Then summarize the translated content in 2-3 concise sentences
3. Return only the summary in Japanese, without any explanations or original text
Text to process:
{text}"""
}
prompt_text = prompts.get(content_type)
if not prompt_text:
print(f"未対応のcontent_type: {content_type}")
return None
try:
for attempt in range(4):
try:
response = aws['bedrock'].invoke_model(
modelId="anthropic.claude-3-5-sonnet-20240620-v1:0",
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"messages": [{"role": "user", "content": [{"type": "text", "text": prompt_text}]}],
"max_tokens": 4000,
"temperature": 0
})
)
result = json.loads(response['body'].read())['content'][0]['text'].strip()
print(f"処理結果: {result}")
return result
except Exception as e:
wait_time = 3 * (3 ** attempt)
print(f"試行{attempt + 1}/4 - エラー: {str(e)}")
if 'ThrottlingException' in str(e) and attempt < 3:
print(f"{wait_time}秒待機します...")
time.sleep(wait_time)
continue
return None
except Exception as e:
print(f"翻訳エラー({content_type}): {e}")
return None
def update_dynamodb_item(article_data: Dict[str, Any]) -> bool:
"""DynamoDBの記事データを更新"""
try:
# 翻訳情報の更新
updated = aws['article_table'].update_item(
Key={'link': article_data['link'], '公開日': article_data['公開日']},
UpdateExpression='SET #tt = :title, #ts = :summary REMOVE #t, #b',
ExpressionAttributeNames={
'#tt': '翻訳タイトル',
'#ts': '翻訳本文',
'#t': 'タイトル',
'#b': '本文'
},
ExpressionAttributeValues={
':title': article_data['translated_title'],
':summary': article_data['translated_summary']
},
ReturnValues='ALL_NEW'
)
return bool(updated.get('Attributes'))
except Exception as e:
print(f"DynamoDB更新エラー: {e}")
return False
def process_message(record: Dict[str, Any], processed_messages: Set[str]) -> bool:
"""SQSメッセージの処理"""
message_id = record['messageId']
if message_id in processed_messages:
print(f"メッセージ {message_id} は既に処理済み")
return False
processed_messages.add(message_id)
message_body = json.loads(record['body'])
# 翻訳処理
translated_title = translate_with_bedrock(message_body['タイトル'], "タイトル")
time.sleep(30) # APIレート制限回避
translated_summary = translate_with_bedrock(message_body['本文'], "本文")
if not translated_title or not translated_summary:
return False
# 更新データの準備
article_data = {
'link': message_body['link'],
'公開日': message_body['公開日'],
'translated_title': translated_title,
'translated_summary': translated_summary
}
# DynamoDB更新と元データ削除
if update_dynamodb_item(article_data):
aws['sqs'].delete_message(
QueueUrl=QUEUE_URL,
ReceiptHandle=record['receiptHandle']
)
return True
return False
def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
start_time = datetime.now()
try:
print("\n=== 翻訳処理開始 ===")
processed_messages: Set[str] = set()
processed_count = sum(1 for record in event['Records'] if process_message(record, processed_messages))
execution_time = (datetime.now() - start_time).total_seconds()
print(f"=== 翻訳処理終了 === 実行時間: {execution_time:.2f}秒\n")
return {
'statusCode': 200,
'body': json.dumps({
'message': f'{processed_count}件のメッセージを処理しました',
'execution_time': f"{execution_time:.2f}秒"
}, ensure_ascii=False)
}
except Exception as e:
execution_time = (datetime.now() - start_time).total_seconds()
return {
'statusCode': 500,
'body': json.dumps({
'error': f"エラーが発生しました: {e}",
'execution_time': f"{execution_time:.2f}秒"
}, ensure_ascii=False)
}
通知Lambda
import json
import boto3
import urllib.request
from datetime import datetime
from decimal import Decimal
from boto3.dynamodb.conditions import Key
from typing import List, Dict, Any, Optional
# 定数定義
class Config:
"""設定値の管理"""
LINE_API_URL = 'https://api.line.me/v2/bot/message/broadcast'
DATE_FORMAT = '%Y/%m/%d'
SEPARATOR = '-' * 30
class AWSResources:
"""AWSリソースの初期化と管理"""
def __init__(self):
self.dynamodb = boto3.resource('dynamodb')
self.article_table = self.init_table()
def get_parameter(self, parameter_name: str) -> str:
"""Parameter Storeからパラメータを取得"""
return boto3.client('ssm').get_parameter(
Name=parameter_name,
WithDecryption=True
)['Parameter']['Value']
def init_table(self):
"""DynamoDBテーブルの初期化"""
table_name = self.get_parameter('/what-aws-news/dynamodb/article/table-name')
return self.dynamodb.Table(table_name)
class LineNotifier:
"""LINE通知の管理"""
def __init__(self, aws_resources: AWSResources):
self.aws = aws_resources
self.token = self._get_token()
def _get_token(self) -> Optional[str]:
"""LINE Access Tokenを取得"""
try:
return self.aws.get_parameter('/what-aws-news/Line_Access_Token')
except Exception as e:
print(f"Error getting LINE token: {e}")
return None
def _build_message(self, articles: List[Dict[str, Any]]) -> str:
"""通知メッセージを構築"""
today = datetime.now().strftime(Config.DATE_FORMAT)
message = f"{today}の更新は{len(articles)}件でした\n\n"
for i, article in enumerate(articles, 1):
message += f"""{article['翻訳タイトル']}
更新内容:
{article['翻訳本文']}
詳細はこちら:
{article['link']}
{Config.SEPARATOR if i < len(articles) else ''}\n\n"""
return message.strip()
def send_message(self, articles: List[Dict[str, Any]]) -> bool:
"""LINE通知を送信"""
if not articles:
print("通知対象の記事がありません")
return False
if not self.token:
print("LINE token not found")
return False
data = {
'messages': [{
'type': 'text',
'text': self._build_message(articles)
}]
}
try:
req = urllib.request.Request(
Config.LINE_API_URL,
data=json.dumps(data).encode('utf-8'),
headers={
'Content-Type': 'application/json',
'Authorization': f'Bearer {self.token}'
},
method='POST'
)
with urllib.request.urlopen(req) as response:
success = response.status == 200
if success:
print(f"LINE message sent successfully ({len(articles)}件の記事をまとめて送信)")
else:
print(f"LINE API error: {response.status}")
return success
except Exception as e:
print(f"Error sending LINE message: {e}")
return False
class ArticleManager:
"""記事データの管理"""
def __init__(self, aws_resources: AWSResources):
self.table = aws_resources.article_table
def get_todays_articles(self) -> List[Dict[str, Any]]:
"""今日の記事を取得"""
try:
today = datetime.now().strftime(Config.DATE_FORMAT)
print(f"検索対象の日付: {today}")
response = self.table.query(
IndexName='publish-date',
KeyConditionExpression=Key('取得日').eq(today)
)
translated_articles = [
item for item in response['Items']
if '翻訳タイトル' in item and '翻訳本文' in item
]
print(f"今日({today})の記事数: {len(response['Items'])}")
print(f"翻訳済みの記事数: {len(translated_articles)}")
return translated_articles
except Exception as e:
print(f"DynamoDB取得エラー: {e}")
return []
def json_serializer(obj):
"""JSON シリアライザ"""
if isinstance(obj, Decimal):
return int(obj)
raise TypeError(f'Type {type(obj)} is not serializable')
def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
try:
print("=== LINE通知処理開始 ===")
# リソースの初期化
aws = AWSResources()
article_manager = ArticleManager(aws)
line_notifier = LineNotifier(aws)
# 記事の取得と通知
articles = article_manager.get_todays_articles()
if not articles:
return {
'statusCode': 200,
'body': json.dumps({'message': '通知対象の記事がありません'})
}
notification_success = line_notifier.send_message(articles)
status_code = 200 if notification_success else 500
return {
'statusCode': status_code,
'body': json.dumps({
'message': f'{len(articles)}件の記事を通知しました' if notification_success else 'LINE通知に失敗しました',
'articles': articles
}, ensure_ascii=False, default=json_serializer)
}
except Exception as e:
error_msg = f"エラーが発生しました: {e}"
print(error_msg)
return {
'statusCode': 500,
'body': json.dumps({'error': error_msg}, ensure_ascii=False)
}
📊 運用実績
実際に運用してみた結果画像の通り最新情報を毎日通知してくれるようになりました!
めちゃ便利!
以下の点が今回運用してみて感じたメリットですね
- ✅ AWSの更新情報を自動でキャッチ
- ✅ 日本語での素早い情報把握
- ✅ 運用コストの最小化
🔧 今後の改善案
1. 翻訳品質の向上
- Bedrockのモデル最適化
- claude 3.5を使いましたが、
今後出てくるモデルによっては変更しても問題なさそうです
- claude 3.5を使いましたが、
- 他の翻訳APIとの比較検討
2. 通知の最適化
- 記事の重要度に基づくフィルタリング
- 例えばre:Inventの時期などは重要度の大きいアップデートがあると思うので
その時期だけ通知方法を変えるとか
それ以外のフィルタリングもBedrockでやってくれそうな気もする
- 例えばre:Inventの時期などは重要度の大きいアップデートがあると思うので
3. コスト最適化
- DynamoDBの課金モデル見直し
- 要勉強!
- リソース使用量の監視と調整
- 今のところは問題ないけどログが増えてきたりしたら考えなきゃですね
- DynamoDBではttlを設定しているのでそこまで大きいコストは発生しないはず
🎉 まとめ
最後にまとめです!
このシステムにより、
- ✨ AWS更新情報の自動キャッチ
- 🔄 効率的な翻訳処理
- 📱 タイムリーなLINE通知
- 💰 コスト効率の良い運用
が実現できました!
AWSの最新情報をキャッチアップしたい方は、ぜひ試してみてください。
改善案やフィードバックがありましたら、コメントお待ちしています! 😊