1
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

【Twitter改めX】詐欺アカウント自動ブロックツール作成②

Posted at

前回の続きです。

【改善・変更点】

・設定項目をiniファイルに記述出来るようにした
・自動モードを追加した
・Logファイル出力機能追加

大きな改善点は、iniファイルに複数のアカウント情報を入力し、自動で全アカウントの詐欺アカブロックを行える「自動モード」を追加した事です。
これでタスクマネージャに登録すれば、定期的に自動ブロックしてくれるので、詐欺アカのストレスは完全に無くなります。(ちなみに捨てアカと鍵アカも一緒にブロックします)
また、Logファイルを出力するようにしました。

■ほぼ実用的に使えるようになったので、我が家ではタスクマネージャで定期実行しています。

【ソースコード】

#■■ blockbot Twitterの詐欺アカウント自動ブロックツール ■■ 2023.7.15 Nobukz
# ver.1.0 API不要のcookie仕様
# ver.2.0 tkinterにてUI実装
# ver.3.0 複数アカウントの切替対応 
# ver.4.0 自分を勝手にリストへ登録する詐欺アカをブロック
#  ※通常blockだとリストから削除されないため、playwrightでブラウザ操作をエミュレート
# ver.5.0 詐欺アカ判定用ワードを外部ファイルへ分離
# ver.6.0 iniファイルで設定読込/Logファイル出力追加/自動モード追加/鍵アカブロック追加

import sys
import utils
import tweepy
import time
import tkinter
from tkinter import *
from tkinter import ttk
from tkinter import filedialog
from tkinter import messagebox
import tkinter as tk
from tkinter import scrolledtext
from playwright.sync_api import Playwright, sync_playwright, expect
import configparser
import os
import errno
import logging
from datetime import datetime, timedelta, timezone

#Log設定
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
handler = logging.FileHandler('blockbot.log', encoding='utf-8')
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(levelname)s  %(asctime)s  [%(name)s] %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.info('■■■■ 処理開始 ■■■■')

hit_count = 0
max_count = 0
block_users = []
block_IDs = []
block_lists = []
block_list_makers = []
block_list_IDs = []
block_group = []
block_border = []
group_max = []
block_words =[list(range(99)),list(range(99))]
word_check = []

user_ID = []
password = []
checkFollowerNum =[]

row_count=0
word_count=0
group_count=0
mem_group=0

#CSVファイル詠み込み
inifile_path = "" + "\\"
inifile = 'blockbot.ini'
check_path = inifile_path + inifile
if inifile_path =="\\":
    check_path = inifile

# iniファイルの読み込み
config_ini = configparser.ConfigParser()
# 指定したiniファイルが存在しない場合、エラー発生
if not os.path.exists(check_path):
    raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), check_path)
config_ini.read(check_path, encoding='utf-8')
num_listing_followers = config_ini.get('Misc', 'num_listing_followers')
csvfile = config_ini.get('Misc', 'csvfile')
AutoMode  = config_ini.get('Mode', 'AutoMode')

lst = [line.rstrip().split(",") for line in open(csvfile, newline='\n',encoding='utf-8').readlines()]
for row in lst:
    if row_count ==0:
        block_group.append(int(row[0])-1)
        block_border.append(int(row[1])-1)
    if int(row[0])-1 != mem_group:
        group_max.append(word_count)
        group_count=group_count+1
        block_group.append(int(row[0])-1)
        block_border.append(int(row[1])-1)
        mem_group=int(row[0])-1
        word_count=0
    
    block_words[group_count][word_count] = str(row[2])
    logger.info('BlockWord[%s,%s] %s/Border:%s',str(group_count+1),str(word_count+1),str(row[2]),str(block_border[group_count]+1))  
    word_count=word_count+1
    row_count=row_count+1
group_max.append(word_count)
    
#詐欺アカチェック処理
def follower_check(id,pwd,max_count):
    api = utils.get_cookie_api(id,pwd)
    followers = []
    hit_count = 0
    for page in tweepy.Cursor(api.get_followers, screen_name=id,count=num_listing_followers).pages():
        followers += [user._json for user in page]
        for i in range(len(page)):
            judge=0
            word_check.clear()
            chk_count=0
            for chk_group in range(len(block_group)):
                chk_count2=0
                for chk in range(group_max[chk_count]):
                    if str(page[i].description).rfind(block_words[chk_count][chk_count2]) > 0:
                        word_check.append(1)
                        print("Hit:"+str(block_words[chk_count][chk_count2])+"/"+str(page[i].screen_name))
                        logger.info('Hit:%s/%s',str(block_words[chk_count][chk_count2]),str(page[i].screen_name))
                    chk_count2=chk_count2+1
                chk_count=chk_count+1
            if sum(word_check) > block_border[chk_count-1]:
                judge=1
            chk1 = page[i].followers_count
            chk2 = page[i].statuses_count
            chk3 = page[i].favourites_count
            chk4 = page[i].protected
            if chk1 < 10 and chk2 < 5 and chk3 < 5:
                judge=1
            if chk4 == True:
                judge=1
            result = "Flw-Tw-Fav:" + str(chk1) + "-" + str(chk2) + "-" + str(chk3)
            if judge == 1:
                block_users.append(page[i].screen_name)
                block_IDs.append(page[i].id)
                hit_count = hit_count +1
                send_textbox("" + str(hit_count) +"" + str(page[i].screen_name) + "/" + result + "/" + str(page[i].description) + "\n\n")
                print("■ HitUser :" + str(page[i].id) + "/" + str(page[i].screen_name) + "/" + result + "/" + str(page[i].description))
        print("取得数",len(followers))
        logger.info('取得数 %s',len(followers))
        time.sleep(1)
        if len(followers) > max_count:
            break 

    mem_list = api.get_list_memberships(screen_name=id)
    for mem in mem_list:

        judge=0
        word_check.clear()
        chk_count=0
        for chk_group in range(len(block_group)):
            chk_count2=0
            for chk in range(group_max[chk_count]):
                if str(mem.description).rfind(block_words[chk_count][chk_count2]) > 0:
                    word_check.append(1)
                    logger.info('Hit:%s',str(block_words[chk_count][chk_count2]))
                chk_count2=chk_count2+1
            chk_count=chk_count+1
        logger.info('[List]Hit:%s/%s',str(sum(word_check)),str(block_border[chk_count-1]))
        if sum(word_check) > block_border[chk_count-1]:
            judge=1
        chk1 = mem.user.followers_count
        chk2 = mem.user.statuses_count
        chk3 = mem.user.favourites_count
        chk4 = mem.user.protected

        if chk1 < 10 and chk2 < 5 and chk3 < 5:
            judge=1
        if chk4 == True:
            judge=1
        result = "Flw-Tw-Fav:" + str(chk1) + "-" + str(chk2) + "-" + str(chk3)
        if judge == 1:
            print("■ Hitlist: "+mem.name, "screen_name: "+mem.user.screen_name)
            logger.info('■ Hitlist:%s name:%s',mem.name, mem.user.screen_name)
            block_lists.append(mem.name)
            block_list_makers.append(mem.user.screen_name)
            block_list_IDs.append(mem.id)
            hit_count = hit_count +1
            send_textbox(""+str(hit_count) +""+"List: "+mem.name+"/"+str(mem.user.screen_name)+"/"+str(mem.id)+"/"+result+"/"+str(mem.user.description)+"\n\n")

#ブロック処理
def follower_block(id,pwd):
    #詐欺アカブロック(ユーザー単位)
    for block_user in range(len(block_users)):
        api.report_spam(screen_name=block_users[block_user])
        send_textbox("■ブロック【" + str(block_user) +"" + str(block_users[block_user]) + "\n")
        print("■■■ ブロック完了:" + str(block_users[block_user])) 
        logger.info('■■■ブロック完了:%s',str(block_users[block_user])) 
    if len(block_lists) >0:
        #詐欺アカに追加されたリストから自分を消す処理1:いったんブロック解除
        for block_list in range(len(block_lists)):
            api.destroy_block(screen_name=block_list_makers[block_list])
            send_textbox("■いったんブロック解除:" + str(block_list_makers[block_list]) + "\n")
            logger.info('■いったんブロック解除:%s',str(block_list_makers[block_list]))

        #詐欺アカに追加されたリストから自分を消す処理2:リストから自分を消すブロック処理
        with sync_playwright() as p:
            browser = p.chromium.launch(headless=False)
            context = browser.new_context()
            page = context.new_page()
            page.goto("https://twitter.com/i/flow/login?redirect_after_login=%2F" + id)
            page.locator("label div").nth(3).click()
            page.get_by_label("電話番号/メールアドレス/ユーザー名").fill(
                id)
            page.get_by_role("button", name="次へ").click()
            page.get_by_label("パスワード", exact=True).click()
            page.get_by_label("パスワード", exact=True).fill(pwd)
            page.get_by_test_id("LoginForm_Login_Button").click()
            page.get_by_label("リスト").click()
            page.get_by_test_id("primaryColumn").get_by_label("もっと見る").click()
            page.get_by_role("menuitem", name="自分がメンバーとなっているリスト").click()
            for block_list in range(len(block_lists)):
                fix_text="https://twitter.com/i/lists/" + str(block_list_IDs[block_list])
                logger.info('■リスト検索:%s',str(fix_text))
                page.goto(fix_text)
                page.get_by_test_id("primaryColumn").get_by_label("もっと見る").click()
                page.get_by_text("@" + block_list_makers[block_list] + "さんをブロック").click()
                page.get_by_test_id("confirmationSheetConfirm").click()
                print("■対象リストブロック完了:" + str(block_lists[block_list]) + "/@" + str(block_list_makers[block_list]))
                logger.info('■対象リストブロック完了:%s/@%s',str(block_lists[block_list]),str(block_list_makers[block_list]))
                time.sleep(1)

# クリア処理
def temp_clear():
    hit_count = 0
    block_users.clear()
    block_IDs.clear()
    block_lists.clear()
    block_list_IDs.clear()
    block_list_makers.clear()
    user_ID.clear()
    password.clear()

#テキストボックスへ転記処理
def send_textbox(word):
    if AutoMode != '1':
        textBox.insert(END, word)
        textBox.see(END)
        textBox.update()

#チェックボタン処理
def btn_check():
    #messagebox.showinfo("確認", "チェックを開始します")
    # テキスト取得
    hit_count = 0
    textBox.delete("1.0", END)
    textBox.update()

    user_ID.append(txt_1.get())
    password.append(txt_2.get())

    if txt_3.get() == '':
        messagebox.showinfo("【エラー】", "全てのボックスに入力して下さい")
        return()
    max_count = int(txt_3.get())
    if user_ID == "" or password == "" or max_count < 1:
        messagebox.showinfo("【エラー】", "全てのボックスに入力して下さい")
        return()
    api = utils.get_cookie_api(user_ID[0],password[0])
    follower_check(user_ID[0],password[0],max_count)
    messagebox.showinfo("チェック処理", "抽出数:" + str(hit_count) + " / " + str(max_count) + "\n\nチェック完了しました")

#ブロックボタン処理
def btn_execute():
    messagebox.askquestion("【ブロック処理】", "検出したアカウントを全てブロックします。\n本当によろしいですか?", icon='warning')
    api = utils.get_cookie_api(user_ID[0],password[0])
    follower_block(user_ID[0],password[0])
    messagebox.showinfo("ブロック処理", "ブロック完了しました")

# クリアボタン処理
def btn_clear():
    temp_clear()
    txt_1.delete(0, tkinter.END)
    txt_2.delete(0, tkinter.END)
    txt_3.delete(0, tkinter.END)
    txt_1.focus_set()

#メイン処理
if __name__ == '__main__': 

    #自動モード処理
    if AutoMode == '1':
        for i in range(4):
            auto_id=config_ini.get('UserData', 'ID'+str(i+1))
            auto_pass=config_ini.get('UserData', 'Pass'+str(i+1))
            max_count=int(config_ini.get('UserData', 'checkFollowerNum'+str(i+1)))

            #pause = input("Please enter:")

            api = utils.get_cookie_api(auto_id,auto_pass)
            follower_check(auto_id,auto_pass,max_count)
            if len(block_users) >0 or len(block_lists) >0: 
                follower_block(auto_id,auto_pass)
            temp_clear()
        #pause = input("Please enter:")
        print("■処理完了")
        logger.info('■処理完了')
        sys.exit()

    # ユーザーインターフェイス画面作成
    tki = tkinter.Tk()
    tki.geometry('500x500')
    tki.title('ブロックツール')

    # Frame1の作成
    frame1 = ttk.Frame(tki, padding=10,width=480,height=120)
    frame1.grid()
    # ラベル

    s = StringVar()
    s.set('あなたのID')
    lbl_1 = ttk.Label(tki,textvariable=s, padding=5)
    lbl_1.place(x=60,y=20)
    # テキストボックス
    txt_1 = ttk.Entry(tki,show='*',width=20)
    txt_1.place(x=120,y=25)

    s2 = StringVar()
    s2.set('パスワード')
    lbl_2 = ttk.Label(tki,textvariable=s2, padding=5)
    lbl_2.place(x=60,y=50)
    # テキストボックス
    txt_2 = ttk.Entry(tki,show='*',width=20)
    txt_2.place(x=120,y=55)

    s3 = StringVar()
    s3.set('チェックユーザ数')
    lbl_3 = ttk.Label(tki,textvariable=s3, padding=5)
    lbl_3.place(x=260,y=20)
    # テキストボックス
    txt_3 = ttk.Entry(tki,width=20)
    txt_3.place(x=350,y=25)

    # Frame2の作成
    frame2 = ttk.Frame(tki, padding=00)
    frame2.place(x=10,y=120)

    # ブロック実行ボタンの作成
    export_button = ttk.Button(tki, text='ブロック実行', command=btn_execute, width=20)
    export_button.place(x=350,y=90)

    # テキスト出力ボックスの作成
    textboxname = StringVar()
    textboxname.set('出力内容')
    label3 = ttk.Label(frame2, textvariable=textboxname)
    label3.grid(row=0, column=0)
    textBox = scrolledtext.ScrolledText(frame2, width=68,height=26)
    textBox.grid(row=1, column=0)

    # チェックボタン
    btn = ttk.Button(tki, text='チェック', command=btn_check, width=20)
    #btn.bind("<Return>", press_enter_event())
    btn.place(x=120,y=90)

    # クリアボタン
    btn = ttk.Button(tki, text='クリア', command=btn_clear, width=10)
    #btn.bind("<Return>", press_enter_event())
    btn.place(x=20,y=90)
    
    # キー入力時のコールバック関数を定義
    def on_key1(event):
        txt_2.focus_set()

    # コールバック関数のバインド
    txt_1.bind("<Key-Return>", on_key1)

    # 以下、entry2用
    def on_key2(event):
        txt_3.focus_set()

    txt_2.bind("<Key-Return>", on_key2)

    def on_key3(event):
        txt_1.focus_set()

    txt_3.bind("<Key-Return>", on_key3)

    tki.mainloop()


【blockbot.ini】

※実行ファイルと同じフォルダに設置してください。

[Misc]
num_listing_followers = 200  1回の処理で表示するフォロアーの数
csvfile = block_words.csv #詐欺アカ判定ワードリストのファイル名

[Mode]
AutoMode = 0:自動モード(1:ON 0:OFF)

[UserData]
ID1~4:TwitterのID
Pass1~4:Twitterのパスワード
checkFollowerNum1~4:チェックするフォロアーの数

【jsonファイルについて】

プログラム本体と同じ場所に、自動でjsonファイルが生成されますが、これはアカウント毎のcookie情報ですので削除しないで下さい。

【今後の予定】

  1. 実行形式に変換
  2. ブラウザベースを検討…?

【お願い】

pyinstaller あるいは他のツールで正常に実行出来るように調整出来る方、方法をご教授頂ければ幸いです。
(変換は出来るのですが、logにtop-levelエラーが連発していて実行しても動きません)
どうぞ宜しくお願いいたします。

1
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?