2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Slackの過去投稿をダウンロードする方法(Python Slack SDK版)

Last updated at Posted at 2022-08-11

2022年9月1日以降、Slackのフリープランのユーザーは、90日が経過したメッセージが非表示となるということが発表されました。非表示になる前にデータを退避したい時、フリープラン含めて提供されているエクスポート機能で普通にダウンロードも出来ますが、Python Slack SDKの勉強がてらPython版のアプリを作ったのでご査収ください。

手順1 : Python Slack SDKのインストール

pip install slack_sdk

手順2 : Slack APIトークンに必要なOAuthスコープの確認

(ダウンロードだけ出来ればいい方は手順3に進んで構いません)

下記アプリでは、APIとしてusers.list、conversations.list、conversations.history、conversations.repliesの4種類のAPIを使っていますので、それぞれのAPIを叩くのに必要なスコープを調べます:

  • users.list
    "Required scopes"の"User Tokens"のところに書かれているスコープusers:readが必要になります。
  • conversations.list
    "User Tokens"のところにchannels:read、groups:read、im:read、mpim:readの4つが書かれていますが、ここではチャンネルに付随した情報だけ取れればいいのでchannels:readが必要になります。
  • conversations.history
    同様に、channels:historyが必要。
  • conversations.replies
    同様に、channels:historyが必要。

というわけでまとめると必要なスコープはusers:read、channels:history、channels:readの三つと分かります。

手順3 : Tokenの生成入手

手順2で判明した必要なOAuthスコープ三つ(users:read、channels:history、channels:read)を備えたTokenを入手します。単純に下記手順で大丈夫です。

  1. https://api.slack.com/start/building/bolt-python で "Create a Slack app"のボタンをクリック。
  2. "From scratch"を選択。
  3. "App Name"に好きなアプリの名前、"Pick a workspace to develop your app in:"で対象とするワークスペースを選択、"Create App"ボタンをクリック。
  4. "Permissions"をクリック。
  5. "User Token Scopes"のところで"Add an OAuth Scope"をクリック、順次必要なスコープ(上記三つ)を選んで追加する。
  6. "OAuth Tokens for Your Workspace"のところで"Install to Workspace"をクリック。
  7. 許可を求めるページに遷移するので"許可する"をクリック。
  8. 元のページに自動で戻る。すると、"User OAuth Token"のところに"xoxp-…"というTokenが生成されているので、それをコピーして取っておく。

手順4 : 下記pythonコード実行

下記コードを実行します。conversations_history_pagenationメソッドの中でページネーションを辿って全メッセージを取得しています。
また、各APIにはrate limitが設定されていますので、time.sleepの長さをSLEEPDURで設定しています。ここは自己責任で設定してください。

Slack-download.py
# 上記で入手したTokenをコピペ。
SLACKTOKEN = 'xoxp-…'

# sleep duration  
# methodごとのrate limitを超えない様にする(https://api.slack.com/docs/rate-limits)
SLEEPDUR = 3.0

import os
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
import time
from datetime import datetime, timedelta, timezone
import json

token = SLACKTOKEN

class slack_downloader:
    def __init__(self, token):
        self.token = token
        self.client = WebClient(token=token)

    def slack_exception_handling(func):
        def _wrapper(*args, **kwargs):
            try:
                result = func(*args, **kwargs)
            except SlackApiError as e:
                print(f"Error occurs when calling slack api : {e.response['error']}")
            return result
        return _wrapper

    @slack_exception_handling
    def get_users_list(self):
        response = self.client.users_list()
        return response['members']

    @slack_exception_handling
    def get_channels_list(self):
        response = self.client.conversations_list()
        channels_info = [ {'id' : x['id'] , 'name' : x['name']}  for x in response['channels'] ]
        return channels_info

    @slack_exception_handling
    def get_channels_history(self, channels_info):
        for channel in channels_info:

            id = channel['id']
            channel_name = channel['name']

            #pagenationを辿ってメッセージ(リプライ含まず)を取得
            messages = self.conversations_history_pagenation(channel=id, limit=100)
 
            # メッセージとリプライ(もしあれば)のdictのリストを取得
            messages_list = []
            for message in messages: 
                messages_dict = dict()
                messages_dict['message'] = message
                if 'reply_count' in message:
                    # replyは元のメッセージとタイムスタンプで紐づけられている。
                    ts = message['thread_ts']
                    response = self.client.conversations_replies(channel=id, ts=ts)
                    replies = response['messages']
                    #print('Number of replies : {}'.format(len(replies)))
                    messages_dict['replies'] = replies
                    time.sleep(SLEEPDUR)
                messages_list.append(messages_dict)

            channel['messages'] = messages_list

    # pagenationを辿って全メッセージ取得。一度に取れるのは1000メッセージまで(limit=100がデフォールト)。
    @slack_exception_handling
    def conversations_history_pagenation(self, channel, limit=100):
        messages_list = []
        response_history = self.client.conversations_history(channel=channel, limit=limit)
        messages_list.extend(response_history['messages'])
        time.sleep(SLEEPDUR)

        while( 'response_metadata' in response_history and 'next_cursor' in response_history['response_metadata']):
            next_cursor = response_history['response_metadata']['next_cursor']
            response_history = self.client.conversations_history(channel=channel, limit=limit, cursor=next_cursor)
            messages_list.extend(response_history['messages'])
            time.sleep(SLEEPDUR)
        return messages_list

# downloaderインスタンス
downloader = slack_downloader(token)

# ダウンロード結果格納用辞書
slack_download = dict()

# ユーザー一覧取得
# SDKでのmethod名は https://api.slack.com/methods の . (ドット)を _ (アンダースコア)に置換したもの
slack_download['members'] = downloader.get_users_list()

# チャンネル一覧の取得
channels_info = downloader.get_channels_list()
slack_download['channels_info'] = channels_info

# 全チャンネルの全メッセージ(リプライ含む)の取得
downloader.get_channels_history(channels_info)
slack_download['channels_messages_and_repplies'] = channels_info

# ファイル保存
JST = timezone(timedelta(hours=+9), 'JST')
now = datetime.now(JST)
filename = 'slack-download_' + now.strftime('%Y%m%d_%H%M%S') + '.json'
with open(filename, 'w') as f:
    json.dump(slack_download, f, indent=4)

お願い

バグ、改善点等ありましたらコメントもしくはtwitterでDMいただければ幸いです。

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?