こんにちは!
ポーラ・オルビスホールディングスのITプロダクト開発チームでスクラムマスターをしている川田です。
良質なアウトプットにはまずインプットから💪、ということで、私たちのチームではテック系の情報をRSSで収集してチーム内で共有する活動を行っています。
Slackに RSSフィードを追加する 機能があるため当初はそれを用いていたのですが、連続で記事が投稿されてどんどん流れてしまうため、若干の見づらさを感じるようになってしまいました💦
今回は、その見づらさを解消するために仕組みを作ったよ、というお話です。
作ったもの
指定されたRSSフィードから定期的に内容を取得し、更新があった場合はSlackにメッセージを送ります。
そのメッセージを親スレッドとし、スレッドの返信として記事の内容やリンクを投稿します。
対象のチャンネルには親スレッドだけが見えるような形になるので、更新された記事の数が多くなってもチャンネルの内容が流れずに見やすくなりました🙌
詳細
全体の構成
構成としては以下の通りです。LambdaでRSSフィードを取得・解析し、Slack App経由でSlackのチャンネルに投稿します。
投稿済みの記事情報を管理するためにDynamoDBを利用しています。また、EventBridgeによってLambdaを定期的に実行し、フィードの内容を確認しています。
DynamoDBの内容
DynamoDBではRSSフィードのURLをパーティションキーとして保持し、RSSの最終更新日時と前回更新時のフィードに含まれていた記事のIDを記録するようにしています。
{
"url": {
"S": "https://qiita.com/popular-items/feed"
},
"last_updated": {
"S": "2026-04-23T20:00:00+00:00"
},
"entry_ids": {
"L": [
{
"S": "tag:qiita.com,2005:PublicArticle/1111111"
},
{
"S": "tag:qiita.com,2005:PublicArticle/2222222"
},
...
]
}
}
Lambdaの内容
Lambdaでは、大まかに以下の流れで処理を実行しています。
- RSSフィードを取得する
- 更新日時が前回取得時より新しくなっていたら、前回取得時の記事IDと今回の記事IDを比較する
- 新しい記事が1件以上あれば、Slackへ投稿する
RSSフィードの取得には、feedparser というPythonのライブラリを使用しました。こちらのライブラリを使用することでRSS1.0/RSS2.0/Atomの違いを意識せずに扱うことができるため、非常に助かりました👍
なおサードパーティー製のライブラリとなるため、Lambdaで利用する際はLayerを作成する必要がある点にご注意ください。
参考:全体のコードはこちら
import calendar
import json
import os
from datetime import datetime, timedelta, timezone
from typing import Optional
import boto3
import feedparser
import requests
from botocore.exceptions import ClientError
# DynamoDB設定
DYNAMODB_TABLE_NAME = "your-table-name"
DYNAMODB_REGION = "your-region-name"
dynamodb = boto3.resource("dynamodb", region_name=DYNAMODB_REGION)
table = dynamodb.Table(DYNAMODB_TABLE_NAME)
def get_last_updated(url: str) -> Optional[tuple[datetime, set[str]]]:
"""DynamoDBからURLに対応する最終更新日時と記事IDセットを取得する"""
try:
response = table.get_item(Key={"url": url})
item = response.get("Item")
if item:
last_updated = datetime.fromisoformat(item["last_updated"])
entry_ids = set(item.get("entry_ids", []))
return last_updated, entry_ids
except ClientError as e:
print(f"DynamoDB取得エラー: {e}")
return None
def save_last_updated(url: str, updated: datetime, entry_ids: set[str]) -> None:
"""DynamoDBにURLに対応する最終更新日時と記事IDセットを保存する"""
try:
table.put_item(
Item={
"url": url,
"last_updated": updated.isoformat(),
"entry_ids": list(entry_ids),
}
)
except ClientError as e:
print(f"DynamoDB保存エラー: {e}")
JST = timezone(timedelta(hours=9))
def struct_time_to_datetime(t) -> datetime:
"""feedparserのtime.struct_time(UTC)をdatetimeオブジェクト(UTC)に変換する"""
return datetime.fromtimestamp(calendar.timegm(t), tz=timezone.utc)
def post_slack_message(
token: str, channel: str, text: str, thread_ts: Optional[str] = None
) -> dict:
"""Slackにメッセージを投稿する。thread_tsを指定するとスレッド返信になる"""
payload: dict = {"channel": channel, "text": text}
if thread_ts:
payload["thread_ts"] = thread_ts
response = requests.post(
"https://slack.com/api/chat.postMessage",
headers={
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
},
json=payload,
timeout=10,
)
data = response.json()
if not data.get("ok"):
print(f"Slack投稿エラー: {data.get('error')}")
return data
def lambda_handler(event, context):
# Lambdaの環境変数からSlack Botトークンと投稿先チャンネルIDを取得する
slack_token = os.environ["SLACK_BOT_TOKEN"]
slack_channel = os.environ["SLACK_CHANNEL_ID"]
# RSSフィードのURLを指定
# 新しいフィードを追加したい場合はこの配列にURLを追加してください
urls = [
"https://qiita.com/popular-items/feed",
]
for url in urls:
# フィードを解析
feed = feedparser.parse(url)
current_updated = struct_time_to_datetime(feed.feed.updated_parsed)
# 今回のフィードの全エントリIDを収集
current_entry_ids = {entry.get("id", entry.link) for entry in feed.entries}
# DynamoDBから、urlをkeyとして前回実行時の最新記事の公開日時と記事IDセットを取得する
feed_state = get_last_updated(url)
# 取得できなかった場合は、今回取得した公開日時と全エントリIDを保存してcontinueする
if feed_state is None:
save_last_updated(url, current_updated, current_entry_ids)
continue
last_updated, last_entry_ids = feed_state
# 取得できた場合は、今回取得した公開日時と比較して、今回の方が新しければ後続の処理を行う
if current_updated <= last_updated:
continue
# 今回取得した全エントリIDを保存する
save_last_updated(url, current_updated, current_entry_ids)
# 前回実行時のエントリIDに含まれない新着エントリのIDリストを作成する
new_entry_ids = {
entry.get("id", entry.link)
for entry in feed.entries
if entry.get("id", entry.link) not in last_entry_ids
}
# 新着エントリが1件もなければSlackへの投稿をスキップする
if not new_entry_ids:
continue
# Slackにフィードの基本情報を投稿(スレッドの親メッセージ)
parent_text = (
f"「{feed.feed.title}」の最新情報をお届けするよ!\n"
f"更新日時: {current_updated.astimezone(JST).strftime('%Y/%m/%d %H:%M:%S')}"
)
result = post_slack_message(slack_token, slack_channel, parent_text)
thread_ts = result.get("ts")
for entry in feed.entries:
# あらかじめ作成済みの新着IDリストに含まれているエントリのみ処理する
if entry.get("id", entry.link) not in new_entry_ids:
continue
# Slackに記事の情報を投稿(スレッドの返信メッセージ)
entry_text = f"*<{entry.link}|{entry.title}>*\n{entry.summary}"
post_slack_message(
slack_token, slack_channel, entry_text, thread_ts=thread_ts
)
return {"statusCode": 200, "body": json.dumps("OK")}
おまけ:購読しているRSS
現在は以下の5つを購読しています!
- Qiita - 人気の記事(https://qiita.com/popular-items/feed)
- Zennのトレンド(https://zenn.dev/feed)
- 企業テックブログRSS(https://yamadashy.github.io/tech-blog-rss-feed/feeds/rss.xml)
- CodeZine:新着一覧(https://codezine.jp/rss/new/20/index.xml)
- TECH PLAY(https://rss.techplay.jp/event/w3c-rss-format/rss.xml)
さいごに
結果的にRSSリーダーを作る形になってしまいましたが、RSSのしくみ等が勉強になってよかったです😎
Slack通知の前段にLambdaで処理を入れられる構成になったので、将来的には特定のキーワードによるフィルタリングやAIによる要約にもトライできればと思います!


