Think of the count-min sketch as a generalization of the Bloom filter: instead of overestimating whether or not we've seen a certain key, the count-min sketch overestimates how many times we've seen it. You could implement a precise structure to solve this problem with a map from keys to counts (a tree, an associative array, or a hash table, for example), but -- just as with the Bloom filter -- there are cases in which the space requirements of a precise structure may be unacceptable.
We'll start by importing some necessary libraries -- numpy, pandas, and our hash functions -- again.
from datasketching.hashing import hashes_for
import numpy as np
import pandas as pd
class CMS(object):
    def __init__(self, width, hashes):
        """ Initializes a count-min sketch with the given width
            (number of buckets per hash row).

            hashes supplies the hash family and determines the depth
            (number of rows) of the sketch.  It can be either
              - a function taking an arbitrary value and returning a
                sequence of integer hash results, one per row, or
              - a list of functions, each taking an arbitrary value and
                returning an integer; in this case the constructor
                synthesizes the former from the latter. """
        self.__width = width
        if hasattr(hashes, '__call__'):
            self.__hashes = hashes
            # inspect the tuple returned by the hash function to get a depth
            # (NOTE: this means the hash family must accept bytes input)
            self.__depth = len(hashes(bytes()))
        else:
            # copy the list so later mutation of the caller's list
            # can't change this sketch's hash family
            funs = hashes[:]
            self.__depth = len(hashes)
            def h(value):
                return [int(f(value)) for f in funs]
            self.__hashes = h
        # one row of counters per hash function, laid out (width, depth):
        # buckets[b][r] is the counter for bucket b in hash row r
        self.__buckets = np.zeros((int(width), int(self.__depth)), np.uint64)

    def width(self):
        """ Returns the number of buckets in each hash row """
        return self.__width

    def depth(self):
        """ Returns the number of hash rows in this sketch """
        return self.__depth

    def insert(self, value):
        """ Inserts a value into this sketch """
        # row i gets the bucket selected by the i-th hash of value
        for (row, col) in enumerate(self.__hashes(value)):
            self.__buckets[col % self.__width][row] += 1

    def lookup(self, value):
        """ Returns a biased estimate of the number of times value has
            been inserted into this sketch; it may overestimate but
            never underestimates """
        return min([self.__buckets[col % self.__width][row] for (row, col) in enumerate(self.__hashes(value))])

    def merge_from(self, other):
        """ Merges other into this sketch by adding the counts from each
            bucket in other to the corresponding buckets in this.
            Updates this.  other must share this sketch's width and hash
            family for the result to be meaningful. """
        self.__buckets += other.__buckets

    def merge(self, other):
        """ Creates a new sketch by merging this sketch's
            counts with those of another sketch. """
        cms = CMS(self.width(), self.__hashes)
        cms.__buckets += self.__buckets
        cms.__buckets += other.__buckets
        return cms

    def inner(self, other):
        """ Returns the count-min estimate of the inner product of self
            and other, estimating the equijoin size between the streams
            modeled by self and other """
        # The standard estimator takes the *minimum* over the depth hash
        # rows of the per-row dot products; contracting over every row at
        # once (as a full tensordot would) sums the rows' estimates and
        # overestimates by roughly a factor of the depth.  Buckets are
        # laid out (width, depth), so each hash row is a column: contract
        # over axis 0 to get one dot product per row, then take the min.
        return int((self.__buckets * other.__buckets).sum(axis=0).min())

    def minimum(self, other):
        """ Creates a new sketch by taking the elementwise minimum
            of this sketch and another. """
        # third argument to np.minimum is the output array
        cms = CMS(self.width(), self.__hashes)
        np.minimum(self.__buckets, other.__buckets, cms.__buckets)
        return cms

    def dup(self):
        """ Returns a copy of this sketch """
        cms = CMS(self.width(), self.__hashes)
        cms.merge_from(self)
        return cms
# a sketch 16384 buckets wide using 3 hash functions
# (the second argument to hashes_for presumably parameterizes the hash
# family -- confirm in datasketching.hashing)
cms = CMS(16384, hashes_for(3,8))
# estimate before any insert is 0
cms.lookup("foo")
cms.insert("foo")
# after one insert the estimate is at least 1
cms.lookup("foo")
While hash collisions in Bloom filters lead to false positives, hash collisions in count-min sketches lead to overestimating counts. To see how much this will affect us in practice, we can design an empirical experiment to plot the cumulative distribution of the factors that we've overestimated counts by in sketches of various sizes.
def cms_experiment(sample_count, size, hashes, seed=0x15300625):
    """ Inserts sample_count pseudorandom 64-bit keys into a CMS of the
        given size and hash family, then returns a list of 2-tuples
        (estimated count, expected count), one per key.

        Every hundredth key is a "heavy hitter" inserted up to 512
        times; every other key is inserted at most 8 times.  Reseeding
        the generator with the same seed regenerates the identical key
        stream so each key can be looked up again. """
    import random
    random.seed(seed)
    cms = CMS(size, hashes)
    result = []
    # update the counts
    for i in range(sample_count):
        bits = random.getrandbits(64)
        if i % 100 == 0:
            # every hundredth entry is a heavy hitter
            insert_count = (bits % 512) + 1
        else:
            insert_count = (bits % 8) + 1
        # note: '_' rather than 'i' so we don't shadow the outer loop index
        for _ in range(insert_count):
            cms.insert(bits)
    # replay the same pseudorandom stream to look the bit sequences up again
    random.seed(seed)
    for i in range(sample_count):
        bits = random.getrandbits(64)
        if i % 100 == 0:
            # every hundredth entry is a heavy hitter
            expected_count = (bits % 512) + 1
        else:
            expected_count = (bits % 8) + 1
        result.append((int(cms.lookup(bits)), int(expected_count)))
    return result
import altair as alt
# render altair charts inline in the notebook
alt.renderers.enable('notebook')
# 2^14 sample keys into a 4096-column sketch with 3 hash functions
results = cms_experiment(1 << 14, 4096, hashes_for(3, 8))
df = pd.DataFrame.from_records(results)
df.rename(columns={0: "actual count", 1: "expected count"}, inplace=True)
from statsmodels.distributions.empirical_distribution import ECDF
df2 = pd.DataFrame()
# ratio of sketch estimate to true count, sorted for the CDF;
# always >= 1 because the count-min sketch never underestimates
df2["overestimation factor"] = (df["actual count"] / df["expected count"]).sort_values()
ecdf = ECDF(df2["overestimation factor"])
df2["percentage of samples overestimated by less than"] = ecdf(df2["overestimation factor"])
# plot the empirical CDF of overestimation factors
alt.Chart(df2.drop_duplicates()).mark_line().encode(x="overestimation factor", y="percentage of samples overestimated by less than")
As you can see, about 55% of our counts for this small sketch are overestimated by less than a factor of three, although the worst overestimates are quite large indeed. Let's try with a larger sketch structure.
# same experiment with twice as many columns (8192) and the same 3 hashes
results = cms_experiment(1 << 14, 8192, hashes_for(3, 8))
df = pd.DataFrame.from_records(results)
df.rename(columns={0: "actual count", 1: "expected count"}, inplace=True)
df2 = pd.DataFrame()
# estimate / true count, sorted for the CDF plot
df2["overestimation factor"] = (df["actual count"] / df["expected count"]).sort_values()
ecdf = ECDF(df2["overestimation factor"])
df2["percentage of samples overestimated by less than"] = ecdf(df2["overestimation factor"])
alt.Chart(df2.drop_duplicates()).mark_line().encode(x="overestimation factor", y="percentage of samples overestimated by less than")
With the same number of columns but more hash functions (rows), we can dramatically reduce the bias.
# same width (8192 columns) but 8 hash functions instead of 3
results = cms_experiment(1 << 14, 8192, hashes_for(8, 5))
df = pd.DataFrame.from_records(results)
df.rename(columns={0: "actual count", 1: "expected count"}, inplace=True)
df2 = pd.DataFrame()
# estimate / true count, sorted for the CDF plot
df2["overestimation factor"] = (df["actual count"] / df["expected count"]).sort_values()
ecdf = ECDF(df2["overestimation factor"])
df2["percentage of samples overestimated by less than"] = ecdf(df2["overestimation factor"])
alt.Chart(df2.drop_duplicates()).mark_line().encode(x="overestimation factor", y="percentage of samples overestimated by less than")
In this exercise, you'll combine a count-min sketch and an auxiliary data structure in order to approximately track the top k elements in a stream. When we insert something into a count-min sketch, we can immediately look it up to get an estimate of how many times we've seen it. We can then check a running list of (some of) the most frequent items we've seen and update that list if the count-min sketch indicates that the item we've just added is one of the top elements.
A priority heap is an efficient way to track items by a priority ordering; we can use it to track (some of) the most frequent items we've seen. Python's heapq
module provides an implementation of priority queues. If we store tuples of counts and items, we can use the count as the priority ordering; because Python's heapq
module places the minimum element first, we'll need to invert the priority of counts.
Let's see a couple of ways to do this:
import heapq

# two equivalent ways of using a min-heap as a max-priority queue
one = []
two = []
counts = [700, 600, 500, 400]
items = ["seven hundred", "six hundred", "five hundred", "four hundred"]
for count, label in zip(counts, items):
    # approach one: store the negated count so the largest
    # count becomes the smallest heap entry...
    heapq.heappush(one, (-count, label))
    # ...approach two: store the tuple untouched and supply an
    # inverting key function at query time instead
    heapq.heappush(two, (count, label))
# in both cases we ask heapq for the "smallest" entries,
# since the priority has been inverted one way or the other
print([(-count, item) for count, item in heapq.nsmallest(4, one)])
print(heapq.nsmallest(4, two, key=lambda t: -t[0]))
Now it's time to implement the top-k structure. Read the constructor code (__init__
) to see what members the TopK
object has, and fill in the code that the FIXME
comments ask you to. If you get stuck, check out the solution!
import datasketching.cms as cms
import heapq

# upper bound multiplier on how many candidates the queue may hold per k
MAX_OVERHEAD = 10

class TopK(object):
    """ Approximately tracks the top k most frequent elements in a
        stream, pairing a count-min sketch (for counts) with a priority
        queue of candidate elements (managed with the heapq module). """
    def __init__(self, k, width, hashes):
        self.width = width
        self.hashes = hashes
        # the underlying count-min sketch supplying (over)estimated counts
        self.cms = cms.CMS(width, hashes)
        self.k = k
        # this is a priority queue; manage it with the heapq module
        self.queue = []
        # Hint #1: you'll use this to see whether you're inserting
        # a new item or updating one that's already in the queue
        # Hint #2: you won't keep _everything_ you've seen in this
        # set; just the objects that are in the queue
        self.seen = set()

    def insert(self, obj):
        """ Inserts _obj_ in this summary and updates the top k elements if this
            element is likely to be in the top k elements as given by the underlying top-k sketch """
        # Identify how many times you've seen `obj` already
        self.cms.insert(obj)
        count = self.cms.lookup(obj)
        # FIXME: write code to insert _obj_ into
        # the priority queue (`self.queue`).
        # Hint #1: How would you sort the priority queue to ensure that it contains the top k elements?
        # Hint #2: Python's `heapq.heappush` function puts the smallest things first
        # Hint #3: What happens when you need to update the count of something that's already in the queue?
        # Hint #4: How will you ensure that the priority queue doesn't grow unbounded?

    def topk(self):
        """ Returns a list of 2-tuples (value, count) for
            the top k elements in this structure """
        # FIXME: replace this line with code that
        # uses heapq.nsmallest to return the top k elements
        return self.queue

    def merge(self, other):
        """ Returns a new TopK summarizing the union of the
            streams modeled by self and other """
        # Merge the two count-min sketches
        # (bug fix: the constructor's first argument is k -- calling
        # TopK(self.width, self.hashes) raised a TypeError)
        result = TopK(self.k, self.width, self.hashes)
        result.cms.merge_from(self.cms)
        result.cms.merge_from(other.cms)
        # determine how many elements to keep
        # from the combined priority queue
        newsize = max(int(self.k * MAX_OVERHEAD / 2), 1)
        # FIXME: write code to merge the two queues here
        # Hint #1: heapq.nsmallest can select the best `newsize`
        # entries from the concatenation of the two queues
        return result
We can run an experiment to see how our top-k structure behaves:
import datasketching.cms as cms
def topk_experiment(sample_count, size, hashes, k=10, seed=0x15300625):
    """ Streams sample_count pseudorandom 64-bit keys into a TopK
        summary and returns its current top-k list.

        Every hundredth key is a "heavy hitter" inserted up to 512
        times; every other key is inserted at most 8 times, so the
        heavy hitters should dominate the returned top-k list. """
    import random
    random.seed(seed)
    topk = TopK(k, size, hashes)
    # update the counts
    for i in range(sample_count):
        bits = random.getrandbits(64)
        if i % 100 == 0:
            # every hundredth entry is a heavy hitter
            insert_count = (bits % 512) + 1
        else:
            insert_count = (bits % 8) + 1
        # note: '_' rather than 'i' so we don't shadow the outer loop index
        for _ in range(insert_count):
            topk.insert(bits)
    return topk.topk()
from datasketching.hashing import hashes_for
# 40000 sample keys, a 16384-column sketch with 3 hashes, tracking the top 20
topk_experiment(40000, 16384, hashes_for(3,8), k=20)
Here are some more exercises to try out if you're interested in extending the count-min sketch:
Consider the `minimum` method. What might it be useful for? What limitations might it have?