Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import random
- class GrowCounter:
- '''Convergent conflict-free replicated data type state based grow-only counter.
- Guarantees strong eventual consistency.
- '''
- def __init__(self, n):
- assert n > 0, 'at least one node in cluster'
- self.n = n
- self.p = [0 for _ in range(n)]
- def value(self):
- return sum(self.p)
- def increment(self, i):
- assert 0 <= i < n, 'node id in cluster bounds'
- self.p[i] += 1
- def merge(self, other):
- assert self.n == other.n
- self.p = list(map(max, zip(self.p, other.p)))
- def compare(self, other):
- assert self.n == other.n
- return all(l <= r for (l, r) in zip(self.p, other.p))
- def __repr__(self):
- return '<{}: value={}, n={}, p={}>'.format(self.__class__.__name__,
- self.value(), self.n, self.p)
- class PosNegCounter:
- '''Convergent conflict-free replicated data type state based positive-negative counter.
- Guarantees strong eventual consistency.
- '''
- def __init__(self, n):
- self.pos = GrowCounter(n)
- self.neg = GrowCounter(n)
- def value(self):
- return self.pos.value() - self.neg.value()
- def increment(self, i):
- self.pos.increment(i)
- def decrement(self, i):
- self.neg.increment(i)
- def merge(self, other):
- self.pos.merge(other.pos)
- self.neg.merge(other.neg)
- def compare(self, other):
- return self.pos.compare(other.pos) and self.neg.compare(other.neg)
- def __repr__(self):
- return '<{}: value={}, pos={}, neg={}>'.format(self.__class__.__name__,
- self.value(), self.pos, self.neg)
- if __name__ == '__main__':
- # Simulate cluster of n nodes, each node with its own local counter
- n = 10
- counters = [GrowCounter(n) for _ in range(n)]
- # Count to 100 in a distributed cluster of n nodes
- for _ in range(100):
- # Pick an arbitrary node and increment its counter
- node = random.randrange(n)
- counters[node].increment(node)
- # Simulate eventual consistency through arbitrary node communication
- for i in range(1000):
- lhs = random.choice(counters)
- rhs = random.choice(counters)
- # Merge is commutative, associative, idempotent
- lhs.merge(rhs)
- # Merge always increases state monotonically
- assert rhs.compare(lhs)
- # If we waited long enough we will see a cluster-wide consistent state
- for i, counter in enumerate(counters):
- print('{}: {}'.format(i, counter))
- assert all(v.value() == 100 for v in counters), 'final consistent state reached'
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement