Introduction
This article presents a case study in which K-Anonymity proved effective against a large-scale spam incident that occurred in February 2024 and exploited applications built on the ActivityPub protocol, such as Mastodon and Misskey.
The incident necessitated a response due to the significantly higher network load compared to typical New Year's greeting messages.
Details
The website Have I Been Pwned maintains a vast database of compromised password information from hacking incidents. They offer services and APIs for assessing the security of passwords using K-Anonymity, free of charge.
Thanks to this, the service can also be utilized as a criterion for determining whether a given character string was generated by a human. I leveraged this aspect.
The following screenshot provides an example of actual spam messages that occurred.
(kuroneko6423, spam, ctkpaarr, ...)
Implementation
I used a proxy tool that acts as a middleman for HTTP requests and works in two ways (as a reverse proxy and as a forward proxy). I chose this tool for that purpose, but any tool capable of HTTP proxying could be applied in the same way, even if it is not this specific one.
As spam types continue to evolve, additional strategies are employed alongside K-Anonymity tests to reduce false positives or negatives. These supplementary strategies serve to validate whether the decision to block a request, as determined by the K-Anonymity test, was indeed correct.
The following code is written to perform spam filtering for Fediverse (ActivityPub).
import base64
import hashlib
import io
import os.path
import re

import requests
from decouple import config
from PIL import Image

from server import Extension
# Fallback values so that every setting name exists even when a required
# option is missing and config() raises part-way through the try block below.
# An empty string disables the corresponding strategy (the filter compares
# these values against '' before using them).
# NOTE(review): 'utf-8' is an assumed fallback for CLIENT_ENCODING - confirm.
client_encoding = 'utf-8'
truecaptcha_userid = ''
truecaptcha_apikey = ''
dictionary_file = 'words_alpha.txt'
librey_apiurl = ''

try:
    client_encoding = config('CLIENT_ENCODING')
    truecaptcha_userid = config('TRUECAPTCHA_USERID')  # truecaptcha.org
    truecaptcha_apikey = config('TRUECAPTCHA_APIKEY')  # truecaptcha.org
    dictionary_file = config('DICTIONARY_FILE', default='words_alpha.txt')  # https://github.com/dwyl/english-words
    librey_apiurl = config('LIBREY_APIURL')  # https://github.com/Ahwxorg/librey
except Exception as e:
    # Settings read before the failure keep their configured values; the
    # remaining ones keep the fallbacks assigned above.
    print("[*] Invaild configration: %s" % (str(e)))
class Fediverse(Extension):
    """Spam filter for Fediverse (ActivityPub) traffic.

    Account IDs found in a message are first checked with a K-Anonymity
    lookup against api.pwnedpasswords.com. A set of secondary strategies
    (VowelRatio10, Palindrome4, KnownWords4, SearchEngine3, RepeatedNumbers3)
    then scores the IDs to overturn false positives, and attached ``.webp``
    images can optionally be read through truecaptcha.org (Not-CAPTCHA).
    """

    def __init__(self):
        self.type = "filter"  # this is a filter

        # Load data to use KnownWords4 strategy
        # Download data: https://github.com/dwyl/english-words
        # Stored as a set so that has_known_word() gets O(1) membership tests
        # instead of scanning the whole dictionary for every substring.
        self.known_words = set()
        if dictionary_file != '' and os.path.isfile(dictionary_file):
            with open(dictionary_file, "r") as file:
                stripped = (line.strip() for line in file)
                # Words shorter than four characters are never looked up.
                self.known_words = {word for word in stripped if len(word) > 3}
            print("[*] Data loaded to use KnownWords4 strategy")

    def test(self, filtered, data, webserver, port, scheme, method, url):
        """Decide whether a message should be filtered (blocked) as spam.

        Args:
            filtered: Decision carried in by the caller; returned unchanged
                when the message contains no candidate ID.
            data: Raw message bytes.
            webserver: Not used by this filter.
            port: Not used by this filter.
            scheme: Not used by this filter.
            method: HTTP method as bytes (compared with ``b'GET'``).
            url: Request URL as bytes.

        Returns:
            True when the message should be filtered, False otherwise.
        """
        # prevent cache confusing: a body carrying the default nginx welcome
        # page title is always filtered
        if data.find(b'<title>Welcome to nginx!</title>') > -1:
            return True

        # allowed conditions
        if method == b'GET' or url.find(b'/api') > -1:
            return False

        # convert to text
        data_length = len(data)
        text = data.decode(client_encoding, errors='ignore')
        # An empty body has no error rate (and would divide by zero).
        if data_length > 0:
            # NOTE(review): multi-byte characters also make len(text) smaller
            # than len(data), so heavily non-ASCII text can be classified as
            # binary here - confirm this is intended.
            error_rate = (data_length - len(text)) / data_length
            if error_rate > 0.2:  # it is a binary data
                return False

        # check ID with K-Anonymity strategy
        pattern = r'\b(?:(?<=\/@)|(?<=acct:))([a-zA-Z0-9]{10})\b'
        matches = list(set(re.findall(pattern, text)))
        if not matches:
            # Every check below needs at least one ID.
            return filtered

        print("[*] Found ID: %s" % (', '.join(matches)))
        try:
            # Filter unless every ID is a known (human-chosen) string.
            filtered = not all(map(self.pwnedpasswords_test, matches))
        except Exception as e:
            print("[*] K-Anonymity strategy not working! %s" % (str(e)))
            filtered = True

        # feedback: secondary strategies may overturn the decision to filter
        if filtered:
            score, strategies = self._feedback_score(matches)

            # logging score
            with open('score.log', 'a') as file:
                file.write("%s\t%s\t%s\r\n" % ('+'.join(matches), str(score), '+'.join(strategies)))

            # make decision
            if score > 1:
                filtered = False

        # check an attached images (check images with Not-CAPTCHA strategy)
        if truecaptcha_userid != '' and not filtered:
            filtered = self._image_spam_test(text)

        return filtered

    def _feedback_score(self, matches):
        """Score the IDs with the secondary strategies.

        A strategy contributes its weight only when it holds for every ID.

        Returns:
            A ``(score, strategies)`` tuple: the summed weight and the names
            of the strategies that passed, in evaluation order.
        """
        def vowel_ratio_test(s):
            ratio = self.calculate_vowel_ratio(s)
            return 0.2 < ratio < 0.8

        # (name, weight, predicate, enabled)
        checks = (
            ('VowelRatio10', 1, vowel_ratio_test, True),
            ('Palindrome4', 1, self.has_palindrome, True),
            ('KnownWords4', 2, self.has_known_word, True),
            ('SearchEngine3', 1, self.search_engine_test, librey_apiurl != ''),
            ('RepeatedNumbers3', 1, self.repeated_numbers_test, True),
        )

        score = 0
        strategies = []
        for name, weight, predicate, enabled in checks:
            if enabled and all(map(predicate, matches)):
                score += weight
                strategies.append(name)
        return score, strategies

    def _webp_to_png_base64(self, url):
        """Download an image and return it as base64-encoded PNG text.

        Returns None when the download or the conversion fails.
        """
        # NOTE(security): the URL is taken from untrusted message text, so
        # this request can be pointed at arbitrary https hosts.
        try:
            response = requests.get(url)
            img = Image.open(io.BytesIO(response.content))
            img_png = img.convert("RGBA")
            buffered = io.BytesIO()
            img_png.save(buffered, format="PNG")
            # base64 output is pure ASCII regardless of the client encoding.
            return base64.b64encode(buffered.getvalue()).decode("ascii")
        except Exception:
            return None

    def _image_spam_test(self, text):
        """Return True when an attached .webp image reads as a spam keyword."""
        urls = re.findall(r'https://[^\s"]+\.webp', text)
        for url in urls:
            print("[*] downloading... %s" % (url))
            encoded_image = self._webp_to_png_base64(url)
            print("[*] downloaded.")
            if not encoded_image:
                continue

            print("[*] solving...")
            try:
                solved = self.truecaptcha_solve(encoded_image)
                if solved:
                    print("[*] solved: %s" % (solved))
                    if solved.lower() in ['ctkpaarr', 'spam']:
                        return True
                else:
                    print("[*] not solved")
            except Exception as e:
                print("[*] Not CAPTCHA strategy not working! %s" % (str(e)))
        return False

    # Strategy: K-Anonymity test - use api.pwnedpasswords.com
    def pwnedpasswords_test(self, s):
        """Return True when *s* (lowercased) is listed by Pwned Passwords.

        Only the first five hex characters of the SHA-1 digest are sent to the
        API; the remaining 35 are compared locally against the returned
        ``SUFFIX:COUNT`` lines.

        Raises:
            Exception: when the API does not answer with HTTP 200.
        """
        # convert to lowercase
        s = s.lower()

        # SHA1 of the password, upper-cased to match the API's hex casing
        p_sha1 = hashlib.sha1(s.encode()).hexdigest().upper()

        # First 5 char of SHA1 for k-anonymity API use
        f5_sha1 = p_sha1[:5]

        # Remaining 35 char of SHA1 to match API output
        suffix = p_sha1[5:]

        # Making GET request using Requests library
        response = requests.get(f'https://api.pwnedpasswords.com/range/{f5_sha1}')

        # Checking if request was successful
        if response.status_code != 200:
            raise Exception("api.pwnedpasswords.com response status: %s" % (str(response.status_code)))

        # Each line is "SUFFIX:COUNT"; compare the suffix part only.
        return any(
            line.split(':')[0].strip().upper() == suffix
            for line in response.text.splitlines()
        )

    # Strategy: Not-CAPTCHA - use truecaptcha.org
    def truecaptcha_solve(self, encoded_image):
        """Send a base64-encoded image to truecaptcha.org and return its text.

        Returns:
            The ``result`` field of the response, or None when the service
            reports an error or returns no result.

        Raises:
            Exception: when the API does not answer with HTTP 200.
        """
        url = 'https://api.apitruecaptcha.org/one/gettext'
        payload = {
            'userid': truecaptcha_userid,
            'apikey': truecaptcha_apikey,
            'data': encoded_image,
            'mode': 'human'
        }
        response = requests.post(url=url, json=payload)

        if response.status_code != 200:
            raise Exception("api.apitruecaptcha.org response status: %s" % (str(response.status_code)))

        result = response.json()
        if 'error_message' in result:
            print("[*] Error: %s" % (result['error_message']))
            return None
        if 'result' in result:
            return result['result']
        return None

    # Strategy: VowelRatio10
    def calculate_vowel_ratio(self, s):
        """Return the vowel count of *s* divided by its length.

        'w' and 'y' count as vowels, and each occurrence of a vowel-ending
        pattern (e.g. 'ing') adds one more. Matching ignores case. An empty
        string yields 0.0.
        """
        # Calculate the length of the string.
        length = len(s)
        if length == 0:
            return 0.0

        # Count the number of vowels ('a', 'e', 'i', 'o', 'u', 'w', 'y') in the string.
        vowel_count = sum(1 for char in s if char.lower() in 'aeiouwy')

        # Define vowel-ending patterns
        vowel_ending_patterns = ['ang', 'eng', 'ing', 'ong', 'ung', 'ank', 'ink', 'dge']

        # Count the occurrences of vowel-ending patterns, ignoring case like
        # the vowel count above does.
        lowered = s.lower()
        vowel_count += sum(lowered.count(pattern) for pattern in vowel_ending_patterns)

        # Calculate the ratio of vowels to the total length of the string.
        return vowel_count / length

    # Strategy: Palindrome4
    def has_palindrome(self, input_string):
        """Return True when *input_string* contains a palindrome of 4+ characters (case-insensitive)."""
        input_string = input_string.lower()
        n = len(input_string)
        for i in range(n):
            for j in range(i + 4, n + 1):  # Find substrings of at least 4 characters
                substring = input_string[i:j]
                if substring == substring[::-1]:
                    return True
        return False

    # Strategy: KnownWords4
    def has_known_word(self, input_string):
        """Return True when a substring of 4+ characters of *input_string* (lowercased) is in ``self.known_words``."""
        input_string = input_string.lower()
        n = len(input_string)
        for i in range(n):
            for j in range(i + 4, n + 1):  # Find substrings of at least 4 characters
                if input_string[i:j] in self.known_words:
                    return True
        return False

    # Strategy: SearchEngine3
    def search_engine_test(self, s):
        """Return True when the LibreY search API returns more than two entries for *s*.

        A non-200 response counts as False. The ``results_source`` key, when
        present, is not counted as a result.
        """
        url = "%s/api.php?q=%s" % (librey_apiurl, s)
        # NOTE(security): verify=False disables TLS certificate verification.
        response = requests.get(url, verify=False)
        if response.status_code != 200:
            return False

        data = response.json()

        if 'results_source' in data:
            del data['results_source']

        num_results = len(data)

        return num_results > 2

    # Strategy: RepeatedNumbers3
    def repeated_numbers_test(self, s):
        """Return True when *s* contains a run of three or more consecutive digits."""
        return bool(re.search(r'\d{3,}', s))
In Conclusion
I was able to achieve a reduction in load by blocking spam attacks even as they continued to occur.
If you are looking for an example of implementing an HTTP proxy capable of performing these functions, you can refer to the Caterpillar Proxy project.
Thank you.