Help us understand the problem. What is going on with this article?

twitterのデータをSPAMとHAMに分けてみる

More than 5 years have passed since last update.

動機

前々からtwitterのデータを使って何かやってみたいな、と思っていました。そこで、MeCabとcabochaを入れるところから、SQLiteを活用した簡単な分類器を作るってみた次第です。
(遊びでやってみただ(ry )
以下、やったことを適当にまとめていきます。

インストールしたもの

  • MeCab 0.996
  • mecab-python-0.996
  • Virtual C++ 2008 Express Edition(vcvarsall.batが必要なためです)
  • Cabocha(今回は使いませんので説明も全くしませんが、今後のために...)

インストール方法などに関してはWindowsにmecab-pythonを導入を参考にして下さい。
このサイトでも太字で書いてあるのですが、mecab.hを書き換える時は必ず管理者権限で開いて変更を行って下さい。でないと、変更されたと認識されません。僕もハマりました。。。

ZAORIKUさんのやられているやり方でもMeCab自体は使えるのですが、latticeが使えないとCabochaが使えないためMeCab 0.996をインストールされる方が良いと思います。

twitter APIでデータを取得する

Twitter APIの使い方

Tweepyの使い方

search APIでのデータ取得時は、とりあえずcursorを使うのと、docでどんなモノが返ってくるのかを確認された方が良いと思います。

MeCabの辞書登録

windowsのインストーラでインストールした場合は、utf-8を選択したとしても、辞書の元となるcsvファイルはshift-jisのままです(システム辞書はutf-8なのに...)。ですのでシステム辞書への追加をする場合は以下のコマンドを参考にして下さい。

mecab-dict-index  -f shift-jis -t utf-8

-fでcsvファイルの文字コード、-tでシステム辞書の文字コードを指定できますので、上のコマンドでshift-jisのcsvファイルの内容をutf-8のシステム辞書に登録できます。
(もしくは、全てのファイルをutf-8に統一させるのが一番楽かもしれません。。)

分類器にかける題材

ここまでで必要なアプリをインストールして、twitter API使ってデータ取得して、つぶやきの解析に必要な辞書も整備出来ました。
あとは分類する題材です。
パッと浮かんだのが、「明日、ママがいない」というTV番組です。
これって「明日、ママと買い物に~」とか「明日、ママがいないからお留守番~」とか、っていうノイズが検索結果に結構入ってきそうですよね。
ということで検索キーワードは「明日ママ」にしてデータを取得しました。
ちなみに、streaming APIは日本語などの区切りが曖昧な言語の検索はまだ対応出来ていないようですので、ご注意下さい。

参考コード

あとは「明日、ママがいない」という番組に関するツイートとそうでないものにラベルを付けて、学習させます。
前処理方法(unicode正規化、大文字化、全角化など)、取得するモノ(Hash tag, URL, words, ...)などについては各自目的に合わせて工夫していただきたく。
ということで今回は前処理方法などについてのコードは割愛します。

NBのコード

将来的に忘却係数なども使えるように、tweet毎にwordをDBに保存するようにしています。
それと、簡易的な変数選択方法として、spam確率が0.4~0.6のwordは使用しないようにしています。
あとは、ラプラススムージングを使ってます。ノイズの軽減と0値対策のためです。
では以下にコードを記載します。

# coding: utf-8

import sqlite3 as sqlite
import pickle
from math import log, exp

from separatewords import MecabTokenize # コードは載せませんが、ここで表記ゆれなども修正しています

class BF(object):
    """ベイズ分類器の訓練とテストを行う
    defaultでテーブルが存在していたら削除するので、
    既存のDBを使用する場合は、create_table=0を引数に加える
    """    
    def __init__(self, fname, dbname, use=0):
        """trainingではuse=0
        testではuse=1
        classifyではuse=2
        """
        self.fname = fname # input file name
        self.con = sqlite.connect(dbname)
        self.con.text_factory = str # utf-8を使用するためにstrを指定
        if use==0:
            self.createindextables() # tableの作成

        self.spam_denominator = 0.0
        self.ham_denominator = 0.0

        self.ham_weight = 1.0
        self.init_pai = 0.4
        self.threshold = 0.1

    def __del__(self):
        self.con.close()

    def dbcommit(self):
        self.con.commit()

    def train(self):
        """10文字未満のtweetは対象外とする"""
        with open(self.fname,'r', 1000) as trainf:
            for line in trainf:
                tid, dtime, aid, tweet, y = line.strip().split('\t')

                wordlist = self.get_wordlist(tweet)
                # 文が10文字未満だとmecabがバグるので飛ばす
                if wordlist == True: print 'skip: %s' % (tweet); continue

                y = int(0) if int(y)<1 else int(1)  # spam=1, ham=0に統一する

                self.addtoindex_tweet(tweet, wordlist, y, dtime)
                if y==1: self.addtoindex_class(wordlist,'spam_words')
                else: self.addtoindex_class(wordlist,'ham_words')
                self.addtoindex_score(wordlist)
        self.calc_denominator()
        self.calc_word_prob()
        self.predict()

    def test(self, ifname):
        """訓練済みDBを使用して交差検証を行う
        10文字未満のtweetは対象外とする
        """
        with open(ifname, 'r', 1000) as testf:
            prior_spam, prior_ham = self.calc_cat_prob() # p(spam), p(ham)
            log_prior_spam = log(prior_spam)
            log_prior_ham = log(prior_ham)

            res = []
            ans = [0.0, 0.0, 0.0, 0.0]

            for line in testf:
                tid, dtime, aid, tweet, y = line.strip().split('\t')
                print 'testing:', tweet

                wordlist = self.get_wordlist(tweet)
                # 文が10文字未満だとmecabがバグるので飛ばす
                if wordlist == True: print 'skip: %s' % (tweet); continue

                y = int(0) if int(y)<1 else int(1)  # spam=1, ham=0に統一する

                spam_score = self.pred_score(wordlist,log_prior_spam,log_prior_ham)
                res = 1 if spam_score > 0.5 else 0

                # 結果の表の計算
                ans = self.get_ans(ans, y, res)
            print ans

    def classify(self,clfname,classify_dbname):
        """10文字未満のtweetは対象外とする"""
        self.clsfdb_con = sqlite.connect(classify_dbname)
        self.create_classified_indextables()
        self.clsfdb_con.text_factory = str # utf-8を使用するためにstrを指定

        with open(clfname, 'r', 1000) as testf:
            prior_spam, prior_ham = self.calc_cat_prob() # p(spam), p(ham)
            log_prior_spam = log(prior_spam)
            log_prior_ham = log(prior_ham)

            for line in testf:
                tid, dtime, aid, tweet = line.strip().split('\t')

                wordlist = self.get_wordlist(tweet)
                # 文が10文字未満だとmecabがバグるので飛ばす
                if wordlist == True: print 'skip: %s' % (tweet); continue

                spam_score = self.pred_score(wordlist,log_prior_spam,log_prior_ham)
                label = 1 if spam_score > 0.5 else 0
                self.addtoindex_classified_table(tweet, wordlist, spam_score, label, dtime)

    def pred_score(self,wordlist,log_prior_spam,log_prior_ham):
        """spam_scoreの推定する"""
        m = len(wordlist) - 1
        psm = m*log_prior_spam
        phm = m*log_prior_ham
        denom_prior = phm - psm
        denom_score = 0.0
        for word in wordlist:
            w_score = self.con.execute("select spam_score from words_score where word='%s'" % (word)).fetchone()
            if w_score is None: w_score = self.init_pai
            else: w_score = w_score[0]
            if abs(w_score-0.5) > self.threshold:
                denom_score += log(1-w_score) - log(w_score)
        denom = exp(denom_prior + denom_score)
        denom += 1
        prob_spam = float(1.0)/denom
        print 'spam_probability:', prob_spam

        return prob_spam
        # return 1 if prob_spam > 0.5 else 0

    def get_wordlist(self, tweet):
        # 文が10文字未満だとmecabがバグるので飛ばす
        if len(tweet.decode('utf-8')) < 10: return True
        wordlist = MecabTokenize.tokenize(tweet)
        if wordlist is None: return True
        else: return wordlist

    def get_ans(self,ans,y,res):
        if y==1 and res==1: # 真陽性
            ans[0] += 1
        elif y==1 and res==0: # 偽陰性
            ans[1] += 1
        elif y==0 and res==1: # 偽陽性
            ans[2] += 1
        else: # 真陰性
            ans[3] += 1

        return ans

    def predict(self):
        """ documentのカテゴリ所属確率を求め、所属するカテゴリを判定 
        p(category|document)
        """
        # 精度確認のための箱
        ans = [0.0, 0.0, 0.0, 0.0]

        prior_spam, prior_ham = self.calc_cat_prob() # p(spam), p(ham)
        log_prior_spam = log(prior_spam)
        log_prior_ham = log(prior_ham)
        wordlists = self.con.execute("select wordlist from tweet_master")
        true_labels = self.con.execute("select label from tweet_master")
        res = []
        while 1:
            tmp = wordlists.fetchone()
            if tmp == None: break
            wordlist = pickle.loads( tmp[0] )
            m = len(wordlist) - 1
            psm = m*log_prior_spam
            phm = m*log_prior_ham
            denom_prior = phm - psm
            denom_score = 0.0
            for word in wordlist:
                w_score = self.con.execute("select spam_score from words_score where word='%s'" % (word)).fetchone()
                if w_score is None: w_score = self.init_pai
                else: w_score = w_score[0]
                if abs(w_score-0.5) > self.threshold:
                    denom_score += log(1-w_score) - log(w_score)
            denom = exp(denom_prior + denom_score)
            denom += 1
            prob_spam = float(1.0)/denom
            print 'spam_probability:', prob_spam

            label = 1 if prob_spam > 0.5 else 0
            res.append(label)
            ans = self.get_ans(ans, true_labels.fetchone()[0], label)
        print ans
        print res

    def calc_word_prob(self):
        """ カテゴリ中の単語のスコア(確率)を求める 
        p(word_i|category)
        """
        # 計算にはラプラススムージングを使用する
        wordlist = self.con.execute("select word from words_score")
        while 1:
            word = wordlist.fetchone()
            if word == None: break
            word = word[0]
            w_cnt_spam, w_cnt_ham = self.cnt_word_of_cat(word)
            spam_prob = float(w_cnt_spam+1)/self.spam_denominator # ラプラススムージングのために1をプラス
            ham_prob = min(1, self.ham_weight*float(w_cnt_ham+1)/self.ham_denominator)
            spam_score = spam_prob/(spam_prob+ham_prob)
            self.update_word_score(word, spam_score)
        self.dbcommit()

    def calc_denominator(self):
        """ カテゴリ中の単語のスコア(確率)を求めるための計算用の分母を求めておく 
        """
        # 計算にはラプラススムージングを使用する
        uniq_cnt_spam, uniq_cnt_ham = self.cnt_uniq_word_of_cat()
        total_cnt_spam, total_cnt_ham = self.cnt_total_word_of_cat()
        self.spam_denominator = total_cnt_spam + uniq_cnt_spam # ラプラススムージングのためにユニークの数をカウント
        self.ham_denominator = total_cnt_ham + uniq_cnt_ham

    def cnt_word_of_cat(self,word):
        """ 各カテゴリ中の特定の単語数をカウント 
        T(cat,word_i)
        """
        w_cnt_spam = self.con.execute("select count(*) from spam_words where word ='%s'" % (word)).fetchone()[0]
        w_cnt_ham = self.con.execute("select count(*) from ham_words where word ='%s'" % (word)).fetchone()[0]
        if w_cnt_spam is None: w_cnt_spam = 0
        if w_cnt_ham is None: w_cnt_ham = 0
        return w_cnt_spam, w_cnt_ham

    def cnt_uniq_word_of_cat(self):
        """ 各カテゴリ中の全単語数をカウント
        p(word_i|cat)の分母の|V|
        """
        uniq_cnt_spam = self.con.execute("select count(distinct word) from spam_words").fetchone()[0]
        uniq_cnt_ham = self.con.execute("select count(distinct word) from ham_words").fetchone()[0]
        return uniq_cnt_spam, uniq_cnt_ham

    def cnt_total_word_of_cat(self):
        """ 各カテゴリ中の全単語の出現回数の総和
        ΣT(cat, word')
        """
        total_cnt_spam = self.con.execute("select count(*) from spam_words").fetchone()[0]
        total_cnt_ham = self.con.execute("select count(*) from ham_words").fetchone()[0]
        return total_cnt_spam, total_cnt_ham

    def calc_cat_prob(self):
        """ p(categories)の算出 """
        cnt_spam_tweet = self.con.execute("select count(*) from tweet_master where label=1").fetchone()[0]
        cnt_total_tweet = self.con.execute("select count(*) from tweet_master").fetchone()[0]

        cat_prob_spam = float(cnt_spam_tweet)/cnt_total_tweet
        return cat_prob_spam, 1.0-cat_prob_spam

    def addtoindex_tweet(self, tweet, wordlist, label, dtime):
        """ tweetを格納する """
#        if self.isindexed(tweet): return
        print 'Indexing: ' + tweet

        # tweet毎に単語リストをDBに格納
        self.con.execute( "insert into tweet_master values(?,?,?,?)", \
                            (tweet, pickle.dumps(wordlist), label, dtime) )
        self.dbcommit()

    def addtoindex_class(self, wordlist, class_table_name):
        """class毎にwordsを格納する"""
        # get tweet_id
        tweet_id = self.con.execute("select max(rowid) from tweet_master").fetchone()[0]

        # tweet_id毎に単語リストをDBに格納
        for word in wordlist:
            self.con.execute( "insert into %s values(?,?)" % (class_table_name), (tweet_id, word) )
        self.dbcommit()

    def addtoindex_score(self,wordlist):
        """score tableに単語を保存"""
        # 語リストをDBに格納
        for word in wordlist:
            if self.isindexed(word): continue
            else: 
                self.con.execute( "insert into words_score values(?,?)", (word, self.init_pai) ) # scoreには仮の値を入れる
        self.dbcommit()

    def addtoindex_classified_table(self, tweet, wordlist, spam_score, label, dtime):
        """ labelのついていないtweetを分類し格納する """
#        if self.isindexed(tweet): return
        print 'Classifying: ' + tweet

        # tweet毎に単語リストをDBに格納
        self.clsfdb_con.execute( "insert into tweet_master values(?,?,?,?,?)", \
                            (tweet, pickle.dumps(wordlist), spam_score, label, dtime) )
        self.clsfdb_con.commit()

    def isindexed(self,word):
        """ tweetが既にインデックスされていたらtureを返す """
        u=self.con.execute \
            ("select word from words_score where word='%s'" % (word)).fetchone()
        if u!=None: return True
        return False

    def update_word_score(self,word, spam_score):
        """単語ごとの各カテゴリへの所属確率を求める"""
        self.con.execute("update words_score set spam_score=%f where word='%s'" % \
                            (spam_score, word))

    def createindextables(self):
        """ データベースのテーブルを作る """
        tnlist = ['tweet_master' ,'spam_words', 'ham_words', 'words_score']

        for table_name in tnlist:        
            sql="SELECT name FROM sqlite_master WHERE type='table' AND name='MYTABLE';" \
                    .replace('MYTABLE', table_name)
            res = self.con.execute(sql).fetchone()
            if res is not None: # tableの存在確認
                self.con.execute('drop table %s' % (table_name))

        self.con.execute('create table tweet_master(tweet, wordlist, label, create_time)') # spamは1, hamは0
        self.con.execute('create table spam_words(tweet_id, word)')
        self.con.execute('create table ham_words(tweet_id, word)')
        self.con.execute('create table words_score(word, spam_score)')

        self.con.execute('create index tweetidx on tweet_master(tweet)')
        self.con.execute('create index spamidx on spam_words(word)')
        self.con.execute('create index hamidx on ham_words(word)')
        self.con.execute('create index scoreidx on words_score(word)')

        self.dbcommit()        

    def create_classified_indextables(self):
        """ データベースのテーブルを作る """
        table_name = 'tweet_master'

        sql="SELECT name FROM sqlite_master WHERE type='table' AND name='MYTABLE';" \
                .replace('MYTABLE', table_name)
        res = self.clsfdb_con.execute(sql).fetchone()
        if res is not None: # tableの存在確認
            self.clsfdb_con.execute('drop table %s' % (table_name))

        self.clsfdb_con.execute('create table tweet_master(tweet, wordlist, spam_score, label, create_time)') # spamは1, hamは0        
        self.clsfdb_con.execute('create index tweetidx on tweet_master(tweet)')
        self.clsfdb_con.commit()

if __name__=='__main__':
    trfname = 'training data file name'
    dbname = 'asumama_bf.db'
    bf = BF(trfname, dbname, use=0)
    bf.train()

    tefname = 'test data file name'
    dbname = 'asumama_bf.db'
    bf = BF(tefname, dbname, use=1)
    bf.test(tefname)

    clfname = 'classify data filename'
    trained_dbname = 'asumama_bf.db'
    classify_dbname = 'asumama_bf_classify.db'
    bf = BF(clfname, trained_dbname, use=2)
    bf.classify(clfname, classify_dbname)

実験結果

学習に使ったデータは1,000件(ラベル付けがしんどかったので1,000件です)。
テストデータは1,200件。
accuracyは96%、recall(HAMの検出率)は99%でした。
同じようなスパムツイートが多かったのと、対象と関係の無いツイートには「ケーキが~」「車で買い物~」とかが多く、対象のツイートは「愛菜ちゃんが~」「~面白い」「三浦翔平~」とか、割りと単純に分類が出来る感じのデータになっていたためこのような結果にたまたまなったのだと思います。

まとめ

小学生みたいなコメントですけど、twitterデータいじるの面白いですね。
前処理はめんどくさいですけど。。。
今後は時間があったら時系列を使った何かをやってみたいと思います。観光情報を追うとか。
(他にもまとめたいネタが1つあるので転職先の勤務が開始されるまでの間に何とかまとめたい・・・)

お手数ですが間違いありましたら、ご指摘いただけますと助かります。

Why do not you register as a user and use Qiita more conveniently?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away