ScannのAnisotropic Vector Quantizationを実装してみる

RAG案件など、Vector Searchを触ることも増えてきたので今更ながら入門しscannについてまとめてみました。


  • scannの公式実装がよくわからかったため手を動かしてみる。
  • scannライブラリ全体の実装を追うのは限界があったため論文中のAnisotropic Vector Quantizationを実装した




ANNのアルゴリズムは多く存在しますが、今回はその中でも2020年にGoogleが発表したScann1と呼ばれるアルゴリズムを見ていきます。ScannはVertex AI Mathcing Engineの内部で利用されており、大規模で低レイテンシーなベクトル検索を提供しています。








直積量子化(Product Quantization, PQ)は[Jégou,2011]で提案された手法です。ANNのなかでよく見られるアプローチの一つで、ベクトル量子化をベースに高い保持効率を保ちつつ高い精度を出すことができます。

これにより、もとのfloat * Dのベクトルから $int * (D/M)$へとデータを変換します。

  1. (学習時) 学習データから各subspaceごとにcodebookを求める
  2. (検索時) クエリをもまた$M$個に分割してそれぞれでクラスタを計算
  3. クエリ$x$と検索データ$y$との距離 $\sum_{m=1}^M d(q(x^{(m)}),q(y^{(y)}))$を計算

$$\hat{d}(x^{(m)},y^{(y)})=d(x^{(m)},q(y^{(y)}))$$と行われます。(Asymmetric Distance Computing)


Anisotropic Vector Quantization


min_c|q^Tx_1 - q^Tc| = c_2


l(x,\hat{x},w) = \mathbb{E}_{q \sim Q}[w(q^Tx)(q^T(x-\hat{x}))^2]


l(x,\hat{x},w) = h_{\parallel}(w,||x||)||r_{\parallel}(x,\hat{x})||^2 + h_{\perp}(w,||x||)||r_{\perp}(x,\hat{x})||^2


h_{\parallel}(w,||x||) = (d - 1) \int_{0}^{\pi} w(||x|| \cos \theta) (\sin^{d-2} \theta - \sin^d \theta) d\theta

h_{\perp}(w,||x||) = \int_{0}^{\pi} w(||x|| \cos \theta)\sin^d \theta d\theta

ただ、$w(t) = I(t > T)$とすることで

l(x,\hat{x},w) \propto \eta(w,||x||)||r_{\parallel}(x,\hat{x})||^2 + r_{\perp}(x,\hat{x})||^2

ここで$\eta= \frac{h_{\parallel}}{h_{\perp}}$であるが、$\eta$は以下の関係式が示されているのでこの式から簡単に計算できます

\lim_{d \rightarrow \infty} \frac{\eta(w,||x||)}{d-1} = \frac{(T/||x||)^2}{1-(T/||x||)^2}


Anisotropic Vector Quantizationのアルゴリズムは通常のベクトル量子化とほとんど同じで、

  1. 各ベクトルをAnistropic Lossが最も小さくなるcodebookに割り当てる
  2. codebookを次式に基づいて更新する

c_j^* = \left( I\mathop{\rm \sum}\limits_{x_i \in X_j} h_{i,\perp} + \mathop{\rm \sum}\limits_{x_i \in X_j} \frac{h_{i,\parallel}-h_{i,\perp}}{||x||^2}x_ix_i^T \right )^{-1} \mathop{\rm \sum}\limits_{x_i \in X_j}h_{i,\parallel}x_i



  1. 更新式に$h_{\parallel}$,$h_{\perp}$が出てくる(積分計算が必要となり解析的な解が求まらない)
  2. ベクトル量子化は行えるが、そのまま直積量子化に応用できそうにない(後述)



c_j \leftarrow \mathop{\rm argmin}\limits_{c \in \mathrm{R
}^d} \mathop{\rm \sum}\limits_{x_i \in X_j} \eta(w,c) ||r_{\parallel}(x_i,\hat{x})||^2 + ||r_{\perp}(x_i,\hat{x_i})||^2

これを論文Appendix 7.4に沿って解いていくと、

l(x_i,c) &= \eta(w,||x_i||) ||r_{\parallel}(x_i,\hat{x})||^2 + ||r_{\perp}(x_i,\hat{x_i})||^2 \\ 
&= \eta(w,||x_i||) \left( ||x_i||^2 +  \frac{\hat{x}_i^Tx_ix_i^T\hat{x}_i}{||x_i||^2} - 2x_i^T\hat{x}_i\right) + ||x_i||^2 + ||\hat{x}_i||^2 - 2x_i^T\hat{x}_i - ||r_{\parallel}||^2 \\
&= (\eta(w,||x_i||)-1)\left( ||x_i||^2 +  \frac{\hat{x}_i^Tx_ix_i^T\hat{x}_i}{||x_i||^2} - 2x_i^T\hat{x}_i\right) + ||x_i||^2 + ||\hat{x}_i||^2 - 2x_i^T\hat{x}_i\\
&= \hat{x}_i^T\left(\frac{\eta(w,||x_i||)-1}{||x_i||^2} x_ix_i^T + I\right)\hat{x}_i - 2x_i^T\hat{x}_i + \eta(w,||x_i||)||x_i||^2\\

\left(\mathop{\rm \sum}\limits_{x_i \in X_j}\frac{\eta(w,||x_i||)-1}{||x_i||^2} x_ix_i^T + I\right)^{-1}\left(\mathop{\rm \sum}\limits_{x_i \in X_j}\eta(w,||x_i||)x_i\right)


通常の直積量子化では各subspace毎にベクトル量子化を行うことができるのですが、Anisotropic Lossではあるsubspaceのベクトル量子化で$r$を計算するために他のsubspaceでのcodebookが必要となります。
つまり一点に対してすべてのsubspace、クラスターにおいてanistoropic lossを計算しようと思うと $m^k$($m$はsubspaceの数、$k$はk-meansクラスターの数を)通りのlossの最小の組み合わせを計算する必要がありそうです。論文のこのあたりに対する言及がありそうなのですがいまいち理解できませんでした。




import numpy as np
from scipy.cluster.vq import kmeans2, vq

def dist_l2(q, x):
    return np.linalg.norm(q - x, ord=2, axis=1) ** 2

def dist_ip(q, x):
    return q @ x.T

metric_function_map = {"l2": dist_l2, "dot": dist_ip}

