Skip to content

Instantly share code, notes, and snippets.

@jalakoo
Created June 4, 2022 16:35
Show Gist options
  • Select an option

  • Save jalakoo/8aff2c61f283b73214c2d56b4205a826 to your computer and use it in GitHub Desktop.

Select an option

Save jalakoo/8aff2c61f283b73214c2d56b4205a826 to your computer and use it in GitHub Desktop.
neo4j python utility class
from neo4j import GraphDatabase, unit_of_work, Transaction
class Neo4jConnection:
def __init__(self, uri, user, pwd):
"""
Initialize a driver and test for connectivity.
Args:
uri (str): URI for the Neo4j instance to access
user (str): Auth username
pwd (str): Auth password
Returns:
An instance of Neo4jConnection.
Raises:
Can raise an exception if the connectivity check fails.
"""
self.__driver = GraphDatabase.driver(
uri, auth=(user, pwd))
try:
self.__driver.verify_connectivity()
except Exception as e:
# Close the driver on failed connectivity to free up resources
self.close()
raise e
def close(self):
if self.__driver is not None:
self.__driver.close()
def read(self, database: str, query: str, **kwargs) -> list:
"""
Run a Neo4j read transaction.
Args:
database (str): The database to use.
query (str): The Cypher query to run.
kwargs: Optionally any named variables to use in the query.
Returns:
list: A list of results.
Raises:
Can raise an exception if the transaction or session encounters an error.
"""
assert self.__driver is not None, "Driver not initialized!"
@unit_of_work(timeout=60)
def transaction(tx: Transaction, query: str, **vars):
tx_result = tx.run(query, **vars)
return list(tx_result)
with self.__driver.session(database=database) as session:
result = session.read_transaction(transaction, query, **kwargs)
return result
def write(self, database:str, query, **kwargs) -> list:
"""
Run a Neo4j read transaction.
Args:
database (str): The database to use.
query (str): The Cypher query to run.
kwargs: Optionally any named variables to use in the query.
Returns:
list: A list of results if a RETURN statement included in cypher query.
Raises:
Can raise an exception if the transaction or session encounters an error.
"""
assert self.__driver is not None, "Driver not initialized!"
@unit_of_work(timeout=30)
def transaction(tx: Transaction, query: str, **vars):
tx_result = tx.run(query, **vars)
return list(tx_result)
with self.__driver.session(database = database) as session:
result = session.write_transaction(transaction, query, **kwargs)
return result
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment