Guest User

Untitled

a guest
Aug 22nd, 2018
147
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.47 KB | None | 0 0
  1. import org.bson.Document
  2. import com.mongodb.spark.MongoSpark
  3. import com.mongodb.spark.config.WriteConfig
  4.  
  5. import org.apache.spark.rdd.RDD
  6.  
  7. /**
  8. * fake json data
  9. */
  10.  
  11. val recs: List[String] = List(
  12. """{"a": 123, "b": 456, "c": "apple"}""",
  13. """{"a": 345, "b": 72, "c": "banana"}""",
  14. """{"a": 456, "b": 754, "c": "cat"}""",
  15. """{"a": 876, "b": 43, "c": "donut"}""",
  16. """{"a": 432, "b": 234, "c": "existential"}"""
  17. )
  18.  
  19. val rdd_json_str: RDD[String] = sc.parallelize(recs, 5)
  20. val rdd_hex_bson: RDD[Document] = rdd_json_str.map(json_str => Document.parse(json_str))
  21.  
  22. // credentials
  23. val user = ???
  24. val pwd = ???
  25.  
  26. // fixed values
  27. val db = "db_name"
  28. val replset = "replset_name"
  29. val collection_name = "collection_name"
  30.  
  31. val host = "url1:27017,url2:27017,url3:27017"
  32. val host = "ip_address1:27017,ip_address2:27017,ip_address3:27017"
  33.  
  34. val uri = s"mongodb://${user}:${pwd}@${host}/${db}?replicaSet=${replset}"
  35. val uri = s"mongodb://${user}:${pwd}@${host}/?replicaSet=${replset}"
  36. val uri = s"mongodb://${user}:${pwd}@${replset}/${host}/${db}"
  37. val uri = s"mongodb://${user}:${pwd}@${replset}/${host}/${db}.${collection_name}"
  38. val uri = s"mongodb://${user}:${pwd}@${host}" // setting db, collection, replica set in WriteConfig
  39. val uri = s"mongodb://${user}:${pwd}@${host}/${db}" // this works IF HOST IS PRIMARY ONLY; not for hosts as defined above
  40.  
  41. com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting
  42. for a server that matches WritableServerSelector. Client view of cluster
  43. state is {type=REPLICA_SET, servers=[{address=machine.unix.domain.org:27017,
  44. type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketException:
  45. machine.unix.domain.org}, caused by {java.net.UnknownHostException:
  46. machine.unix.domain.org}}, {address=machine.unix.domain.org:27017,
  47. type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketException:
  48. machine.unix.domain.org}, caused by {java.net.UnknownHostException:
  49. machine.unix.domain.org}}, {address=machine.unix.domain.org:27017,
  50. type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketException:
  51. machine.unix.domain.org}, caused by {java.net.UnknownHostException:
  52. machine.unix.domain.org}}]
  53.  
  54. com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting
  55. for a server that matches WritableServerSelector. Client view of cluster
  56. state is {type=REPLICA_SET, servers=[{address=xx.xx.xx.xx:27017,
  57. type=UNKNOWN, state=CONNECTING, exception=
  58. {com.mongodb.MongoSecurityException: Exception authenticating
  59. MongoCredential{mechanism=null, userName='xx', source='admin', password=
  60. <hidden>, mechanismProperties={}}}, caused by
  61. {com.mongodb.MongoCommandException: Command failed with error 18:
  62. 'Authentication failed.' on server xx.xx.xx.xx:27017. The full response is {
  63. "ok" : 0.0, "errmsg" : "Authentication failed.", "code" : 18, "codeName" :
  64. "AuthenticationFailed", "operationTime" : { "$timestamp" : { "t" :
  65. 1534459121, "i" : 1 } }, "$clusterTime" : { "clusterTime" : { "$timestamp" :
  66. { "t" : 1534459121, "i" : 1 } }, "signature" : { "hash" : { "$binary" :
  67. "xxx=", "$type" : "0" }, "keyId" : { "$numberLong" : "123456" } } } }}}...
  68.  
  69. val host = s"${primary_ip_address}:27017" // primary only
  70. val uri = s"mongodb://${user}:${pwd}@${host}/${db}"
  71.  
  72. val writeConfig: WriteConfig =
  73. WriteConfig(Map(
  74. "uri" -> uri,
  75. "database" -> db,
  76. "collection" -> collection_name,
  77. "replicaSet" -> replset))
  78.  
  79. // write data to mongo
  80. MongoSpark.save(rdd_hex_bson, writeConfig)
Add Comment
Please, Sign In to add comment