class PQ:

    def __init__(self, M, Ks=256, metric="l2", verbose=True):
        assert 0 < Ks <= 2**32
        assert metric in ["l2", "dot"]
        self.M, self.Ks, self.metric, self.verbose = M, Ks, metric, verbose
        self.code_dtype = (
            np.uint8 if Ks <= 2**8 else (np.uint16 if Ks <= 2**16 else np.uint32)
        self.codewords = None
        self.Ds = None

        if verbose:
                f"M: {M}, Ks: {Ks}, metric : {self.code_dtype}, code_dtype: {metric}",

    def __eq__(self, other):
        if isinstance(other, PQ):
            return (
            ) == (
            ) and np.array_equal(
                self.codewords, other.codewords,
            return False

    def fit(self, vecs, iter=20, seed=123, minit="points"):
        """Given training vectors, run k-means for each sub-space and create
        codewords for each sub-space.

        This function should be run once first of all.

            vecs (np.ndarray): Training vectors with shape=(N, D) and dtype=np.float32.
            iter (int): The number of iteration for k-means
            seed (int): The seed for random process
            minit (str): The method for initialization of centroids for k-means (either 'random', '++', 'points', 'matrix')

            object: self

        assert vecs.dtype == np.float32
        assert vecs.ndim == 2
        N, D = vecs.shape
        assert self.Ks < N, "the number of training vector should be more than Ks"
        assert D % self.M == 0, "input dimension must be dividable by M"
        assert minit in ["random", "++", "points", "matrix"]
        self.Ds = int(D / self.M)

        if self.verbose:
            print(f"iter: {iter}, seed: {seed}")

        # [m][ks][ds]: m-th subspace, ks-the codeword, ds-th dim
        self.codewords = np.zeros((self.M, self.Ks, self.Ds), dtype=np.float32)
        for m in range(self.M):
            if self.verbose:
                print(f"Training the subspace: {m} / {self.M}")
            vecs_sub = vecs[:, m * self.Ds : (m + 1) * self.Ds]
            self.codewords[m], label = kmeans2(vecs_sub, self.Ks, iter=iter, minit=minit)

        return self

    def encode(self, vecs):
        """Encode input vectors into PQ-codes.

            vecs (np.ndarray): Input vectors with shape=(N, D) and dtype=np.float32.

            np.ndarray: PQ codes with shape=(N, M) and dtype=self.code_dtype

        assert vecs.dtype == np.float32
        assert vecs.ndim == 2
        N, D = vecs.shape
        assert self.Ds * self.M == D, "input dimension must be Ds * M"

        # codes[n][m] : code of n-th vec, m-th subspace
        codes = np.empty((N, self.M), dtype=self.code_dtype)
        for m in range(self.M):
            if self.verbose:
                print(f"Encoding the subspace: {m} / {self.M}")
            vecs_sub = vecs[:, m * self.Ds : (m + 1) * self.Ds]
            codes[:, m], _ = vq(vecs_sub, self.codewords[m])

        return codes

    def decode(self, codes):
        """Given PQ-codes, reconstruct original D-dimensional vectors
        approximately by fetching the codewords.

            codes (np.ndarray): PQ-cdoes with shape=(N, M) and dtype=self.code_dtype.
                Each row is a PQ-code

            np.ndarray: Reconstructed vectors with shape=(N, D) and dtype=np.float32

        assert codes.ndim == 2
        N, M = codes.shape
        assert M == self.M
        assert codes.dtype == self.code_dtype

        vecs = np.empty((N, self.Ds * self.M), dtype=np.float32)
        for m in range(self.M):
            vecs[:, m * self.Ds : (m + 1) * self.Ds] = self.codewords[m][codes[:, m], :]

        return vecs

    def dtable(self, query):
        """Compute a distance table for a query vector.
        The distances are computed by comparing each sub-vector of the query
        to the codewords for each sub-subspace.
        `dtable[m][ks]` contains the squared Euclidean distance between
        the `m`-th sub-vector of the query and the `ks`-th codeword
        for the `m`-th sub-space (`self.codewords[m][ks]`).

            query (np.ndarray): Input vector with shape=(D, ) and dtype=np.float32

                Distance table. which contains
                dtable with shape=(M, Ks) and dtype=np.float32

        assert query.dtype == np.float32
        assert query.ndim == 1, "input must be a single vector"
        (D,) = query.shape
        assert self.Ds * self.M == D, "input dimension must be Ds * M"

        # dtable[m] : distance between m-th subvec and m-th codewords (m-th subspace)
        # dtable[m][ks] : distance between m-th subvec and ks-th codeword of m-th codewords
        dtable = np.empty((self.M, self.Ks), dtype=np.float32)
        for m in range(self.M):
            query_sub = query[m * self.Ds : (m + 1) * self.Ds]
            dtable[m, :] = metric_function_map[self.metric](
                query_sub, self.codewords[m],

            # In case of L2, the above line would be:
            # dtable[m, :] = np.linalg.norm(self.codewords[m] - query_sub, axis=1) ** 2

        return DistanceTable(dtable, metric=self.metric)

class DistanceTable:
    """Distance table from query to codewords.
    Given a query vector, a PQ/OPQ instance compute this DistanceTable class
    using :func:`PQ.dtable` or :func:`OPQ.dtable`.
    The Asymmetric Distance from query to each database codes can be computed
    by :func:`DistanceTable.adist`.

        dtable (np.ndarray): Distance table with shape=(M, Ks) and dtype=np.float32
            computed by :func:`PQ.dtable` or :func:`OPQ.dtable`
        metric (str): metric type to calculate distance

        dtable (np.ndarray): Distance table with shape=(M, Ks) and dtype=np.float32.
            Note that dtable[m][ks] contains the squared Euclidean distance between
            (1) m-th sub-vector of query and (2) ks-th codeword for m-th subspace.


    def __init__(self, dtable, metric="l2"):
        assert dtable.ndim == 2
        assert dtable.dtype == np.float32
        assert metric in ["l2", "dot"]
        self.dtable = dtable
        self.metric = metric

    def adist(self, codes):
        """Given PQ-codes, compute Asymmetric Distances between the query (self.dtable)
        and the PQ-codes.

            codes (np.ndarray): PQ codes with shape=(N, M) and
                dtype=pq.code_dtype where pq is a pq instance that creates the codes

            np.ndarray: Asymmetric Distances with shape=(N, ) and dtype=np.float32


        assert codes.ndim == 2
        N, M = codes.shape
        assert self.dtable.shape[0] == M

        # Fetch distance values using codes.
        dists = np.sum(self.dtable[range(M), codes], axis=1)

        # The above line is equivalent to the followings:
        # dists = np.zeros((N, )).astype(np.float32)
        # for n in range(N):
        #     for m in range(M):
        #         dists[n] += self.dtable[m][codes[n][m]]

        return dists

Anisotropic Quantization 学習コード(速度を度外視して愚直に数式通り書きました)

import numpy as np
from tqdm import tqdm

def dist_l2(q, x):
    return np.linalg.norm(q - x, ord=2, axis=1) ** 2

def dist_ip(q, x):
    return q @ x.T

metric_function_map = {"l2": dist_l2, "dot": dist_ip}

def _initialize_kmeans_plusplus(X:np.ndarray, n_clusters:int, random_state:int):
    n_samples, n_features = X.shape
    centers = np.empty((n_clusters, n_features), dtype=X.dtype)
    centers[0] = X[np.random.RandomState(random_state).randint(n_samples)]
    for i in range(1, n_clusters):
        dist = np.linalg.norm(X[:,None] - centers[:i],axis=2)
        dist = np.min(dist, axis=1) / (dist ** 2).sum(axis=1) if i > 1 else np.min(dist, axis=1)

        indices  = np.random.choice(n_samples, p=dist / dist.sum())
        centers[i] = X[indices]
    return centers

class APQ:
    """Pure python implementation of Product Quantization (PQ) [Jegou11]_.

    For the indexing phase of database vectors,
    a `D`-dim input vector is divided into `M` `D`/`M`-dim sub-vectors.
    Each sub-vector is quantized into a small integer via `Ks` codewords.
    For the querying phase, given a new `D`-dim query vector, the distance beween the query
    and the database PQ-codes are efficiently approximated via Asymmetric Distance.

    All vectors must be np.ndarray with np.float32

    .. [Jegou11] H. Jegou et al., "Product Quantization for Nearest Neighbor Search", IEEE TPAMI 2011

        M (int): The number of sub-space
        Ks (int): The number of codewords for each subspace
            (typically 256, so that each sub-vector is quantized
            into 8 bits = 1 byte = uint8)
        metric (str): Type of metric used among vectors (either 'l2' or 'dot')
            Note that even for 'dot', kmeans and encoding are performed in the Euclidean space.
        verbose (bool): Verbose flag

        M (int): The number of sub-space
        Ks (int): The number of codewords for each subspace
        metric (str): Type of metric used among vectors
        verbose (bool): Verbose flag
        code_dtype (object): dtype of PQ-code. Either np.uint{8, 16, 32}
        codewords (np.ndarray): shape=(M, Ks, Ds) with dtype=np.float32.
            codewords[m][ks] means ks-th codeword (Ds-dim) for m-th subspace
        Ds (int): The dim of each sub-vector, i.e., Ds=D/M


    def __init__(self, M: int, Ks:int =256, metric: str="l2", anisotropic_threshold: float=0.2, verbose: bool=True):
        assert 0 < Ks <= 2**32
        assert metric in ["l2", "dot"]
        self.M, self.Ks, self.metric, self.verbose = M, Ks, metric, verbose
        self.code_dtype = (
            np.uint8 if Ks <= 2**8 else (np.uint16 if Ks <= 2**16 else np.uint32)
        self.anisotropic_threshold = anisotropic_threshold
        self.codewords = None
        self.Ds = None

        if verbose:
                f"M: {M}, Ks: {Ks}, metric : {self.code_dtype}, code_dtype: {metric}",

    def fit(self, vecs, iter=20, seed=123, minit="points"):
        """Given training vectors, run k-means for each sub-space and create
        codewords for each sub-space.

        This function should be run once first of all.

            vecs (np.ndarray): Training vectors with shape=(N, D) and dtype=np.float32.
            iter (int): The number of iteration for k-means
            seed (int): The seed for random process
            minit (str): The method for initialization of centroids for k-means (either 'random', '++', 'points', 'matrix')

            object: self

        assert vecs.dtype == np.float32
        assert vecs.ndim == 2
        N, D = vecs.shape
        assert self.Ks < N, "the number of training vector should be more than Ks"
        assert D % self.M == 0, "input dimension must be dividable by M"
        assert minit in ["random", "++", "points", "matrix"]
        self.Ds = int(D / self.M)

        if self.verbose:
            print(f"iter: {iter}, seed: {seed}")

        # [m][ks][ds]: m-th subspace, ks-the codeword, ds-th dim
        init_code_data = vecs[np.random.choice(len(vecs),self.Ks, replace=False)]
        self.codewords = np.zeros(self.Ks *  D)
        for m in range(self.M):
            self.codewords[m*self.Ds*self.Ks:(m+1)*self.Ds*self.Ks] = init_code_data[:,m * self.Ds:(m + 1) * self.Ds].flatten()

        all_assign_label = np.zeros((N,self.M),dtype=np.int32)
        for i in range(iter):

            A = np.zeros((self.Ks * D,self.Ks * D), dtype=np.float32)
            C = np.zeros(self.Ks * D, dtype=np.float32)

            for j,data in enumerate(tqdm(vecs)):
                B = np.zeros((D, D * self.Ks), dtype=np.int32)
                norm_squared = np.linalg.norm(data) ** 2
                eta = self.eta(data)
                quantized = np.zeros(D)

                for m in range(self.M):
                    ind = np.random.randint(0,self.Ks) if i ==0 else all_assign_label[i][m]
                    quantized[m*self.Ds: (m+1)*self.Ds] = self.codewords[m * self.Ds*self.Ks + ind * self.Ds : m * self.Ds*self.Ks + (ind+1) * self.Ds]

                assign_indices = np.zeros(self.M, dtype=np.int32)

                for it in range(3):
                    for m in range(self.M):
                        anisotropic_losses = []
                        for k in range(self.Ks):
                            quantized[m*self.Ds: (m+1)*self.Ds] = self.codewords[m * self.Ds*self.Ks + k * self.Ds : m * self.Ds*self.Ks + (k+1) * self.Ds]

                            loss = self.anisiotropic_loss(data, quantized, eta)
                        sub_cluster_index = np.argmin(anisotropic_losses)
                        assign_indices[m] = sub_cluster_index
                        quantized[m*self.Ds: (m+1)*self.Ds] = self.codewords[m * self.Ds*self.Ks + sub_cluster_index * self.Ds : m * self.Ds*self.Ks + (sub_cluster_index+1) * self.Ds]

                for m in range(self.M):
                    ind = assign_indices[m]
                    B[m*self.Ds:(m+1)*self.Ds,m*self.Ds*self.Ks + ind*self.Ds:m*self.Ds*self.Ks + (ind+1)*self.Ds] = np.eye(self.Ds)

                all_assign_label[j,:] = assign_indices
                x = data[:, np.newaxis]
                A +=  B.T @ ((eta - 1) / norm_squared * x @ x.T + np.eye(D)) @ B
                C += eta * B.T @ data

                from scipy.linalg import cho_factor, cho_solve
                c0, low = cho_factor(A)  # O(K**3 D**3)
                eig_set = np.linalg.eig(A)[0]
                min_eign = eig_set.min()
                max_eign = eig_set.max()
                reg = 1e-3
                dim = A.shape[0]
                c0, low = cho_factor(A + reg * np.eye(dim))

            old_codewords = self.codewords.copy()
            self.codewords = cho_solve((c0, low), C).flatten()#np.linalg.inv(A) @ C
            if np.sum(old_codewords - self.codewords) <= 1e-5:
            print(f"iter:{i}, diff:{np.linalg.norm(old_codewords - self.codewords)}")
        return self

    def eta(self, data:np.ndarray):
        T2 = self.anisotropic_threshold ** 2

        norm_squared = np.linalg.norm(data) ** 2
        dim = data.shape[-1]
        return (dim-1) * (T2 / norm_squared) / (1 - T2 / norm_squared)

    def anisiotropic_loss(self, data:np.ndarray, quantized: np.ndarray, eta=None):
        if eta is None:
            eta = self.eta(data)

        r_parallel = np.dot(data - quantized, data) * data / (np.linalg.norm(data) ** 2)
        r_parallel_norm = np.linalg.norm(r_parallel) ** 2
        r_perp = (data - quantized) - r_parallel
        r_perp_norm = np.linalg.norm(r_perp) ** 2
        return eta * r_parallel_norm + r_perp_norm

    def encode(self, vecs):
        """Encode input vectors into PQ-codes.

            vecs (np.ndarray): Input vectors with shape=(N, D) and dtype=np.float32.

            np.ndarray: PQ codes with shape=(N, M) and dtype=self.code_dtype

        assert vecs.dtype == np.float32
        assert vecs.ndim == 2
        N, D = vecs.shape
        assert self.Ds * self.M == D, "input dimension must be Ds * M"
        quantized = np.zeros(D)

        codes = np.empty((N, self.M), dtype=np.uint8)
        for i,data in enumerate(tqdm(vecs)):
            eta = self.eta(data)
            for m in range(self.M):
                ind = np.random.randint(0,self.Ks)
                quantized[m*self.Ds: (m+1)*self.Ds] = self.codewords[m * self.Ds*self.Ks + ind * self.Ds : m * self.Ds*self.Ks + (ind+1) * self.Ds]

            assign_indices = np.zeros(self.M, dtype=np.int32)
            for m in range(self.M):
                anisotropic_losses = []
                for k in range(self.Ks):
                    quantized[m*self.Ds: (m+1)*self.Ds] = self.codewords[m * self.Ds*self.Ks + k * self.Ds : m * self.Ds*self.Ks + (k+1) * self.Ds]

                    loss= self.anisiotropic_loss(data, quantized, eta)
                sub_cluster_index = np.argmin(anisotropic_losses)
                assign_indices[m] = sub_cluster_index
            codes[i,:] = assign_indices
        return codes

    def decode(self, codes):
        """Given PQ-codes, reconstruct original D-dimensional vectors
        approximately by fetching the codewords.

            codes (np.ndarray): PQ-cdoes with shape=(N, M) and dtype=self.code_dtype.
                Each row is a PQ-code

            np.ndarray: Reconstructed vectors with shape=(N, D) and dtype=np.float32

        assert codes.ndim == 2
        N, M = codes.shape
        assert M == self.M
        assert codes.dtype == self.code_dtype

        vecs = np.empty((N, self.Ds * self.M), dtype=np.float32)
        for m in range(self.M):
            vecs[:, m * self.Ds : (m + 1) * self.Ds] = self.codewords[m][codes[:, m], :]

        return vecs

    def dtable(self, query):
        """Compute a distance table for a query vector.
        The distances are computed by comparing each sub-vector of the query
        to the codewords for each sub-subspace.
        `dtable[m][ks]` contains the squared Euclidean distance between
        the `m`-th sub-vector of the query and the `ks`-th codeword
        for the `m`-th sub-space (`self.codewords[m][ks]`).

            query (np.ndarray): Input vector with shape=(D, ) and dtype=np.float32

                Distance table. which contains
                dtable with shape=(M, Ks) and dtype=np.float32

        assert query.dtype == np.float32
        assert query.ndim == 1, "input must be a single vector"
        (D,) = query.shape
        assert self.Ds * self.M == D, "input dimension must be Ds * M"

        # dtable[m] : distance between m-th subvec and m-th codewords (m-th subspace)
        # dtable[m][ks] : distance between m-th subvec and ks-th codeword of m-th codewords
        dtable = np.empty((self.M, self.Ks), dtype=np.float32)
        for m in range(self.M):
            query_sub = query[m * self.Ds : (m + 1) * self.Ds]
            codewords = self.codewords[m * self.Ds * self.Ks : (m + 1) * self.Ds * self.Ks].reshape(self.Ks,self.Ds)
            dtable[m, :] = metric_function_map[self.metric](
                query_sub, codewords,

            # In case of L2, the above line would be:
            # dtable[m, :] = np.linalg.norm(self.codewords[m] - query_sub, axis=1) ** 2

        return DistanceTable(dtable, metric=self.metric)

class DistanceTable:
    """Distance table from query to codewords.
    Given a query vector, a PQ/OPQ instance compute this DistanceTable class
    using :func:`PQ.dtable` or :func:`OPQ.dtable`.
    The Asymmetric Distance from query to each database codes can be computed
    by :func:`DistanceTable.adist`.

        dtable (np.ndarray): Distance table with shape=(M, Ks) and dtype=np.float32
            computed by :func:`PQ.dtable` or :func:`OPQ.dtable`
        metric (str): metric type to calculate distance

        dtable (np.ndarray): Distance table with shape=(M, Ks) and dtype=np.float32.
            Note that dtable[m][ks] contains the squared Euclidean distance between
            (1) m-th sub-vector of query and (2) ks-th codeword for m-th subspace.


    def __init__(self, dtable, metric="l2"):
        assert dtable.ndim == 2
        assert dtable.dtype == np.float32
        assert metric in ["l2", "dot"]
        self.dtable = dtable
        self.metric = metric

    def adist(self, codes):
        """Given PQ-codes, compute Asymmetric Distances between the query (self.dtable)
        and the PQ-codes.

            codes (np.ndarray): PQ codes with shape=(N, M) and
                dtype=pq.code_dtype where pq is a pq instance that creates the codes

            np.ndarray: Asymmetric Distances with shape=(N, ) and dtype=np.float32


        assert codes.ndim == 2
        N, M = codes.shape
        assert self.dtable.shape[0] == M

        # Fetch distance values using codes.
        dists = np.sum(self.dtable[range(M), codes], axis=1)

        # The above line is equivalent to the followings:
        # dists = np.zeros((N, )).astype(np.float32)
        # for n in range(N):
        #     for m in range(M):
        #         dists[n] += self.dtable[m][codes[n][m]]

        return dists


from pathlib import Path

import h5py
import numpy as np
from tqdm import trange

from scann_impl.apq import APQ
from scann_impl.pq import PQ

def compute_recall(neighbors: np.ndarray, true_neighbors: np.ndarray) -> float:
    total = 0
    for gt_row, row in zip(true_neighbors, neighbors):
        total += np.intersect1d(gt_row, row).shape[0]
    return total / true_neighbors.size

# with tempfile.TemporaryDirectory() as tmp:
#    response = requests.get("http://ann-benchmarks.com/glove-100-angular.hdf5")
#    loc = os.path.join(tmp, "glove.hdf5")
#    with open(loc, "wb") as f:
#        f.write(response.content)

glove_h5py = h5py.File(Path(__file__).parents[2] / "data" / "glove.hdf5", "r")

rng = np.random.default_rng(seed=1234)

dataset = glove_h5py["train"]
training_sample_size = 10000
training_dataset = dataset[()][rng.choice(len(dataset), training_sample_size, replace=False)]
training_dataset = training_dataset / np.linalg.norm(training_dataset, axis=1)[:, np.newaxis]

queries = glove_h5py["test"][()]

true_neighbors = glove_h5py["neighbors"][()]
# Instantiate with M=8 sub-spaces
for vq_class in [PQ,APQ]:
    pq = vq_class(M=25,Ks=16,metric="dot")

    # Train codewords

    # Encode to PQ-codes
    X_code = pq.encode(dataset)  # (10000, 8) with dtype=np.uint8

    # Results: create a distance table online, and compute Asymmetric Distance to each PQ-code
    neighbors = np.empty((len(true_neighbors),100), dtype=np.int64)
    reordered_neighbors = np.empty((len(true_neighbors),100), dtype=np.int64)
    for i in trange(len(queries)):
        dists = pq.dtable(queries[i]).adist(X_code)  # (10000, )
        closest_indices = np.argsort(dists)[::-1][:2000]
        neighbors[i,:] = closest_indices[:100]
        sorted_indices = np.sort(closest_indices)
        # reordering
        reordered_neighbors[i,:] = sorted_indices[np.argsort((queries[i] * dataset[sorted_indices]).sum(axis=1))[::-1][:100]]

    # we are given top 100 neighbors in the ground truth, so select top 10
    print(f"Recall of {vq_class.__name__}(no reorder): {compute_recall(neighbors, true_neighbors[:, :10])}")
    print(f"Recall of {vq_class.__name__}(reorder): {compute_recall(reordered_neighbors, true_neighbors[:, :10])}")


  • 手法
    • PQ: 通常の直積量子化
    • APQ: Scann
  • rescore
    • 最終的に得られた近似解を厳密に再計算して並び替える
> Recall of PQ(no rescore): 0.5848
> Recall of PQ(rescore): 0.6256

> Recall of APQ(no rescore): 0.37489
> Recall of APQ(rescore): 0.69028





  1. Accelerating Large-Scale Inference with Anisotropic Vector Quantization
    Announcing ScaNN: Efficient Vector Similarity Search

  2. 近似最近傍探索の最前線
    近似最近傍探索とVector DBの理論的背景

  3. A Survey of Product Quantization


