POLプロダクト Advent Calendar 2021 の12日目担当、プロダクト部でエンジニア/エンジニア広報をしている @sho-kanamaru です!Twitterアカウントはこちらでございます。
11日目担当のリコピンさん(@ryu19-1)からバトンを受けました。AppSync最強!!!
さてさて、本日12月12日は漢字の日らしいです!
毎年「今年の漢字」が発表されていますが、今年は13日午後2時に発表みたいですね!(この記事書くから今日発表してほしかった。。)
せっかく漢字の日だし、今年の漢字っぽいことできないかな〜と考えたときに、「Slackのスタンプで今年1年を表せるのでは?」と思ったのでちょっと作ってみました!
完成イメージ
@nenmatsu
メンションをすると、「2021年は、あなたは◯◯な1年でした!」と教えてくれます。(ついでに、2021年にもらったスタンプベスト3も)
全体構成
今回使った技術と流れはこんな感じ
- Slack Event SubscriptionsからメンショントリガーでAPI Gatewayを呼ぶ
- API GatewayをトリガーにLambdaの関数を実行
- LambdaからS3のデータを読み込む
- SlackのWebhook URLを使用してメッセージ投稿
では実際に作っていきましょう!
新しいSlackアプリを作成
こちらから新しいSlackアプリを作成します。
管理者権限を持ってない人はこのような警告が出ると思います。
いろいろと設定を変えるごとに管理者に承認してもらわないといけないのでちょっと大変です。。
Webhook URLの取得
次に、LambdaからSlackに投稿をするためにWebhook URLを取得します。
投稿したいチャンネルを選びましょう。
Scopeの設定
次に、SlackのAPIを叩くために、OAuth Scopeを設定します。
「OAuth & Permissions > Scopes > User Token Scopes」
ちなみに、追加するべきScopeはAPI methodsのページに記載されてあります。
例えば、users.listのAPIを叩く場合には、「users:read」のScopeが必要になります。
2021年のSlackのデータを取得する
やりたいことを実現するために、SlackのAPIを叩いてデータを取得します。
-
users.list
- ユーザー一覧取得
-
conversations.list
- チャンネル一覧取得
-
coversations.history
- メッセージ一覧取得
- 今回はスレッドの情報は取得していません
- スレッドの情報も欲しい場合はconversations.repliesのAPIも叩く必要があります
では、それぞれ叩いていきましょう!
ユーザー一覧の取得
endpoint = 'https://slack.com/api/users.list'
data = requests.get(endpoint, headers=headers, params={}).json()
df_member = pd.DataFrame(data['members'])
チャンネル一覧の取得
next_cursor = ''
channel_list = []
while True:
endpoint = 'https://slack.com/api/conversations.list'
payload = {
'exclude_archived': True,
'limit': 200,
'cursor': next_cursor
}
data = requests.get(endpoint, headers=headers, params=payload).json()
next_cursor = data['response_metadata']['next_cursor']
channels = data['channels']
channel_list.extend(channels)
if (next_cursor == ""):
break
df_channel = pd.DataFrame(channel_list)
アーカイブされていないチャンネルを取得したいときに、以下のようなリクエストだと取れない場合があるので注意が必要です。
payload = {
'exclude_archived': True,
'limit': 200
}
ドキュメントによると、
When paginating, any filters used in the request are applied after retrieving a virtual page’s limit. For example. using exclude_archived=true when limit=20 on a virtual page that would contain 15 archived channels will return you the virtual page with only 5 results. Additional results are available from the next cursor value.
つまり、limitを指定しても必ずしもlimitで指定した分のアーカイブされていないチャンネルが取れるわけではないので、next_cursor
をリクエストに入れてあげる必要があります。
メッセージ一覧の取得
def fetch_messages_by_channel(channel_id):
ls_messages = []
latest_ts = None
start_date = pd.to_datetime('2021-01-01')
endpoint = 'https://slack.com/api/conversations.history'
while True:
payload = {
'channel': channel_id,
'latest': latest_ts,
'limit': 200
}
data = requests.get(endpoint, headers=headers, params=payload).json()
messages = data['messages']
ls_messages.extend(messages)
if data['has_more']:
time.sleep(1)
latest_ts = messages[-1]['ts']
latest_date = pd.to_datetime(latest_ts, unit='s')
if latest_date < start_date:
break
else:
break
df_message = pd.DataFrame(ls_messages)
df_message['channel_id'] = channel_id
# 2021年の発言に絞る
df_message['dt'] = pd.to_datetime(df_message['ts'], unit='s')
df_message = df_message.query('@start_date < dt')
return df_message
ここでもリクエストを適切に設定してメッセージを取得しましょう。
ドキュメントによると
If the response includes has_more then the client can make another call, using the ts value of the final messages as the latest param to get the next page of messages.
取得した最後のメッセージの ts
の値を 次のリクエストの lastest
に入れることで続きからメッセージを取得できます。
また、SlackのAPIにはリクエスト数に制限があります。
conversations.historyはTier3のメソッドで、1分間に約50リクエスト受け付けることできるので、1秒間のリクエスト間隔を空けています。
そして、先ほど取得したチャンネル数を使用してこの関数を呼び出し、チャンネルごとのメッセージを取得しましょう。
df_message_list = []
err_channel_id_list = []
for channel in df_channel.itertuples():
if ('_log_' in channel.name):
continue
try:
df = fetch_messages_by_channel(channel.id)
except:
err_channel_id_list.append(channel.id)
else:
df_message_list.append(df)
time.sleep(1)
df_message = pd.concat(df_message_list)
1年間のメッセージを取得しようとするとかなりのデータ量になるので、不要なチャンネルのメッセージは取得しないようにすると実行時間が短くなると思います。
ログ系のチャンネルは無視する例
if ('_log_' in channel_name):
continue
データを整形し、S3にアップロードする
ここまでで必要なデータの準備ができました。
これらのデータを使って、ユーザーIDごとにもらった回数が多いスタンプベスト3を取得します。
from collections import defaultdict
df_stamp = pd.DataFrame(columns=["user_id", "first", "second", "third"])
for member in df_member['id']:
count_reaction = defaultdict(int)
user_message = df_message[df_message['user'] == member]
for reactions in user_message[user_message["reactions"].notnull()].itertuples():
for reaction in reactions.reactions:
name = reaction['name']
count_reaction[name] += reaction['count']
items_sorted = sorted(count_reaction.items(), reverse=True, key=lambda x : x[1])
if (len(items_sorted) >= 3):
df_stamp = df_stamp.append({'user_id': member, 'first': items_sorted[0], 'second': items_sorted[1], 'third': items_sorted[2]}, ignore_index=True)
取得できたデータがこちら
このデータをS3にアップロードしたいので、pickleを使ってバイナリにします。
import pickle
with open('./stamp.binaryfile', 'wb') as file:
pickle.dump(df_stamp, file)
指定のパスにstamp.binaryfile
ができたことが確認できたら、S3にアップロードしましょう!
API Gateway / Lambdaの作成
次にAPI GatewayとLambdaの準備をします。
API GatewayとLambdaの作成方法はたくさんの記事があるので省略します。
(ちなみに僕はこのあたりの記事を参考にしました)
実際に作成したLambda関数がこちら
import json
import urllib.request
import pickle
import boto3
import pandas as pd
from collections import defaultdict
def post_slack(message):
send_data = {
"text": message,
}
send_text = "payload=" + json.dumps(send_data)
request = urllib.request.Request(
"https://hooks.slack.com/services/xxxxxxxxxxxxxxxxxxxxxxxx",
data=send_text.encode("utf-8"),
method="POST"
)
with urllib.request.urlopen(request) as response:
response_body = response.read().decode("utf-8")
def get_message(df_stamp, user_id):
result = df_stamp[df_stamp['user_id'] == user_id]
first = result['first'].values.tolist()[0]
second = result['second'].values.tolist()[0]
third = result['third'].values.tolist()[0]
return f'<@{user}> \n2021年、あなたは\n:{first[0]}:\nな1年でした!\n\nあなたが2021年にもらったスタンプ\n1位::{first[0]}:({str(first[1])}回)\n2位::{second[0]}:({str(second[1])}回)\n3位::{third[0]}:({str(third[1])}回)\n'
def lambda_handler(event, context):
if ("X-Slack-Retry-Num" in event["headers"]):
return {
'statusCode': 200,
'body': json.dumps("No need to resend")
}
user = json.loads(event["body"])["event"]["user"]
s3 = boto3.resource('s3')
df_stamp = pickle.loads(s3.Bucket("nenmatsu-slack").Object("stamp.binaryfile").get()['Body'].read())
message = get_message(df_stamp, user)
post_slack(message)
return {
'statusCode': 200,
'body': json.dumps("Success!")
}
S3に保存したファイルを読み込んで、slack投稿用の文章を作成しています。
Slack Event APIはEventを発行してから3秒以内にレスポンスが返ってこないと3回リトライ処理を繰り返す仕様みたいなので、複数回投稿されてしまう可能性があります。(ドキュメントはこちら)
Your app should respond to the event request with an HTTP 2xx within three seconds. If it does not, we'll consider the event delivery attempt failed. After a failure, we'll retry three times, backing off exponentially.
それを防ぐために以下の処理を入れています。
if ("X-Slack-Retry-Num" in event["headers"]):
return {
'statusCode': 200,
'body': json.dumps("No need to resend")
}
リトライ処理のリクエストヘッダーには "X-Slack-Retry-Num": ["リトライ回数"]
が含まれるので、リトライの場合はreturnしています。
(これだと必要なリトライ処理も実行されなくなってしまうので、とりあえずの暫定対応です。。)
※API Gatewayの統合リクエストの設定で「Lambdaプロキシ統合の使用」にチェックを入れないとheaderの内容がLamdbaに返ってこないので注意
Slack Event Subscriptionsの設定
Slack Event Subscriptionsは、Slackの様々なイベントをトリガーに処理できる機能です。
今回は、メンションをトリガーにAPI Gatewayを呼び出したいと思います!
最初は、Slash CommandsやOutgoing WebHooksを使おうと思ってたんですが、
Please note, this is a legacy custom integration - an outdated way for teams to integrate with Slack. These integrations lack newer features and they will be deprecated and possibly removed in the future. We do not recommend their use. Instead, we suggest that you check out their replacement: Slack apps.
まさかのどちらもdeprecatedされていたので、初めて「Slack Event Subscriptions」を使ってみました。
先ほど作成したSlackアプリの「Event Subscriptions」に遷移し、Request URLにAPI Gatewayのエンドポイントを入力します。
「Your URL didn't respond with the value of the challenge parameter.」というエラーが出るので、challengeパラメータを返すようにLambdaの関数を変更します。
def lambda_handler(event, context):
# Slack Event Subscriptionsの認証のため
if "challenge" in event:
return event["challenge"]
すると、無事認証が通りました!
今回はメンションをトリガーにしたいので、「Subscribe to bot events」に「app_mention」を追加します。
これで準備完了です!
動作確認
では、実際に動かしてみましょう!
無事「今年のSlackスタンプ」が取得できてますね!
僕の2021年はBUMPな1年だったみたいです!
※「BUMP」というのは、POL社内でお互いのことを称えたり、労ったりする時に使っている言葉です。
おわりに
今回初めてSlackのAPIを叩いてみて、思った以上にいろいろなデータが取れるので、スタンプ以外にも面白いことができそうだなと感じました!スタンプだと全員だいたい同じような結果が返ってきてしまうので、もっと個性が出るようなbotも作ってみたいなと思います!
読んでいただきありがとうございました!
明日のアドベントカレンダーは、同じチームの根岸さん(@yk_ngsyk)です!お楽しみに!