今回の取組み / 取り組みから得た知見
- GraphDBは以前から知っていたが、使うタイミングがなく触る機会がなかった
- LLMが出てきてGraphDBとLLMとの連動の記事は日本語でなく(投稿時点)、サンプルとして利用可能な状態になれたら有用かと思い、記事を書く
- 日本語のwikidataを登録するまでに仕事を除く時間で合計2週間詰めていた
- 今回利用するGraphDBはNeo4jであり、そもそも日本語ドキュメントが弱く、公式ページを見るのがやはり良かった
- 思っていたよりwikidataをneo4jにインポートする記事はあるが断片的な情報が多く、つなぎ合わせる必要があった
- 作成した結果のGraphDBは、使い方によってはRDBよりもデータ構造の理解は簡単になりそうで、使いどころが制限されるが有用なDBとなりそう
- 検索においてはElasticSearchと連携することでLLMでも有用な肌感
参考にしたページ
-
Wikipediaのダンプからページを取り出す
- 最初に取り組んだ際に出てきた記事
- データがバイト文字で検索できる点はとても使い勝手が良い
- データに精通していない場合、アーキテクチャも含め検討するのが難しいため、Microサービスとしてリリースしていくのが良さそうなデータ
- IndexデータとMultistreamデータで分割されているため、最初はIndexの解析から進めるのが良い
- データの分割
- Wikidataの知識をNeo4jで可視化してみる
- 過去(2019年)の情報だが、今も現役で利用可能
- ただし、一部は修正した方が操作性が上昇する
- Wikidataについて
- 参考ページ
- 結局公式ドキュメントからイメージを構築した方が早い
- どのようなデータがあるか、は有志のページが参照のこと
-
wikiextractor
- wikiのデータを加工してくれるツールがGithub上に公開されている
- 結果としてGoogle Colabでも半日-1日かかったし、結果をさらに解析して検索可能な状態にさせる必要がある(Index化)のため、使い方の検討が必要
wiki データの取得方法
- データを取得はWikipediaに公式でdumpファイルがあるため、そこからデータを取得する
- データが1週間ごとに更新されるため、同時に作業している人がいる場合は同期できるように管理する必要があるため注意
- MultiStream
- wikidatawiki-20231220-pages-articles-multistream-index.txt.bz2(100GB 以上!←時間がかかるので注意)
- wikidatawiki-20231220-pages-articles-multistream.xml.bz2(400 MB)
- 実際に利用したJsonデータ
- ドキュメント
- 共通URL
- latest-all.json.bz2(86052265689 Byte)
- 上記のバイト数を見ていただければわかるように大きいので、ネットワーク環境はしっかり構築しておいてください!
- 自宅は有線だったので止まりませんでした。(12時間程度?)
- Google Colab上でダウンロードした方が早かったです。(1時間もかからずだった気が.)
- それ以外に良い方法があれば教えてください!追記します。
- 以下は、上記の圧縮されたjsonデータを用いて作業を進めます
wiki データの展開方法
- ここからは実際のプログラムを記載していきます
- プログラム1(latest-all.json.bz2をそのままでneo4jに必要そうなデータを抽出する)
import bz2
import json
def get_claims_mainsnak(relation_p, claim):
relation_list = []
if claim.get("mainsnak") is None:
pass
else:
claim_mainsnak = claim["mainsnak"]
# print(claim_mainsnak)
claim_mainsnak_datatype = claim_mainsnak.get("datatype", "")
if claim_mainsnak_datatype == "wikibase-item":
try:
# print(claim_mainsnak)
claim_mainsnak_datavalue = claim_mainsnak.get("datavalue")
# claim_mainsnak_datavalue_type = (
# claim_mainsnak_datavalue.get("type")
# )
claim_mainsnak_datavalue_value = claim_mainsnak_datavalue.get("value")
claim_mainsnak_datavalue_value_id = claim_mainsnak_datavalue_value.get(
"id"
)
claim_mainsnak_datavalue_value_entity_type = (
claim_mainsnak_datavalue_value
).get("entity-type")
if claim_mainsnak_datavalue_value_id is not None and (
claim_mainsnak_datavalue_value_entity_type == "item"
):
# print(claim_mainsnak_datavalue_value_id)
# relation の 作成
relation_list.append(
[
relation_p,
claim_mainsnak_datavalue_value_id,
"direct",
]
)
# print(relation_list[-1])
# print(claim_mainsnak_datavalue)
# print(
# # claim_mainsnak_datavalue_type,
# claim_mainsnak_datavalue_value
# )
except Exception as e:
# print(e)
pass
pass
pass
return relation_list
def get_claims_references(claim_references_list):
relation_list = []
for claim_reference in claim_references_list:
# print(claim_reference)
# try:
claim_reference_snaks = claim_reference.get("snaks")
for relation_p in claim_reference_snaks.keys():
# print(relation_p)
# claim_reference_snak = claim_reference_snaks[relation_p]
# print(claim_reference_snak)
# print(len(claim_reference_snak))
for claim_reference_snak in claim_reference_snaks[relation_p]:
# for claim_reference_snak in claim_reference_snak_list:
# print(claim_reference_snak)
## a
claim_mainsnak_datatype = claim_reference_snak.get("datatype", "")
if claim_mainsnak_datatype == "wikibase-item":
# print(claim_mainsnak)
claim_mainsnak_datavalue = claim_reference_snak.get("datavalue")
# claim_mainsnak_datavalue_type = (
# claim_mainsnak_datavalue.get("type")
# )
claim_mainsnak_datavalue_value = claim_mainsnak_datavalue.get(
"value"
)
claim_mainsnak_datavalue_value_id = (
claim_mainsnak_datavalue_value.get("id")
)
claim_mainsnak_datavalue_value_entity_type = (
claim_mainsnak_datavalue_value
).get("entity-type")
if claim_mainsnak_datavalue_value_id is not None and (
claim_mainsnak_datavalue_value_entity_type == "item"
):
# print(claim_mainsnak_datavalue_value_id)
# relation の 作成
relation_list.append(
[
relation_p,
claim_mainsnak_datavalue_value_id,
"reference",
]
)
# print(relation_list[-1])
# print(claim_mainsnak_datavalue)
# print(
# # claim_mainsnak_datavalue_type,
# claim_mainsnak_datavalue_value
# )
pass
pass
pass
pass
# except Exception as e:
# print(e)
# pass
# pass
## a
return relation_list
def get_claims_qualifiers(claim_qualifiers_dict):
relation_list = []
for relation_p in claim_qualifiers_dict.keys():
for claim_qualifier_snak in claim_qualifiers_dict[relation_p]:
# for claim_reference_snak in claim_reference_snak_list:
# print(claim_reference_snak)
## a
claim_mainsnak_datatype = claim_qualifier_snak.get("datatype", "")
if claim_mainsnak_datatype == "wikibase-item":
try:
# print(claim_mainsnak)
claim_mainsnak_datavalue = claim_qualifier_snak.get("datavalue")
# claim_mainsnak_datavalue_type = (
# claim_mainsnak_datavalue.get("type")
# )
claim_mainsnak_datavalue_value = claim_mainsnak_datavalue.get(
"value"
)
claim_mainsnak_datavalue_value_id = (
claim_mainsnak_datavalue_value.get("id")
)
claim_mainsnak_datavalue_value_entity_type = (
claim_mainsnak_datavalue_value
).get("entity-type")
if claim_mainsnak_datavalue_value_id is not None and (
claim_mainsnak_datavalue_value_entity_type == "item"
):
# print(claim_mainsnak_datavalue_value_id)
# relation の 作成
relation_list.append(
[
relation_p,
claim_mainsnak_datavalue_value_id,
"qualifiers",
]
)
# print(relation_list[-1])
# print(claim_mainsnak_datavalue)
# print(
# # claim_mainsnak_datavalue_type,
# claim_mainsnak_datavalue_value
# )
except Exception as e:
# print(e)
pass
pass
pass
pass
return relation_list
def get_claims(claims):
relation_list = []
for relation_p in claims.keys():
# print(relation_p)
for claim in claims[relation_p]:
relation_list.extend(get_claims_mainsnak(relation_p, claim))
relation_list.extend(get_claims_references(claim.get("references", [])))
relation_list.extend(get_claims_qualifiers(claim.get("qualifiers", {})))
# qualifiers
pass
pass
return relation_list
file_path = "./latest-all.json.bz2"
wf_file_path = "./tmp.csv"
wf = open(wf_file_path, "w", encoding="utf-8")
wf.write("q_id_start,relation,q_id_end,relation_type_rank" + "\n")
q_id_wf = open("./q_id/q_id.txt", "w", encoding="utf-8")
q_id_wf.write("base_q_id,label_en,label_ja,descriptions_en,descriptions_ja" + "\n")
# rdf_wf = open("./rdf/rdf.txt", "w", encoding="utf-8")
# relation_list = []
with bz2.BZ2File(file_path, "r") as rf:
next(rf) # 1行目を飛ばす
for i, line in enumerate(rf, 1):
try:
line = json.loads(line[:-2])
except json.decoder.JSONDecodeError as e:
print(e)
# print(i)
# triples = []
# qs = []
continue
# print(line)
# print(i, "*" * 100)
# if i >= 5 * 1e2:
# break
# ############################################################
# ############################################################
# ############################################################
# relation_list = get_claims(claims)
# relation_list.extend(get_claims(claims))
try:
key = "id"
base_q_id = line[key]
label_en = line["labels"]["en"]["value"]
label_ja = line["labels"]["ja"]["value"]
descriptions_en = line["descriptions"]["en"]["value"]
descriptions_ja = line["descriptions"]["ja"]["value"]
claims = line["claims"]
# claims_relation_list = []
q_id_wf.write(
'"'
+ '","'.join(
[base_q_id, label_en, label_ja, descriptions_en, descriptions_ja]
)
+ '"'
+ "\n"
)
claims_relation_list = get_claims(claims)
# save_claims_relation_list = []
for row in claims_relation_list:
_save_claims_relation_list = []
_save_claims_relation_list.append(base_q_id)
_save_claims_relation_list.extend(row)
# save_claims_relation_list.append(_save_claims_relation_list)
wf.write(",".join(_save_claims_relation_list) + "\n")
del _save_claims_relation_list
# relation_list.extend(save_claims_relation_list)
del claims_relation_list
except Exception as e:
pass
# print(relation_list)
# print(len(relation_list))
# if i >= 2:
# break
q_id_wf.close()
# rdf_wf.close()
wf.close()
# 923 / 60 hours
# 15.xxx hours
- プログラム2(作成したデータ内で重複がある場合、インポート時にエラーとなり、また、データ構造として重複は不要なため今回は削除)
- sqlite3を利用しライトに重複を削除した後、再度csv化する
sqlite3に作成したcsvをインポートする
sqlite3のコマンドでデータを作成する
以下はslite3のコマンド
// 作成したデータをインポートする
.mode csv
.import tmp.csv rdf
sqlite3に作成したDBからcsvとしてエクスポートする
// 上記でインポートしたデータをrdf_n.csvに出力する
// distinctで重複を削除する
.headers on
.mode csv
.output rdf_n.csv
SELECT distinct "q_id_start", relation, "q_id_end", "relation_type_rank"
FROM rdf;
.quit
- プログラム3(最終的なデータがcsv化されたので、このデータを基準にGraphDBに登録できる形(Node, Relation)に再作成する)
### q_id を neo4j にインポートするためのデータに変換する
cnt = 0
q_wf = open(f"./q_id/q_id_neo4j_{str(cnt).zfill(2)}.csv", mode="w", encoding="utf8")
q_wf.write(
"entityId:ID,label_en,descriptions_en,label_ja,descriptions_ja,:LABEL" + "\n"
)
with open("./q_id/q_id.txt", "r", encoding="utf8") as f:
for i, line in enumerate(f):
if i == 0:
continue
line = line.strip()
s_line = line.split(",")
q_id = s_line[0].replace('"', "")
write_txt = q_id + "," + q_id + ",Entity"
entityId = s_line[0]
label_en = s_line[1]
label_ja = s_line[2]
descriptions_en = s_line[3]
descriptions_ja = s_line[4]
s_line2 = [label_en, descriptions_en, label_ja, descriptions_ja, "Entity"]
write_txt = (
q_id
+ ',"'
+ '","'.join([x.replace('"', "").replace('"', "") for x in s_line2])
+ '"'
)
q_wf.write(write_txt + "\n")
q_wf.close()
### q_id で作成した情報で、ノード一覧を取得し、relation作成後に存在しないノードを確認できるようにする
cnt = 0
node_list = set()
with open(f"./q_id/q_id_neo4j_{str(cnt).zfill(2)}.csv", "r", encoding="utf8") as f:
for i, line in enumerate(f):
line = line.strip()
# print(line)
if i == 0:
continue
# if i > 10:
# break
node_str = line.split(",")[0]
if node_str not in node_list:
node_list.add(node_str)
### relation を neo4j にインポートするためのデータに変換する
### relation に紐づくノードが存在しない場合は、ノードを保存する
none_node_list = set()
rdf_wf = open(f"./rdf_n2.csv", mode="w", encoding="utf8")
rdf_wf.write(":start_id,rank,:end_id,:type" + "\n")
# print(":start_id,role,:end_id,:type")
with open("./rdf_n.csv", mode="r", encoding="utf8") as f:
for i, line in enumerate(f):
line = line.strip()
# print(line)
if i == 0:
continue
s_line = line.split(",")
from_node = s_line[0]
relation = s_line[1]
to_node = s_line[2]
rank = s_line[3]
# rdf_wf.write(from_node + "," + relation + "," + to_node + ",n," + rank + "\n")
rdf_wf.write(from_node + "," + rank + "," + to_node + "," + relation + "\n")
# if i > 10:
# break
if from_node not in node_list:
none_node_list.add(from_node)
if to_node not in node_list:
none_node_list.add(to_node)
rdf_wf.close()
print(i)
print(len(none_node_list))
# 足りないノードを保存する
cnt = 0
q_wf = open(
f"./q_id/q_id_neo4j_{str(cnt).zfill(2)}_none.csv", mode="w", encoding="utf8"
)
# q_wf = open("./q_id/q_id_neo4j.csv", mode="w", encoding="sjis")
"""entityId:ID,name,:LABEL
Q31,Q31,Entity"""
q_wf.write("entityId:ID,name,:LABEL" + "\n")
for x in none_node_list:
# print(x)
q_wf.write(",".join([x, x, "Entity"]) + "\n")
# break
q_wf.close()
- neo4j への導入方法
- neo4jへPythonでデータを作成する
- 参考URL
- DBへはSQLではなくCypherという言語になるが、SQL+Graph特有の記述なのでRDBMを触っている人はちょっと触れば簡単になるかと思います。
-
データの登録速度: 激遅(途中で止めたが、1年程度かかる計算をした気がする)
- データ量が少ない場合は良いが多くなる想定であれば辞めた方が良い
- neo4j Browserでデータを作成する
- 参考URL
- CSVデータ全体に対して、また、1行ずつに対して登録処理はできる
- データ量に依存して重くなるため、neo4j2Pythonと同様の感想
- neo4j-admin でデータを作成する
- 参考URL
- 大規模なデータを登録する場合(今回のwikiデータも含む)はこの方法でないと終わらない
- デメリット
- importのフォルダ構造が普通のツールと異なる
- 毎度Serverの再起動が必要
- どこでエラーとなっているかの切り分けが難しい(Error文は出るがJavaのエラーとして出力される)
- メリット
- 超速(10分-20分程度)
- neo4jへPythonでデータを作成する
- neo4j-admin コマンド(イメージ)
bin\neo4j-admin database import full --skip-bad-entries-logging --skip-duplicate-nodes --nodes=import\q_id_neo4j_00.csv --nodes=import\q_id_neo4j_00_none.csv --relationships=import\rdf_n2.csv --skip-bad-relationships --overwrite-destination wiki-all --verbose
- とりあえず取り込みがしたかったので、構文の最適化は別途調整の必要有
実際に作業してみて
-
Neo4jが思っていた以上に曲者で、かつ、ChatGPTを利用しても良い回答が得られなかった.
-
結果として必要な作業内容は、デバッグをしながらエラーを取り除く、または、wikidataの構造理解、などで、AIが出てきても結局はプログラマーがやってきた原始的な作業は変わらないことを感じた
- 上記の原始的な作業の詳細を書くと、AGIによって人の作業が無くなったSF的な未来が訪れたら無職になると思っているので、それまでの間のことを表しています。
-
2024年に入りChatGPTが出てから約9ヵ月ほど汎用AIの民主化が起こっているが、GraphDBが使われた記事がなかなか出てきていないことから、企業内で知見としてはあるが出せないか、そもそも有用でないか、または、取り組みが進んでいないか、などが考えられます。
- 良いサイトがあればぜひ教えてください!
終わりに
- 去年から今年にかけて加速するAI時代、良い感じに触れながらEnjoyしていきましょう!