Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # coding: utf8
- from __future__ import unicode_literals
- import sys, errno
- import signal, resource
- import operator as op, itertools as it, collections, functools as ft
- import fractions, bisect
- # runtime control functions
- def cpu_used():
- "Ask the system for CPU statistics"
- data= resource.getrusage(resource.RUSAGE_SELF)
- return data.ru_utime + data.ru_stime
- def possibly_report_locals(stack_frame):
- "Attempt to display status at the time of interruption"
- # only needed once here, so import locally
- import inspect, gc, types
- import ast
- while stack_frame:
- code= stack_frame.f_code
- for func in gc.get_referrers(code):
- if isinstance(func, types.FunctionType):
- reports= getattr(func, "__report_on_fail__", None)
- if not reports: continue
- sys.stderr.write("%s:\n" % func.__name__)
- for local_var in reports:
- try:
- body= ast.parse(local_var, mode='eval').body
- except SyntaxError:
- continue
- else:
- if isinstance(body, ast.Name):
- value= stack_frame.f_locals.get(local_var, Ellipsis)
- else:
- try:
- value= eval(local_var, stack_frame.f_locals)
- except Exception as exc:
- value= `exc`
- sys.stderr.write(" %s=%r\n" % (local_var, value))
- stack_frame= stack_frame.f_back
- def on_fail_report(*args):
- "A decorator for easy reporting of locals on fail"
- def _setter(func):
- func.__report_on_fail__= args
- return func
- return _setter
- def signal_xcpu_handler(sig_number, stack_frame):
- "Handle gracefully time limit"
- sys.stderr.write("Time limit reached, exiting.\n")
- possibly_report_locals(stack_frame)
- raise SystemExit
- def signal_usr1_handler(sig_number, stack_frame):
- "Report progress"
- print "cpu used so far:", cpu_used()
- possibly_report_locals(stack_frame)
- def only_one_minute(seconds=60):
- "Make sure that the program runs at most for a minute"
- signal.signal(signal.SIGXCPU, signal_xcpu_handler)
- signal.signal(signal.SIGUSR1, signal_usr1_handler) # report progress
- signal.signal(signal.SIGQUIT, signal_usr1_handler) # report progress
- # limit CPU time
- soft_limit, hard_limit= resource.getrlimit(resource.RLIMIT_CPU)
- soft_limit= seconds + cpu_used() + 1
- sys.stderr.write("Setting max cpu time: %g seconds\n" % soft_limit)
- resource.setrlimit(resource.RLIMIT_CPU, (seconds+cpu_used()+1, hard_limit))
- # limit data to 80% of machine RAM if possible
- try:
- with open("/proc/meminfo", "r") as fp:
- first_line= fp.readline() # assume it's MemTotal: 3014864 kB
- except IOError as exc:
- if exc.errno == errno.ENOENT: # non-existent
- pass
- else: raise
- else:
- kb_total= first_line.partition(':')[2].strip().partition(' ')
- if kb_total[2] == 'kB':
- memory_bytes= 1024*int(kb_total[0], 10)
- soft_limit, hard_limit= resource.getrlimit(resource.RLIMIT_DATA)
- soft_limit= memory_bytes//10*8
- soft_limit= 4096 * (soft_limit+4095)//4096
- sys.stderr.write("Setting max memory use: %g MiB\n" % (soft_limit/1048576.0))
- resource.setrlimit(resource.RLIMIT_DATA, (soft_limit, soft_limit))
- resource.setrlimit(resource.RLIMIT_RSS, (soft_limit, soft_limit))
- try:
- resource.setrlimit(resource.RLIMIT_VMEM, (soft_limit, soft_limit))
- except AttributeError: # no RLIMIT_VMEM here
- pass
- # utility functions
- @on_fail_report("n")
- def fibonaccis():
- "Generate Fibonacci sequence terms; NB: 1, 2, 3, 5, …"
- last_n= 1
- n= 1
- while 1:
- yield n
- n, last_n= n+last_n, n
- @on_fail_report("n")
- def factorial(n):
- "Return the factorial of a number"
- return reduce(op.mul, xrange(1, 1+n), 1)
- @on_fail_report("n")
- def triangulars():
- "Generate all triangular numbers"
- n= 0
- i= 1
- while 1:
- n+= i
- yield n
- i+= 1
- def triangular(n):
- "Return triangular number n ∈ [1…∞)"
- return n*(n + 1)//2
- @on_fail_report("n", "dn")
- def hexagonals():
- "Generate all hexagonal numbers"
- n= 1
- dn= 5
- while 1:
- yield n
- n+= dn
- dn+= 4
- def hexagonal(n):
- "Return hexagonal number n ∈ [1…∞)"
- return n*(2*n-1)
- @on_fail_report("n", "dn")
- def pentagonals():
- "Generate all pentagonal numbers"
- n= 1
- dn= 4
- while 1:
- yield n
- n+= dn
- dn+= 3
- def pentagonal(n):
- "Return pentagonal number n ∈ [1…∞)"
- return n*(3*n - 1)//2
- @on_fail_report("best_n")
- def min_n_satisfying(limit, op, function):
- "Return minimum n satisfying op(limit, function(n))"
- n= 1
- value= function(n)
- while not op(limit, value):
- n<<= 1
- value= function(n)
- best_n= n
- bit= n>>1
- n-= 1
- while bit:
- if op(limit, function(n - bit)):
- n-= bit
- best_n= n
- bit>>= 1
- return best_n
- @on_fail_report("q")
- def primegen():
- "Generate all primes as fast as possible"
- D = { 9: 3, 25: 5 }
- yield 2
- yield 3
- yield 5
- MASK= 1, 0, 1, 1, 0, 1, 1, 0, 1, 0, 0, 1, 1, 0, 0,
- MODULOS= frozenset( (1, 7, 11, 13, 17, 19, 23, 29) )
- for q in it.compress(
- it.islice(it.count(7), 0, None, 2),
- it.cycle(MASK)):
- p= D.pop(q, None)
- if p is None:
- D[q*q]= q
- yield q
- else:
- x= q + 2*p
- while x in D or (x%30) not in MODULOS:
- x+= 2*p
- D[x]= p
- class Factorizer(object):
- "A class to be used for multiple factorizing related questions."
- def __init__(self):
- self.primes= collections.deque()
- self._primegen= primegen()
- self.primes.append(next(self._primegen))
- self.primes.append(next(self._primegen))
- @on_fail_report("prime")
- def _enlarge_primes(self):
- "Generate more primes while appending them to self.primes for later reference"
- while 1:
- prime= next(self._primegen)
- self.primes.append(prime)
- ## sys.stderr.write("%d\r" % prime)
- yield prime
- @on_fail_report("n")
- def is_prime(self, n):
- while self.primes[-1] < n:
- self.primes.append(next(self._primegen))
- return self.primes[bisect.bisect_left(self.primes, n)] == n
- @on_fail_report("n", "prime")
- def prime_factors(self, n):
- "Generate all prime factors of n"
- for prime in it.chain(self.primes, self._enlarge_primes()):
- if prime*prime > n:
- yield n
- break
- while n > 1:
- new_n, modulo= divmod(n, prime)
- if modulo == 0:
- n= new_n
- yield prime
- else:
- break
- if n == 1:
- break
- @on_fail_report("n", "prime")
- def grouped_factors(self, n):
- "Return (prime, power) for every prime factor of n"
- for prime, occurences in it.groupby(self.prime_factors(n)):
- occurences_count= sum(1 for _ in occurences)
- yield prime, occurences_count
- @on_fail_report("n")
- def divisor_count(self, n):
- "Return the divisor count of n"
- if n == 1: return 1
- divisor_count= 1
- for prime, power in self.grouped_factors(n):
- divisor_count*= 1 + power
- return divisor_count
- @on_fail_report("n")
- def divisors(self, n):
- if n == 1: return 1,
- terms= []
- for prime, power in self.grouped_factors(n):
- terms.append([prime**p for p in xrange(power+1)])
- return set(
- reduce(op.mul, factors, 1)
- for factors in
- it.product(
- *(
- [prime**p for p in xrange(power+1)]
- for prime, power in self.grouped_factors(n)
- )
- )
- )
- @on_fail_report("n", "prime")
- def prime_factors(n):
- "Return the prime factors of n (oneshot)"
- factorizer= Factorizer()
- for prime in factorizer.prime_factors(n):
- yield prime
- @on_fail_report("m", "n")
- def primitive_pythagorean_triples():
- m= 2
- while 1:
- for n in xrange(m-1, 0, -2):
- if fractions.gcd(m, n) == 1:
- yield m*m - n*n, 2*m*n, m*m + n*n
- m+= 1
- @on_fail_report("triple", "factor")
- def all_pythagorean_triples(n=1000):
- faults= 0
- for triple in primitive_pythagorean_triples():
- perimeter= sum(triple)
- if perimeter > n:
- faults+= 1
- if faults > 10: break
- continue
- factor= 1
- while factor*perimeter <= n:
- yield factor*perimeter, (factor*triple[0], factor*triple[1], factor*triple[2])
- factor+= 1
- factorizer= Factorizer()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement