Created
February 25, 2022 11:47
-
-
Save jonatanblue/76446af03b3a426b00990fb0560b0186 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
import logging
import os
import signal
import time

from neo4j import GraphDatabase
from neo4j.exceptions import ServiceUnavailable
class App:
    """Small helper around a Neo4j driver for writing and reading Person nodes.

    The instance owns ``self.driver``; call :meth:`close` (or use the
    instance as a context manager) when finished with it.
    """

    def __init__(self, uri, user, password):
        """Create the driver for ``uri`` authenticating as ``user``/``password``."""
        self.driver = GraphDatabase.driver(uri, auth=(user, password))

    def __enter__(self):
        """Return the instance itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the driver on leaving the ``with`` block; never suppress errors."""
        self.close()
        return False

    def close(self):
        """Close the underlying driver connection."""
        # Don't forget to close the driver connection when you are finished with it
        self.driver.close()

    def create_friendship(self, person1_name, person2_name):
        """Create two Person nodes joined by KNOWS and print each created pair."""
        with self.driver.session() as session:
            # Write transactions allow the driver to handle retries and transient errors
            result = session.write_transaction(
                self._create_and_return_friendship, person1_name, person2_name)
            for row in result:
                print("Created friendship between: {p1}, {p2}".format(
                    p1=row['p1'], p2=row['p2']))

    @staticmethod
    def _create_and_return_friendship(tx, person1_name, person2_name):
        """Run the CREATE query on ``tx`` and return the created names.

        Returns a list of ``{"p1": name, "p2": name}`` dicts, one per row.
        Logs the query and re-raises if ``ServiceUnavailable`` occurs.
        """
        # To learn more about the Cypher syntax, see https://neo4j.com/docs/cypher-manual/current/
        # The Reference Card is also a good resource for keywords https://neo4j.com/docs/cypher-refcard/current/
        query = (
            "CREATE (p1:Person { name: $person1_name }) "
            "CREATE (p2:Person { name: $person2_name }) "
            "CREATE (p1)-[:KNOWS]->(p2) "
            "RETURN p1, p2"
        )
        try:
            # Running the query is inside the try so that a failure while
            # sending it is logged too, not only a failure while reading rows.
            result = tx.run(query, person1_name=person1_name, person2_name=person2_name)
            return [{"p1": row["p1"]["name"], "p2": row["p2"]["name"]}
                    for row in result]
        # Capture any errors along with the query and data for traceability
        except ServiceUnavailable as exception:
            logging.error("%s raised an error: \n %s", query, exception)
            raise

    def find_person(self, person_name):
        """Look up Person nodes named ``person_name`` and print how many matched."""
        with self.driver.session() as session:
            result = session.read_transaction(self._find_and_return_person, person_name)
            # Report the name that was actually searched for instead of a
            # hard-coded "Alices"; for "Alice" the output is unchanged.
            print(f"Found {len(result)} {person_name}s")

    @staticmethod
    def _find_and_return_person(tx, person_name):
        """Run the MATCH query on ``tx`` and return the list of matching names."""
        query = (
            "MATCH (p:Person) "
            "WHERE p.name = $person_name "
            "RETURN p.name AS name"
        )
        result = tx.run(query, person_name=person_name)
        return [row["name"] for row in result]
if __name__ == "__main__":
    # Connection settings are read from the environment; a missing variable
    # raises KeyError here, before any driver is created.
    # Aura queries use an encrypted connection using the "neo4j+s" URI scheme
    uri = os.environ["NEO4J_URL"]
    user = os.environ["NEO4J_USERNAME"]
    password = os.environ["NEO4J_PASSWORD"]
    app = App(uri, user, password)
    try:
        # Write one friendship and read it back, once per second, forever.
        while True:
            print("Writing...")
            app.create_friendship("Alice", "David")
            print("Wrote successfully. Reading...")
            app.find_person("Alice")
            print("Read successfully.")
            time.sleep(1)
    finally:
        # The loop only ends through an exception (e.g. Ctrl-C or a database
        # error), so the driver must be closed here or it is never closed.
        app.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment