3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Githubに完全版を上げました。
https://github.com/CinnamonSea2073/qiita_supporter-bot

読み飛ばしてください

おはようございます、しなもんです。

Qiita Engineer Festa 2024、如何お過ごしでしょうか。
弊Organizationも、記事をたくさん書いています。

さて、仲間内で記事が投稿されたら称え合おう?ということで
モチベを少しでも高めるべく、記事が投稿されたらDiscordに通知してくれるプログラムを書きました。

少しでも参考になれば幸いです。

仕組みを考える

とりあえず以下の感じで作っていこうと思います。

10分毎にQiitaのRSSを取得
↓
10分前から現在までの時間に投稿された記事を導出
↓
DiscordにWebhookで通知

環境構築

とりあえずDockerでよしなに構築。

/
├─app
│   └─main.py
└─docker
    ├─requirements.txt
    ├─devcontainer.json
    └─Dockerfile
requirements.txt
feedparser
python-dateutil
requests
compose.yaml
services:
  bot:
    build:
      context: ../docker
      dockerfile: Dockerfile
    volumes:
      - ../app:/app
    working_dir: /app
    environment:
      - TZ=Asia/Tokyo
    command: python main.py
    restart: always
    tty: true
dockerfile
FROM python:3.9

WORKDIR /app

COPY requirements.txt .

RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD [ "python", "main.py" ]

そういえばdocker-compose.ymlは非推奨で、正しくはcompose.yamlらしいですよ。

10分毎にOrganizationのRSSを取得

main.py
import time
import feedparser

def fetch_rss(url):
    feed = feedparser.parse(url)
    entries = []
    for entry in feed.entries:
        entries.append({
            'title': entry.title,
            'link': entry.link,
            'published': entry.published
        })
    return entries

def main():
    # QiitaのRSSフィードのURL ユーザーのRSSでもよい
    rss_url = 'https://qiita.com/organizations/team-shahu/activities.atom'
    while True:
        rss_data = fetch_rss(rss_url)
        print(rss_data)
        time.sleep(600)  # 10分待ち

if __name__ == '__main__':
    main()

簡単に書きました。

image.png

取得できていそうです。

10分前から現在までの時間に投稿された記事を導出

dateutilからparserをimportする

main.py
import time
import feedparser
+ from dateutil import parser

main関数を変更

main.py
def main():
    rss_url = 'https://qiita.com/organizations/team-shahu/activities.atom'  # QiitaのRSSフィードのURL ユーザーのRSSでもよい
    while True:
        rss_data = fetch_rss(rss_url)
+       current_time = time.time()
+       filtered_data = [
+           entry for entry in rss_data
+           if current_time - (parser.parse(entry['published']).timestamp()) <= 600
+       ]
+       if filtered_data:
+           print(filtered_data)
+       else:
+           print('No new entries.')
        time.sleep(600)  # 10分待ち

動作確認したいけど記事投稿されないとできない...

DiscordにWebhookで通知

requestsをimportする

main.py
import time
import feedparser
from dateutil import parser
+ import requests

DiscordにWebhookを送信する関数を追加する

main.py
def send_discord_message(webhook_url, message):
    payload = {'content': message}
    response = requests.post(webhook_url, json=payload)
    if response.status_code == 204:
        print('Message sent successfully.')
    else:
        print('Failed to send message.')

main関数を変更

main.py
def main():
    rss_url = 'https://qiita.com/organizations/team-shahu/activities.atom'  # QiitaのRSSフィードのURL ユーザーのRSSでもよい
+   webhook_url = 'https://discord.com/api/webhooks/1234567890/abcdefg' # DiscordのWebhook URL
    while True:
        rss_data = fetch_rss(rss_url)
        current_time = time.time()
        filtered_data = [
            entry for entry in rss_data
            if current_time - (parser.parse(entry['published']).timestamp()) <= 600
        ]
        if filtered_data:
            print(filtered_data)
+           send_discord_message(webhook_url, "Organazitionに新しい記事が投稿されました!\n"+"\n".join([f"**{entry['title']}**\n{entry['link']}\n" for entry in filtered_data]))
        else:
            print('No new entries.')
        time.sleep(600)  # 10分待ち

動作確認

とりあえず、10分内に投稿された記事というフィルターを外して、全部通知してもらいます。

main.py
        filtered_data = [
            entry for entry in rss_data
+           # if current_time - (parser.parse(entry['published']).timestamp()) <= 600
        ]

image.png

できてそうです。

使いやすいようにリファクタリング

import time
import feedparser
from dateutil import parser
import requests

INTERVAL = 600  # 10分
RSS_URL = 'https://qiita.com/organizations/team-shahu/activities.atom'  # QiitaのRSSフィードのURL ユーザーのRSSでもよい
DISCORD_URL= 'https://discord.com/api/webhooks/123456/abcde' # DiscordのWebhook URL

class Article:
    def __init__(self, entry):
        self.title = entry.title
        self.link = entry.link
        self.published = parser.parse(entry.published)

    def __str__(self):
        return f'**{self.title}**\n{self.link}\n{self.published.strftime("%Y/%m/%d %H:%M:%S")}'

    def is_during_interval(self):
        current_time = time.time()
        return current_time - self.published.timestamp() <= INTERVAL

def fetch_rss(url):
    feed = feedparser.parse(url)
    entries: list[Article] = []
    for entry in feed.entries:
        article = Article(entry)
        entries.append(article)
    return entries

def send_discord_message(webhook_url, message):
    payload = {'content': message}
    response = requests.post(webhook_url, json=payload)
    if response.status_code == 204:
        print('Message sent successfully.')
    else:
        print('Failed to send message.')
        print(response.text)

def main():
    while True:
        rss_data = fetch_rss(RSS_URL)
        filtered_data = [
            entry for entry in rss_data if entry.is_during_interval()
        ]
        if filtered_data:
            content = "Organazitionに新しい記事が投稿されました!\n\n"+"\n".join([str(data) for data in filtered_data])
            print(content)
            send_discord_message(DISCORD_URL, content)
        else:
            print('No new entries.')
        time.sleep(INTERVAL)  # 10分待ち

if __name__ == '__main__':
    main()

最後に

ユーザーかOrganizationが記事が投稿したらコードを書きました。
コンテナを起動させるだけで試せるので、よかったらどうぞ。

では。

3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?