はじめに
昨年に引き続き、今年も文字情報記事を埋め込み(embedding)データにしてデータベースを構築する作業が発生した。昨年はElasticsearchだったが、今年は、Solr9.7にデータを登録する。また、昨年とは異なり、元データを収めたJSONファイルが「ひとつのJSON配列が書き込まれた巨大ファイル」である。
この記事は、可能な限りマシンパワーを使って短時間で表記件名処理を完遂する手法を、Pythonスクリプトを用いて模索した記録である。
環境
- Ubuntu24.04
- CPU: 24コア(Xeon E5-2640v3 x 2)
- メインメモリ: 64GB
- GPU: Geforce RTX3090
- VRAM: 24GB
- python 3.12
- cuda 12.5
- sentence-transformer 3.3.1
まずは、巨大ファイルJSONについて簡単な説明。
- ファイルサイズ9GB
- 一つのJSON配列が含まれている
- ファイルのJSON配列は、280万件程度の要素でできている
- 280万件配列の1要素は、複数要素で構成されている
- 埋め込み対象となる文章(以下、「記事」または「記事文章」)は、配列の各要素のなかの、いち要素である
- 記事文章は、ほとんどが英文章である。(多言語混在文章はない。本来、すべて英文であるはずなのだが、どうやら誤ってデータが混入しているらしい)
もうね、9GBで1ファイルとか驚かない時代ですよ・・・
課題
(課題1)9GBファイルを読み込む件
ちょっと悩んだ。読み込むだけなら、そりゃ open(file_path, 'r') でいいんだろうけどさ。その読み込みをおわった状態で、ほかの処理をやろうとすると、メモリが足りなくなってPythonが音もなく落ちる(仕事放棄する)。これについては、ijsonというモジュールをpip install して使うことで避けられることを chatGPTさんが教えてくれた。いわく、巨大なJSONファイルを”ストリーミング”で読み込んでくれる、とのこと。
(課題2)embedding対象サイズについて
このijsonを使ってひととおりの処理(JSON読み込み → embedding & metaデータ生成 → Solrに登録)をやってみた。9GBファイルのJSONを1行ずつ順次読み込みしてembeddingする。なにも考えずに、「記事文章まるっと384次元のベクトルに変換」した。こうしてできたSolrインデックスにベクタサーチを行ってRAGを構築したのだが・・・なんか精度が高くない気がする。調べてみると記事の文章の長さはまちまち。100文字程度~数万文字まで。これは、記事の文章を適切にsplit(chop)する必要がある、と感じた。
また、9GBファイルのJSONデータを1行ずつ読み込み処理する今回の方法は、およそ24時間の時間がかかった。embedding処理対象の記事文章をsplitすると、処理回数が増えるので、さらに時間がかかることになる。2分割なら2倍、4分割なら4倍と単純に予想できる。個人の感覚として、2日までは許せるけど4日以上は許しがたい。
課題対処方法
あくまで、本記事での例であり、ほかに良い方法があるのかもしれないが。
(課題1対処方法)元ファイルを分割し、分割したファイルをマルチプロセス処理にかける
昨年の対処方法にもあるように、pythonのマルチプロセス処理において複数ファイルをそれぞれ処理させる方法。CPUコアが複数あり、メインメモリ、VRAMともに十分確保できるなら、これが有効手段だろう。このため、まず、ファイルを分割する仕組みを構築した。
from datetime import datetime
import os
import json
LOG_PATH = './log_file.txt'
def get_file_size(file_path):
with open(file_path, 'r') as f:
f.seek(0, 2) # Move to the end of the file
return f.tell()
def split_file(input_file, lines_per_file):
with open(input_file, 'r') as infile:
file_number = 0
output_file = f"./source_files/crawler_data_part_{file_number}.json"
outfile = open(output_file, 'w')
for i, line in enumerate(infile):
if i % lines_per_file == 0 and i > 0:
outfile.close()
file_number += 1
output_file = f"./source_files/crawler_data_part_{file_number}.json"
outfile = open(output_file, 'w')
outfile.write(line)
outfile.close()
def split_json(input_file, split_prefix, chunk_size):
with open(input_file, 'r') as infile:
# 単純に読み込み
data = json.load(infile)
# JSONデータが配列であることを確認
if not isinstance(data, list):
raise ValueError("The input JSON file must contain an array.")
# チャンクに分割してファイルに保存
for i in range(0, len(data), chunk_size):
chunk = data[i:i + chunk_size]
output_file = f"{split_prefix}_{i // chunk_size}.json"
with open(output_file, 'w') as outfile:
json.dump(chunk, outfile, indent=2)
def split_work(file_size, num_workers):
chunk_size = file_size // num_workers
ranges = [(i * chunk_size, (i + 1) * chunk_size) for i in range(num_workers)]
ranges[-1] = (ranges[-1][0], file_size) # Ensure the last chunk goes to the end of the file
return ranges
def main(input_file: str):
from parallel_worker_sentence_transformer import worker
# 分割ファイル名のプレフィックス
split_prefix = './source_files/crawler_data_part' #(動作環境により異なるよね)
#
chunk_size = 100000
# 巨大ファイルの分割
split_json(input_file, split_prefix, chunk_size) # chunk_size=10万行ごとに分割
if __name__ == "__main__":
# log 出力
now = datetime.now().strftime("%Y/%m/%d %H:%M:%S")
with open(LOG_PATH, mode='w') as f:
f.write(now+"\t[start]\n")
# 対象となる巨大JSONファイルのパスを指定
file_path = '../crawler_data.raw_data.json' #(巨大JSONファイル)
main(file_path)
now = datetime.now().strftime("%Y/%m/%d %H:%M:%S")
with open(LOG_PATH, mode='a') as f:
f.write(now+"\t[end]\n")
ファイルは、巨大なJSON配列なので、単純に行番号で分割するわけにもいかない。JSONデータの配列として一度全体を読み込んで、指定の大きさのJSON配列として別ファイルに保存する。ファイル保存は重複しないようにファイル名の番号をインクリメントしながら分割ファイルを増やしていく、という仕組み。標準モジュールだけで対処している。シンプルな力業。この方法で、9GB270万件データの27ファイルへの分割は、約4時間で終了。
(課題2対処方法)embedding処理
まずは、文章の分割について検討した。たとえば昨年は、characterSplitter(指定の文字数で分割する)を使った。この分割は、文章のなかの単語を容赦なく分断するので、分割された文章の文意がうまくベクトル化されるのか不安だ(←人間にとって不安なだけで、embeddingモデルにとっては問題ないのかもしれないが)。そこで、ntkをつかって、いったん、文章を「一文ずつ要素にもつ配列」に変換して、さらに「その配列を指定の要素数ごとに要素としてもつ配列(以下、「記事最終配列」)」を作ることにした。こうしてできた記事最終配列を、要素ごとにembeddingする。embeddingデータと一緒にメタデータを記録することにするが、これは記事最終配列=1記事で対応するメタデータを共通してもつ、という設計にした。
(注)NltkTool.py は、上述の文章分割(記事最終配列生成)以外のメソッドも記述してある。筆者の知識およばず、split_textで、downloadした情報をつかっているのかいないのか判断できなかったから。詳しい方は、ご自身で不要なメソッドを削除したクラスを設定していただければ幸いです。
import nltk
from nltk.tokenize import word_tokenize
from nltk import pos_tag
from nltk.stem import PorterStemmer
from nltk.stem import WordNetLemmatizer
class NltkTool:
def __init__(self):
nltk.download('punkt')
nltk.download('averaged_perceptron_tagger')
nltk.download('wordnet')
def split_text(self, text):
return nltk.sent_tokenize(text)
def tokenize_words(self, text):
return word_tokenize(text)
def tag_parts_of_speech(self, text):
words = self.tokenize_words(text)
return pos_tag(words)
def stem_words(self, text):
stemmer = PorterStemmer()
words = self.tokenize_words(text)
return [stemmer.stem(word) for word in words]
def lemmatize_words(self, text):
lemmatizer = WordNetLemmatizer()
words = self.tokenize_words(text)
return [lemmatizer.lemmatize(word) for word in words]
if __name__ == "__main__":
nltk.download('punkt')
nltk.download('punkt_tab')
nltk.download('averaged_perceptron_tagger')
nltk.download('wordnet')
埋め込み(embedding)は、sentence-transformer で、"paraphrase-multilingual-MiniLM-L12-v2"を使った。
import json
from multiprocessing import Pool
from sentence_transformers import SentenceTransformer
from NltkTool import NltkTool
from typing import List, Dict, Any
import mmap
import ijson
# Constants
COLLECTION_NAME = "articles"
TEXT_FIELD_NAME = "text"
EMBEDDING_FIELD_NAME = "embedding"
TITLE_FIELD_NAME = "title"
URL_FIELD_NAME = "url"
PUBLISHED_DATE_FIELD_NAME = "published_at"
ORGANIZATION_FIELD_NAME = "organization"
COUNTRY_FIELD_NAME ="country"
COUNTRY_ISO_CODE_FIELD_NAME = "country_iso_code"
COUNTRY_TAGS_FIELD_NAME = "country_tags"
MODEL = "paraphrase-multilingual-MiniLM-L12-v2"
def create_embedding(sentences, embedder_model):
# Calculate embeddings by calling model.encode()
embeddings = embedder_model.encode(sentences, device="cuda:0")
return embeddings.tolist()
def array_splitter(num: int, arr: list):
#配列の要素数をカウント
length = len(arr)
#開始位置を指定
n = 0
results = []
#配列を指定した個数で分割していくループ処理
while n < length:
results.append("".join(arr[n:n+num:1]))
n += num
#分割余りをresultsの最後にappend
results.append("".join(arr[n-num:length-1:1]))
return results
def chunk_document(document: dict, nltkTool:NltkTool) -> list[dict]:
sentences = nltkTool.split_text(text=document['description'])
paragraphs = array_splitter(8, sentences)
country_tags = []
for x in document['country_tags']:
country_tags.append(x['name'])
organization_name =''
country_name =''
if 'organization' in document:
if len(document['organization'])>0:
if 'name' in document['organization'][0]:
organization_name = document['organization'][0]['name']
if 'country' in document['organization'][0]:
country_name = document['organization'][0]['country']
iso_code=''
if 'country' in document:
if len(document['country'])>0:
if 'iso_code' in document['country'][0]:
iso_code = document['country'][0]['iso_code']
if 'name' in document['country'][0]:
country_name = document['country'][0]['name']
published_datetime = ""
if 'published_datetime' in document:
if '$date' in document['published_datetime']:
published_datetime = document['published_datetime']['$date']
chunks = []
for i, paragraph in enumerate(paragraphs):
try:
chunk = {
'title': document['title'],
'content': paragraph.strip(),
'url': document['link'],
'published_at': published_datetime,
'chunk_id': f"{document['link']}_{i}",
'organization_name': organization_name,
'organization_country': country_name,
'country_iso_code': iso_code,
'country_tags': country_tags
}
chunks.append(chunk)
except Exception as e:
print(f"An error occurred: {e}")
pass
return chunks
def save_chunk_and_embedding(chunk: dict, embedding: List[float], output_file_prefix, file_number):
try:
data = {
TITLE_FIELD_NAME: chunk['title'],
TEXT_FIELD_NAME: chunk['content'],
EMBEDDING_FIELD_NAME: embedding,
URL_FIELD_NAME: chunk['url'],
PUBLISHED_DATE_FIELD_NAME: chunk['published_at'],
ORGANIZATION_FIELD_NAME: chunk['organization_name'],
COUNTRY_FIELD_NAME: chunk['organization_country'],
COUNTRY_ISO_CODE_FIELD_NAME: chunk['country_iso_code'],
COUNTRY_TAGS_FIELD_NAME: chunk['country_tags'],
'chunk_id': chunk['chunk_id']
}
# asyncio.run(async_post_request(url=SOLR_URL, data=data))
# 出力ファイルに書き込む
output_file = f'{output_file_prefix}{file_number}.json'
with open(output_file, 'a') as outfile:
json.dump(data, outfile)
outfile.write("\n")
except Exception as e:
print("cound not send data: ", {e})
def process_document(documents, chunk_number, output_file_prefix, embedder_model, nltkTool):
try:
for document in documents:
chunks = chunk_document(document, nltkTool=nltkTool)
for chunk in chunks:
print(f"Start process chunk: {chunk['title'][:50]}...")
embedding = create_embedding(
chunk['published_at'] +";"
+ chunk['organization_country'] +";"
+ chunk['organization_name'] +";"
+ chunk['title'] +";"
+ chunk['content'],
embedder_model
)
save_chunk_and_embedding(chunk, embedding, output_file_prefix, chunk_number)
print(f"Processed chunks: {chunk['title'][:50]}...")
except Exception as e:
print("cound not process documents: ", {e})
def worker(file_path, output_file_prefix, worker_id, chunk_size):
documents = []
# Load a pretrained Sentence Transformer model
embedder_model = SentenceTransformer(MODEL, model_kwargs={"torch_dtype": "float16"})
nltkTool = NltkTool()
print('worker ready: ', worker_id)
with open(file_path, 'r+b') as infile:
# メモリマップファイルを使用
mmapped_file = mmap.mmap(infile.fileno(), 0, access=mmap.ACCESS_READ)
parser = ijson.items(mmapped_file, 'item')
chunk_number = worker_id*1000
for item in parser:
documents.append(item)
if len(documents) >= chunk_size:
process_document(documents, chunk_number, output_file_prefix, embedder_model, nltkTool=nltkTool)
chunk_number += 1
documents = []
# 最後のチャンクが残っている場合
if documents:
process_document(documents, chunk_number, output_file_prefix, embedder_model, nltkTool=nltkTool)
if __name__ == "__main__":
# 分割されたファイルのリストを取得
num_files = len([name for name in os.listdir('./source_files') if name.startswith('crawler_data') and name.endswith('.json')])
file_list = [f"{split_prefix}_{i}.json" for i in range(num_files)]
# 出力ファイルのプレフィックス
output_file_prefix = './json/telli_ge_chunk_'
# CPUのコア数に応じたワーカー数
num_workers = 20 # multiprocessing.cpu_count()で全力出してもいいかも・・・
with Pool(num_workers) as pool:
for i, splitted_file in enumerate(file_list):
worker_args = (splitted_file, output_file_prefix, i, 5000)
pool.apply_async(worker, worker_args)
pool.close()
pool.join()
embeddingするときに、記事最終配列の要素である分割文章だけでなく、なんとなく記事のメタデータも含めてみた。これは、あとでRAGをやるときに好影響を期待して、の余計な情報追加なのだ。
昨年と異なり、sentence-transformer で、"paraphrase-multilingual-MiniLM-L12-v2"モデルをつかった。このモデルは、VRAM使用サイズが500MB以下。なので、ワーカーを20立てても、GPUのVRAM容量に収まった。
結果、3時間程度で530個程度合計15GB弱のファイル群が出力された。
(おまけ)フォルダ内の複数ファイルを、Solr9に投入する。
curlを使って、SolrにJSONデータを投入する。こういうのもシェル芸っていうのかな?
(注)あらかじめスキーマ設定は作成し、登録済み(articles_config)の例。
#!/bin/sh
# 今日の日付を取得してフォーマットする
today=$(date +"%Y%m%d%H%M%S")
# 新規コアの名称
SOLR_CORE="articles_$today"
CONFIG_NAME="articles_config"
TARGET_HOST='localhost'
PORT_NUM='8983'
TARGET_ALIAS='articles'
# 新規にindexを作る
curl -H 'Content-Type: application/json' \
"http://${TARGET_HOST}:${PORT_NUM}/solr/admin/collections?action=CREATE&name=${SOLR_CORE}&collection.configName=${CONFIG_NAME}&router.name=compositeId&numShards=2&replicationFactor=1&waitForFinalState=true"
# 新規に作製したindexをリロードする
curl -H 'Content-Type: application/json' "http://${TARGET_HOST}:${PORT_NUM}/solr/admin/collections?action=RELOAD&name=${SOLR_CORE}"
# 作成済みのデータ json ファイルを指定する
dir_path="$(pwd)/json/"
echo $dir_path
dirs=`find $dir_path -maxdepth 1 -type f -name *.json`
# 新規indexにデータを投入する
for dir in $dirs;
do
echo $dir
file_basename=`basename $dir`
echo $file_basename
curl "http://${TARGET_HOST}:${PORT_NUM}/solr/${SOLR_CORE}/update/json/docs?commit=true" --data-binary @$dir -H 'Content-type:application/json'
done
# 新規に作製したindexをリロードする
curl -H 'Content-Type: application/json' "http://${TARGET_HOST}:${PORT_NUM}/solr/admin/collections?action=RELOAD&name=${SOLR_CORE}"
# ターゲットAliasのコアを切り替える。
curl -X POST -H 'Content-type:application/json' \
"http://${TARGET_HOST}:${PORT_NUM}/solr/admin/collections?action=CREATEALIAS&name=${TARGET_ALIAS}&collections=${SOLR_CORE}"
この処理は、Solr側のマシンスピードにもよる(特にディスクの読み書き)と思う。筆者の環境は、HDDにSolrを構築しているためか、3時間程度かかった。
考察
昨年のElasticsearch利用にくらべ、今年のSolr9インデクシングはさらに時間短縮できた。これは、ElasticsearchとSolrの性能差ではなく、今年使った Sentence-transformerのモデルが昨年に比べて大幅に小さいことによると思う。モデルが小さく、複数並列処理数を昨年より増やすことができたことから、昨年よりも処理時間を短縮できた。
余談であるが、この並列処理にいたるまでに、順次処理による試行錯誤を複数回行っている。終わってみれば1日かからずに終わる処理にまとまったが、その試行錯誤の過程においては、数日間PCを占有されるような期間もあったことは、苦労話としてご理解いただきたい。(←この記事をここまで読んでくれる人に感謝しつつ、愚痴を聞いてもらったってことね)
まとめ
Solr9のベクタサーチを使ったRAG構築等において、元データ(JSON配列ファイル)を極力短時間でEmbeddingしてSolr9にインデクシングする作業の参考として、本記事を書いた。270万件ほどのデータを12時間程度で処理する例として参考になれば、幸いである。