We discussed another text-compression algorithm named Lempel-Ziv. Then we discussed error correction and detection codes (specifically, we saw a new code: the Index code)
# Make IPython/Jupyter echo the value of EVERY expression in a cell,
# not only the last one (the REPL-style transcript below relies on this).
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"
The LZ compression algorithm works by encoding succinct representations of repetitions in the text. A repetition of the form $[offset,length]$, means that the next $length$ characters appeared $offset$ characters earlier.
import math
def str_to_ascii(text):
    """ Gets rid of non-ascii characters in text"""
    kept = [ch for ch in text if ord(ch) < 128]
    return ''.join(kept)
def maxmatch(T, p, w=2**12-1, max_length=2**5-1):
    """ finds a maximum match of length k<=2**5-1 in a
    w long window, T[p:p+k] with T[p-m:p-m+k].
    Returns m (offset) and k (match length) """
    assert isinstance(T, str)
    n = len(T)
    best_len = 0   # longest match seen so far
    best_off = 0   # its offset; 0 if no match at all
    # a match may not run past the end of the text (n-p)
    # nor exceed the largest representable length (max_length)
    limit = min(n - p, max_length)
    # offsets are bounded both by the window size w and by how far
    # back we can look without falling off the start of T (p+1)
    for m in range(1, min(p + 1, w)):
        k = 0
        while k < limit and T[p - m + k] == T[p + k]:
            k += 1
        # here T[p-m:p-m+k] == T[p:p+k]
        if k > best_len:   # strict ">" keeps the smallest offset
            best_len = k   # among all maximal matches (m starts at 1)
            best_off = m
    return best_off, best_len
def LZW_compress(text, w=2**12-1, max_length=2**5-1):
    """LZW compression of an ascii text. Produces
    a list comprising of either ascii characters
    or pairs [m,k] where m is an offset and
    k is a match (both are non negative integers) """
    out = []
    n = len(text)
    p = 0
    while p < n:
        m, k = maxmatch(text, p, w, max_length)
        if k >= 2:
            # a repetition of two or more characters: encode as a pair
            out.append([m, k])
            p += k
        else:
            # match too short to pay off; emit a literal character
            out.append(text[p])
            p += 1
    return out  # mix of single chars and [offset, length] pairs
def LZW_decompress(compressed, w=2**12-1, max_length=2**5-1):
    """LZW decompression from intermediate format to ascii text"""
    chars = []
    i = 0
    total = len(compressed)
    while i < total:
        token = compressed[i]
        i += 1
        if isinstance(token, str):
            chars.append(token)      # a literal character
        else:
            m, k = token             # an [offset, length] pair
            for _ in range(k):
                # copy k characters, each from a fixed offset m
                # "to the left" -- chars itself grows as we append
                chars.append(chars[-m])
    return "".join(chars)
def LZW_compress2(text, w=2**12-1, max_length=2**5-1):
    """LZW compression of an ascii text. Produces
    a list comprising of either ascii characters
    or pairs [m,k] where m is an offset and
    k>3 is a match (both are non negative integers) """
    out = []
    n = len(text)
    p = 0
    while p < n:
        m, k = maxmatch(text, p, w, max_length)
        if k >= 3:   # a pair costs 18 bits, so require matches of 3+
            out.append([m, k])
            p += k
        else:
            # emit a literal and advance one char, even when k == 2
            # (the next position may start a longer match)
            out.append(text[p])
            p += 1
    return out
def inter_to_bin(lst, w=2**12-1, max_length=2**5-1):
    """ converts intermediate format compressed list
    to a string of bits"""
    offset_width = math.ceil(math.log(w, 2))          # 12 bits per offset
    match_width = math.ceil(math.log(max_length, 2))  # 5 bits per length
    pieces = []
    for elem in lst:
        if isinstance(elem, str):
            # flag bit "0", then the character as 7-bit ascii
            pieces.append("0")
            pieces.append('{:07b}'.format(ord(elem)))
        elif isinstance(elem, list):
            # flag bit "1", then fixed-width offset and match length
            m, k = elem
            pieces.append("1")
            pieces.append(bin(m)[2:].zfill(offset_width))
            pieces.append(bin(k)[2:].zfill(match_width))
    return "".join(pieces)
def bin_to_inter(compressed, w=2**12-1, max_length=2**5-1):
    """ converts a compressed string of bits
    to intermediate compressed format """
    offset_width = math.ceil(math.log(w, 2))          # 12 bits per offset
    match_width = math.ceil(math.log(max_length, 2))  # 5 bits per length
    tokens = []
    i = 0
    end = len(compressed)
    while i < end:
        if compressed[i] == "0":
            # a single ascii char follows, in 7 bits
            i += 1
            tokens.append(chr(int(compressed[i:i + 7], 2)))
            i += 7
        elif compressed[i] == "1":
            # a repeat: fixed-width offset, then match length
            i += 1
            m = int(compressed[i:i + offset_width], 2)
            i += offset_width
            k = int(compressed[i:i + match_width], 2)
            i += match_width
            tokens.append([m, k])
    return tokens
#########################
### Various executions
#########################
def process1(text):
    """ packages the whole process using LZ_compress.

    Returns (inter, binn, inter2, text2): the intermediate
    compressed list, its bit-string encoding, the list recovered
    from the bits, and the decompressed text. """
    atext = str_to_ascii(text)
    inter = LZW_compress(atext)
    binn = inter_to_bin(inter)
    inter2 = bin_to_inter(binn)
    # Decompress the round-tripped list inter2 (was: inter), so the
    # bit encoding/decoding stage is actually exercised -- consistent
    # with process2. On correct inputs inter2 == inter, so the result
    # is unchanged.
    text2 = LZW_decompress(inter2)
    return inter, binn, inter2, text2
def process2(text):
    """ packages the whole process using LZ_compress2 """
    clean = str_to_ascii(text)
    inter = LZW_compress2(clean)          # intermediate chars/pairs list
    binn = inter_to_bin(inter)            # packed into a bit string
    inter2 = bin_to_inter(binn)           # recovered from the bits
    text2 = LZW_decompress(inter2)        # decoded from the recovered list
    return inter, binn, inter2, text2
inter1, bits, inter2, text2 = process2("abcdabc")
What is the encoded representation?
inter1
bits
['a', 'b', 'c', 'd', [4, 3]]
'01100001011000100110001101100100100000000010000011'
len(bits)#8*4+18
50
How do we really encode the chars?
[bits[8*i:8*(i+1)] for i in range(4)]
[bin(ord(c))[2:] for c in "abcd"]
['01100001', '01100010', '01100011', '01100100']
['1100001', '1100010', '1100011', '1100100']
Going back
text2
'abcdabc'
Another example
inter1, bits, inter2, text2 = process2("xyzxyzwxyzw")
inter1
['x', 'y', 'z', [3, 3], 'w', [4, 4]]
def lz_ratio(text):
    """ length of the LZ bit encoding of text relative to
    its plain 7-bit ascii encoding """
    _, bits, _, _ = process2(text)
    return len(bits) / (len(text) * 7)
lz_ratio('a' * 1000)
lz_ratio('a' * 10000)
0.086
0.08317142857142858
So what happens for increasing $lz\_ratio('a' \cdot n)$ as $n \to \infty$?
Each repetition in the LZ compression requires 18 bits, and we need $n/31$ such repetitions (each covers at most 31 characters). On the other hand, the regular ASCII representation requires 7 bits per character, thus the ratio approaches $18/(31\cdot 7)$
18/(31*7)
0.08294930875576037
lz_ratio('hello')
lz_ratio('hello' * 2)
(8*5+18) / (7*10)
1.1428571428571428
0.8285714285714286
0.8285714285714286
def ascii2bit_stream(text):
    """ Translates ASCII text to a binary representation using
    7 bits per character. Assumes only ASCII chars """
    return "".join('{:07b}'.format(ord(ch)) for ch in text)
########################################################
#### HUFFMAN CODE
########################################################
def char_count(text):
    """ Counts the number of each character in text.
    Returns a dictionary, with keys being the observed characters,
    values being the counts """
    counts = {}
    for ch in text:
        counts[ch] = counts.get(ch, 0) + 1
    return counts
def build_huffman_tree(char_count_dict):
    """ Receives a dictionary with char:count entries.
    Generates a LIST structure representing
    the binary Huffman encoding tree """
    queue = list(char_count_dict.items())
    while len(queue) > 1:
        # repeatedly merge the two lightest nodes into one
        left, left_w = extract_min(queue)    # smallest in queue
        right, right_w = extract_min(queue)  # next smallest
        queue.append(([left, right], left_w + right_w))
    # a single entry remains: the root of the tree (weight discarded)
    root, _ = extract_min(queue)
    return root  # a LIST representing the tree structure
##def extract_min(queue):
## """ queue is a list of 2-tuples (x,y).
## remove and return the tuple with minimal y """
## min_pair = queue[0]
## for pair in queue:
## if pair[1] < min_pair[1]:
## min_pair = pair
##
## queue.remove(min_pair)
## return min_pair
def extract_min(queue):  # the shorter, "Pythonic" way
    """ queue is a list of 2-tuples (x,y).
    remove and return the tuple with minimal y """
    best = min(queue, key=lambda item: item[1])
    # list.remove drops the first equal element, matching min's
    # preference for the earliest minimum on ties
    queue.remove(best)
    return best
def generate_code(huff_tree, prefix=""):
    """ Receives a Huffman tree with embedded encoding,
    and a prefix of encodings.
    returns a dictionary where characters are
    keys and associated binary strings are values."""
    if not isinstance(huff_tree, str):
        # internal node: '0' goes left, '1' goes right;
        # merge the codebooks of the two subtrees
        left, right = huff_tree[0], huff_tree[1]
        return {**generate_code(left, prefix + '0'),
                **generate_code(right, prefix + '1')}
    return {huff_tree: prefix}  # a leaf: its code is the full prefix
def compress(text, encoding_dict):
    """ compress text using encoding dictionary """
    assert isinstance(text, str)
    codewords = [encoding_dict[ch] for ch in text]
    return "".join(codewords)
def reverse_dict(d):
    """ build the "reverse" of encoding dictionary """
    return dict((code, ch) for (ch, code) in d.items())
def decompress(bits, decoding_dict):
prefix = ""
result = []
for bit in bits:
prefix += bit
if prefix in decoding_dict:
result.append(decoding_dict[prefix])
prefix = "" #restart
assert prefix == "" # must finish last codeword
return "".join(result) # converts list of chars to a string
def compression_ratio(text, corpus):
    """ size of the Huffman encoding of text (code built from corpus)
    relative to its plain 7-bit ascii encoding """
    codebook = generate_code(build_huffman_tree(char_count(corpus)))
    compressed_len = len(compress(text, codebook))
    ascii_len = len(ascii2bit_stream(text))  # equals len(text)*7
    return compressed_len / ascii_len
import random
# The full 7-bit ASCII alphabet (characters 0..127).
asci = str.join("",[chr(c) for c in range(128)])
# 1000 uniformly random ASCII characters (no seed -- not reproducible):
# a worst case for both Huffman and LZ.
rand = "".join([random.choice(asci) for i in range(1000)])
# 'a', 'bb', 'ccc', ..., 26 'z's: skewed character frequencies and long
# runs -- favorable for both compressors.
s = "".join([chr(ord("a")+i-1)*i for i in range(1,27)])
s
'abbcccddddeeeeeffffffggggggghhhhhhhhiiiiiiiiijjjjjjjjjjkkkkkkkkkkkllllllllllllmmmmmmmmmmmmmnnnnnnnnnnnnnnoooooooooooooooppppppppppppppppqqqqqqqqqqqqqqqqqrrrrrrrrrrrrrrrrrrsssssssssssssssssssttttttttttttttttttttuuuuuuuuuuuuuuuuuuuuuvvvvvvvvvvvvvvvvvvvvvvwwwwwwwwwwwwwwwwwwwwwwwxxxxxxxxxxxxxxxxxxxxxxxxyyyyyyyyyyyyyyyyyyyyyyyyyzzzzzzzzzzzzzzzzzzzzzzzzzz'
def compare_lz_huffman(text):
    """ print Huffman vs LZ compression ratios for text.
    The Huffman code is built over text extended with the full
    ascii alphabet, so every character has a codeword """
    huff = compression_ratio(text, text + asci)
    print("Huffman compression ratio:", huff)
    print("LZ compression ratio:", lz_ratio(text))
compare_lz_huffman(rand)
compare_lz_huffman(s)
compare_lz_huffman(open("looking_glass.txt", "r").read()[:10000])
Huffman compression ratio: 0.9888571428571429 LZ compression ratio: 1.1428571428571428 Huffman compression ratio: 0.6866096866096866 LZ compression ratio: 0.2629222629222629 Huffman compression ratio: 0.6538857142857143 LZ compression ratio: 0.6195142857142857
An explanation of the results can be found here
Please read the following summary of what we did in class. The summary is available here.