LoginSignup
0
0

K-Anonymity for Spam Filtering: Case with Mastodon, and Misskey

Last updated at Posted at 2024-05-09

Introduction

This article introduces a case where K-Anonymity proved effective in responding to a large-scale spam incident exploiting applications based on ActivityPub protocol such as Mastodon and Misskey, which occurred in February 2024 (Case Study).

The incident necessitated a response due to the significantly higher network load compared to typical New Year's greeting messages.

Details

The website Have I Been Pwned maintains a vast database of compromised password information from hacking incidents. They offer services and APIs for assessing the security of passwords using K-Anonymity, free of charge.

Thanks to this, the service can also be utilized as a criterion for determining whether a given character string was generated by a human. I leveraged this aspect.

The following screenshot provides an example of actual spam messages that occurred.

Example of the spam message 1

Example of the spam message 2

(kuroneko6423, spam, ctkpaarr, ...)

Implementation

I utilized a proxy tool that acts as a middleman between HTTP requests with a two(Reverse and Forward) ways. I chose the tool for this purpose. Any tool capable of HTTP proxying could be similarly applied, even if not specifically this one.

As spam types continue to evolve, additional strategies are employed alongside K-Anonymity tests to reduce false positives or negatives. These supplementary strategies serve to validate whether the decision to block a request, as determined by the K-Anonymity test, was indeed correct.

The following code is written to perform spam filtering for Fediverse (ActivityPub).

import io
import re
import requests
import os.path

from decouple import config
from PIL import Image

from server import Extension

try:
    client_encoding = config('CLIENT_ENCODING')
    truecaptcha_userid = config('TRUECAPTCHA_USERID')   # truecaptcha.org
    truecaptcha_apikey = config('TRUECAPTCHA_APIKEY')   # truecaptcha.org
    dictionary_file = config('DICTIONARY_FILE', default='words_alpha.txt')    # https://github.com/dwyl/english-words
    librey_apiurl = config('LIBREY_APIURL')    # https://github.com/Ahwxorg/librey
except Exception as e:
    print ("[*] Invaild configration: %s" % (str(e)))

class Fediverse(Extension):
    def __init__(self):
        self.type = "filter"   # this is a filter
        
        # Load data to use KnownWords4 strategy
        # Download data: https://github.com/dwyl/english-words
        self.known_words = []
        if dictionary_file != '' and os.path.isfile(dictionary_file):
            with open(dictionary_file, "r") as file:
                words = file.readlines()
                self.known_words = [word.strip() for word in words if len(word.strip()) > 3]
                print ("[*] Data loaded to use KnownWords4 strategy")

    def test(self, filtered, data, webserver, port, scheme, method, url):
        # prevent cache confusing
        if data.find(b'<title>Welcome to nginx!</title>') > -1:
            return True

        # allowed conditions
        if method == b'GET' or url.find(b'/api') > -1:
            return False

        # convert to text
        data_length = len(data)
        text = data.decode(client_encoding, errors='ignore')
        error_rate = (data_length - len(text)) / data_length
        if error_rate > 0.2:    # it is a binary data
            return False

        # check ID with K-Anonymity strategy
        pattern = r'\b(?:(?<=\/@)|(?<=acct:))([a-zA-Z0-9]{10})\b'
        matches = list(set(re.findall(pattern, text)))
        if len(matches) > 0:
            print ("[*] Found ID: %s" % (', '.join(matches)))
            try:
                filtered = not all(map(self.pwnedpasswords_test, matches))
            except Exception as e:
                print ("[*] K-Anonymity strategy not working! %s" % (str(e)))
                filtered = True

        # feedback
        if filtered and len(matches) > 0:
            score = 0
            strategies = []

            # check ID with VowelRatio10 strategy
            def vowel_ratio_test(s):
                ratio = self.calculate_vowel_ratio(s)
                return ratio > 0.2 and ratio < 0.8
            if all(map(vowel_ratio_test, matches)):
                score += 1
                strategies.append('VowelRatio10')

            # check ID with Palindrome4 strategy
            if all(map(self.has_palindrome, matches)):
                score += 1
                strategies.append('Palindrome4')

            # check ID with KnownWords4 strategy
            if all(map(self.has_known_word, matches)):
                score += 2
                strategies.append('KnownWords4')

            # check ID with SearchEngine3 strategy
            if librey_apiurl != '' and all(map(self.search_engine_test, matches)):
                score += 1
                strategies.append('SearchEngine3')

            # check ID with RepeatedNumbers3 strategy
            if all(map(self.repeated_numbers_test, matches)):
                score += 1
                strategies.append('RepeatedNumbers3')

            # logging score
            with open('score.log', 'a') as file:
                file.write("%s\t%s\t%s\r\n" % ('+'.join(matches), str(score), '+'.join(strategies)))

            # make decision
            if score > 1:
                filtered = False

        # check an attached images (check images with Not-CAPTCHA strategy)
        if truecaptcha_userid != '' and not filtered and len(matches) > 0:
            def webp_to_png_base64(url):
                try:
                    response = requests.get(url)
                    img = Image.open(io.BytesIO(response.content))
                    img_png = img.convert("RGBA")
                    buffered = io.BytesIO()
                    img_png.save(buffered, format="PNG")
                    encoded_image = base64.b64encode(buffered.getvalue()).decode(client_encoding)
                    return encoded_image
                except:
                    return None

            urls = re.findall(r'https://[^\s"]+\.webp', text)
            if len(urls) > 0:
                for url in urls:
                    if filtered:
                        break

                    print ("[*] downloading... %s" % (url))
                    encoded_image = webp_to_png_base64(url)
                    print ("[*] downloaded.")
                    if encoded_image:
                        print ("[*] solving...")
                        try:
                            solved = truecaptcha_solve(encoded_image)
                            if solved:
                                print ("[*] solved: %s" % (solved))
                                filtered = filtered or (solved.lower() in ['ctkpaarr', 'spam'])
                            else:
                                print ("[*] not solved")
                        except Exception as e:
                            print ("[*] Not CAPTCHA strategy not working! %s" % (str(e)))

        return filtered

    # Strategy: K-Anonymity test - use api.pwnedpasswords.com
    def pwnedpasswords_test(self, s):
        # convert to lowercase
        s = s.lower()

        # SHA1 of the password
        p_sha1 = hashlib.sha1(s.encode()).hexdigest()

        # First 5 char of SHA1 for k-anonymity API use
        f5_sha1 = p_sha1[:5]

        # Last 5 char of SHA1 to match API output
        l5_sha1 = p_sha1[-5:]

        # Making GET request using Requests library
        response = requests.get(f'https://api.pwnedpasswords.com/range/{f5_sha1}')

        # Checking if request was successful
        if response.status_code == 200:
            # Parsing response text
            hashes = response.text.split('\r\n')

            # Using list comprehension to find matching hashes
            matching_hashes = [line.split(':')[0] for line in hashes if line.endswith(l5_sha1)]

            # If there are matching hashes, return True, else return False
            return bool(matching_hashes)
        else:
            raise Exception("api.pwnedpasswords.com response status: %s" % (str(response.status_code)))

        return False

    # Strategy: Not-CAPTCHA - use truecaptcha.org
    def truecaptcha_solve(self, encoded_image):
        url = 'https://api.apitruecaptcha.org/one/gettext'
        data = {
            'userid': truecaptcha_userid,
            'apikey': truecaptcha_apikey,
            'data': encoded_image,
            'mode': 'human'
        }
        response = requests.post(url = url, json = data)

        if response.status_code == 200:
            data = response.json()

            if 'error_message' in data:
                print ("[*] Error: %s" % (data['error_message']))
                return None
            if 'result' in data:
                return data['result']
        else:
            raise Exception("api.apitruecaptcha.org response status: %s" % (str(response.status_code)))

        return None

    # Strategy: VowelRatio10
    def calculate_vowel_ratio(self, s):
        # Calculate the length of the string.
        length = len(s)
        if length == 0:
            return 0.0

        # Count the number of vowels ('a', 'e', 'i', 'o', 'u', 'w', 'y') in the string.
        vowel_count = sum(1 for char in s if char.lower() in 'aeiouwy')

        # Define vowel-ending patterns
        vowel_ending_patterns = ['ang', 'eng', 'ing', 'ong', 'ung', 'ank', 'ink', 'dge']

        # Count the occurrences of vowel-ending patterns in the string.
        vowel_count += sum(s.count(pattern) for pattern in vowel_ending_patterns)

        # Calculate the ratio of vowels to the total length of the string.
        vowel_ratio = vowel_count / length

        return vowel_ratio

    # Strategy: Palindrome4
    def has_palindrome(self, input_string):
        def is_palindrome(s):
            return s == s[::-1]

        input_string = input_string.lower()
        n = len(input_string)
        for i in range(n):
            for j in range(i + 4, n + 1):  # Find substrings of at least 5 characters
                substring = input_string[i:j]
                if is_palindrome(substring):
                    return True
        return False

    # Strategy: KnownWords4
    def has_known_word(self, input_string):
        def is_known_word(s):
            return s in self.known_words

        input_string = input_string.lower()
        n = len(input_string)
        for i in range(n):
            for j in range(i + 4, n + 1):  # Find substrings of at least 5 characters
                substring = input_string[i:j]
                if is_known_word(substring):
                    return True
        return False

    # Strategy: SearchEngine3
    def search_engine_test(self, s):
        url = "%s/api.php?q=%s" % (librey_apiurl, s)
        response = requests.get(url, verify=False)
        if response.status_code != 200:
            return False

        data = response.json()

        if 'results_source' in data:
            del data['results_source']

        num_results = len(data)

        return num_results > 2

    # Strategy: RepeatedNumbers3
    def repeated_numbers_test(self, s):
        return bool(re.search(r'\d{3,}', s))

In Conclusion

I was able to achieve a reduction in load by blocking spam attacks even as they continued to occur.

If you are looking for an example of implementing an HTTP proxy capable of performing these functions, you can refer to the Caterpillar Proxy project.

Thank you.

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0