Guest User

Untitled

a guest
Jul 18th, 2018
81
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 0.84 KB | None | 0 0
  # Read the source MongoDB collection into an RDD. The URI is a placeholder
  # (user:pass@ip:port/db.coll) authenticating against the `admin_db` database.
  # NOTE(review): `mongoRDD` is not a stock SparkContext method; presumably it
  # is patched in by pymongo_spark.activate() elsewhere -- confirm.
  rdd = sc.mongoRDD('mongodb://user:pass@ip:port/db.coll?authSource=admin_db')
  def include_id(doc):
      """Replace the document's ``_id`` with a string ``mongo_id`` and tag its origin.

      The ``_id`` entry is removed from ``doc`` (an empty string is used when the
      key is absent), converted with ``str`` and stored under ``"mongo_id"``.
      The module-level ``m_ip_port`` value is stored under ``"ip"``.

      Args:
          doc: Mutable mapping for one document; it is modified in place.

      Returns:
          The same ``doc`` object, after modification.
      """
      source_id = doc.pop('_id', '')
      # Stringify so the id survives the later json.dumps step.
      # NOTE(review): presumably _id is a BSON ObjectId here -- confirm.
      doc["mongo_id"] = str(source_id)
      doc["ip"] = m_ip_port
      return doc
  6.  
  def it_projection(doc_it):
      """Yield a reduced copy of each document holding only the projected keys.

      The keys to keep come from the module-level ``m_projection`` list
      (the original snippet shows ``['_id', 'key1', 'key2', 'key3']`` as its
      value). Every projected key is present in the output; keys missing from
      a document map to ``None`` because ``dict.get`` is used.

      Args:
          doc_it: Iterable of dict-like documents (anything exposing ``.get``).

      Yields:
          A new ``dict`` per input document, restricted to the projected keys.
      """
      # Bind once so the global is not looked up again for every document.
      projection_list = m_projection
      for doc in doc_it:
          yield {key: doc.get(key) for key in projection_list}
  12.  
  # Pipeline: project each partition's documents, move _id to mongo_id and tag
  # the source address, serialise to a JSON string, then pair each string with
  # a constant 'key' so the RDD has the (key, value) shape the Hadoop writer
  # expects.
  json_rdd = (
      rdd.mapPartitions(it_projection)
      .map(include_id)
      .map(json.dumps)
      .map(lambda payload: ('key', payload))
  )

  # Write the pairs to Elasticsearch through the ES-Hadoop output format.
  # "es.input.json" is "true" because the values are already JSON strings.
  # NOTE(review): path='-' looks like a placeholder, since the target is given
  # by "es.resource" -- confirm against the ES-Hadoop docs.
  json_rdd.saveAsNewAPIHadoopFile(
      path='-',
      outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
      keyClass="org.apache.hadoop.io.NullWritable",
      valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
      conf={
          "es.nodes": e_ip_port,
          "es.resource": e_index + '/' + e_type,
          "es.input.json": "true",
      },
  )
Add Comment
Please, Sign In to add comment