2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Neo4j Python Driver 非同期処理

Last updated at Posted at 2024-02-24

目次

0_前置き
1_基本の使用方法
2_AsyncSessionの注意点
3_トランザクションクエリ
4_まとめ

0_前置き

Neo4j Python Driverのver.5.0で非同期処理ドライバーが導入されました。
これにより、クエリ実行の効率化が図れます。
対応に苦慮したので、参考として記事に残しておきます。
記事の内容を試す際は、少なくともver.5.8以降にしてください。

1_基本の使用方法

Neo4j AsyncDriver

アプリケーションのトップレベルでDriverを呼び出します。

from neo4j import AsyncGraphDatabase

class Application:
    def __init__(self, uri, user, password)
        self.driver = AsyncGraphDatabase.driver(uri, auth=(user, password))

    async def close(self):
        await self.driver.close()

クエリ実行 (async with driver.session)

async with driver.session というように、同期処理の構文にasyncを付けてセッションを開始し、session.runでクエリを実行します。
with文の中で実行すると、明示的にsession.close()でsessionを閉じる必要がありません。

async with self.driver.session() as session:
    result = await session.run(query, params)

    nodes = []
    async for record in result: # 結果が複数の場合
        nodes.append(record)
    record = await result.single() # 結果が単独の場合

session.runの返り値は、非同期処理により、セッション内で次々と結果を受け取って処理することができます。そのため、大量のデータを扱うのに向いています。

クエリ実行 (driver.execute_query)

セッション内で単一のクエリを実行し、結果を非同期的に取得する必要がない場合は、driver.execute_queryが使用できます。

これだけでセッションの開始と終了を含めてクエリを実行してくれるので、単純なクエリはこれで十分です。

result = await driver.execute_query(query, params)
record = result.records

driver.execute_queryでは、以下の型に結果を格納して返します。この時点でセッションを抜けているため、同期処理と同様にデータを取り扱うことができます。
データ量が少ないクエリであればこれで十分です。

class EagerResult
    records: t.List[Record]      # クエリの返り値のリスト
    summary: ResultSummary       # クエリのサマリ(.query: クエリ、.parameters: パラメータ等)
    keys: t.List[str]            # 返り値のキーのリスト

2_AsyncSessionの注意点

AsyncSessionでは、単一セッションの中で複数のトランザクションが "同時に" 実行されると、エラーが発生します。

# 失敗例

async def single_query(tx: Transaction, name: str):   # トランザクションクエリ実行関数
    query = "MERGE (a:Person { name: $name }) RETURN a"
    await tx.run(query, name=name) 


async def multiple_query(names: list[str]):
    async with self.driver.session() as session:

        # 単一セッション内でトランザクションクエリを同時に実行
        tasks = [session.execute_write(self.asingle_query, name) for name in names]
        result = await asyncio.gather(*tasks)
# エラーメッセージ
"RuntimeError: read() called while another coroutine is already waiting for incoming data"

以下のイシューで取り上げられていました。
セッションの接続コストは高くないので、非同期処理で大量のクエリを捌く場合は、セッションの作成を含む関数を作って、非同期タスクを作るのが良いそうです。

# 成功例

async def single_query(name: str):                    # セッションの作成を含む関数
    async with self.driver.session() as session:
        query = "MERGE (a:Person { name: $name }) RETURN a"
        result = await session.run(query, name=name)
        record = await result.single()
        return record

async def multiple_query(names: list[str]):
    tasks = [self.single_query(name) for name in names]
    result = await asyncio.gather(*tasks)

このように、async with driver.sessionを含む関数を同時に実行すると、問題なく実行できました。

3_トランザクションクエリ

もちろん、ちゃんと使えば複数のトランザクションをセッション内で実行することができ、例外発生時にロールバックできます。
失敗例との違いは、awaitにより、セッション内で複数のトランザクションが同時に実行されることを防いでいることです。

async def multiple_query(name1: str, name2: str, name3: str):
    async with self.driver.session() as session:

        # トランザクションクエリの実行をawaitして順番に実行
        await session.execute_write(self.single_query, name1)
        await session.execute_write(self.single_query, name2)
        await session.execute_write(self.single_query, name3)

また、AsyncSessionでは、ドライバーが自動でトランザクションの管理を担当するため、こちらで明示的にロールバックを指示する必要はありません。

Managed Async Transactions

カスタムトランザクション

明示的にトランザクションを設定することも可能です。同期処理と同様の関数を使用します。

Explicit Async Transactions

tx = await session.begin_transaction()
try:
    await session.execute_read(get_node, name1)  
    await session.execute_read(get_node, name2)
    await session.execute_read(get_node, name3)

    await tx.commit()

except Exception as e:
    await tx.rollback()
    
finally:
    await tx.close()

4_まとめ

非同期処理ドライバーを使うことで、リアルタイムのクエリ処理がこれまでよりも効率よく処理できるようになりました。

非同期処理はたまたまかもしれませんが、2023年からのNeo4jのアップデートは一部LLM関連を意識しているように感じます。

  • Neo4jのラーニングコースでは、Neo4jを使ったLLMの構成に関するチュートリアルも公開されています。

  • 3/21には、Neo4jを用いたRAGに関するセミナーが開催されるようです。

また、非同期処理以外にも、クエリパフォーマンスを上げるための公式から推奨されるテクニックが以下にまとめられています。ご参考にしてください。

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?