13
13

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

【Python】Slack API を使用してチャンネルの全メッセージとファイルをダウンロードするコード

Last updated at Posted at 2022-08-18

2022/9/1から Slack 無料版のデータ保持期間が90日になりますね。
90日前以前のデータが消える前にメッセージとファイルをダウンロードしておきたかったので、自動でそれを行うコードを作成しました。

この記事はその時のメモです。

以下の記事の方法をベースに、全メッセージを取得したり、ファイルをダウンロードするようになっています。

import requests
import os
import json

# Slack API の conversations.history を利用して、
# チャンネルの全てのメッセージとファイルを取得・保存するクラス
# 
# トークンの作成方法は、この記事に書いてある:
# https://qiita.com/yoshii0110/items/2a7ea29ca8a40a9e42f4
# ただし、この記事の内容に加えて、ファイルをダウンロードできるようにするために、
# User Token Scopes に files:read を追加する必要がある
class Fetcher:
    
    def __init__(self, token, workspace_name):
        
        # Slack API アプリのトークン
        self.token = token
        
        # 保存時に使う
        self.workspace_name = workspace_name
        
        # 複数個所で使うのでここで定義
        self.headersAuth = {
            'Authorization': 'Bearer '+ str(self.token),
        }
        
        # ここにリクエストする
        self.SLACK_URL = "https://slack.com/api/conversations.history"
    
    # channel_id の書いてある場所:
    # Slack デスクトップアプリを開く → チャンネルを開く → チャンネル名を押す → モーダルの一番下に書いてある
    # 
    # channel_name は保存時に使う
    def fetch(self, channel_id, channel_name):
        
        print(f'fetch 開始: [{channel_id}] {channel_name}')
        
        # Slack API の conversations.history は、一度に取得できるメッセージの数に上限がある。
        # 返却データに含まれる cursor を次のリクエストで送ることでページネーションできる。
        # (参考:https://api.slack.com/methods/conversations.history)
        # cursor を入れ替えてこの request メソッドを何度も呼び出して全てのメッセージを取得する。
        def request(cursor = None):
            payload = {
                "channel": channel_id,
            }
            # 一番最初にリクエストするときは None
            # 2回目以降は値が入っている。
            # (前回の返却データにあるcursorを次のリクエストで送ることで、返却値が次のページに移動する。)
            if cursor is not None:
                payload['cursor'] = cursor
            
            response = requests.get(self.SLACK_URL, headers=self.headersAuth, params=payload)
            json_data = response.json()
            return json_data
        
        # ここにメッセージを詰めていく
        msgs_all = []
        
        # 最初は cursor はない
        cursor = None
        
        # print 用
        request_count = 0
        
        while True:
            # 何回リクエストしているか表示しておけば、
            # メッセージ数が多すぎて実行が終わらないような時に、そのことがわかりやすい。
            request_count += 1
            print(f'{request_count}回目のrequest')
            
            json_data = request(cursor)
            
            # 上手く取得できない場合に表示される
            if not json_data['ok']:
                print(json_data)
            
            msgs = json_data['messages']
            msgs_all += msgs
            if json_data['has_more']:
                # cursor を更新
                cursor = json_data['response_metadata']['next_cursor']
            else:
                break
        
        # このフォルダの下にメッセージとファイルを保存する
        # ファイル名にチャンネルIDが含まれていると、後々どこかで役に立つかもしれないので、入れておく。
        top_dirpath = f'{self.workspace_name}/{channel_name}__ID-{channel_id}'
        
        # フォルダがないと保存時にエラーが出るので作成しておく
        def makedirs_if_not_exists(path):
            if not os.path.exists(path):
                os.makedirs(path)
        
        # メッセージを保存
        def dump_msgs():
            path = top_dirpath + '/messages.json'
            makedirs_if_not_exists(top_dirpath)
            
            with open(path, 'w') as f:
                json.dump(msgs_all, f)
            
            print(f'dumped in {path}')
        
        dump_msgs()
        
        # ファイルを取得して保存する
        def fetch_and_save_files():
            for msg in msgs_all:
                # ファイルが含まれるメッセージには files キーが入っているのでそれをチェック
                if not 'files' in msg.keys():
                    continue
                
                files = msg['files']
                for file in files:
                    print('ダウンロード開始:', file['name'])
                    
                    # 何かしらの理由でエラーが発生した時にダウンロードが途中で止まってしまわないように try しておく
                    try:
                        # 画像があるURL
                        url = file['url_private']
                        
                        # 画像があるURLの一部に ID らしきものが入っているのでそれを使う。
                        # file の中に id というキーを持つ値があるが、
                        # その値に、何かしらの prefix をつけたような文字列。
                        id_like = url.split('/')[-2]
                        
                        # jpg など
                        filetype = file['filetype']
                        
                        filename = f'{id_like}.{filetype}'
                        dirpath = top_dirpath + '/files'
                        path = dirpath + '/' + filename
                        makedirs_if_not_exists(dirpath)

                        response = requests.get(url, headers=self.headersAuth)
                        with open(path, mode='wb') as f:
                            f.write(response.content)
                    
                    except Exception as e:
                        print('ダウンロードできませんでした。エラー:')
                        print(e)
        
        fetch_and_save_files()

# 便利なラッパー関数
def fetch(fetcher, params_ary):
    for params in params_ary:
        fetcher.fetch(*params)

使用例は以下の通りです。

fetcher = Fetcher(    
    # Slack API アプリのトークン
    'xoxp-130969369218-131056631301-3952340024597-c6285d27be14ae944c23aed3fadcee64',
    # ワークスペースの名前
    'ワークスペース1'
)
params_ary = [
    # (チャンネルID, チャンネル名)
    ('D4W3GES8A', 'チャンネル1'),
    ('D7FSVRHA3', 'チャンネル2'),
]
# 実行
fetch(fetcher, params_ary)

実行すると、コードがあるフォルダの中に、以下の様にメッセージとファイルが出力されます。

ワークスペース1/
    チャンネル1__ID-D4W3GES8A/
        messages.json
        files/
            T4GJTSER-FGFFFDDW5.jpg
            T4GJTSER-FAFKSBGS5.jpg
            ...
    チャンネル2__ID-D7FSVRHA3
        messages.json
        files/
            T4GJTSER-FHGSKSKS5.jpg
            T4GJTSER-FGGHPWKW5.jpg
            ...

ファイル名は ID になっていて、 messages.json の中で検索することで、ファイルのメタデータや、ファイルが含まれているメッセージを取得できます。

検索するコードの例:

# dirpath: チャンネルのフォルダまでのパス
# filename: メタデータが知りたいファイルの名前
def get_file_metadata(dirpath, filename):
    # 全メッセージを読み込む
    with open(dirpath + '/messages.json', 'r') as f:
        messages = json.load(f)

    # ファイル名はURLに含まれるID(のような文字列)
    id_like = filename.split('.')[0]

    # 探す
    for message in messages:
        if not 'files' in message.keys():
            continue

        files = message['files']
        for file in files:
            if id_like in file['url_private']:
                # 発見したので表示
                print('メタデータ:')
                print(file)
                print('')
                print('ファイルが含まれているメッセージ:')
                print(message)

# 使用例
get_file_metadata(
    dirpath='ワークスペース1/チャンネル1__ID-D4W3GES8A', 
    filename='T4GJTSER-FGFFFDDW5.jpg'
)

余談

Slackのメッセージを自動でダウンロードする方法として、以下の記事に書いてあるツールを使おうかなと最初思ったのですが、

  • ツールは公開されておらず、作者に問い合わせる必要がある
  • ファイルのダウンロード機能はなさそう?(記事には書いていない)

という理由から見送りました。

13
13
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
13
13

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?