Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- """6.009 Lab 7 -- Faster Gift Delivery."""
- from graph import Graph
- # NO ADDITIONAL IMPORTS ALLOWED!
class GraphFactory:
    """Factory methods for creating instances of `Graph`."""

    def __init__(self, graph_class):
        """Return a new factory that creates instances of `graph_class`."""
        self.graph_class = graph_class

    def from_list(self, adj_list, labels=None):
        """Create and return a new graph instance.

        Use a simple adjacency list as source, where the `labels` dictionary
        maps each node name to its label.

        Parameters:
            `adj_list`: adjacency list representation of a graph
                        (as a list of lists); node names are the list
                        indices 0..len(adj_list)-1
            `labels`: dictionary mapping each node name to its label;
                      by default it's None, which means no label should be set
                      for any of the nodes.  A node that is missing from
                      `labels`, or mapped to None, is added without a label.

        Returns:
            new instance of class implementing `Graph`
        """
        return self._build(list(enumerate(adj_list)), labels)

    def from_dict(self, adj_dict, labels=None):
        """Create and return a new graph instance.

        Use a simple adjacency dictionary as source, where the `labels`
        dictionary maps each node name to its label.

        Parameters:
            `adj_dict`: adjacency dictionary representation of a graph
                        (node name -> iterable of neighbor names)
            `labels`: dictionary mapping each node name to its label;
                      by default it's None, which means no label should be set
                      for any of the nodes.  A node that is missing from
                      `labels`, or mapped to None, is added without a label.

        Returns:
            new instance of class implementing `Graph`
        """
        return self._build(list(adj_dict.items()), labels)

    def _build(self, adjacency, labels):
        """Build a graph from a list of `(name, neighbors)` pairs."""
        graph = self.graph_class()
        # Every node has to exist before any edge is added, because an edge
        # may point at a node that appears later in the adjacency data.
        for name, _ in adjacency:
            label = self._label_of(labels, name)
            if label is None:
                graph.add_node(name)
            else:
                graph.add_node(name, label)
        for name, neighbors in adjacency:
            for neighbor in neighbors:
                graph.add_edge(name, neighbor)
        return graph

    @staticmethod
    def _label_of(labels, name):
        """Return the label recorded for `name`, or None if there is none."""
        if labels is None:
            return None
        try:
            return labels[name]
        except (KeyError, IndexError):
            # A partial label mapping is fine: unlisted nodes are unlabeled.
            return None
class FastGraph(Graph):
    """Faster implementation of `Graph`.

    Star and clique patterns are answered quickly by one backtracking
    matcher that (a) skips graph nodes whose in/out-degree is too small for
    the pattern node and (b) draws candidates from the neighbors of an
    already matched node instead of scanning the whole graph.

    Attributes:
        adj_dict: node name -> list of successors, in insertion order.
        labels:   node name -> label.
        stars:    node name -> out-degree; only nodes with at least one
                  outgoing edge have an entry.
        _preds:   node name -> list of predecessors, in insertion order.
        _edges:   set of `(start, end)` tuples for O(1) edge lookups.
    """

    def __init__(self):
        """Create an empty graph."""
        self.adj_dict = {}
        self.labels = {}
        self.stars = {}
        self._preds = {}
        self._edges = set()

    def query(self, pattern):
        """Return every match of `pattern` in the graph.

        Parameters:
            `pattern`: list of `(label, neighbors)` pairs.  Pattern node `i`
                       is `pattern[i]`; `label` is the required node label
                       (`'*'` matches any label) and `neighbors` lists the
                       indices of the pattern nodes that `i` points to.

        Returns:
            list of matches; each match is a list whose `i`-th element is
            the name of the graph node assigned to pattern node `i`.
            Distinct pattern nodes are always assigned distinct graph nodes,
            and every pattern edge maps onto a graph edge.  An empty pattern
            has exactly one (empty) match.

        Raises:
            IndexError: if a neighbor index is outside the pattern.
        """
        size = len(pattern)
        if size == 0:
            return [[]]
        plan = self._plan_search(pattern)

        # Local aliases keep the innermost loop free of attribute lookups.
        adjacency = self.adj_dict
        preds = self._preds
        labels = self.labels
        edges = self._edges
        out_degree = self.stars

        matches = []
        assignment = [None] * size  # pattern index -> graph node name
        used = set()                # graph nodes already in `assignment`

        def extend(depth):
            if depth == size:
                matches.append(list(assignment))
                return
            idx, wanted, min_out, min_in, into, out_of, needs_loop = plan[depth]

            # Any neighbor list of an already matched, adjacent pattern node
            # is a superset of the valid candidates; take the shortest one.
            pools = [adjacency[assignment[j]] for j in into]
            pools += [preds[assignment[j]] for j in out_of]
            candidates = min(pools, key=len) if pools else adjacency

            for node in candidates:
                if node in used:
                    continue
                if wanted != '*' and labels[node] != wanted:
                    continue
                if out_degree.get(node, 0) < min_out:
                    continue
                if len(preds[node]) < min_in:
                    continue
                if needs_loop and (node, node) not in edges:
                    continue
                if any((assignment[j], node) not in edges for j in into):
                    continue
                if any((node, assignment[j]) not in edges for j in out_of):
                    continue
                assignment[idx] = node
                used.add(node)
                extend(depth + 1)
                used.discard(node)
            assignment[idx] = None

        extend(0)
        return matches

    @staticmethod
    def _plan_search(pattern):
        """Return the order and per-step constraints for matching `pattern`.

        Each entry of the returned list describes one search step as
        `(idx, label, min_out, min_in, into, out_of, needs_loop)`:
        the pattern node handled at that step, its label, the minimum
        out-/in-degree a graph node needs to host it, the earlier pattern
        nodes with an edge into / out of it, and whether it has a self-loop.
        """
        size = len(pattern)
        successors = [set(entry[1]) for entry in pattern]
        predecessors = [set() for _ in range(size)]
        for src, targets in enumerate(successors):
            for dst in targets:
                if not 0 <= dst < size:
                    raise IndexError('pattern neighbor index out of range')
                predecessors[dst].add(src)

        # Breadth-first order inside each connected component, so that every
        # node except a component's root is adjacent to an earlier node.
        order = []
        visited = set()
        for root in range(size):
            if root in visited:
                continue
            visited.add(root)
            cursor = len(order)
            order.append(root)
            while cursor < len(order):
                current = order[cursor]
                cursor += 1
                for other in sorted(successors[current] | predecessors[current]):
                    if other not in visited:
                        visited.add(other)
                        order.append(other)

        plan = []
        placed = set()
        for idx in order:
            plan.append((
                idx,
                pattern[idx][0],
                len(successors[idx]),
                len(predecessors[idx]),
                sorted(predecessors[idx] & placed),
                sorted(successors[idx] & placed),
                idx in successors[idx],
            ))
            placed.add(idx)
        return plan

    def _drop_star_leg(self, center):
        """Record that `center` lost one outgoing edge."""
        self.stars[center] -= 1
        if self.stars[center] == 0:  # no longer the center of a star
            del self.stars[center]

    def add_node(self, name, label=''):
        """Add a node with name `name` and label `label`.

        Raises:
            ValueError: if a node called `name` already exists.
        """
        if name in self.adj_dict:
            raise ValueError
        self.adj_dict[name] = []
        self._preds[name] = []
        self.labels[name] = label

    def remove_node(self, name):
        """Remove the node with name `name` and every edge touching it.

        Raises:
            LookupError: if there is no node called `name`.
        """
        if name not in self.adj_dict:
            raise LookupError
        successors = self.adj_dict.pop(name)
        predecessors = self._preds.pop(name)
        del self.labels[name]
        self.stars.pop(name, None)

        for succ in successors:
            self._edges.discard((name, succ))
            if succ != name:  # a self-loop's bookkeeping is already popped
                self._preds[succ].remove(name)
        for pred in predecessors:
            if pred == name:  # self-loop, handled in the loop above
                continue
            self.adj_dict[pred].remove(name)
            self._edges.discard((pred, name))
            self._drop_star_leg(pred)

    def add_edge(self, start, end):
        """Add an edge from `start` to `end`.

        Raises:
            LookupError: if `start` or `end` is not a node of the graph.
            ValueError: if the edge already exists.
        """
        if start not in self.adj_dict or end not in self.adj_dict:
            raise LookupError
        if (start, end) in self._edges:
            raise ValueError
        self.adj_dict[start].append(end)
        self._preds[end].append(start)
        self._edges.add((start, end))
        self.stars[start] = self.stars.get(start, 0) + 1

    def remove_edge(self, start, end):
        """Remove the edge from `start` to `end`.

        Raises:
            LookupError: if either node or the edge itself does not exist.
        """
        if start not in self.adj_dict or end not in self.adj_dict:
            raise LookupError
        if (start, end) not in self._edges:
            raise LookupError
        self.adj_dict[start].remove(end)
        self._preds[end].remove(start)
        self._edges.discard((start, end))
        self._drop_star_leg(start)
if __name__ == '__main__':
    # Scratch checks for the clique helpers.  An earlier experiment, kept
    # here for reference:
    #
    #     factory = GraphFactory(FastGraph)
    #     adj = [[1, 3], [2], [0], []]
    #     labels = {0: "a", 1: "a", 2: "a", 3: "b"}
    #     graph = factory.from_list(adj, labels=labels)
    #
    #     # All nodes.
    #     pattern = [('*', [1, 2]), ('*', []), ('*', [])]
    #     expected = [[0, 1, 3], [0, 3, 1]]
    #     a = graph.query(pattern)
    #     print(a)

    def make_clique_pattern(n, labels=None):
        """Build an `n`-node clique pattern; labels default to wildcards."""
        if not labels:
            labels = ["*"] * n
        built = []
        for idx in range(n):
            others = [other for other in range(n) if other != idx]
            built.append((labels[idx], others))
        return built

    p = make_clique_pattern(5)

    def is_clique(pattern):
        """Return the pattern size if it is a clique pattern, else False."""
        size = len(pattern)
        everyone = list(range(size))
        for idx, entry in enumerate(pattern):
            # A clique node must list every other node, in ascending order.
            if entry[1] != everyone[:idx] + everyone[idx + 1:]:
                return False
        return size

    def does_a_clique_match_labels(clique, pattern, labels):
        """Tell whether every pattern label occurs among the clique's labels.

        `clique` is a sorted tuple of node names and `labels` maps a node
        name to its label.
        """
        available = {labels[member] for member in clique}
        return all(entry[0] in available for entry in pattern)

    l = {'a': 'yo', 'b': 'yo', 'c': 'sup'}
    labels = ['hi', 'yo', 'sup']
    clique = ('a', 'b', 'c')
    pattern = make_clique_pattern(3, labels)
    print(pattern)
    a = does_a_clique_match_labels(clique, pattern, l)
    print(a)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement