Skip to content

Instantly share code, notes, and snippets.

@nabilm
Last active January 29, 2021 02:05
Show Gist options
  • Save nabilm/beb31e960c099d59297c0ef0cfbac55a to your computer and use it in GitHub Desktop.
Save nabilm/beb31e960c099d59297c0ef0cfbac55a to your computer and use it in GitHub Desktop.
"""
Since the async support in neo4j python driver is not clear ( released ) yet
i create a simple async class that wrap up neo4j write and read transaction functions
to run this class simple
import asyncio
loop = asyncio.get_event_loop()
config = {'user': 'neo4j' , 'password' : 'test' , 'uri' : 'bolt://localhost:7687'}
an = AsyncNeo4j(config=config, loop=loop)
and of course it require python3.7 and neo4j
now you should easily create queries with dynamic variables like
CREATE_NODE_QUERY = """
CREATE (h:human { name: $name,
user_name: $user_name,
human_id: $human_id,
city: $city,
country: $country,
email: $email,
birhdate: date($birthdate)
})
RETURN ID(h) as id
"""
as the functions accept kwargs now !
"""
from concurrent import futures
import logging
from neo4j import GraphDatabase, READ_ACCESS, WRITE_ACCESS
from neo4j.exceptions import ServiceUnavailable
import time
import traceback
# How long to wait after each successive failure.
RETRY_WAITS = [0, 1, 4]
class AsyncNeo4j:
"""
Async neo4j connection/query wrapper
"""
def __init__(self, config: dict, loop: object):
"""
Create executer for running asyn calls
"""
self.config = config
self.loop = loop
self.executer = futures.ThreadPoolExecutor(max_workers=30)
# retry mechanism
for retry_wait in RETRY_WAITS:
try:
self.init_driver()
break
except Exception:
if retry_wait == RETRY_WAITS[-1]:
raise
else:
logging.error("WARNING: retrying to Init DB; err:")
traceback.print_exc()
time.sleep(retry_wait)
def init_driver(self):
"""
neo4j driver initializer
"""
self.driver = GraphDatabase.driver(
self.config["uri"], auth=(self.config["user"], self.config["password"])
)
def create_transation(self, quey_function: object, query: str, mode=READ_ACCESS):
"""
Create neo4j session transaction
transactions allow the driver to handle retries and transient errors
"""
@staticmethod
def _run_query(tx, query: str, **kwargs):
"""
Run neo4j query
"""
result = tx.run(query, **kwargs)
try:
return [record for record in result]
# Capture any errors along with the query and data for traceability
except ServiceUnavailable as exception:
logging.error(
"{query} raised an error: \n {exception}".format(
query=query, exception=exception
)
)
raise
async def run_query(self, query: str, read: bool, **kwargs):
"""
Read query neo4j
"""
def run_transaction():
"""
Run a read query
"""
with self.driver.session() as session:
# Write transactions allow the driver to handle
# retries and transient errors
if read:
result = session.read_transaction(self._run_query, query, **kwargs)
else:
result = session.write_transaction(self._run_query, query, **kwargs)
for record in result:
yield dict(record)
return await self.loop.run_in_executor(None, run_transaction)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment