今回は、若干のバグ修正と細かい調整のみですが、これである程度完成となりました。とりあえずの最終形として、これまでの記事に書いていた説明も含めこの記事で全てわかるように掲載します。
【これまでの記事】
【開発の背景】
2023年6月頃から、詐欺アカからのフォローが急増。7月からは新たな嫌がらせ手法として、自分を勝手に卑猥なタイトルのリストに登録する詐欺垢が多発しました。
【A:開発前から懸念された問題】
- TwitterAPIが有料、しかもボッタクリ価格(月額100ドル:現在約14,000円)となり、無料アカウントでAPI使用は敷居が高くなった
- スクレイピングに対する規制が厳しくなり、botが凍結される事案が多くなってきた
- 開発に時間が掛かるとTwitterの仕様が変更される可能性が高く、実用性が低くなる
- サーバーに設置する仕様では、ハッキングによる使用者のIDとパスワード漏洩やアカウント乗っ取りのリスクがあり、責任を負いきれない
- どのPCでも単体で実行出来るファイル形式にするには C#やVB.Netなどでコード作成しコンパイルするのが一般的だが、私のスキルでは短期間の制作は難しい
【B:開発中に発覚した問題】
- 自分を勝手にリストへ登録する詐欺アカをtweepy経由でブロックしても、リストから自分が消えないバグがある(Twitter側の不具合) →解決済み
- どの環境でも単体ファイルで実行出来る形式に変換する pyinstaller で上手く変換できない→ほぼ解決
【C:現時点の機能状況】
- 自分のTwitterIDとパスワード、検索するフォロアー数を入力してチェックし、検出した詐欺アカを確認の上、一括ブロック出来るようになった
- ブロック処理はtweepyのreport_spamを使用しているため、通報&ブロックしている(ただのブロックでは無く、詐欺アカ凍結の確率が上がる通報)
- 登録した判定ワード以外でも、フォロアー数/ツイート数/いいね数の設定条件によって詐欺アカと判定
- A1~2の問題について、get_cookie_apiを使用してTwitterAPIを不使用(詳細割愛:お察しください)
- B1の問題は、playwright を使用して実現(詳細割愛:お察しください)
- tkinter でユーザーインターフェイスを作成
- 完全自動のオートモードを搭載
- 鍵アカをブロックする機能
- 誤ブロック緩和機能:意味の無い行為ですが、プロフィールに「プレゼント系、セフレ系、投資家お断り」等書いている人が散見され、こういうアカウントも当然、このツールは自動でブロックしてしまいます。そこで暫定対策として、「ブロック」という言葉があればブロック判定を緩和する機能を付けました。
見落としがちなメリットですが、このツールが普及し皆さんが短期スパンでブロックするようになれば、詐欺アカの凍結が劇的に速くなります。(複数からブロックされると、システム側が不審なアカウントと判断するため)
【ソースコード】
#■■ blockbot Twitterの詐欺アカウント自動ブロックツール ■■ 2023.7.15 Nobukz
# ver.1.0 API不要のcookie仕様
# ver.2.0 tkinterにてUI実装
# ver.3.0 複数アカウントの切替対応
# ver.4.0 自分を勝手にリストへ登録する詐欺アカをブロック
# ※通常blockだとリストから削除されないため、playwrightでブラウザ操作をエミュレート
# ver.5.0 詐欺アカ判定用ワードを外部ファイルへ分離
# ver.6.0 iniファイルで設定読込/自動モード追加/鍵アカブロック追加
# ver.6.1 詐欺アカ判定ワードリストのグループが2つまでしか作れない不具合を修正(制限なし)
import sys
import utils
import tweepy
import time
import tkinter
from tkinter import *
from tkinter import ttk
from tkinter import filedialog
from tkinter import messagebox
import tkinter as tk
from tkinter import scrolledtext
from playwright.sync_api import Playwright, sync_playwright, expect
import configparser
import os
import errno
import logging
from datetime import datetime, timedelta, timezone
#Log設定
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
handler = logging.FileHandler('blockbot.log', encoding='utf-8')
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(levelname)s %(asctime)s [%(name)s] %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.info('■■■■ 処理開始 ■■■■')
hit_count = 0
max_count = 0
block_users = []
block_IDs = []
block_lists = []
block_list_makers = []
block_list_IDs = []
block_group = []
block_border = []
group_max = []
block_words =[]
word_check = []
bwords = []
user_ID = []
password = []
checkFollowerNum =[]
row_count=0
word_count=0
group_count=0
mem_group=0
#iniファイル詠み込み
inifile_path = "" + "\\"
inifile = 'blockbot.ini'
check_path = inifile_path + inifile
if inifile_path =="\\":
check_path = inifile
config_ini = configparser.ConfigParser()
# 指定したiniファイルが存在しない場合、エラー発生
if not os.path.exists(check_path):
raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), check_path)
config_ini.read(check_path, encoding='utf-8')
num_listing_followers = int(config_ini.get('Misc', 'num_listing_followers'))
csvfile = config_ini.get('Misc', 'csvfile')
min_followers = int(config_ini.get('Misc', 'min_followers'))
min_statuses = int(config_ini.get('Misc', 'min_statuses'))
min_favourites = int(config_ini.get('Misc', 'min_favourites'))
AutoMode = config_ini.get('Mode', 'AutoMode')
protected_user_block = config_ini.get('Mode', 'protected_user_block')
lst = [line.rstrip().split(",") for line in open(csvfile, newline='\n',encoding='utf-8').readlines()]
for row in lst:
if row_count ==0:
block_group.append(int(row[0])-1)
block_border.append(int(row[1])-1)
lst1 = row[0]
if lst1 !='':
if int(lst1) >0:
if int(lst1)-1 != mem_group:
twords =tuple(bwords)
block_words.append(twords)
group_max.append(word_count)
group_count=group_count+1
block_group.append(int(row[0])-1)
block_border.append(int(row[1])-1)
mem_group=int(row[0])-1
word_count=0
bwords.clear()
bwords.append(str(row[2]))
#print("■BlockWord["+str(group_count+1)+","+str(word_count+1)+"]:"+"/"+str(row[2])+"/border:"+str(block_border[group_count]+1))
logger.info('BlockWord[%s,%s] %s/Border:%s',str(group_count+1),str(word_count+1),str(row[2]),str(block_border[group_count]+1))
word_count=word_count+1
row_count=row_count+1
twords =tuple(bwords)
block_words.append(twords)
group_max.append(word_count)
#詐欺アカチェック処理
def follower_check(id,pwd,max_count):
api = utils.get_cookie_api(id,pwd)
followers = []
hit_count = 0
for page in tweepy.Cursor(api.get_followers, screen_name=id,count=num_listing_followers).pages():
followers += [user._json for user in page]
for i in range(len(page)):
judge=0
word_check.clear()
#print("■■■■ フォロワー:" + str(page[i].screen_name))
#logger.info('■■■■ フォロワー:%s',str(page[i].screen_name))
chk_count=0
for chk_group in range(len(block_group)):
#print("■group_max:"+str(group_max[chk_count]))
chk_count2=0
word_check.clear()
for chk in range(group_max[chk_count]):
#print("■ word:"+str(block_words[chk_count][chk_count2]))
gmax = group_max[chk_count]
fword = block_words[chk_count][chk_count2]
if str(page[i].description).rfind(fword) > 0:
word_check.append(1)
print("■ Hit:"+str(block_words[chk_count][chk_count2])+"/"+str(page[i].screen_name))
logger.info('Hit:%s/%s',str(block_words[chk_count][chk_count2]),str(page[i].screen_name))
chk_count2=chk_count2+1
if sum(word_check) > block_border[chk_count]:
judge=1
chk_count=chk_count+1
#print("■ Hit:"+str(sum(word_check))+"/"+str(block_border[chk_count-1]))
#logger.info('■ Hit:%s/%s:%s',str(sum(word_check)),str(block_border[chk_count-1]),str(page[i].screen_name))
if str(page[i].description).rfind("ブロック")>0 or str(page[i].description).rfind("🥦")>0 or str(page[i].description).rfind("通報")>0:
word_check.append(-1)
chk1 = page[i].followers_count
chk2 = page[i].statuses_count
chk3 = page[i].favourites_count
chk4 = page[i].protected
if chk1 < min_followers-10:
judge=1
if chk1 < min_followers and chk2 < min_statuses and chk3 < min_favourites:
judge=1
if chk2 < 5 and chk3 < 5:
judge=1
if chk4 == True and protected_user_block=='1':
judge=1
result = "Flw-Tw-Fav:" + str(chk1) + "-" + str(chk2) + "-" + str(chk3)
if judge == 1:
block_users.append(page[i].screen_name)
block_IDs.append(page[i].id)
hit_count = hit_count +1
#textBox.insert(END, "【" + str(hit_count) +"】" + str(page[i].screen_name) + "/" + result + "/" + str(page[i].description) + "\n\n")
send_textbox("【" + str(hit_count) +"】" + str(page[i].screen_name) + "/" + result + "/" + str(page[i].description) + "\n\n")
print("■■ HitUser :" + str(page[i].id) + "/" + str(page[i].screen_name) + "/" + result + "/" + str(page[i].description))
print("取得数",len(followers))
logger.info('取得数 %s',len(followers))
time.sleep(1)
if len(followers) > max_count:
#break
return hit_count
mem_list = api.get_list_memberships(screen_name=id)
for mem in mem_list:
judge=0
word_check.clear()
#print("■■■■ List:" + str(mem.name))
chk_count=0
for chk_group in range(len(block_group)):
#print("■group_max:"+str(group_max[chk_count]))
chk_count2=0
word_check.clear()
for chk in range(group_max[chk_count]):
#print("■ word:"+str(block_words[chk_count][chk_count2]))
if str(mem.description).rfind(block_words[chk_count][chk_count2]) > 0:
word_check.append(1)
#print("■ Hit:"+str(block_words[chk_count][chk_count2]))
logger.info('■ Hit:%s',str(block_words[chk_count][chk_count2]))
chk_count2=chk_count2+1
if sum(word_check) > block_border[chk_count]:
judge=1
chk_count=chk_count+1
#print("■ Hit:"+str(sum(word_check))+"/"+str(block_border[chk_count-1]))
logger.info('■ Hit:%s/%s',str(sum(word_check)),str(block_border[chk_count-1]))
if str(page[i].description).rfind("ブロック")>0 or str(page[i].description).rfind("🥦")>0 or str(page[i].description).rfind("通報")>0:
word_check.append(-1)
chk1 = mem.user.followers_count
chk2 = mem.user.statuses_count
chk3 = mem.user.favourites_count
chk4 = mem.user.protected
if chk1 < min_followers-10:
judge=1
if chk1 < min_followers and chk2 < min_statuses and chk3 < min_favourites:
judge=1
if chk2 < 5 and chk3 < 5:
judge=1
if chk4 == True and protected_user_block=='1':
judge=1
result = "Flw-Tw-Fav:" + str(chk1) + "-" + str(chk2) + "-" + str(chk3)
if judge == 1:
#print("■ Hitlist: "+mem.name, "screen_name: "+mem.user.screen_name)
logger.info('■ Hitlist:%s name:%s',mem.name, mem.user.screen_name)
block_lists.append(mem.name)
block_list_makers.append(mem.user.screen_name)
block_list_IDs.append(mem.id)
hit_count = hit_count +1
send_textbox("【"+str(hit_count) +"】"+"List: "+mem.name+"/"+str(mem.user.screen_name)+"/"+str(mem.id)+"/"+result+"/"+str(mem.user.description)+"\n\n")
#ブロック処理
def follower_block(id,pwd):
#詐欺アカブロック(ユーザー単位)
api = utils.get_cookie_api(id,pwd)
for block_user in range(len(block_users)):
api.report_spam(screen_name=block_users[block_user])
send_textbox("■ブロック【" + str(block_user) +"】" + str(block_users[block_user]) + "\n")
#print("■■■ ブロック完了:" + str(block_users[block_user]))
logger.info('■■■ブロック完了:%s',str(block_users[block_user]))
if len(block_lists) >0:
#詐欺アカに追加されたリストから自分を消す処理1:いったんブロック解除
for block_list in range(len(block_lists)):
api.destroy_block(screen_name=block_list_makers[block_list])
send_textbox("■いったんブロック解除:" + str(block_list_makers[block_list]) + "\n")
#print("■いったんブロック解除:" + str(block_list_makers[block_list]))
logger.info('■いったんブロック解除:%s',str(block_list_makers[block_list]))
#詐欺アカに追加されたリストから自分を消す処理2:リストから自分を消すブロック処理
with sync_playwright() as p:
browser = p.chromium.launch(headless=False)
context = browser.new_context()
page = context.new_page()
page.goto("https://twitter.com/i/flow/login?redirect_after_login=%2F" + id)
page.locator("label div").nth(3).click()
page.get_by_label("電話番号/メールアドレス/ユーザー名").fill(
id)
page.get_by_role("button", name="次へ").click()
page.get_by_label("パスワード", exact=True).click()
page.get_by_label("パスワード", exact=True).fill(pwd)
page.get_by_test_id("LoginForm_Login_Button").click()
page.get_by_label("リスト").click()
page.get_by_test_id("primaryColumn").get_by_label("もっと見る").click()
page.get_by_role("menuitem", name="自分がメンバーとなっているリスト").click()
for block_list in range(len(block_lists)):
fix_text="https://twitter.com/i/lists/" + str(block_list_IDs[block_list])
#print("■リスト検索:" + str(fix_text))
logger.info('■リスト検索:%s',str(fix_text))
page.goto(fix_text)
page.get_by_test_id("primaryColumn").get_by_label("もっと見る").click()
page.get_by_text("@" + block_list_makers[block_list] + "さんをブロック").click()
page.get_by_test_id("confirmationSheetConfirm").click()
print("■対象リストブロック完了:" + str(block_lists[block_list]) + "/@" + str(block_list_makers[block_list]))
logger.info('■対象リストブロック完了:%s/@%s',str(block_lists[block_list]),str(block_list_makers[block_list]))
time.sleep(1)
# クリア処理
def temp_clear():
hit_count = 0
block_users.clear()
block_IDs.clear()
block_lists.clear()
block_list_IDs.clear()
block_list_makers.clear()
user_ID.clear()
password.clear()
#テキストボックスへ転記処理
def send_textbox(word):
if AutoMode != '1':
textBox.insert(END, word)
textBox.see(END)
textBox.update()
#チェックボタン処理
def btn_check():
#messagebox.showinfo("確認", "チェックを開始します")
# テキスト取得
#hit_count = 0
textBox.delete("1.0", END)
textBox.update()
user_ID.append(txt_1.get())
password.append(txt_2.get())
if txt_3.get() == '':
messagebox.showinfo("【エラー】", "全てのボックスに入力して下さい")
return()
max_count = int(txt_3.get())
if user_ID == "" or password == "" or max_count < 1:
messagebox.showinfo("【エラー】", "全てのボックスに入力して下さい")
return()
api = utils.get_cookie_api(user_ID[0],password[0])
hit_count = follower_check(user_ID[0],password[0],max_count)
messagebox.showinfo("チェック処理", "抽出数:" + str(hit_count) + " / " + str(max_count) + "\n\nチェック完了しました")
#ブロックボタン処理
def btn_execute():
messagebox.askquestion("【ブロック処理】", "検出したアカウントを全てブロックします。\n本当によろしいですか?", icon='warning')
api = utils.get_cookie_api(user_ID[0],password[0])
follower_block(user_ID[0],password[0])
messagebox.showinfo("ブロック処理", "ブロック完了しました")
# クリアボタン処理
def btn_clear():
temp_clear()
txt_1.delete(0, tkinter.END)
txt_2.delete(0, tkinter.END)
txt_3.delete(0, tkinter.END)
txt_1.focus_set()
#メイン処理
if __name__ == '__main__':
#自動モード処理
if AutoMode == '1':
for i in range(4):
auto_id=config_ini.get('UserData', 'ID'+str(i+1))
auto_pass=config_ini.get('UserData', 'Pass'+str(i+1))
max_count=int(config_ini.get('UserData', 'checkFollowerNum'+str(i+1)))
#pause = input("Please enter:")
api = utils.get_cookie_api(auto_id,auto_pass)
follower_check(auto_id,auto_pass,max_count)
if len(block_users) >0 or len(block_lists) >0:
follower_block(auto_id,auto_pass)
temp_clear()
#pause = input("Please enter:")
print("■処理完了")
logger.info('■処理完了')
sys.exit()
# ユーザーインターフェイス画面作成
tki = tkinter.Tk()
tki.geometry('500x500')
tki.title('ブロックツール')
# Frame1の作成
frame1 = ttk.Frame(tki, padding=10,width=480,height=120)
frame1.grid()
# ラベル
s = StringVar()
s.set('あなたのID')
lbl_1 = ttk.Label(tki,textvariable=s, padding=5)
lbl_1.place(x=60,y=20)
# テキストボックス
txt_1 = ttk.Entry(tki,show='*',width=20)
txt_1.place(x=120,y=25)
s2 = StringVar()
s2.set('パスワード')
lbl_2 = ttk.Label(tki,textvariable=s2, padding=5)
lbl_2.place(x=60,y=50)
# テキストボックス
txt_2 = ttk.Entry(tki,show='*',width=20)
txt_2.place(x=120,y=55)
s3 = StringVar()
s3.set('チェックユーザ数')
lbl_3 = ttk.Label(tki,textvariable=s3, padding=5)
lbl_3.place(x=260,y=20)
# テキストボックス
txt_3 = ttk.Entry(tki,width=20)
txt_3.place(x=350,y=25)
# Frame2の作成
frame2 = ttk.Frame(tki, padding=00)
frame2.place(x=10,y=120)
# ブロック実行ボタンの作成
export_button = ttk.Button(tki, text='ブロック実行', command=btn_execute, width=20)
export_button.place(x=350,y=90)
# テキスト出力ボックスの作成
textboxname = StringVar()
textboxname.set('出力内容')
label3 = ttk.Label(frame2, textvariable=textboxname)
label3.grid(row=0, column=0)
textBox = scrolledtext.ScrolledText(frame2, width=68,height=26)
textBox.grid(row=1, column=0)
# チェックボタン
btn = ttk.Button(tki, text='チェック', command=btn_check, width=20)
#btn.bind("<Return>", press_enter_event())
btn.place(x=120,y=90)
# クリアボタン
btn = ttk.Button(tki, text='クリア', command=btn_clear, width=10)
#btn.bind("<Return>", press_enter_event())
btn.place(x=20,y=90)
# キー入力時のコールバック関数を定義
def on_key1(event):
txt_2.focus_set()
# コールバック関数のバインド
txt_1.bind("<Key-Return>", on_key1)
# 以下、entry2用
def on_key2(event):
txt_3.focus_set()
txt_2.bind("<Key-Return>", on_key2)
def on_key3(event):
txt_1.focus_set()
txt_3.bind("<Key-Return>", on_key3)
tki.mainloop()
【blockbot.ini】
※実行ファイルと同じフォルダに設置してください。
※コメント部分は削除して下さい
[Misc]
num_listing_followers = 200 '1回の処理で表示するフォロアーの数'
csvfile = block_words.csv '詐欺アカ判定ワードリストのファイル名'
min_followers = 30 '最低フォロワー数'
min_statuses = 15 '最低ツイート数'
min_favourites = 15 '最低いいね数'
[Mode]
AutoMode = 0 '自動モード(1:ON 0:OFF)'
protected_user_block = 0 '鍵アカウントをブロックする(1:ON 0:OFF)'
[UserData]
ID1 = test001
Pass1 = pass001
checkFollowerNum1 = 200
ID2 =
:
:
'ID1~4:TwitterのID'
'Pass1~4:Twitterのパスワード'
'checkFollowerNum1~4:チェックするフォロアーの数'
【詐欺アカ判定ワードリスト】
※実行ファイルと同じフォルダに設置してください。
データの説明
- 1つ目:グループ番号(1~特に制限なし)
- 2つ目:いくつのワードが含まれていたら詐欺アカと判定するか(グループ単位)
- 3つ目:詐欺アカと判定するためのワード
下のサンプルで言うと、
グループ1は「現金,資産,高配当株,プレゼント,経営,🚺,♀,おふぱこ,はめ撮り,マン凸」の内、2つ以上のワードがプロフィール欄に含まれていれば詐欺アカ判定する
グループ2は「壺,統一教会,ネトウヨ」の内2つ以上のワードが含まれていれば詐欺アカ判定
1,2,現金
1,2,資産
1,2,高配当株
1,2,プレゼント
1,2,経営
1,2,🚺
1,2,♀
1,2,おふぱこ
1,2,はめ撮り
1,2,マン凸
2,2,壺
2,2,統一教会
2,2,ネトウヨ
【jsonファイルについて】
プログラム本体と同じ場所に、自動でjsonファイルが生成されますが、これはアカウント毎のcookie情報ですので削除しないで下さい。
【完全自動モードについて】
下記のようなバッチファイルを作成し、タスクスケジューラにセットすれば、完全自動化が可能になります。
※コメントは削除して下さい
cd c:\python\blockbot '自分がプログラムを保存した場所'
python blockbot.py 'pythonフォルダにPathが通っている前提'
【実行形式への変換】
pythonスクリプトを実行形式(*.exe)に変換するツールを使って、pythonをインストールしていないPCでも実行が出来ます。(環境によっては不安定)
下記のオプションで正常動作を確認済みです。詳しい仕様などは上記サイトでご確認ください。
pyinstaller blockbot.py --clean --noconfirm --onefile --key ABCDE123