Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
from collections import defaultdict
def get_overlap(pair, v2_extractor=None, v4_extractor=None):
    """Return the v2 and v4 blobs that fall inside the date range both cover.

    Dates are compared as plain strings, so they must be in a format whose
    lexicographic order matches chronological order ('YYYY-MM-DD').

    Args:
        pair: dict with two entries:
            'v2' -- dict shaped like {'data': {'days': {'YYYY-MM-DD': blob}}}
                    (one blob per date); missing 'data'/'days' means "empty".
            'v4' -- list of dicts, each with a 'creationDate' string whose
                    first ten characters are the date (several per date are
                    allowed). The list is not modified.
        v2_extractor: optional callable applied to each selected v2 blob.
        v4_extractor: optional callable applied to each selected v4 blob.

    Returns:
        {'v2': {date: value}, 'v4': defaultdict(list) of {date: [values]}}.
        v4 values for a date are ordered by their full 'creationDate'.
        Both parts are empty when either input is empty or the date ranges
        do not overlap.
    """
    v2_blobs = pair['v2'].get('data', {}).get('days', {})
    v4_blobs = pair['v4']

    # One blob per date in v2, multiple per date in v4.
    results = {'v2': {}, 'v4': defaultdict(list)}
    if not (v2_blobs and v4_blobs):
        return results

    # sorted() rather than keys().sort(): dict views have no .sort() on
    # Python 3, and sorting a copy leaves the caller's v4 list untouched.
    v2_dates = sorted(v2_blobs)
    v4_sorted = sorted(v4_blobs, key=lambda blob: blob['creationDate'])

    v2_start, v2_end = v2_dates[0], v2_dates[-1]  # possibly the same date
    start = end = None

    # Walk v4 in date order, collecting blobs inside v2's range and tracking
    # the first and last v4 dates seen there.
    for v4 in v4_sorted:
        v4_date = v4['creationDate'][:10]
        if v4_date < v2_start:
            # Still before v2 begins; if v4 ends before v2 starts we never
            # get past this point.
            continue
        if v4_date > v2_end:
            # Sorted input: everything from here on is after v2 ends.
            break
        if start is None:
            start = v4_date
        end = v4_date
        value = v4_extractor(v4) if v4_extractor else v4
        results['v4'][v4_date].append(value)

    if end is None:
        # No v4 blob landed inside v2's range: no overlap at all.
        return results

    # Keep only the v2 dates within the span covered by the selected v4 blobs.
    for v2_date in v2_dates:
        if v2_date < start:
            continue
        if v2_date > end:
            break
        blob = v2_blobs[v2_date]
        results['v2'][v2_date] = v2_extractor(blob) if v2_extractor else blob
    return results
def test_get_overlap():
    """Self-test for get_overlap.

    Covers empty inputs, a perfect overlap with extractor functions, and
    progressively shrinking overlaps from both the v2 side and the v4 side.
    Raises AssertionError on the first failing check and prints
    "all passed" when every check succeeds.
    """
    from copy import deepcopy

    d1 = '1999-12-31'
    dt1 = d1 + 'T23:59:59.999Z'
    d2 = '2000-01-01'
    dt2 = d2 + 'T00:00:00.001Z'

    # Without extractors the blobs are passed through untouched, so any
    # values (None, a function object) will do as v2 payloads.
    simple = {'v2': {'data': {'days': {d1: None, d2: test_get_overlap}}},
              'v4': [{'creationDate': dt1},
                     {'creationDate': dt2}]}
    assert get_overlap(simple) == {'v2': simple['v2']['data']['days'],
                                   'v4': {d1: [{'creationDate': dt1}],
                                          d2: [{'creationDate': dt2}]}}

    # If either or both schema versions are empty, so is the result.
    empty_in = {'v2': {}, 'v4': []}
    empty = {'v2': {}, 'v4': {}}
    assert get_overlap(empty_in) == empty
    assert get_overlap({'v2': simple['v2'], 'v4': []}) == empty
    assert get_overlap({'v2': {'data': {'neighs': None}}, 'v4': simple['v4']}) == empty

    d3 = '2001-02-03'
    dt3 = d3 + 'T03:59:59.001Z'
    dt35 = d3 + 'T03:59:59.999Z'
    d4 = '2002-03-04'
    dt4 = d4 + 'T13:07:57.222Z'
    dt45 = d4 + 'T13:09:59.333Z'

    def v2_extractor(d):
        return d['value']

    def v4_extractor(d):
        return d['last']

    all_v2 = {day: {'value': i} for i, day in enumerate([d1, d2, d3, d4])}
    all_v2_out = {day: blob['value'] for day, blob in all_v2.items()}
    all_v4 = [{'creationDate': dt, 'last': dt[-4:]}
              for dt in [dt1, dt2, dt3, dt35, dt4, dt45]]
    all_v4_out = {d1: ['999Z'], d2: ['001Z'], d3: ['001Z', '999Z'], d4: ['222Z', '333Z']}

    # Perfect overlap, multiple blobs per day in v4, extractor functions.
    # (Named full_* so the builtin input() is not shadowed.)
    full_input = {'v2': {'data': {'days': all_v2}},
                  'v4': all_v4}
    full_output = {'v2': all_v2_out,
                   'v4': all_v4_out}
    result = get_overlap(full_input, v2_extractor, v4_extractor)
    assert result == full_output

    # Remove v2's head and v4's tail
    input2 = deepcopy(full_input)
    del input2['v2']['data']['days'][d1]
    input2['v4'].pop()
    input2['v4'].pop()
    output2 = deepcopy(full_output)
    del output2['v2'][d1]
    del output2['v2'][d4]
    del output2['v4'][d1]
    del output2['v4'][d4]
    assert get_overlap(input2, v2_extractor, v4_extractor) == output2

    # And again
    del input2['v2']['data']['days'][d2]
    del output2['v2'][d2]
    del output2['v4'][d2]
    assert get_overlap(input2, v2_extractor, v4_extractor) == output2

    # And again; now we're empty
    input2['v4'].pop()
    input2['v4'].pop()
    del output2['v2'][d3]
    del output2['v4'][d3]
    assert output2 == empty
    assert get_overlap(input2, v2_extractor, v4_extractor) == output2

    # Since the algorithm treats v2 and v4 asymmetrically, also test opposite
    input3 = deepcopy(full_input)
    output3 = deepcopy(full_output)
    del input3['v2']['data']['days'][d4]
    del output3['v2'][d4]
    del output3['v4'][d4]
    assert get_overlap(input3, v2_extractor, v4_extractor) == output3

    input3['v4'].pop(0)
    del output3['v2'][d1]
    del output3['v4'][d1]
    assert get_overlap(input3, v2_extractor, v4_extractor) == output3

    del input3['v2']['data']['days'][d3]
    del output3['v2'][d3]
    del output3['v4'][d3]
    assert get_overlap(input3, v2_extractor, v4_extractor) == output3

    # v4 now starts after v2 ends: nothing overlaps.
    v4_head = input3['v4'].pop(0)
    del output3['v2'][d2]
    del output3['v4'][d2]
    assert empty == output3
    assert get_overlap(input3, v2_extractor, v4_extractor) == output3

    # Put the v4 head back but drop the matching v2 day: still no overlap.
    input3['v4'] = [v4_head] + input3['v4']
    del input3['v2']['data']['days'][d2]
    assert get_overlap(input3, v2_extractor, v4_extractor) == output3

    # Function-call form prints the same text on Python 2 and Python 3.
    print("all passed")
# Run the self-test whenever this module is executed or imported.
test_get_overlap()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement