Not a member of Pastebin yet?
Sign up —
it unlocks many cool features!
object_dict = {}  # dictionary is initially empty

# Sequential form of the per-object update that this snippet tries to port.
# NOTE(review): `object` is presumably a loop variable over Objects in the
# caller's code; as written here it resolves to the builtin `object` — confirm.
new_dict = modify_object_dict(object_dict, object)

# Naive Spark port. It cannot accumulate into object_dict on the driver:
# per the Spark "Understanding closures" guide, each task operates on its own
# serialized copy of the closure variables, so updates made on executors are
# never seen here. map() is also lazy, so nothing runs until an action is
# called. Per-key merging belongs in aggregateByKey instead.
RDD = (
    sc.parallelize(Objects)
    .map(lambda obj: modify_object_dict(object_dict, obj))
)
class Foobar(object):
    """Simple record with a grouping key and three optional fields.

    ``name`` is the key used to pair/group records; any of ``x``, ``y``,
    ``z`` not passed to the constructor is stored as ``None``.
    """

    def __init__(self, name, x=None, y=None, z=None):
        self.name = name
        self.x = x
        self.y = y
        self.z = z

    def __repr__(self):
        # Readable output when RDD elements are collected or printed; the
        # default object repr only shows the memory address. %-formatting
        # keeps this compatible with both Python 2 and 3.
        return "%s(name=%r, x=%r, y=%r, z=%r)" % (
            type(self).__name__, self.name, self.x, self.y, self.z)
# Sample input: three records, two of which share the key "foo". Each dict
# is expanded into Foobar keyword arguments; omitted fields default to None.
objects = sc.parallelize(
    [
        {"name": "foo", "x": 1},
        {"name": "foo", "y": 3},
        {"name": "bar", "z": 4},
    ]
).map(lambda fields: Foobar(**fields))
# Key every object by its name, producing (name, obj) pairs.
pairs = objects.keyBy(lambda obj: obj.name)
# Alternative approach: collect every object for a key into one group, then
# fold the group. `...` is a placeholder for that fold; as written, each group
# is mapped to the Ellipsis object. Per the PySpark groupByKey docs,
# aggregateByKey/reduceByKey perform much better when the goal is a per-key
# aggregation.
rdd = pairs.groupByKey().mapValues(lambda values: ...)
def seq_op(obj_dict, obj):
    """Fold a single object's fields into a per-key accumulator.

    Stand-in for ``modify_object_dict``: copies ``obj.x``, ``obj.y`` and
    ``obj.z`` into ``obj_dict`` (mutating it) and returns the same dict.
    Values are copied even when they are ``None``, so a later object
    overwrites whatever an earlier one stored for that field.
    """
    for field in ("x", "y", "z"):
        obj_dict[field] = getattr(obj, field)
    return obj_dict
def comb_op(obj_dict_1, obj_dict_2):
    """Merge two partial accumulators built for the same key.

    Let's say it is a simple union: every entry of ``obj_dict_2`` is copied
    into ``obj_dict_1``, and ``obj_dict_2`` wins on key collisions.
    ``obj_dict_1`` is mutated in place and returned; no new dict is created.
    """
    merged = obj_dict_1
    merged.update(obj_dict_2)
    return merged
# Merge each key's objects into one dict without materializing groups.
# seq_op folds single objects into an accumulator started from the {} zero
# value, and comb_op merges accumulators coming from different partitions.
dicts = pairs.aggregateByKey(
    {},
    seq_op,
    comb_op,
)
dicts.collectAsMap()
## {'bar': {'x': None, 'y': None, 'z': 4},
##  'foo': {'x': None, 'y': 3, 'z': None}}
# Note that foo's x=1 was overwritten by the second "foo" object's x=None,
# because seq_op copies None values. Since the merge is last-write-wins, the
# values for "foo" may differ if the records are partitioned or combined in
# a different order.
Add Comment
Please sign in to add a comment.