Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import lxml.etree as etree
- from copy import deepcopy
def create_delta_tree(changed_xml_tree, unchanged_xml_tree):
    """Build one tree describing how the changed tree differs from the unchanged one.

    Three filtered copies are produced and then merged:
    deletions come from the unchanged tree filtered against the changed one,
    while creations and updates come from the changed tree filtered against
    the unchanged one.

    :param changed_xml_tree: parsed tree holding the new state.
    :param unchanged_xml_tree: parsed tree holding the old state.
    :return: the tree returned by ``merge_to_one_tree``.
    """
    deletions = create_one_operation_type_element_tree(
        unchanged_xml_tree, changed_xml_tree, get_elements_to_be_deleted)
    creations = create_one_operation_type_element_tree(
        changed_xml_tree, unchanged_xml_tree, get_elements_to_be_created)
    updates = create_one_operation_type_element_tree(
        changed_xml_tree, unchanged_xml_tree, get_elements_to_be_updated)
    return merge_to_one_tree(deletions, creations, updates)
def create_one_operation_type_element_tree(base_tree, reference_tree, elements_filter):
    """Return a filtered deep copy of ``base_tree``.

    Both input trees are deep-copied so the caller's trees are never touched.
    The first child of each copied root is then handed to
    ``get_updated_elements`` together with ``elements_filter``, which prunes
    the base copy in place.

    :param base_tree: tree whose copy is pruned and returned.
    :param reference_tree: tree the base copy is compared against.
    :param elements_filter: callable taking the two name->element dicts and
        returning the distinct names that must be kept.
    :return: the pruned copy of ``base_tree``.
    """
    pruned_copy = deepcopy(base_tree)
    reference_copy = deepcopy(reference_tree)
    base_top = pruned_copy.getroot()[0]
    reference_top = reference_copy.getroot()[0]
    get_updated_elements(base_top, reference_top, elements_filter)
    return pruned_copy
def get_updated_elements(base_element, reference_element, function_filtering_elements):
    """Prune the children of ``base_element`` according to a filter.

    The direct children of both elements are indexed by their distinct name,
    the filter decides which names survive, and every other child is removed
    from ``base_element`` in place.

    :param base_element: element whose children are pruned.
    :param reference_element: element the children are compared against.
    :param function_filtering_elements: callable receiving the two
        name->element dicts and returning the distinct names to keep.
    :return: ``True`` when at least one child was kept, ``False`` otherwise.
    """
    # list(element) is the supported replacement for the deprecated
    # Element.getchildren().
    base_children = create_distinct_names_dict(list(base_element))
    reference_children = create_distinct_names_dict(list(reference_element))
    children_to_be_left = function_filtering_elements(base_children, reference_children)
    remove_child_elements(base_element, children_to_be_left)
    return len(children_to_be_left) > 0
def create_distinct_names_dict(elements):
    """Index ``elements`` by the name ``create_distinct_name`` gives each one.

    When several elements share a distinct name, the last one wins.

    :param elements: iterable of elements.
    :return: dict mapping distinct name -> element.
    """
    names_to_elements = {}
    for item in elements:
        names_to_elements[create_distinct_name(item)] = item
    return names_to_elements
def create_distinct_name(element):
    """Return a key identifying ``element`` among its siblings.

    An element without attributes is identified by its tag alone. Otherwise
    the key is the tag followed by the attribute ``name=value`` pairs, sorted
    by name. Including the names and quoting the values keeps different
    attribute sets from collapsing into the same key, and sorting makes the
    key independent of the order in which the attributes were written.

    :param element: element exposing ``tag`` and ``attrib``.
    :return: the tag itself, or ``tag[name='value',...]`` when attributes exist.
    """
    attribs = element.attrib
    if not attribs:
        return element.tag
    described = ','.join(
        '%s=%r' % (name, value) for name, value in sorted(attribs.items())
    )
    return '%s[%s]' % (element.tag, described)
def remove_child_elements(element, children_to_be_left):
    """Remove every direct child of ``element`` that is not listed to be kept.

    :param element: parent element, modified in place.
    :param children_to_be_left: collection of distinct names that must stay.
    """
    # Iterate over a snapshot: children are detached while we walk them.
    # list(element) replaces the deprecated Element.getchildren().
    for child in list(element):
        remove_or_leave(child, children_to_be_left)
def remove_or_leave(child, to_be_left):
    """Detach ``child`` from its parent unless its distinct name is in ``to_be_left``.

    :param child: element to keep or detach.
    :param to_be_left: collection of distinct names that must stay.
    """
    if create_distinct_name(child) in to_be_left:
        return
    parent = child.getparent()
    parent.remove(child)
def xml_compare(x1, x2, reporter=None):
    """Shallowly compare two elements by tag, attributes and text.

    Children and tail text are not examined. Text that is missing or
    whitespace-only on both sides is treated as equal; any other text
    difference, including text present on one side only, is a mismatch.

    :param x1: first element.
    :param x2: second element.
    :param reporter: optional callable receiving a message describing the
        first mismatch found.
    :return: ``True`` when the elements match, ``False`` otherwise.
    """
    try:
        if x1.tag != x2.tag:
            if reporter:
                reporter('Tags do not match: %s and %s' % (x1.tag, x2.tag))
            return False
        for name, value in x1.attrib.items():
            if x2.attrib.get(name) != value:
                if reporter:
                    reporter('Attributes do not match: %s=%r, %s=%r'
                             % (name, value, name, x2.attrib.get(name)))
                return False
        for name in x2.attrib.keys():
            if name not in x1.attrib:
                if reporter:
                    reporter('x2 has an attribute x1 is missing: %s'
                             % name)
                return False
        if x1.text != x2.text:
            # A difference matters as soon as EITHER side carries non-blank
            # text. Requiring both sides to be non-blank would let
            # "value" vs. no text pass as equal.
            x1_has_text = bool(x1.text and x1.text.strip())
            x2_has_text = bool(x2.text and x2.text.strip())
            if x1_has_text or x2_has_text:
                if reporter:
                    reporter('text: %r != %r' % (x1.text, x2.text))
                return False
        return True
    except SyntaxError:
        # Kept from the original: a SyntaxError raised while inspecting the
        # elements is reported as a mismatch rather than propagated.
        return False
def get_elements_to_be_deleted(e1_cn, e2_cn):
    """Collect the distinct names of base children that count as deleted.

    Each base child is passed to ``find_elements_to_be_deleted``, which
    decides whether its name is recorded.

    :param e1_cn: dict of distinct name -> element for the base children.
    :param e2_cn: dict of distinct name -> element for the reference children.
    :return: list of distinct names to keep in the deletion tree.
    """
    collected = []
    for name in e1_cn:
        find_elements_to_be_deleted(name, e2_cn, e1_cn[name], collected)
    return collected
def find_elements_to_be_deleted(dist_name, e2_cn, element, to_be_deleted):
    """Record ``dist_name`` as deleted when the reference side lacks it.

    A deleted element is stripped down to a bare marker: all of its children
    are removed and its text is cleared. An element that exists on both
    sides is only recursed into; the result of that recursion is not used
    here, so such an element is never added to ``to_be_deleted``.

    :param dist_name: distinct name of ``element``.
    :param e2_cn: dict of distinct name -> element for the reference children.
    :param element: base element being examined, modified in place.
    :param to_be_deleted: list that receives the names of deleted elements.
    """
    if dist_name in e2_cn:
        get_updated_elements(element, e2_cn[dist_name], get_elements_to_be_deleted)
        return
    to_be_deleted.append(dist_name)
    # An empty keep-list removes every child of the element.
    remove_child_elements(element, [])
    element.text = None
def get_elements_to_be_created(e1_cn, e2_cn):
    """Collect the distinct names of base children that count as created.

    Each base child is passed to ``find_elements_to_be_created``, which
    decides whether its name is recorded.

    :param e1_cn: dict of distinct name -> element for the base children.
    :param e2_cn: dict of distinct name -> element for the reference children.
    :return: list of distinct names to keep in the creation tree.
    """
    collected = []
    for name in e1_cn:
        find_elements_to_be_created(name, e2_cn, e1_cn[name], collected)
    return collected
def find_elements_to_be_created(dist_name, e2_cn, element, to_be_created):
    """Record ``dist_name`` as created when the reference side lacks it.

    A created element is kept untouched. An element that exists on both
    sides is only recursed into; the result of that recursion is not used
    here, so such an element is never added to ``to_be_created``.

    :param dist_name: distinct name of ``element``.
    :param e2_cn: dict of distinct name -> element for the reference children.
    :param element: base element being examined.
    :param to_be_created: list that receives the names of created elements.
    """
    if dist_name in e2_cn:
        get_updated_elements(element, e2_cn[dist_name], get_elements_to_be_created)
        return
    to_be_created.append(dist_name)
def get_elements_to_be_updated(e1_cn, e2_cn):
    """Collect the distinct names of base children that count as updated.

    Each base child is passed to ``find_elements_to_be_updated``, which
    decides whether its name is recorded.

    :param e1_cn: dict of distinct name -> element for the base children.
    :param e2_cn: dict of distinct name -> element for the reference children.
    :return: list of distinct names to keep in the update tree.
    """
    collected = []
    for name in e1_cn:
        find_elements_to_be_updated(name, e2_cn, e1_cn[name], collected)
    return collected
def find_elements_to_be_updated(dist_name, e2_cn, element, to_be_updated):
    """Record ``dist_name`` as updated when it exists on both sides and differs.

    An element counts as updated when the recursive pass over its children
    kept at least one of them, or when ``xml_compare`` reports that the
    element differs from its reference counterpart. Elements missing from
    the reference side are ignored.

    :param dist_name: distinct name of ``element``.
    :param e2_cn: dict of distinct name -> element for the reference children.
    :param element: base element being examined, pruned in place.
    :param to_be_updated: list that receives the names of updated elements.
    """
    if dist_name not in e2_cn:
        return
    counterpart = e2_cn[dist_name]
    # The recursive pass always runs first because it prunes the subtree.
    kept_descendants = get_updated_elements(element, counterpart, get_elements_to_be_updated)
    if kept_descendants or not xml_compare(element, counterpart):
        to_be_updated.append(dist_name)
def merge_to_one_tree(del_elem_tree, cre_elem_tree, upd_elem_tree):
    """Merge the deletion, creation and update trees into one tree.

    Each tree accepted by ``append_tree_if_not_empty`` is labelled with its
    operation ('delete', 'create' or 'update'). The elements of every
    accepted tree after the first are then moved into the first one.

    :param del_elem_tree: tree holding deleted elements.
    :param cre_elem_tree: tree holding created elements.
    :param upd_elem_tree: tree holding updated elements.
    :return: the first accepted tree, now carrying the elements of the
        others; ``del_elem_tree`` unchanged when no tree was accepted.
    """
    labelled_trees = (
        (del_elem_tree, 'delete'),
        (cre_elem_tree, 'create'),
        (upd_elem_tree, 'update'),
    )
    non_empty_trees = []
    for elem_tree, operation in labelled_trees:
        append_tree_if_not_empty(elem_tree, operation, non_empty_trees=non_empty_trees)
    if not non_empty_trees:
        # Nothing to merge: hand back the deletion tree as an empty delta
        # instead of failing with IndexError on non_empty_trees[0].
        return del_elem_tree
    for tree in non_empty_trees[1:]:
        move_all_elements_to_first_tree(non_empty_trees, tree)
    return non_empty_trees[0]
def append_tree_if_not_empty(elem_tree, operation, non_empty_trees):
    """Prepare ``elem_tree`` and add it to ``non_empty_trees`` if it has content.

    A tree is accepted when its root element has at least one child. An
    accepted tree has its header removed and its elements labelled with
    ``operation`` before being appended.

    NOTE(review): the helpers called here work on the children of root[0],
    while the emptiness test counts the children of the root itself --
    confirm this is the intended criterion.

    :param elem_tree: tree to examine.
    :param operation: operation label passed to ``change_elements_operation``.
    :param non_empty_trees: list that receives accepted trees.
    """
    if len(elem_tree.getroot()) == 0:
        return
    remove_header(elem_tree)
    change_elements_operation(elem_tree, operation)
    non_empty_trees.append(elem_tree)
def remove_header(tree):
    """Remove every 'header' element directly under the root's first child.

    Only direct children of root[0] are considered; nested 'header'
    elements are left alone.

    :param tree: element tree, modified in place.
    """
    container = tree.getroot()[0]
    # Walk a snapshot so that removing a child cannot disturb the iteration,
    # and remove through the container we already hold.
    for child in list(container):
        if child.tag == 'header':
            container.remove(child)
def change_elements_operation(tree, operation):
    """Set the 'operation' attribute on each direct child of the root's first child.

    :param tree: element tree, modified in place.
    :param operation: value stored in the 'operation' attribute.
    """
    top_level = tree.getroot()[0]
    for entry in top_level:
        attributes = entry.attrib
        attributes['operation'] = operation
def move_all_elements_to_first_tree(non_empty_trees, tree):
    """Append the top-level elements of ``tree`` to the first tree in the list.

    "Top-level" means the direct children of the root's first child, in both
    the source and the destination tree.

    :param non_empty_trees: list whose first entry is the destination tree.
    :param tree: source tree whose elements are appended.
    """
    # Snapshot the source children first: appending an element elsewhere can
    # detach it from the container being walked.
    elements = list(tree.getroot()[0])
    if elements:
        destination = non_empty_trees[0].getroot()[0]
        for element in elements:
            destination.append(element)
if __name__ == '__main__':
    # Parse the two sample documents, compute their delta and print it
    # serialized to stdout.
    changed_document = etree.parse(r'2_cells_to_delta.xml')
    unchanged_document = etree.parse(r'3_cells_to_delta.xml')
    delta = create_delta_tree(changed_document, unchanged_document)
    print(etree.tostring(delta))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement