HLLs and Polluted Registers

Introduction

It’s worth thinking about how things can go wrong, and what the implications of such occurrences might be. In this post, I’ll be taking a look at the HyperLogLog (HLL) algorithm for cardinality estimation, which we’ve discussed before.

The Setup

HLLs have the property that their register values increase monotonically as they run. The basic update rule is:

for item in stream:
    index, proposed_value = process_hashed_item(hash(item))
    hll.registers[index] = max(hll.registers[index], proposed_value)

There’s an obvious vulnerability here: what happens to your counts if you get pathological data that blows up a register value to some really large number? These values are never allowed to decrease according to the vanilla algorithm. How much of a beating can these sketches take from such pathological data before their estimates are wholly unreliable?

Experiment The First

To get some sense of this, I took a 1024 bucket HLL, ran a stream through it, and then computed the error in the estimate. I then proceeded to randomly choose a register, max it out, and compute the error again. I repeated this process until I had maxed out 10% of the registers. In pseudo-python:

print("n_registers_touched,relative_error")
print(0, relative_error(hll.cardinality(), stream_size), sep = ",")
for index, reg in enumerate(random.sample(range(1024), num_to_edit)):
    hll.registers[reg] = 32
    print(index + 1, relative_error(hll.cardinality(), stream_size), sep = ",")

In practice, HLL registers are fixed to be a certain bit width. In our case, registers are 5 bits wide, as this allows us to count runs of 0s up to length 32. This allows us to count astronomically high in a 1024 register HLL.
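For readers who want to reproduce the shape of this experiment without an HLL library, here is a minimal, self-contained sketch. It is not the code behind the results in this post: the simulated hashes (random 64-bit integers), the helper names, and the use of the raw, uncorrected HLL estimate are all assumptions made for illustration.

```python
import random


def raw_estimate(registers):
    """Raw HLL estimate (no small/large range corrections)."""
    m = len(registers)
    alpha = 0.7213 / (1 + 1.079 / m)  # standard constant for m >= 128
    return alpha * m * m / sum(2.0 ** -r for r in registers)


def simulate_registers(n_items, m, rng):
    """Fill m registers (m a power of two) from n_items random 64-bit 'hashes'."""
    index_bits = m.bit_length() - 1
    value_bits = 64 - index_bits
    registers = [0] * m
    for _ in range(n_items):
        h = rng.getrandbits(64)
        index = h % m   # low bits choose the register
        w = h // m      # remaining bits feed the bit-pattern observable
        # position of the lowest set bit, counting from 1
        value = (w & -w).bit_length() if w else value_bits + 1
        registers[index] = max(registers[index], value)
    return registers


def pollution_errors(n_items=100_000, m=1024, num_to_edit=100, seed=0):
    """Relative error before and after each register is maxed out."""
    rng = random.Random(seed)
    registers = simulate_registers(n_items, m, rng)
    errors = [raw_estimate(registers) / n_items - 1]
    for reg in rng.sample(range(m), num_to_edit):
        registers[reg] = 32  # same "maxed out" value as in the pseudocode above
        errors.append(raw_estimate(registers) / n_items - 1)
    return errors


if __name__ == "__main__":
    for touched, err in enumerate(pollution_errors()):
        if touched % 10 == 0:
            print(touched, round(err, 4), sep=",")
```

Because the sketch uses a single seed, it shows one trial rather than the many trials summarized in the post.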

Repeating this for many trials, and stream sizes of 100k, 1M, and 10M, we have the following picture. The green line is the best fit line.

Error grows linearly with polluted register values

What we see is actually pretty reassuring. Roughly speaking, totally poisoning x% of registers results in about an x% error in your cardinality estimate. For example, here are the error means and variances across all the trials for the 1M element stream:

Registers Modified   % of Registers Modified   Error Mean       Error Variance
0                    0%                        -0.0005806094    0.001271119
10                   0.97%                      0.0094607324    0.001300780
20                   1.9%                       0.0194860481    0.001356282
30                   2.9%                       0.0297495396    0.001381753
40                   3.9%                       0.0395013208    0.001436418
50                   4.9%                       0.0494727527    0.001460182
60                   5.9%                       0.0600436774    0.001525749
70                   6.8%                       0.0706375356    0.001522320
80                   7.8%                       0.0826034639    0.001599104
90                   8.8%                       0.0937465662    0.001587156
100                  9.8%                       0.1060810958    0.001600348

Initial Reactions

I was actually not too surprised to see that the induced error was modest when only a small fraction of the registers were poisoned. Along with some other machinery, the HLL algorithm uses the harmonic mean of the individual register estimates when computing its guess for the number of distinct values in the data stream. The harmonic mean does a very nice job of downweighting values that are significantly larger than others in the set under consideration:

In [1]: from scipy.stats import hmean

In [2]: from numpy import mean

In [3]: f = [1] * 100000 + [1000000000]

In [4]: mean(f)
Out[4]: 10000.899991000089

In [5]: hmean(f)
Out[5]: 1.0000099999999899

It is this property that provides protection against totally wrecking the sketch’s estimate when we blow up a fairly small fraction of the registers.

Experiment The Second

Of course, the algorithm can only hold out so long. While I was not surprised by the modesty of the error, I was very surprised by how linear the error growth was in the first figure. I ran the same experiment, but instead of stopping at 10% of the registers, I went all the way to the end. This time, I have plotted the results with a log-scaled y-axis:

Note that some experiments appear to start after others. This is due to missing data from taking the logarithm of negative errors.

Without getting overly formal in our analysis, there are roughly three phases in error growth here. At first, it’s sublinear on the log-scale, then linear, then superlinear. This roughly corresponds to “slow”, “exponential”, and “really, really, fast”. As our mathemagician-in-residence points out, the error will grow roughly as p/(1-p) where p is the fraction of polluted registers. The derivation of this isn’t too hard to work out, if you want to give it a shot! The implication of this little formula matches exactly what we see above. When p is small, the denominator does not change much, and the error grows roughly linearly. As p approaches 1, the error begins to grow super-exponentially. Isn’t it nice when experiment matches theory?
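One way to see where p/(1-p) might come from (this is my reading, not a derivation given in the post): assume a maxed-out register contributes essentially nothing to the sum of 2^{-register} terms in the denominator of the HLL estimate, and that the untouched registers still look like a typical sample. The sum then shrinks by a factor of (1-p), the estimate grows by 1/(1-p), and the relative error is 1/(1-p) - 1 = p/(1-p). The short check below compares that prediction with three rows of the 1M-element table from the first experiment; the function name is made up for illustration.

```python
def predicted_error(n_polluted, n_registers=1024):
    """Relative error predicted by p / (1 - p), where p is the polluted fraction."""
    p = n_polluted / n_registers
    return p / (1 - p)


# Mean errors copied from the 1M-element table earlier in the post.
observed_mean_error = {10: 0.0094607324, 50: 0.0494727527, 100: 0.1060810958}

for n_polluted, observed in observed_mean_error.items():
    print(n_polluted, round(predicted_error(n_polluted), 4), observed)
```

The predictions land within a few thousandths of the measured means, which is about as close as one could hope for from such a rough argument.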

Final Thoughts

It’s certainly nice to see that the estimates produced by HLLs are not overly vulnerable to a few errant register hits. As is often the case with this sort of analysis, the academic point must be put in balance with the practical. The chance of maxing out even a single register under normal operation is vanishingly small, assuming you chose a sane hash function for your keys. If I was running an HLL in the wild, and saw that 10% of my registers were pegged, my first thought would be “What is going wrong with my system!?” and not “Oh, well, at least I know my estimate to within 10%!” I would be disinclined to trust the whole data set until I got a better sense of what caused the blowups, and why I should give any credence at all to the supposedly unpolluted registers.

Doubling the Size of an HLL Dynamically – Recovery

Author’s Note: This post is related to a few previous posts dealing with the HyperLogLog algorithm. See Matt’s overview of HLL, and see this post for an overview of “folding” or shrinking HLLs in order to perform set operations. It is also the first in a series of three posts on doubling the size of HLLs – the next two will be about set operations and utilizing additional bits of data, respectively.

Overview

In this post, we explore the error of the cardinality estimate of an HLL whose size has been doubled using several different fill techniques. Specifically, we’re looking at how the error changes as additional keys are seen by the HLL.

A Quick Reminder – Terminology and Fill Strategies

If we have an HLL of size 2^n and we double it to be an HLL of size 2^{n+1}, we call two bins “partners” if their bin numbers differ by 2^n. For example, in an HLL doubled to size 8, the bins 1 and 5 are partners, as are 2 and 6, etc. The Zeroes doubling strategy fills in the newly created bins with zeroes. The Concatenate strategy fills in the newly created bins with the values of their partner bins. MinusTwo fills in each newly created bin with two less than its partner bin’s value. RE fills in the newly created bins according to the empirical distribution of each bin.

Some Sample Recovery Paths

Below, we ran four experiments to check recovery time. Each experiment consisted of running an HLL of size 2^{10} on 500,000 unique hashed keys (modeled here using a random number generator), doubling the HLL to be size 2^{11}, and then running 500,000 more hashed keys through the HLL. Below, we have graphs showing how the error decreases as more keys are added. Both graphs show the same data (the only difference being the scale on the y-axis). We have also graphed “Large,” an HLL of size 2^{11}, and “Small,” an HLL of size 2^{10}, which are shown only for comparison and are never doubled.

One thing to note about the graphs is that the error is relative.

Notice that Concatenate and Zeroes perform particularly poorly. Even after 500,000 extra keys have been added, they still don’t come within 5% of the true value! For Zeroes, this isn’t too surprising. Clearly the initial error of Zeroes, that is the error immediately after doubling, should be high.  A quick look at the harmonic mean shows why this occurs. If a single bin has a zero as its value, the harmonic mean of the values in the bins will be zero. Essentially, the harmonic mean of a list always tends towards the lowest elements of the list. Hence, even after all the zeroes have been replaced with positive values, the cardinality estimate will be very low.

On the other hand, a more surprising result is that Concatenate gives such a poor guess. To see this we need to look at the formula for the estimate again.  The formula for the cardinality estimate is \frac{\alpha_m m^2}{\sum_{i=1}^{m} 2^{-M_i}} where M_i is the value in the i^{th} bin, m is the number of bins, and \alpha_m is a constant approaching about .72. For Concatenate, the value M_{i + m} is equal to M_i.  Hence we have that the cardinality estimate for Concatenate is:

\begin{array}{ll}\displaystyle\frac{\alpha_{2m} (2m)^2}{\left(\displaystyle\sum_{i=1}^{2m} 2^{-M_i}\right)} \vspace{10pt}&\approx \displaystyle\frac{.72\cdot 4\cdot m^2}{\left(\displaystyle\sum_{i=1}^m 2^{-M_i}\right) + \left(\displaystyle\sum_{i=1}^m 2^{-M_i}\right) }\vspace{10pt} \\&\displaystyle= \displaystyle 4\cdot \frac{.72 \cdot m^2}{2\cdot \left(\displaystyle\sum_{i=1}^m 2^{-M_i}\right)}\vspace{10pt}\\&= \displaystyle 2 \cdot \frac{.72 \cdot m^2}{\left(\displaystyle\sum_{i=1}^m 2^{-M_i}\right)}\vspace{10pt}\\&\approx \displaystyle 2 \cdot \frac{ \alpha_m \cdot m^2}{\left(\displaystyle\sum_{i=1}^m 2^{-M_i}\right)}\end{array}

Notice that this last term is about equal to 2 times the cardinality estimate of the HLL before doubling. One quick thing that we can take away from this is that it is unlikely for two “partner” bins to have the same value in them (since if this happens frequently, we get an estimate close to that given by Concatenate – which is very inaccurate!).
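The factor of two can also be checked numerically. The sketch below is only an illustration: the register values are made up (uniform between 5 and 12) rather than taken from a real HLL, and the estimate is the raw formula without any range corrections.

```python
import random


def alpha(m):
    # Standard approximation of the HLL constant for m >= 128.
    return 0.7213 / (1 + 1.079 / m)


def raw_estimate(registers):
    m = len(registers)
    return alpha(m) * m * m / sum(2.0 ** -value for value in registers)


rng = random.Random(1)
small = [rng.randint(5, 12) for _ in range(1024)]  # hypothetical register values
concatenated = small + small                       # the Concatenate fill strategy

ratio = raw_estimate(concatenated) / raw_estimate(small)
print(round(ratio, 3))
```

The ratio comes out very close to 2, and the small remainder is just the difference between \alpha_{2m} and \alpha_m.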

As for MinusTwo and RE, these have small initial error and the error only falls afterwards. The initial error is small since the rules for these give guesses approximately equal to the guess of the original HLL before doubling. From here, the error should continue to shrink, and eventually, it should match that of the large HLL.

One thing we noticed was that the error for Concatenate in the graph above suggested that the absolute error wasn’t decreasing at all. To check this we looked at the trials and, sure enough, the absolute error stays pretty flat. Essentially, Concatenate overestimates pretty badly, and puts the HLL in a state where it thinks it has seen twice as many keys as it actually has. In the short term, it will continue to make estimates as if it has seen 500,000 extra keys. We can see this clearly in the graphs below.

Recovery Time Data

I also ran 100 experiments where we doubled the HLLs after adding 500,000 keys, then continued to add keys until the cardinality estimate fell within 5% of the true cardinality.  The HLLs were set up to stop running at 2,000,000 keys if they hadn’t arrived at the error bound.

Notice how badly Concatenate did! In no trials did it make it under 5% error. Zeroes did poorly as well, though it did recover eventually. My guess here is that the harmonic mean had a bit to do with this – any bin with a low value, call it k, in it would pull the estimate down to be about m^2 \cdot 2^k. As a result, the estimate produced by the Zeroes HLL will remain depressed until every bin is hit with a(n unlikely) high value. Zeroes and Concatenate should not do well since essentially the initial estimate (after doubling) of each HLL is off by a very large fixed amount. The graph of absolute errors, above, shows this.

On the other hand, RE and MinusTwo performed fairly well. Certainly, RE looks better in terms of median and middle 50%, though its variance is much higher than MinusTwo’s. This should make sense as we are injecting a lot of randomness into RE when we fill in the values, whereas MinusTwo’s bins are filled in deterministically.

Recovery Time As A Function of Size

One might wonder whether the recovery time of MinusTwo and RE depends on the size of the HLL before the doubling process. To get a quick view of whether or not this is true, we did 1,000 trials like those above, but adding 200K, 400K, 600K, 800K, and 1M keys and with a cutoff of 3% this time. Below, we have the box plots for the data for each of these. The heading of each graph gives the size of the HLL before doubling, and the y-axis gives the fractional recovery time (the true recovery time divided by the size of the HLL before doubling).

Notice that, for each doubling rule, there is almost no variation between each of the plots. This suggests that the size of the HLL before doubling doesn’t change the fractional recovery time. As a side note, one thing that we found really surprising is that RE is no longer king – MinusTwo has a slightly better average case. We think that this is just a factor of the higher variation of RE and the change in cutoff.

Summary

Of the four rules, MinusTwo and RE are clearly the best. Both take about 50 – 75% more keys after doubling to get within 3% error, and both recover extremely quickly if you ask for them to only get within 5% error.

To leave you with one last little brainteaser, an HLL of size 2^{10}, which is then doubled, will eventually have the same values in its bins as an HLL of size 2^{11} which ran on the same data. About how long will it take for these HLLs to converge? One (weak) requirement for this to happen is to have the value in every bin of both HLLs be changed. To get an upper bound on how long this should take, one should read about the coupon collector problem.

Sketch of the Day: HyperLogLog — Cornerstone of a Big Data Infrastructure

Intro

In the Zipfian world of AK, the HyperLogLog distinct value (DV) sketch reigns supreme. This DV sketch is the workhorse behind the majority of our DV counters (and we’re not alone) and enables us to have a real time, in memory data store with incredibly high throughput. HLL was conceived of by Flajolet et al. in the phenomenal paper HyperLogLog: the analysis of a near-optimal cardinality estimation algorithm. This sketch extends upon the earlier Loglog Counting of Large Cardinalities (Durand et al.) which in turn is based on FM-85, Flajolet and Martin’s original work on probabilistic counting. (Many thanks to Jérémie Lumbroso for the correction of the history here. I am very much looking forward to his upcoming introduction to probabilistic counting in Flajolet’s complete works.) UPDATE – Rob has recently published a blog post about PCSA, a direct precursor to LogLog counting, which is filled with interesting thoughts. There have been a few posts on HLL recently so I thought I would dive into the intuition behind the sketch and into some of the details.

Just like all the other DV sketches, HyperLogLog looks for interesting things in the hashed values of your incoming data.  However, unlike other DV sketches HLL is based on bit pattern observables as opposed to KMV (and others) which are based on order statistics of a stream.  As Flajolet himself states:

Bit-pattern observables: these are based on certain patterns of bits occurring at the beginning of the (binary) S-values. For instance, observing in the stream S at the beginning of a string a bit-pattern 0^{\rho-1}1 is more or less a likely indication that the cardinality n of S is at least 2^\rho.

Order statistics observables: these are based on order statistics, like the smallest (real) values, that appear in S. For instance, if X = min(S), we may legitimately hope that n is roughly of the order of 1/X…

In my mind HyperLogLog is really composed of two insights: Lots of crappy things are sometimes better than one really good thing; and bit pattern observables tell you a lot about a stream. We’re going to look at each component in turn.

Bad Estimator

Even though the literature refers to the HyperLogLog sketch as a different family of estimator than KMV I think they are very similar. It’s useful to understand the approach of HLL by reviewing the KMV sketch. Recall that KMV stores the smallest k values that you have seen in a stream. From these k values you get an estimate of the number of distinct elements you have seen so far. HLL also stores something similar to the smallest values ever seen. To see how this works it’s useful to ask “How could we make the KMV sketch smaller?” KMV stores the actual value of the incoming numbers. So you have to store k 64 bit values which is tiny, but not that tiny. What if we just stored the “rank” of the numbers?  Let’s say the number 94103 comes through (I’ll use base 10 here to make things easier). That number is basically 9*10^4 plus some stuff. So, let’s just store the exponent, i.e. 4. In this way I get an approximation of the size of numbers I have seen so far. That turns the original KMV algorithm into only having to store the numbers 1-19 (since 2^{64} \approx 10^{19}) which is a whole lot less than 2^{64} numbers. Of course, this estimate will be much worse than storing the actual values.

Bit Pattern Observables

In actuality HLL, just like all the other DV sketches, uses hashes of the incoming data in base 2. And instead of storing the “rank” of the incoming numbers HLL uses a nice trick of looking for runs of zeroes in the hash values. These runs of zeroes are an example of “bit pattern observables”. This concept is similar to recording the longest run of heads in a series of coin flips and using that to guess the number of times the coin was flipped. For instance, if you told me that you spent some time this afternoon flipping a coin and the longest run of heads you saw was 2 I could guess you didn’t flip the coin very many times. However, if you told me you saw a run of 100 heads in a row I would gather you were flipping the coin for quite a while. This “bit pattern observable”, the run of heads, gives me information about the stream of data it was pulled from. An interesting thing to note is just how probable long runs of heads are. As Mark Shilling points out, you can almost always tell the difference between a human generated set of coin flips and an actual one, due to humans not generating long runs. (The world of coin flipping seems to be a deep and crazy pit.) Disclaimer: The only thing I am trying to motivate here is that by keeping a very small piece of information (the longest run of heads) I can get some understanding of what has happened in a stream. Of course, you could probably guess that even though we have now reduced the storage of our sketch the DV estimate is pretty crummy. But what if we kept more than one of them?
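To get a feel for how much the longest run of heads says about the number of flips, here is a small simulation. It is only a sketch of the coin-flipping analogy, not part of HLL itself, and the function names, trial count, and seed are arbitrary choices.

```python
import random


def longest_run_of_heads(n_flips, rng):
    """Flip a fair coin n_flips times and return the longest run of heads."""
    longest = current = 0
    for _ in range(n_flips):
        if rng.random() < 0.5:  # heads
            current += 1
            longest = max(longest, current)
        else:                   # tails resets the run
            current = 0
    return longest


def average_longest_run(n_flips, trials=100, seed=0):
    """Average the longest run over several independent sessions."""
    rng = random.Random(seed)
    total = sum(longest_run_of_heads(n_flips, rng) for _ in range(trials))
    return total / trials


if __name__ == "__main__":
    for n_flips in (10, 100, 10_000):
        print(n_flips, average_longest_run(n_flips))
```

The average longest run creeps up slowly as the number of flips grows by orders of magnitude, which is exactly why a single small number can stand in for a very large count, and also why one such number on its own is a noisy estimate.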

Stochastic Averaging

In order to improve the estimate, the HLL algorithm stores many estimators instead of one and averages the results. However, in order to do this you would have to hash the incoming data through a bunch of independent hash functions. This approach isn’t a very good idea since hashing each value a bunch of times is expensive and finding good independent hash families is quite difficult in practice. The work around for this is to just use one hash function and “split up” the input into m buckets while maintaining the observable (longest run of zeroes) for each bucket. This procedure is called stochastic averaging. You could do this split in KMV as well and it’s easier to visualize. For an m of 3 it would look like:

To break the input into the m buckets, Durand suggests using the first few (k) bits of the hash value as an index into a bucket and compute the longest run of zeroes (R) on what is left over. For example, if your incoming datum looks like 010100000110 and k = 3 you could use the 3 rightmost bits, 110, to tell you which register to update (110_2 = 6) and from the remaining bits, 010100000, you could take the longest run of zeroes (up to some max), which in this case is 5. In order to compute the number of distinct values in the stream you would just take the average of all of the m buckets:

\displaystyle DV_{LL} = \displaystyle\text{constant} * m*2^{\overline{R}}

Here \overline{R} is the average of the values R in all the buckets. The formula above is actually the estimator for the LogLog algorithm, not HyperLogLog. To get HLL, you need one more piece…
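The bucket-splitting example above (hash 010100000110 with k = 3) can be written out directly. This sketch follows the text literally, taking the longest run of zeroes anywhere in the remaining bits. It works on a bit string for readability, and the function name is made up.

```python
def split_hash(bits, k):
    """Split a bit string into (register index, remaining bits, longest zero run)."""
    index = int(bits[-k:], 2)  # the k rightmost bits pick the register
    remainder = bits[:-k]      # everything else feeds the observable
    longest_run = max(len(run) for run in remainder.split("1"))
    return index, remainder, longest_run


index, remainder, run = split_hash("010100000110", 3)
print(index, remainder, run)
```

A real implementation would work on integers with shifts and masks instead of strings, but the split is the same.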

Harmonic Mean

A fundamental insight that Flajolet had to improve LogLog into HyperLogLog was that he noticed the distribution of the values in the m registers is skewed to the right, and there can be some dramatic outliers that really mess up the average used in LogLog (see Fig. 1 below). He and Durand knew this when they wrote LogLog and did a bunch of hand-wavey stuff (like cut off the top 30% of the register values) to create what he called the “SuperLogLog”, but in retrospect this seems kind of dumb. He fixed this in HLL by tossing out the odd rules in SuperLogLog and deciding to take the harmonic mean of the DV estimates. The harmonic mean tends to throw out extreme values and behave well in this type of environment. This seems like an obvious thing to do. I’m a bit surprised they didn’t try this in the LogLog paper, but perhaps the math is harder to deal with when using the harmonic mean vs the geometric mean.

Fig. 1:  The theoretical distribution of register values after v distinct values have been run through an HLL.

Throw all these pieces together and you get the HyperLogLog DV estimator:

\displaystyle DV_{HLL} = \displaystyle\text{constant} * m^2 *\left (\sum_{j=1}^m 2^{-R_j} \right )^{-1}

Here R_j is the longest run of zeroes in the j^{th} bucket.

Putting it All Together

Even with the harmonic mean Flajolet still has to introduce a few “corrections” to the algorithm. When the HLL begins counting, most of the registers are empty and it takes a while to fill them up. In this range he introduces a “small range correction”. The other correction is when the HLL gets full. If a lot of distinct values have been run through an HLL the odds of collisions in your hash space increases. To correct for hash collisions Flajolet introduces the “large range correction”. The final algorithm looks like (it might be easier for some of you to just look at the source in the JavaScript HLL simulation):

from math import log

m = 2**b # with b in [4...16]

if m == 16:
    alpha = 0.673
elif m == 32:
    alpha = 0.697
elif m == 64:
    alpha = 0.709
else:
    alpha = 0.7213/(1 + 1.079/m)

registers = [0]*m # initialize m registers to 0

##############################################################################################
# Construct the HLL structure
# (hashed, get_register_index, run_of_zeros and count_of_zero_registers are helpers left undefined here)
for h in hashed(data):
    register_index = get_register_index( h,b ) # integer value of the rightmost b bits, in [0, m-1]
    run_length = run_of_zeros( h,b ) # length of the run of zeroes starting at bit b+1
    registers[ register_index ] = max( registers[ register_index ], run_length )

##############################################################################################
# Determine the cardinality
DV_est = alpha * m**2 / sum( 2**-register for register in registers )  # the raw DV estimate

if DV_est < 5/2 * m: # small range correction
    V = count_of_zero_registers( registers ) # the number of registers equal to zero
    if V == 0:  # if none of the registers are empty, use the HLL estimate
        DV = DV_est
    else:
        DV = m * log(m/V)  # i.e. balls and bins correction
elif DV_est <= ( 1/30 * 2**32 ):  # intermediate range, no correction
    DV = DV_est
else:  # large range correction
    DV = -2**32 * log( 1 - DV_est/2**32 )
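For anyone who wants something they can paste into an interpreter, here is one way the steps above might be fleshed out into runnable Python. Several things here are my assumptions rather than anything from the post or the simulation: the hash is the first 32 bits of SHA-1, the register index is taken from the low bits, and each register stores the position of the first 1 bit in the remaining bits (the run of zeroes plus one), which is the quantity the constants in the paper are calibrated for.

```python
import hashlib
from math import log


def alpha_for(m):
    if m == 16:
        return 0.673
    if m == 32:
        return 0.697
    if m == 64:
        return 0.709
    return 0.7213 / (1 + 1.079 / m)


def hash32(item):
    # Assumption: first 4 bytes of SHA-1 used as a 32-bit hash.
    digest = hashlib.sha1(str(item).encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big")


def first_one_position(w, width):
    # 1-based position of the lowest set bit; width + 1 if no bit is set.
    return (w & -w).bit_length() if w else width + 1


def estimate_cardinality(items, b=10):
    m = 1 << b
    registers = [0] * m
    for item in items:
        h = hash32(item)
        index = h & (m - 1)  # low b bits choose the register
        w = h >> b           # remaining 32 - b bits
        registers[index] = max(registers[index], first_one_position(w, 32 - b))

    raw = alpha_for(m) * m * m / sum(2.0 ** -r for r in registers)
    if raw < 2.5 * m:                      # small range correction
        zeros = registers.count(0)
        return m * log(m / zeros) if zeros else raw
    if raw <= 2 ** 32 / 30:                # intermediate range, no correction
        return raw
    return -(2 ** 32) * log(1 - raw / 2 ** 32)  # large range correction


if __name__ == "__main__":
    print(round(estimate_cardinality(range(100_000))))
```

With 1024 registers the standard error is around 3%, so the printed estimate should land near 100,000 but will not match it exactly.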

Rob wrote up an awesome HLL simulation for this post. You can get a real sense of how this thing works by playing around with different values and just watching how it grows over time. Click below to see how this all fits together.

HyperLogLog Simulation

Unions

Unions are very straightforward to compute in HLL and, like KMV, are lossless. All you need to do to combine the register values of the 2 (or n) HLL sketches is take the max of the 2 (or n) register values and assign that to the union HLL. With a little thought you should realize that this is the same thing as if you had fed in the union stream to begin with. A nice side effect about lossless unions is that HLL sketches are trivially parallelizable. This is great if, like us, you are trying to digest a firehose of data and need multiple boxes to do summarization. So, you have:

for i in range(0, len(R_1)):
     R_new[i] = max( R_1[i], R_2[i] )
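The losslessness claim is easy to check on a toy example. The register-building helper below is a stand-in I wrote for illustration (SHA-1 as the hash, 64 registers), not AK's implementation. The point is only that taking the register-wise max of two sketches gives exactly the registers you would get from sketching the combined stream.

```python
import hashlib


def build_registers(items, b=6):
    """Toy HLL register array with 2**b registers (illustrative helper)."""
    m = 1 << b
    registers = [0] * m
    for item in items:
        digest = hashlib.sha1(str(item).encode("utf-8")).digest()
        h = int.from_bytes(digest[:8], "big")  # 64-bit hash
        index = h & (m - 1)
        w = h >> b
        value = (w & -w).bit_length() if w else 64 - b + 1
        registers[index] = max(registers[index], value)
    return registers


def union(r_1, r_2):
    """Register-wise max of two equally sized sketches."""
    if len(r_1) != len(r_2):
        raise ValueError("sketches must have the same number of registers")
    return [max(a, b) for a, b in zip(r_1, r_2)]


left = build_registers(range(0, 600))
right = build_registers(range(400, 1000))  # overlaps with left on 400..599
combined = build_registers(range(0, 1000))

print(union(left, right) == combined)
```

The overlap between the two streams does not matter, since seeing a key twice never changes a register.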

To combine HLL sketches that have differing sizes read Chris’s blog post about it.

Wrapping Up

In our research, and as the literature says, the HyperLogLog algorithm gives you the biggest bang for the buck for DV counting. It has the best accuracy per storage of all the DV counters to date. The biggest drawbacks we have seen are around intersections. Unlike KMV, there is no explicit intersection logic, you have to use the inclusion/exclusion principle and this gets really annoying for anything more than 3 sets. Aside from that, we’ve been tickled pink using HLL for our production reporting. We have even written a PostgreSQL HLL data type that supports cardinality, union, and intersection. This has enabled all kinds of efficiencies for our analytics teams as the round trips to Hadoop are less and most of the analysis can be done in SQL. We have seen a massive increase in the types of analytics that go on at AK since we have adopted a sketching infrastructure and I don’t think I’m crazy saying that many big data platforms will be built this way in the future.

P.S.  Sadly, Philippe Flajolet passed away in March 2011. It was actually a very sad day for us at Aggregate Knowledge because we were so deep in our HLL research at the time and would have loved to reach out to him, he seems like he would have been happy to see his theory put to practice. Based on all I’ve read about him I’m very sorry to have not met him in person. I’m sure his work will live on but we have definitely lost a great mind both in industry and academia. Keep counting Philippe!

Photo courtesy of http://www.ae-info.org/

Adventures in Concurrency

The Past

The Summarizer, our main piece of aggregation infrastructure, used to have a very simple architecture:

  1. RSyslog handed Netty some bytes.
  2. A Netty worker turned those bytes into a String.
  3. The Netty worker then peeled off the RSyslog envelope to reveal a payload and an event type. We call this combination an IMessage.
  4. The IMessage‘s payload got turned into an IInputEvent (basically a POJO).
  5. The IInputEvent was mapped to one or many summaries, based on its event type, which were then updated with the event.

Original Summarizer Architecture with Summarization Lock

Netty workers contend for a central summarization lock before updating summaries.

All of this work was done inside the Netty workers, and the synchronization of the summary objects was handled by a single lock on all of them. Some events were mapped to a single summary, others to a half dozen. Luckily the payloads were simple (CSV-formatted) and the summaries were even simpler. It didn’t really matter if the events hit one summary or ten, we could summarize events much faster than we could parse them. Under these conditions we could handle 200k messages per second, no sweat.

Slowly, new reporting features were added and the summaries became more complex. The number of operations per event increased, and throughput dropped to 100k/sec. Progressively, summarization supplanted parsing as the bottleneck.

Then we introduced a more complex, nested JSON event format (v2 messages) in order to support new product features. Complex, nested events meant ever more complex, nested summaries, which meant ever more time holding the single lock while updating them. Parsing time increased with the new payload format, but was still far faster than updating the summaries. Throughput for v1 messages had dipped to 60k/sec, and 10k/sec for the v2 messages.

Moreover, the new features the v2 messages permitted weren’t simply an open-ended exercise: with them came the customers that demanded those features and their additional traffic.  The Summarizer simply wouldn’t stand up to the predicted traffic without some work. This post is an overview of the multithreaded solution we used and hopefully will provide some insight into the pitfalls of concurrency in Java 6 and 7.

Objective

Get v1 message throughput back to 200k/sec and v2 throughput to 100k/sec, ideally on our production hardware. Simple enough, given that I knew the main bottlenecks were summarization and to a lesser extent the parsing of the IMessages to IInputEvents.

Let’s put in some queues

The basic premise of concurrency is that you find the time-consuming bits of work, throw some queues and workers at them, and out comes performance, right? (Wrong! But I’ve got a whole narrative going here, so bear with me.) The natural places for inserting these queues seemed to be between steps 3 and 4, and steps 4 and 5. If parsing IMessages to IInputEvents and summarizing IInputEvents are the only time-consuming work units, adding concurrency there should open up those bottlenecks. According to the “train book” I had three options for queues:

  • ArrayBlockingQueue (henceforth ABQ) – bounded, backed by an array (duh), uses a single ReentrantLock
  • LinkedBlockingQueue (henceforth LBQ) – optionally bounded, backed by a linked list (duh), uses two ReentrantLocks (one for the head and one for the tail)
  • ConcurrentLinkedQueue (henceforth CLQ) – unbounded, backed by a linked list, uses no locks, instead relies on a non-blocking algorithm built on CAS

Queued Summarizer Architecture with BlockingQueues

IMessage to IInputEvent parsing and IInputEvent summarization are buffered by BlockingQueues.

We added a message parsing queue to which the Netty workers would dump IMessages. Message parser workers would take those IMessages and turn them into IInputEvents. They would then distribute those IInputEvents to a summarization queue I added to each summarization worker. Since I didn’t want to lock each report object, I decided that only a single summarization worker would ever write to a particular summary. (Martin Thompson’s blog posts about the Single Writer Principle were inspiration for this.) That is, each summarization worker would be assigned (by round-robin at startup) one or many summaries to own exclusively. So, in total I added one multiple-producer, multiple-consumer (MPMC) message parsing queue and N multiple-producer, single-consumer (MPSC) summarization queues (one for each summarization worker).
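The shape of that design, with one bounded inbox per summarization worker and each summary owned by exactly one worker, can be sketched in a few lines. This is an illustration in Python rather than the Java the Summarizer is written in, and every name in it (owner_of, run_summarizer, the integer summary ids, the counter-style summaries) is hypothetical. It says nothing about performance and only shows why no lock is needed around the summaries.

```python
import queue
import threading

NUM_WORKERS = 3
SENTINEL = None  # tells a worker to shut down


def owner_of(summary_id):
    # Fixed assignment: every summary belongs to exactly one worker.
    return summary_id % NUM_WORKERS


def summarization_worker(inbox, summaries):
    # Only this thread ever touches `summaries`, so no lock is required.
    while True:
        event = inbox.get()
        if event is SENTINEL:
            break
        summary_id, amount = event
        summaries[summary_id] = summaries.get(summary_id, 0) + amount


def run_summarizer(events, queue_size=128):
    inboxes = [queue.Queue(maxsize=queue_size) for _ in range(NUM_WORKERS)]
    stores = [{} for _ in range(NUM_WORKERS)]
    workers = [
        threading.Thread(target=summarization_worker, args=(inbox, store))
        for inbox, store in zip(inboxes, stores)
    ]
    for worker in workers:
        worker.start()
    for summary_id, amount in events:
        # put() blocks while the inbox is full, which is the back-pressure.
        inboxes[owner_of(summary_id)].put((summary_id, amount))
    for inbox in inboxes:
        inbox.put(SENTINEL)
    for worker in workers:
        worker.join()
    merged = {}
    for store in stores:
        merged.update(store)  # key sets are disjoint by construction
    return merged


if __name__ == "__main__":
    print(run_summarizer([(i % 7, 1) for i in range(7000)]))
```

The bounded inboxes are the important detail: a producer that gets ahead of a slow worker is forced to wait instead of growing the queue without limit.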

The First Bottleneck: Parsing

I slotted in the various queues available and replayed some traffic to get a feel for what was going on.

  • The message parsing queue was always full, which confirmed my suspicion that parsing, not Netty, was the first bottleneck.
  • The summarization queues were split between two groups: those that were always full and always empty. The reason was clear: some summarization workers were assigned high-volume summaries and others low-volume summaries.

This was my first lesson in queueing: queues are on average completely full or empty because it is nearly impossible to perfectly balance production/consumption rates. Which leads to the second lesson: CLQ (well, any unbounded queue) probably shouldn’t be used as a producer/consumer queue because “completely full” means “always growing” for an unbounded queue. Naturally, that’s an OK situation when consumption outpaces production, but in that scenario I wouldn’t have needed the queue in the first place. I needed back-pressure and only the blocking (in this case, bounded) queues could give me that.

In order to address the parsing bottleneck, I wanted to get a better grasp of the IMessage to IInputEvent throughput rate under different configurations. I constructed a test in which Netty workers would either:

  • do all the parsing work themselves, and then discard the message, or
  • enqueue IMessages to the message parsing queue (either ABQ or LBQ), from which parser workers would dequeue, parse, and then discard the IInputEvent. CLQ was not included here since it would consistently OOM as the queue grew without bound.

Each Netty worker would be responsible for a single connection and each connection could provide as many as 150k messages per second. Results for v1 and v2 message parsing were nearly identical, as were the results for Java 6/7, so they are presented here without distinction.

Netty-Only Message Parsing

When Netty did all of the parsing work, throughput maxed out at about 130k/sec.

 

Queued Message Parsing Throughput

Message parsing throughput with a BlockingQueue between the Netty workers and the message parser workers. The facet labels are [Queue Implementation, Queue Size].

  • Without a queue, throughput topped out at 130k/s. With a queue and the right parser worker count, each of the four Netty workers could produce 60k/sec worth of IMessages. Notably, neither situation provoked anywhere near 100% saturation on (# Netty workers + # parser workers) cores, so I have to believe that it’s simply a matter of having dedicated parsing threads that are not affected by the context switching required to read from the network. If context switching takes up r% of your time, then 5 Netty workers can do at most w_0 = 5(1-r/100) units of work. However, 4 Netty workers and 1 parser worker can do w_1 = 4(1-r/100) + 1 > w_0 units. The fact that 4 Netty workers + 1 parser worker yields about 130-150k/sec, which is a small gain over just 5 Netty workers, suggests this. It might also be a matter of code “locality”: by allowing each thread to focus on a narrower scope of work, better branch prediction or compilation may be possible.
  • ABQ, touted as the end-all of high-performance Java queues, gave us “atrocious” throughput compared to LBQ if more than two consumer threads were hitting it. This was surprising until I poked an active VM with SIGQUIT a few hundred times, only to find that most of the workers were waiting on the ABQ’s ReentrantLock. The difference between two and three consumers hammering that lock amounted to a 50% drop in throughput.
  • LBQ’s split lock seemed to degrade more gracefully in the presence of “extra” producers or consumers. Specifically, the overhead of GC and a linked-list (vs. array) was less than that produced by lock contention on ABQ’s single lock. More remarkably, 2-8 parser workers always produced better results than a single parser worker, so a misconfiguration here couldn’t really do worse than revert to the 1 worker scenario. ABQ was not so lenient, however, dropping to throughput numbers lower than the Netty-only setup after 2 parser workers.
  • Queue size was largely irrelevant compared to the impact of adding or removing even a single producer/consumer. As long as the queue is large enough to buffer jitters, there’s really little point in spending hours tuning it.
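The back-of-the-envelope argument in the first bullet above can be checked with a few lines of arithmetic. This is only a sketch of that toy model: the function name work_capacity and the value r = 10 are hypothetical, and the model assumes that only Netty workers lose time to context switching.

```python
def work_capacity(netty_workers, parser_workers, r):
    """Units of work per unit time, assuming only Netty workers lose r% to context switching."""
    return netty_workers * (1 - r / 100) + parser_workers

r = 10  # hypothetical: 10% of a Netty worker's time goes to context switching
w_0 = work_capacity(5, 0, r)  # five Netty workers, no parser workers
w_1 = work_capacity(4, 1, r)  # four Netty workers, one dedicated parser worker
gain = w_1 - w_0              # equals r / 100 in this model, for any r
```

In this model, swapping one Netty worker for a dedicated parser worker recovers exactly the fraction of time that worker would have lost to context switching, which is why the gain is real but small.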

Progress! At least I knew I could parse the 200k messages per second I needed.

The Second Bottleneck: Summarization

I knew that I wouldn’t be able to preserve the full parsing throughput simply because the queuing/dequeuing latency of another queue would always be present. The tradeoff was going to be adding more summarization workers at the cost of more time spent by the message parsing workers distributing the newly parsed IInputEvents to all relevant summarization workers. Each event would likely be distributed to more than one summarization worker, which meant a sequential lock acquisition for each summarization worker.

The cost of delivery was affected by the number of message parser workers, the number of summaries, the number of summarization workers, as well as the fan-out factor of each particular event, and hence on the proportions of different events to each other in a “nominal” data stream. This seemed like too many variables to isolate and too brittle of a measurement to be of any use. Instead, I threw out the fine-grained rigor and just plotted as many things as I could. I ran all the queues at one size: 2048.

BlockingQueue V1 Throughput on Production Hardware

At high throughputs, adding more workers simply makes things worse. Facet labels are [Message Parser Queue Impl-Summarization Queue Impl, JDK].

BlockingQueue V2 Throughput on Production Hardware

At lower throughputs, lock overhead becomes less of a factor. Facet labels are [Message Parser Queue Impl-Summarization Queue Impl, JDK].

  • Again, the touted ABQ is matched or bested by LBQ in many configurations. It’s very, very interesting to me that GC on such fast-moving LBQs isn’t a massive issue. That said, for these tests I was running 30GB heaps, so the new generation was rather large, and the nodes of the linked list are extremely short-lived. Don’t write off LBQ, especially with higher producer/consumer counts!
  • Again, it’s simply stunning how much of a difference a single added or removed producer or consumer can make on the total throughput. Our production machines have enough hardware threads to cover the worker threads, so it’s unlikely that resource starvation is a problem here. It seems that application performance can suffer immensely from simple lock convoys caused by too many workers. Comparing the v1 and v2 plots, it’s clear that the queue lock(s) can’t support any more contention from new workers at high throughputs. Adding more workers at 100k/sec completely guts performance in a way that simply does not occur at 25k/sec. Lock overhead can destroy performance at high throughputs!
  • The “best” worker configurations for v1 are, from a performance perspective, incompatible with v2 workloads and vice versa. It is absolutely crucial to distinguish and separate the different types of workloads. Trying to run them together will lead to misleading and muddled results. Tease them apart and you can fairly easily optimize both. For instance, LBQ-LBQ seems to work best with 2 summarization workers for v1 workloads. However, that configuration sacrifices 50% of peak performance on v2 workloads, which worked best with 3 or 4 summarization workers. The way this separation is implemented in production is via a rule in our event routing layer: all v1 messages are routed to one Summarizer and all v2 messages are routed to another. If that kind of separation isn’t possible, it’s probably worth instantiating two different queues and balancing worker pools separately, instead of trying to lump all the events together.
  • Java 6 to Java 7 bought us nothing on this hardware. You may note that under some configurations, performance appears to dip slightly under Java 7, but that’s slightly misleading because I’ve used averages of throughputs in these plots for visual clarity. The performance “dip” easily falls within the jitter of the raw data.

The problem was that despite these improvements I hadn’t reached my stated goals. It was time to look a bit further afield than java.util.concurrent.

Disruptor

I’d mentioned that Martin Thompson’s Mechanical Sympathy blog had been inspiration for some of our design choices. It was also an excellent introduction to LMAX’s Disruptor, which can be used to simulate a graph of bounded queues. Since it advertised vast improvements in throughput over LBQ and ABQ, I decided to give it a shot.

Side note: Yes, I know the Disruptor is meant to be used when the actual bytes of data are in the RingBuffer‘s entries, as opposed to just references. No, I can’t do that easily because we have variable message sizes and using the max size as an upper bound for the entries would make the buffer either too small (in entry count) or too large to fit into L3, as advised. If I get desperate, I might consider re-architecting the application to move to a smaller message representation and move the deserialization into the “business logic” as suggested by the first link. The following results and analysis are NOT an endorsement or condemnation of the Disruptor under any kind of rigorous testing regimen. I wanted to see if I could use it as a slot-in replacement for our queues, nothing more, nothing less.

I tried out the Disruptor in the least invasive way I could think of: one Disruptor per summarization worker. Each summarization worker would have a RingBuffer<IInputEvent> that would be fed by the various message parser workers. This fits nicely because it supports an easy MPSC configuration with the MultiThreadedClaimStrategy. I considered using it for the message parsing queue, but the hoops I’d have to jump through to stripe the RingBuffer to allow an MPMC configuration just seemed like overkill for a preliminary test. I tried out various WaitStrategys, but the results shown below are from the ‘busy-spin’ strategy, which gave the best throughput.

Summarizer Architecture with Message Parsing Queue and Disruptor Summarization Queues

Disruptor as a slot-in replacement for an MPSC BlockingQueue.

Disruptor V1 Throughput on Production Hardware

Facet labels are [Message Parser Queue Impl-Summarization Queue Impl, JDK].

Disruptor V2 Throughput on Production Hardware

Facet labels are [Message Parser Queue Impl-Summarization Queue Impl, JDK].

The results here were unsurprising: the Disruptor did not magically fix my problems. It performed quite well on our production hardware, matching the best results for both v1 and v2 messages, but ended up utilizing more CPU resources despite being allocated an equal number of threads, regardless of WaitStrategy. This brings up another interesting point: I had the choice of using put/take or offer/poll on our BlockingQueues and ended up choosing put/take for the same reason. A marginal increase in throughput didn’t seem worthwhile if the tradeoff was having every thread in a busy spin consuming 100% of its core. For us, even a 10% performance increase wasn’t enough to justify the decreased visibility into the “true” utilization of the CPU resources.

Hardware as a crutch

I was left in a pickle. Short of re-architecting around the “ideal” Disruptor workflow or reworking the way the summarization workers shared work (allowing N workers to 1 summary as well as 1 worker to N summaries) I was without a quick software option for fixing this. So, like the lazy clod that I am, I turned to hardware to buy some more time. I happened to have a faster, more modern CPU on hand, so I gave that a spin. The baseline v2 message throughput was 20k/sec, for reference.

V2 Throughput on Modern Hardware

The facet labels are [Message Queue Impl-Summarization Queue Impl, JDK].

Talk about hardware making a difference! Moving onto fresh hardware can literally double performance, without a doubling in clock speed.

  • Though the Disruptor configurations gave the best results, the “mundane” LBQ-LBQ ones only trailed them by 8%, while using 2-4 fewer threads and nearly a full core less CPU. LBQ-LBQ also handily beat ABQ-ABQ, by about 10% in most configurations.
  • The performance benefits of the Java 7 Hotspot over Java 6 are clear on this newer hardware. I saw a 10-20% performance boost across the board. Notably its impact on the Disruptor configurations is more pronounced than on the others.

Note also that the optimal worker counts differ based on hardware, which is expected given the differences between Nehalem and Sandy Bridge. Every little bit of configuration seems to make a difference, and a meaningful one at that!

Major takeaways:

  1. Explore your configuration space: worker counts, JVMs, hardware. One step in any direction in any of those spaces can provide a meaningful performance boost.
  2. Separate your workloads! Tune for each workload!
  3. Don’t bother tuning queue size, except to make it large enough to absorb jitter or small enough to stay in L3.
  4. Even if you don’t know what the black box at the bottom (or even the middle) of the stack is doing, you can still make progress! Experiment and plot and keep good notes!
  5. The Java 7 Hotspot offers a small but consistent performance improvement over Java 6 on newer hardware.

Doubling the Size of an HLL Dynamically

Introduction

In my last post, I explained how to halve the number of bins used in an HLL as a way to allow set operations between that HLL and smaller HLLs. Unfortunately, the accuracy of an HLL is tied to the number of bins used, so one major drawback with this “folding” method is that each time you halve the number of bins, you reduce that accuracy by a factor of \sqrt{2}.

In this series of posts I’ll focus on the opposite line of thinking: given an HLL, can one double the number of bins, assigning the new bins values according to some strategy, and recover some of the accuracy that a larger HLL would have had? Certainly, one shouldn’t be able to do this (short of creating a new algorithm for counting distinct values) since once we use the HLL on a dataset the extra information that a larger HLL would have gleaned is gone. We can’t recover it and so we can’t expect to magically pull a better estimate out of thin air (assuming Flajolet et al. have done their homework properly and the algorithm makes the best possible guess with the given information – which is a pretty good bet!). Instead, in this series of posts, I’ll focus on how doubling plays with recovery time and set operations. By this, I mean the following: Suppose we have an HLL of size 2^n and while it’s running, we double it to be an HLL of size 2^{n+1}. Initially, this may have huge error, but if we allow it to continue running, how long will it take for its error to be relatively small? I’ll also discuss some ways of modifying the algorithm to carry slightly more information.

The Candidates

Before we begin, a quick piece of terminology. Suppose we have an HLL of size 2^n and we double it to be an HLL of size 2^{n+1}. We consider two bins to be partners if their bin numbers differ by 2^n. To see why this is important, check the post on HLL folding.

Colin and I did some thinking and came up with a few naive strategies to fill in the newly created bins after the doubling. I’ve provided a basic outline of the strategies below.

  • Zeroes – Fill in with zeroes.
  • Concatenate – Fill in each bin with the value of its partner.
  • MinusTwo – Fill in each bin with the value of its partner minus two. Two may seem like an arbitrary amount, but a quick look at the formulas involved in the algorithm shows that this leaves the cardinality estimate approximately unchanged.
  • RandomEstimate (RE) – Fill in each bin according to its probability distribution. I’ll describe more about this later.
  • ProportionDouble (PD) – This strategy is only for use with set operations. We estimate the number of bins in the two HLLs which should have the same value, filling in the second half so that that proportion holds and the rest are filled in according to RE.

Nitty Gritty of RE

The first three strategies given above are pretty self-explanatory, but the last two are a bit more complicated. To understand these, one needs to understand the distribution of values in a given bin. In the original paper, Flajolet et al. calculate the probability that a given bin takes the value k to be given by (1 - 0.5^k)^v - (1 - 0.5^{k-1})^v where v is the number of keys that the bin has seen so far. Of course, we don’t know this value (v) exactly, but we can easily estimate it by dividing the cardinality estimate by the number of bins. However, we have even more information than this. When choosing a value for a new bin in our doubled HLL, we know that the value cannot exceed its partner’s value. To understand why this is so, look back at my post on folding, and notice how the values in the partner bins in a larger HLL correspond to the value in the related bin in the smaller HLL.

Hence, to get the distribution for the value in a given bin, we take the original distribution, chop it off at the relevant value, and rescale it to have total area 1. This may seem kind of hokey but let’s quickly look at a toy example. Suppose you ask me to pick a number between 1 and 10, and you will try to guess which number I picked. At this moment, assuming I’m a reasonable random number generator, there is a 1/10 chance that I chose the number one, a 1/10 chance that I chose the number two, etc. However, if I tell you that my number is no larger than two, you can now say that there is a 1/2 chance that it is a one, a 1/2 chance that it is a two, and no chance that it is larger. So what happened here? We took the original probability distribution, used our knowledge to cut off and ignore the values above the maximum possible value, and then rescaled the remaining probabilities so that they sum to one.

RE consists simply of finding this distribution, picking a value according to it, and placing that value in the relevant bin.
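The RE recipe can be sketched in a few lines of Python. This is a minimal illustration, not the code behind these posts: the function names are hypothetical, the handling of an empty partner bin and of numerical underflow are my own assumptions, and the post does not say whether the bin count used to estimate v is the count before or after doubling, so it is left as a parameter.

```python
import random

def bin_value_pmf(k, v):
    """P(bin value == k) after the bin has seen v keys, for k >= 1."""
    return (1 - 0.5 ** k) ** v - (1 - 0.5 ** (k - 1)) ** v

def truncated_distribution(v, max_value):
    """Distribution over 1..max_value, rescaled so the probabilities sum to one."""
    weights = [bin_value_pmf(k, v) for k in range(1, max_value + 1)]
    total = sum(weights)
    if total == 0:  # assumption: on underflow (huge v), put all mass on the cap
        return [0.0] * (max_value - 1) + [1.0]
    return [w / total for w in weights]

def random_estimate(partner_value, cardinality_estimate, n_bins, rng=random):
    """Pick a value for a new bin that cannot exceed its partner's value."""
    if partner_value == 0:
        return 0  # assumption: an empty partner forces an empty new bin
    v = max(1, round(cardinality_estimate / n_bins))  # estimated keys per bin
    probs = truncated_distribution(v, partner_value)
    values = range(1, partner_value + 1)
    return rng.choices(values, weights=probs, k=1)[0]
```

For example, random_estimate(5, 100000, 1024) draws a value between 1 and 5 for a new bin whose partner holds 5, weighted by the chopped and rescaled distribution.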

Nitty Gritty of PD

Recall that we only use PD for set operations. One thing we found was that the accuracy of doubling with set operations according to RE is highly dependent on the intersection size of the two HLLs. To account for this, we examine the fraction of bins in the two HLLs which contain the same value, and then we force the doubled HLL to preserve this fraction.

So how do we do this? Let’s say we have two HLLs: H and G. We wish to double H before taking its union with G. To estimate the proportion of their intersection, make a copy of G and fold it to be the same size as H. Then count the number of bins where G and H agree, and call this number a. Then if m is the number of bins in H, we can estimate that H and G should overlap in about a/m bins. Then for each bin, with probability a/m we fill in the bin with the minimum of the relevant bin from G and that bin’s partner in G. With probability 1 - a/m we fill in the bin according to the rules of RE.
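Here is one way the PD steps could look in Python. Treat it as a sketch of my reading of the description above, not the implementation used for these experiments: it assumes G has exactly twice as many bins as H, uses 0-based bin indices, and takes the RE step as a caller-supplied function (the hypothetical re_fill parameter) that maps a partner's value to a new bin value.

```python
import random

def fold(bins):
    """Halve an HLL: each folded bin is the max of a bin and its partner."""
    half = len(bins) // 2
    return [max(bins[k], bins[k + half]) for k in range(half)]

def proportion_double(h, g, re_fill, rng=random):
    """Double h (m bins) using g, which is assumed here to have exactly 2m bins."""
    m = len(h)
    assert len(g) == 2 * m, "this sketch only handles g twice the size of h"
    g_folded = fold(g)
    a = sum(1 for x, y in zip(h, g_folded) if x == y)  # bins where H and G agree
    new_bins = []
    for k in range(m):
        if rng.random() < a / m:
            # Copy from G: the minimum of G's bin and that bin's partner in G.
            new_bins.append(min(g[k], g[k + m]))
        else:
            # Otherwise fall back to RE, driven by the partner bin's value in H.
            new_bins.append(re_fill(h[k]))
    return h + new_bins
```

When H and the folded G agree everywhere, every new bin is copied from G; when they agree nowhere, the result is identical to plain RE doubling.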

The Posts

(Links will be added as the posts are published. Keep checking back for updates!)

Set Operations On HLLs of Different Sizes

Introduction

Here at AK, we’re in the business of storing huge amounts of information in the form of 64 bit keys. As shown in other blog posts and in the HLL post by Matt, one efficient way of getting an estimate of the size of the set of these keys is by using the HyperLogLog (HLL) algorithm.  There are two important decisions one has to make when implementing this algorithm.  The first is how many bins one will use and the second is the maximum value one allows in each bin.  As a result, the amount of space this will take up is going to be the number of bins times the log of the maximum value you allow in each bin.  For this post we’ll ignore this second consideration and focus instead on the number of bins one uses.  The accuracy for an estimate is given approximately by 1.04/√b, where b is the number of bins.  Hence there is a tradeoff between the accuracy of the estimate and the amount of space you wish to dedicate to this estimate. Certainly, projects will have various requirements that call for different choices of number of bins.
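To get a feel for the accuracy/space tradeoff, the 1.04/√b approximation can be evaluated for a few bin counts. This is just arithmetic on the formula quoted above; the function name is a hypothetical helper.

```python
import math

def hll_relative_error(n_bins):
    """Approximate relative error of an HLL estimate with n_bins bins."""
    return 1.04 / math.sqrt(n_bins)

# Error for a few power-of-two bin counts.
errors = {b: hll_relative_error(b) for b in (256, 1024, 4096)}
```

Under this approximation the error is 6.5% at 256 bins, 3.25% at 1024 bins, and 1.625% at 4096 bins: quadrupling the number of bins only halves the error.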

The HLL algorithm natively supports the union operation. However, one requirement for this operation is that the HLLs involved are of the same size, i.e. have the same number of bins. In practice, there’s no guarantee that HLLs will satisfy this requirement. In this post, I’ll outline the method by which we transform an HLL with a certain number of bins to one with fewer bins, allowing us to perform set operations on any two HLLs, regardless of size.

Key Processing

As discussed in the HyperLogLog paper, to get a cardinality estimate with an HLL with 2^n bins on a data set we pass over each key, using the placement of the rightmost “1” to determine the value of the key and the next n digits to the left to determine in which bin to place that value. In each bin, we only store the maximum value that that bin has “seen.”

Below I’ve shown how two HLLs (one of size 2^3 and one of size 2^4) process two different keys. Here, the keys have the same value, because the purpose of this example is to illustrate how the location in which we place the key changes when the HLL has twice the number of bins.

HLL Folding: Simple Bin/Reg Example

Above, the keys which are attributed to the fifth and thirteenth bins in the larger HLL would both have been attributed to the fifth bin in the smaller HLL.  Hence, unraveling the algorithm a bit, we see that the values which are seen by the fifth and thirteenth bins in the larger HLL would have been seen by the fifth bin in the smaller HLL had they run on the same dataset.  Because of this, in the case where the two algorithms estimate the same dataset, the value stored in the fifth bin in the smaller HLL is the maximum of the values stored in the fifth and thirteenth bins in the larger HLL.

Folding HLLs

What happened above is not an isolated phenomenon. In general, if one uses the HLL algorithm twice on a dataset, once with 2^{n+1} bins and once with 2^n bins, the value in the kth bin in the smaller HLL will be the maximum of the values in the kth and (k + 2^n)th bins of the larger HLL. As a result, if given an HLL of size 2^{n+1} that one wishes to transform to an HLL of size 2^n, one can simply fold the HLL by letting the value of the kth bin in the folded HLL be given by the maximum of the values in the kth and (k + 2^n)th bins of the original HLL.

In fact, we can fold any HLL an arbitrary number of times. Repeating this process, we can take an HLL of size 2^n to an HLL of size 2^m for any m which is less than or equal to n. Hence if we wish to perform a set operation on two HLLs of different sizes, we can simply fold the larger HLL repeatedly until it is the same size as the smaller HLL. After this, we can take unions and intersections as we wish.
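Folding is simple enough to sketch directly. The snippet below is an illustration under a few assumptions: an HLL is represented as a plain Python list of bin values whose length is a power of two, bins are indexed from 0 (the text counts from 1), and the function names are hypothetical. The union shown is the usual bin-wise maximum.

```python
def fold(bins):
    """Halve an HLL: bin k becomes the max of bins k and k + len(bins) / 2."""
    half = len(bins) // 2
    return [max(bins[k], bins[k + half]) for k in range(half)]

def fold_to(bins, target_size):
    """Fold repeatedly until the HLL has target_size bins."""
    while len(bins) > target_size:
        bins = fold(bins)
    if len(bins) != target_size:
        raise ValueError("target_size must be reachable by repeated halving")
    return bins

def union(a, b):
    """Union of two HLLs of possibly different sizes: fold, then bin-wise max."""
    size = min(len(a), len(b))
    a, b = fold_to(a, size), fold_to(b, size)
    return [max(x, y) for x, y in zip(a, b)]
```

For instance, folding the 8-bin list [1, 5, 2, 0, 3, 1, 4, 2] pairs bin 0 with bin 4, bin 1 with bin 5, and so on, and keeps the larger value of each pair.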

Folding – An Example

Below, we show a simple example of how folding works. Here we have an HLL with 2^3 bins which we fold to be an HLL with 2^2 bins. In the diagram, I’ve torn an HLL of size 2^3 in half and placed the strips side by side to emphasize how we line up bins and take maximums in the folding process. Notice that the values in the bins of the folded HLL are the maximum of the relevant bins in the larger HLL.

HLL Folding Example

Advantages and Drawbacks

This technique gives us the flexibility to be able to perform set operations on any two HLLs regardless of the number of bins used in the algorithms. Its usefulness in this regard is a bit offset by the fact that the accuracy of the estimate on these is limited by the accuracy of the least accurate HLL. For example, an HLL of size 2^{10} will have accuracy roughly 23 times better than an HLL of size 2 (to see where I’m getting these numbers from, you’ll have to read the paper!). Unfortunately, if we combine these with a set operation, our resulting estimate will have the same accuracy as the smaller HLL.

Summary

The HyperLogLog algorithm supports set operations in a nice way only if the number of bins used is fixed.  Using folding, one can correct for this by reducing the size of the larger HLL to the size of the smaller.  The cost of this convenience is in the accuracy of the estimates after the folding process.  In my next post, I’ll explore some methods of performing the set operations without this loss of accuracy.

K-Minimum Values: Sketching Error, Hash Functions, and You

Introduction

“All known efficient cardinality estimators rely on randomization, which is ensured by the use of hash functions.”
–Flajolet, et al

Recalling the KMV algorithm Matt presented in his last post, one will note that every stream element is passed to a hash function as part of the processing step. This is meant to transform the data being operated on from its native distribution into something uniformly distributed. Unfortunately, we don’t live in a perfect world, and since all of the algorithm’s analysis assumes that this hash function does its job well, we wanted to get some sense of how it behaves under less friendly conditions. The first half of this post will investigate the algorithm’s performance when we artificially introduce bias, and the second half will look at its behavior with a handful of real hash functions.

A Simple Error Model

The first hash function error model that came to mind is admittedly unrealistic and ham-fisted, but hopefully illustrative. Suppose you have a stream of fixed size, an ideal hash function, and from these you produce a distinct value estimate using the KMV algorithm. Now suppose that for some unlucky reason, one bit from your hash function is stuck; it’s always a zero or a one, but the other 31 bits are free of this curse.

Biased hash schematic

There’s nothing to stop you from computing a distinct value estimate using this janky hash with KMV, but your intuition suggests that it shouldn’t be very good. We went through this exact process with various choices of k, using a random number generator to simulate a perfect hash function.

Before we look at the data, let’s think about what we should expect. From the perspective of KMV, it shouldn’t make a whole lot of difference if your kth smallest element is odd or even (for instance, in a case where the lowest order bit is always/never set, respectively). It does, however, make a difference if you’re actually incapable of seeing values smaller than 2^{31}, which is what happens when the highest order bit is always set. Thus, in both the 0-biasing and 1-biasing cases, we should expect that higher order bits have a much more dramatic effect on error than lower order bits.

Error from setting bits in KMV

Notice how the performance degradation follows two different patterns. When we fix bits as ones, the observed error increases fairly smoothly and tends to result in underestimates. In contrast, fixing bits as zeros results in no change at first, until the error increases abruptly and produces catastrophic overestimates. Additionally, larger values of k have protective effects against these biases.

A Somewhat Less Simple Error Model

Now that we have some intuition for the problem, let’s get a little more subtle. Instead of always setting the nth bit as a 0 or 1, let’s add a probabilistic element. We’ll do the same experiment as before, except we will now fix the nth bit with probability p. Thus, when p = 0 we have a perfectly well behaved KMV, and when p = 1, we have the experiment we just finished discussing. In the following diagram, each tile represents the average error across several experiments in which a stream of 1,000,000 unique elements was fed to a KMV sketch (k = 1024) which was rigged to modify the nth bit with probability p.

Heatmap of KMV error

Many of the same lessons can be seen here — high order bits matter more, downward biasing degrades performance sharply, upward biasing degrades more smoothly. Additionally, as we’d expect, within a given bit, more bias means more error.
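Both error models are easy to simulate. The sketch below is not the code used to produce the plots above: it assumes 32-bit hashes drawn from Python's random module in place of a perfect hash, uses the common (k - 1) / (kth smallest normalized hash) form of the KMV estimate, and all function names and parameters are hypothetical.

```python
import heapq
import random

HASH_BITS = 32

def biased_hash(rng, bit, value, p=1.0):
    """Simulated 32-bit hash whose `bit` is forced to `value` with probability p."""
    h = rng.getrandbits(HASH_BITS)
    if rng.random() < p:
        if value:
            h |= 1 << bit      # force the bit to one
        else:
            h &= ~(1 << bit)   # force the bit to zero
    return h

def kmv_estimate(hashes, k):
    """KMV estimate: (k - 1) / (kth smallest distinct hash, normalized to [0, 1))."""
    smallest = heapq.nsmallest(k, set(hashes))
    kth = smallest[-1] / 2 ** HASH_BITS
    return (k - 1) / kth

def run_trial(n, k, bit, value, p=1.0, seed=0):
    """Feed n simulated hashes through KMV and return the cardinality estimate."""
    rng = random.Random(seed)
    hashes = [biased_hash(rng, bit, value, p) for _ in range(n)]
    return kmv_estimate(hashes, k)
```

For example, run_trial(100_000, 1024, bit=31, value=1) pins the highest order bit to one, so no normalized hash can fall below 0.5 and the estimate cannot exceed 2(k - 1), a severe underestimate. Pinning the same bit to zero squeezes every hash into the lower half of the space and roughly doubles the estimate. Setting p = 0 recovers the unbiased sketch.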

Send in the Hash Functions

All of the experiments to this point have involved using a random number generator instead of hashing real data. I think it’s time that we took a look at what happens when we drop in a few real hash functions with real data. For the following experiments, I’m using four 32-bit hashes — Murmur3, SDBM, Arash Partow’s hash, and one of the old Donald Knuth hashes. You may recall these from our series on choosing a good hash function (although 64-bit versions were used there). I chose four text corpuses:

  • Romeo and Juliet, stripped of all punctuation and converted to lower case (3794 words)
  • /usr/share/dict/words (99171 words)
  • 1,000,000 random 12 character long strings, each sharing the same suffix: “123456”
  • 1,000,000 random 12 character long strings, each sharing the same prefix: “123456”

Using formulas from this paper, we can compute the relative error that 99% of KMV estimates should theoretically fall within. This turns out to depend on k and the stream size.

To make these pictures, I chose random values of k within each hash/document pair at which I evaluate the cardinality estimate and compute the relative error. The lines are linear interpolations between sampled points and are shown solely for clarity. The y-axis scale is adjusted on a per-picture basis to best display the theoretical envelope within which we expect our errors to lie.

Now that we’ve gotten through all the necessary preamble, let’s take a look at the results!

KMV error for Romeo and Juliet

One picture in and we’ve already learned a lesson: choice of hash function seriously matters! SDBM and DEK cause the algorithm to perform well below its capabilities. DEK’s error is actually off the charts for most of this graph, which is why it does not appear until k > 3,000.

KMV error for /usr/share/dict/words

On a bigger corpus with tighter theoretical error bounds, Murmur3 and AP are still doing quite well. Do note, however, that AP dips outside the envelope for a while at k = 70,000 or so.

KMV error for shared-suffix strings

KMV error for shared-prefix strings

With the random strings, SDBM performs much better than it did on English words. DEK, however, is still hopeless. It’s a little tough to see on these pictures, but at high k, AP starts to fall off the wagon, and even Murmur3 dips outside the envelope, though not beyond what we’d expect, statistically speaking. Honestly, I was hoping for some fireworks here, but they didn’t materialize. I was wondering if we might see some hashes break on one version of these strings, and do fine on the other due to the location of the varying key bits (high order/low order). Sadly, that didn’t happen, but a negative result is a result none the less.

To summarize these, I made the following table, which shows the percentage of samples that fall outside the theoretical envelope. In this view, Murmur3’s superiority is clear.

Corpus                   AP        DEK        Murmur3    SDBM
Romeo and Juliet         0.00%     100.00%    0.00%      61.54%
/usr/share/dict/words    10.76%    100.00%    0.00%      68.46%
Common Suffix            7.22%     99.11%     1.10%      0.27%
Common Prefix            3.33%     100.00%    0.22%      0.0001%

Fin

KMV is a very nice little algorithm that is incredibly simple to understand, implement, and use. That said, if you’re going to make use of it, you really do need to practice some due diligence when choosing your hash function. With packages like smhasher available, trying out multiple hash functions is a cinch, and a little legwork at the start of a project can save you from confusion and despair later on!

Efficient Field-Striped, Nested, Disk-backed Record Storage

At AK we deal with a torrent of data every day. We can report on the lifetime of a campaign which may encompass more than a year’s worth of data. To be able to efficiently access our data we are constantly looking at different approaches to storage, retrieval and querying. One approach that we have been interested in involves dissecting data into its individual fields (or “columns” if you’re thinking in database terms) so that we only need to access the fields that are pertinent to a query. This is not a new approach to dealing with large volumes of data – it’s the basis of column-oriented databases like HBase.

Much of our data contains nested structures and this causes things to start to get a little more interesting, since this no longer easily fits within the data-model of traditional column-stores. Our Summarizer uses an in-memory approach to nested, field-striped storage but we wanted to investigate this for our on-disk data. Google published the Dremel paper a few years ago covering this exact topic. As with most papers, it only provides a limited overview of the approach without covering many of the “why”s and trade-offs made. So, we felt that we needed to start from the ground up and investigate how nested, field-striped storage works in order to really understand the problem.

Due to time constraints we have only been able to scratch the surface. Since the community is obviously interested in a Dremel-like project, we want to make the work that we have done available. We apologize in advance for the rough edges.

Without further ado: Efficient Field-Striped, Nested, Disk-backed Record Storage (on GitHub).

Sketch of the Day: K-Minimum Values

Intro

We’ve been talking about probabilistic distinct value counting with sketches (DV sketches) for a while now and have had some fun experiences implementing them in our production environment. In this post I want to talk about a DV sketch that is very intuitive and easy to implement, the K-minimum Values sketch (KMV). While KMV sketches are relatively lightweight and accurate, they are not the best of breed when it comes to DV counting. They are useful to me in two ways, though: for exposition and for multi-set operations.

History

KMV seems to have been first introduced in 2002 by Ziv Bar-Yossef et al. in the great paper Counting distinct elements in a data stream. In this paper they talk about improving on the basic intuition of the seminal DV sketch papers of Flajolet and Martin and of Alon, Matias, and Szegedy (AMS), who put some formality around the frequency moment problems, bounds of algorithms, etc. Flajolet and Martin’s paper is in turn based upon work from Morris 1978 (looking for streaks of right-most zeroes, i.e. the predecessor to LogLog and HyperLogLog). These are fun to read (although they admittedly get pretty mathy) and it’s cool to see the progression of knowledge, accuracy, and efficiency as these guys do their work. You can almost imagine the fist fights that happen during their meet-ups! The final detailed work on KMV is by Beyer et al. in On Synopses for Distinct-Value Estimation Under Multiset Operations.

How it works

The intuition behind KMV is straightforward. Supposing you have a good hash function, i.e. hash values are evenly distributed over the hash space (I will normalize the hash space output to [0, 1] for the rest of this), then you could estimate the number of distinct values you have seen by knowing the average spacing between values in the hash space. If you see 10 distinct values, you would expect them on average to be spaced about 1/10th apart from each other. You could do this cheaply by keeping track of, say, the smallest value you have ever seen. If the values are indeed uniformly distributed, and provided you’ve thrown a decent amount of data through it, you could guess that the smallest value you have seen is a decent estimate of the average spacing of hash values in your space.
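As a rough illustration of the spacing intuition, the sketch below hashes items into [0, 1] and compares the true average spacing with the single smallest hash. The hash01 helper (SHA-1 truncated to 64 bits) is an assumption chosen for the example, not something this post prescribes.

```python
import hashlib


def hash01(item):
    """Map an item to a float in [0, 1] using the first 8 bytes of SHA-1."""
    digest = hashlib.sha1(str(item).encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") / 2.0**64


def average_spacing(items):
    """Average gap between neighbouring distinct hash values."""
    hashes = sorted({hash01(x) for x in items})
    return (hashes[-1] - hashes[0]) / (len(hashes) - 1)


def smallest_hash(items):
    """The cheap stand-in for the average spacing: the minimum hash seen."""
    return min(hash01(x) for x in items)


items = range(10000)
print("1 / average spacing:", 1 / average_spacing(items))
print("1 / smallest hash:  ", 1 / smallest_hash(items))
```

The first number should land close to the true count, while the second depends on a single value and so can be far off.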

Of course, this doesn’t have a lot of “nice” properties. Taking only one value opens you up to a ton of variance and you are fairly dependent on the “goodness” of your hash. In order to improve upon this Bar-Yossef suggests keeping the k smallest values you have ever seen around. The algorithm becomes:

Initialize KMV with the first k distinct hash values
for all h(n):
     if h(n) < max(KMV) and h(n) not in KMV:
          insert h(n) into KMV set
          remove largest value from KMV

Cardinality(KMV):
     return: (k-1)/max(KMV)
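The update rule and estimator above can be sketched in Python as follows. This is a minimal illustration, not a production implementation: the hash01 helper and the KMV class name are assumptions, and returning an exact count while the sketch holds fewer than k values is a choice made for this example.

```python
import hashlib
import heapq


def hash01(item):
    """Map an item to a float in [0, 1] using the first 8 bytes of SHA-1."""
    digest = hashlib.sha1(str(item).encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") / 2.0**64


class KMV:
    def __init__(self, k):
        self.k = k
        self._heap = []        # k smallest hashes, negated so heap[0] is the max
        self._members = set()  # the same hashes, for duplicate detection

    def add(self, item):
        h = hash01(item)
        if h in self._members:
            return  # already retained; duplicates never change the sketch
        if len(self._heap) < self.k:
            heapq.heappush(self._heap, -h)
            self._members.add(h)
        elif h < -self._heap[0]:
            # Insert h and evict the current largest value in one step.
            evicted = -heapq.heappushpop(self._heap, -h)
            self._members.discard(evicted)
            self._members.add(h)

    def cardinality(self):
        if len(self._heap) < self.k:
            return float(len(self._heap))  # fewer than k distinct values seen
        return (self.k - 1) / -self._heap[0]
```

A typical use is to create KMV(k=256), call add for every item in the stream, and read cardinality() at the end.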

For a KMV sketch of size k=3, graphically you have:

A very straightforward approach. Note that the “-1” in the numerator comes from a bias correction in the estimate; you’re going to have to read the paper for that. So, the size of the sketch is basically k 64-bit values.

Set Operations

Performing set operations with KMVs is also incredibly straightforward. The intuition around unions is that there is no difference between combining 2 KMV sketches and keeping the k minimum values of both versus keeping a single sketch over the combined data to start with, so unions are “lossless”. To perform a union, you merely take 2 sketches, combine their values, and keep the k smallest ones (if the 2 sketches are of different sizes, k and k’, then you keep the min(k,k’) smallest values in order to keep the lowest resolution).

Union(A,B):
     k = min( |A|, |B|)
     return: min_k( A U B )
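A small Python sketch of the union, under the assumption that a KMV sketch is represented simply as a sorted list of its retained hash values. The helper names (hash01, kmv_sketch, kmv_union, kmv_cardinality) are made up for this example.

```python
import hashlib
import heapq


def hash01(item):
    """Map an item to a float in [0, 1] using the first 8 bytes of SHA-1."""
    digest = hashlib.sha1(str(item).encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") / 2.0**64


def kmv_sketch(items, k):
    """Build a sketch directly: the k smallest distinct hash values, sorted."""
    return heapq.nsmallest(k, {hash01(x) for x in items})


def kmv_union(a, b):
    """Combine two sketches and keep the min(|A|, |B|) smallest values."""
    k = min(len(a), len(b))
    return heapq.nsmallest(k, set(a) | set(b))


def kmv_cardinality(sketch):
    """(k - 1) / max(KMV), assuming the sketch is full."""
    return (len(sketch) - 1) / max(sketch)


a = kmv_sketch(range(0, 60000), 256)
b = kmv_sketch(range(40000, 100000), 256)
merged = kmv_union(a, b)
print("estimated union size:", kmv_cardinality(merged))
```

Because the k smallest hashes of the combined data must each already be among the k smallest of A or of B, the merged sketch is identical to one built over all the items at once.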

For intersections you use the KMV to estimate the Jaccard coefficient for the 2 (or n) sets. Basically, you treat the 2 KMV sketches for each set as a random uniform sample and intersect these to estimate Jaccard. So, you assemble the k minimum values of the two sets (as you did in union above), and intersect this result with the original sketches to obtain an estimate of the overlap of the 2 sets. The steps are:

IntersectionCard(A,B):
     L = Union(A,B)  # the set of k values this time, not just the cardinality
     k = min( |A|, |B|)
     K = | L ∩ A ∩ B |
     return: K/k * Cardinality(L)
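The intersection steps above might look like this in Python, again treating a sketch as a plain list of retained hash values. The helper names are assumptions for illustration, and the estimate is only as good as the Jaccard sample the k values provide.

```python
import hashlib
import heapq


def hash01(item):
    """Map an item to a float in [0, 1] using the first 8 bytes of SHA-1."""
    digest = hashlib.sha1(str(item).encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") / 2.0**64


def kmv_sketch(items, k):
    """The k smallest distinct hash values of the items, sorted."""
    return heapq.nsmallest(k, {hash01(x) for x in items})


def kmv_intersection_cardinality(a, b):
    k = min(len(a), len(b))
    union = heapq.nsmallest(k, set(a) | set(b))     # L in the pseudocode
    overlap = len(set(union) & set(a) & set(b))     # K = |L ∩ A ∩ B|
    jaccard = overlap / k                           # estimated Jaccard coefficient
    union_cardinality = (k - 1) / max(union)        # Cardinality(L)
    return jaccard * union_cardinality


a = kmv_sketch(range(0, 60000), 1024)
b = kmv_sketch(range(40000, 100000), 1024)
print("estimated overlap:", kmv_intersection_cardinality(a, b))
```

In this example the true overlap is 20,000 items out of a union of 100,000, so the estimated Jaccard coefficient should come out near 0.2.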

One of the nice features of KMV which, unlike, say, HyperLogLog, is that you can take n-way intersections by extending the algorithm above. To do this with HyperLogLog you actually need to compute the n-way algebra for set intersection (inclusion-exclusion), which for two sets is:

|A ∩ B| = |A| + |B| - |A U B|

However, in our experience of using KMV for set operations on Zipfian data, KMVs still don’t perform as well as HyperLogLog sketches for computing n-way intersections using the same amount of memory.
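For reference, the n-way version of that algebra can be sketched as below. It only needs a way to get the cardinality of a union, which is exactly what a union-only sketch like HyperLogLog provides. Here exact Python sets stand in for the sketches, and both function names are made up for the example.

```python
from itertools import combinations


def intersection_cardinality(sets, union_cardinality):
    """Inclusion-exclusion: |A1 ∩ ... ∩ An| from union cardinalities only.

    Sums (-1)^(r+1) * |union of subset| over every non-empty subset of size r.
    """
    total = 0
    for r in range(1, len(sets) + 1):
        sign = 1 if r % 2 == 1 else -1
        for subset in combinations(sets, r):
            total += sign * union_cardinality(subset)
    return total


def exact_union_cardinality(subset):
    """Stand-in for a sketch union: the exact size of the union of plain sets."""
    return len(set().union(*subset))


A = {1, 2, 3, 4, 5, 6}
B = {4, 5, 6, 7, 8, 9}
C = {5, 6, 7, 100}
print(intersection_cardinality([A, B, C], exact_union_cardinality))
```

With estimated rather than exact union cardinalities, each of the 2^n - 1 terms carries its own error, which is part of why n-way intersections are hard to get right.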

Expansion to Multisets

One of the nice features of KMV sketches is their expansion to supporting multiset operations, dubbed the AKMV sketch. This is great if you are using them for document representations and want to support document similarity operations like tf-idf (or any other multiset operation). In order to expand the basic KMV structure to support multisets (described here) you just add a counter on top of the values you are storing. In this way you get a decent sample of the counts of things in the stream/document to use for multiset operations. Most other DV sketches, HyperLogLog in particular, don’t support these types of queries.
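A minimal sketch of the counter idea, assuming the sketch is stored as a dict from retained hash value to the number of times it has been seen. The AKMV class here is an illustration of “KMV plus counters” only, not the full structure from the paper, and it finds the largest value with a linear scan for simplicity.

```python
import hashlib


def hash01(item):
    """Map an item to a float in [0, 1] using the first 8 bytes of SHA-1."""
    digest = hashlib.sha1(str(item).encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") / 2.0**64


class AKMV:
    def __init__(self, k):
        self.k = k
        self.counts = {}  # retained hash value -> occurrences seen

    def add(self, item):
        h = hash01(item)
        if h in self.counts:
            self.counts[h] += 1           # already retained: bump its counter
        elif len(self.counts) < self.k:
            self.counts[h] = 1            # sketch not full yet
        else:
            largest = max(self.counts)    # O(k) scan, fine for a small example
            if h < largest:
                del self.counts[largest]  # evict the largest, with its counter
                self.counts[h] = 1


doc = "the cat sat on the mat and the dog sat on the cat".split()
sketch = AKMV(k=4)
for word in doc:
    sketch.add(word)
```

Since the largest retained value only ever decreases, a value that is still in the sketch was admitted on its first appearance, so its counter is its exact frequency in the stream.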

To see how well this might work in practice, I took a look at some simple tf-idf similarity against the 20 news groups data set. This data set contains about 1000 news group emails on various topics such as atheism and motorcycles (woo!). For each article I constructed an AKMV sketch of the words in it and used this representation as the basis for tf-idf. I cleaned up the data marginally by limiting my analysis to the 5000 most common words in the corpus (as seems to be the norm) and only considered alphanumeric “words”. Additionally, I cherry-picked only a few newsgroups from the set that showed “nice” separation in the SVD. You can think of the documents looking a bit like this, where the red dots are the entries in the AKMV and the green dots are not (as above):

Once I created the tf-idf matrix, I SVD-ed it and plotted each newsgroup against the second and third singular vectors (the first vector in this case contained mostly information about the mean of the document vectors and contained little real information for classification).  The intermediate singular vectors for differing k were projected onto the actual singular vectors from the complete matrix (k = Inf).  Running through increasing k, the newsgroups look like this (click on the graphic to restart the animation):


You can see the structure start to appear relatively quickly for small k and you can also see how some of the articles “stick” to their final spots due to them having fewer than k words. Clearly you would have to do more work and testing if you wanted to implement something like this in a real classifier or search engine, but it seems to be a promising approach.

Here is the same thing for a corpus composed of 23 articles about the Tom Cruise/Katie Holmes divorce and 20 articles about the Higgs boson.


Using document sketches as a basis for a recommender system/search engine or any other application that requires similarity metrics seems like a promising avenue.  It would be very interesting indeed to run some real tests of precision/recall and memory footprint for sketch based recommenders/classifiers against other more standard approaches.

Disclaimer:

I make no claims about having built a classifier of any sort here. A lot of work and decisions would be necessary to move from these ideas to a useful classification scheme in a real environment. I was interested in how much of the flavor of a document would be retained in an AKMV sketch. Based on the above results, I think that the answer is “quite a bit,” even for modest values of k. I don’t think it would be out of the question to try to build a system that allowed you to compute similarities or apply classification tools after the sampling process inherent in the construction of these sketches.

Compression

An interesting thing to notice is that as your DV count gets larger, the max value of your k items gets smaller. This means that a simple compression scheme is to just throw away the unused higher-order bits of all k values: every stored value is at most the max, so the leading bits that are zero in the max are zero in all of them. Oddly, as the DV count gets larger your KMV will get smaller without losing accuracy.
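One way this could look, assuming the k values are stored as unsigned 64-bit integers rather than normalized floats. The compress and decompress helpers are hypothetical names, and the packing format is just one simple choice.

```python
def compress(values):
    """Pack non-negative ints using only as many bits as the largest one needs.

    Returns (bit_width, count, packed_bytes). Assumes values is non-empty.
    """
    width = max(values).bit_length()  # bits above this are zero in every value
    packed = 0
    for v in values:
        packed = (packed << width) | v
    n_bytes = (width * len(values) + 7) // 8
    return width, len(values), packed.to_bytes(n_bytes, "big")


def decompress(width, count, data):
    """Invert compress(), returning the values in their original order."""
    packed = int.from_bytes(data, "big")
    mask = (1 << width) - 1
    values = [(packed >> (width * i)) & mask for i in range(count)]
    values.reverse()  # the first value was packed into the highest bits
    return values


# Hypothetical sketch contents: four small 64-bit hash values.
sketch_values = [3, 17, 42, 1000]
width, count, data = compress(sketch_values)
restored = decompress(width, count, data)
```

Here the four values need 10 bits each, so they pack into 5 bytes instead of the 32 bytes that four full 64-bit values would take.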

Summary

There are many DV sketches in the world and KMV is one of the most interesting due to how easy it is to comprehend and implement. I particularly enjoy using KMV as a pedagogical tool and a solid jumping off point for DV sketching. The fact that KMV is so straightforward makes it stand out in a world of more confusing math and complicated sketching algorithms. In the right context it very well could be the right solution for your sketching needs, especially given the multiset support.

No BS Data Salon #3

On Saturday Aggregate Knowledge hosted the third No BS Data Salon on databases and data infrastructure. A handful of people showed up to hear Scott Andreas of Boundary talk about distributed, streaming service architecture, and I also gave a talk about AK’s use of probabilistic data structures.

The smaller group made for some fantastic, honest conversation about the different approaches to streaming architectures, the perils of distributing analytics workloads in a streaming setting, and the challenges of pushing scientific and engineering breakthroughs all the way through to product innovation.

We’re all looking forward to the next event, which will be in San Francisco, in a month or two. If you have topics you’d like to see covered, we’d love to hear from you in the comments below!

As promised, I’ve assembled something of a “References” section to my talk, which you can find below.

(Hyper)LogLog

Random

  • Sean Gourley’s talk on human-scale analytics and decision-making
  • Muthu Muthukrishnan’s home page, where research on streaming in general abounds
  • A collection of C and Java implementations of different probabilistic sketches

Attendees of the third No BS Data Salon
