# Source: Pastebin paste (site sign-up/advertisement boilerplate removed).
import os
import subprocess
import sys
from collections import OrderedDict, defaultdict, deque
from copy import deepcopy
from itertools import combinations

# from monosat import *
def NestedDict():
    """Return a dict whose missing keys are created as further nested dicts.

    This allows assigning through several levels of keys at once, e.g.
    ``d[a][b][c] = value``, even when the intermediate levels do not exist
    yet.  Note that merely *reading* a missing key also creates it.
    """
    return defaultdict(NestedDict)
class SATGenerator(object):
    """Route traces through a 3-D grid by generating a MonoSAT problem file.

    Constructing an instance performs the whole pipeline: it opens the
    problem file, creates one graph node per grid cell and one edge variable
    per directed pair of orthogonal neighbours, writes the routing clauses,
    runs the solver, and decodes the witness into the text files ``sol_out``
    (one bitmap per trace) and ``sol_out2`` (all traces on one bitmap).

    Args:
        traces: mapping of trace name to a dict with ``'input'`` and
            ``'output'`` keys, each an ``(x, y, z)`` tuple inside the grid.
        maxx: number of cells along X.
        maxy: number of cells along Y.
        maxz: number of cells (layers) along Z.
        solver_path: optional path of the solver executable; defaults to
            the class attribute ``solver_path``.
        output_path: optional path of the generated problem file; defaults
            to the class attribute ``output_path``.
    """

    # Defaults, kept as class attributes so they can be overridden per
    # instance (or per subclass) without touching the constructor call.
    solver_path = '/home/nathan/Projects/github/monosat/monosat'
    output_path = 'output.cnf'

    # Orthogonal neighbour offsets (in-plane left/right/ahead/behind plus the
    # vertical up/down transitions).  In-plane diagonals and diagonal layer
    # changes are deliberately absent.  The order matters: edge variables are
    # numbered in the order neighbours are returned.
    _NEIGHBOR_OFFSETS = (
        (-1, 0, 0),
        (0, -1, 0),
        (0, 0, -1),
        (0, 0, 1),
        (0, 1, 0),
        (1, 0, 0),
    )

    def __init__(self, traces, maxx, maxy, maxz, solver_path=None, output_path=None):
        self.maxx = maxx
        self.maxy = maxy
        self.maxz = maxz
        if solver_path is not None:
            self.solver_path = solver_path
        if output_path is not None:
            self.output_path = output_path
        # metadata of every CNF variable; variable number k lives at index k-1
        self.vars = []
        # metadata of every graph node; node number k lives at index k-1
        self.nodes = []
        self.traces = traces
        # (from xyz, center xyz) pairs already handled by _neighbor_constraint
        self.visited_neighbor_edges = set()
        self.setup_output()
        self.create_vars()
        self.create_clauses()
        self.parse_solution(self.solve())

    def setup_output(self):
        """Open the problem file and write its header lines."""
        self.output = open(self.output_path, 'w')
        # the counts in both header lines are written as zero and never updated
        self.output.write('p cnf 0 0\ndigraph int 0 0 0\n')

    def create_vars(self):
        """Create a node for every grid cell and an edge to every neighbour."""
        # setup nodes
        self.grid_by_xyz = OrderedDict()
        for x in range(self.maxx):
            ys = OrderedDict()
            for y in range(self.maxy):
                zs = OrderedDict()
                for z in range(self.maxz):
                    number = self.node(('x {} y {} z {} node'.format(x, y, z), (x, y, z)))
                    zs[z] = {
                        'node': number,
                        'edges': [],
                        'xyz': (x, y, z),
                        # outgoing edge variable keyed by the neighbour's xyz
                        'edges_xyz': {},
                    }
                ys[y] = zs
            self.grid_by_xyz[x] = ys
        # setup all possible edges
        for x in self.grid_by_xyz:
            for y in self.grid_by_xyz[x]:
                for z in self.grid_by_xyz[x][y]:
                    self._neighbor_edges(x, y, z)

    def create_clauses(self):
        """Write the neighbour, endpoint and reachability constraints."""
        # for every space: an edge that is used must be continued by an
        # outgoing edge of the cell it leads to
        for x in range(self.maxx):
            for y in range(self.maxy):
                for z in range(self.maxz):
                    here = self.grid_by_xyz[x][y][z]
                    for neighbor in self._get_circumferential_locs(x, y, z):
                        starting_edge = self._node_edge_to(here, neighbor)
                        self._neighbor_constraint(here, neighbor, starting_edge)

        # go through the traces, OR the endpoint edges, then require
        # reachability from the input node to the output node
        self.start_end = []
        endpoints = []  # (trace name, input cell, output cell)
        for trace in self.traces:
            pair = []
            for io in ('input', 'output'):
                x, y, z = self.traces[trace][io]
                cell = self.grid_by_xyz[x][y][z]
                pair.append(cell)
                # at least one of this cell's outgoing edges is True
                self.clause(cell['edges'])
                self.start_end.append((trace, x, y, z))
            # new var for the reachability between the two nodes
            rv = self.var('reach {}'.format(trace))
            # reach <graphID> node1 node2 var
            self.output.write('reach 0 {} {} {}\n'.format(pair[0]['node'], pair[1]['node'], rv))
            # the var is True
            self.clause([rv])
            endpoints.append((trace, pair[0], pair[1]))

        # disallow an input to connect to any other trace's output
        for trace, inp, _ in endpoints:
            other_outputs = [out for name, _, out in endpoints if name != trace]
            # label the variable with the trace that owns the input
            rv = self.var('anti reach {}'.format(trace))
            for other_out in other_outputs:
                self.output.write('reach 0 {} {} {}\n'.format(inp['node'], other_out['node'], rv))
            if other_outputs:
                # one unit clause is enough: every line above shares this var
                self.clause([-rv])

    def solve(self):
        """Run the solver on the problem file and return its stripped stdout.

        The raw output is also saved to ``solver_out``.  When the output ends
        with ``UNSATISFIABLE`` the process exits with status 1.
        """
        # close the clause file so everything is flushed before solving
        self.output.close()
        # call the SAT/SMT solver, pass the clause file path
        proc = subprocess.Popen(
            [self.solver_path, self.output_path, '-witness'],
            stdout=subprocess.PIPE,
            universal_newlines=True,
        )
        # wait for solver output (stderr is not captured)
        stdout, _ = proc.communicate()
        stdout = stdout.strip()
        print('num nodes {} num edges {}'.format(len(self.nodes), len(self.vars)))
        with open('solver_out', 'w') as so:
            so.write(stdout)
        if stdout.endswith('UNSATISFIABLE'):
            print('FAILED')
            sys.exit(1)
        return stdout

    def parse_solution(self, stdout):
        """Decode the solver witness and write the readable bitmaps.

        For every trace the true edges are followed from the trace's input to
        its output and the cells on that path are written to ``sol_out``.
        ``sol_out2`` shows every cell touched by any true edge.

        Raises:
            RuntimeError: when a trace's output cannot be reached from its
                input through the edges the solver set to True.
        """
        successors = self._true_edges(stdout)
        # column width: the longest trace name (or the '*' marker)
        width = max(len(name) for name in list(self.traces) + ['*'])

        all_endpoints = set()
        with open('sol_out', 'w') as out:
            for trace in self.traces:
                print(trace)
                out.write('\n trace OUT\n')
                start = tuple(self.traces[trace]['input'])
                end = tuple(self.traces[trace]['output'])
                all_endpoints.update((start, end))
                path = self._find_path(successors, start, end)
                if path is None:
                    raise RuntimeError(
                        "end-point {} wasn't found in the solution from point {}".format(end, start))
                print('hit end')
                # trace name on the path, '*' on this trace's own endpoints
                labels = dict.fromkeys(path, trace)
                labels[start] = labels[end] = '*'
                self._write_bitmap(out, labels, width)

        # all traces on the same "bitmap": harder to read, but a sanity check
        labels = {}
        for source, targets in successors.items():
            labels[source] = 'T'
            labels.update(dict.fromkeys(targets, 'T'))
        labels.update(dict.fromkeys(all_endpoints, '*'))
        with open('sol_out2', 'w') as out:
            self._write_bitmap(out, labels, width)

    def _true_edges(self, stdout):
        """Return ``{source xyz: [destination xyz, ...]}`` for all True edges."""
        successors = {}
        # splitlines copes with any newline convention in the solver output
        for line in stdout.splitlines():
            # var lines are prefixed with letter v
            if not line.startswith('v'):
                continue
            for token in line[1:].split():
                # only non-negative (True) vars are utilized edges
                if token.startswith('-'):
                    continue
                # variable numbers are 1-based, list indexes 0-based
                index = int(token) - 1
                # there is no var 0 (it only terminates the line)
                if index < 0:
                    continue
                meta = self.vars[index]
                # edges are the only variables whose metadata is a tuple
                if isinstance(meta, tuple):
                    source, target = meta[0][1], meta[1][1]
                    # a cell may have several True outgoing edges; keep all
                    successors.setdefault(source, []).append(target)
                    print('{} {}\n'.format(source, target))
                else:
                    print(meta)
        return successors

    @staticmethod
    def _find_path(successors, start, end):
        """Breadth-first search; return the cells from start to end, or None."""
        came_from = {start: None}
        queue = deque([start])
        while queue:
            here = queue.popleft()
            if here == end:
                path = []
                while here is not None:
                    path.append(here)
                    here = came_from[here]
                path.reverse()
                return path
            for target in successors.get(here, ()):
                # the graph can cycle, so never revisit a cell
                if target not in came_from:
                    came_from[target] = here
                    queue.append(target)
        return None

    def _write_bitmap(self, out, labels, width):
        """Write one crude bitmap per Z layer; unlabeled cells print as 0."""
        for z in range(self.maxz):
            out.write('Z {}\n'.format(z))
            for x in range(self.maxx):
                for y in range(self.maxy):
                    out.write(labels.get((x, y, z), '0').ljust(width) + ' ')
                out.write('\n')
            out.write('\n')

    def var(self, name):
        """Register a new CNF variable and return its 1-based number."""
        self.vars.append(name)
        return self.num_vars

    def node(self, name):
        """Register a new graph node and return its 1-based number."""
        self.nodes.append(name)
        return len(self.nodes)

    @property
    def num_vars(self):
        """Number of CNF variables registered so far."""
        return len(self.vars)

    def clause(self, v_list, comment=None):
        """Write one clause (the OR of ``v_list``), preceded by an optional comment."""
        if comment is not None:
            self.comment(comment)
        self.output.write('{} 0\n'.format(' '.join(str(v) for v in v_list)))

    def comment(self, comment):
        """Write a comment line to the problem file."""
        self.output.write('c {}\n'.format(comment))

    def _kleiber_kwon(self, vs):
        """Placeholder; not implemented."""
        # NOTE(review): the name suggests a commander-style at-most-one
        # encoding was planned here -- confirm before relying on it.
        pass

    def _naive_mutex(self, vs, cmdr_list=None):
        """Forbid any two of ``vs`` being True together (pairwise encoding).

        When ``cmdr_list`` is given, its literals are prepended to every
        generated clause.
        """
        self.comment('_naive_mutex to follow{}'.format(
            ' with cmdrs {} and {}'.format(cmdr_list, vs) if cmdr_list is not None else ''))
        prefix = list(cmdr_list) if cmdr_list is not None else []
        for vi, vj in combinations(vs, 2):
            self.clause(prefix + [-vi, -vj])
        self.comment('_naive_mutex finished')

    def _neighbor_edges(self, x, y, z):
        """Create an edge variable from cell (x, y, z) to each of its neighbours."""
        here = self.grid_by_xyz[x][y][z]
        here_meta = self.nodes[here['node'] - 1]
        for neighbor in self._get_circumferential_locs(x, y, z):
            # edge metadata is the (source node, destination node) metadata pair
            edge_var = self.var((here_meta, self.nodes[neighbor['node'] - 1]))
            self._edge(here['node'], neighbor['node'], edge_var)
            here['edges'].append(edge_var)
            here['edges_xyz'][neighbor['xyz']] = edge_var

    def _node_edge_to(self, n, on):
        """Return the variable of the edge from cell ``n`` to cell ``on``.

        Raises:
            KeyError: when no edge from ``n`` to ``on`` was created.
        """
        return n['edges_xyz'][on['xyz']]

    def _neighbor_constraint(self, from_node, center_node, edge_came_from):
        """Require an edge arriving at ``center_node`` to be continued.

        Writes the clause "if ``edge_came_from`` is True, then one of the
        edges leaving ``center_node`` (other than back to ``from_node``) is
        True".  Returns False when nothing was written.
        """
        pair = (from_node['xyz'], center_node['xyz'])
        # NOTE(review): a pair is handled in one direction only -- whichever
        # direction is seen first; confirm that this is intended.
        if pair in self.visited_neighbor_edges or pair[::-1] in self.visited_neighbor_edges:
            return False
        # a trace may end at an output, so arriving there needs no continuation
        if any(center_node['xyz'] == self.traces[t]['output'] for t in self.traces):
            return False
        self.visited_neighbor_edges.add(pair)
        neighbors = self._get_circumferential_locs(*center_node['xyz'])
        onward = [n for n in neighbors if n['xyz'] != from_node['xyz']]
        if len(onward) == len(neighbors):
            raise Exception('{} {}'.format(neighbors, from_node))
        # if the edge_came_from is True, then one of the outgoing edges is True
        self.clause([-edge_came_from] + [self._node_edge_to(center_node, n) for n in onward])

    def _edge(self, _from, _to, v, w=1, graph_id=0):
        """edge <GraphID> <from> <to> <CNF Variable> [weight]"""
        # the weight is only written when it is greater than 1
        self.output.write('edge {} {} {} {} {}\n'.format(graph_id, _from, _to, v, w if w > 1 else ''))

    def _get_circumferential_locs(self, x, y, z):
        """Return a new list of the grid cells orthogonally adjacent to (x, y, z).

        Neighbours outside the grid are left out.  Forty-five degree moves are
        disabled for now (they can be enabled once diagonally crossing edges
        are disallowed) and layer changes are vertical only.
        """
        limits = (self.maxx, self.maxy, self.maxz)
        found = []
        for dx, dy, dz in self._NEIGHBOR_OFFSETS:
            xyz = (x + dx, y + dy, z + dz)
            if all(0 <= value < limit for value, limit in zip(xyz, limits)):
                found.append(self.grid_by_xyz[xyz[0]][xyz[1]][xyz[2]])
        return found
# Made-up start and end points, to be placed within a grid of size 20 x 20 x 2.
traces = OrderedDict([
    ('t1', {'input': (0, 0, 1), 'output': (10, 10, 0)}),
    ('t2', {'input': (0, 10, 0), 'output': (10, 1, 0)}),
    ('t3', {'input': (0, 0, 0), 'output': (10, 10, 1)}),
    ('t4', {'input': (1, 3, 0), 'output': (10, 4, 0)}),
])

# Passing the traces and the grid size generates the clauses and then calls
# the solver; constructing the object runs the whole pipeline.
SATGenerator(traces, maxx=20, maxy=20, maxz=2)
# End of paste (site comment/advertisement boilerplate removed).