14
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

「今年の漢字」ならぬ「今年のSlackスタンプ」をLambdaとAPI Gatewayで作ってみた

Last updated at Posted at 2021-12-12

POLプロダクト Advent Calendar 2021 の12日目担当、プロダクト部でエンジニア/エンジニア広報をしている @sho-kanamaru です!Twitterアカウントはこちらでございます。

11日目担当のリコピンさん(@ryu19-1)からバトンを受けました。AppSync最強!!!

さてさて、本日12月12日は漢字の日らしいです!
毎年「今年の漢字」が発表されていますが、今年は13日午後2時に発表みたいですね!(この記事書くから今日発表してほしかった。。)

せっかく漢字の日だし、今年の漢字っぽいことできないかな〜と考えたときに、「Slackのスタンプで今年1年を表せるのでは?」と思ったのでちょっと作ってみました!

完成イメージ

@nenmatsuメンションをすると、「2021年は、あなたは◯◯な1年でした!」と教えてくれます。(ついでに、2021年にもらったスタンプベスト3も)

image.png

全体構成

今回使った技術と流れはこんな感じ

  1. Slack Event SubscriptionsからメンショントリガーでAPI Gatewayを呼ぶ
  2. API GatewayをトリガーにLambdaの関数を実行
  3. LambdaからS3のデータを読み込む
  4. SlackのWebhook URLを使用してメッセージ投稿

image.png

では実際に作っていきましょう!

新しいSlackアプリを作成

こちらから新しいSlackアプリを作成します。

image.png

管理者権限を持ってない人はこのような警告が出ると思います。
いろいろと設定を変えるごとに管理者に承認してもらわないといけないのでちょっと大変です。。

Webhook URLの取得

次に、LambdaからSlackに投稿をするためにWebhook URLを取得します。

image.png

投稿したいチャンネルを選びましょう。

image.png

Scopeの設定

次に、SlackのAPIを叩くために、OAuth Scopeを設定します。
「OAuth & Permissions > Scopes > User Token Scopes」

image.png

ちなみに、追加するべきScopeはAPI methodsのページに記載されてあります。

例えば、users.listのAPIを叩く場合には、「users:read」のScopeが必要になります。

貼り付けた画像_12_12_21_午後6_16.png

2021年のSlackのデータを取得する

やりたいことを実現するために、SlackのAPIを叩いてデータを取得します。

では、それぞれ叩いていきましょう!

ユーザー一覧の取得

fetch_users.py
endpoint = 'https://slack.com/api/users.list'
data = requests.get(endpoint, headers=headers, params={}).json()
df_member = pd.DataFrame(data['members'])

チャンネル一覧の取得

fetch_channels.py
next_cursor = ''
channel_list = []

while True:
    endpoint = 'https://slack.com/api/conversations.list'
    payload = {
        'exclude_archived': True,
        'limit': 200,
        'cursor': next_cursor
    }

    data = requests.get(endpoint, headers=headers, params=payload).json()
    next_cursor = data['response_metadata']['next_cursor']
    channels = data['channels']
    channel_list.extend(channels)
    if (next_cursor == ""):
        break
df_channel = pd.DataFrame(channel_list)

アーカイブされていないチャンネルを取得したいときに、以下のようなリクエストだと取れない場合があるので注意が必要です。

payload = {
    'exclude_archived': True,
    'limit': 200
}

ドキュメントによると、

When paginating, any filters used in the request are applied after retrieving a virtual page’s limit. For example. using exclude_archived=true when limit=20 on a virtual page that would contain 15 archived channels will return you the virtual page with only 5 results. Additional results are available from the next cursor value.

つまり、limitを指定しても必ずしもlimitで指定した分のアーカイブされていないチャンネルが取れるわけではないので、next_cursor をリクエストに入れてあげる必要があります。

メッセージ一覧の取得

fetch_messages.py
def fetch_messages_by_channel(channel_id):
    ls_messages = []
    latest_ts = None
    start_date = pd.to_datetime('2021-01-01')
    endpoint = 'https://slack.com/api/conversations.history'

    while True:
        payload = {
            'channel': channel_id,
            'latest': latest_ts,
            'limit': 200
        }

        data = requests.get(endpoint, headers=headers, params=payload).json()
        messages = data['messages']
        ls_messages.extend(messages)

        if data['has_more']:
            time.sleep(1)
            latest_ts = messages[-1]['ts']
            latest_date = pd.to_datetime(latest_ts, unit='s')
            if latest_date < start_date:
                break
        else:
            break
    df_message = pd.DataFrame(ls_messages)
    df_message['channel_id'] = channel_id
    
    # 2021年の発言に絞る
    df_message['dt'] = pd.to_datetime(df_message['ts'], unit='s')
    df_message = df_message.query('@start_date < dt')
    return df_message

ここでもリクエストを適切に設定してメッセージを取得しましょう。
ドキュメントによると

If the response includes has_more then the client can make another call, using the ts value of the final messages as the latest param to get the next page of messages.

取得した最後のメッセージの ts の値を 次のリクエストの lastest に入れることで続きからメッセージを取得できます。

また、SlackのAPIにはリクエスト数に制限があります。

image.png

conversations.historyはTier3のメソッドで、1分間に約50リクエスト受け付けることできるので、1秒間のリクエスト間隔を空けています。

そして、先ほど取得したチャンネル数を使用してこの関数を呼び出し、チャンネルごとのメッセージを取得しましょう。

fetch_messages.py
df_message_list = []
err_channel_id_list = []

for channel in df_channel.itertuples():
    if ('_log_' in channel.name):
        continue
    try:
        df = fetch_messages_by_channel(channel.id)
    except:
        err_channel_id_list.append(channel.id)
    else:
        df_message_list.append(df)
    time.sleep(1)

df_message = pd.concat(df_message_list)

1年間のメッセージを取得しようとするとかなりのデータ量になるので、不要なチャンネルのメッセージは取得しないようにすると実行時間が短くなると思います。

ログ系のチャンネルは無視する例

if ('_log_' in channel_name):
    continue

データを整形し、S3にアップロードする

ここまでで必要なデータの準備ができました。
これらのデータを使って、ユーザーIDごとにもらった回数が多いスタンプベスト3を取得します。

stamp.py
from collections import defaultdict

df_stamp = pd.DataFrame(columns=["user_id", "first", "second", "third"])

for member in df_member['id']:
    count_reaction = defaultdict(int)
    user_message = df_message[df_message['user'] == member]
    for reactions in user_message[user_message["reactions"].notnull()].itertuples():
        for reaction in reactions.reactions:
            name = reaction['name']
            count_reaction[name] += reaction['count']
    items_sorted = sorted(count_reaction.items(), reverse=True, key=lambda x : x[1])
    if (len(items_sorted) >= 3):
        df_stamp = df_stamp.append({'user_id': member, 'first': items_sorted[0], 'second': items_sorted[1], 'third': items_sorted[2]}, ignore_index=True)

取得できたデータがこちら

image.png

このデータをS3にアップロードしたいので、pickleを使ってバイナリにします。

dump.py
import pickle

with open('./stamp.binaryfile', 'wb') as file:
    pickle.dump(df_stamp, file)

指定のパスにstamp.binaryfileができたことが確認できたら、S3にアップロードしましょう!

image.png

API Gateway / Lambdaの作成

次にAPI GatewayとLambdaの準備をします。
API GatewayとLambdaの作成方法はたくさんの記事があるので省略します。
(ちなみに僕はこのあたりの記事を参考にしました)

実際に作成したLambda関数がこちら

lambda_function.py
import json
import urllib.request
import pickle
import boto3
import pandas as pd
from collections import defaultdict

def post_slack(message):
    send_data = {
        "text": message,
    }
    send_text = "payload=" + json.dumps(send_data)
    request = urllib.request.Request(
        "https://hooks.slack.com/services/xxxxxxxxxxxxxxxxxxxxxxxx",
        data=send_text.encode("utf-8"), 
        method="POST"
    )
    with urllib.request.urlopen(request) as response:
        response_body = response.read().decode("utf-8")
        
