Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- """Test for s7."""
- import psycopg2
- class Relations(object):
- """Wrapper class to iterate db rows as sets."""
- def __init__(self):
- """Setting up a cursor."""
- self.connection = psycopg2.connect('dbname=test user=postgres')
- self.cursor = self.connection.cursor('components') # serverside cursor
- self.cursor.execute(
- 'SELECT object_id, relative_ids FROM relations;'
- )
- def __iter__(self):
- """Iterator interface."""
- return self
- def __next__(self):
- """Python 3 compatibiliy."""
- return self.next()
- def next(self):
- """Next row as combined set."""
- # next() call will raise StopIteration on cursor exhaustion,
- # no need to wrap that since for-loop will handle it by itself
- # but we want to free resources as soon as possible.
- try:
- object_id, relations = next(self.cursor)
- except StopIteration:
- self.cursor.close()
- self.connection.close()
- raise
- return set([object_id] + relations)
- def __del__(self):
- """Free resources on gc."""
- self.cursor.close()
- self.connection.close()
- class Ids(object):
- """Wrapper class to iterate db rows as single integers."""
- def __init__(self):
- """Setting up a cursor."""
- self.connection = psycopg2.connect('dbname=test user=postgres')
- self.cursor = self.connection.cursor('ids') # serverside cursor
- self.cursor.execute('SELECT id FROM objects;')
- def __iter__(self):
- """Iterator interface."""
- return self
- def __next__(self):
- """Python 3 compatibiliy."""
- return self.next()
- def next(self):
- """Next row as combined set."""
- # next() call will raise StopIteration on cursor exhaustion,
- # no need to wrap that since for-loop will handle it by itself
- # but we have to free connection as soon as possible.
- ident = next(self.cursor)
- return ident[0]
- def __del__(self):
- """Free resources on gc."""
- self.cursor.close()
- self.connection.close()
- def make_components(relations):
- """Return list of components (sets) from relations list of sets."""
- components = []
- for relation in relations:
- merged = True
- while merged:
- merged = False
- for index, component in enumerate(components):
- if relation & component:
- relation |= components.pop(index)
- merged = True
- break
- components.append(relation)
- return components
- def include_ids(ids, components):
- """Add id as sole-value component if one not found in relations."""
- for ident in ids:
- not_found = True
- for component in components:
- if ident in component:
- not_found = False
- break
- if not_found:
- components.append({ident})
- return components
- relations = Relations()
- components = make_components(relations)
- ids = Ids()
- components = include_ids(ids, components)
- # components have not to intersect
- assert all((
- (not a & b)
- for ia, a in enumerate(components)
- for ib, b in enumerate(components)
- if ia != ib
- )), [ # Assertion error output
- (a, b, a & b)
- for ia, a in enumerate(components)
- for ib, b in enumerate(components)
- if ia != ib and a & b
- ]
- # all ids have to take part in components, even empty ones
- assert ids == set.union(*components), (ids, set.union(*components))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement