Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
# Load the source MongoDB collection as an RDD.
# NOTE(review): `sc` is not defined in this snippet; `mongoRDD` is presumably
# the method patched onto SparkContext by pymongo_spark -- confirm.
# The URI parts (user, pass, ip, port, db.coll, admin_db) look like
# placeholders to be replaced with real connection details.
# Fix: the URI string literal was missing its closing quote.
rdd = sc.mongoRDD('mongodb://user:pass@ip:port/db.coll?authSource=admin_db')
def include_id(doc):
    """Swap the document's ``_id`` for a string ``mongo_id`` and tag its origin.

    The ``_id`` entry is removed from ``doc`` (an empty string is used when it
    is absent), its ``str()`` form is stored under ``"mongo_id"``, and the
    value of ``m_ip_port`` (a name defined outside this snippet) is stored
    under ``"ip"``.

    Args:
        doc: Mutable mapping for one document; it is modified in place.

    Returns:
        The same ``doc`` object that was passed in.
    """
    raw_id = doc.pop('_id', '')
    doc["mongo_id"] = str(raw_id)
    doc["ip"] = m_ip_port
    return doc
def it_projection(doc_it, projection=None):
    """Yield each document of a partition reduced to the projected keys.

    Args:
        doc_it: Iterable of documents; each must expose a ``.get(key)``
            method (e.g. a ``dict``).
        projection: Optional iterable of keys to keep. When omitted, the
            ``m_projection`` name (defined outside this snippet; the original
            paste showed it as ``['_id', 'key1', 'key2', 'key3']``) is used,
            so ``rdd.mapPartitions(it_projection)`` keeps working unchanged.

    Yields:
        A new ``dict`` per input document containing only the projected keys.
        Keys missing from a document are present with the value ``None``.
    """
    # Materialize once so a one-shot iterator can be reused for every document.
    projection_list = tuple(m_projection if projection is None else projection)
    for doc in doc_it:
        yield {key: doc.get(key) for key in projection_list}
# Build the export pipeline: project each partition's documents, rewrite the
# id fields, serialize every document with json.dumps, and wrap each JSON
# string in a ('key', value) pair for the Hadoop output API.
# NOTE(review): the constant 'key' is presumably ignored by the Elasticsearch
# output format -- confirm against the es-hadoop documentation.
json_rdd = (
    rdd.mapPartitions(it_projection)
    .map(include_id)
    .map(json.dumps)
    .map(lambda payload: ('key', payload))
)

# Write the pairs out through the Elasticsearch Hadoop output format.
# e_ip_port, e_index and e_type are defined outside this snippet.
json_rdd.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf={
        "es.nodes": e_ip_port,
        "es.resource": e_index + '/' + e_type,
        # Values are already JSON strings (see json.dumps above).
        "es.input.json": "true",
    },
)
Add Comment
Please, Sign In to add comment