def get_message(df_stamp, user_id):
    result = df_stamp[df_stamp['user_id'] == user_id]
    first = result['first'].values.tolist()[0]
    second = result['second'].values.tolist()[0]
    third = result['third'].values.tolist()[0]
    return f'<@{user}> \n2021年、あなたは\n:{first[0]}:\nな1年でした!\n\nあなたが2021年にもらったスタンプ\n1位::{first[0]}:({str(first[1])}回)\n2位::{second[0]}:({str(second[1])}回)\n3位::{third[0]}:({str(third[1])}回)\n'

def lambda_handler(event, context):    
    if ("X-Slack-Retry-Num" in event["headers"]):
        return {
            'statusCode': 200,
            'body': json.dumps("No need to resend")
        }
        
    user = json.loads(event["body"])["event"]["user"]
    
    s3 = boto3.resource('s3')
    df_stamp = pickle.loads(s3.Bucket("nenmatsu-slack").Object("stamp.binaryfile").get()['Body'].read())

    message = get_message(df_stamp, user)
    
    post_slack(message)
    
    return {
        'statusCode': 200,
        'body': json.dumps("Success!")
    }

S3に保存したファイルを読み込んで、slack投稿用の文章を作成しています。

Slack Event APIはEventを発行してから3秒以内にレスポンスが返ってこないと3回リトライ処理を繰り返す仕様みたいなので、複数回投稿されてしまう可能性があります。(ドキュメントはこちら

Your app should respond to the event request with an HTTP 2xx within three seconds. If it does not, we'll consider the event delivery attempt failed. After a failure, we'll retry three times, backing off exponentially.

それを防ぐために以下の処理を入れています。

if ("X-Slack-Retry-Num" in event["headers"]):
    return {
        'statusCode': 200,
        'body': json.dumps("No need to resend")
    }

リトライ処理のリクエストヘッダーには "X-Slack-Retry-Num": ["リトライ回数"] が含まれるので、リトライの場合はreturnしています。
(これだと必要なリトライ処理も実行されなくなってしまうので、とりあえずの暫定対応です。。)

※API Gatewayの統合リクエストの設定で「Lambdaプロキシ統合の使用」にチェックを入れないとheaderの内容がLamdbaに返ってこないので注意

貼り付けた画像_12_12_21_午後0_37.png

Slack Event Subscriptionsの設定

Slack Event Subscriptionsは、Slackの様々なイベントをトリガーに処理できる機能です。
今回は、メンションをトリガーにAPI Gatewayを呼び出したいと思います!

最初は、Slash CommandsOutgoing WebHooksを使おうと思ってたんですが、

Please note, this is a legacy custom integration - an outdated way for teams to integrate with Slack. These integrations lack newer features and they will be deprecated and possibly removed in the future. We do not recommend their use. Instead, we suggest that you check out their replacement: Slack apps.

まさかのどちらもdeprecatedされていたので、初めて「Slack Event Subscriptions」を使ってみました。

先ほど作成したSlackアプリの「Event Subscriptions」に遷移し、Request URLにAPI Gatewayのエンドポイントを入力します。

image.png

「Your URL didn't respond with the value of the challenge parameter.」というエラーが出るので、challengeパラメータを返すようにLambdaの関数を変更します。

lambda_function.py
def lambda_handler(event, context):    
    # Slack Event Subscriptionsの認証のため
    if "challenge" in event:
        return event["challenge"]

すると、無事認証が通りました!

image.png

今回はメンションをトリガーにしたいので、「Subscribe to bot events」に「app_mention」を追加します。

image.png

これで準備完了です!

動作確認

では、実際に動かしてみましょう!

Image from Gyazo

無事「今年のSlackスタンプ」が取得できてますね!
僕の2021年はBUMPな1年だったみたいです!
※「BUMP」というのは、POL社内でお互いのことを称えたり、労ったりする時に使っている言葉です。

おわりに

今回初めてSlackのAPIを叩いてみて、思った以上にいろいろなデータが取れるので、スタンプ以外にも面白いことができそうだなと感じました!スタンプだと全員だいたい同じような結果が返ってきてしまうので、もっと個性が出るようなbotも作ってみたいなと思います!

読んでいただきありがとうございました!
明日のアドベントカレンダーは、同じチームの根岸さん(@yk_ngsyk)です!お楽しみに!

14
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
14
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?