## HyperLogLog Engineering: Choosing The Right Bits

Author’s Note: this is just a quick post about an engineering hiccup we ran into while implementing HyperLogLog features that aren’t mentioned in the original paper. We have an introduction to the algorithm and several other posts on the topic if you’re interested.

Say you had two HyperLogLog data structures with 5-bit-wide registers, one with $log_{2}m = 11$ and the other with $log_{2}m = 15$, and wanted to compute their union. You could just follow my colleague Chris’ advice and “fold” the larger one down to the size of the smaller one and then proceed as usual taking the pairwise max of the registers. This turns out to be a more involved process than Chris makes it out to be if you designed your HLL implementation in a particular way. For instance, if you use the 15 least(/most) significant bits of the 64-bit hashed input to determine register index and the next 30 bits to determine the register value, you end up in a tricky situation when you truncate the last 4 bits of the index to get the new 11-bit index.

If you imagine feeding the same element into an HLL of the smaller size, then the 4 bits you truncated from the index would have actually been used in the computation of the register value.

You couldn’t simply take the original register value you computed, you’d have to take into account the new prefix added to the register value bit string. If the prefix has a 1 in it, you would recompute the run of zeroes on just the prefix (because you know it contains a 1 and thus all the information you need), and if not, you’d add the length of the prefix to the original register value computed. Not a ton of work, but having clutter like this in algorithmic code distracts the reader from the true intention. So how do we avoid this?

Well, you could say that it’s very, very unlikely that you’ll ever need more than 30 bits for your register value, so you could assume that the register width would remain constant forever and use the bottom 30 bits for your register value and the next $log_{2}m$ bits for your register index. That way you could just truncate the last 4 bits of the index and know that your register value would still be the same. On the other hand, if you’re Google, that may not be true. In that case, what you should do is use the $log_{2}m$ least (/most) significant bits of your hashed value for the register index and the 30 most (/least) significant bits for the register value.

Now you can just truncate the register index and use the original register value.

If you’re using a good hash function like MurmurHash3 that gives you 128 bits of entropy, you could simply compute the register index from the first 64-bit word in the hash and compute the register value from the second 64-bit word and completely ignore this problem up to a mind-bending $log_{2}m = 64$ and register width of 6 (aka the heat death of the universe).

I know it’s not always possible to anticipate this problem in the early stages of implementing and vetting an algorithm, but hopefully with a bit of research the next time someone looks to implement HLL they’ll see this and learn from our mistake.

` `

### The Past

The Summarizer, our main piece of aggregation infrastructure, used to have a very simple architecture:

1. RSyslog handed Netty some bytes.
2. A Netty worker turned those bytes into a `String`.
3. The Netty worker then peeled off the RSyslog envelope to reveal a payload and an event type. We call this combination an `IMessage`.
4. The `IMessage`‘s payload got turned into an `IInputEvent` (basically a POJO).
5. The `IInputEvent` was mapped to one or many summaries, based on its event type, and which would then be updated with the event.

Netty workers contend for a central summarization lock before updating summaries.

All of this work was done inside the Netty workers, and the synchronization of the summary objects was handled by a single lock on all of them. Some events were mapped to a single summary, others to a half dozen. Luckily the payloads were simple (CSV-formatted) and the summaries were even simpler. It didn’t really matter if the events hit one summary or ten, we could summarize events much faster than we could parse them. Under these conditions we could handle 200k messages per second, no sweat.

Slowly, new reporting features were added and the summaries became more complex. The number of operations per event increased, and throughput dropped to 100k/sec. Progressively, summarization supplanted parsing as the bottleneck.

Then we introduced a more complex, nested JSON event format (v2 messages) in order to support new product features. Complex, nested events meant ever more complex, nested summaries, which meant ever more time holding the single lock while updating them. Parsing time increased with the new payload format, but was still far faster than updating the summaries. Throughput for v1 messages had dipped to 60k/sec, and 10k/sec for the v2 messages.

Moreover, the new features the v2 messages permitted weren’t simply an open-ended exercise: with them came the customers that demanded those features and their additional traffic.  The Summarizer simply wouldn’t stand up to the predicted traffic without some work. This post is an overview of the multithreaded solution we used and hopefully will provide some insight into the pitfalls of concurrency in Java 6 and 7.

### Objective

Get v1 message throughput back to 200k/sec and v2 throughput to 100k/sec, ideally on our production hardware. Simple enough, given that I knew the main bottlenecks were summarization and to a lesser extent the parsing of the `IMessage`s to `IInputEvent`s.

### Let’s put in some queues

The basic premise of concurrency is that you find the time-consuming bits of work, throw some queues and workers at them, and out comes performance, right? (Wrong! But I’ve got a whole narrative going here, so bear with me.) The natural places for inserting these queues seemed to be between steps 3 and 4, and steps 4 and 5. If parsing `IMessage`s to `IInputEvent`s and summarizing `IInputEvent`s are the only time-consuming work units, adding concurrency there should open up those bottlenecks. According to the “train book” I had three options for queues:

• `ArrayBlockingQueue` (henceforth ABQ) – bounded, backed by an array (duh), uses a single ReentrantLock
• `LinkedBlockingQueue` (henceforth LBQ)- bounded, backed by a linked list (duh), uses two ReentrantLocks (one for the head and one for the tail)
• `ConcurrentLinkedQueue` (henceforth CLQ)- unbounded, backed by a linked-list, uses no locks, instead relies on a work-stealing algorithm and CAS

`IMessage` to `IInputEvent` parsing and `IInputEvent` summarization are buffered by `BlockingQueue`s.

We added a message parsing queue to which the Netty workers would dump `IMessage`s. Message parser workers would take those `IMessage`s and turn them into `IInputEvent`s. They would then distribute those `IInputEvent`s to a summarization queue I added to each summarization worker. Since I didn’t want to lock each report object, I decided that only a single summarization worker would ever write to a particular summary. (Martin Thompson’s blog posts about the Single Writer Principle were inspiration for this.) That is, each summarization worker would be assigned (by round-robin at startup) one or many summaries to own exclusively. So, in total I added one multiple-producer, multiple-consumer (MPMC) message parsing queue and N multiple-producer, single-consumer (MPSC) summarization queues (one for each summarization worker).

### The First Bottleneck: Parsing

I slotted in the various queues available, replayed some traffic to get a feel for what was going on.

• The message parsing queue was always full, which confirmed my suspicion that parsing, not Netty, was the first bottleneck.
• The summarization queues were split between two groups: those that were always full and always empty. The reason was clear: some summarization workers were assigned high-volume summaries and others low-volume summaries.

This was my first lesson in queueing: queues are on average completely full or empty because it is nearly impossible to perfectly balance production/consumption rates. Which leads to the second lesson: CLQ (well, any unbounded queue) probably shouldn’t be used as a producer/consumer queue because “completely full” means “always growing” for an unbounded queue. Naturally, that’s an OK situation when consumption outpaces production, but in that scenario I wouldn’t have needed the queue in the first place. I needed back-pressure and only the blocking (in this case, bounded) queues could give me that.

In order to address the parsing bottleneck, I wanted to get a better grasp of the `IMessage` to `IInputEvent` throughput rate under different configurations. I constructed a test in which Netty workers would either:

• do all the parsing work themselves, and then discard the message, or
• would enqueue `IMessage`s to the message parsing queue (either ABQ or LBQ), and parser workers would dequeue, parse, and then discard the `IInputEvent`. CLQ was not included here since it would consistently OOM as the queue grew without bound.

Each Netty worker would be responsible for a single connection and each connection could provide as many as 150k messages per second. Results for v1 and v2 message parsing were nearly identical, as were the results for Java 6/7, so they are presented here without distinction.

When Netty did all of the parsing work, throughput maxed out at about 130k/sec.

Message parsing throughput with a `BlockingQueue` between the Netty workers and the message parser workers. The facet labels are [Queue Implementation, Queue Size].

