Probabilistic Data Structures for Realtime Analytics

About me

  • My name is Martin Laprise (@mlaprise)
  • Algorithms Lead @ Parse.ly
  • Discovered Parse.ly @ PyData 2012!

Parse.ly

  • Parse.ly --> analytics company
  • Dash is an analytics dashboard for big news providers
    • The Atlantic, Mashable, Arstechnica
    • etc.
  • Multiple realtime metrics
    • PVs
    • Uniques
    • Social
    • etc.
  • Content Analysis on the publisher content
  • Recommendation API

Parse.ly as a stream processor

  • Input: Stream of events from all our publishers (js code)
  • Output: Insightful metrics and analysis

Data Stream Model

  • All this with these constraints:
    • Computed "on the fly" (realtime-ish)
    • Sequential order
    • Single pass over the data
    • Can't iterate over the entire dataset
  • Probabilistic data structures are very useful in this context
  • Data Stream Model
    • Necessary even if you're not realtime (scalability)

Probabilistic data structures

  • No uber-datastructure, pick the right one.
  • More useful to know a bunch of them
  • Overview of:
    • Bloom Filter
    • Temporal Bloom Filter
    • Count-Min Sketch
    • HyperLogLog
  • Easy to implement... harder to understand
  • https://github.com/Parsely/python-pds.git

Pattern: Have we met before?

  • Trigger an action when you see an item for the first time
  • Ex:
    • We see a URL for the first time --> crawl the link
    • We see a social network as a referrer --> fetch the number of shares
  • Look at ALL items in the stream
  • Can't query the databases every time
  • Need to have an in-memory data structure
    • Python Set,
    • Set in a Redis database, etc.
In [37]:
known_urls = set()

known_urls.add('http://www.google.com')
known_urls.add('http://www.pydata.org')

known_urls2 = set()
known_urls2.add('http://www.python.org')
In [38]:
'http://www.pydata.org' in known_urls # Membership
Out[38]:
True
In [39]:
print len(known_urls) # Cardinality
2
In [40]:
'http://www.pydata.org' in (known_urls | known_urls2) # Union
Out[40]:
True
In [41]:
'http://www.pydata.org' in (known_urls & known_urls2) # Intersection
Out[41]:
False
In [42]:
print known_urls # Retrieve the actual content
set(['http://www.google.com', 'http://www.pydata.org'])
In [43]:
known_urls.discard('http://www.google.com') # Delete an element in the set
In [44]:
print known_urls
set(['http://www.pydata.org'])
  • Do we really need all those queries? -> Most of the time: not really
  • Can we estimate some of them?
  • A PDS is typically built for one specific kind of query

Typical implementation of a set

Hashtable

  • HashTable:

    • Multiple buckets
    • Good hash function (ex: 32-bit murmur hash)
    • Collision resolution (ex: linked list) -- see the minimal sketch after this list
  • Available queries:

    • Membership
    • Cardinality
    • Union, Intersection
    • Retrieve values
    • Delete an item
  • Good:

    • Can solve most of your problems!
    • Speed
    • CPU: $ \sim O\left( 1 \right) $ per operation
    • Can retrieve data
    • No error
  • Limitation:

    • Memory: $ M \sim O\left( c \cdot n \cdot \log(N) \right) $
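
A minimal sketch of the hashtable described above: buckets, a hash function, and chaining for collision resolution. Python's built-in hash() stands in for a 32-bit murmur hash, and ChainedHashTable is an illustrative name (this is not how CPython's dict/set are actually implemented).

In [ ]:
class ChainedHashTable(object):
    """ Minimal hashtable sketch: buckets + hash function + chaining for collisions """

    def __init__(self, nbr_buckets=1024):
        self.nbr_buckets = nbr_buckets
        self.buckets = [[] for _ in range(nbr_buckets)]

    def _bucket(self, element):
        # The built-in hash() stands in for a 32-bit murmur hash here
        return self.buckets[hash(element) % self.nbr_buckets]

    def add(self, element):
        chain = self._bucket(element)
        if element not in chain:       # collision resolution: scan the chain
            chain.append(element)

    def discard(self, element):
        chain = self._bucket(element)
        if element in chain:
            chain.remove(element)

    def __contains__(self, element):   # membership
        return element in self._bucket(element)

    def __len__(self):                 # cardinality
        return sum(len(chain) for chain in self.buckets)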
In [45]:
import numpy as np
import matplotlib.pyplot as plt
import sys,os
In [46]:
capacities = np.linspace(100,int(1E9))
typical_url = 'http://arstechnica.com/science/2013/10/new-type-of-quantum-excitation-behaves-like-a-solitary-particle/'
memory_footprint = len(typical_url) * 8
memory_scaling = [memory_footprint * c for c in capacities]

plt.plot(capacities, [((n / (8))) / (1024 * 1024 * 1024) for n in memory_scaling])
plt.grid()
plt.title('Memory consumption for a set')
plt.xlabel('Nbr. of items')
plt.ylabel('Memory consumption in GB')
Out[46]:
<matplotlib.text.Text at 0x1087230d0>

Almost-correct hashtable for membership

  • What if we only need to do a membership query? (no cardinality, no content)
  • We can afford some errors
  • Transform a classical hashtable into a more lightweight structure (with errors):
    • Don't store the content!
In [47]:
from hashfunctions import generate_hashfunctions
In [48]:
class LightweightHashtable(object):
    """ Almost-correct hashtable for membership query only """

    def __init__(self, nbr_buckets=10):
        self.bitarray = [0] * nbr_buckets
        self.hashfunction = generate_hashfunctions(nbr_buckets,1)

    def add(self, element):
        self.bitarray[self.hashfunction(element)[0]] = 1

    def __contains__(self, element):
        """ Membership query """
        return self.bitarray[self.hashfunction(element)[0]] == 1
In [110]:
s = LightweightHashtable(10)
for i in range(10):
    s.add(str(i))
In [111]:
s.bitarray
Out[111]:
[1, 1, 0, 1, 1, 0, 0, 1, 1, 1]
In [51]:
'1' in s # Membership query available
Out[51]:
True
In [52]:
'105' in s
Out[52]:
False
In [53]:
'22' in s # False positive
Out[53]:
True
  • False positives --> no collision resolution
  • How can we reduce the number of false positives?
    • Increase the number of bits (M)
    • Use multiple hash functions (k)
  • A lightweight hashtable with optimal k and M values -> Bloom Filter

Bloom Filter

  • Procedure:
    1. We instantiate an array of bits initialized to zero
    2. Hash the content using k hash functions
    3. Each update sets the bits at the k positions
    4. Membership if all k bits are set
  • The classical Bloom Filter uses a single bitarray with multiple hash functions
  • Popular variant: partition the M bits among the k hash functions
  • How many hash functions should I use?
    • Using more hash functions should decrease the error, no?
    • Probability of error (collision) $ \approx $ fraction of 1s in the array
    • Multiple hashes are good (for the error rate)... up to a point of diminishing returns
    • For a limited number of bits: there is an optimal number of hash functions
  • In this variant the optimal number of hashes for a target error rate P is $ k = \log_2 \left( \frac{1}{P} \right) $
  • If we want to store $n_{max}$ items with an error rate P, the optimal number of bits per slice is: $ m = \frac{n_{max} \cdot |\ln(P)|}{k \cdot (\ln(2))^2} $ (quick numeric check below)
    • Important: capacity $n_{max}$
    • At $n_{max}$: half of the bits are 1
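
A quick numeric sanity check of the two formulas above, with illustrative values (P = 1%, $n_{max}$ = 1M items):

In [ ]:
# Quick check of the formulas above (illustrative values: P = 1%, n_max = 1e6)
P = 0.01
n_max = 1E6
k = np.ceil(np.log2(1.0 / P))                                    # optimal number of hash functions
m = np.ceil((n_max * abs(np.log(P))) / (k * (np.log(2) ** 2)))   # optimal bits per slice
print "k = %d hashes, total bits = %d (~%.1f bits per item)" % (k, k * m, (k * m) / n_max)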
In [54]:
class BloomFilter(object):
    """ Basic Bloom Filter """

    def __init__(self, capacity, error_rate):
        self.error_rate = error_rate
        self.capacity = capacity
        self.nbr_slices = int(np.ceil(np.log2(1.0 / error_rate)))
        self.bits_per_slice = int(np.ceil((capacity * abs(np.log(error_rate))) / (self.nbr_slices * (np.log(2) ** 2))))
        self.nbr_bits = self.nbr_slices * self.bits_per_slice
        self.bitarray = np.zeros(self.nbr_bits, dtype=np.bool)
        self.count = 0
        self.hashes = generate_hashfunctions(self.bits_per_slice, self.nbr_slices)
        self.hashed_values = []

    def __contains__(self, key):
        """ Membership query """
        self.hashed_values = self.hashes(key)
        offset = 0
        for value in self.hashed_values:
            if not self.bitarray[offset + value]:
                return False
            offset += self.bits_per_slice
        return True

    def add(self, key):
        """ Add an item to the set """
        if key in self:
            return True
        offset = 0
        if not self.hashed_values:
            self.hashed_values = self.hashes(key)
        for value in self.hashed_values:
            self.bitarray[offset + value] = True
            offset += self.bits_per_slice
        self.count += 1
        return False
In [55]:
bf = BloomFilter(1000, 0.1)

random_items = [str(r) for r in np.random.randn(20000)]
for item in random_items[:1000]:
    bf.add(item)

false_positive = 0
for item in random_items[1000:20000]:
    if item in bf:
        false_positive += 1

print "Error rate (false positive): %s" % str(float(false_positive) / 19000)
Error rate (false positive): 0.105315789474
  • Good:
    • Memory: $ M \approx O\left( n \cdot \log \left( \frac{1}{error} \right) \right) $
    • CPU: $ \approx O\left( k \right) $
    • Intersection and Union possible by bitwise AND & OR (see the sketch after this list)
  • Limitation:
    • False positives (but no false negatives): the BF can report membership for an item that was never added
    • Can't retrieve data.
    • Can't delete items (because of hash collisions) -> possible solution: Counting Bloom Filter
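
A minimal sketch of the union by bitwise OR, assuming both filters were built with the same capacity and error rate (and that generate_hashfunctions is deterministic, so they share the same hash functions); bloom_union is an illustrative helper, not part of python-pds:

In [ ]:
def bloom_union(bf_a, bf_b):
    """ Union of two Bloom Filters built with identical parameters: bitwise OR of the bitarrays """
    assert bf_a.capacity == bf_b.capacity and bf_a.error_rate == bf_b.error_rate
    merged = BloomFilter(bf_a.capacity, bf_a.error_rate)
    merged.bitarray = np.logical_or(bf_a.bitarray, bf_b.bitarray)
    return merged

bf_a = BloomFilter(1000, 0.1)
bf_b = BloomFilter(1000, 0.1)
bf_a.add('http://www.pydata.org')
bf_b.add('http://www.python.org')
print 'http://www.pydata.org' in bloom_union(bf_a, bf_b)   # True (modulo false positives)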

Pattern: Have we met before? (with TTL)

Trigger an action when you see an item for the first time in a given time period

  • Ex:

    • Perform an expensive processing step every hour for each URL
    • New vs returning visitor (30 min session)
    • Any duplicate detection on an unbounded data stream (with a limited amount of memory)
      • Filling ratio goes up, precision goes down
  • Two choices:

    • Landmark window (each hour of the day) -> easy: reset the Bloom Filter periodically
    • Sliding window (last 30 mins) -> "interesting"

Bloom Filter with expiration

  • Naive implementations
    • Some magic combination of plain BFs
      • ex: a rotating pool of BFs (good tech interview question! -- sketched below)
    • OK, let's store a timestamp instead of a bit: Time-Stamp Bloom Filter
      • membership by comparing the new timestamp with the stored timestamp
      • if the stored timestamp is expired -> non-membership, reset to zero
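
A minimal sketch of the rotating-pool idea for the landmark-ish case: keep two plain Bloom Filters, write to the active one, query both, and rotate on a fixed period. RotatingBloomFilter and its period parameter are illustrative, not part of python-pds:

In [ ]:
import time

class RotatingBloomFilter(object):
    """ Sketch: two plain Bloom Filters rotated every `period` seconds """

    def __init__(self, capacity, error_rate, period=3600):
        self.capacity = capacity
        self.error_rate = error_rate
        self.period = period
        self.active = BloomFilter(capacity, error_rate)
        self.previous = BloomFilter(capacity, error_rate)
        self.last_rotation = time.time()

    def _maybe_rotate(self):
        if time.time() - self.last_rotation > self.period:
            self.previous = self.active
            self.active = BloomFilter(self.capacity, self.error_rate)
            self.last_rotation = time.time()

    def add(self, key):
        self._maybe_rotate()
        self.active.add(key)

    def __contains__(self, key):
        self._maybe_rotate()
        # An item expires somewhere between `period` and 2 * `period` after its last insertion
        return key in self.active or key in self.previous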
In [56]:
def mem_vs_errors_and_capacity(error=0.01, capacity=10000, bit_mul = 1):
    nbr_slices = int(np.ceil(np.log2(1.0 / error)))
    bits_per_slice = int(np.ceil((capacity * abs(np.log(error))) / (nbr_slices * (np.log(2) ** 2))))
    mem = float(((nbr_slices*bits_per_slice) / (8))) / (1024 * 1024 * 1024) * bit_mul
    return mem

errors = [0.01,0.02,0.03,0.05]
capacities = np.linspace(100,int(1E9))
mems = np.zeros((len(errors), len(capacities)))

for e,error in enumerate(errors):
    for c,capacity in enumerate(capacities):
        mems[e,c] = mem_vs_errors_and_capacity(error, capacity)

for e,error in enumerate(errors):
    plt.plot(capacities,mems[e])
plt.grid(True)
plt.legend([str(item) for item in errors], loc=0)
plt.xlabel('Capacity')
plt.ylabel('Memory consumption [GB]')
plt.title('Memory Consumption for a plain Bloom Filter')
Out[56]:
<matplotlib.text.Text at 0x10893d850>
In [57]:
for e,error in enumerate(errors):
    for c,capacity in enumerate(capacities):
        mems[e,c] = mem_vs_errors_and_capacity(error, capacity, 64)

for e,error in enumerate(errors):
    plt.plot(capacities,mems[e])
plt.grid(True)
plt.legend([str(item) for item in errors], loc=0)
plt.xlabel('Capacity')
plt.ylabel('Memory consumption [GB]')
plt.title('Memory Consumption for a Bloom Filter (with timestamp)')
Out[57]:
<matplotlib.text.Text at 0x1077d0350>

Do I really need microsecond resolution?

In [58]:
for e,error in enumerate(errors):
    for c,capacity in enumerate(capacities):
        mems[e,c] = mem_vs_errors_and_capacity(error, capacity, 6)

for e,error in enumerate(errors):
    plt.plot(capacities,mems[e])
plt.grid(True)
plt.legend([str(item) for item in errors], loc=0)
plt.xlabel('Capacity')
plt.ylabel('Memory consumption [GB]')
plt.title('Memory Consumption for a Bloom Filter (6 bits "timestamp")')
Out[58]:
<matplotlib.text.Text at 0x10771d4d0>
  • Efficient way to use a low-precision timestamp: a countdown

Countdown Bloom Filter

  • A Bloom Filter where each element leaves the set after t time units:
    1. We instantiate an array of small integers (counters)
    2. Each update (re)initializes the counters at the k positions to a fixed value c (ex: 255)
    3. Membership if all k counters are non-zero
    4. With a certain fixed frequency, positions are slowly decremented (maintenance procedure).
  • Maintenance procedure (a toy sketch follows the pds import below)
    • Ensures that items expire after t time units on average
    • Refresh period s:
$$ s = \frac{t}{M \left( c - 1 + \frac{1}{z \cdot (k + 1)} \right)} $$
  • Good:
    • Memory usage: 4-8 bit counters
    • The expiration precision can be tweaked
    • Good fit for high-frequency stuff
  • Limitation:
    • Additional CPU and memory-access overhead for the maintenance (memory vs CPU trade-off)
    • False positives and false negatives (temporal error)
    • Running the maintenance can be a pain
In [ ]:
from pds import CountdownBloomFilter
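
The real implementation lives in pds; below is a toy sketch of the countdown idea only (same slice layout as the BloomFilter above, a naive sweeping maintenance step, and illustrative parameter values):

In [ ]:
class ToyCountdownBloomFilter(object):
    """ Toy sketch: counters are set to `init_value` on insert and swept back down to zero """

    def __init__(self, capacity, error_rate, init_value=255):
        self.capacity = capacity
        self.error_rate = error_rate
        self.init_value = init_value
        self.nbr_slices = int(np.ceil(np.log2(1.0 / error_rate)))
        self.bits_per_slice = int(np.ceil((capacity * abs(np.log(error_rate))) / (self.nbr_slices * (np.log(2) ** 2))))
        self.cellarray = np.zeros(self.nbr_slices * self.bits_per_slice, dtype=np.uint8)
        self.hashes = generate_hashfunctions(self.bits_per_slice, self.nbr_slices)
        self.pointer = 0   # sweep position for the maintenance procedure

    def add(self, key):
        offset = 0
        for value in self.hashes(key):
            self.cellarray[offset + value] = self.init_value   # refresh the k counters
            offset += self.bits_per_slice

    def __contains__(self, key):
        offset = 0
        for value in self.hashes(key):
            if self.cellarray[offset + value] == 0:
                return False
            offset += self.bits_per_slice
        return True

    def maintenance_step(self, batch_size=1000):
        """ Decrement the next batch of non-zero cells (to be called at the refresh period s) """
        end = min(self.pointer + batch_size, len(self.cellarray))
        chunk = self.cellarray[self.pointer:end]
        chunk[chunk > 0] -= 1
        self.pointer = 0 if end == len(self.cellarray) else end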

Stable Bloom Filter (quick mention)

  • A Counting Bloom Filter where we decrement some random positions periodically -> makes room for fresh elements.
  • No precise TTL (?)

Pattern: First class only

  • Consider only the first-class items
  • Ex:
    • Increase the refresh rate of the most popular URLs
    • Detect active users
    • More broadly: any expensive query / analysis
In [112]:
from collections import Counter # <== Probabilistic version of this
In [113]:
urls = Counter()
urls['foo'] += 2
urls['bar'] += 100

print urls
Counter({'bar': 100, 'foo': 2})

Count-Min Sketch

  • Count items in a Multiset

  • Procedure:

    1. We instantiate an array of small integers (counters) initialized to zero
    2. Each update increments the counters at the k positions
    3. Get the count of an item -> min() of the values at the k positions
  • Almost identical to a Counting Bloom Filter

    • But here we want to control the precision differently
    • No single simple error rate
    • What is the error on the estimate of an item's count?
    • Two parameters $\epsilon$ and $\delta$: ex: $\epsilon=1E-3$ and $\delta=0.01$
    • For m events seen in the stream, the upper bound on the error of the estimate $f_i'$ is:
    • if count << m -> large relative error

      $ k = \ln \left( \frac{1}{\delta} \right) $ (number of rows / hash functions)

      $ n = \frac{e}{\epsilon} $ (counters per row)

      $ Pr[f_i' \leq f_i + \epsilon m] \geq 1 - \delta $

  • In English: there is a 99% chance that the estimate <= the real count + some error


  • Works well for skewed distributions
  • Point query: the frequency of a single value ($f_i$)
  • Range query ($\sum\limits_{i=1}^r{f_i}$):
    • Sum of the estimates for a range of values
    • For small ranges, the range sum can be estimated as a sum of point queries; however, as the range grows, the error in this approach also grows linearly (a naive version is sketched after the point-query examples below)
    • Use a set of CMS instead: log(m) CMS using dyadic intervals
In [109]:
s = np.random.zipf(2.0, 10000)
skew_dist = ((s/float(max(s)))*1000).astype(np.int)
count, bins, ignored = plt.hist(skew_dist[skew_dist<50], 50)
plt.grid(True)
plt.show()
In [96]:
class CountMinSketch(object):
    """ Basic Count-Min Sketch """

    def __init__(self, delta, epsilon):
        self.bits_per_slice = int(np.ceil(np.exp(1) / epsilon))
        self.nbr_slices = int(np.ceil(np.log(1 / delta)))
        self.bitarray = np.zeros((self.nbr_slices, self.bits_per_slice), dtype=np.int32)
        self.make_hashes = generate_hashfunctions(self.bits_per_slice, self.nbr_slices)

    def update(self, key, increment):
        for row, column in enumerate(self.make_hashes(key)):
            self.bitarray[int(row), int(column)] += increment

    def get(self, key):
        value = sys.maxint
        for row, column in enumerate(self.make_hashes(key)):
            value = min(self.bitarray[row, column], value)
        return value
In [97]:
import random

cms = CountMinSketch(1E-1, 0.03)

# Create a fake randomized stream with n occurrences of str(n)
stream = []
for i in range(100):
    stream = stream + [str(i)] * i
random.shuffle(stream)

for s in stream:
    cms.update(s, 1)
In [98]:
cms.get('1')
Out[98]:
1
In [102]:
cms.get('19')
Out[102]:
19
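
A naive range query over the same cms object: a plain sum of point queries (fine for this small range; a real implementation would switch to the dyadic-interval trick mentioned above). naive_range_query is an illustrative helper:

In [ ]:
def naive_range_query(sketch, low, high):
    """ Sum of point queries over [low, high] -- the error grows with the range width """
    return sum(sketch.get(str(i)) for i in range(low, high + 1))

# The true answer for this stream is sum(10..20) = 165; the CMS can only overestimate it
naive_range_query(cms, 10, 20)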
  • Good:
    • Keeps track of the frequency of the top items
  • Limitation:

    • Only works well for skewed distributions
  • Cool use cases:

    • Keep track of the top-N items (Count-Min Sketch + heap) -- see the sketch below
    • q-Quantiles on a stream (set of Count-Min Sketch counters)
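
A simplified sketch of the top-N idea: a Count-Min Sketch for the counts plus a small candidate set, with heapq only used to extract the final ranking (a production version would maintain a proper min-heap). TopNTracker and its parameters are illustrative:

In [ ]:
import heapq

class TopNTracker(object):
    """ Sketch: approximate top-N items backed by a Count-Min Sketch """

    def __init__(self, n, delta=1E-3, epsilon=0.01):
        self.n = n
        self.cms = CountMinSketch(delta, epsilon)
        self.candidates = {}   # item -> current estimated count (at most n entries)

    def update(self, key, increment=1):
        self.cms.update(key, increment)
        estimate = self.cms.get(key)
        if key in self.candidates or len(self.candidates) < self.n:
            self.candidates[key] = estimate
        else:
            # Evict the weakest candidate if the new estimate beats it
            weakest = min(self.candidates, key=self.candidates.get)
            if estimate > self.candidates[weakest]:
                del self.candidates[weakest]
                self.candidates[key] = estimate

    def top(self):
        return heapq.nlargest(self.n, self.candidates.items(), key=lambda kv: kv[1])

top10 = TopNTracker(10)
for s in stream:
    top10.update(s)
top10.top()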

Pattern: Unique items

  • Ex:

    • Number of unique visitors on a site, post or section (with unions)
  • Only cardinality, no membership, no content

  • => Probabilistic counting (HyperLogLog)
  • The observable is in the bit pattern of the hash

Let's flip some coins

  • Longest run in the coin-tossing experiment:
    • If I flip a coin n times, what is the probability P of getting $R_n$ consecutive heads?
    • Independent Bernoulli trials (q=0.5), if n is large:
    • Expected value of the longest run of consecutive heads: $ R_n \approx \log_{1/q} (nq) $
In [64]:
import pandas as pd

ns = [50,200,1000,1E4,1E5,1E6]

def Rn(n,q):
    return int(round(np.log2(n*q)))

We can approximate the distribution of the longest run using an extreme value distribution:

In [65]:
def prob_longest(a, b, n, p):
    r = Rn(n,1-p)
    cdf = lambda x: np.exp(- p**x )
    return cdf(b + 1 - r) - cdf(a - r)

For example, for 1000 coin flips (n=1000, p=0.5 for a fair coin), the probability that the longest run falls around 10 is roughly 28%:

In [66]:
prob_longest(10,11,1000,0.5)
Out[66]:
0.27596624287196203

The longest-run distribution looks like this:

In [67]:
ns = range(1,30)
ps_50 = pd.Series([prob_longest(ns[i],ns[i+1], 50, 0.5) for i in ns[:-2]])
ps_50.plot(kind='bar')
Out[67]:
<matplotlib.axes.AxesSubplot at 0x107680710>
In [68]:
ps_200 = pd.Series([prob_longest(ns[i],ns[i+1], 200, 0.5) for i in ns[:-2]])
ps_200.plot(kind='bar')
Out[68]:
<matplotlib.axes.AxesSubplot at 0x1077a8610>
In [69]:
ps_1m = pd.Series([prob_longest(ns[i],ns[i+1], 1E6, 0.5) for i in ns[:-2]])
ps_1m.plot(kind='bar')
Out[69]:
<matplotlib.axes.AxesSubplot at 0x107906210>
  • The length of the longest run shifts towards larger values at a rate $\propto \log(n)$

  • By looking at the longest run we can now estimate n simply using $n\approx (1/q) \cdot {2^{R_n}}$.

  • The HyperLogLog: estimate the cardinality by looking at the longest sequence of zeros in the hashed input stream

HyperLogLog

  • In the simple coin-flipping experiment, the observable used to estimate the cardinality is the longest run
  • In HyperLogLog we use a different observable: the bit pattern $ 0^{\rho-1}1 $
  • Evaluating this pattern is very fast -> position of the least significant 1 bit
  • Cardinality: $ \sim 2^\rho $
  • More precision: m experiments (hash functions) instead of 1
  • We can emulate m experiments using stochastic averaging (a minimal sketch follows this list)
  • Error rate: $ \frac{1.04}{\sqrt{m}} $
  • Cardinality is estimated using an average (harmonic mean) of the $\rho$ values across registers
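
A minimal HyperLogLog sketch to make the pieces concrete. md5 stands in for the hash function, b (and therefore m) is an illustrative choice, and the small/large-range corrections of the real algorithm are omitted:

In [ ]:
import hashlib

class MinimalHyperLogLog(object):
    """ Minimal HyperLogLog sketch: m registers, stochastic averaging, harmonic mean """

    def __init__(self, b=10):
        self.b = b                  # the first b bits of the hash pick a register
        self.m = 1 << b             # number of registers (error ~ 1.04 / sqrt(m))
        self.registers = np.zeros(self.m, dtype=np.uint8)
        self.alpha = 0.7213 / (1.0 + 1.079 / self.m)   # bias correction constant

    def _rho(self, w):
        """ Position of the leftmost 1-bit in the remaining 32 - b bits """
        rho = 1
        mask = 1 << (31 - self.b)
        while mask and not (w & mask):
            rho += 1
            mask >>= 1
        return rho

    def add(self, element):
        x = int(hashlib.md5(element).hexdigest(), 16) & 0xFFFFFFFF   # 32-bit hash value
        j = x >> (32 - self.b)                 # register index (stochastic averaging)
        w = x & ((1 << (32 - self.b)) - 1)     # remaining bits
        self.registers[j] = max(self.registers[j], self._rho(w))

    def cardinality(self):
        """ Raw estimate: alpha * m^2 / harmonic sum of the 2^register values """
        return self.alpha * self.m ** 2 / np.sum(2.0 ** -self.registers.astype(np.float64))

hll = MinimalHyperLogLog(b=10)
for i in range(100000):
    hll.add(str(i))
hll.cardinality()   # should land within a few percent of 100000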

Longest run

bit pattern $ 0^{\rho-1}1 $

bit pattern $ 0^{\rho-1}1 $ with stochastic averaging over 32 registers

  • Good:
    • Really efficient memory usage: "all the uniques" of one of the biggest publishers in the US: 2 kB (2.5% error)
    • Union (register-wise max -- see the sketch below)
  • Limitation:
    • Only cardinality queries
    • Intersection not really possible (with a predictable error)
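
A sketch of the union, using the MinimalHyperLogLog defined above: two HLLs with the same register layout merge by taking the register-wise maximum.

In [ ]:
def hll_union(hll_a, hll_b):
    """ Union of two HLLs with identical register layout: register-wise max """
    assert hll_a.m == hll_b.m
    merged = MinimalHyperLogLog(b=hll_a.b)
    merged.registers = np.maximum(hll_a.registers, hll_b.registers)
    return merged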