2022/9/1から Slack 無料版のデータ保持期間が90日になりますね。
90日前以前のデータが消える前にメッセージとファイルをダウンロードしておきたかったので、自動でそれを行うコードを作成しました。
この記事はその時のメモです。
以下の記事の方法をベースに、全メッセージを取得したり、ファイルをダウンロードするようになっています。
import requests
import os
import json
# Slack API の conversations.history を利用して、
# チャンネルの全てのメッセージとファイルを取得・保存するクラス
#
# トークンの作成方法は、この記事に書いてある:
# https://qiita.com/yoshii0110/items/2a7ea29ca8a40a9e42f4
# ただし、この記事の内容に加えて、ファイルをダウンロードできるようにするために、
# User Token Scopes に files:read を追加する必要がある
class Fetcher:
def __init__(self, token, workspace_name):
# Slack API アプリのトークン
self.token = token
# 保存時に使う
self.workspace_name = workspace_name
# 複数個所で使うのでここで定義
self.headersAuth = {
'Authorization': 'Bearer '+ str(self.token),
}
# ここにリクエストする
self.SLACK_URL = "https://slack.com/api/conversations.history"
# channel_id の書いてある場所:
# Slack デスクトップアプリを開く → チャンネルを開く → チャンネル名を押す → モーダルの一番下に書いてある
#
# channel_name は保存時に使う
def fetch(self, channel_id, channel_name):
print(f'fetch 開始: [{channel_id}] {channel_name}')
# Slack API の conversations.history は、一度に取得できるメッセージの数に上限がある。
# 返却データに含まれる cursor を次のリクエストで送ることでページネーションできる。
# (参考:https://api.slack.com/methods/conversations.history)
# cursor を入れ替えてこの request メソッドを何度も呼び出して全てのメッセージを取得する。
def request(cursor = None):
payload = {
"channel": channel_id,
}
# 一番最初にリクエストするときは None
# 2回目以降は値が入っている。
# (前回の返却データにあるcursorを次のリクエストで送ることで、返却値が次のページに移動する。)
if cursor is not None:
payload['cursor'] = cursor
response = requests.get(self.SLACK_URL, headers=self.headersAuth, params=payload)
json_data = response.json()
return json_data
# ここにメッセージを詰めていく
msgs_all = []
# 最初は cursor はない
cursor = None
# print 用
request_count = 0
while True:
# 何回リクエストしているか表示しておけば、
# メッセージ数が多すぎて実行が終わらないような時に、そのことがわかりやすい。
request_count += 1
print(f'{request_count}回目のrequest')
json_data = request(cursor)
# 上手く取得できない場合に表示される
if not json_data['ok']:
print(json_data)
msgs = json_data['messages']
msgs_all += msgs
if json_data['has_more']:
# cursor を更新
cursor = json_data['response_metadata']['next_cursor']
else:
break
# このフォルダの下にメッセージとファイルを保存する
# ファイル名にチャンネルIDが含まれていると、後々どこかで役に立つかもしれないので、入れておく。
top_dirpath = f'{self.workspace_name}/{channel_name}__ID-{channel_id}'
# フォルダがないと保存時にエラーが出るので作成しておく
def makedirs_if_not_exists(path):
if not os.path.exists(path):
os.makedirs(path)
# メッセージを保存
def dump_msgs():
path = top_dirpath + '/messages.json'
makedirs_if_not_exists(top_dirpath)
with open(path, 'w') as f:
json.dump(msgs_all, f)
print(f'dumped in {path}')
dump_msgs()
# ファイルを取得して保存する
def fetch_and_save_files():
for msg in msgs_all:
# ファイルが含まれるメッセージには files キーが入っているのでそれをチェック
if not 'files' in msg.keys():
continue
files = msg['files']
for file in files:
print('ダウンロード開始:', file['name'])
# 何かしらの理由でエラーが発生した時にダウンロードが途中で止まってしまわないように try しておく
try:
# 画像があるURL
url = file['url_private']
# 画像があるURLの一部に ID らしきものが入っているのでそれを使う。
# file の中に id というキーを持つ値があるが、
# その値に、何かしらの prefix をつけたような文字列。
id_like = url.split('/')[-2]
# jpg など
filetype = file['filetype']
filename = f'{id_like}.{filetype}'
dirpath = top_dirpath + '/files'
path = dirpath + '/' + filename
makedirs_if_not_exists(dirpath)
response = requests.get(url, headers=self.headersAuth)
with open(path, mode='wb') as f:
f.write(response.content)
except Exception as e:
print('ダウンロードできませんでした。エラー:')
print(e)
fetch_and_save_files()
# 便利なラッパー関数
def fetch(fetcher, params_ary):
for params in params_ary:
fetcher.fetch(*params)
使用例は以下の通りです。
fetcher = Fetcher(
# Slack API アプリのトークン
'xoxp-130969369218-131056631301-3952340024597-c6285d27be14ae944c23aed3fadcee64',
# ワークスペースの名前
'ワークスペース1'
)
params_ary = [
# (チャンネルID, チャンネル名)
('D4W3GES8A', 'チャンネル1'),
('D7FSVRHA3', 'チャンネル2'),
]
# 実行
fetch(fetcher, params_ary)
実行すると、コードがあるフォルダの中に、以下の様にメッセージとファイルが出力されます。
ワークスペース1/
チャンネル1__ID-D4W3GES8A/
messages.json
files/
T4GJTSER-FGFFFDDW5.jpg
T4GJTSER-FAFKSBGS5.jpg
...
チャンネル2__ID-D7FSVRHA3
messages.json
files/
T4GJTSER-FHGSKSKS5.jpg
T4GJTSER-FGGHPWKW5.jpg
...
ファイル名は ID になっていて、 messages.json の中で検索することで、ファイルのメタデータや、ファイルが含まれているメッセージを取得できます。
検索するコードの例:
# dirpath: チャンネルのフォルダまでのパス
# filename: メタデータが知りたいファイルの名前
def get_file_metadata(dirpath, filename):
# 全メッセージを読み込む
with open(dirpath + '/messages.json', 'r') as f:
messages = json.load(f)
# ファイル名はURLに含まれるID(のような文字列)
id_like = filename.split('.')[0]
# 探す
for message in messages:
if not 'files' in message.keys():
continue
files = message['files']
for file in files:
if id_like in file['url_private']:
# 発見したので表示
print('メタデータ:')
print(file)
print('')
print('ファイルが含まれているメッセージ:')
print(message)
# 使用例
get_file_metadata(
dirpath='ワークスペース1/チャンネル1__ID-D4W3GES8A',
filename='T4GJTSER-FGFFFDDW5.jpg'
)
余談
Slackのメッセージを自動でダウンロードする方法として、以下の記事に書いてあるツールを使おうかなと最初思ったのですが、
- ツールは公開されておらず、作者に問い合わせる必要がある
- ファイルのダウンロード機能はなさそう?(記事には書いていない)
という理由から見送りました。