Advertisement
Guest User

Untitled

a guest
Oct 14th, 2019
111
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.55 KB | None | 0 0
  1. #Step 1
  2. %python
  3. #write a file to DBFS using Python I/O APIs
  4. with open("/dbfs/tmp/neo4_test.csv", 'w') as f:
  5. f.write("id,name,emp_id,employer\n")
  6. for x in range(500):
  7. f.write(str(x) + ",name_" + str(x) + "," + str(x) + ",emp_name_" + str(x) + "\n")
  8. f.close()
  9.  
  10. #Step 2 Load to Dataframe
  11.  
  12. %scala
  13. val sampleNeo4jCSV = spark.read.format("csv")
  14. .option("sep", ",")
  15. .option("inferSchema", "true")
  16. .option("header", "true")
  17. .load("/tmp/neo4_test.csv")
  18.  
  19. sampleNeo4jCSV.createOrReplaceTempView("personList")
  20.  
  21. #Step 3 - Run Cypher queries
  22. %scala
  23. import org.neo4j.spark._
  24. import org.graphframes._
  25. import org.neo4j.spark.Neo4jDataFrame
  26. import scala.collection.JavaConverters._
  27.  
  28. val config = Neo4jConfig(sc.getConf)
  29.  
  30. val neo = Neo4j(sc)
  31.  
  32. var queries: Array[String]=Array(
  33. "MATCH (a) DETACH DELETE a RETURN count(a);",
  34. "CREATE CONSTRAINT ON (p:Person) ASSERT p.id IS UNIQUE;",
  35. "CREATE INDEX ON :Person(name);",
  36. "CREATE INDEX ON :Employer(emp_id);",
  37. "CREATE INDEX ON :Device(id);"
  38. )
  39.  
  40. for (q <- queries) {
  41. Neo4jDataFrame.execute(config,q,Map().asJava);
  42. }
  43.  
  44. # Step 4 - Create Nodes
  45. %scala
  46. import org.neo4j.spark._
  47. val neo = Neo4j(sc)
  48. val my_dataframe = spark.sql("""select id, name from personList""")
  49. Neo4jDataFrame.createNodes(sc, my_dataframe, ("Person",Seq("id", "name")))
  50.  
  51. # Step 5 - Create Nodes
  52. %scala
  53. import org.neo4j.spark._
  54. val neo = Neo4j(sc)
  55. val my_dataframe = spark.sql("""select emp_id,employer from personList""")
  56. Neo4jDataFrame.createNodes(sc, my_dataframe, ("Employer",Seq("emp_id", "employer")))
  57.  
  58. # Step 6 - Create Relationships
  59. %scala
  60. import org.neo4j.spark._
  61. val neo = Neo4j(sc)
  62. val my_dataframe = spark.sql("""select emp_id,id from personList""")
  63. Neo4jDataFrame.mergeEdgeList(sc, my_dataframe, ("Employer",Seq("emp_id")),("HAS_EMPLOYEE",Seq()),("Person",Seq("id")))
  64.  
  65. # Step 7 - Load from JSON
  66. %scala
  67. import spark.implicits._
  68. import org.neo4j.spark._
  69.  
  70. // A JSON dataset is pointed to by path.
  71. // The path can be either a single text file or a directory storing text files
  72. val path = "/databricks-datasets/iot/iot_devices.json"
  73. val peopleDF = spark.read.json(path)
  74.  
  75. // The inferred schema can be visualized using the printSchema() method
  76. peopleDF.printSchema()
  77.  
  78. peopleDF.createOrReplaceTempView("people")
  79.  
  80. // SQL statements can be run by using the sql methods provided by spark
  81. val teenagerNamesDF = spark.sql("SELECT ip,device_id, device_name FROM people")
  82. val neo = Neo4j(sc)
  83. Neo4jDataFrame.createNodes(sc, teenagerNamesDF, ("Device",Seq("ip", "device_id","device_name")))
  84. teenagerNamesDF.show()
  85.  
# Other info - List sample files
%python
# List the sample datasets bundled with the Databricks workspace
# (the iot_devices.json used in Step 7 lives under this root).
# NOTE(review): dbutils is Databricks-only; this cell will not run
# outside a notebook environment.
dbutils.fs.ls("/databricks-datasets/")
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement