Created
June 4, 2022 16:35
-
-
Save jalakoo/8aff2c61f283b73214c2d56b4205a826 to your computer and use it in GitHub Desktop.
neo4j python utility class
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from neo4j import GraphDatabase, unit_of_work, Transaction | |
class Neo4jConnection:
    """Small convenience wrapper around a Neo4j driver.

    Creates a driver on construction, verifies connectivity, and exposes
    ``read`` / ``write`` helpers that each run a single Cypher query inside
    a managed transaction and return the result records as a list.
    """

    def __init__(self, uri, user, pwd):
        """
        Initialize a driver and test for connectivity.

        Args:
            uri (str): URI for the Neo4j instance to access
            user (str): Auth username
            pwd (str): Auth password

        Returns:
            An instance of Neo4jConnection.

        Raises:
            Can raise an exception if the connectivity check fails.
        """
        self.__driver = GraphDatabase.driver(uri, auth=(user, pwd))
        try:
            self.__driver.verify_connectivity()
        except Exception:
            # Close the driver on failed connectivity to free up resources.
            self.close()
            # Bare ``raise`` re-raises the active exception with its
            # original traceback untouched.
            raise

    def close(self):
        """
        Close the underlying driver, if one is still open.

        The driver reference is cleared, so calling ``close`` again is a
        no-op and later ``read`` / ``write`` calls fail with a clear error
        instead of touching a closed driver.
        """
        driver, self.__driver = self.__driver, None
        if driver is not None:
            driver.close()

    def read(self, database: str, query: str, **kwargs) -> list:
        """
        Run a Neo4j read transaction.

        Args:
            database (str): The database to use.
            query (str): The Cypher query to run.
            kwargs: Optionally any named variables to use in the query.

        Returns:
            list: A list of results.

        Raises:
            RuntimeError: If the connection has already been closed.
            Can also raise an exception if the transaction or session
            encounters an error.
        """
        return self._run_in_transaction(
            database, query, kwargs, timeout=60, write=False)

    def write(self, database: str, query: str, **kwargs) -> list:
        """
        Run a Neo4j write transaction.

        Args:
            database (str): The database to use.
            query (str): The Cypher query to run.
            kwargs: Optionally any named variables to use in the query.

        Returns:
            list: A list of results if a RETURN statement included in cypher query.

        Raises:
            RuntimeError: If the connection has already been closed.
            Can also raise an exception if the transaction or session
            encounters an error.
        """
        return self._run_in_transaction(
            database, query, kwargs, timeout=30, write=True)

    def _run_in_transaction(self, database, query, params, *, timeout, write):
        """Run ``query`` in one managed transaction and return its records as a list."""
        driver = self.__driver
        if driver is None:
            # Explicit raise instead of ``assert`` so the check is not
            # stripped when Python runs with -O.
            raise RuntimeError("Driver not initialized!")

        @unit_of_work(timeout=timeout)
        def transaction(tx: Transaction, query: str, **params):
            # Materialize the records while the transaction is still open.
            return list(tx.run(query, **params))

        with driver.session(database=database) as session:
            # Prefer the execute_* methods when the session has them and
            # fall back to the older *_transaction names otherwise.
            if write:
                runner = (getattr(session, "execute_write", None)
                          or session.write_transaction)
            else:
                runner = (getattr(session, "execute_read", None)
                          or session.read_transaction)
            result = runner(transaction, query, **params)
        return result
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment