Skip to content

Instantly share code, notes, and snippets.

@davidfauth
Last active September 25, 2019 15:36
Show Gist options
  • Save davidfauth/96b3957ccb3b6681f8b2e4754f1b3e41 to your computer and use it in GitHub Desktop.
Spark to Neo4j example
# Step 1 - Write a sample CSV to DBFS
%python
# Write a 500-row sample CSV (id,name,emp_id,employer) to DBFS through the
# local /dbfs FUSE mount using plain Python file I/O.
with open("/dbfs/tmp/neo4_test.csv", 'w') as f:
    f.write("id,name,emp_id,employer\n")
    # One synthetic person per row; emp_id mirrors id so Step 6 can
    # join people to employers.
    for x in range(500):
        f.write(str(x) + ",name_" + str(x) + "," + str(x) + ",emp_name_" + str(x) + "\n")
# No explicit f.close(): the with-statement closes the file on exit.
# Step 2 - Load the CSV into a DataFrame
%scala
// Read the CSV generated in Step 1 from DBFS into a DataFrame
// (comma-separated, first row is the header, column types inferred).
val sampleNeo4jCSV = spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .option("sep", ",")
  .load("/tmp/neo4_test.csv")

// Register the frame as a temp view so later cells can query it via spark.sql.
sampleNeo4jCSV.createOrReplaceTempView("personList")
# Step 3 - Run schema-setup Cypher queries
%scala
import org.neo4j.spark._
import org.graphframes._
import org.neo4j.spark.Neo4jDataFrame
import scala.collection.JavaConverters._

// Build the Neo4j connection config from the Spark context's settings
// (neo4j.url / credentials are expected in the Spark conf).
val config = Neo4jConfig(sc.getConf)

// Schema-setup statements: wipe the graph, then create the uniqueness
// constraint and the indexes the import steps below rely on.
// (val, not var: the array is never reassigned.)
val queries: Array[String] = Array(
  "MATCH (a) DETACH DELETE a RETURN count(a);",
  "CREATE CONSTRAINT ON (p:Person) ASSERT p.id IS UNIQUE;",
  "CREATE INDEX ON :Person(name);",
  "CREATE INDEX ON :Employer(emp_id);",
  "CREATE INDEX ON :Device(id);"
)

// Execute each statement with no query parameters (empty Java map).
queries.foreach(q => Neo4jDataFrame.execute(config, q, Map().asJava))
# Step 4 - Create Person nodes
%scala
import org.neo4j.spark._

// Create one :Person node per personList row, with id and name properties.
val neo = Neo4j(sc)
val personsDF = spark.sql("""select id, name from personList""")
Neo4jDataFrame.createNodes(sc, personsDF, ("Person", Seq("id", "name")))
# Step 5 - Create Employer nodes
%scala
import org.neo4j.spark._

// Create one :Employer node per personList row, with emp_id and employer
// properties.
val neo = Neo4j(sc)
val employersDF = spark.sql("""select emp_id,employer from personList""")
Neo4jDataFrame.createNodes(sc, employersDF, ("Employer", Seq("emp_id", "employer")))
# Step 6 - Create Relationships
%scala
import org.neo4j.spark._

// Merge (:Employer)-[:HAS_EMPLOYee]->(:Person) edges is the intent here:
// match Employer on emp_id and Person on id, link them with a
// HAS_EMPLOYEE relationship carrying no properties.
val neo = Neo4j(sc)
val employmentDF = spark.sql("""select emp_id,id from personList""")
Neo4jDataFrame.mergeEdgeList(
  sc,
  employmentDF,
  ("Employer", Seq("emp_id")),
  ("HAS_EMPLOYEE", Seq()),
  ("Person", Seq("id"))
)
# Step 7 - Load from JSON
%scala
import spark.implicits._
import org.neo4j.spark._

// Load a JSON dataset of IoT device records and push them into Neo4j as
// :Device nodes. The path can be either a single text file or a
// directory of text files.
val path = "/databricks-datasets/iot/iot_devices.json"
val devicesDF = spark.read.json(path)

// Inspect the schema Spark inferred from the JSON documents.
devicesDF.printSchema()

// NOTE(review): the temp view keeps the name "people" — a leftover from
// the Spark SQL JSON example this cell was adapted from — so any
// downstream query against it still works; the rows are IoT devices.
devicesDF.createOrReplaceTempView("people")

// Project only the columns we want as node properties.
val deviceColsDF = spark.sql("SELECT ip,device_id, device_name FROM people")

val neo = Neo4j(sc)
Neo4jDataFrame.createNodes(sc, deviceColsDF, ("Device", Seq("ip", "device_id", "device_name")))
deviceColsDF.show()
# Other info - List sample files
%python
# List the sample datasets bundled with the Databricks workspace
# (source of the IoT JSON used in Step 7).
dbutils.fs.ls("/databricks-datasets/")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment