Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from collections import defaultdict
- from fractions import Fraction
- from math import log2, ceil
- END_OF_MESSAGE = 256
- from functools import wraps
- from time import time
def timing(f):
    """Decorate *f* so that every call prints how long it took.

    The wrapper forwards all positional and keyword arguments to *f*,
    prints the function name, the arguments and the elapsed wall-clock
    time (seconds, four decimals), and returns whatever *f* returned.
    """
    @wraps(f)
    def timed(*args, **kw):
        started = time()
        outcome = f(*args, **kw)
        elapsed = time() - started
        report = 'func:%r args:[%r, %r] took: %2.4f sec' % \
            (f.__name__, args, kw, elapsed)
        print(report)
        return outcome
    return timed
def build_prob(occurences_list):
    """Map each symbol to its cumulative probability interval.

    *occurences_list* is a mapping ``symbol -> occurrence count``, e.g.
    ``{"A": 2, "B": 1, "C": 1, END_OF_MESSAGE: 1}``.  Walking the mapping
    in its iteration order, every symbol receives the half-open slice
    ``(low, high)`` of ``[0, 1)`` whose width is ``count / total``.

    Returns a dict ``symbol -> (low, high)`` with float bounds.
    """
    total = sum(occurences_list.values())
    intervals = {}
    lower_count = 0
    for symbol, count in occurences_list.items():
        upper_count = lower_count + count
        intervals[symbol] = (lower_count / total, upper_count / total)
        # The next symbol's slice starts where this one ended.
        lower_count = upper_count
    return intervals
def returnBits(interval, bit_counter):
    """Rescale *interval* and collect the bits that become decidable.

    *interval* is a ``(low, high)`` pair and *bit_counter* the number of
    pending "middle" expansions carried in from earlier calls.  The
    interval is doubled for as long as it lies entirely in the lower
    half, the upper half, or the middle half of ``[0, 1)``.

    Returns ``(interval, bit_counter, output)`` where *output* is the
    string of ``'0'``/``'1'`` characters emitted during rescaling.
    """
    emitted = []
    while True:
        low, high = interval
        if high < 0.5:
            # Interval is contained in [0, 0.5): emit 0 plus pending 1s.
            emitted.append('0' + '1' * bit_counter)
            bit_counter = 0
            interval = (2 * low, 2 * high)
        elif low >= 0.5:
            # Interval is contained in [0.5, 1): emit 1 plus pending 0s.
            emitted.append('1' + '0' * bit_counter)
            bit_counter = 0
            interval = (2 * low - 1, 2 * high - 1)
        elif low >= 0.25 and high < 0.75:
            # Interval is contained in [0.25, 0.75): the bit is not known
            # yet, so only remember that one more expansion is pending.
            bit_counter += 1
            interval = (2 * low - 0.5, 2 * high - 0.5)
        else:
            return interval, bit_counter, ''.join(emitted)
def end_arithmetic_encoding(interval, bit_counter):
    """Return the closing bits that terminate an encoded stream.

    One extra pending bit is added to *bit_counter*.  When the lower
    bound of *interval* is below 0.25 the result is ``'0'`` followed by
    that many ``'1'`` characters, otherwise ``'1'`` followed by that many
    ``'0'`` characters.
    """
    low = interval[0]
    pending = bit_counter + 1
    if low < 0.25:
        return '0' + '1' * pending
    return '1' + '0' * pending
def arithmetic_encoding(input_codes, occurences):
    """Encode *input_codes* into a string of ``'0'``/``'1'`` characters.

    *occurences* is the initial ``symbol -> count`` model.  It is copied,
    so the caller's mapping is left untouched; the copy is updated after
    every symbol, which makes the model adaptive.

    For each symbol the current interval is narrowed to the symbol's
    probability slice, rescaled with ``returnBits``, and the symbol's
    count is incremented.  The stream is closed with
    ``end_arithmetic_encoding``.
    """
    counts = occurences.copy()
    low, high = 0.0, 1.0
    pending_bits = 0
    chunks = []

    for symbol in input_codes:
        sym_low, sym_high = build_prob(counts)[symbol]
        width = high - low
        low, high = low + width * sym_low, low + width * sym_high
        (low, high), pending_bits, emitted = returnBits((low, high), pending_bits)
        chunks.append(emitted)
        counts[symbol] += 1

    chunks.append(end_arithmetic_encoding((low, high), pending_bits))
    return "".join(chunks)
- # def arithmetic_decoding(input_fraction, occurences_list):
- # start = 0.0
- # end = 1.0
- # output_codes = []
- # code = None
- # while code != END_OF_MESSAGE:
- # probability_dict = build_prob(occurences_list)
- # for code, (l, r) in probability_dict.items():
- # if(start + (end-start)*l <= input_fraction < start + (end-start)*r):
- # start, end = start + (end-start)*l, start + (end-start)*r
- # if(code != END_OF_MESSAGE):
- # output_codes.append(code)
- # print(input_fraction)
- # while True:
- # if end < 0.5: #if interval is contained in [0, 0.5)
- # pass
- # elif start >= 0.5: #if interval is contained in [0.5, 1)
- # input_fraction -= 0.5
- # start -= 0.5
- # end -= 0.5
- # elif (start >= 0.25 and end <= 0.75): #if interval is contained in [0.25, 0.75)
- # input_fraction -= 0.25
- # start -= 0.25
- # end -= 0.25
- # else:
- # break
- # start *= 2
- # end *= 2
- # input_fraction *= 2
- # occurences_list[code] += 1
- # break
- # return "".join(output_codes)
def arithmetic_decoding(input_bits, occurences):
    """Decode a bit sequence back into the list of symbol codes.

    *input_bits* is the encoded bit sequence and *occurences* the initial
    ``symbol -> count`` model.  The model is copied (the caller's mapping
    is not modified) and updated after every decoded symbol.

    Decoding stops once ``END_OF_MESSAGE`` has been decoded; that marker
    is not included in the returned list.

    Raises ``ValueError`` when *occurences* is empty.
    """
    occurences_list = occurences.copy()
    if not occurences_list:
        # An empty model cannot decode anything (and would never reach
        # END_OF_MESSAGE), so fail early with a clear error.
        raise ValueError("occurences must contain at least one symbol")

    start = 0.0
    end = 1.0
    output_codes = []
    code = None

    # The buffer is sized to hold the whole input at once.
    # NOTE(review): BitBuffer is defined outside this block; presumably
    # float(buffer) yields the buffered bits as a binary fraction in
    # [0, 1) -- confirm against its implementation.
    buffer = BitBuffer(len(input_bits), input_bits)
    buffer.load_buffer()

    while code != END_OF_MESSAGE:
        probability_dict = build_prob(occurences_list)
        # The buffer is only modified after a symbol has matched (and the
        # scan is left right afterwards), so convert it once per symbol
        # instead of once per candidate.  Assumes float(buffer) has no
        # side effects -- TODO confirm.
        buffer_value = float(buffer)
        width = end - start
        # NOTE(review): if no slice contains buffer_value, the scan ends
        # with `code` bound to the last symbol of the model; when that is
        # END_OF_MESSAGE, decoding stops silently.
        for code, (l, r) in probability_dict.items():
            low = start + width * l
            high = start + width * r
            if low <= buffer_value < high:
                start, end = low, high
                if code != END_OF_MESSAGE:
                    output_codes.append(code)
                # Rescale the interval and the buffer in the same way.
                while True:
                    if end < 0.5:  # interval is contained in [0, 0.5)
                        pass
                    elif start >= 0.5:  # interval is contained in [0.5, 1)
                        buffer.substract_bit(0)  # subtract 0.5
                        start -= 0.5
                        end -= 0.5
                    elif start >= 0.25 and end < 0.75:  # contained in [0.25, 0.75)
                        buffer.substract_bit(1)  # subtract 0.25
                        start -= 0.25
                        end -= 0.25
                    else:
                        break
                    start *= 2
                    end *= 2
                    # Shift all bits in the buffer to the left and move
                    # one bit from the input in on the right.
                    buffer <<= 1
                occurences_list[code] += 1
                break
    return output_codes
@timing
def encode_from_file(in_filename, out_filename='out.bin'):
    """Arithmetic-encode the bytes of *in_filename* into *out_filename*.

    The model starts with a count of 1 for each of the 256 byte values
    plus the ``END_OF_MESSAGE`` marker, which is appended to the data
    before encoding.  The resulting bit string is packed with
    ``bitarray`` and written to *out_filename* in binary mode.

    NOTE(review): ``bitarray`` is not imported in the visible part of
    this file -- confirm it is brought into scope elsewhere.
    """
    occurences = {x: 1 for x in range(256 + 1)}
    # Read inside a context manager so the input handle is closed
    # deterministically instead of being left to the garbage collector.
    with open(in_filename, "rb") as f:
        data = list(f.read())
    encoded_binary = arithmetic_encoding(data + [END_OF_MESSAGE], occurences)
    bits = bitarray(encoded_binary)
    with open(out_filename, 'wb') as f:
        bits.tofile(f)
@timing
def decode_from_file(in_filename, out_filename='decoded.bin'):
    """Decode the arithmetic-coded *in_filename* into *out_filename*.

    The model starts with a count of 1 for each of the 257 codes in
    ``range(256 + 1)``.  The input file is loaded into a ``bitarray``,
    passed to ``arithmetic_decoding``, and the decoded codes are written
    to *out_filename* as raw bytes.
    """
    initial_counts = {symbol: 1 for symbol in range(256 + 1)}
    packed_bits = bitarray()
    with open(in_filename, 'rb') as source:
        packed_bits.fromfile(source)
    symbols = arithmetic_decoding(packed_bits, initial_counts)
    with open(out_filename, 'wb') as target:
        target.write(bytearray(symbols))
- # occurences = {x: 1 for x in range(256+1)}
- # message = "ABCA"
- # message = [0, 1, 2, 3, 2, 2, 2, 255, 1, 2]
- # encoded_binary = arithmetic_encoding(message+[END_OF_MESSAGE], occurences)
- # print(encoded_binary)
- # decoded = arithmetic_decoding(encoded_binary, occurences) #
- # print(message)
- # print(decoded)
- # assert message == decoded
- # print("ok")
# Round-trip self-check: encode a sample file, decode it again and verify
# that the decoded copy is identical to the original input.
in_file, encoded_file, decoded_file = 'test0.bin', 'out.bin', 'decoded0.bin'

print('starting encoding')
encode_from_file(in_file, encoded_file)
print('finished encoding')

print('starting decoding')
decode_from_file(encoded_file, decoded_file)
print('finished decoding')

import filecmp

# "DZIALA!" is Polish for "it works!"; "jest problem" means "there is a
# problem".
print("DZIALA!" if filecmp.cmp(in_file, decoded_file) else "jest problem")
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement