Last active
January 29, 2021 02:05
-
-
Save nabilm/beb31e960c099d59297c0ef0cfbac55a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Since the async support in neo4j python driver is not clear ( released ) yet | |
i create a simple async class that wrap up neo4j write and read transaction functions | |
to run this class simple | |
import asyncio | |
loop = asyncio.get_event_loop() | |
config = {'user': 'neo4j' , 'password' : 'test' , 'uri' : 'bolt://localhost:7687'} | |
an = AsyncNeo4j(config=config, loop=loop) | |
and of course it require python3.7 and neo4j | |
now you should easily create queries with dynamic variables like | |
CREATE_NODE_QUERY = """ | |
CREATE (h:human { name: $name, | |
user_name: $user_name, | |
human_id: $human_id, | |
city: $city, | |
country: $country, | |
email: $email, | |
birhdate: date($birthdate) | |
}) | |
RETURN ID(h) as id | |
""" | |
as the functions accept kwargs now ! | |
""" | |
from concurrent import futures | |
import logging | |
from neo4j import GraphDatabase, READ_ACCESS, WRITE_ACCESS | |
from neo4j.exceptions import ServiceUnavailable | |
import time | |
import traceback | |
# How long to wait after each successive failure. | |
RETRY_WAITS = [0, 1, 4] | |
class AsyncNeo4j: | |
""" | |
Async neo4j connection/query wrapper | |
""" | |
def __init__(self, config: dict, loop: object): | |
""" | |
Create executer for running asyn calls | |
""" | |
self.config = config | |
self.loop = loop | |
self.executer = futures.ThreadPoolExecutor(max_workers=30) | |
# retry mechanism | |
for retry_wait in RETRY_WAITS: | |
try: | |
self.init_driver() | |
break | |
except Exception: | |
if retry_wait == RETRY_WAITS[-1]: | |
raise | |
else: | |
logging.error("WARNING: retrying to Init DB; err:") | |
traceback.print_exc() | |
time.sleep(retry_wait) | |
def init_driver(self): | |
""" | |
neo4j driver initializer | |
""" | |
self.driver = GraphDatabase.driver( | |
self.config["uri"], auth=(self.config["user"], self.config["password"]) | |
) | |
def create_transation(self, quey_function: object, query: str, mode=READ_ACCESS): | |
""" | |
Create neo4j session transaction | |
transactions allow the driver to handle retries and transient errors | |
""" | |
@staticmethod | |
def _run_query(tx, query: str, **kwargs): | |
""" | |
Run neo4j query | |
""" | |
result = tx.run(query, **kwargs) | |
try: | |
return [record for record in result] | |
# Capture any errors along with the query and data for traceability | |
except ServiceUnavailable as exception: | |
logging.error( | |
"{query} raised an error: \n {exception}".format( | |
query=query, exception=exception | |
) | |
) | |
raise | |
async def run_query(self, query: str, read: bool, **kwargs): | |
""" | |
Read query neo4j | |
""" | |
def run_transaction(): | |
""" | |
Run a read query | |
""" | |
with self.driver.session() as session: | |
# Write transactions allow the driver to handle | |
# retries and transient errors | |
if read: | |
result = session.read_transaction(self._run_query, query, **kwargs) | |
else: | |
result = session.write_transaction(self._run_query, query, **kwargs) | |
for record in result: | |
yield dict(record) | |
return await self.loop.run_in_executor(None, run_transaction) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment