LoginSignup
0
0

More than 5 years have passed since last update.

slackagentを作った時のメモ

Last updated at Posted at 2018-06-04

目的

slackを使ってて標準で備えてない機能をBOTに実現させて便利につかいたい
できるようになったことは目次参照

作ったBOT

hadacchi/slackagent

今ある機能

常駐

    def startListen(self):
        'connect to slack and start to listening'

        realname, uname, uid = self._auth_test()
        imid = self._get_im()
        self._sc = slackclient.SlackClient(privatedata.bot_user_oauth_access_token)

        if self._sc.rtm_connect(with_team_state=False, auto_reconnect=True):
            while True:
                try:
                    buf = self._sc.rtm_read()
                except Exception as e:
                    self._logger.exception(e)
                    raise e
                if len(buf) == 0:
                    time.sleep(1)
                    continue
                for res in buf:
                    if 'type' in res:
                        if res['type'] == 'user_typing':
                            if random.uniform(0,1) > 0.8:
                                self._random_message(res['channel'], configuration.WAITING_MSGS)
                            continue
                        if res['type'] == 'message':  # 発言形式を取るメッセージは全部これ
                            if res.get('channel') == imid[uname]:
                                # BOTへのprivate ch
                                if res.get('subtype') == 'bot_message':
                                    continue
                                self._search_for_pic(res)
                            if 'subtype' not in res and 'attachments' not in res:
                                self._chooseAction(res['channel'], res)
                    else:
                        self._logger.debug(str(res))
                time.sleep(1)
            self._logger.info('now, disconeccted')

メッセージの一括削除

    def _clear_log(self, fch, chname):
        'clear log of the indicated ch'

        chid = self._get_ch()
        imid = self._get_im()
        realname, uname, uid = self._auth_test()

        if chname not in chid:
            if chname not in imid:
                if chname != realname or uname not in imid:
                    self._random_message(fch, configuration.NOCH_MSGS)
                    return
                else:
                    targetid = imid[uname]
            else:
                targetid = imid[chname]
        else:
            targetid = chid[chname]

        self._random_message(fch, configuration.CONFIRM_MSGS)
        history, more = self._get_history(fch, targetid)
        while True:
            for msg in history[10:]:
                r = requests.post( slackparams.del_url
                                 , params = { 'token': privatedata.oauth_access_token
                                            , 'channel': targetid
                                            , 'ts': msg['ts']
                                            , 'as_user': True
                                            }
                                 )
                time.sleep(1)

            if more:
                history, more = self._get_history(fch, targetid)
            else:
                break
        self._random_message(fch, configuration.FINISH_MSGS)
  • im.listではユーザIDとIMのID(多分,2者通話なのでユーザIDペア毎に生成される)しか得られない
    def _get_im(self):
        'return a dict from uname to imid'

        try:
            with open(configuration.impkl, 'rb') as f:
                ims = pickle.load(f)
        except FileNotFoundError:
            r = requests.get( slackparams.imlist_url
                            , params = {'token': privatedata.oauth_access_token}
                            )
            response = json.loads(r.text)
            if response['ok']:
                ims = {res['user']:res['id'] for res in response['ims']}
                with open(configuration.impkl,'wb') as f:
                    pickle.dump(ims,f)
                return ims
            else:
                raise Exception(r.text)
        try:
            with open(configuration.usrpkl, 'rb') as f:
                usrs = pickle.load(f)
        except FileNotFoundError:
            r = requests.get( slackparams.usrlist_url
                            , params = {'token': privatedata.oauth_access_token}
                            )
            response = json.loads(r.text)
            if response['ok']:
                usrs = {res['name']:res['id'] for res in response['members']}
                with open(configuration.usrpkl,'wb') as f:
                    pickle.dump(usrs,f)
            else:
                raise Exception(r.text)
    def _store_pic(self, uname, obj):
        '画像添付を保存する'
        r = requests.get(obj.get('image_url'))
        if r.status_code == 200:
            dname = configuration.IMGBASE + uname + '/'
            try:
                os.mkdir(dname)
            except FileExistsError:
                pass
            with open(dname + obj.get('image_url').split('/')[-1], 'wb') as f:
                f.write(r.content)
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0