Semi-supervised label learning with a DBN and LabelSpreading


Someone I know had grown tired of the labeling work needed for supervised document classification, so I built a semi-supervised text-classification pipeline that can get by with only a small number of labels.


References


  • Deep Learning Tutorial (http://deeplearning.net/tutorial/)


What I did


  • Feature extraction with a DBN (Deep Belief Network)

  • Labeling via semi-supervised learning with LabelSpreading


Procedure


Feature extraction with the DBN


  • Roughly decide the number of layers, the learning rate, and the number of units in each layer by grid search (the number of units in the top layer is the dimensionality after compression; a sketch of such a search follows this list)

  • Pre-train the DBN to learn the weights and biases

  • Feed the output of the top layer into the classifier as the dimension-reduced features
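
The post does not show the grid search itself, so here is a minimal sketch of how it could look, assuming the DBN class and pretraining_functions from the DBN.py listing further down. The candidate values and the use of the mean pre-training reconstruction cost as the selection criterion are my own illustrative choices, not the author's.

import itertools
import numpy as np

# candidate hyperparameters (illustrative values only)
layer_configs = [[1000, 100], [2000, 1000, 100]]  # depth and units per layer
learning_rates = [0.01, 0.1]

def grid_search_dbn(train_set_x, n_ins, epochs=5, batch_size=10, k=1):
    n_batches = train_set_x.get_value(borrow=True).shape[0] // batch_size
    best_config, best_cost = None, np.inf
    for sizes, lr in itertools.product(layer_configs, learning_rates):
        numpy_rng = np.random.RandomState(123)
        dbn = DBN(numpy_rng=numpy_rng, n_ins=n_ins,
                  hidden_layers_sizes=sizes, n_outs=10)
        pretrain_fns = dbn.pretraining_functions(train_set_x, batch_size, k)
        costs = []
        for layer_fn in pretrain_fns:            # greedy layer-wise pre-training
            for epoch in xrange(epochs):
                costs.append(np.mean([layer_fn(index=b, lr=lr)
                                      for b in xrange(n_batches)]))
        mean_cost = np.mean(costs)
        if mean_cost < best_cost:                # keep the cheapest configuration
            best_config, best_cost = (sizes, lr), mean_cost
    return best_config, best_cost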


Semi-supervised learning with LabelSpreading


  • Decide an upper limit on the number of teacher labels per category

  • Pick data points at random from each category and assign labels up to that limit

  • Once the limit is reached, the remaining data in that category are treated as unlabeled (a small sketch of this capping step follows this list)

  • Set the learning parameters (kNN is used here, so this is the number of neighbors that labels are propagated to)
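
Below is a minimal, self-contained sketch of the capping logic described in this list; the actual implementation, which also skips multi-label documents, lives in main() of the LabelSpreading.py listing further down, and the function and argument names here are made up for illustration.

import numpy as np
from collections import defaultdict

def cap_labels(y_true, max_labeled_per_class, rng=np.random):
    """Keep at most max_labeled_per_class teacher labels per class and
    mark everything else as unlabeled (-1), the convention LabelSpreading expects."""
    y_semi = np.full(len(y_true), -1, dtype=int)
    counts = defaultdict(int)
    for i in rng.permutation(len(y_true)):   # pick labeled examples in random order
        c = y_true[i]
        if counts[c] < max_labeled_per_class:
            y_semi[i] = c
            counts[c] += 1
    return y_semi

Usage would be something like labels = cap_labels(target, 20) before calling LabelSpreading().fit(X, labels).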


Advantages


Advantages of extracting features with a DBN


  • Unlike SVD, it can extract nonlinear features (a quick comparison snippet follows)
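
As a point of comparison, the linear alternative would be a single truncated SVD projection. The snippet below is a hypothetical baseline using scikit-learn's TruncatedSVD, not part of the original pipeline.

import numpy as np
from sklearn.decomposition import TruncatedSVD

X_bow = np.loadtxt('bow_data.csv', delimiter=',')   # the same BoW CSV that DBN.py reads

# a single linear projection down to 100 dimensions (the DBN top-layer size used below)
svd = TruncatedSVD(n_components=100)
X_linear = svd.fit_transform(X_bow)

# the DBN instead compresses through stacked sigmoid layers, so it can capture
# nonlinear structure that one linear projection cannot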


Advantages of LabelSpreading


  • This is really an advantage of semi-supervised learning in general, but it makes reasonably accurate labeling possible while keeping human effort to a minimum


Reference code

Apologies for the rather messy code; it is pasted below.

The DBN code is copied verbatim from the Deep Learning Tutorial. The tutorial site explains it in detail, equations included, so I will defer to it for the full explanation.


The DBN code

It reads CSV data in bag-of-words (BoW) format, extracts features, and writes the result back out as another CSV file.

Note that it also contains a few methods that are never actually used.
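
The listing assumes bow_data.csv already exists. As a rough idea of how it could be produced from pre-tokenized text (one whitespace-separated document per line), here is a hypothetical snippet using scikit-learn's CountVectorizer; the file names are made up.

# coding: utf-8
import numpy as np
from sklearn.feature_extraction.text import CountVectorizer

# one document per line, already split into whitespace-separated tokens (e.g. with MeCab)
with open('tokenized_texts.txt') as f:
    docs = [line.strip() for line in f]

# treat every whitespace-separated token as a term
vectorizer = CountVectorizer(token_pattern=r'(?u)\S+')
X_bow = vectorizer.fit_transform(docs).toarray()    # dense BoW matrix

np.savetxt('bow_data.csv', X_bow, delimiter=',')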


DBN.py

# coding:utf-8


from __future__ import unicode_literals
import time

import numpy as np
import theano
import theano.tensor as T
from theano.tensor.shared_randomstreams import RandomStreams

class DBN:
    def __init__(self, numpy_rng, theano_rng=None, n_ins=784,
                 hidden_layers_sizes=[500, 500], n_outs=10):
        self.sigmoid_layers = []
        self.rbm_layers = []
        self.params = []
        self.n_layers = len(hidden_layers_sizes)

        assert self.n_layers > 0

        if not theano_rng:
            theano_rng = RandomStreams(numpy_rng.randint(2 ** 30))

        # allocate symbolic variables for the data
        self.x = T.matrix('x')
        self.y = T.ivector('y')

        for i in xrange(self.n_layers):
            if i == 0:
                input_size = n_ins
                layer_input = self.x
            else:
                input_size = hidden_layers_sizes[i - 1]
                layer_input = self.sigmoid_layers[-1].output

            sigmoid_layer = HiddenLayer(rng=numpy_rng,
                                        input=layer_input,
                                        n_in=input_size,
                                        n_out=hidden_layers_sizes[i],
                                        activation=T.nnet.sigmoid)
            self.sigmoid_layers.append(sigmoid_layer)
            self.params.extend(sigmoid_layer.params)

            rbm_layer = RBM(numpy_rng=numpy_rng,
                            theano_rng=theano_rng,
                            input=layer_input,
                            n_visible=input_size,
                            n_hidden=hidden_layers_sizes[i],
                            W=sigmoid_layer.W,
                            hbias=sigmoid_layer.b)
            self.rbm_layers.append(rbm_layer)

        self.logLayer = LogisticRegression(
            input=self.sigmoid_layers[-1].output,
            n_in=hidden_layers_sizes[-1],
            n_out=n_outs)
        self.params.extend(self.logLayer.params)
        self.finetune_cost = self.logLayer.negative_log_likelihood(self.y)
        self.errors = self.logLayer.errors(self.y)

    def pretraining_functions(self, train_set_x, batch_size, k):
        index = T.lscalar('index')
        learning_rate = T.scalar('lr')  # learning rate to use
        # number of batches
        n_batches = train_set_x.get_value(borrow=True).shape[0] / batch_size
        # beginning of a batch, given `index`
        batch_begin = index * batch_size
        # ending of a batch, given `index`
        batch_end = batch_begin + batch_size

        pretrain_fns = []
        for rbm in self.rbm_layers:
            cost, updates = rbm.get_cost_updates(learning_rate,
                                                 persistent=None, k=k)

            # compile the theano function
            fn = theano.function(
                inputs=[index, theano.Param(learning_rate, default=0.1)],
                outputs=cost,
                updates=updates,
                givens={
                    self.x: train_set_x[batch_begin:batch_end]
                }
            )
            # append `fn` to the list of functions
            pretrain_fns.append(fn)

        return pretrain_fns

    def build_finetune_functions(self, datasets, batch_size, learning_rate):
        (train_set_x, train_set_y) = datasets[0]
        (valid_set_x, valid_set_y) = datasets[1]
        (test_set_x, test_set_y) = datasets[2]

        # compute number of minibatches for training, validation and testing
        n_valid_batches = valid_set_x.get_value(borrow=True).shape[0]
        n_valid_batches /= batch_size
        n_test_batches = test_set_x.get_value(borrow=True).shape[0]
        n_test_batches /= batch_size

        index = T.lscalar('index')  # index to a [mini]batch

        # compute the gradients with respect to the model parameters
        gparams = T.grad(self.finetune_cost, self.params)

        # compute list of fine-tuning updates
        updates = []
        for param, gparam in zip(self.params, gparams):
            updates.append((param, param - gparam * learning_rate))

        train_fn = theano.function(
            inputs=[index],
            outputs=self.finetune_cost,
            updates=updates,
            givens={
                self.x: train_set_x[
                    index * batch_size: (index + 1) * batch_size
                ],
                self.y: train_set_y[
                    index * batch_size: (index + 1) * batch_size
                ]
            }
        )

        test_score_i = theano.function(
            [index],
            self.errors,
            givens={
                self.x: test_set_x[
                    index * batch_size: (index + 1) * batch_size
                ],
                self.y: test_set_y[
                    index * batch_size: (index + 1) * batch_size
                ]
            }
        )

        valid_score_i = theano.function(
            [index],
            self.errors,
            givens={
                self.x: valid_set_x[
                    index * batch_size: (index + 1) * batch_size
                ],
                self.y: valid_set_y[
                    index * batch_size: (index + 1) * batch_size
                ]
            }
        )

        # Create a function that scans the entire validation set
        def valid_score():
            return [valid_score_i(i) for i in xrange(n_valid_batches)]

        # Create a function that scans the entire test set
        def test_score():
            return [test_score_i(i) for i in xrange(n_test_batches)]

        return train_fn, valid_score, test_score

class HiddenLayer:
    def __init__(self, rng, input, n_in, n_out, W=None, b=None,
                 activation=T.tanh):
        self.input = input

        if W is None:
            W_values = np.asarray(
                rng.uniform(
                    low=-np.sqrt(6. / (n_in + n_out)),
                    high=np.sqrt(6. / (n_in + n_out)),
                    size=(n_in, n_out)
                ),
                dtype=theano.config.floatX
            )
            if activation == theano.tensor.nnet.sigmoid:
                W_values *= 4
            W = theano.shared(value=W_values, name='W', borrow=True)

        if b is None:
            b_values = np.zeros((n_out,), dtype=theano.config.floatX)
            b = theano.shared(value=b_values, name='b', borrow=True)

        self.W = W
        self.b = b

        lin_output = T.dot(input, self.W) + self.b
        self.output = (
            lin_output if activation is None
            else activation(lin_output)
        )
        self.params = [self.W, self.b]

class LogisticRegression:
    def __init__(self, input, n_in, n_out):
        self.W = theano.shared(
            value=np.zeros(
                (n_in, n_out),
                dtype=theano.config.floatX
            ),
            name='W',
            borrow=True
        )
        # initialize the biases b as a vector of n_out 0s
        self.b = theano.shared(
            value=np.zeros(
                (n_out,),
                dtype=theano.config.floatX
            ),
            name='b',
            borrow=True
        )
        self.p_y_given_x = T.nnet.softmax(T.dot(input, self.W) + self.b)
        self.y_pred = T.argmax(self.p_y_given_x, axis=1)
        self.params = [self.W, self.b]

    def negative_log_likelihood(self, y):
        return -T.mean(T.log(self.p_y_given_x)[T.arange(y.shape[0]), y])

    def errors(self, y):
        # check if y has same dimension of y_pred
        if y.ndim != self.y_pred.ndim:
            raise TypeError(
                'y should have the same shape as self.y_pred',
                ('y', y.type, 'y_pred', self.y_pred.type)
            )
        # check if y is of the correct datatype
        if y.dtype.startswith('int'):
            return T.mean(T.neq(self.y_pred, y))
        else:
            raise NotImplementedError()

class RBM(object):
    """Restricted Boltzmann Machine (RBM) """
    def __init__(
        self,
        input=None,
        n_visible=784,
        n_hidden=500,
        W=None,
        hbias=None,
        vbias=None,
        numpy_rng=None,
        theano_rng=None
    ):

        self.n_visible = n_visible
        self.n_hidden = n_hidden

        if numpy_rng is None:
            # create a number generator
            numpy_rng = np.random.RandomState(1234)

        if theano_rng is None:
            theano_rng = RandomStreams(numpy_rng.randint(2 ** 30))

        if W is None:
            initial_W = np.asarray(
                numpy_rng.uniform(
                    low=-4 * np.sqrt(6. / (n_hidden + n_visible)),
                    high=4 * np.sqrt(6. / (n_hidden + n_visible)),
                    size=(n_visible, n_hidden)
                ),
                dtype=theano.config.floatX
            )
            # theano shared variables for weights and biases
            W = theano.shared(value=initial_W, name='W', borrow=True)

        if hbias is None:
            # create shared variable for hidden units bias
            hbias = theano.shared(
                value=np.zeros(
                    n_hidden,
                    dtype=theano.config.floatX
                ),
                name='hbias',
                borrow=True
            )

        if vbias is None:
            # create shared variable for visible units bias
            vbias = theano.shared(
                value=np.zeros(
                    n_visible,
                    dtype=theano.config.floatX
                ),
                name='vbias',
                borrow=True
            )

        # initialize input layer for standalone RBM or layer0 of DBN
        self.input = input
        if not input:
            self.input = T.matrix('input')

        self.W = W
        self.hbias = hbias
        self.vbias = vbias
        self.theano_rng = theano_rng
        self.params = [self.W, self.hbias, self.vbias]

    def propup(self, vis):
        pre_sigmoid_activation = T.dot(vis, self.W) + self.hbias
        return [pre_sigmoid_activation, T.nnet.sigmoid(pre_sigmoid_activation)]

    def sample_h_given_v(self, v0_sample):
        ''' This function infers state of hidden units given visible units '''
        pre_sigmoid_h1, h1_mean = self.propup(v0_sample)
        h1_sample = self.theano_rng.binomial(size=h1_mean.shape,
                                             n=1, p=h1_mean,
                                             dtype=theano.config.floatX)
        return [pre_sigmoid_h1, h1_mean, h1_sample]

    def propdown(self, hid):
        pre_sigmoid_activation = T.dot(hid, self.W.T) + self.vbias
        return [pre_sigmoid_activation, T.nnet.sigmoid(pre_sigmoid_activation)]

    def sample_v_given_h(self, h0_sample):
        ''' This function infers state of visible units given hidden units '''
        # compute the activation of the visible given the hidden sample
        pre_sigmoid_v1, v1_mean = self.propdown(h0_sample)
        v1_sample = self.theano_rng.binomial(size=v1_mean.shape,
                                             n=1, p=v1_mean,
                                             dtype=theano.config.floatX)
        return [pre_sigmoid_v1, v1_mean, v1_sample]

    def gibbs_hvh(self, h0_sample):
        ''' This function implements one step of Gibbs sampling,
            starting from the hidden state'''

        pre_sigmoid_v1, v1_mean, v1_sample = self.sample_v_given_h(h0_sample)
        pre_sigmoid_h1, h1_mean, h1_sample = self.sample_h_given_v(v1_sample)
        return [pre_sigmoid_v1, v1_mean, v1_sample,
                pre_sigmoid_h1, h1_mean, h1_sample]

    def gibbs_vhv(self, v0_sample):
        ''' This function implements one step of Gibbs sampling,
            starting from the visible state'''

        pre_sigmoid_h1, h1_mean, h1_sample = self.sample_h_given_v(v0_sample)
        pre_sigmoid_v1, v1_mean, v1_sample = self.sample_v_given_h(h1_sample)
        return [pre_sigmoid_h1, h1_mean, h1_sample,
                pre_sigmoid_v1, v1_mean, v1_sample]

    def free_energy(self, v_sample):
        ''' Function to compute the free energy '''
        wx_b = T.dot(v_sample, self.W) + self.hbias
        vbias_term = T.dot(v_sample, self.vbias)
        hidden_term = T.sum(T.log(1 + T.exp(wx_b)), axis=1)
        return -hidden_term - vbias_term

    def get_cost_updates(self, lr=0.1, persistent=None, k=1):
        # compute positive phase
        pre_sigmoid_ph, ph_mean, ph_sample = self.sample_h_given_v(self.input)

        if persistent is None:
            chain_start = ph_sample
        else:
            chain_start = persistent

        (
            [
                pre_sigmoid_nvs,
                nv_means,
                nv_samples,
                pre_sigmoid_nhs,
                nh_means,
                nh_samples
            ],
            updates
        ) = theano.scan(
            self.gibbs_hvh,
            outputs_info=[None, None, None, None, None, chain_start],
            n_steps=k
        )

        chain_end = nv_samples[-1]
        cost = T.mean(self.free_energy(self.input)) - T.mean(
            self.free_energy(chain_end))
        # We must not compute the gradient through the gibbs sampling
        gparams = T.grad(cost, self.params, consider_constant=[chain_end])

        for gparam, param in zip(gparams, self.params):
            # make sure that the learning rate is of the right dtype
            updates[param] = param - gparam * T.cast(
                lr,
                dtype=theano.config.floatX
            )
        if persistent:
            # Note that this works only if persistent is a shared variable
            updates[persistent] = nh_samples[-1]
            # pseudo-likelihood is a better proxy for PCD
            monitoring_cost = self.get_pseudo_likelihood_cost(updates)
        else:
            # reconstruction cross-entropy is a better proxy for CD
            monitoring_cost = self.get_reconstruction_cost(updates,
                                                           pre_sigmoid_nvs[-1])

        return monitoring_cost, updates

    def get_pseudo_likelihood_cost(self, updates):
        """Stochastic approximation to the pseudo-likelihood"""

        # index of bit i in expression p(x_i | x_{\i})
        bit_i_idx = theano.shared(value=0, name='bit_i_idx')

        # binarize the input image by rounding to nearest integer
        xi = T.round(self.input)

        # calculate free energy for the given bit configuration
        fe_xi = self.free_energy(xi)

        # flip bit x_i of matrix xi and preserve all other bits x_{\i}
        # Equivalent to xi[:,bit_i_idx] = 1-xi[:, bit_i_idx], but assigns
        # the result to xi_flip, instead of working in place on xi.
        xi_flip = T.set_subtensor(xi[:, bit_i_idx], 1 - xi[:, bit_i_idx])

        # calculate free energy with bit flipped
        fe_xi_flip = self.free_energy(xi_flip)

        # equivalent to e^(-FE(x_i)) / (e^(-FE(x_i)) + e^(-FE(x_{\i})))
        cost = T.mean(self.n_visible * T.log(T.nnet.sigmoid(fe_xi_flip -
                                                            fe_xi)))

        # increment bit_i_idx % number as part of updates
        updates[bit_i_idx] = (bit_i_idx + 1) % self.n_visible

        return cost

    def get_reconstruction_cost(self, updates, pre_sigmoid_nv):
        cross_entropy = T.mean(
            T.sum(
                self.input * T.log(T.nnet.sigmoid(pre_sigmoid_nv)) +
                (1 - self.input) * T.log(1 - T.nnet.sigmoid(pre_sigmoid_nv)),
                axis=1
            )
        )

        return cross_entropy

def output(input_data, w, b):
    x = np.dot(input_data, w) + np.kron(np.ones((input_data.shape[0], 1)), b)
    return 1 / (1 + np.exp(-x))

if __name__ == '__main__':
    numpy_rng = np.random.RandomState(123)
    print '... building the model'

    ifname = 'bow_data.csv'
    data = np.loadtxt(ifname, delimiter=',')
    train_set_x = theano.shared(np.asarray(data, np.float64))

    dbn = DBN(numpy_rng=numpy_rng, n_ins=data.shape[1],
              hidden_layers_sizes=[2000, 1000, 100],
              n_outs=10)

    #########################
    # PRETRAINING THE MODEL #
    #########################
    print '... getting the pretraining functions'
    batch_size = 10
    k = 5
    pretraining_fns = dbn.pretraining_functions(train_set_x=train_set_x,
                                                batch_size=batch_size,
                                                k=k)

    print '... pre-training the model'
    pretraining_epochs = 100
    n_train_batches = 10
    pretrain_lr = 0.1
    ## Pre-train layer-wise
    for i in xrange(dbn.n_layers):
        # go through pretraining epochs
        for epoch in xrange(pretraining_epochs):
            # go through the training set
            c = []
            for batch_index in xrange(n_train_batches):
                c.append(pretraining_fns[i](index=batch_index,
                                            lr=pretrain_lr))
            print 'Pre-training layer %i, epoch %d, cost ' % (i, epoch),
            print np.mean(c)

    # propagate the data through the pre-trained layers and save the
    # top-layer activations as the extracted features
    layer_output = []
    for i in xrange(dbn.n_layers):
        w = dbn.rbm_layers[i].W.get_value()
        hbias = dbn.rbm_layers[i].hbias.get_value()
        if i == 0:
            layer_output.append(train_set_x.get_value())
            layer_output.append(output(layer_output[-1], w, hbias))
        else:
            layer_output.append(output(layer_output[-1], w, hbias))
    print layer_output[-1]
    np.savetxt('DBN_features.csv', layer_output[-1], delimiter=',')



The LabelSpreading code

Each line of original_data.csv has the form (0\t1 0 1 0 0 0 0\txxxx).

From left to right, the fields are: a flag indicating whether the data point belongs to multiple classes, a flag for each class, and the text itself.

When training, the code below prefers data points that have only a single label as teacher data.

I have also included a function so the behavior can be checked on the iris dataset; use it if you want to try LabelSpreading but have no suitable data at hand.


LabelSpreading.py

# coding: utf-8


from sklearn import datasets
from sklearn.semi_supervised import LabelSpreading
import numpy as np
from numpy.random import seed
seed(555)
from collections import defaultdict

def iris():
    iris = datasets.load_iris()
    random_unlabeled_points = np.where(np.random.random_integers(0, 1, size=len(iris.target)))
    labels = np.copy(iris.target)
    labels[random_unlabeled_points] = -1

    label_prop_model = LabelSpreading()
    label_prop_model.fit(iris.data, labels)  # unlabeled as -1
    pred_prop = label_prop_model.predict_proba(iris.data)
    pred_label = label_prop_model.predict(iris.data)

    for pp, pl, label, trgt in zip(pred_prop, pred_label, labels, iris.target):
        print pp, pl, label, trgt

def main(X, labels_info, min_number=20, label_num=6, n_neighbors=7, alpha=0.3, typ='knn', threshold=0.5):
    target = get_target(labels_info)
    random_unlabeled_points = np.where(np.random.random_integers(0, 1, size=len(target)))[0]

    # assign teacher labels only to single-label data, up to min_number per class;
    # everything else is treated as unlabeled (-1)
    cnt_dict = defaultdict(int)
    for i, t in enumerate(target):
        if len(t) == 1 and (i in random_unlabeled_points):
            target[i] = -1
            cnt_dict[-1] += 1
        elif len(t) >= 2:
            target[i] = -1
            cnt_dict[-1] += 1
        elif cnt_dict[target[i][0]] < min_number:
            target[i] = target[i][0]
            cnt_dict[target[i]] += 1
        elif cnt_dict[target[i][0]] >= min_number:
            target[i] = -1
            cnt_dict[target[i]] += 1
    print cnt_dict

    if typ == 'knn':
        label_prop_model = LabelSpreading(kernel=typ, n_neighbors=n_neighbors)
    else:
        label_prop_model = LabelSpreading(kernel=typ, alpha=alpha)
    label_prop_model.fit(X, target)  # unlabeled as -1
    pred_prop = label_prop_model.predict_proba(X)
    pred_label = label_prop_model.predict(X)

    res_dict = defaultdict(dict)  # stores the TP, FP, FN and TN counts per label
    for label in ('TP', 'FP', 'FN', 'TN'):
        res_dict[label] = defaultdict(int)
    label_dict = defaultdict(int)

    for pp, pl, labels, trgt in zip(pred_prop, pred_label, get_target(labels_info), target):
        # `labels` holds the ground-truth labels
        print pp, np.where(pp >= threshold)[0] + 1, labels, trgt
        # labels predicted by the model
        # (normalize the probabilities first)
        predicted_labels = np.where(pp / np.sum(pp) >= threshold)[0] + 1
        # predicted_labels = [int(pl)]
        # labels not contained in the ground truth
        F_labels = set([l + 1 for l in xrange(label_num)]).difference(labels)
        # labels not contained in the prediction
        predicted_F_labels = \
            set([l + 1 for l in xrange(label_num)]).difference(predicted_labels)

        # store the TP counts
        print 'TP labels:'
        print set(labels).intersection(predicted_labels)
        for tp_l in set(labels).intersection(predicted_labels):
            res_dict['TP'][tp_l] += 1
        # store the FP counts
        print 'FP labels:'
        print set(predicted_labels).difference(labels)
        for fp_l in set(predicted_labels).difference(labels):
            res_dict['FP'][fp_l] += 1
        # store the FN counts
        print 'FN labels'
        print set(labels).difference(predicted_labels)
        for fn_l in set(labels).difference(predicted_labels):
            res_dict['FN'][fn_l] += 1
        # store the TN counts
        print 'TN labels'
        print set(F_labels).intersection(predicted_F_labels)
        for tn_l in set(F_labels).intersection(predicted_F_labels):
            res_dict['TN'][tn_l] += 1
        # count the occurrences of each ground-truth label
        for l in labels:
            label_dict[l] += 1

    for i_label in xrange(label_num):
        print "label=", i_label + 1
        print 'TP:', res_dict['TP'][i_label + 1], 'FP:', res_dict['FP'][i_label + 1], 'FN:', res_dict['FN'][i_label + 1], 'TN:', res_dict['TN'][i_label + 1]
        print float(res_dict['TP'][i_label + 1]) / label_dict[i_label + 1], float(res_dict['FP'][i_label + 1]) / label_dict[i_label + 1], float(res_dict['FN'][i_label + 1]) / label_dict[i_label + 1], float(res_dict['TN'][i_label + 1]) / label_dict[i_label + 1]
        accuracy = float(res_dict['TP'][i_label + 1] + res_dict['TN'][i_label + 1]) / (res_dict['TP'][i_label + 1] + res_dict['FP'][i_label + 1] + res_dict['FN'][i_label + 1] + res_dict['TN'][i_label + 1])
        precision = float(res_dict['TP'][i_label + 1]) / (res_dict['TP'][i_label + 1] + res_dict['FP'][i_label + 1])
        recall = float(res_dict['TP'][i_label + 1]) / (res_dict['TP'][i_label + 1] + res_dict['FN'][i_label + 1])
        f_measure = (2 * recall * precision) / (recall + precision)
        print 'Accuracy:', accuracy, 'Precision:', precision, 'Recall:', recall, 'F-measure:', f_measure

# convert the ground-truth label flags into label numbers from 1 to n
def get_target(labels_info):
    result = []
    raw_target = labels_info[:, 1:]
    for line in raw_target:
        result.append(np.where(line == 1)[0] + 1)
    return result

def get_labels():
    pass

def get_labels_info(label_fname):
    label_flag = []
    label_flag_apd = label_flag.append
    labels_info = []
    labels_info_apd = labels_info.append
    with open(label_fname, 'r') as f:
        for line in f:
            data = line.strip().split('\t')
            label_flag_apd(int(data[0]))
            labels_info_apd(
                np.array(data[1].strip().split(' '), dtype=np.int32)
            )
    return np.hstack((np.array(label_flag).reshape((len(label_flag), 1)), np.array(labels_info)))

if __name__ == '__main__':
    ifname = 'DBN_features.csv'
    label_fname = 'original_data.csv'

    X = np.loadtxt(ifname, delimiter=',')
    labels_info = get_labels_info(label_fname)

    ## typ is chosen from {knn, rbf}
    main(X, labels_info, 50, label_num=6, n_neighbors=7, alpha=0.2, typ='knn', threshold=0.5)


If you find any mistakes, I would appreciate it if you could point them out.