アドベントカレンダーを盛り上げるために
今までは、会社でアドベントカレンダーを盛り上げるには、記事が投稿されました!と会社のSlackに手動で投稿しておりました。
それだと、エンジニアっぽくないということなので、自動化しようと考えました。
とりあえずはQiitaにはAPIがあるので、そちらを使用して記事が投稿されたことを、会社のSlackに投げる仕組みを組んでみました。
前準備
準備が必要です。
Qiita APIのアクセストークン
APIを実際に叩いて確認してからプログラムを組みます。
そして、叩くにはアクセストークンが必要です。
ログインして設定-アプリケーションに行きます。
スコープはread_qiitaだけで大丈夫です。
発行されたら、トークンをコピーします。
Slackのアクセストークン
Slackに投稿するには、こちらもSlackのアクセストークンが必要になりますので設定します。
アプリケーション作成で、From scratchを選択します。
名前は適当に書いてワークスペースを選択します。
作成したら、左メニューから[OAuth & Permissions]を選択してスコープを設定します。
chat:writeとincoming-webhookを設定したら、アプリケーションのインストールをするときに対象のチャンネルを選ぶと設定されます。
インストールが終わったら左メニューのIncoming Webhooksから、Webhook URLが必要なので、必要なチャンネル分だけコピーしておきます。
これで事前準備は完了です。
フィジビリティ確認
次はできるかどうかの確認から入ります。
curlを使用してQiita APIを叩いてみます。
jqコマンドを介在してタイトルだけ取ってみました。
curl -sSH "Authorization: Bearer hoge" 'https://qiita.com/api/v2/items?query=user:sapi_kawahara' | jq '.[]|.title'
ここまで出来れば!と思いましたが、残念ながら1回のAPIでは20記事しか取得できないので、何度か叩かないといけないようです。
何件という情報は、Qiita APIのページネーションの項目によると、APIのヘッダーに含まれているようです。
それを確認するため、再びcurlで確認します。
curl -sS --head -X GET -H "Authorization: Bearer hoge" 'https://qiita.com/api/v2/items?query=user:sapi_kawahara' | grep -e 'link' -e 'total-count'
total-count: 29
というのが取得できました。
これでQiita APIを使用した記事の取得ができそうです。
データベース
Qiita APIを取得して、それをデータベースに入れておきます。
データベースの設定は、こちらの記事に書きました。
プログラミング
ここまで出来たら、あとはプログラムを書くだけです。
今回はPythonを使用しました。
python -m venv .venv
source .venv/bin/activate
で、とりあえず仮想環境を作ってから、プログラムを書きます。
使うライブラリーは、これぐらいかな?
- requests
- dotenv
- csv
- os
- math
- slackweb
- psycopg2
- json
requirements.txtを作成して、pip -r requirements.txt
でライブラリーをインストールします。
certifi==2024.8.30
charset-normalizer==3.4.0
idna==3.10
psycopg2==2.9.10
python-dateutil==2.9.0.post0
python-dotenv==1.0.1
pytz==2024.2
requests==2.32.3
six==1.16.0
slackweb==1.0.5
tzdata==2024.2
urllib3==2.2.3
次にQiita APIとSlackのwebhook URLを.envに設定しておきます。
key=hoge
webhook_new_article=https://hooks.slack.com/services/1つ目のチャンネル,https://hooks.slack.com/services/2つ目のチャンネル
あと、参加するQiita アカウントをCSVファイルで用意します。
id,advent_calendar
sapi_kawahara,1
あとは、頑張って作ってみました。
import psycopg2
def get_connection():
connection = psycopg2.connect("host=localhost dbname=postgres")
return connection
import psycopg2
import json
import connector
def id_search(id):
with connector.get_connection() as conn:
with conn.cursor() as cursor:
cursor.execute("SELECT id FROM QIITA_ARTICLE WHERE id = %s", (id,))
for id in cursor:
return True
return False
def insert_article(item):
with connector.get_connection() as conn:
with conn.cursor() as cursor:
cursor.execute("INSERT INTO QIITA_ARTICLE (id, title, user_id, likes_count, aricle_created_at, created_at, json_data) VALUES(%s, %s, %s, %s, %s, now(), %s)", (item['id'], item['title'], item['user']['id'], item['likes_count'], item['created_at'], json.dumps(item, ensure_ascii=False)))
import requests
import os
import sys
import time
def request_handle(url):
for i in range(3):
respons_all = request_main(url)
d = respons_all.json()
if respons_all.status_code == 403:
if d['message'] == 'Rate limit exceeded':
print('This is now an API limitation. Stop for 1 hour.')
time.sleep(3600)
print('Continue.')
elif respons_all.status_code != 200:
print('{} times,API System Down.'.format(i))
else:
return d, respons_all.headers
if i == 3:
print('API System Down exit.')
sys.exit(1)
def request_main(url):
headers = {
'Authorization': 'Bearer ' + os.getenv('key')
}
try:
response = requests.get(url=url, headers=headers, timeout=60)
return response
except requests.exceptions.RequestException as e:
print(e)
sys.exit(1)
import math
import csv
import os
from dotenv import load_dotenv
import slackweb
import api_request
import qiita_article
def main():
url = 'https://qiita.com/api/v2/items?query=user:{user}'
users = 'users.csv'
with open(users, newline='') as csvfile:
reader = csv.DictReader(csvfile)
list = [row for row in reader]
for user in list:
if user['advent_calendar'] == '1':
url_page = url.format(user=user['id'])
(respons_data, respons_headers) = api_request.request_handle(url_page)
total_count = respons_headers['Total-Count']
article_insert(respons_data)
for i in range(2, math.ceil(int(total_count) / 20) +1):
url_page = url.format(user=user['id']) + '&page=' + str(i)
(respons_data, respons_headers) = api_request.request_handle(url_page)
article_insert(respons_data)
def article_insert(respons_data):
count = 0
for item in respons_data:
# print('{title}:https://qiita.com/{user_id}/items/{id} is search.'.format(title=item['title'], user_id=item['user']['id'], id=item['id']))
if not (qiita_article.id_search(item['id'])):
qiita_article.insert_article(item)
slack_notification(item)
print('Insert OK')
count += 1
def slack_notification(item):
urls = os.getenv('webhook_new_article').split(',')
for url in urls:
print('{title}:https://qiita.com/{user_id}/items/{id} is posted.'.format(title=item['title'], user_id=item['user']['id'], id=item['id']))
slack = slackweb.Slack(url=url)
text = """
Qiitaに記事が投稿されたよ🎉
{user_id} さん!ありがとうございます!
Title:{title}
URL: https://qiita.com/{user_id}/items/{id}
早速、いいね:hearts:をしましょう
""".format(user_id=item['user']['id'], title=item['title'], id=item['id']).strip()
slack.notify(text=text.strip())
if __name__ == '__main__':
load_dotenv()
main()
初回は、slack_notificationに行かないようにしてから、python get_user_article.py
と実行します。
そのあとは、slack_notificationを復活させてから、cronか何かで実行させると、新着記事があるたびに会社のSlackに通知メッセージが投稿されるようになります。