目次
0_前置き
1_基本の使用方法
2_AsyncSessionの注意点
3_トランザクションクエリ
4_まとめ
0_前置き
Neo4j Python Driverのver.5.0で非同期処理ドライバーが導入されました。
これにより、クエリ実行の効率化が図れます。
対応に苦慮したので、参考として記事に残しておきます。
記事の内容を試す際は、少なくともver.5.8以降にしてください。
1_基本の使用方法
Neo4j AsyncDriver
アプリケーションのトップレベルでDriverを呼び出します。
from neo4j import AsyncGraphDatabase
class Application:
def __init__(self, uri, user, password)
self.driver = AsyncGraphDatabase.driver(uri, auth=(user, password))
async def close(self):
await self.driver.close()
クエリ実行 (async with driver.session)
async with driver.session
というように、同期処理の構文にasync
を付けてセッションを開始し、session.run
でクエリを実行します。
with文の中で実行すると、明示的にsession.close()
でsessionを閉じる必要がありません。
async with self.driver.session() as session:
result = await session.run(query, params)
nodes = []
async for record in result: # 結果が複数の場合
nodes.append(record)
record = await result.single() # 結果が単独の場合
session.run
の返り値は、非同期処理により、セッション内で次々と結果を受け取って処理することができます。そのため、大量のデータを扱うのに向いています。
クエリ実行 (driver.execute_query)
セッション内で単一のクエリを実行し、結果を非同期的に取得する必要がない場合は、driver.execute_queryが使用できます。
これだけでセッションの開始と終了を含めてクエリを実行してくれるので、単純なクエリはこれで十分です。
result = await driver.execute_query(query, params)
record = result.records
driver.execute_queryでは、以下の型に結果を格納して返します。この時点でセッションを抜けているため、同期処理と同様にデータを取り扱うことができます。
データ量が少ないクエリであればこれで十分です。
class EagerResult
records: t.List[Record] # クエリの返り値のリスト
summary: ResultSummary # クエリのサマリ(.query: クエリ、.parameters: パラメータ等)
keys: t.List[str] # 返り値のキーのリスト
2_AsyncSessionの注意点
AsyncSessionでは、単一セッションの中で複数のトランザクションが "同時に" 実行されると、エラーが発生します。
# 失敗例
async def single_query(tx: Transaction, name: str): # トランザクションクエリ実行関数
query = "MERGE (a:Person { name: $name }) RETURN a"
await tx.run(query, name=name)
async def multiple_query(names: list[str]):
async with self.driver.session() as session:
# 単一セッション内でトランザクションクエリを同時に実行
tasks = [session.execute_write(self.asingle_query, name) for name in names]
result = await asyncio.gather(*tasks)
# エラーメッセージ
"RuntimeError: read() called while another coroutine is already waiting for incoming data"
以下のイシューで取り上げられていました。
セッションの接続コストは高くないので、非同期処理で大量のクエリを捌く場合は、セッションの作成を含む関数を作って、非同期タスクを作るのが良いそうです。
# 成功例
async def single_query(name: str): # セッションの作成を含む関数
async with self.driver.session() as session:
query = "MERGE (a:Person { name: $name }) RETURN a"
result = await session.run(query, name=name)
record = await result.single()
return record
async def multiple_query(names: list[str]):
tasks = [self.single_query(name) for name in names]
result = await asyncio.gather(*tasks)
このように、async with driver.session
を含む関数を同時に実行すると、問題なく実行できました。
3_トランザクションクエリ
もちろん、ちゃんと使えば複数のトランザクションをセッション内で実行することができ、例外発生時にロールバックできます。
失敗例との違いは、await
により、セッション内で複数のトランザクションが同時に実行されることを防いでいることです。
async def multiple_query(name1: str, name2: str, name3: str):
async with self.driver.session() as session:
# トランザクションクエリの実行をawaitして順番に実行
await session.execute_write(self.single_query, name1)
await session.execute_write(self.single_query, name2)
await session.execute_write(self.single_query, name3)
また、AsyncSession
では、ドライバーが自動でトランザクションの管理を担当するため、こちらで明示的にロールバックを指示する必要はありません。
カスタムトランザクション
明示的にトランザクションを設定することも可能です。同期処理と同様の関数を使用します。
tx = await session.begin_transaction()
try:
await session.execute_read(get_node, name1)
await session.execute_read(get_node, name2)
await session.execute_read(get_node, name3)
await tx.commit()
except Exception as e:
await tx.rollback()
finally:
await tx.close()
4_まとめ
非同期処理ドライバーを使うことで、リアルタイムのクエリ処理がこれまでよりも効率よく処理できるようになりました。
非同期処理はたまたまかもしれませんが、2023年からのNeo4jのアップデートは一部LLM関連を意識しているように感じます。
- Neo4jのラーニングコースでは、Neo4jを使ったLLMの構成に関するチュートリアルも公開されています。
- 3/21には、Neo4jを用いたRAGに関するセミナーが開催されるようです。
また、非同期処理以外にも、クエリパフォーマンスを上げるための公式から推奨されるテクニックが以下にまとめられています。ご参考にしてください。