Spark to Neo4j example
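The cells below run in a Databricks notebook with the neo4j-spark-connector 2.x (the Neo4j(sc) / Neo4jDataFrame API) attached, against a Neo4j 3.x server. Neo4jConfig(sc.getConf) reads the connection from the Spark config, so the cluster's Spark config needs the Bolt settings; a sketch with placeholder host and password:
# Prerequisite - Cluster Spark config for the connector
spark.neo4j.bolt.url bolt://<host>:7687
spark.neo4j.bolt.user neo4j
spark.neo4j.bolt.password <password>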
# Step 1 - Write a sample CSV to DBFS
%python
# Write a 500-row sample file to DBFS using Python file I/O.
# /dbfs is the local FUSE mount of DBFS, so Spark can read the same
# file later as /tmp/neo4_test.csv.
with open("/dbfs/tmp/neo4_test.csv", 'w') as f:
    f.write("id,name,emp_id,employer\n")
    for x in range(500):
        f.write(str(x) + ",name_" + str(x) + "," + str(x) + ",emp_name_" + str(x) + "\n")
# No explicit f.close() needed: the with block closes the file.
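An optional sanity check: dbutils (available in Databricks notebooks) can print the head of the file just written. The /dbfs/tmp/... path from the Python cell and the /tmp/... DBFS path here refer to the same file.
# Check - Peek at the CSV in DBFS
%scala
dbutils.fs.head("/tmp/neo4_test.csv")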
# Step 2 - Load the CSV into a DataFrame
%scala
val sampleNeo4jCSV = spark.read.format("csv")
  .option("sep", ",")
  .option("inferSchema", "true")
  .option("header", "true")
  .load("/tmp/neo4_test.csv")
// Register a temp view so later cells can select from it with Spark SQL.
sampleNeo4jCSV.createOrReplaceTempView("personList")
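A quick look at the temp view before anything is written to Neo4j:
# Check - Query the temp view
%scala
spark.sql("SELECT * FROM personList LIMIT 5").show()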
# Step 3 - Run setup Cypher queries
%scala
import org.neo4j.spark._
import scala.collection.JavaConverters._

val config = Neo4jConfig(sc.getConf)

// Clear the database, then add a uniqueness constraint and indexes
// (Neo4j 3.x syntax) for the labels created in the steps below.
val queries: Array[String] = Array(
  "MATCH (a) DETACH DELETE a RETURN count(a);",
  "CREATE CONSTRAINT ON (p:Person) ASSERT p.id IS UNIQUE;",
  "CREATE INDEX ON :Person(name);",
  "CREATE INDEX ON :Employer(emp_id);",
  "CREATE INDEX ON :Device(id);"
)

for (q <- queries) {
  Neo4jDataFrame.execute(config, q, Map().asJava)
}
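Neo4jDataFrame.execute also accepts query parameters (the empty Map().asJava above), so one-off parameterized statements can reuse the same call. A minimal sketch, assuming Neo4j 3.x $-parameter syntax; the id/name values are arbitrary examples:
# Example - Parameterized Cypher via execute
%scala
import scala.collection.JavaConverters._
Neo4jDataFrame.execute(config,
  "MERGE (p:Person {id: $id}) SET p.name = $name",
  Map[String, AnyRef]("id" -> Long.box(999), "name" -> "name_999").asJava)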
# Step 4 - Create Person nodes
%scala
import org.neo4j.spark._

val neo = Neo4j(sc)
// Create one :Person node per row, with id and name as properties.
val my_dataframe = spark.sql("""select id, name from personList""")
Neo4jDataFrame.createNodes(sc, my_dataframe, ("Person", Seq("id", "name")))
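To verify the write, the connector's Neo4j builder can read Cypher results back into a DataFrame; the same check works for the Employer nodes created in the next step. A sketch, assuming the neo4j-spark-connector 2.x loadDataFrame API:
# Check - Count Person nodes
%scala
neo.cypher("MATCH (p:Person) RETURN count(p) AS persons").loadDataFrame.show()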
# Step 5 - Create Employer nodes
%scala
import org.neo4j.spark._

val neo = Neo4j(sc)
// Create one :Employer node per row, with emp_id and employer as properties.
val my_dataframe = spark.sql("""select emp_id, employer from personList""")
Neo4jDataFrame.createNodes(sc, my_dataframe, ("Employer", Seq("emp_id", "employer")))
# Step 6 - Create relationships
%scala
import org.neo4j.spark._

val neo = Neo4j(sc)
// Merge (Employer {emp_id})-[:HAS_EMPLOYEE]->(Person {id}) edges,
// matching each end node on its key column.
val my_dataframe = spark.sql("""select emp_id, id from personList""")
Neo4jDataFrame.mergeEdgeList(sc, my_dataframe, ("Employer", Seq("emp_id")), ("HAS_EMPLOYEE", Seq()), ("Person", Seq("id")))
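A quick read-back to confirm the edges exist, under the same connector assumptions:
# Check - Count HAS_EMPLOYEE relationships
%scala
neo.cypher("MATCH (:Employer)-[r:HAS_EMPLOYEE]->(:Person) RETURN count(r) AS rels").loadDataFrame.show()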
# Step 7 - Load from JSON and create Device nodes
%scala
import org.neo4j.spark._

// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory of text files.
val path = "/databricks-datasets/iot/iot_devices.json"
val deviceDF = spark.read.json(path)

// The inferred schema can be visualized using the printSchema() method.
deviceDF.printSchema()
deviceDF.createOrReplaceTempView("devices")

// SQL statements can be run using the sql method provided by spark.
val deviceNamesDF = spark.sql("SELECT ip, device_id, device_name FROM devices")

val neo = Neo4j(sc)
Neo4jDataFrame.createNodes(sc, deviceNamesDF, ("Device", Seq("ip", "device_id", "device_name")))
deviceNamesDF.show()
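And the same read-back for the Device nodes:
# Check - Count Device nodes
%scala
neo.cypher("MATCH (d:Device) RETURN count(d) AS devices").loadDataFrame.show()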
# Other info - List sample files
%python
dbutils.fs.ls("/databricks-datasets/")