LoginSignup
15
7

More than 3 years have passed since last update.

BERTで日本人の姓と名の分離を行う

Posted at

DBによっては姓と名がくっついて保存されており、それを機械的に分離したいという要望もあるかもしれません。
名字の完全なリストがあってもこれを行うのは意外と難しいです。
今何かと話題のBERTですが、人名を学習させることで姓と名の分離を高精度にできたので紹介します。

結果

ソースコードはかなり長いので結果から示します。
検証データ1,200件に対し99.0%の精度で分離できました。検証データの中身と予測(姓のみ)の一部は以下のような感じです。

last_name first_name full_name pred
89 形部 麻侑 形部麻侑 形部
1114 熊添 規男 熊添規男 熊添
1068 木本 奏帆 木本奏帆 木本
55 家鋪 宏希 家鋪宏希 家鋪
44 祥大 基祥大

失敗した12件は以下です。人間でも佐分利勝さんは佐分 利勝に分けてしまいそうです。

last_name first_name full_name pred
11 佐分利 佐分利勝 佐分
341 華寿美 筆華寿美 筆華
345 真藤 真一 真藤真一 真藤真
430 加奈枝 栗加奈枝 栗加
587 圭亮 二奈 圭亮二奈
785 番匠 番匠好
786 和佳奈 豊和佳奈 豊和
995 瀬利 瀬利由
1061 実姫 曽実姫 曽実
1062 木ノ実 甲木ノ実 甲木
1155 穂高 夏穂 穂高夏穂 穂高穂
1190 極並 極並夢

ちなみに、janome(pythonのみで完結する形態素解析ツールで、mecabと同等の性能)でプリセット辞書(ipadic)のみを使って以下のようにシンプルに姓名分離した場合、精度は34.5%でした。

def extract_last_name(sentence):
    for token in tokenizer.tokenize(sentence):
        if '姓' in token.part_of_speech:
            return token.surface

df['pred'] = df['full_name'].apply(lambda full_name: extract_last_name(full_name))

名字辞書を名字データベースから取得して以下のようにjanomeに食べさせたうえで解析させると精度が向上し、79.7%になりました。

tokenizer2 = Tokenizer('last_name_dic.csv', udic_enc="utf8")
def extract_last_name2(sentence):
    token_arr = [token for token in tokenizer2.tokenize(sentence)]
    if '姓' in token_arr[0].part_of_speech:
        return token_arr[0].surface

df['pred2'] = df['full_name'].apply(lambda full_name: extract_last_nam2(full_name))

多分名前リストも加えることでさらに精度が上昇すると思われますが、結構入手・加工するのが大変そうだったので時間があれば検証したいと思います。(やらなそう)

ソースコード

まず必要なものをインポートします。

import pandas as pd
import numpy as np
from transformers import BertConfig, BertTokenizer, BertJapaneseTokenizer, BertForTokenClassification
from keras.preprocessing.sequence import pad_sequences
import torch
import MeCab
import math

プリセットとして、普通はbert-base-japanese-whole-word-maskingを使う例が多いと思いますが、tokenizeの時点で変な分割をされると困るので今回は1文字ずつ分割するcharの方を使用します。

tokenizer = BertJapaneseTokenizer.from_pretrained('cl-tohoku/bert-base-japanese-char-whole-word-masking')

教師データの取得元はすごい名前生成器を使用しました。名前の珍しさとかも数値化されていて眺めているだけでも面白いサイトです。
今回は、名前数48,000で、姓の種類は約22,000となりました。マイナーな姓も含め結構広く分布しています。
csvの形式はfull_name, last_name, first_nameのみです。
まず以下のコードで1文字ずつtokenizeしていきます。

df = pd.read_csv('name_list.csv')
text1s = list(df.full_name.values)
targets = list(df.last_name.values)
text1_tokenize = [tokenizer.encode(s) for s in text1s]
target_tokenize = [[tokenizer.encode(vv)[1:-1] for vv in v]  for v in targets]

流れとしては、full_nameを1文字ずつ分割した後で、それぞれの文字について姓であれば1, 名であれば0となる確率をそれぞれ出していきます。
正解データをBERTが理解できるようにするために、例えば田中太郎 -> ['田', '中', '太', '郎'] -> [1, 1, 0, 0]となる配列を作っていきます。
attention_masksは対象配列を単純に1に置き換えたものです(不要かも)

def make_tags_arr(x, token):
    start_indexes = arr_indexes(x, token)
    max_len = len(x)
    token_len = len(token)
    arr = [0] * max_len
    for i in start_indexes:
        arr[i:i+token_len] = [1] * token_len
    return arr

tags_ids = []
for i in range(len(text1_tokenize)):
    text1 = text1_tokenize[i]
    targets = target_tokenize[i]

    tmp = [0] * len(text1)
    for t in targets:
        # [0,0,1,1,0,0...]タグ配列を作る
        arr = make_tags_arr(text1, t)
        tmp = [min(x + y, 1) for (x, y) in zip(tmp, arr)]
    tags_ids.append(tmp)

attention_masks = [[float(i > 0) for i in ii] for ii in text1_tokenize]

BERTは全部同じトークン配列の長さでないといけないので、padding処理をします。その後、データセットを分けます。

MAX_LEN = 32
input_ids = pad_sequences(text1_tokenize, maxlen=MAX_LEN, dtype="long", truncating="pre", padding="pre")
tags_ids = pad_sequences(tags_ids, maxlen=MAX_LEN, dtype="long", truncating="pre", padding="pre")
attention_masks = pad_sequences(attention_masks, maxlen=MAX_LEN, dtype="long", truncating="pre", padding="pre")

from sklearn.model_selection import train_test_split
RAN_SEED = 2020
train_inputs, validation_inputs, train_labels, validation_labels = train_test_split(input_ids, tags_ids, random_state=RAN_SEED, test_size=0.1)
train_masks, validation_masks = train_test_split(attention_masks, random_state=RAN_SEED, test_size=0.2)

train_inputs = torch.LongTensor(train_inputs)
validation_inputs = torch.LongTensor(validation_inputs)
train_labels = torch.LongTensor(train_labels)
validation_labels = torch.LongTensor(validation_labels)
train_masks = torch.LongTensor(train_masks)
validation_masks = torch.LongTensor(validation_masks)

GPUorCPUを使用し、データセットをロードします。
その後、事前訓練されたモデルを読み込みます。

if torch.cuda.is_available():    
    # Tell PyTorch to use the GPU.    
    device = torch.device("cuda")
    print('There are %d GPU(s) available.' % torch.cuda.device_count())
    print('We will use the GPU:', torch.cuda.get_device_name(0))
# If not...
else:
    print('No GPU available, using the CPU instead.')
    device = torch.device("cpu")

from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler
batch_size = 32

# Create the DataLoader for our training set.
train_data = TensorDataset(train_inputs, train_masks, train_labels)
train_sampler = RandomSampler(train_data)
train_dataloader = DataLoader(train_data, sampler=train_sampler, batch_size=batch_size)
# Create the DataLoader for our validation set.
validation_data = TensorDataset(validation_inputs, validation_masks, validation_labels)
validation_sampler = SequentialSampler(validation_data)
validation_dataloader = DataLoader(validation_data, sampler=validation_sampler, batch_size=batch_size)

from transformers import AdamW, BertConfig
model_token_cls = BertForTokenClassification.from_pretrained('cl-tohoku/bert-base-japanese-char-whole-word-masking', num_labels=2)
model_token_cls.cuda()

モデルの概要を表示します。処理に関係ないので飛ばしても可。

# Get all of the model's parameters as a list of tuples.
params = list(model_token_cls.named_parameters())
print('The BERT model has {:} different named parameters.\n'.format(len(params)))
print('==== Embedding Layer ====\n')
for p in params[0:5]:
    print("{:<55} {:>12}".format(p[0], str(tuple(p[1].size()))))
print('\n==== First Transformer ====\n')
for p in params[5:21]:
    print("{:<55} {:>12}".format(p[0], str(tuple(p[1].size()))))
print('\n==== Output Layer ====\n')
for p in params[-4:]:
    print("{:<55} {:>12}".format(p[0], str(tuple(p[1].size()))))

accuracyの定義をします。検証データの表示にのみ使用します。ここではF1値としました。
それぞれの字について、姓or名を判定して高ければ1に近づきます。

import datetime
def flat_accuracy(pred_masks, labels, input_masks):
    tp = ((pred_masks == 1) * (labels == 1)).sum().item()
    fp = ((pred_masks == 1) * (labels == 0)).sum().item()
    fn = ((pred_masks == 0) * (labels == 1)).sum().item()
    tn = ((pred_masks == 0) * (labels == 0)).sum().item()
    precision = tp/(tp+fp)
    recall = tp/(tp+fn)
    f1 = 2*precision*recall/(precision+recall)

    return f1

def format_time(elapsed):
    # Round to the nearest second.
    elapsed_rounded = int(round((elapsed)))

    # Format as hh:mm:ss
    return str(datetime.timedelta(seconds=elapsed_rounded))

訓練のメイン部分です。

from torch.optim import Adam
from transformers import get_linear_schedule_with_warmup

param_optimizer = list(model_token_cls.named_parameters())
no_decay = ["bias", "gamma", "beta"]
optimizer_grouped_parameters = [
  {'params' : [p for n, p in param_optimizer if not any (nd in n for nd in no_decay)],
  'weight_decay_rate' : 0.01},
  {'params' : [p for n, p in param_optimizer if any(nd in n for nd in no_decay)],
  'weight_decay_rate' : 0.0}
]
optimizer = Adam(optimizer_grouped_parameters, lr=3e-5)

epochs = 3
max_grad_norm = 1.0
total_steps = len(train_dataloader) * epochs
scheduler = get_linear_schedule_with_warmup(optimizer, 
                                            num_warmup_steps = 0,
                                            num_training_steps = total_steps)

# トレーニング
for epoch_i in range(epochs):
    # TRAIN loop
    model_token_cls.train()
    train_loss = 0
    nb_train_examples, nb_train_steps = 0, 0
    t0 = time.time()

    print('======== Epoch {:} / {:} ========'.format(epoch_i + 1, epochs))
    print('Training...')

    for step, batch in enumerate(train_dataloader):

        if step % 40 == 0 and not step == 0:
            elapsed = format_time(time.time() - t0)
            print('  Batch {:>5,}  of  {:>5,}.    Elapsed: {:}.'.format(step, len(train_dataloader), elapsed))

        b_input_ids = batch[0].to(device)
        b_input_mask = batch[1].to(device)
        b_labels = batch[2].to(device)
        # forward pass
        loss = model_token_cls(b_input_ids, token_type_ids = None, attention_mask = b_input_mask, labels = b_labels)
        # backward pass
        loss[0].backward()
        # track train loss
        train_loss += loss[0].item()
        nb_train_examples += b_input_ids.size(0)
        nb_train_steps += 1
        # gradient clipping
        torch.nn.utils.clip_grad_norm_(parameters = model_token_cls.parameters(), max_norm = max_grad_norm)
        # update parameters
        optimizer.step()
        scheduler.step()
        model_token_cls.zero_grad()

    # Calculate the average loss over the training data.
    avg_train_loss = train_loss / len(train_dataloader)

    print("")
    print("  Average training loss: {0:.2f}".format(avg_train_loss))
    print("  Training epcoh took: {:}".format(format_time(time.time() - t0)))

    # ========================================
    #               Validation
    # ========================================
    print("")
    print("Running Validation...")
    t0 = time.time()
    model_token_cls.eval()
    eval_loss, eval_accuracy = 0, 0
    nb_eval_steps, nb_eval_examples = 0, 0
    for batch in validation_dataloader:
        batch = tuple(t.to(device) for t in batch)

        b_input_ids, b_input_mask, b_labels = batch

        with torch.no_grad():        
            outputs = model_token_cls(b_input_ids, token_type_ids = None, attention_mask = b_input_mask, labels = b_labels)

        result = outputs[1].to('cpu')

        labels = b_labels.to('cpu')
        input_mask = b_input_mask.to('cpu')

        # Mask predicted label
        pred_masks = torch.min(torch.argmax(result, dim=2), input_mask)

        # Calculate the accuracy for this batch of test sentences.
        tmp_eval_accuracy = flat_accuracy(pred_masks, labels, input_mask)

        # Accumulate the total accuracy.
        eval_accuracy += tmp_eval_accuracy
        # Track the number of batches
        nb_eval_steps += 1

    # Report the final accuracy for this validation run.
    print("  Accuracy: {0:.2f}".format(eval_accuracy/nb_eval_steps))
    print("  Validation took: {:}".format(format_time(time.time() - t0)))

print("Train loss: {}".format(train_loss / nb_train_steps))

訓練中…10分くらいで終わりました。

ここで訓練したモデルをいったん保存しておきます。

pd.to_pickle(model_token_cls, '姓名分離モデル.pkl')

改めて別に用意しておいた検証用データをもとに実際にkeywordsに姓と判定された文字を入れていきます。
なお、上のtokenizerで定義された文字は4,000文字であり、そのリストに含まれない文字に関しては[UNK]となります。異体字等を含めて全部tokenizerに覚えさせるのは大変そうなので、そのような場合に関しては特別な処理を入れることにしました。

df = pd.read_csv('name_list_valid.csv')
keywords = []
MAX_LEN = 32
alls = list(df.full_name)
batch_size = 100

for i in range(math.ceil(len(alls)/batch_size)):
    print(i)
    s2 = list(df.full_name[i*batch_size:(i+1)*batch_size])
    d = torch.LongTensor(pad_sequences([tokenizer.encode(s) for s in s2], maxlen=MAX_LEN, dtype="long", truncating="pre", padding="pre")).cuda()
    attention_mask = (d > 0) * 1
    output = model_token_cls(d, token_type_ids = None, attention_mask = attention_mask)
    result = output[0].to('cpu')
    pred_masks = torch.min(torch.argmax(result, dim=2), attention_mask.to('cpu'))
    d = d.to('cpu')

    pred_mask_squeeze = pred_masks.nonzero().squeeze()
    b = d[pred_mask_squeeze.T.numpy()]
    pred_mask_squeeze[:,1]=b
    for j in range(len(s2)):
        tmp = pred_mask_squeeze[pred_mask_squeeze[:,0] == j]
        s = tokenizer.convert_ids_to_tokens(tmp[:,1])
        # 復元結果にunknownが含まれる場合は結果の文字数分だけ最初から取得する
        if '[UNK]' in s:
            s = s2[j][0:len(s)]

        keywords.append(''.join(s))

まだ、名字と名前に同じ漢字が含まれると判定が変だったりしますが、名字は連続しているとかの条件を後から加えてあげるとさらに良くなりそうです。
実際に苗字や名前のリストを作らなくても、フルネームをある程度BERTに入れてあげることでうまく学習してくれることがわかりました。
今回はやってることが大分しょぼいのですが、教師データを適切に作ることで文章中のキーワード抽出ロジック等への応用も同じ実装でいけます。

15
7
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
15
7