Githubに完全版を上げました。
https://github.com/CinnamonSea2073/qiita_supporter-bot
読み飛ばしてください
おはようございます、しなもんです。
Qiita Engineer Festa 2024、如何お過ごしでしょうか。
弊Organizationも、記事をたくさん書いています。
さて、仲間内で記事が投稿されたら称え合おう?ということで
モチベを少しでも高めるべく、記事が投稿されたらDiscordに通知してくれるプログラムを書きました。
少しでも参考になれば幸いです。
仕組みを考える
とりあえず以下の感じで作っていこうと思います。
10分毎にQiitaのRSSを取得
↓
10分前から現在までの時間に投稿された記事を導出
↓
DiscordにWebhookで通知
環境構築
とりあえずDockerでよしなに構築。
/
├─app
│ └─main.py
└─docker
├─requirements.txt
├─devcontainer.json
└─Dockerfile
feedparser
python-dateutil
requests
services:
bot:
build:
context: ../docker
dockerfile: Dockerfile
volumes:
- ../app:/app
working_dir: /app
environment:
- TZ=Asia/Tokyo
command: python main.py
restart: always
tty: true
FROM python:3.9
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD [ "python", "main.py" ]
そういえばdocker-compose.yml
は非推奨で、正しくはcompose.yaml
らしいですよ。
10分毎にOrganizationのRSSを取得
import time
import feedparser
def fetch_rss(url):
feed = feedparser.parse(url)
entries = []
for entry in feed.entries:
entries.append({
'title': entry.title,
'link': entry.link,
'published': entry.published
})
return entries
def main():
# QiitaのRSSフィードのURL ユーザーのRSSでもよい
rss_url = 'https://qiita.com/organizations/team-shahu/activities.atom'
while True:
rss_data = fetch_rss(rss_url)
print(rss_data)
time.sleep(600) # 10分待ち
if __name__ == '__main__':
main()
簡単に書きました。
取得できていそうです。
10分前から現在までの時間に投稿された記事を導出
dateutilからparserをimportする
import time
import feedparser
+ from dateutil import parser
main関数を変更
def main():
rss_url = 'https://qiita.com/organizations/team-shahu/activities.atom' # QiitaのRSSフィードのURL ユーザーのRSSでもよい
while True:
rss_data = fetch_rss(rss_url)
+ current_time = time.time()
+ filtered_data = [
+ entry for entry in rss_data
+ if current_time - (parser.parse(entry['published']).timestamp()) <= 600
+ ]
+ if filtered_data:
+ print(filtered_data)
+ else:
+ print('No new entries.')
time.sleep(600) # 10分待ち
動作確認したいけど記事投稿されないとできない...
DiscordにWebhookで通知
requestsをimportする
import time
import feedparser
from dateutil import parser
+ import requests
DiscordにWebhookを送信する関数を追加する
def send_discord_message(webhook_url, message):
payload = {'content': message}
response = requests.post(webhook_url, json=payload)
if response.status_code == 204:
print('Message sent successfully.')
else:
print('Failed to send message.')
main関数を変更
def main():
rss_url = 'https://qiita.com/organizations/team-shahu/activities.atom' # QiitaのRSSフィードのURL ユーザーのRSSでもよい
+ webhook_url = 'https://discord.com/api/webhooks/1234567890/abcdefg' # DiscordのWebhook URL
while True:
rss_data = fetch_rss(rss_url)
current_time = time.time()
filtered_data = [
entry for entry in rss_data
if current_time - (parser.parse(entry['published']).timestamp()) <= 600
]
if filtered_data:
print(filtered_data)
+ send_discord_message(webhook_url, "Organazitionに新しい記事が投稿されました!\n"+"\n".join([f"**{entry['title']}**\n{entry['link']}\n" for entry in filtered_data]))
else:
print('No new entries.')
time.sleep(600) # 10分待ち
動作確認
とりあえず、10分内に投稿された記事
というフィルターを外して、全部通知してもらいます。
filtered_data = [
entry for entry in rss_data
+ # if current_time - (parser.parse(entry['published']).timestamp()) <= 600
]
できてそうです。
使いやすいようにリファクタリング
import time
import feedparser
from dateutil import parser
import requests
INTERVAL = 600 # 10分
RSS_URL = 'https://qiita.com/organizations/team-shahu/activities.atom' # QiitaのRSSフィードのURL ユーザーのRSSでもよい
DISCORD_URL= 'https://discord.com/api/webhooks/123456/abcde' # DiscordのWebhook URL
class Article:
def __init__(self, entry):
self.title = entry.title
self.link = entry.link
self.published = parser.parse(entry.published)
def __str__(self):
return f'**{self.title}**\n{self.link}\n{self.published.strftime("%Y/%m/%d %H:%M:%S")}'
def is_during_interval(self):
current_time = time.time()
return current_time - self.published.timestamp() <= INTERVAL
def fetch_rss(url):
feed = feedparser.parse(url)
entries: list[Article] = []
for entry in feed.entries:
article = Article(entry)
entries.append(article)
return entries
def send_discord_message(webhook_url, message):
payload = {'content': message}
response = requests.post(webhook_url, json=payload)
if response.status_code == 204:
print('Message sent successfully.')
else:
print('Failed to send message.')
print(response.text)
def main():
while True:
rss_data = fetch_rss(RSS_URL)
filtered_data = [
entry for entry in rss_data if entry.is_during_interval()
]
if filtered_data:
content = "Organazitionに新しい記事が投稿されました!\n\n"+"\n".join([str(data) for data in filtered_data])
print(content)
send_discord_message(DISCORD_URL, content)
else:
print('No new entries.')
time.sleep(INTERVAL) # 10分待ち
if __name__ == '__main__':
main()
最後に
ユーザーかOrganizationが記事が投稿したらコードを書きました。
コンテナを起動させるだけで試せるので、よかったらどうぞ。
では。