#Step 1
%python
# Write a sample CSV to DBFS using Python I/O APIs.
# The local path /dbfs/... is the FUSE mount for DBFS, so this file
# is visible to Spark at the DBFS path /tmp/neo4_test.csv.
with open("/dbfs/tmp/neo4_test.csv", 'w') as f:
    f.write("id,name,emp_id,employer\n")
    for x in range(500):
        f.write(str(x) + ",name_" + str(x) + "," + str(x) + ",emp_name_" + str(x) + "\n")
# No explicit f.close() needed: the with block closes the file.
#Step 2 - Load the CSV into a DataFrame
%scala
// Read the CSV written in Step 1 (note: /dbfs/tmp/... locally == /tmp/... in DBFS)
// and register it as a temp view for the SQL queries in the later steps.
val sampleNeo4jCSV = spark.read.format("csv")
  .option("sep", ",")
  .option("inferSchema", "true")
  .option("header", "true")
  .load("/tmp/neo4_test.csv")
sampleNeo4jCSV.createOrReplaceTempView("personList")
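Before the Cypher steps below will run, the connector needs to know how to reach Neo4j. With the 2.x neo4j-spark-connector used here, the connection settings are typically supplied through the cluster's Spark config, along these lines (host and password are placeholders):
spark.neo4j.bolt.url bolt://<neo4j-host>:7687
spark.neo4j.bolt.user neo4j
spark.neo4j.bolt.password <password>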
#Step 3 - Run Cypher queries (reset the graph, add constraint and indexes)
%scala
import org.neo4j.spark._
import scala.collection.JavaConverters._

// Connection settings (bolt URL, user, password) are read from the Spark config.
val config = Neo4jConfig(sc.getConf)

// Wipe any existing graph, then create a uniqueness constraint and indexes
// so the node lookups in the create/merge steps below are fast.
val queries: Array[String] = Array(
  "MATCH (a) DETACH DELETE a RETURN count(a);",
  "CREATE CONSTRAINT ON (p:Person) ASSERT p.id IS UNIQUE;",
  "CREATE INDEX ON :Person(name);",
  "CREATE INDEX ON :Employer(emp_id);",
  "CREATE INDEX ON :Device(id);"
)
for (q <- queries) {
  Neo4jDataFrame.execute(config, q, Map().asJava)
}
# Step 4 - Create Person nodes
%scala
import org.neo4j.spark._
// Create one :Person node per row, with id and name properties.
val personDF = spark.sql("""select id, name from personList""")
Neo4jDataFrame.createNodes(sc, personDF, ("Person", Seq("id", "name")))
# Step 5 - Create Employer nodes
%scala
import org.neo4j.spark._
// Create one :Employer node per row, keyed by emp_id.
val employerDF = spark.sql("""select emp_id, employer from personList""")
Neo4jDataFrame.createNodes(sc, employerDF, ("Employer", Seq("emp_id", "employer")))
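As a quick sanity check, the node counts can be read back into a DataFrame with the connector's Cypher reader — a minimal sketch, assuming the same 2.x connector and the sample data above:
%scala
import org.neo4j.spark._
val neo = Neo4j(sc)
// With the 500-row sample, both counts should come back as 500.
neo.cypher("MATCH (p:Person) RETURN count(p) AS people").loadDataFrame.show()
neo.cypher("MATCH (e:Employer) RETURN count(e) AS employers").loadDataFrame.show()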
# Step 6 - Create relationships
%scala
import org.neo4j.spark._
// Merge (:Employer {emp_id})-[:HAS_EMPLOYEE]->(:Person {id}) edges,
// matching source and target nodes on their key columns.
val edgeDF = spark.sql("""select emp_id, id from personList""")
Neo4jDataFrame.mergeEdgeList(sc, edgeDF, ("Employer", Seq("emp_id")), ("HAS_EMPLOYEE", Seq()), ("Person", Seq("id")))
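A few of the merged edges can be sampled the same way to confirm the relationships landed (again a sketch using the connector's Cypher reader):
%scala
import org.neo4j.spark._
val neo = Neo4j(sc)
neo.cypher("MATCH (e:Employer)-[:HAS_EMPLOYEE]->(p:Person) RETURN e.employer AS employer, p.name AS name LIMIT 5").loadDataFrame.show()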
# Step 7 - Load from JSON
%scala
import org.neo4j.spark._
// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory of text files.
val path = "/databricks-datasets/iot/iot_devices.json"
val devicesDF = spark.read.json(path)
// The inferred schema can be visualized using the printSchema() method.
devicesDF.printSchema()
devicesDF.createOrReplaceTempView("devices")
// SQL statements can be run using the sql method provided by spark.
val deviceNodesDF = spark.sql("SELECT ip, device_id, device_name FROM devices")
Neo4jDataFrame.createNodes(sc, deviceNodesDF, ("Device", Seq("ip", "device_id", "device_name")))
deviceNodesDF.show()
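And a last count confirms the :Device nodes were created (same connector assumptions as the checks above):
%scala
import org.neo4j.spark._
val neo = Neo4j(sc)
neo.cypher("MATCH (d:Device) RETURN count(d) AS devices").loadDataFrame.show()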
# Other info - List sample files
%python
dbutils.fs.ls("/databricks-datasets/")