Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import org.bson.Document
- import com.mongodb.spark.MongoSpark
- import com.mongodb.spark.config.WriteConfig
- import org.apache.spark.rdd.RDD
- /**
- * Sample input: five small JSON objects, each with integer fields "a" and "b"
- * and a string field "c"; they are what gets written to MongoDB.
- */
- val recs: List[String] = List(
- """{"a": 123, "b": 456, "c": "apple"}""",
- """{"a": 345, "b": 72, "c": "banana"}""",
- """{"a": 456, "b": 754, "c": "cat"}""",
- """{"a": 876, "b": 43, "c": "donut"}""",
- """{"a": 432, "b": 234, "c": "existential"}"""
- )
- // Distribute the JSON strings over 5 partitions.
- val rdd_json_str: RDD[String] = sc.parallelize(recs, numSlices = 5)
- // Parse each string into an org.bson.Document (the name is historical: these are parsed Documents, not hex).
- val rdd_hex_bson: RDD[Document] = rdd_json_str.map(Document.parse(_))
- // credentials: placeholders; `???` throws scala.NotImplementedError when evaluated, so fill these in before running
- val user = ???
- val pwd = ??? // NOTE(review): reserved characters such as ':', '/', '@' or '%' in credentials must be percent-encoded before being put in the URI
- // fixed values (placeholder names; replace with the real deployment's). NOTE(review): the repeated `val host` / `val uri` lines below are alternatives that were tried; they only compile in a REPL such as spark-shell, where each definition shadows the previous one
- val db = "db_name"
- val replset = "replset_name"
- val collection_name = "collection_name"
- val host = "url1:27017,url2:27017,url3:27017" // seed list given by hostname
- val host = "ip_address1:27017,ip_address2:27017,ip_address3:27017" // seed list given by IP address
- val uri = s"mongodb://${user}:${pwd}@${host}/${db}?replicaSet=${replset}" // replica-set URI; the /db path also makes db_name the default authentication database
- val uri = s"mongodb://${user}:${pwd}@${host}/?replicaSet=${replset}" // no /db path: credentials are checked against "admin" (compare source='admin' in the authentication failure); add &authSource=... if the user is defined in another database — TODO confirm
- val uri = s"mongodb://${user}:${pwd}@${replset}/${host}/${db}" // NOTE(review): not a valid connection-string layout; hosts must follow '@' directly and the replica set name belongs in ?replicaSet=
- val uri = s"mongodb://${user}:${pwd}@${replset}/${host}/${db}.${collection_name}" // NOTE(review): same layout problem as the previous line
- val uri = s"mongodb://${user}:${pwd}@${host}" // setting db, collection, replica set in WriteConfig; with no /db path the authentication database defaults to "admin"
- val uri = s"mongodb://${user}:${pwd}@${host}/${db}" // this works IF HOST IS PRIMARY ONLY; not for hosts as defined above (presumably because a single host without a replicaSet option skips replica-set member discovery)
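- Error 1: the member addresses are reported as hostnames, and every one failed with java.net.UnknownHostException (the client could not resolve them):
- com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting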
- for a server that matches WritableServerSelector. Client view of cluster
- state is {type=REPLICA_SET, servers=[{address=machine.unix.domain.org:27017,
- type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketException:
- machine.unix.domain.org}, caused by {java.net.UnknownHostException:
- machine.unix.domain.org}}, {address=machine.unix.domain.org:27017,
- type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketException:
- machine.unix.domain.org}, caused by {java.net.UnknownHostException:
- machine.unix.domain.org}}, {address=machine.unix.domain.org:27017,
- type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketException:
- machine.unix.domain.org}, caused by {java.net.UnknownHostException:
- machine.unix.domain.org}}]
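- Error 2: the member was reached by IP address, but authentication failed (code 18) against source='admin':
- com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting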
- for a server that matches WritableServerSelector. Client view of cluster
- state is {type=REPLICA_SET, servers=[{address=xx.xx.xx.xx:27017,
- type=UNKNOWN, state=CONNECTING, exception=
- {com.mongodb.MongoSecurityException: Exception authenticating
- MongoCredential{mechanism=null, userName='xx', source='admin', password=
- <hidden>, mechanismProperties={}}}, caused by
- {com.mongodb.MongoCommandException: Command failed with error 18:
- 'Authentication failed.' on server xx.xx.xx.xx:27017. The full response is {
- "ok" : 0.0, "errmsg" : "Authentication failed.", "code" : 18, "codeName" :
- "AuthenticationFailed", "operationTime" : { "$timestamp" : { "t" :
- 1534459121, "i" : 1 } }, "$clusterTime" : { "clusterTime" : { "$timestamp" :
- { "t" : 1534459121, "i" : 1 } }, "signature" : { "hash" : { "$binary" :
- "xxx=", "$type" : "0" }, "keyId" : { "$numberLong" : "123456" } } } }}}...
- // Working setup: connect straight to the current primary by IP, with db_name in the path
- // (which also makes it the default authentication database).
- // NOTE(review): pinning one member means writes will fail if a failover moves the primary — revisit.
- val host = s"$primary_ip_address:27017" // primary only
- val uri = s"mongodb://$user:$pwd@$host/$db"
- // NOTE(review): "replicaSet" is normally a connection-string option; confirm the connector actually reads this key.
- val writeOptions: Map[String, String] = Map(
- "uri" -> uri,
- "database" -> db,
- "collection" -> collection_name,
- "replicaSet" -> replset
- )
- val writeConfig: WriteConfig = WriteConfig(writeOptions)
- // Write the Documents in rdd_hex_bson to the configured database and collection.
- MongoSpark.save(rdd_hex_bson, writeConfig)
Add Comment
Please, Sign In to add comment