Help us understand the problem. What is going on with this article?

汎用性のある手法を用いて構造化データ群から必要なデータのみを取得する方法

More than 3 years have passed since last update.

WHY

構造化データ(HTML, Json, XML)から任意の情報のみ取得したいという需要は異常検知、クロラー、データ分析にも必要になってきます。
人手で職人的にチューニングしたり、教師ありの機械学習で取得する方法もありますが、今回紹介する方法は構造化データの構造に着目しその構造の差を計算して任意の情報のみ取得する方法を紹介します。

この手法により下記の点が実現できます。

  • 汎用性の高い情報抽出が可能
  • 教師データは必要ないので作成コストが少ない
  • 教師データ作成のための前処理にも応用可能

WHAT

具体的な手順について説明します。

  • 取得したい元となる情報源(HTMLやJSONなど)を用意します。
  • そのページと各情報源を比較して距離計算をします。
  • 距離が近いものを取得します。

この手順になります。距離計算に関しては直感的に分かりやすいTree Edit Distanceから記述し、そのあとに近似的な計算手法であるPQ Edit Distanceについて記述します。

大量のデータを扱う場合や全体の構造差に着目した比較を行いたい場合は”PQ Edit Distance”、個別の要素まで比較したい、扱うデータが少量の場合は”Tree Edit Distance”が良いです。

著者の環境では100倍程度の計算時間の違いが出るので大量データ(環境によるので一概にどの程度かは言えない)を扱う場合は”PQ Edit Distance”一択です。

Tree Edit Distance

構造化されているデータは木構造との相性が良いです。
この時にすぐに思いつくのがTree Edit Distanceです。Tree Edit Distanceは木構造間の距離計算に主に用いられる手法です。

構造データを木構造化してその構造の差を測る時に必要になる操作は3つです。

追加

ファイル名

削除

ファイル名

置き換え

ファイル名

まずが単純な配列Sを想定して下記のように考えます。

gammaは置き換えコスト
Hを先頭要素
Tを残りの要素

上から操作は
置き換え:先頭要素同士の置き換えコストと残りの要素も再帰的にコスト計算をして距離の合算値を返す
削除:先頭要素同士の削除コストと残りの要素と比較する配列のコスト計算を再帰的にして距離の合算値を返す
挿入:先頭要素同士の追加コストと元の配列と比較する配列の追加するもの以外のコスト計算を再帰的にして距離の合算値を返す

dが中に入っているので再帰的に行われます。

d(S_1, S_2) = min(\gamma(H(S_1),H(S_2)) + d(T(S_1),T(S_2)), \\
                  \gamma(H(S_1),\vec{0}) + d(T(S_1),S_2), \\
                  \gamma(\vec{0},H(S_2)) + d(S_1,T(S_2)), 
)

次に木で考えます。木の場合はルートと子ノードで考えたら良いのですが子ノードが複数あるとルートを取り除いた時に複数の木ができて上記のような記述ができません。
木を順序付きの木の集合として捉えます。
ルートノードを削除すると最右木の子ノードの森とそれ以外の森に分解できます。

置き換え:最右木の置き換え距離コスト計算と最右木の子ノードの森の距離計算、それ以外の森の距離計算の合算値
削除:最右木の削除距離コスト計算と削除後の最右木の子ノードの森とそれ以外の森と比較対象の森の距離の合算値
追加:最右木の追加距離コスト計算と元の森と追加後の比較対象の最右木の子ノードの森と比較対象のそれ以外の森の距離の合算値

d(F_1, F_2) = min(\gamma(R(F_1),R(F_2)) + d(C(F_1),C(F_2))+ d(L(F_1),L(F_2)), \\
                  \gamma(R(F_1),\vec{0}) + d(L(F_1) + C(F_1),F_2), \\
                  \gamma(\vec{0},R(F_2)) + d(F_1, L(F_2) + C(F_2)),
)

すべてのノードを比較して計算する場合O(n^2)の計算コストがかかります。
動的計画方により距離コストを記憶して計算しても距離計算コストは高くなります。
そこで厳密な距離計算をせずに近似的な手法を用いることが考えられます。

PQ Edit Distance

PQ Edit Distanceは計算コストがO(nlogn)で収束し計算空間もO(n)に抑えられる手法です。
nは木のノードの数です。

PQ Edit Distanceは置き換え、挿入、削除の操作は考慮せずに木構造からプロファイルと呼ばれるものを作成します。
このプロファイルによって木構造が取りうるパターンを網羅し、そのパターンのマッチ度合いを測る手法になります。
プロファイルの作成はPとQのパラメータに依存します。

本質的に重要な部分は2点です。

  • PQプロファイルの作成
  • 距離計算

PQプロファイルの作成のためにpq-gramは木と部分木を特殊な構造で扱います。

通常の木

Screen Shot 2016-11-29 at 3.12.28 PM.png

PQで扱う木(P:2、Q:3)

このPとQの値が木の構造に依存するため、PとQの値を現状では手動でチューニングして最も良いものを選択する形になっています。これを自動で設定することができれば教えて欲しいです。

Screen Shot 2016-11-29 at 3.11.07 PM.png

下記のように根と葉に対して再帰的な処理を行いプロファイルの作成を行います。

定義6:pqグラムの距離を下記のように定義する

\Delta^{p,q}(\vec{T_1}, \vec{T_2}) = 1 - 2\frac{P^{p,q}(\vec{T_1}) \cap P^{p,q}(\vec{T_2})}{P^{p,q}(\vec{T_1}) \cup P^{p,q}(\vec{T_2})}

2倍の理由は最大の一致率が二つの木の和集合の半分になるため

Tree Edit Distanceとの違い

PQの場合はプロファイル間の距離によって違いを把握します。近似的な手法ですがTree Edit Distanceよりも優れている点があります。

Screen Shot 2016-11-28 at 3.41.56 PM.png

図はTree Edit Distanceの場合はT'もT''も編集距離が2になっています。これはT'は"g"と"k"の要素が足りず、T''は"c"と"e"の要素が足りないためTと比べた編集距離が2となっています。しかしT'とT''は構造が大きく異なっています。これの距離が同一として扱われるのはあまりよくありません。
PQの場合はその差が明確に出ています。
ここが個々の差だけを図るTree Edit Distanceとそうでない全体の差を図るPQ Edit Distanceとの違いです。

HOW

上記の定義を用いてアルゴリズムを紹介して実装例を紹介します。

アルゴリズム

下記のような木構造に対して適用することを考えます。

まず二つのシフトレジスターを用意します。

anc:p(親ノード用)
sib:q(子ノード用)

レジスターの処理は古いものを出して、新しいものを入れる処理を行う。

shift((a,b,c),x) return (b,c,x)

ancとsibを合わせてpq-gramを作成する。

これらを元にPQグラムのプロファイルを返す。

ancはステップダウン
sibは左から右へシフト

具体的なアルゴリズムの流れを図に示します。

Screen Shot 2016-11-28 at 12.49.25 PM.png

この処理の対象の木は下記になります。

Screen Shot 2016-11-28 at 1.05.23 PM.png

この処理で行いたいのはプロファイルの作成になります。
プロファイルの処理は再帰的に記述可能なので下記のようになります。

pg-GRAM-PROFILE(T, p, q)
    P: empty relation with schema(labels)
    anc: shift register of size p (filled with *)
    P = PROFILE(T, p, q, P, root(T), anc)
    return P

最初にプロファイルを初期化しておきます。
ancを"*"で埋めます。
今回プロファイルを作成したい木と設定したpとqの値、プロファイル、木のルートノード、ancをPROFILE関数に渡します。

PROFILE(T, p, q, P, r, anc)
    anc:= shift(anc, l(r))
    sib: shift register of size q (filled with *)
    if r is a leaf then
        P := P U (anc ○ sib)
    else
        for each child c (from left to right) of r do
            sib := shift(sib, l(c))
            P := P U (anc ○ sib)
            P := PROFILE(T, p, q, P, c, anc)
        for k := 1 to q-1
            sib := shift(sib, *)
            P := P U (anc ○ sib)
    return P

ancにrのラベルを挿入します。(rは最初はルートのノード、ノードは再帰的な処理のため変わっていく)
sibを初期化します。
rが葉の場合はancとsibをコンカチしたプロファイルの和集合を作成
そうでない場合はノードの子ノードごとに処理をしていく。
最初の処理の場合はaを根のノードとして子ノードごと(a, b, c)に処理をしていく

Screen Shot 2016-11-28 at 1.34.19 PM.png

最初のancは*, aで固定
2番目をsibに挿入するおt*, *, aとなる。
ここで着目している部分は下記の部分になります。
ancとsibをコンカチして*,a, *, *,aを作成してPROFILEに追加して和集合を作成します。

ファイル名

再帰的な処理のため、子ノードがある場合は下記の部分に着目して処理をしていきます。

Screen Shot 2016-11-28 at 2.41.11 PM.png

ancをa, aとしてeに対して処理をすると

anc:a, a
sib:*, *, e

Screen Shot 2016-11-28 at 2.42.24 PM.png

ancをa, eとして処理をすると

anc:a, e
sib:*, *, *

最終的に葉に達するので今まで和集合を加える。
これを再帰的に繰り返してPQのプロファイルを作成します。

このプロファイルと先ほど定義した距離をプロファイル間ではかり木構造の距離を測ります。

このpとqの値はハイパーパラメータであり、木構造に依存します。木の構造に合わせてもっとも適切なパラメータの設定の必要があります。

実装方法

  • 木構造データのためのNodeの作成
  • 編集処理を行うShiftRegister
  • 編集距離計算のためのPROFILE

シンプルな実装方法で実現可能です。

木構造データのためのNodeの作成

構造化データを木構造に変換して扱う必要があります。下記のコードではシンプルに作成した

Nodeにラベルを設定する初期化処理
Nodeに子ノードを加える処理

class NodePQ(object):

    def __init__(self, label):
        self.label = label
        self.children = list()

    def addkid(self, node, before=False):
        if before:
            self.children.insert(0, node)
        else:
            self.children.append(node)
        return node

編集処理を行うShiftRegister

ShiftRegisterの説明に入ります。

  • 引数のサイズ分*で初期化したリストを用意します
  • コンカチでリストを結合しています。
  • shift処理で最も古い要素を外し、引数で与えた要素を追加しています。
class ShiftRegister(object):

    def __init__(self, size):
        self.register = list()
        for i in range(size):
            self.register.append("*")

    def concatenate(self, reg):
        temp = list(self.register)
        temp.extend(reg.register)
        return temp

    def shift(self, el):
        self.register.pop(0)
        self.register.append(el)

PROFILE

ancとsibの作成を行い再帰的にPROFILEを作成する部分になります。

  • 初期化の部分でancestorsにShiftRegisterを設定します。
  • 設定したancestorsを利用してPROFILEを作成します。
  • 作成されたPROFILEをソート
class ProfilePQ(object):

    def __init__(self, root, p=2, q=3):
        super(ProfilePQ, self).__init__()
        ancestors = ShiftRegister(p)
        self.list = list()

        self.profile(root, p, q, ancestors)
        self.sort()

この部分がPQディスタンスのための重要な処理になります。

  • 引数で与えられたrootのラベルをancestorsに設定します。
  • siblingsにqサイズ分のシフトレジスターを設定します。
  • 子が0になればancestorsとsiblingsをコンカチしてプロファイルを作成
  • rootの子に対してsiblingsに要素を追加し、コンカチ。この処理が子がなくなるまで再帰的に行われる。
  • 一番下の子まで行くとその下のsiblingsを"*"で穴埋めする
    def profile(self, root, p, q, ancestors):
        ancestors.shift(root.label)
        siblings = ShiftRegister(q)

        if(len(root.children) == 0):
            self.append(ancestors.concatenate(siblings))
        else:
            for child in root.children:
                siblings.shift(child.label)
                self.append(ancestors.concatenate(siblings))
                self.profile(child, p, q, copy.deepcopy(ancestors))
            for i in range(q-1):
                siblings.shift("*")
                self.append(ancestors.concatenate(siblings))

距離計算部分になります。
このedit_distanceでPROFILEと引数で与えたotherの距離を計算しています。
intersectionで作成したNodeの要素の一致率を計算しています。

    def edit_distance(self, other):
        with Bench():
            union = len(self) + len(other)
            return 1.0 - 2.0*(self.intersection(other)/union)

    def intersection(self, other):
        intersect = 0.0
        i = j = 0
        while i < len(self) and j < len(other):
            intersect += self.gram_edit_distance(self[i], other[j])
            if self[i] == other[j]:
                i += 1
                j += 1
            elif self[i] < other[j]:
                i += 1
            else:
                j += 1
        return intersect

    def gram_edit_distance(self, gram1, gram2):
        distance = 0.0
        if gram1 == gram2:
            distance = 1.0
        return distance

これだけで実装は終了ですがテストで適切に動作しているか確認する必要があります。

Testについて

下記の処理でProfileが等しいかどうかチェックしています。

  • 長さをチェックして異なる場合はFalseまた最初の要素が異なる場合はFalseを返す
  • profileから要素を取得して各要素が一つでも一致しなけばFalseを返す
def checkProfileEquality(self, profile1, profile2):
        if len(profile1) != len(profile2) or len(profile1[0]) != len(profile2[0]):
            return False
        for gram1 in profile1:
            contains = False
            for gram2 in profile2:
                if gram1 == gram2:
                    contains = True
                    break
            if contains == False:
                return False
        return True

pとqの初期設定をしています。ランダムな値の設定

    def setUp(self):
        self.p = 2
        self.q = 3
        num_random = 10
        self.trees = list()
        self.profiles = list()

テスト用のツリーを作成

        # Construct one-node trees
        self.small_tree1 = NodePQ("a")
        self.small_tree2 = NodePQ("b")
        self.trees.append(self.small_tree1)
        self.trees.append(self.small_tree2)

        self.small_profile1 = [['*','a','*','*','*']]
        self.small_profile2 = [['*','b','*','*','*']]

ランダムな木を作成してその木からのプロファイルを作成します。

        # Construct a few randomly generated trees
        for i in range(0, num_random):
            depth = random.randint(1, 10)
            width = random.randint(1, 5)
            self.trees.append(randtree(depth=depth, width=width, repeat=4))

        # Generate Profiles
        for tree1 in self.trees:
            self.profiles.append(ProfilePQ(tree1, self.p, self.q))

ランダムな木を下記の処理で作成します。

def randtree(depth=2, alpha='abcdefghijklmnopqrstuvwxyz', width=2, repeat=2):
    labels = [''.join(x) for x in itertools.product(alpha, repeat=repeat)]
    random.shuffle(labels)
    labels = (x for x in labels)
    root = NodePQ("root")
    p = [root]
    c = list()
    for x in range(depth-1):
        for y in p:
            for z in range(random.randint(1,1+width)):
                n = NodePQ(next(labels))
                y.addkid(n)
                c.append(n)
        p = c
        c = list()
    return root

距離計算が正しいか計算しています

    def testProfileCreation(self):
        small_tree1_equality = self.checkProfileEquality(self.profiles[0], self.small_profile1)
        small_tree2_equality = self.checkProfileEquality(self.profiles[1], self.small_profile2)

        self.assertEqual(small_tree1_equality, True)
        self.assertEqual(small_tree2_equality, True)

AとBの距離、BとAの距離の逆の場合でも計算が正しいかチェックしています。

def testSymmetry(self):
        """x.edit_distance(y) should be the same as y.edit_distance(x)"""
        for profile1 in self.profiles:
            for profile2 in self.profiles:
                self.assertEqual(profile1.edit_distance(profile2), profile2.edit_distance(profile1))

距離計算が0から1の間にあるかチェック

    def testEditDistanceBoundaries(self):
        for profile1 in self.profiles:
            for profile2 in self.profiles:
                self.assertTrue(profile1.edit_distance(profile2) <= 1.0 and profile1.edit_distance(profile2) >= 0

まとめ

この手法の利点は構造化されているデータに汎用的に使用出来る手法になります。HTML、json、XMLなど木構造に変換できるデータセットがあれば使える手法なので必要なデータの抽出、異常検知などの応用範囲も広いので是非トライして頂けると幸いです。

参考

Tree Edit Distanceと自然言語処理への応用
Approximate Matching of Hierarc hical Data Using pq -Grams

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away