• Without a queue, throughput topped out at 130k/s. With a queue and the right parser worker count, each of the four Netty workers could produce 60k/sec worth of `IMessage`s. Notably, neither situation provoked anywhere near 100% saturation on (# netty worker + # parser worker) cores, so I have to believe that it’s simply a matter of having dedicated parsing threads that are not affected by the context switching required to read from the network. Say context switching takes up $r$% of your time, then 5 netty workers can do at most $w_0 = 5(1-r/100)$ units of work. However, 4 Netty workers and 1 parser worker can do $w_1 = 4(1-r/100) + 1 > w_0$ units. The fact that 4 Netty workers + 1 parser worker yields about 130-150k/sec, which is a small gain over just 5 Netty workers, suggests this. It might also be a matter of code “locality”: by allowing each thread to focus on a narrower scope of work, better branch prediction or compilation may be possible.
• ABQ, touted as the end all of high-performance Java queues, gave us “atrocious” throughput, compared to LBQ, if more than two consumer threads were hitting it. This was surprising until I poked an active VM with `SIGQUIT` a few hundred times only to find that most of the workers were waiting on the ABQ’s `ReentrantLock`. The difference between two and three consumers hammering that lock amounted to a 50% drop in throughput.
• LBQ’s split lock seemed to degrade more gracefully in the presence of “extra” producers or consumers. Specifically, the overhead of GC and a linked-list (vs. array) was less than that produced by lock contention on ABQ’s single lock. More remarkably, 2-8 parser workers always produced better results than a single parser worker, so a misconfiguration here couldn’t really do worse than revert to the 1 worker scenario. ABQ was not so lenient, however, dropping to throughput numbers lower than the Netty-only setup after 2 parser workers.
• Queue size was largely irrelevant compared to the impact of adding or removing even a single producer/consumer. As long as the queue is large enough to buffer jitters, there’s really little point in spending hours tuning it.

Progress! At least I knew I could parse the 200k messages per second I needed.

### The Second Bottleneck: Summarization

I knew that I wouldn’t be able to preserve the full parsing throughput simply because the queuing/dequeuing latency of another queue would always be present. The tradeoff was going to be adding more summarization workers at the cost of more time spent by the message parsing workers distributing the newly parsed `IInputEvent`s to all relevant summarization workers. Each event would likely be distributed to more than one summarization worker, which meant a sequential lock acquisition for each summarization worker.

The cost of delivery was affected by the number of message parser workers, the number of summaries, the number of summarization workers, as well as the fan-out factor of each particular event, and hence on the proportions of different events to each other in a “nominal” data stream. This seemed like too many variables to isolate and too brittle of a measurement to be of any use. Instead, I threw out the fine-grained rigor and just plotted as many things as I could. I ran all the queues at one size: 2048.

At high throughputs, adding more workers simply makes things worse. Facet labels are [Message Parser Queue Impl-Summarization Queue Impl, JDK].

At lower throughputs, lock overhead becomes less of a factor. Facet labels are [Message Parser Queue Impl-Summarization Queue Impl, JDK].

• Again, the touted ABQ is matched or bested by LBQ in many configurations. It’s very, very interesting to me that GC on such fast-moving LBQs isn’t a massive issue. That said, for these tests I was running 30GB heaps, so the new generation was rather large, and the nodes of the linked list are extremely short-lived. Don’t write off LBQ, especially with higher producer/consumer counts!
• Again, it’s simply stunning how much of a difference a single added or removed producer or consumer can make on the total throughput. Our production machines have enough hardware threads to cover the worker threads, so it’s unlikely that resource starvation is a problem here. It seems that application performance can suffer immensely from simple lock convoys caused by too many workers. Comparing the v1 and v2 plots, it’s clear that the queue lock(s) can’t support any more contention from new workers at high throughputs. Adding more workers at 100k/sec completely guts performance in a way that simply does not occur at 25k/sec. Lock overhead can destroy performance at high throughputs!
• The “best” worker configurations for v1 are, from a performance perspective, incompatible with v2 workloads and vice versa. It is absolutely crucial to distinguish and separate the different types of workloads. Trying to run them together will lead to misleading and muddled results. Tease them apart and you can fairly easily optimize both. For instance, LBQ-LBQ seems to work best with 2 summarization workers for v1 workloads. However, that configuration sacrifices 50% of peak performance on v2 workloads, which worked best with 3 or 4 summarization workers. The way this separation is implemented in production is via a rule in our event routing layer: all v1 messages are routed to one Summarizer and all v2 messages are routed to another. If that kind of separation isn’t possible, it’s probably worth instantiating two different queues and balancing worker pools separately, instead of trying to lump all the events together.
• Java 6 to Java 7 bought us nothing on this hardware. You may note that under some configurations, performance appears to dip slightly under Java 7, but that’s slightly misleading because I’ve used averages of throughputs in these plots for visual clarity. The performance “dip” easily falls within the jitter of the raw data.

The problem was that despite these improvements I hadn’t reached my stated goals. It was time to look a bit further afield than java.util.concurrent.

### Disruptor

I’d mentioned that Martin Thompson’s Mechanical Sympathy blog had been inspiration for some of our design choices. It was also an excellent introduction to LMAX’s Disruptor, which can be used to simulate a graph of bounded queues. Since it advertised vast improvements in throughput over LBQ and ABQ, I decided to give it a shot.

Side note: Yes, I know the Disruptor is meant to be used when the actual bytes of data are in the `RingBuffer`‘s entries, as opposed to just references. No, I can’t do that easily because we have variable message sizes and using the max size as an upper bound for the entries would make the buffer either too small (in entry count) or too large to fit into L3, as advised. If I get desperate, I might consider re-architecting the application to move to a smaller message representation and move the deserialization into the “business logic” as suggested by the first link. The following results and analysis are NOT an endorsement or condemnation of the Disruptor under any kind of rigorous testing regimen. I wanted to see if I could use it as a slot-in replacement for our queues, nothing more, nothing less.

I tried out the Disruptor in the least invasive way I could think of: one `Disruptor` per summarization worker. Each summarization worker would have a `RingBuffer<IInputEvent>` that would be fed off of the various message parser workers. This fits nicely because it supports an easy MPSC configuration with the `MultiThreadedClaimStrategy.` I considered using it for the message parsing queue, but the hoops I’d having to jump through to stripe the RingBuffer to allow an MPMC configuration just seemed like overkill for a preliminary test. I tried out various `WaitStrategy`s but the results shown below are from the ‘busy-spin’ strategy, which gave the best throughput.

Disruptor as a slot-in replacement for an MPSC `BlockingQueue`.

Facet labels are [Message Parser Queue Impl-Summarization Queue Impl, JDK].

Facet labels are [Message Parser Queue Impl-Summarization Queue Impl, JDK].

The results here were unsurprising: the Disruptor did not magically fix my problems. It performed quite well on our production hardware, matching the best results for both v1 and v2 messages, but ended up utilizing more CPU resources despite being allocated an equal number of threads, regardless of `WaitStrategy`. This brings up another interesting point: I had the choice of using `put`/`take` or `offer`/`poll` on our `BlockingQueue`s and ended up choosing `put`/`take`for the same reason. A marginal increase in throughput didn’t seem worthwhile if the tradeoff was having every thread in a busy spin consuming 100% of its core. For us, even a 10% performance increase wasn’t enough to justify the decreased visibility into the “true” utilization of the CPU resources.

### Hardware as a crutch

I was left in a pickle. Short of re-architecting around the “ideal” Disruptor workflow or reworking the way the summarization workers shared work (allowing N workers to 1 summary as well as 1 worker to N summaries) I was without a quick software option for fixing this. So, like the lazy clod that I am, I turned to hardware to buy some more time. I happened to have a faster, more modern CPU on hand, so I gave that a spin. The baseline v2 message throughput was 20k/sec, for reference.

The facet labels are [Message Queue Impl-Summarization Queue Impl, JDK].

Talk about hardware making a difference! Moving onto fresh hardware can literally double performance, without a doubling in clock speed.

• Though the Disruptor configurations gave the best results, the “mundane” LBQ-LBQ ones only trailed them by 8%, using 2-4 fewer threads and nearly a full core’s less of CPU, at that. LBQ-LBQ also beat ABQ-ABQ out handily by about 10% in most configurations.
• The performance benefits of the Java 7 Hotspot over Java 6 are clear on this newer hardware. I saw a 10-20% performance boost across the board. Notably its impact on the Disruptor configurations is more pronounced than on the others.

Note also that the optimal worker counts differ based on hardware, which is expected given the differences between Nehalem and Sandy Bridge. Every little bit of configuration seems to make a difference, and a meaningful one at that!

### Major takeaways:

1. Explore your configuration space: worker counts, JVMs, hardware. One step in any direction in any of those spaces can provide a meaningful performance boost.
3. Don’t bother tuning queue size except for the purpose of jitter or keeping it in L3.
4. Even if you don’t know what the black box at the bottom (or even the middle) of the stack is doing, you can still make progress! Experiment and plot and keep good notes!
5. The Java 7 Hotspot offers a small but consistent performance improvement over Java 6 on newer hardware.

## Efficient Field-Striped, Nested, Disk-backed Record Storage

At AK we deal with a torrent of data every day. We can report on the lifetime of a campaign which may encompass more than a year’s worth of data. To be able to efficiently access our data we are constantly looking at different approaches to storage, retrieval and querying. One approach that we have been interested in involves dissecting data into its individual fields (or “columns” if you’re thinking in database terms) so that we only need to access the fields that are pertinent to a query. This is not a new approach to dealing with large volumes of data – it’s the basis of column-oriented databases like HBase.

Much of our data contains nested structures and this causes things to start to get a little more interesting, since this no longer easily fits within the data-model of traditional column-stores. Our Summarizer uses an in-memory approach to nested, field-striped storage but we wanted to investigate this for our on-disk data. Google published the Dremel paper a few years ago covering this exact topic. As with most papers, it only provides a limited overview of the approach without covering many of the “why”s and trade-offs made. So, we felt that we needed to start from the ground up and investigate how nested, field-striped storage works in order to really understand the problem.

Due to time constraints we have only been able to scratch the surface. Since the community is obviously interested in a Dremel-like project, we want to make the work that we have done available. We apologize in advance for the rough edges.

Without further ado: Efficient Field-Striped, Nested, Disk-backed Record Storage (on GitHub).

## Netty’s CodecEmbedder

We love Netty. It’s a great full-featured network framework for Java. One of the features that rounds out the framework is the CodecEmbedder. It allows you to test your encoders and decoders without any fuss using a offer-poll paradigm. For example, to test our Rsyslog decoder, we simply:

```ChannelBuffer messageBuffer =
ChannelBuffers.copiedBuffer("2011-06-30T00:00:03-07:00 some.host.agkn.net EVT_NM column1,column2\n", CharsetUtil.UTF_8);
RsyslogDecoder decoder = new RsyslogDecoder();
DecoderEmbedder<IRsyslogMessage> embedder = new DecoderEmbedder<IRsyslogMessage>(decoder);
embedder.offer(messageBuffer);

IRsyslogMessage message = embedder.poll();
assertNotNull(message, "Decoded message");
assertEquals(message.getTimestamp(), "2011-06-30T00:00:03-07:00", "Timestamp");
assertEquals(message.getHostname(), "some.host.agkn.net", "Host");
assertEquals(message.getProgramname(), "EVT_NM", "Programname");
assertEquals(message.getBody(), "column1,column2", "Body");
```

One gotcha to watch out for (which always manages to bite me in the butt, and is the impetus for writing this post) is that handlers will only process the type of data that they understand. Data of other types is passed along completely untouched. For example, while the following successfully compiles, it throws a java.lang.ClassCastException: java.lang.String cannot be cast to IRsyslogMessage at embedder.poll():

```RsyslogDecoder decoder = new RsyslogDecoder();
DecoderEmbedder<IRsyslogMessage> embedder = new DecoderEmbedder<IRsyslogMessage>(decoder);
embedder.offer("2011-06-30T00:00:03-07:00 some.host.agkn.net EVT_NM column1,column2\n");

IRsyslogMessage message = embedder.poll();
assertNotNull(message, "Decoded message");
```

The ChannelPipeline that backs the embedder can handle any type of input object. In the above case the object offer‘d is a string which is simply passed through the RsyslogDecoder untouched and tries unsuccessfully to pop out of the poll as an IRsyslogMessage. As long as you always make sure that your offered object is understood by one of your handlers then the embedder will work as you expect.

## Big Data Ain’t Fat Data: A Case Study

We’ve always had a hunch that our users stick to the same geographic region. Sure, there’s the occasional jet-setter that takes their laptop from New York to Los Angeles (or like Rob, goes Chicago to San Francisco) on a daily or weekly basis, but they’re the exception and not the rule. Knowing how true this is can simplify the way we work with user-centric data across multiple data centers.

When Rob asked me to find this out for sure, my first instinct was to groan and fire up Hive on an Elastic MapReduce cluster, but after a second, I heard Matt’s voice in my head saying, “Big Data isn’t Fat Data”. Why bother with Hadoop?

#### The Setup

If I was solving this problem on a small data-set, it’d be pretty straight-forward. I could write a Python script in about 10 minutes that would take care of the problem. It would probably look something like:

```users = {}

for line in sys.stdin:
user, data_center = parse(line)
try:
users[user].append(data_center)
except KeyError:
users[user] = [data_center]

total_users = len(users)
multiple_dc_users = len([u for u in users if len(users[u]) > 1])
```

Easy peasy. However, explicitly storing such a large hash-table gets a little problematic once you start approaching medium-sized data (1GB+). Your memory needs grow pretty rapidly – with M users and N data centers, storage is O(MN) – , and things start to get a little slow in Python. At this point there are two options. You can brute force the problem by throwing hardware at it, either with a bigger machine or with something like Hadoop. Or, we can put on our Computer Science and Statistics hats and get a little bit clever.

What if we turn the problem sideways? Above, we’re keeping a hash table that holds a set of data-center for each user. Instead, let’s keep a set of users per data-center, splitting the problem up into multiple hash tables. This lets us keep a small, fixed number of tables – since I’d hope any company knows exactly how many data centers they have – and spread the load across them, hopefully making the load on each table more tolerable. We can then check how many sets each user falls into, and call it a day.

```data_centers = dict([(dc, set()) for dc in AK_DATA_CENTERS])

for line in sys.stdin:
user, data_center = parse(line)

# Get the total users by intersecting all of the data center sets
...

# Get all users who are in exactly one set by taking symmetric differences (XOR) of data-center sets
# and count the size of that set.
...
```

While this approach theoretically has better performance with the same O(MN) space requirements, with big enough data the space requirements of the problem totally dominate whatever improvement this approach would provide. In other words, it doesn’t matter how small each hash table is, you can’t fit 80GB of user IDs into the 8GB of RAM on your laptop.

It’s looking pretty bleak for the Clever Way of doing things, since what we really want is a magic hash table that can store our 80GB of user IDs in the memory on our laptops.

#### Bloom Filters

Enter Bloom Filters. A bloom filter is a fixed-size set data structure with two minor features/drawbacks:

1. You can never ask a Bloom Filter for the set of elements it contains.
2. Membership queries have a small, controllable, false-positive probability. Bloom filters will never return false negatives.

With a little bit of work, it’s pretty easy to substitute Bloom Filters for plain old hash tables in our sideways approach above. There’s a slight tweak we have to make to our algorithm to accommodate the fact that we can’t ever query a bloom filter for the elements it contains, but the idea remains the same.

#### The Payoff

Suppose now we’re keeping a bloom-filter of users per data center. The only thing we have to work around is the fact that we’ll never be able to recover the list of users we’ve added to each set. So, we’ll just deal with users each time we see them instead of deferring our counting to the end.

With that idea in the bag, there are really only a few things to worry about when a request comes in for a given data center.

• Check the bloom filter for that data center to see if the user has been to that one before
• Check the other bloom filters to see how many other data-centers that user has been to before
• Count the number of total data-centers that user has seen before. If the user is new to this data center, and the user has seen exactly one other data center before, increment the multiple data center user counter
• If the user has never seen any of your data centers before, that user is a completely new user. Increment the total number of users seen.
• If the user has already seen this data-center, this user is a repeat. Do nothing!

We ran our version of this overnight. It took us one core, 8GB of RAM, and just under than 4 hours to count the number of users who hit multiple data centers in a full week worth of logs.

## Never trust a profiler

A week or so ago I had mentioned to Timon that for the first time a profiler had actually pointed me in a direction that directly lead to a positive increase in performance. Initially Timon just gave me that “you’re just a crotchety old man” look (which, in most cases, is the correct response). I pointed him to Josh Bloch’s Performance Anxiety presentation which dives into why it is so hard (in fact “impossible” in Josh’s words) to benchmark modern applications. It also references the interesting paper “Evaluating the Accuracy of Java Proﬁlers”.

Just last week I was trying to track down a severe performance degradation in my snapshot recovery code. I was under some time pressure so I turned on my profiler to try to point me in the right direction. The result that it gave was clear, repeatable and unambiguous and pointed me into the linear probing algorithm of the hash table that I am using. Since I had recently moved to a new hash table (one that allowed for rehashing and resizing) it was possible that this was in fact the root of my performance problem but I had my doubts. (More on this in a future post.) I swapped out my hash table and re-profiled. The profiler again gave me a clear, repeatable and unambiguous result that my performance woes were solved so I moved on. When we were able to test the snapshot recovery code on production snapshots, we found that the performance problems still existed.

My profiler lied to me. Never trust your profiler.

## Streaming Algorithms and Sketches

Here at Aggregate Knowledge we spend a lot of time thinking about how to do analytics on a massive amount of data. Rob recently posted about building our streaming datastore and the architecture that helps us deal with “big data”. Given a streaming architecture, the obvious question for the data scientist is “How do we fit in?”. Clearly we need to look towards streaming algorithms to match the speed and performance of our datastore.

A streaming algorithm is defined generally as having finite memory – significantly smaller than the data presented to it – and must process the input in one pass. Streaming algorithms start pretty simple, for instance counting the number of elements in the stream:

```counter = 0
for event in stream:
counter += 1
```

While eventually `counter` will overflow (and you can be somewhat clever about avoiding that) this is way better than the non-streaming alternative.

```elements = list(stream)
counter = len(elements)
```

Pretty simple stuff. Even a novice programmer can tell you why the second method is way worse than the first. You can get more complicated and keep the same basic approach – computing the mean of a floating point number stream is almost as simple: keep a around `counter` as above, and add a new variable, `total_sum += value_new`. Now that we’re feeling smart, what about the quantiles of the stream? Ah! Now that is harder.

While it may not be immediately obvious, you can prove (as Munro and Paterson did in 1980) that computing exact quantiles of a stream requires memory that is at least linear with respect to the size of the stream. So, we’re left approximating a solution to the quantiles problem. A first stab might be sampling where you keep every 1000th element. While this isn’t horrible, it has it’s downsides – if your stream is infinite, you’ll still run out of space. It’s a good thing there are much better solutions. One of the first and most elegant was proposed by Cormode and Muthukrishnan in 2003 where they introduce the Count-Min sketch data structure. (A nice reference for sketching data structures can be found here.)

Count-Min sketch works much like a bloom filter. You compose `k` empty tables and `k` hash functions. For each incoming element we simply hash it through each function and increment the appropriate element in the corresponding table. To find out how many times we have historically seen a particular element we simply hash our query and take the MINIMUM value that we find in the tables. In this way we limit the effects of hash collision, and clearly we balance the size of the Count-Min sketch with the accuracy we require for the final answer. Heres how it works:

The Count-Min sketch is an approximation to the histogram of the incoming data, in fact it’s really only probabilistic when hashes collide. In order to compute quantiles we want to find the “mass” of the histogram above/below a certain point. Luckily Count-Min sketches support range queries of the type “`select count(*) where val between 1 and x;`“. Now it is just a matter of finding the quantile of choice.

To actually find the quantiles is slightly tricky, but not that hard. You basically have to perform a binary search with the range queries. So to find the first decile value, and supposing you kept around the the number of elements you have seen in the stream, you would binary search through values of x until the return count of the range query is 1/10 of the total count.

Pretty neat, huh?

## Custom Input/Output Formats in Hadoop Streaming

Like I’ve mentioned before, working with Hadoop’s documentation is not my favorite thing in the world, so I thought I’d provide a straightforward explanation of one of Hadoop’s coolest features – custom input/output formats in Hadoop streaming jobs.

### Use Case

It is common for us to have jobs that get results across a few weeks, or months, and it’s convenient to look at the data by day, week, or month. Sometimes including the date (preferably in a nice standard format) in the output isn’t quite enough, and for whatever reason it’s just more convenient to have each output file correspond to some logical unit.

Suppose we wanted to count unique users in our logs by state, by day. The streaming job probably starts looking something like:

```hadoop jar /path/to/hadoop-streaming.jar \
-input log_data/ \
-output geo_output/ \
-mapper geo_mapper.py \
-reducer user_geo_count_reducer.py \
-cacheFile ip_geo_mapping.dat#geo.dat
```

And the output from the job might look like:

```2011-06-20,CA,1512301
2011-06-21,CA,1541111
2011-06-22,CA,1300001
...
2011-06-20,IL,23244
2011-06-21,IL,23357
2011-0-21,IL,12213
...
```

This is kind of a pain. If we do this for a month of data and all 50 states appear every day, that’s at least 1500 records – not quite so easy to eyeball. So, let’s ask Hadoop to give us a file per day, named YYYY-MM-DD.csv, that contains all the counts for that day. 30 files containing 50 records each is much more manageable.

### Write Some Java

The first step is to write some Java. I know, this is a tutorial about writing Input/Output formats for Hadoop streaming jobs. Unfortunately, there is no way to write a custom output format other than in Java.

The good news is that once you’re set up to develop, both input and output formats tend to take minimal effort to write. Hadoop provides a class just for putting output records into different files based on the content of each record. Since we’re looking to split records based on the first field of each record, let’s subclass it.

```public class DateFieldMultipleOutputFormat
extends MultipleTextOutputFormat<Text, Text> {

@Override
protected String generateFileNameForKeyValue(Text key, Text value, String name) {
String date = key.toString().split(",")[0];
return date + ".csv";
}
}
```

It’s a pretty simple exercise, even if you’ve never written a single line of Java. All the code does is take the first field of the key and use it as the output filename. Honestly, the hardest part is going to be setting up your IDE to work with Hadoop (the second hardest part was finding this blog post).

### Use It

The most recent Hadoop documentation I can find, still has documentation on using custom Input/Output formats in Hadoop 0.14. Very helpful.

It turns out life is still easy. When you look at a less-misleading part of the Hadoop Streaming documentation, all the pieces you need are right there. There are flags, `-inputformat` and `-outputformat` that let you specify Java classes as your input and output format. They have to be in the Java classpath when Hadoop runs, which is taken care of by the `-libjars` generic Hadoop option. There is no need to compile a custom streaming jar or anything crazy (I found worse suggestions on StackOverflow while figuring this out).

Using this newfound wisdom, it’s pretty trivial to add the output format to the existing streaming job. The next version of the job is going to look something like:

```hadoop jar /path/to/hadoop-streaming.jar \
-libjars /path/to/custom-formats.jar \
-input log_data/ \
-output geo_output/ \
-outputformat net.agkn.example.outputformat.DateFieldMultipleOutputFormat \
-mapper geo_mapper.py \
-reducer user_geo_count_reducer.py \
-cacheFile ip_geo_mapping.dat#geo.dat
```

### Pay Attention to the Details

If you write an output format like the one above and try to run a job, you’ll notice that some output records disappear. The overly-simple explanation is that Hadoop ends up opening the file for a specific date once for every reducer that date appears in, clobbering the data that was there before. Fortunately, it’s also easy to tell Hadoop how to send all the data from a date to the same reducer, so each file is opened exactly once. There isn’t even any Java involved.

All it takes is specifying the right partitioner class for the job on the command line. This partitioner is configured just like unix `cut`, so Data Scientists should have an easy time figuring out how to use it. To keep data from disappearing, tell the partitioner that the first field of the comma-delimited output record is the value to partition on.

With those options included, the final streaming job ends up looking like:

```hadoop jar /path/to/hadoop-streaming.jar \
-libjars /path/to/custom-formats.jar \
-D map.output.key.field.separator=, \
-D mapred.text.key.partitioner.options=-k1,1 \
-input log_data/ \
-output geo_output/ \
-outputformat net.agkn.example.outputformat.DateFieldMultipleOutputFormat \
-mapper geo_mapper.py \
-reducer user_geo_count_reducer.py \
-cacheFile ip_geo_mapping.dat#geo.dat
```

On the next run all the data should appear again, and the output directory should contain a file per day there is output. It’s not hard to take this example and get a little more complicated – it’d take minimal changes to make this job output to a file per state, for example. Not bad for a dozen lines of code and some command line flags.

## For the bit hacker in you

My colleagues and I are wrapping up version 2 of our unique counting algorithms. The last pieces are the database tools that our analysts use — some custom functions in Postgres9 to manipulate our custom types. One step in our algorithm is to use the least-significant bit of a bit array. In Java we use a lookup technique (a 256-entry lookup table that you use to look up each byte) and in C we can use the bsf assembly instruction. (Both techniques can been researched further on Sean Eron Anderson’s awesome Bit Twiddling Hacks.)

Since I’m a hacker to the core I wanted to do a little more research to see what other folks were doing. Here are some additional references:

In the last reference, there is a response that mentions the fascinating technique of using the floating-point representation to extract the exponent:

```unsigned GetLowestBitPos(unsigned value)
{
double d = value ^ (value - !!value);
return (((int*)&d)[1]>>20)-1023;
}
```

This response refers you to another post that explains a bit more the technique. In that response, Tony Lee mentions one of my favorite constants 0x5f3759df. I spent many years doing graphics and gaming programming (some of my first “real” programs in college where doing visual simulations on SGIs) and I remember the first time that I came across 0x5f3759df in the Quake source code. I’m always amazed at the bit techniques that people will come up with to solve a problem.

I’ll end this post with a link to one of my favorite articles on the internet: a history of the origins of 0x5f3759df in the Quake source. It’s a wonderful walk down memory lane.

## Batch Acknowledged Pipelines with ZeroMQ

Parallel processing with a task ventilator is a common pattern with ZeroMQ.  The basics of this pattern are outlined in the “Divide and Conquer” section of the ZeroMQ guide.  The pattern consists of the following components:

• A number of workers that do the processing work.
• A sink that collects results from the worker processes.

This pattern works wonderfully as long as your consumers can outpace your producers. If you start producing tasks faster than you can process them, then messages will start backing up in ZeroMQ socket queues.  This will drive the memory utilization of the processes up, make a clean shutdown of the distributed processing system difficult, and result in a sizeable number of messages lost in the event of a crash.  This can be avoided by using ZMQ_REP and ZMQ_REQ sockets, but in that case you lose the speed advantage of a pipeline pattern.

To maintain the speed of a pipeline pattern while allowing for some control over the number of messages in flight at any given time, we can add batch acknowledgements to the basic ventilator / worker / sink pattern.  Accomplishing this only requires a few minor changes:

• Add a pull socket to the ventilator for receiving acknowledgements
• Add a push socket to the manager for sending acknowledgements
• Add a batch size variable

So without further ado, let’s dive into some code.  I’m going to keep things simple for this example.  I’ll define a function for the ventilator, a function for the worker, and a function for the sink.  These will be started using multiprocessing.Process. For any Java programmers in the audience: Python has first class functions.  There is no requirement to wrap a function in a class.

The ventilator uses a ZMQ_PUSH socket to send tasks to listening workers, and a ZMQ_PULL socket to receive acknowledgements from the sink process.  The ventilator will send N messages (where N is the batch size) and then wait for an acknowledgement:

```import zmq
from time import time
from multiprocessing import Process

def ventilator(batch_size, test_size):

"""set up a zeromq context"""
context = zmq.Context()

"""create a push socket for sending tasks to workers"""
send_sock = context.socket(zmq.PUSH)
send_sock.bind("tcp://*:5555")

"""create a pull socket for receiving acks from the sink"""
recv_sock = context.socket(zmq.PULL)
recv_sock.bind("tcp://*:5557")

current_batch_count = 0

"""start the message loop"""
for x in range(test_size):

"""send until we reach our batch limit"""
while current_batch_count < batch_size:
current_batch_count += 1

"""reset the batch count"""
current_batch_count = 0

"""wait for an acknowledgement and block while waiting -
note this could be more sophisticated and provide
support for other message types from the sink,
but keeping it simple for this example"""
msg = recv_sock.recv()
```

The workers use a ZMQ_PULL socket to receive tasks from the ventilator, and a ZMQ_PUSH socket to send results to the sink process:

```def worker():

"""set up a zeromq context"""
context = zmq.Context()

"""create a pull socket for receiving tasks from the ventilator"""
recv_socket = context.socket(zmq.PULL)
recv_socket.connect("tcp://*:5555")

"""create a push socket for sending results to the sink"""
send_socket = context.socket(zmq.PUSH)
send_socket.connect("tcp://*:5556")

while True:
send_socket.send("result")
```

The sink process uses a ZMQ_PULL socket to receive results from the workers, and a ZMQ_PUSH socket to send batch acknowledgements to the ventilator process:

```def sink(batch_size, test_size):

"""set up a zmq context"""
context = zmq.Context()

"""create a pull socket for receiving results from the workers"""
recv_socket = context.socket(zmq.PULL)
recv_socket.bind("tcp://*:5556")

"""create a push socket for sending acknowledgements to the ventilator"""
send_socket = context.socket(zmq.PUSH)
send_socket.connect("tcp://*:5557")

result_count = 0
batch_start_time = time()
test_start_time = batch_start_time

for x in range(test_size):
"""receive a result and increment the count"""
msg = recv_socket.recv()
result_count += 1

"""acknowledge that we've completed a batch"""
if result_count == batch_size:
send_socket.send("ACK")
result_count = 0
batch_start_time = time()

duration = time() - test_start_time
tps = test_size / duration
print "messages per second: %s" % (tps)
```

The main routine for the test allows for a configurable batch size, test size, and number of workers:

```if __name__ == '__main__':
num_workers = 4
batch_size = 100
test_size = 1000000

workers = {}
ventilator = Process(target=ventilator, args=(batch_size, test_size,))
sink = Process(target=sink, args=(batch_size, test_size,))

sink.start()

for x in range(num_workers):
workers[x] = Process(target=worker, args=())
workers[x].start()

ventilator.start()
```

For the test, I sent 1,000,000 small messages, in batch sizes of 1,000,000, 1,000, 100, 50, and 10:

There are two main conclusions I draw from these results.  The first is that the pipeline pattern obtains an extremely high throughput due to the fact that it does not have to ACK messages.  The second is that if your workers are performing work that takes any amount of time, you can limit the batch size to a rather small amount of messages without impacting your overall throughput.  Even with a batch size of 10, a throughput of around 81k messages a second was obtained.  As long as the combined messages per second your workers can process is less than the raw throughput of your pipeline for a given batch size, batched acknowledgements will give you some degree of flow control without compromising your throughput.

The “batched pipeline pattern” allows tradeoffs to be made between acceptable message loss and throughput, and offers a great deal of control that a simple pipeline pattern with no batching does not offer. In my experience it is an extremely useful starting point a distributed system for processing streaming data.