# -*- coding: utf-8 -*-
# Authors: João S. O. Bueno, Humberto C. Innareli
# License: Creative Commons Attribution Required 3.0
"""Problema porposto por Shandler Lyrio --
Idéia de Solução Humebrto Celeste Innareli
Implementação: João Sebastião de Oliveira Bueno
Problema:
Dadas faixas de CEPS atendidas por diferentes distribuidores, uma busca
para encontrar quais distribuidores podem atender quais CEPs
çeva cerca de 1 min. para rodar com a seleção feita toda dentro do SQL,
para cerca de 1500 faixas de CEP e 85000 CEPs
Solução encontrada:
Ordenar os CEPs em memoria, propiciando uma busca O(log(N)) nos mesmos, e, para cada
faixa de CEPs atendida, selecionar a fatia de CEPs atendida pela faixa:
O(M log(N))
(Em vez de para cada CEP procurar em todas as faixas para verificar se
ele se enquadra dentro de seus limites -- log(N^2))
"""
import bisect
class Range(object):
total = 0
ranges = {}
def __init__(self, start, stop, value = None):
self.start = start
self.stop = stop
self.id = Range.total
self.ranges[self.id] = value
Range.total += 1
class Distributors(object):
def __init__(self):
self.ranges = []
def check(self, zips):
"""Zips must be a sorted list of Zips"""
results = {}
for range_ in self.ranges:
first = bisect.bisect_left(zips, range_.start)
last = bisect.bisect(zips, range_.stop)
for index in xrange(first, last):
if not zips[index] in results:
results[zips[index]] = [range_.id]
else:
results[zips[index]].append(range_.id)
ungrouped = set(zips)
ungrouped.difference_update(results.keys())
return results, ungrouped
def __setitem__(self, (start, stop), value=None):
self.ranges.append(Range(start, stop, value))
def __getitem__(self, index):
"""Don\'t use this but for testing purposes -- rather call "check" with a list with all the zip codes"""
r,u = self.check([index])
if r:
return [Range.ranges[range_index] for range_index in r[index]]
return []
if __name__ == "__main__":
import unittest
import random
import timeit
class TestDistributor(unittest.TestCase):
def setUp(self):
self.dist = Distributors()
self.dist[100,120] = 1
self.dist[105, 115] = 2
self.dist[110, 110] = 3
def testEmpty(self):
self.assertEqual(self.dist[99], [])
def testUpperEmpty(self):
self.assertEqual(self.dist[121], [])
def testBelonging(self):
self.assertEqual(self.dist[101],[1])
def testLowerBoundary(self):
self.assertEqual(self.dist[100],[1])
def testUpperBoundary(self):
self.assertEqual(self.dist[120],[1])
def testIntersection(self):
self.assertEqual(self.dist[105],[1, 2])
def testUnitRange(self):
self.assertEqual(self.dist[110],[1,2,3])
RANGES = 2000
ZIPS = 100000
class DistributorPerformance(object):
def setUp(self):
self.dist = Distributors()
for i in xrange(RANGES):
indx1 = random.randrange(10000000, 99999999)
#indx2 = random.randrange(indx1, 99999999)
indx2 = random.randrange(indx1, min(indx1 + 2000000, 99999999))
self.dist[indx1, indx2] = i
def zip_setup(self, numbers):
print "Building %s zip codes" % numbers
zips = []
for i in xrange(numbers):
zips.append(random.randrange(10000000, 99999999))
zips.sort()
self.zips = zips
print "Done"
def testTiming(self):
r, u = self.dist.check(self.zips)
print "Total de CEPS encontrados, e CEPs sem faixa: ", len(r), len(u)
print "Criando %s faixas de CEP" % RANGES
perf_test = DistributorPerformance()
t1 = timeit.timeit(perf_test.setUp, number=1)
print "Tempo de criação: %.2fs\\n" % t1
print "Criando %s codigos de CEP, ordenados" % ZIPS
t2 = timeit.timeit(lambda: perf_test.zip_setup(ZIPS), number=1)
print "Tempo de criação: %.2fs\\n" % t2
print "Executando %s buscas em %s registros de faixas" %(ZIPS, RANGES)
t3 = timeit.timeit(perf_test.testTiming, number=1)
print "Tempo de busca: %.2fs \\n" % t3
print "Tempo total: %.2fs" % (t1 + t2 + t3)
unittest.main()