## Sketch of the Day: Frugal Streaming

We are always worried about the size of our sketches. At AK we like to count stuff, and if you can count stuff with smaller sketches then you can count more stuff! We constantly have conversations about how we might be able to make our sketches, and subsequently our datastores, smaller. During our science summit, Muthu pointed us at some of the new work in Frugal Streaming. The concept of Frugal Streaming is to process your data as it comes, O(N), but severely limit the amount of memory you are using for your sketch, and by “severely” they mean using perhaps one or two pieces of information. Needless to say, we were interested!

### History

The concept of Frugal Streaming reminds me of an algorithm for finding the dominant item in a stream, MJRTY written by Boyer and Moore in 1980. (This paper has a very interesting history). The MJRTY algorithm sets out to solve the problem of finding the majority element in a stream (an element comprising more than 50% of the stream). Moore proposed to solve this by using only 2 pieces of information and a single scan of the data.

Imagine you have a stream of names (“matt”, “timon”, “matt”, “matt”, “rob”, “ben”, …) and you wanted to know if any name appeared in more than half the stream. Boyer and Moore proposed the following:

    count = 0
    majority = ""

    for val in stream:
        if count == 0:
            # No surviving candidate: adopt the current value.
            majority = val
            count = 1
        elif val == majority:
            count += 1
        else:
            count -= 1

    print(majority if count > 0 else "no majority!")


If you’re anything like me you will go through a few phases: “That can’t work!”, “Wait, that works?!”, “Nah, this data will break that”, “Holy sh*t that works!!”. If you think about it for a minute you realize that it HAS to work. If any element in the stream comprises more than half the stream values there is no way to get to the end and have a counter of zero. To convince yourself suppose the majority element only comprises the first half + 1 of your N-length stream. The counter would count up to $N/2+1$ and then start decrementing until all N values have been seen, which would leave the counter at $2 = (N/2+1) - (N/2-1)$*. This will hold regardless of the ordering of the elements in the stream. A simple simulation is provided by the authors. Philippe Flajolet apparently “liked this algorithm very much and called it the ‘gang war’, because in his mind, every element is a gang member, and members of opposite gangs are paired in a standoff, and shoot each other. The gang members remaining are the more numerous”**.

The astute among you will have noticed that this algorithm only works if there is, in fact, a majority element. If there is not then it will fail. A stream such as {“matt”, “matt”, “timon”, “timon”, “rob”} would result in the algorithm returning “rob”. In practice you need ways of ensuring that your stream does indeed have a majority element or have a guarantee ahead of time.
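One way to guard against the no-majority case, when the data can be scanned a second time, is to verify the candidate with a plain count. The sketch below is an illustration under that assumption (it needs a list, not a one-shot stream); the function names `majority_candidate` and `verified_majority` are made up for this example.

```python
def majority_candidate(stream):
    """Single MJRTY pass: returns the surviving candidate (None for an empty stream)."""
    count = 0
    candidate = None
    for val in stream:
        if count == 0:
            candidate, count = val, 1
        elif val == candidate:
            count += 1
        else:
            count -= 1
    return candidate


def verified_majority(values):
    """Second pass: keep the candidate only if it fills more than half of `values`."""
    candidate = majority_candidate(values)
    occurrences = sum(1 for v in values if v == candidate)
    if values and 2 * occurrences > len(values):
        return candidate
    return None


print(majority_candidate(["matt", "matt", "timon", "timon", "rob"]))  # candidate only
print(verified_majority(["matt", "matt", "timon", "timon", "rob"]))   # rejected
```

The first call still reports “rob”, while the verified version returns `None` because “rob” fills only one of the five slots.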

* Note, that formula is for an even length stream. For a stream of odd length the counter will indeed be at 1. Proof is left to the reader.

** Thanks to Jeremie Lumbroso for his insightful feedback on the history of MJRTY and his memory of Philippe’s explanation.

### One “bit” – Frugal-1U

In their Frugal Streaming paper, Qiang and Muthu decided to see if they could find a frugal solution to the streaming quantile problem. The streaming quantiles problem is one I enjoy quite a bit and I have used it as an interview question for potential candidates for some time. Simply stated it is: “How would you find the quantiles of a stream of numbers in O(N) with limited memory?” There are a few different approaches to solve this problem, with the most widely known probably being Count-Min Sketch. However, with Count-Min you need to keep your keys around in order to query the sketch. There are other approaches to this question as well.

Instead of focusing on quantiles generally, Qiang and Muthu’s first solution is a frugal way to find the median of a stream. As with MJRTY above, I’ll just write it down and see how you react:

    median_est = 0
    for val in stream:
        if val > median_est:
            median_est += 1
        elif val < median_est:
            median_est -= 1


Granted, the above is just for the median, where the stream is much longer than the value of the median, but it is so simple that I don’t think I would have ever considered this approach to the problem. The extension to quantiles is equally shocking. If you are trying to find the 75th percentile of the data stream you do the same as above, but an item above the estimate triggers an increment only 75% of the time (at random) and an item below it triggers a decrement only 25% of the time:


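    from random import random

    quantile_75 = 0
    for val in stream:
        r = random()
        # Move up with probability 0.75 when the item is above the estimate.
        if val > quantile_75 and r > 1 - 0.75:
            quantile_75 += 1
        # Move down with probability 0.25 when the item is below the estimate.
        elif val < quantile_75 and r > 0.75:
            quantile_75 -= 1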


As the paper states:

The trick to generalize median estimation to any $\frac{h}{k}$-quantile estimation is that not every stream item seen will cause an update. If the current stream item is larger than estimation, an increment update will be triggered only with probability $\frac{h}{k}$. The rationale behind it is that if we are estimating $\frac{h}{k}$-quantile, and if the current estimate is at stream’s true $\frac{h}{k}$-quantile, we will expect to see stream items larger than the current estimate with probability $1-\frac{h}{k}$.
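The 75th percentile loop generalizes directly to any quantile `q` between 0 and 1. The following is a minimal sketch, not the paper's reference code: the function name `frugal_1u` and the injectable `rng` parameter are assumptions added so the behaviour can be checked deterministically.

```python
import random


def frugal_1u(stream, q=0.5, estimate=0, rng=random.random):
    """Frugal-1U style estimate of the q-quantile using a single stored number."""
    for val in stream:
        r = rng()
        # Items above the estimate push it up with probability q ...
        if val > estimate and r > 1 - q:
            estimate += 1
        # ... and items below it pull it down with probability 1 - q.
        elif val < estimate and r > q:
            estimate -= 1
    return estimate


gen = random.Random(7)
uniform_stream = [gen.randrange(1000) for _ in range(20000)]
print(frugal_1u(uniform_stream, q=0.75, rng=gen.random))
```

For integers drawn uniformly from 0 to 999 the estimate should settle in the neighbourhood of 750, wandering by a few tens in either direction. Note that with `q=0.5` this version updates on only half of the qualifying items, so it is a slower-moving cousin of the deterministic median loop shown earlier.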

### Finding Quantiles With Two “bits” – Frugal-2U

There are a few obvious drawbacks to the above algorithm. Since we are only incrementing by 1 each time, the time to converge is linear and our initial guess of zero could be very bad. Secondly, and by design, the algorithm has no memory, can fluctuate wildly and, as they show in the paper, the estimates can drift very far away. (Note: while it is extremely unlikely that the estimates will drift far from the correct values the paper has some very loose bounds on how bad it can drift. See Lemma 3 in the paper.) They suggest a few improvements over Frugal-1U where you essentially include a varying step (instead of just incrementing by 1 every time) and 1 “bit” so you know which way you incremented in the last update. The intuition here is that if you have been consistently incrementing up or down for the last few elements of a stream then you are probably “far” away from the quantile in question. If this is the case we can speed up convergence time by incrementing a larger amount. The Frugal-2U algorithm:

    from random import random

    def constantly_one(step):
        # Assumed definition of the default step function: always change the
        # step size by 1.
        return 1

    def frugal_2u(stream, m = 0, q = 0.5, f = constantly_one):
        step, sign = 1, 1

        for item in stream:
            if item > m and random() > 1 - q:
                # Increment the step size if and only if the estimate keeps moving in
                # the same direction. Step size is incremented by the result of applying
                # the specified step function to the previous step size.
                step += f(step) if sign > 0 else -1 * f(step)
                # Increment the estimate by step size if step is positive. Otherwise,
                # increment the estimate by one.
                m += step if step > 0 else 1
                # Mark that the estimate increased this step
                sign = 1
                # If the estimate overshot the item in the stream, pull the estimate back
                # and re-adjust the step size.
                if m > item:
                    step += (item - m)
                    m = item
            # If the item is less than the estimate, follow all of the same steps as
            # above, with signs reversed.
            elif item < m and random() > q:
                step += f(step) if sign < 0 else -1 * f(step)
                m -= step if step > 0 else 1
                sign = -1
                if m < item:
                    step += (m - item)
                    m = item
            # Damp down the step size to avoid oscillation.
            if (m - item) * sign < 0 and step > 1:
                step = 1

        # Return the final quantile estimate.
        return m



You can play around with the 1U and 2U algorithms in the simulation below.


As usual, there are a few interesting tidbits as well. If you read the paper you will see that they define the updates to step as a function. This means they are allowing many different types of increments to step. For example, instead of increasing the size of step by 1 each time we could increase it by 10 or even have it increase multiplicatively. They do talk about some uses of different updates to step but there is no analysis around this (yet) and they restrict all of the work in the paper to step increments of 1. We offer a few different step update functions in the simulation and they indeed do interesting things. Further exploration is definitely needed to get some insights here.
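To make the idea of a step update function concrete, here is a small sketch of three candidates and how quickly each grows `step` during a run of same-direction updates (where the algorithm applies `step += f(step)`). Only the constant increment of 1 is analysed in the paper; `constantly_ten`, `doubling`, and the helper `step_after_run` are hypothetical names for this illustration and are not necessarily the functions used in our simulation.

```python
def constantly_one(step):
    """Additive update of 1, the case the paper restricts itself to."""
    return 1


def constantly_ten(step):
    """Hypothetical additive update of 10."""
    return 10


def doubling(step):
    """Hypothetical multiplicative update: adding max(1, |step|) doubles a positive step."""
    return max(1, abs(step))


def step_after_run(f, updates, step=1):
    """Step size after `updates` consecutive same-direction updates."""
    for _ in range(updates):
        step += f(step)
    return step


for f in (constantly_one, constantly_ten, doubling):
    print(f.__name__, step_after_run(f, 5))
```

After five consecutive updates in the same direction, starting from a step of 1, the three functions give step sizes of 6, 51, and 32 respectively, which shows how much faster the non-unit variants can close a large gap (and how much further they can overshoot).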

A non-obvious thing about the step variable is how it behaves under decrements. My initial thought was that step would get large if your current estimate was far below the actual value (thus allowing you to approach it faster from below) and that step would get to be a large negative number if your current estimate was very far above the actual value. This turns out to just be flat wrong. The negative updates to step have the effect of stabilizing the estimate (notice when step is negative that the updates to your estimates are always ±1). If you read the algorithm closely you will see that step decrements when you consistently alternate up and down updates. This behavior occurs when the estimate is close to the actual value which causes step to become a very large negative number. And I mean VERY large. In practice we have seen this number as small as $-10^{102}$ for some simulations.

### Monitoring

One of the first things I thought of when I saw this sketch was to use it as a monitoring tool. Specifically, perhaps it could be used to replace the monitoring we use on our application server response times. It turns out that using this sketch for monitoring introduces some very interesting issues. So far, we have mostly talked about what I will call “static streams”. These are streams that have data in them which is pulled consistently from some static underlying distribution (as in the examples above). However, what if the underlying distribution changes? For example, what if one of our servers all of a sudden starts responding with slower response times? Does this frugal sketch enable you to quickly figure out that something has broken and fire off an alarm with high confidence? Well, in some ways this is an identical problem to cold start: how long does it take for an initial guess to reach a stable quantile? Unfortunately, there is no easy way to know when you have reached “equilibrium” and this makes identifying when an underlying distribution change has occurred difficult as well. The paper ends with an open challenge:

… as our experiments and insights indicate, frugal streaming algorithms work with so little memory of the past that they are adaptable to changes in the stream characteristics. It will be of great interest to understand this phenomenon better.

The paper shows some interesting experiments with changing data, but I agree with Qiang: we need to understand these algorithms better before I swap out all of our server monitoring tools (and our ops team would kill me). However, since these are so simple to implement and so small, we can easily deploy a few tests and see how the results compare “in the field” (you can play around with this by changing the underlying stream distribution in our simulation above.)

### Conclusion

The frugal quantile algorithms proposed in the paper are fascinating. It is a very interesting turn in the sketching literature and Qiang and Muthu’s creativity really comes across. I am very interested in getting some real world uses out of this sketch and am excited to see what other applications we (and Qiang!) can think of. Many thanks to Muthu, Qiang Ma, and Jeremie Lumbroso for all their help!

### Appendix

• Variability: While the bounds on the accuracy of these algorithms seem wide to me, clearly in real world experiments we see much better performance than the guaranteed bounds in the paper. In fact, the error bounds in the paper are dependent on the size of the stream and not fixed.
• Size of step: A potential gotcha is the size of the step variable. Depending on your update function it indeed seems possible for this value to get below -MAXINT. Clearly a real implementation would need some error checking.
• Cold Start: One more gotcha is that you have no real way of knowing when you are near the quantile in question. For instance, if you start your estimate at zero and the true median is 100,000,000,000, the estimate will take a long time to “catch up” to this value. There are a few ways to limit this, some of which are discussed in the paper. One way is to try and make a sane guess up front (especially if you know something about the distribution) and another is to start your guess with the value from the last counter. For instance, suppose you are in a monitoring situation and you are computing means over the course of a day. It would make sense to start tomorrow’s estimate with yesterday’s final value.
• Accuracy: And, lastly, there is some interesting dependence on “atomicity”. Meaning, the estimates in some sense depend on how “large” the actual values are. In both algorithms, your minimum step size is 1. If my median in the stream is, say, 6 then this “atomic” update of 1 causes high relative fluctuation. It seems in real world examples you would like to balance the size of the thing you are estimating with the speed of approach of the algorithm. This could lead you to estimate latencies in microseconds rather than milliseconds for instance. All of this points to the fact that there are a bunch of real world engineering questions that need to be answered and thought about before you actually go and implement a million frugal sketches throughout your organization.

## Slides and Videos are up for AK Data Science Summit

We’re happy to announce that all the talks (slides and video) from our Data Science Summit are available for free, right here on the blog! You can find a full list of the conference’s content here.

Enjoy!

## Data Science Summit – Update

I don’t think I’m going out on a limb saying that our conference last week was a success. Thanks to everyone who attended and thanks again to all the speakers. Muthu actually beat us to it and wrote up a nice summary. Very kind words from a great guy. For those of you that couldn’t make it (or those that want to relive the fun) we posted the videos and slides. Thanks again to everyone for making it such a success!

Muthu being Muthu during David Woodruff’s fantastic talk on streaming linear algebra

## Foundation Capital and Aggregate Knowledge Sponsor Streaming/Sketching Conference

We, along with our friends at Foundation Capital, are pleased to announce a 1 day mini-conference on streaming and sketching algorithms in Big Data.  We have gathered an amazing group of speakers from academia and industry to give talks.  If you are a reader of this blog we would love to have you come!  The conference will be on 6/20 (Thursday) from 10 AM to 5:30 PM at the 111 Minna Gallery in San Francisco and attendance is limited. Breakfast and lunch included!

There will also be a happy hour afterwards if you cannot make the conference or just want a beer.

The speaker list includes:

##### Muthu Muthukrishnan

The Count-Min Sketch, 10 Years Later

The Count-Min Sketch is a data structure for indexing data streams in very small space. In a decade since its introduction, it has found many uses in theory and practice, with data streaming systems and beyond. This talk will survey the developments.

Muthu Muthukrishnan is a Professor at Rutgers University and a Research Scientist at Microsoft, India. His research interest is in development of data stream theory and systems, as well as online advertising systems.

##### David P. Woodruff

Sketching as a Tool for Numerical Linear Algebra

The talk will focus on how sketching techniques from the data stream literature can be used to speed up well-studied algorithms for problems occurring in numerical linear algebra, such as least squares regression and approximate singular value decomposition. It will also discuss how they can be used to achieve very efficient algorithms for variants of these problems, such as robust regression.

David Woodruff joined the algorithms and complexity group at IBM Almaden in 2007 after completing his Ph.D. at MIT in theoretical computer science. His interests are in compressed sensing, communication, numerical linear algebra, sketching, and streaming.

##### Sam Ritchie

Summingbird is a platform for streaming map/reduce used at Twitter to build aggregations in real-time or on hadoop. When the programmer describes her job, that job can be run without change on Storm or Hadoop. Additionally, summingbird can manage merging realtime/online computations with offline batches so that small errors in real-time do not accumulate. Put another way, summingbird gives eventual consistency in a manner that is easy for the programmer to reason about.

Sam Ritchie works on data analysis and infrastructure problems in Twitter’s Revenue engineering team. He is co-author of a number of open-source Scala and Clojure libraries, including Bijection, Algebird, Cascalog 2 and ElephantDB. He holds a bachelor’s degree in mechanical and aerospace engineering.

##### Alexandr Andoni

Similarity Search Algorithms

Nearest Neighbor Search is a ubiquitous problem in analyzing massive datasets: its goal is to process a set of objects (such as images), so that later, one can find the object most similar to a given query object. I will survey the state-of-the-art for this problem, starting from the (Kanellakis-award winning) technique of Locality Sensitive Hashing, to its more modern relatives, and touch upon its connection to the theory of sketching.

Alexandr Andoni has been a researcher at Microsoft Research Silicon Valley since 2010, after finishing his PhD in MIT’s theory group and a year-long postdoctoral position at Princeton University. His research interests revolve around algorithms for massive datasets, including similarity search and streaming/sublinear algorithms, as well as theoretical machine learning.

There will be a panel discussion on the topic of harboring research in startups. Speakers include:
• Pete Skomoroch from the LinkedIn data science team
• Rob Grzywinski of Aggregate Knowledge
• Joseph Turian of Metaoptimize

##### Lightning talks
• Armon Dadgar (Kiip) - Sketching Data Structures at Kiip
• Blake Mizerany (Heroku) - An Engineer Reads a Data Sketch
• Timon Karnezos (AK) - TBD
• Jérémie Lumbroso (INRIA) - Philippe Flajolet’s Contribution to Streaming Algorithms

Update! We posted the videos and slides for those of you that couldn’t make it (or those that want to relive the fun). Enjoy!

## AWS Redshift: How Amazon Changed The Game

Edit: Thank you to Curt Monash who points out that Netezza is available for as little as $20k/TB/year with hardware (and 2.25x compression) and that there is an inconsistency in my early price estimates and the fraction I quote in my conclusion. I’ve incorporated his observations into my corrections below. I’ve also changed a sentence in the conclusion to make the point that the $5k/TB/year TCO number is the effective TCO given that a Redshift cluster that can perform these queries at the desired speed has far more storage than is needed to just hold the tables for the workloads I tested.

Author’s Note: I’ll preface this post with a warning: some of the content will be inflammatory if you go into it with the mindset that I’m trying to sell you on an alternative to Hadoop. I’m not. I’m here to talk about how an MPP system blew us away, as jaded as we are, and how it is a sign of things to come. There’s enormous, ripe-for-the-picking business value in Redshift and you’d be remiss to ignore it because “we’re a Hadoop shop” or “commercial MPP is too expensive”.

Flash back a few years: We’d struggled mightily with commercial data warehouses that simply couldn’t load the data volumes we wanted to query (at that time this was significantly less than a billion rows) in any decent amount of time, let alone query them semi-interactively. Anecdotally, we’d hear that vendor X had solved our problem or vendor Y had a completely different model that would “completely blow us out of the water”, and so on, but either the prices were astronomical ($100k/TB/year +) or the performance didn’t pan out. In our minds, MPP solutions were a non-starter.

To oversimplify the design we chose for our infrastructure: we built the Summarizer and threw in some Postgres for our product reporting needs, mixed in some Elastic Map-Reduce for the “tough” jobs, and went about our business. We’re very happy with the Summarizer because it solves a particular business need very inexpensively. (We do all of our aggregation on one box and let a couple of Postgres boxes handle the querying.) However, our EMR bill was becoming pretty large, and for what? A handful of reports that weren’t (yet) feasible to do in a streaming setting like the Summarizer. That’s it. Did a handful of reports really merit the enormous cost and capability of Hadoop? Our gut always told us no, but who are we to argue with a working solution? We had other fish to fry.

Fast-forward to (nearly) the present day: the business has grown, and we have ourselves a medium-to-large data (1-200TB) query problem and some cash to solve it. That old ache in our gut told us it was time again to poke our heads up and see if Spring had come.

A bit of catching up on research revealed the tradeoff to be the same as ever, either:

• sign a fat check for Greenplum/Teradata/Aster/Vertica/ParAccel/Kognitio/Infobright/Oracle/IBM/SAP/Microsoft etc… and pay per terabyte per year, or
• hop onto Hadoop and ride the open-source rodeo with Hive or Pig by your side.
(Sure, sure there are other open-source options out there that aren’t Hadoop, but for example Impala 1.0 just came out and Drill’s nowhere near ready for production.)

Our first question was: “How much are we willing to spend?” Running a Hadoop cluster in-house or on EMR isn’t cheap, and avoiding some of its hassles is worth something too. The answer varied depending on the vendor, but word on the street was $20k/TB/year to $600k/TB/year. We were still stuck between a rock and a hard place: Hadoop was cheaper than the data warehouses, but so much more tool than we needed.

Present day: we hear about AWS’s Redshift offering. It’s basically ParAccel, a columnar, compressed data warehouse that a bunch of AWS engineers “cloud-ified”. I won’t lie, our first reaction was incredulity and dismissive chuckles followed by “Sure, ‘petabyte-scale’. Bet we can break this one with a few days of data.” At their claimed $1k/TB/year TCO, it seemed impossible that they could offer a product that worked. To prove our point, I fired up a few instances, and found out that we were flat out wrong.

### Objective

I’m going to try to convince you that Redshift is worth a look by showing you the results of a handful of non-trivial tests I ran, not much more. What’s more is that I’m not going to whitewash any of the issues I ran into. I’m going to tell you about some pretty ugly lumps on this sucker, but by the time I’m done I don’t think it will matter. I think you’ll come to the same conclusion we did: for the price, Redshift is an exceptionally powerful tool that almost any medium data business can use to solve all of their exploration problems for a reasonable price. I’m here to tell the story of how Amazon and MPP caught up in a big way, and how Redshift is the real deal.

### Background

I’ll give you the standard caveats: I ran tests on our data, which has a certain shape, a certain size, and a certain density/sparsity with respect to our reporting keys. Yours may be different. My methods may not work well on your data. In all likelihood, however, if your data is human-generated advertising behavioral data or something like it, Redshift is going to work pretty well with it.

### Ground rules for using Redshift

• Your data must be flat, in a CSV/TSV/*SV format. No nested structures.
• Your data must be loaded from S3 or DynamoDB.
• Your data should have a natural group of columns to sort by (that suits the queries you want to run). Your data should also have a column by which it can be evenly distributed across the cluster, when consistently-hashed.
• Result sets come out via JDBC/ODBC (don’t do this for anything bigger than a few thousand rows) or are shipped out to S3.

### Overview of my tests

6 cluster configurations. 1x, 4x, 16x, 32x dw.hs1.xlarge, and 2x, 4x dw.hs1.8xlarge. (For brevity’s sake, I’m going to refer to the node types as ‘xlarge’ and ‘8xlarge’ from here on.)

4 data sets. 2B rows, 10B rows, 44B rows, 57B rows in the main fact table. As gzip’d CSV they ranged from 80GB to 2.4TB.

Not all data sets were run on all clusters. Anything smaller than 16x xlarge/2x 8xlarge was simply too slow for the larger (read: more realistic) data sets. I tested them to the point where I could safely rule them out for our workloads and then moved up to the next cluster.

Data sets tested by cluster (out of the 2B, 10B, 44B, and 57B row sets):

| Cluster | Number of data sets tested |
|---|---|
| 1x dw.hs1.xlarge | 1 |
| 4x dw.hs1.xlarge | 2 |
| 16x dw.hs1.xlarge | 4 |
| 32x dw.hs1.xlarge | 1 |
| 2x dw.hs1.8xlarge | 4 |
| 4x dw.hs1.8xlarge | 1 |

Standard data warehouse table setup. About a dozen numeric columns per table. The largest table (impressions) is 100x larger than the next biggest table (clicks), and 5000x larger than the smallest table (conversions). All tables sorted by date and by user id (SORTKEY), distributed by user id (DISTKEY).

Two types of queries. (pardon the pseudo-SQL, see Appendix for full queries)

1. SELECT keys, aggregates FROM all_tables_unioned WHERE date_predicate GROUP BY keys
2. WITH windowed_query_over_user_sessions, SELECT keys, session_statistic_value, COUNT(*) FROM windowed_query_over_user_sessions WHERE date_predicate GROUP BY keys

That is, (1) a basic aggregate report with some hard stuff like COUNT(DISTINCT), and (2) a windowed walk over user sessions that computes a histogram of a certain statistic about certain ads. Think a histogram for each ad of how many times it shows up as the last thing (impression/click/conversion) in a session, the second-to-last-thing in a session, and so on.
The catch is we’re not specifying a particular campaign or advertiser for which we’re running these reports. We’re computing everyone’s reports all at once, just to push the envelope a bit.

### (Unreasonable?) Expectations

As I mentioned earlier, we walked into this with no small amount of incredulity. We’ve spent a lot of engineering effort keeping ahead of the billions of events we see each day, and as a result our expectations have grown. I mean, if a handful of loons like us can keep up, Amazon’s insane resources should be miles ahead of us, right?

So, I’m going to say some unreasonable things in this section and they should be taken with a grain of salt. For example, when I say I should be able to add nodes to my cluster “quickly”, I mean that I should be able to do so within the limitations of copying data across a network, I shouldn’t have to babysit the data transfer, and I shouldn’t have to provision the boxes myself or reconfigure anything. I should be able to click a button, and so on.

• Per dollar, I want linear scaling. Period. Load, query, etc… If I’m paying more, it should be faster!
• I want my dollar to be just as well-spent on a xlarge cluster as a 8xlarge cluster. That means $N$ 8xlarge should be “worth” $8N$ xlarge instances. (I test $N=2,4$ here.)
• I want to load the marginal “day”, about 2B rows for our purposes here, in an hour and I should be able to rerun my queries with linear scaling in the new number of rows.
• I want to be able to painlessly and quickly add nodes to my cluster.
• I want linear degradation in query times for parallel queries. (Two queries at once will run in twice their normal time, and so on.)
• I want to be able to shut down and take a snapshot quickly. I want to be able to start a cluster from a snapshot quickly. (Because if I can, I’d like to turn this thing off when I’m not using it, so I don’t have to pay for it.)
### Starting a cluster and loading data into it

Starting a cluster, regardless of what size and type, took between 3 and 20 minutes. No rhyme or reason. Once the cluster was up, I would issue queries via psql from a t1.micro in the same zone and security group in order to avoid a firewall timeout issue which will kill long-lasting connections from outside of EC2. The result sets were gzip’d and sent to S3 directly from the cluster.

Each data set had its own S3 bucket and was distributed over hundreds of gzip’d files in that bucket. I started by trying to load the whole bucket at once with something like

    COPY impression FROM 's3://test-data/impressions/' WITH CREDENTIALS ... GZIP DELIMITER ',' MAXERROR AS 100000;

This worked for the smallest data set, but as soon as I moved to the larger ones I would see errors like:

    ERROR: S3ServiceException:speed limit exceeded,StatusCode 500,ErrorCode N/A,RequestId N/A,ExtendedRequestId N/A,CanRetryException 1
    DETAIL:
      -----------------------------------------------
      error: S3ServiceException:speed limit exceeded,StatusCode 500,ErrorCode N/A,RequestId N/A,ExtendedRequestId N/A,CanRetryException 1
      code: 9001
      context: S3 key being read : impressions/201.csv.gz
      query: 1446
      location: table_s3_scanner.cpp:355
      process: query1_s9_2 [pid=5471]
      -----------------------------------------------

and the whole load would fail. (A failed load leaves the database in the prior, intact state, so you don’t have to worry about being in a halfway-loaded state.) I started splitting the loads into smaller chunks of about 100 files, but sometimes those would fail too. I ended up splitting them into groups of 10-20 files (3GB-60GB total) and those would succeed very reliably. The tradeoff is that there is an overhead to starting up and winding down a new COPY command, so the fewer COPYs, the better for overall load speed.

[Slideshow: CPU utilization and network throughput during chunked COPY loads]
As you can see, there are dips in CPU utilization/network throughput between COPYs that aren’t present on loads that grouped more files together. The dips come from an initial analysis phase before the download and a final sort phase after download. I saw a 40% decrease in load speed when I went from 200 files at a time to 20 files at a time for the same data set.

Once I had settled on a sane way of chunking the loads, the overall COPY speed scaled linearly with cost and with data size, measured over hundreds of chunked loads. I was also pleasantly surprised to find that there was very, very little variability within the load times, when controlling for chunk size and file count.

Each xlarge node could load about 3.17MB/sec of compressed S3 data or 78k rows/sec and each 8xlarge node could load about 23.8MB/sec or 584k rows/sec, meaning one of the latter was worth about 7.5 of the former while costing 8 times as much. Roughly speaking, every extra dollar spent bought a marginal 3.5-3.7MB/sec in load speed. In row throughput, each marginal dollar gave us an additional 80-100k rows/sec.

The plots below are averages of the chunked load times grouped by data set and cluster. (The costs are all in terms of the on-demand, un-reserved instance costs.)

There is an outlier in the load time for the 2B row data set on the 16-node xlarge cluster (lavender circle). I believe that this was caused by the relatively high node count compared to the relatively small file count and data size. Informally, I saw this regression on both 16- and 32-node clusters for data sets with fewer than 200 files and less than 100GB of data.

Overall, you can count on equally-priced xlarge clusters outperforming their 8xlarge peers by 5-20% in COPY times as long as the number of files and total sizes are large enough. (That said, there is a 32-node limit on xlarge clusters, so after a certain point your hand will be forced and you’ll have to move to the larger nodes.)
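The per-dollar load figures can be reproduced from the per-node rates quoted above. The sketch below assumes on-demand prices of $6.80/hour per 8xlarge node (half of the $13.60/hour two-node cluster mentioned in this post) and one eighth of that, $0.85/hour, per xlarge node, consistent with the 8:1 price ratio; “per dollar” here means per dollar of hourly cluster cost. The dictionary layout and function names are made up for this illustration.

```python
# Per-node load rates quoted in the post; hourly prices are assumptions
# derived from the $13.60/hour two-node 8xlarge cluster and the 8:1 ratio.
NODES = {
    "xlarge": {"price_per_hour": 0.85, "mb_per_sec": 3.17, "rows_per_sec": 78_000},
    "8xlarge": {"price_per_hour": 6.80, "mb_per_sec": 23.8, "rows_per_sec": 584_000},
}


def per_dollar(node, metric):
    """Load rate bought by each dollar of hourly spend on this node type."""
    spec = NODES[node]
    return spec[metric] / spec["price_per_hour"]


def node_equivalence(metric="mb_per_sec"):
    """How many xlarge nodes one 8xlarge node is worth for the given metric."""
    return NODES["8xlarge"][metric] / NODES["xlarge"][metric]


for node in NODES:
    print(node,
          round(per_dollar(node, "mb_per_sec"), 2), "MB/sec per $/hour,",
          round(per_dollar(node, "rows_per_sec")), "rows/sec per $/hour")
print("one 8xlarge is worth about", round(node_equivalence(), 1), "xlarge nodes")
```

Under these assumed prices the xlarge node comes out at roughly 3.7MB/sec and 92k rows/sec per dollar, the 8xlarge node at 3.5MB/sec and 86k rows/sec per dollar, and one 8xlarge node is worth about 7.5 xlarge nodes, matching the ranges above.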
After loading the data, it is a best practice to VACUUM the tables to ensure that the rows are all in sorted order. Each COPY will sort the rows in that load, but will not merge them into the existing sorted rows. Not VACUUM’ing will force your queries to execute over the multiple sorted sections (which I’ll call partitions), which can significantly diminish performance on queries that depend heavily on the sorted order of the data. For queries that didn’t take advantage of the sorted order of the data, the difference between the VACUUM’d data and the load-partitioned data was minimal until I hit dozens of partitions. For queries that did rely on the sorted order for their execution, I saw a night-and-day effect: either a minor (1-5%) degradation in query time or the query wouldn’t finish for tens of hours. In general, I’d just pay the cost of VACUUMing to avoid the edge cases where the queries would implode.

The plot below shows the VACUUM rate per dollar/hour in both types of clusters, for on-demand, non-reserved instances. (I know the units obfuscate the real performance, but it’s just to normalize clusters within their node type and across node types.) Per dollar, the primary driver of the VACUUM rate was how many rows were already present in the table before COPY (assuming they were already VACUUM’d themselves.) xlarge clusters performed 10-25% better on VACUUMs than their 8xlarge counterparts, on similar workloads.

Is it feasible to load 2B more rows in about an hour with any of the clusters? Not the way I have my data set up, it isn’t. The COPY part is easy. It would take less than half an hour to COPY onto the 2x 8xlarge and 16x xlarge clusters, but the VACUUM wouldn’t be nearly so quick. With over 50B rows already in the table, both node types struggle to VACUUM more than 200k rows/sec, which means at least 3 hours of VACUUM time.
I suspect if I had manually split my tables into smaller chunks (say 10B-20B rows) like the docs told me to, I would have been able to COPY and VACUUM the 2B extra rows in just about an hour and a half.

Aside: let’s just do the math quickly from the plot above. Say we’re talking about an 8xlarge cluster with two machines. Say I have about 15B rows in my table and I’m adding 2B more. I’ll eyeball it and say that I could do 45k rows/sec/$. That cluster costs $13.60/hour. That means I should see a VACUUM rate of about 612k rows/sec. That means it should take about 55 minutes. The plots earlier showed that the same cluster could load at a rate of 1.1M rows/sec. That means 2B rows get COPY’d in 30 minutes. Like I said, about an hour and a half.

For loading, I saw linear scaling per dollar and observed just a bit worse than the 8:1 price ratio between the small and big nodes. I could also load my marginal day in about the time I wanted. Not bad for a start.

Note: I know I haven’t mentioned anything here about column compression, which is a big selling point of Redshift. My objective in this testing was to see how Redshift performed “out-of-the-box”, which meant leaving it to its own devices for compression. It seems like its own devices are quite good: the compression types it selected for the columns were exactly what I would have chosen, knowing the exact column statistics. Furthermore, the reported storage used by Redshift was only 1.6 times greater than the original GZIP’d files for all three data sets, which meant that about 115B rows fit into a little under 8TB of cluster storage.

A 2-node 8xlarge/16-node xlarge cluster comes with 32 TB of cluster storage, meaning I could run a 24/7 cluster with about 450B rows in it for $70k/year with a 1-year reserve purchase. For a more tactical perspective, 450B/year equates to an average of 14k events/sec ingress for the whole year straight.

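
The arithmetic in the aside is easy to replay. This is just a sketch of the same back-of-the-envelope math; the 45k rows/sec/$ figure is the number eyeballed from the plot, so treat every result as approximate.

```python
ROWS_TO_ADD = 2_000_000_000           # the marginal "day" of data
CLUSTER_PRICE = 13.60                 # $/hour for two 8xlarge nodes
VACUUM_ROWS_PER_SEC_PER_DOLLAR = 45_000   # eyeballed from the plot
COPY_ROWS_PER_SEC = 1_100_000         # load rate for the same cluster

# VACUUM: rate scales with dollars, time is rows divided by rate.
vacuum_rate = VACUUM_ROWS_PER_SEC_PER_DOLLAR * CLUSTER_PRICE
vacuum_minutes = ROWS_TO_ADD / vacuum_rate / 60

# COPY: same calculation with the measured load rate.
copy_minutes = ROWS_TO_ADD / COPY_ROWS_PER_SEC / 60
total_minutes = vacuum_minutes + copy_minutes

# Average ingress needed to fill a 450B-row cluster over one year.
SECONDS_PER_YEAR = 365 * 24 * 3600
events_per_sec = 450e9 / SECONDS_PER_YEAR

print(f"VACUUM: {vacuum_rate:,.0f} rows/sec, {vacuum_minutes:.0f} minutes")
print(f"COPY:   {copy_minutes:.0f} minutes")
print(f"Total:  {total_minutes:.0f} minutes")
print(f"Ingress: {events_per_sec:,.0f} events/sec")
```

The total comes out a little under an hour and a half, and the yearly ingress a little over 14k events/sec, matching the rounded figures in the text.
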
Just buying 16 machines that look like xlarge instances would probably cost around $50k. If you factor in amortization over three years, we’re looking at a (3 x $70k) - $50k = $160k difference to cover any other operating expenses such as ops salaries, MPP licenses, and facility costs. A ParAccel license for $1k/TB/year alone would cost about $100k for those three years, and that would be a damn cheap license. (For a 30TB cluster over three years, a $2500/TB/year license alone would be more than the all-in AWS cost!)

### Querying the data

As I mentioned above, I tested two classes of queries:

1. a simple aggregation that is only restricted by a date predicate, and
2. a windowed walk over user sessions, computing and aggregating statistics for those sessions.

The first query class is the bread and butter of our reports, but it was actually a very challenging query in the table setup described above. The table is only sorted by one of the six GROUP BY columns (date), which means that either a giant hash map of all the keys (of which there were tens of millions) needed to be populated for each day, or the whole data set had to be re-sorted according to those keys. The balance I tried to reach when defining the schema was to make the “impossible” queries (#2) easy even if I had to sacrifice performance on the simpler queries. In a production situation, I’d probably have two schemas side-by-side, one sorted for the first class of queries, and another for the second, given how much storage Redshift nodes come with.

Though this query does have a date predicate, the plot below is the average execution time for multiple runs with a predicate that includes every row in every table. As you can see, the identically-priced xlarge cluster is about three times faster for the two largest data sets, and finishes the query on the 57B row data set in about three hours.

The xlarge cluster query times degrade linearly with the number of rows in the data set, right up until the last data set where there’s a clear super-linear jump. The 8xlarge cluster, from the start, seems to degrade almost-quadratically. The poor performance of the 8xlarge nodes was very surprising. I would have expected that the cost of shuffling/merging results across 16 nodes would hurt more than having 16 separate nodes helped. (Recall that the DISTKEY was in no way connected to the reporting keys of this query.) Just as in the COPY section, the distribution of work over nodes seems to be more efficient than the distribution of work over cores, as shown by the (more) graceful degradation of performance of the xlarge cluster. (This will be a recurring trend.)

The second class of queries walked over users’ sessions and produced histograms of how many times each ad appeared in various positional offsets and date offsets from the end of a user’s session. I also produced a version of the query that ran only on user sessions that contained at least one conversion. (In this data set, approximately 1 in 250 users are converters.)

As you can see, clusters composed of many small nodes bested their large-node counterparts by about 20% on all but the population date histogram query, where the small nodes were five times faster. The more-selective versions of the two queries were at least an order of magnitude faster than their ‘Population’ counterparts on the same clusters. Of course, it’s nowhere near a one-to-one speedup with the selectivity of the converters predicate, but going from 90 minutes to 7.5 minutes is impressive given that there’s a non-trivial join from a 57B row table to a 20M row table happening in the “Converters” queries.

The query times clearly degrade linearly in the number of rows in the data set, perhaps with the exception of the date histogram for the population.

Similarly, query times halved (approximately) when I doubled the number of nodes in the cluster, in both clusters, for the battery of queries over the largest data set. (Apologies ahead of time for not synchronizing the colors of the queries between the plot above and the one below. It’s just a real pain.)

Note the above graph has log-scaled query time to demonstrate that both types of clusters see linear or better-than-linear per-dollar scaling on all of the queries I performed. Note, also, that certain queries improve more per dollar over the transition in the 8xlarge cluster. (Unfortunately, I couldn’t explore this comparison any further because of Amazon’s 32-node limitation for dw.h1.xlarge clusters.)

So I won’t say that I saw linear scaling across rows and nodes, but I will say this: it was pretty close to it. It’s clear that my 16-node xlarge cluster’s linear-ish performance was starting to fall apart on queries that had to touch more than 50B rows (the little uptick seen between the last two data points).

But let’s just back that up for a second. The cluster assembled and walked 4B sessions and computed the histograms for approximately 100,000 distinct ads in just under two hours. When restricting to converters, this happened in just over three minutes. I thought I had written the wrong query the first time the query returned that quickly, so I went back and wrote even more thorough unit tests for it. After convincing myself I had the right query, I restarted the cluster and reran the query to find it had once again run in the same amount of time, to the second. And then I did it again, and it was off by a few seconds. And again, and again, and again. I saw this repeated in all of my queries: the standard error on query time hovered around 1% for the most variable queries.

To get back to my point about business value, I caused a small riot among the analysts when I mentioned off-hand how quickly I could run these queries on substantial data sets.
On a service that I launched and loaded overnight with about three days of prior fiddling/self-training. Sure, putting this into production is a whole other beast, but I contend it’s no worse than the alternative of trying to keep a Hadoop cluster running or trying to reliably process jobs. And I don’t even have to teach any of the analysts a particular flavor of SQL. They get to reuse the Postgres-flavored stuff that they know and love.

I’ll quickly throw down some caveats/lessons-learned on querying:

• Running any two of those queries at the same time doubled the nominal execution time of each individually. Three at a time tripled it plus change. At four, however, the cluster crawled to a halt and took about 10x the time. At five concurrent queries, the cluster went into an unhealthy state and rebooted, losing all history of the concurrent queries. The takeaway is that you really need to implement workload management (WLM) if you’re going to run concurrent queries on the cluster.
• Learn to cancel queries before you run anything serious, and be sure to EXPLAIN and analyze them, as you can often avoid costly writes to disk by simply adding redundant predicates and inlining WITH clauses.

### Snapshotting a cluster

Snapshotting is a relatively painless process through the UI. It’s as simple as clicking a button and obeying some unreasonably-strict naming policies. I turned off the auto-snapshotting and managed mine manually because I found the periodic dips in performance caused by the auto-snapshotting very annoying. Backing up my 4-5TB cluster reliably took 2-3 hours, which means I was getting about 400MB/sec to S3 from my 2-node 8xlarge/16-node xlarge clusters, which isn’t out of line with what I’ve heard from our internal teams that push big result sets from EMR back to S3. Good enough in my book.

It’s important to note that booting a cluster from a snapshot always yields the same node type and count, meaning you can’t up- or down-grade a cluster from a snapshot. Once you have a snapshot, it’s actually very fast to boot a cluster from it. In my experience it took about 20 to 30 minutes, which just seemed insane until I realized that the cluster seems to become available well before the “work” of loading the snapshot is complete. The dashboard shows a whole lot of CPU, network, and disk usage all before I submitted my first query. Make no mistake, this is no phantom: those are real resources being used. I saw queries run significantly more slowly during this post-snapshot-load period, as well as a much less responsive web dashboard. (Oh, did I mention that? The web dashboard is querying directly to your cluster, so if your cluster is wedged on something, it’s almost impossible to debug it. Makes WLM all the more important.) This work usually took between 45 minutes and an hour to clear up, after which everything seemed to chug along just fine. I’d say it took a bit over an hour to get a cluster from snapshot to full-speed.

### Resizing a cluster

In Redshift, resizing a cluster is basically an automated process of launching a new cluster, starting the data transfer from old to new in the background while answering new and existing queries, then seamlessly finishing the running queries and pointing all new ones at the new cluster. Sure there is some performance degradation of queries run during this period, but no more than I saw when running two concurrent queries. In fact, my experience resizing a cluster was nothing short of remarkable. (My tests were fairly limited, since running one test meant launching a cluster from a snapshot, waiting for the work to finish, launching a resize and then shutting down the cluster so I didn’t waste my testing credit.
This whole process could take several hours start to finish, so I won’t pretend that I have anything more than anecdotal data here.) I saw an effective rate of about 175MB/sec transfer when upgrading from 2-node 8xlarge/16-node xlarge to 4-node 8xlarge/32-node xlarge clusters, meaning I looked up the amount of space Redshift reported using on disk and then divided that by the time between hitting the resize button and being able to run my first query on the new cluster. For my data sets, that meant 6-7 hours of degraded query performance (but not downtime!) followed by a seamless transition. Even query endpoints were seamlessly switched to the new cluster without my client even noticing! Not a bad deal, especially since you don’t even have to worry about the old cluster; it shuts down and gets cleaned up in the background.

### Recap

So let’s review my crazy expectations and see how Redshift did.

• Per dollar, I want linear scaling. Within reasonable bounds (like an order-of-magnitude row count range), I found that to be true for loading, querying, snapshotting, and resizing.
• I want my dollar to be just as well-spent on an xlarge cluster as an 8xlarge cluster. For everything but querying, the 8xlarge nodes were about 8x an xlarge node. (But if you can get away with it, stick with xlarge clusters!)
• I want to load the marginal “day” in an hour and query it. Looks like an hour and a half is the real number. However, querying scaled linearly and without complaint once the COPY and VACUUM completed.
• I want to be able to painlessly and quickly add nodes to my cluster. A few clicks in the UI, wait a few hours, no downtime. Feels painless and quick to me!
• I want linear degradation in query times for parallel queries. Mostly true, until you crush the thing. Break it a few times to test the limits on your queries, then set up WLM and don’t worry about it. If you’re using this mostly for periodic reporting, then you probably won’t even notice.
• I want to be able to snapshot/boot quickly and easily. I honestly don’t know what kind of magic is going on that lets me boot a snapshot in an hour and then let me query 3TB of data, but I’ll take it, degraded performance and all.

Redshift knocked it out of the park, in my opinion. (And this is coming from AK, the “do-everything-streaming” peanut-gallery. If we’re convinced, that’s gotta count for something, right?) In fact, the appeal of Redshift is much like the appeal of the Summarizer to me: it takes a non-trivial business problem (executing expressive SQL over medium data) and completely undercuts the mainstream solutions’ price points by trading away generality for ease-of-use and performance.

And sure, it has some flaws: COPY failures, load-induced cluster crashes, dashboard slowness, and so on. But I don’t care. I found workarounds for all of them over the course of a few days. Moreover, my communication with the Redshift team has been nothing but constructive, and given what I heard the beta days were like, they’re making real progress. I have no doubt that the service is only going to get better and cheaper.

Oddly enough, Redshift isn’t going to sell because devs think it’s super-duper-whizz-bang. It’s going to sell because it took a problem and an industry famous for its opaque pricing, high TCO, and unreliable results and completely turned it on its head. MPP was never a bad idea. Selling that way was. Yeah, the effective TCO is closer to $5k/TB/year than it is to their stated $1k/TB/year, but the pricing scheme is transparent and it’s a half to a quarter the price of the other MPP alternatives.

Now, rightfully, you could argue that it’s not Teradata or Vertica that it’s competing with anymore, but rather Hadoop. And clearly, it can’t do everything Hadoop can, but I contend that it doesn’t need to. All it needs to do is convince people that they don’t need Hive, and that isn’t a hard sell for a ton of businesses out there. Having vanilla SQL along with a familiar data model, and the added speed of a system built for these types of queries is probably justification enough to pick this over Hadoop+Hive if BI is what you’re after. The hosted/as-a-service aspect is just frosting on the cake at that point.

### Appendix: Queries

#### Query 1: Simple Aggregate

This query simulates the “wide” aggregates that the Summarizer generates. Specifically, it does multiple simple grouping aggregates and coalesces them with an outer join. It represents the simplest type of reporting we do.

    WITH imps AS (
        SELECT
            campaign_id,
            tracking_campaign_id,
            inventory_placement_id,
            audience_definition_id,
            creative_group_id,
            creative_id,

            SUM(1)                      AS raw_impression,
            COUNT(DISTINCT(ak_user_id)) AS uu_impression,
            SUM(media_cost)             AS media_cost,
            SUM(data_cost)              AS aggregate_attribute_data_cost

        FROM
            impression
        GROUP BY
            1,2,3,4,5,6
    ),

    clicks AS (
        SELECT
            campaign_id,
            tracking_campaign_id,
            inventory_placement_id,
            audience_definition_id,
            creative_group_id,
            creative_id,

            SUM(1)                      AS raw_click,
            COUNT(DISTINCT(ak_user_id)) AS uu_click,
            0::float8                   AS click_revenue

        FROM
            click
        GROUP BY
            1,2,3,4,5,6
    ),

    a_conv_click AS (
        SELECT
            campaign_id,
            tracking_campaign_id,
            inventory_placement_id,
            audience_definition_id,
            creative_group_id,
            creative_id,

            SUM(1)                      AS raw_click_attributed_conversion,
            COUNT(DISTINCT(ak_user_id)) AS uu_click_attributed_conversion,
            SUM(attributed_revenue)     AS click_attributed_conversion_revenue
        FROM
            attributedconversion
        WHERE type_id = 1
        GROUP BY
            1,2,3,4,5,6
    ),

    a_conv_imp AS (
        SELECT
            campaign_id,
            tracking_campaign_id,
            inventory_placement_id,
            audience_definition_id,
            creative_group_id,
            creative_id,

            SUM(1)                      AS raw_impression_attributed_conversion,
            COUNT(DISTINCT(ak_user_id)) AS uu_impression_attributed_conversion,
            SUM(attributed_revenue)     AS impression_attributed_conversion_revenue
        FROM
            attributedconversion
        WHERE type_id = 2
        GROUP BY
            1,2,3,4,5,6
    )

    SELECT
        COALESCE(imps.campaign_id           ,  clicks.campaign_id           , a_conv_click.campaign_id           , a_conv_imp.campaign_id           ) AS campaign_id           ,
        COALESCE(imps.tracking_campaign_id  ,  clicks.tracking_campaign_id  , a_conv_click.tracking_campaign_id  , a_conv_imp.tracking_campaign_id  ) AS tracking_campaign_id  ,
        COALESCE(imps.inventory_placement_id,  clicks.inventory_placement_id, a_conv_click.inventory_placement_id, a_conv_imp.inventory_placement_id) AS inventory_placement_id,
        COALESCE(imps.audience_definition_id,  clicks.audience_definition_id, a_conv_click.audience_definition_id, a_conv_imp.audience_definition_id) AS audience_definition_id,
        COALESCE(imps.creative_group_id     ,  clicks.creative_group_id     , a_conv_click.creative_group_id     , a_conv_imp.creative_group_id     ) AS creative_group_id     ,
        COALESCE(imps.creative_id           ,  clicks.creative_id           , a_conv_click.creative_id           , a_conv_imp.creative_id           ) AS creative_id           ,
        imps.raw_impression,
        imps.uu_impression,
        imps.media_cost,
        imps.aggregate_attribute_data_cost,
        clicks.raw_click,
        clicks.uu_click,
        clicks.click_revenue,
        a_conv_click.raw_click_attributed_conversion,
        a_conv_click.uu_click_attributed_conversion,
        a_conv_click.click_attributed_conversion_revenue,
        a_conv_imp.raw_impression_attributed_conversion,
        a_conv_imp.uu_impression_attributed_conversion,
        a_conv_imp.impression_attributed_conversion_revenue
    FROM
        imps
        FULL OUTER JOIN clicks ON
            imps.campaign_id            = clicks.campaign_id            AND
            imps.tracking_campaign_id   = clicks.tracking_campaign_id   AND
            imps.inventory_placement_id = clicks.inventory_placement_id AND
            imps.audience_definition_id = clicks.audience_definition_id AND
            imps.creative_group_id      = clicks.creative_group_id      AND
            imps.creative_id            = clicks.creative_id
        FULL OUTER JOIN a_conv_click ON
            imps.campaign_id            = a_conv_click.campaign_id            AND
            imps.tracking_campaign_id   = a_conv_click.tracking_campaign_id   AND
            imps.inventory_placement_id = a_conv_click.inventory_placement_id AND
            imps.audience_definition_id = a_conv_click.audience_definition_id AND
            imps.creative_group_id      = a_conv_click.creative_group_id      AND
            imps.creative_id            = a_conv_click.creative_id
        FULL OUTER JOIN a_conv_imp ON
            imps.campaign_id            = a_conv_imp.campaign_id            AND
            imps.tracking_campaign_id   = a_conv_imp.tracking_campaign_id   AND
            imps.inventory_placement_id = a_conv_imp.inventory_placement_id AND
            imps.audience_definition_id = a_conv_imp.audience_definition_id AND
            imps.creative_group_id      = a_conv_imp.creative_group_id      AND
            imps.creative_id            = a_conv_imp.creative_id
    ;
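
If the shape of that query is hard to see through all the column lists, here is a toy in-memory sketch of the same idea: aggregate each event type by reporting key, then outer-join the aggregates so that a key missing from one table shows up as a NULL (`None`). The two-column keys and the sample events are made up for illustration, and this merges on the key across all tables at once rather than replaying the exact join chain above.

```python
from collections import Counter


def count_by_key(events):
    """SUM(1) ... GROUP BY key: number of events per reporting key."""
    return Counter(events)


def full_outer_join(named_tables):
    """Merge per-key aggregates; a key absent from a table yields None."""
    keys = set().union(*(table.keys() for table in named_tables.values()))
    return {
        key: {name: table.get(key) for name, table in named_tables.items()}
        for key in sorted(keys)
    }


# Hypothetical (campaign_id, inventory_placement_id) keys, one per event.
imps = count_by_key([("c1", "p1"), ("c1", "p1"), ("c2", "p1")])
clicks = count_by_key([("c1", "p1"), ("c3", "p2")])

report = full_outer_join({"raw_impression": imps, "raw_click": clicks})
for key, measures in report.items():
    print(key, measures)
```
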

#### Query 2: Session-walk Position Histogram (Population)

This query is the simplest demonstration of the expressivity of the SQL you can run on Redshift. The WITH and OVER clauses provide an encapsulated, session-based position (DENSE_RANK) for each user’s impressions, which are aggregated by reporting key and position. This allows us to see if particular advertisements tend to appear earlier or later in users’ sessions. (You may recognize queries like this from market basket analysis.)

    WITH annotated_chains AS (
        SELECT campaign_id,
               tracking_campaign_id,
               inventory_placement_id,
               DENSE_RANK() OVER (PARTITION BY ak_user_id ORDER BY record_date DESC) AS position
        FROM impression
        WHERE record_date >= some_date AND
              record_date < other_date
        ORDER BY ak_user_id, record_date DESC
    )
    SELECT campaign_id, tracking_campaign_id, inventory_placement_id, position, COUNT(*) as ct
    FROM annotated_chains
    GROUP BY 1,2,3,4;
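
For anyone who hasn’t used window functions, here is a rough in-memory sketch of what the query computes. It assumes each impression is a `(user_id, ad_key, record_date)` tuple, where `ad_key` stands in for the three reporting columns; the sample data is made up.

```python
from collections import Counter, defaultdict
from datetime import date


def position_histogram(impressions):
    """Count impressions per (ad_key, position), where position is the dense
    rank of record_date within each user, newest first (ties share a rank)."""
    dates_by_user = defaultdict(set)
    for user, _, day in impressions:
        dates_by_user[user].add(day)
    # DENSE_RANK: distinct dates get consecutive ranks with no gaps.
    rank = {
        user: {day: pos
               for pos, day in enumerate(sorted(days, reverse=True), start=1)}
        for user, days in dates_by_user.items()
    }
    return Counter((ad, rank[user][day]) for user, ad, day in impressions)


impressions = [
    ("u1", "adA", date(2013, 3, 1)),
    ("u1", "adB", date(2013, 3, 2)),
    ("u1", "adA", date(2013, 3, 2)),
    ("u2", "adA", date(2013, 3, 5)),
]
hist = position_histogram(impressions)
print(hist)
```

Here `adA` shows up twice in position 1 (once for each user’s most recent date) and once in position 2.
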


#### Query 3: Session-walk Position Histogram (Converters)

This query does the same thing as #2 but joins against the conversion table to construct a session that reaches 90 days back for each conversion event. This allows us to once again see if certain advertisements tend to be seen “close” to conversions.

    WITH annotated_chains AS (
        SELECT i.ak_user_id                                AS ak_user_id,
               i.campaign_id                               AS campaign_id,
               i.tracking_campaign_id                      AS tracking_campaign_id,
               i.inventory_placement_id                    AS inventory_placement_id,
               i.record_date                               AS record_date,
               DENSE_RANK()
                   OVER (PARTITION BY i.ak_user_id ORDER BY i.record_date DESC) AS position
        FROM
            impression i
            JOIN unattributedconversion c
                ON  c.ak_user_id = i.ak_user_id
                AND c.record_date >= i.record_date
                AND (c.record_date - interval '90 days') < i.record_date
                AND c.record_date >= some_date
                AND c.record_date <  other_date
    )
    SELECT campaign_id, tracking_campaign_id, inventory_placement_id, position, count(*) AS ct
    FROM annotated_chains
    GROUP BY 1,2,3,4;


#### Query 4: Session-walk Date Histogram (Population)

This query is the date-based version of #2. Instead of reporting a position-based offset in the session, it reports a day-offset from the end of the session. This query allows us to examine temporal distribution of different advertisements.

    WITH annotated_chains AS (
        SELECT campaign_id,
               tracking_campaign_id,
               inventory_placement_id,
               DATEDIFF(day, record_date,
                   LAST_VALUE(record_date) OVER (PARTITION BY ak_user_id ORDER BY record_date ASC rows between unbounded preceding and unbounded following)
               ) AS days_behind_conversion
        FROM impression
        WHERE record_date >= some_date AND
              record_date < other_date
        ORDER BY ak_user_id, record_date DESC
    )
    SELECT campaign_id, tracking_campaign_id, inventory_placement_id, days_behind_conversion, COUNT(*) as ct
    FROM annotated_chains
    GROUP BY 1,2,3,4;
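
The same computation can be sketched in memory. As with the position histogram, this assumes `(user_id, ad_key, record_date)` tuples with calendar dates, with `ad_key` standing in for the three reporting columns, and uses made-up sample data.

```python
from collections import Counter
from datetime import date


def days_behind_histogram(impressions):
    """Count impressions per (ad_key, days before the user's last impression)."""
    # LAST_VALUE over the whole partition is just each user's latest date.
    last_seen = {}
    for user, _, day in impressions:
        if user not in last_seen or day > last_seen[user]:
            last_seen[user] = day
    return Counter(
        (ad, (last_seen[user] - day).days) for user, ad, day in impressions
    )


impressions = [
    ("u1", "adA", date(2013, 3, 1)),
    ("u1", "adB", date(2013, 3, 2)),
    ("u1", "adA", date(2013, 3, 2)),
    ("u2", "adA", date(2013, 3, 5)),
]
hist = days_behind_histogram(impressions)
print(hist)
```
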


#### Query 5: Session-walk Date Histogram (Converters)

This query is the date-based version of #3. Again, instead of position-based offsets, it reports day-offsets from conversion. This query explores temporal proximity of advertisements to conversions.

    WITH annotated_chains AS (
        SELECT i.ak_user_id                                AS ak_user_id,
               i.campaign_id                               AS campaign_id,
               i.tracking_campaign_id                      AS tracking_campaign_id,
               i.inventory_placement_id                    AS inventory_placement_id,
               i.record_date                               AS record_date,
               DATEDIFF(day, i.record_date, c.record_date) AS days_behind_conversion
        FROM
            impression i
            JOIN unattributedconversion c
                ON  c.ak_user_id = i.ak_user_id
                AND c.record_date >= i.record_date
                AND (c.record_date - interval '90 days') < i.record_date
                AND c.record_date >= some_date
                AND c.record_date <  other_date
    )
    SELECT campaign_id, tracking_campaign_id, inventory_placement_id, days_behind_conversion, count(*) AS ct
    FROM annotated_chains
    GROUP BY 1,2,3,4;
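
The interesting part of the converters queries is the range join: each conversion pulls in that user’s impressions from the preceding 90 days. A toy version of that join might look like the following. It assumes `(user_id, ad_key, record_date)` impressions and `(user_id, record_date)` conversions with calendar dates, and it leaves out the `some_date`/`other_date` filter on conversions.

```python
from collections import Counter, defaultdict
from datetime import date, timedelta


def converter_day_offsets(impressions, conversions, window_days=90):
    """Count impressions per (ad_key, days before a conversion by that user)."""
    imps_by_user = defaultdict(list)
    for user, ad, day in impressions:
        imps_by_user[user].append((ad, day))

    hist = Counter()
    for user, conv_day in conversions:
        earliest = conv_day - timedelta(days=window_days)
        for ad, day in imps_by_user.get(user, []):
            # Same bounds as the join: strictly inside the window at the far
            # end, and no later than the conversion itself.
            if earliest < day <= conv_day:
                hist[(ad, (conv_day - day).days)] += 1
    return hist


impressions = [
    ("u1", "adA", date(2013, 1, 1)),   # exactly 90 days back: excluded
    ("u1", "adB", date(2013, 3, 30)),  # 2 days before the conversion
    ("u2", "adA", date(2013, 3, 1)),   # u2 never converts
]
conversions = [("u1", date(2013, 4, 1))]
hist = converter_day_offsets(impressions, conversions)
print(hist)
```

Only the `adB` impression survives, which is also why the converter queries touch so many fewer rows than their population counterparts.
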


## Call for Summer Interns

AK is looking for a summer intern in our R&D group. If any of our blog posts have interested you, then you’ll fit right in!

We’re looking for someone who has a good handle on a few programming languages (pick any two from R/Mathematica/Python/Javascript/Java) and has some math in their background — college-level calculus or algebra is plenty. Ideally, you’re interested in learning about:

• building and tuning high-performance data structures,
• streaming algorithms,
• interesting data visualizations, and

It’s OK if you’ve never seen the stuff we write about on the blog before! We hadn’t either until we started researching it!

I can’t emphasize this enough: we don’t expect you to know how to do the things above yet. We simply expect you to have a passion for learning about them and the diligence to work through what (at the time) seem like impossible problems. Work experience is nice, but not necessary. As long as you can write clean code and can work hard, you’re well-qualified for this job.

If you’re interested, please send a brief note about why you’re interested, along with a CV and/or GitHub username to timon at aggregateknowledge dot com. For extra credit, please submit one (or more!) of the following:

• An implementation of HLL, Count-Min Sketch, K-Min Values, or Distinct Sampling in a language of your choice.
• An extension to Colin’s blog post about a good hash function that adds CityHash and SipHash to the shoot-out.
• An explanation of the tradeoffs between using a hash map and Count-Min Sketch for counting item frequency.

(I feel like I shouldn’t have to say this, but yes, these are all answered somewhere on the internet. Don’t plagiarize. What we want is for you to go learn from them and try your own hand at implementing/experimenting. Also, don’t freak out, these are extra credit!)

## Doubling the Size of an HLL Dynamically – Extra Bits…

Author’s Note: This post is related to a few previous posts on the HyperLogLog algorithm.  See Matt’s overview of the algorithm, and see this for an overview of “folding” or shrinking HLLs in order to perform set operations. It is also the final post in a series on doubling the number of bins in HLLs. The first post dealt with the recovery time after doubling, and the second dealt with doubling’s accuracy when taking unions of two HLLs.

### Introduction

The main draw of the HyperLogLog algorithm is its ability to make accurate cardinality estimates using small, fixed memory. In practice, there are two choices a user makes which determine how much memory the algorithm will use: the number of registers (bins) and the size of each register (how high they can count). As Timon discussed previously, increasing the size of each register will only increase the accuracy if the true cardinality of the stream is HUGE.

Recall that HyperLogLog (and most other streaming algorithms) is designed to work with a fixed number of registers, $m$, which is chosen as a function of the expected cardinality to approximate. We track a great number of different cardinality streams and in this context it is useful for us to not have one fixed value of $m$, but to have this evolve with the needs of a given estimation.

We are thus confronted with many engineering problems, some of which we have already discussed. In particular, one problem is that the neat feature of sketches, namely that they allow for an estimate of the cardinality of the union of multiple streams at no cost, depends on having sketches of the same size.

We’ve discussed how to get around this by folding HLLs, though with some increase in error. We’ve also explored a few options on how to effectively perform a doubling procedure. However, we started to wonder if any improvements could be made by using just a small amount of extra memory, say an extra bit for each register. In this post we will discuss one such idea and its use in doubling. Note: we don’t talk about quadrupling or more. We limit ourselves to the situation where the HLL sketches’ values of $\log_2(m)$ differ by only 1, i.e. one sketch has exactly twice as many bins as the other.

### The Setup

One of the downfalls of doubling is that there is no way to know, after doubling, whether a value belongs in its bin or its partner bin. Recall that a “partner bin” is the register that could have been used had our “prefix” (the portion of the hashed value which is used to decide which register to update) been one bit longer. If the binary representation of the bin index used only two bits of the hashed value, e.g. $01$, then in an HLL that used a three-bit index, the same hashed value could have been placed in the bin whose index is either $101$ or $001$. Since $001$ and $01$ are the same number, we call $101$ the “partner bin”. (See the “Key Processing” section in Set Operations On HLLs of Different Sizes.)
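
In code, the relationship looks something like the sketch below. It assumes the bin index is read from the low-order bits of the hashed value, so that the extra prefix bit becomes the new high bit of the index (which matches the $01$ / $101$ example above); the hashed value itself is made up.

```python
def bin_index(hashed, p):
    """Register index taken from the low p bits of the hashed value."""
    return hashed & ((1 << p) - 1)


def partner_bin(index, p):
    """Index in a 2**(p+1)-bin HLL when the extra prefix bit is a 1."""
    return index | (1 << p)


hashed = 0b110101                 # hypothetical hashed value
small = bin_index(hashed, 2)      # two-bit index:   0b01
large = bin_index(hashed, 3)      # three-bit index: 0b101

print(f"small HLL bin: {small:02b}")
print(f"large HLL bin: {large:03b}")
print(f"candidates after doubling: {small:03b} or {partner_bin(small, 2):03b}")
```

The large HLL’s bin is always one of the two candidates; the small HLL alone just can’t tell you which one.
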

Consider an example where we have an HLL with $2^{10}$ bins. The $k^{th}$ bin has the value 7 in it, and after doubling we guess that its partner bin, the $(2^{10} + k)^{th}$ bin, should have a 5 in it. It is equally likely that the $k^{th}$ bin should have the 5 in it and the $(2^{10}+k)^{th}$ bin should have the 7 in it (since the “missing” prefix bit could have been a 1 or a 0)! Certainly the arrangement doesn’t change the basic cardinality estimate, but once we start getting involved with unions, the arrangement can make a very large difference.

To see how drastic the consequences can be, let’s look at a simple example. Suppose we start with an HLL with 2 bins and get the value 6 in each of its bins. Then we run the doubling procedure and decide that the partner bins should both have 1s in them. With this information, it is equally likely that both of the arrangements below, “A” and “B”, could be the “true” larger HLL.

Further suppose we have some other data with which we wish to estimate the union. Below, I’ve diagrammed what happens when we take the union.

Arrangement A leads to a cardinality estimate (of the union) of about 12 and Arrangement B leads to a cardinality estimate (of the union) of about 122. This is an order of magnitude different! Obviously not all cases are this bad, but this example is instructive. It tells us that knowing the true location of each value is very important. We’ve attempted to improve our doubling estimate by keeping an extra bit of information as we will describe below.
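
The mechanics are easy to see with a toy union, which just keeps the larger value in each register. The register layouts below are hypothetical stand-ins (they are not the values from the diagrams), chosen only to show that the same doubled values in different positions produce different unions.

```python
def union(a, b):
    """Union of two equal-sized HLLs: keep the larger value in each register."""
    if len(a) != len(b):
        raise ValueError("HLLs must have the same number of registers")
    return [max(x, y) for x, y in zip(a, b)]


# A 2-bin HLL holding [6, 6], doubled with 1s guessed for the partner bins.
arrangement_a = [6, 6, 1, 1]   # the 6s stayed in their original bins
arrangement_b = [1, 1, 6, 6]   # the 6s belonged in the partner bins
other = [1, 1, 6, 6]           # hypothetical second HLL to union with

print(union(arrangement_a, other))  # [6, 6, 6, 6]
print(union(arrangement_b, other))  # [1, 1, 6, 6]
```

Two of the four registers differ between the results, and since the estimate is driven by the register values, the two unions can land far apart.
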

### The Algorithm

Suppose we have an HLL with $m$ bins. Let’s keep another array of data which holds $m$ total bits, one for each bin — we will call these the “Cached Values.” For each bin, we keep a 0 if the value truly belongs in the bin in which it was placed (i.e. if, had we run an HLL with $2m$ bins, the value would have been placed in the first $m$ bins in the HLL), and we keep a 1 if the value truly belongs in the partner bin of the one in which it was placed (i.e. if, had we run an HLL with $2m$ bins, the value would have been placed in the last $m$ bins). See the image below for an example. Here we see two HLLs which have processed the same data. The one on the left is half the size and collects the cached values as it runs on the data. The one on the right is simply the usual HLL algorithm run on the same data.

Looking at the first row of the small HLL (with $m$ bins), the $0$ cache value means that the 2 “belongs” in the top half of the large HLL, i.e. if we had processed the stream using a larger HLL the 2 would be in the same register. Essentially this cached bit allows you to know exactly where the largest value in a bin was located in the larger HLL: if the $i^{th}$ bin has value $V$ and cached value $S$, we place the value $V$ in the bin at index $S \cdot 2^{\log_2 m} + i = S \cdot m + i$.

In practice, when we double, we populate the doubled HLL first with the (now correct location) bin values from the original HLL then we fill the remaining bins by using our “Proportion Doubling” algorithm.
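
Here is a minimal sketch of the cached-bit bookkeeping and the first half of the doubling step. It is not our production implementation: the hash function, the choice to read the bin index from the low bits and the register value from the leading zeros of a 64-bit hash, and the names `build` and `place_cached` are all assumptions made for illustration. The Proportion Doubling fill of the remaining bins is left out.

```python
import hashlib

HASH_BITS = 64
MAX_VALUE = 31  # largest value a 5-bit register can hold


def hash64(key):
    """Deterministic 64-bit hash (an arbitrary choice for this sketch)."""
    digest = hashlib.blake2b(str(key).encode(), digest_size=8).digest()
    return int.from_bytes(digest, "big")


def register_value(hashed):
    """1 + the number of leading zero bits, capped to fit the register."""
    return min(HASH_BITS - hashed.bit_length() + 1, MAX_VALUE)


def build(keys, p):
    """Return (registers, cache) for an HLL with 2**p bins."""
    m = 1 << p
    registers, cache = [0] * m, [0] * m
    for key in keys:
        h = hash64(key)
        i = h & (m - 1)                # bin index from the low p bits
        value = register_value(h)
        if value > registers[i]:
            registers[i] = value
            cache[i] = (h >> p) & 1    # the "missing" prefix bit
    return registers, cache


def place_cached(registers, cache):
    """Put each value V with cached bit S at index S*m + i in a 2m-bin HLL.
    Bins left as None still need a doubling estimate."""
    m = len(registers)
    doubled = [None] * (2 * m)
    for i, (value, bit) in enumerate(zip(registers, cache)):
        doubled[bit * m + i] = value
    return doubled


keys = range(5000)
small_regs, small_cache = build(keys, 6)
large_regs, _ = build(keys, 7)
doubled = place_cached(small_regs, small_cache)
matches = sum(1 for d, l in zip(doubled, large_regs) if d == l)
print(f"{matches} of {len(large_regs)} bins recovered exactly")
```

Under these assumptions every placed value agrees with the HLL that was run at the larger size from the start, so only the other half of the bins has to be estimated.
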

Before we begin looking at the algorithm’s performance, let’s think about how much extra space this requires. In our new algorithm, notice that for each bin, we keep around either a zero or a one as its cached value. Hence, we require only one extra bit per bin to accommodate the cached values. Our implementation of HLL requires 5 bits per bin, since we want to be able to include values up to $2^5 -1= 31$ in our bins. Thus, a standard HLL with $m$ bins requires $5m$ bits. Hence, this algorithm requires $5m + m = 6m$ bits (with the extra $m$ bits representing the cached values). This implies that this sketch requires 20% more space than a standard HLL with the same number of bins.

### The Data

Recall in the last post in this series, we explored doubling with two main strategies: Random Estimate (RE) and Proportion Doubling (PD). We did the same here, though using the additional information from this cached bit. We want to know a few things:

• Does doubling using a cache bit work? i.e. is it better to fold the bigger one or double the smaller one when comparing HLLs of different sizes?
• Does adding in a cache bit change which doubling strategy is preferred (RE or PD)?
• Does the error in union estimate depend on intersection size as we have seen in the past?

Is it better to double or fold?

For each experiment we took 2 sets of data (each generated from 200k random keys) and estimated the intersection size between them using varying methods.

• “Folded”: estimate by filling up an HLL with $\log_{2}(m) = 10$ and comparing it to a folded HLL starting from $\log_{2}(m) = 11$ and folded down to $\log_{2}(m) = 10$.
• “Large”: estimate by using two HLLs of the larger size, $\log_{2}(m) = 11$. This is effectively a lower bound on the error for our doubling approaches.
• “Doubled – PD”: estimate by taking an HLL of $\log_{2}(m) = 10$ and doubling it up to $\log_{2}(m) = 11$ using the Proportion Doubling strategy. Once this larger HLL is approximated we estimate the intersection with another HLL of native size $\log_{2}(m) = 11$.
• “Doubled – RE”: estimate by taking an HLL of $\log_{2}(m) = 10$ and doubling it up to $\log_{2}(m) = 11$ using the Random Estimate strategy.

We performed an experiment 300 times at varying intersection sizes from 0 up to 200k (100%) overlapping elements between sets (in steps of 10k). The plots below show our results (and extrapolate between points).

The graph of the mean error looks pretty bad for Random Estimate doubling. Again we see that the error depends heavily on the intersection size and becomes more biased as the sets overlap more. On the other hand, Proportion Doubling was much more successful (recall that this strategy forces the proportion of bins in the to-be-doubled HLL and the HLL with which we will union it to be equal before and after doubling). It’s possible there is some error bias with small intersections but we would need to run more trials to know for sure. As expected, the “Folded” and the “Large” are centered around zero. But what about the spread of the error?

The Proportion Doubling strategy looks great! In my last post on this subject, we found that this doubling strategy (without the cached part) really only worked well in the large intersection size regime, but here, with the extra cache bits, we seem to avoid that. Certainly the large intersection regime is where the standard deviation is lowest, but for every intersection size, it is significantly lower than that of the smaller HLL. This suggests that one of our largest sources of error when we use doubling in conjunction with unions is related to our lack of knowledge of the arrangement of the bins (i.e. when doubling, we do not know which of the two partner bins gets the larger, observed value). So it appears that the strategy of keeping cache bits around does indeed work, provided you use a decent doubling scheme.

Interestingly, it is always much better to double a smaller cache HLL than to fold a larger HLL when comparing sketches of different sizes. This is represented above by the lower error of the doubled HLL than the small HLL. The error bounds do seem to depend on the size of the intersection between the two sets but this will require more work to really understand how, especially in the case of Proportion Doubling.

Notes: In this work we focus solely on doubling an HLL sketch and then immediately using this new structure to compute set operations. It would be interesting to see if set operation accuracy changes as a doubled HLL goes through its “recovery” period under varying doubling methods. It is our assumption that nothing out of the ordinary would come of this, but we definitely could be wrong. We will leave this as an exercise for the reader.

### Summary

We’ve found an interesting way of trading space for accuracy with this cached bit method, but there are certainly other ways of using an extra bit or two (per bucket). For instance, we could keep more information about the distribution of each bin by keeping a bit indicating whether or not the bin’s value minus one has been seen. (If the value is $k$, keep track of whether $k-1$ has shown up.)

We should be able to use any extra piece of information about the distribution or position of the data to help us obtain a more accurate estimate. Certainly, there are myriad other ways of storing a bit or two of extra information per bin in order to gain a little leverage; it’s just a matter of figuring out what works best. We’ll be messing around more with this in the coming weeks, so if you have any ideas of what would work best, let us know in the comments!

(P.S. A lot of our recent work has been inspired by Flajolet et al.’s paper on PCSA – check out our post on this here!)

Thanks to Jeremie Lumbroso for his kind input on this post. We are much indebted to him and hopefully you will see more from our collaboration.

## Sketch of the Day: Probabilistic Counting with Stochastic Averaging (PCSA)

Before there was LogLog, SuperLogLog or HyperLogLog there was Probabilistic Counting with Stochastic Averaging (PCSA) from the seminal work “Probabilistic Counting Algorithms for Data Base Applications” (also known as the “FM Sketches” due to its two authors, Flajolet and Martin). The basis of PCSA matches that of the other Flajolet distinct value (DV) counters: hash values from a collection into binary strings, use patterns in those strings as indicators for the number of distinct values in that collection (bit-pattern observables), then use stochastic averaging to combine m trials into a better estimate. Our HyperLogLog post has more details on these estimators as well as stochastic averaging.

### Observables

The choice of observable pattern in PCSA comes from the knowledge that in a collection of randomly generated binary strings, the following probabilities occur:

\begin{aligned} &P( ..... 1) &= 2^{-1} \\ &P( .... 10) &= 2^{-2} \\ &P( ... 100) &= 2^{-3} \end{aligned}

$\vdots$

$P(...10^{k-1}) = 2^{-k}$

For each value added to the DV counter, a suitable hash is created and the position of the least-significant (right-most) 1 is determined. The corresponding position in a bitmap is updated and stored. I’ve created the simulation below so that you can get a feel for how this plays out.

Click above to run the bit-pattern simulation

(All bit representations in this post are numbered from 0 (the least-significant bit) on the right. This is the opposite of the direction in which they’re represented in the paper.)

Run the simulation a few times and notice how the bitmap is filled in. In particular, notice that it doesn’t necessarily fill in from the right side to the left — there are gaps that exist for a time that eventually get filled in. As the cardinality increases there will be a block of 1s on the right (the high probability slots), a block of 0s on the left (the low probability slots) and a “fringe” (as Flajolet et al. called it) of 1s and 0s in the middle.

I added a small pointer below the bitmap in the simulation to show how the cardinality corresponds to the expected bit position (based on the above probabilities). Notice what Flajolet et al. saw when they ran this same experiment: the least-significant (right-most) 0 is a pretty good estimator for the cardinality! In fact when you run multiple trials you see that this least-significant 0 for a given cardinality has a narrow distribution. When you combine the results with stochastic averaging it leads to a small relative error of $0.78 / \sqrt{m}$ and estimates the cardinality of the set quite well. You might have also observed that the most-significant (left-most) 1 can also be used for an estimator for the cardinality but it isn’t as clear-cut. This value is exactly the observable used in LogLog, SuperLogLog and HyperLogLog and does in fact lead to the larger relative error of $1.04 / \sqrt{m}$ (in the case of HLL).
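If you can’t run the simulation, here is a rough stand-in in Python. It is a sketch under assumptions: the helper names (`lowest_set_bit`, `lowest_zero_bit`, `fill_bitmap`) are made up for this example, and Python’s `random` module stands in for a real hash function. Bits are numbered from 0 on the right, as in the rest of this post.

```python
import random


def lowest_set_bit(h):
    """Position of the least-significant 1 in h (bit 0 is the right-most)."""
    if h == 0:
        raise ValueError("h has no set bit")
    return (h & -h).bit_length() - 1


def lowest_zero_bit(bitmap):
    """Position of the least-significant 0 in a non-negative bitmap."""
    return lowest_set_bit(~bitmap)


def fill_bitmap(n, seed=0, width=32):
    """Mark the lowest-set-bit position of n random width-bit 'hashes'."""
    rng = random.Random(seed)
    bitmap = 0
    for _ in range(n):
        h = rng.getrandbits(width)
        if h:  # an all-zero hash has no 1 to record
            bitmap |= 1 << lowest_set_bit(h)
    return bitmap


for n in (10, 1000, 100000):
    bitmap = fill_bitmap(n)
    print(n, format(bitmap, "032b"), lowest_zero_bit(bitmap))
```

As `n` grows, the printed position of the least-significant 0 should creep to the left, which is the behavior the estimator relies on.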

### Algorithm

The PCSA algorithm is elegant in its simplicity:

m = 2**b # with b in [4...16]

bitmaps = [[0]*32 for _ in range(m)] # initialize m independent 32-bit wide bitmaps to 0s

##############################################################################################
# Construct the PCSA bitmaps
for h in hashed(data):
    bitmap_index = get_bitmap_index( h,b ) # binary address of the rightmost b bits (0-based)
    run_length = run_of_zeros( h,b ) # length of the run of zeroes starting just above the b address bits
    bitmaps[ bitmap_index ][ run_length ] = 1 # set the bitmap bit based on the run length observed

##############################################################################################
# Determine the cardinality
phi = 0.77351
# least_sig_bit returns the position of the least-significant 0 in a bitmap
DV = m / phi * 2 ** (sum( least_sig_bit( bitmap ) for bitmap in bitmaps ) / m) # the DV estimate


Stochastic averaging is accomplished via the arithmetic mean.
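For readers who want something they can run, here is a minimal self-contained Python sketch of the pseudocode above. Several details are assumptions made for illustration: each bitmap is stored as a Python integer rather than a list, SHA-1 truncated to 32 bits stands in for “a suitable hash”, and hashes whose remaining bits are all zero are simply skipped. The function names are hypothetical.

```python
import hashlib

PHI = 0.77351  # magic constant from the pseudocode above


def hash32(value):
    """32-bit hash of a value; SHA-1 is an illustrative choice only."""
    digest = hashlib.sha1(str(value).encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big")


def lowest_zero_bit(bitmap):
    """Position of the least-significant 0 (bit 0 is the right-most)."""
    position = 0
    while bitmap & 1:
        bitmap >>= 1
        position += 1
    return position


def pcsa_bitmaps(values, b):
    """Build m = 2**b bitmaps, each stored as a Python int."""
    m = 1 << b
    bitmaps = [0] * m
    for value in values:
        h = hash32(value)
        index = h & (m - 1)  # right-most b bits pick the bitmap
        rest = h >> b        # remaining 32 - b bits hold the run of zeroes
        if rest == 0:
            continue         # no 1 to observe in the remaining bits
        run_length = (rest & -rest).bit_length() - 1
        bitmaps[index] |= 1 << run_length
    return bitmaps


def pcsa_estimate(bitmaps):
    """Arithmetic mean of the lowest-zero positions, scaled by m / PHI."""
    m = len(bitmaps)
    mean_position = sum(lowest_zero_bit(bm) for bm in bitmaps) / m
    return m / PHI * 2 ** mean_position


bitmaps = pcsa_bitmaps(range(100_000), b=6)
print(round(pcsa_estimate(bitmaps)))
```

With $m = 64$ bitmaps the expected relative error is roughly $0.78/\sqrt{64} \approx 10\%$, so the printed estimate should land in the neighborhood of 100,000 rather than on it.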

You can see PCSA in action by clicking on the image below.

Click above to run the PCSA simulation

There is one point to note about PCSA and small cardinalities: Flajolet et al. mention that there are “initial nonlinearities” in the algorithm which result in poor estimation at small cardinalities ($n/m \approx 10 \, \text{to} \, 20$) which can be dealt with by introducing corrections but they leave it as an exercise for the reader to determine what those corrections are. Scheuermann et al. did the leg work in “Near-Optimal Compression of Probabilistic Counting Sketches for Networking Applications” and came up with a small correction term (see equation 6). Another approach is to simply use the linear (ball-bin) counting introduced in the HLL paper.

### Set Operations

Just like HLL and KMV, unions are trivial to compute and lossless. The PCSA sketch is essentially a “marker” for runs of zeroes, so to perform a union you merely bit-wise OR the two sets of bitmaps together. Folding a PCSA down to a smaller m works the same way as HLL but instead of HLL’s max you bit-wise OR the bitmaps together. Unfortunately, intersections have the same issue as with HLL: you must perform them using the inclusion/exclusion principle. We haven’t done the plots on intersection errors for PCSA but you can imagine they are similar to HLL (and have the benefit of the better relative error $0.78 / \sqrt{m}$).
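The union really is as small as it sounds. The sketch below assumes each PCSA sketch is a list of integer bitmaps, and that both sketches were built with the same hash function and the same $m$; the name `pcsa_union` is ours, for illustration only.

```python
def pcsa_union(bitmaps_a, bitmaps_b):
    """Union two PCSA sketches of equal m by OR-ing matching bitmaps."""
    if len(bitmaps_a) != len(bitmaps_b):
        raise ValueError("sketches must have the same number of bitmaps")
    return [a | b for a, b in zip(bitmaps_a, bitmaps_b)]


sketch_a = [0b0111, 0b0001, 0b0000, 0b0011]
sketch_b = [0b0001, 0b0110, 0b0001, 0b0011]
print([format(bm, "04b") for bm in pcsa_union(sketch_a, sketch_b)])
# ['0111', '0111', '0001', '0011']
```

The result is exactly the sketch you would have built by feeding both streams into one PCSA, which is why the union is lossless.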

### PCSA vs. HLL

The fact that PCSA has a better relative error than HyperLogLog with the same number of registers ($1.04 / 0.78 \approx 1.33$) is slightly deceiving in that the registers themselves (with $m$ being the number of stored observations) are different sizes. A better way to look at it is to fix the accuracy of the sketches and see how they compare. If we would like to have the same relative error from both sketches we can see that the relationship between registers is:

$\text{PCSA}_{RE} = \text{HLL}_{RE}$

$\dfrac{0.78}{\sqrt{m_{\scriptscriptstyle PCSA}}} = \dfrac{1.04}{\sqrt{m_{\scriptscriptstyle HLL}}}$

$m_{\scriptscriptstyle PCSA} = \left( \dfrac{0.78}{1.04} \right)^2 m_{\scriptscriptstyle HLL} \approx 0.563 \ m_{\scriptscriptstyle HLL}$

Interestingly, PCSA only needs a little more than half the registers of an HLL to reach the same relative error. But this is also deceiving. What we should be asking is what is the size of each sketch if they provide the same relative error? HLL commonly uses a register width of 5 bits to count to billions whereas PCSA requires 32 bits. That means a PCSA sketch with the same accuracy as an HLL would be:

\begin{aligned} \text{Size of PCSA} &= 32 \text{bits} \ m_{\scriptscriptstyle PCSA} = 32 \text{bits} \, ( 0.563 \ m_{\scriptscriptstyle HLL} ) \\ \\ \text{Size of HLL} &= 5 \text{bits} \ m_{\scriptscriptstyle HLL} \end{aligned}

Therefore,

$\dfrac{\text{Size of PCSA}} {\text{Size of HLL}} = \dfrac{32 \text{bits} \, ( 0.563 \ m_{\scriptscriptstyle HLL} )}{5 \text{bits} \ m_{\scriptscriptstyle HLL}} \approx 3.6$

A PCSA sketch with the same accuracy is 3.6 times larger than HLL!

### Optimizations

But what if you could make PCSA smaller by reducing the size of the bitmaps? Near the end of the paper in the Scrolling section, Flajolet et al. bring up the point that you can make the bitmaps take up less space. From the simulation you can observe that with a high probability there is a block of consecutive 1s on the right side of the bitmap and a block of consecutive 0s on the left side of the bitmap with a fringe in between. If one found the “global fringe” — that is the region defined by the left-most 1 and right-most 0 across all bitmaps — then only those bits need to be stored (along with an offset value). The authors theorized that a fringe width of 8 bits would be sufficient (though they fail to mention if there are any dependencies on the number of distinct values counted). We put this to the test.

In our simulations it appears that a fringe width of 12 bits is necessary to provide an unbiased estimator comparable to full-fringe PCSA (32-bit) for the range of distinct values we analyzed. (Notice the consistent bias of smaller fringe sizes.) There are many interesting reasons that this “fringe” concept can fail. Look at the notes to this post for more. If we take the above math and update 32 to 12 bits per register (and include the 32 bit offset value) we get:

\begin{aligned} \dfrac{\text{Size of PCSA}} {\text{Size of HLL}} &= \dfrac{12 \text{bits} \, ( 0.563 \ m_{\scriptscriptstyle HLL} ) + 32\text{bits}} {5 \text{bits} \cdot m_{\scriptscriptstyle HLL}} \\ \\ &= \dfrac{12 \text{bits} \, ( 0.563 \ m_{\scriptscriptstyle HLL} )} {5 \text{bits} \cdot m_{\scriptscriptstyle HLL}} + \dfrac{32\text{bits}} {5 \text{bits} \cdot m_{\scriptscriptstyle HLL}} \\ \\ &\approx 1.35 \text{ (for }m\gg64 \text{)} \end{aligned}
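If you would like to replay the arithmetic for other register widths, a few lines of Python will do it. This is just a calculator for the formulas above; the function name `size_ratio` and the choice of $m_{HLL} = 2048$ in the example are assumptions for illustration.

```python
PCSA_CONSTANT = 0.78  # relative error is 0.78 / sqrt(m)
HLL_CONSTANT = 1.04   # relative error is 1.04 / sqrt(m)
HLL_BITS = 5          # bits per HLL register


def size_ratio(pcsa_bits, m_hll, offset_bits=0):
    """Size of an equally accurate PCSA sketch divided by the HLL size."""
    m_pcsa = (PCSA_CONSTANT / HLL_CONSTANT) ** 2 * m_hll
    return (pcsa_bits * m_pcsa + offset_bits) / (HLL_BITS * m_hll)


print(round(size_ratio(32, m_hll=2048), 2))                  # 3.6
print(round(size_ratio(12, m_hll=2048, offset_bits=32), 2))  # 1.35
```

The 32-bit offset barely moves the ratio at this size, which is why it drops out of the approximation for large $m$.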

This is getting much closer to HLL! The combination of tighter bounds on the estimate and the fact that the fringe isn’t really that wide in practice result in PCSA being very close to the size of the much lauded HLL. This got us thinking about further compression techniques for PCSA. After all, we only need to get the sketch about 1/3 smaller to be comparable in size to HLL. In a future post we will talk about what happens if you Huffman code the PCSA bitmaps and the tradeoffs you make when you do this.

### Summary

PCSA provides for all of the goodness of HLL: very fast updates making it suitable for real-time use, small footprint compared to the information that it provides, tunable accuracy and unions. The fact that it has a much better relative error per register than HLL indicates that it should get more credit than it does. Unfortunately, each bitmap in PCSA requires more space than HLL and you still get less accuracy per bit. Look for a future post on how it is possible to use compression (e.g. Huffman encoding) to reduce the number of bits per bitmap, thus reducing the error per bit to match that of HLL, resulting in an approach that matches HLL in size but exceeds its precision!

### Notes on the Fringe

While we were putting this post together we discovered many interesting things to look at with respect to fringe optimization. One of the questions we wanted to answer was “How often does the limited size of the fringe muck up a bitmap?” Below is a plot that shows how often any given sketch had a truncation event (that affected the DV estimate) in the fringe of any one of its bitmaps for a given fringe width (i.e. some value could not be stored in the space available).

Note that this is an upper bound on the error that could be generated by truncation. If you compare the number of runs that had a truncation event (almost all of the runs) with the error plot in the post it is quite shocking that the errors are as small as they are.

Since we might not get around to all of the interesting research here, we are calling out to the community to help! Some ideas:

1. There are likely a few ways to improve the fringe truncation. Since PCSA is so sensitive to the least-significant 1 in each bitmap, it would be very interesting to see how different approaches affect the algorithm. For example, in our algorithm we “left” truncated meaning that all bitmaps had to have a one in the least-significant position of the bitmap in order to move up the offset. It would be interesting to look at “right” truncation. If one bitmap is causing many of the others to not record incoming values perhaps it should be bumped up. Is there some math to back up this intuition?
2. It is interesting to us that the fringe width truncation events are DV dependent. We struggled with the math on this for a bit before we just stopped. Essentially we want to know what is the width of the theoretical fringe? It obviously appears to be DV dependent and some sort of coupon collector problem with unequal probabilities. Someone with better math skills than us needs to help here.

### Closing thoughts

We uncovered PCSA again as a way to go back to first principles and see if there are lessons to be learned that could be applied to HLL to make it even better. For instance, can all of this work on the fringe be applied to HLL to reduce the number of bits per register while still maintaining the same precision? HLL effectively records the “strandline” (what we call the left-most 1s). More research into how this strandline behaves and if it is possible to improve the storage of it through truncation could reduce the standard HLL register width from 5 bits to 4, a huge savings! Obviously, we uncovered a lot of open questions with this research and we feel there are algorithmic improvements to HLL right around the corner. We have done some preliminary tests and the results so far are intriguing. Stay tuned!

## Doubling the Size of an HLL Dynamically – Unions

Author’s Note: This post is related to a few previous posts dealing with the HyperLogLog algorithm.  See Matt’s overview of the algorithm, and see this post for an overview of “folding” or shrinking HLLs in order to perform set operations. It is also the second in a series of three posts on doubling the number of bins of HLLs. The first post dealt with the recovery time after doubling and the next post will deal with ways to utilize an extra bit or two per bin.

### Overview

Let’s say we have two streams of data which we’re monitoring with the HLL algorithm, and we’d like to get an estimate on the cardinality of these two streams combined, i.e. thought of as one large stream. In this case, we have to take advantage of the algorithm’s built-in “union” feature. Done naively, the accuracy of the estimate will depend entirely on the number of bins, $m$, of the smaller of the two HLLs. In this case, to make our estimate more accurate, we would need to increase this $m$ of one (or both) of our HLLs. This post will investigate the feasibility of doing this; we will apply our idea of “doubling” to see if we can gain any accuracy. We will not focus on intersections, since the only support the HyperLogLog algorithm has for intersections is via the inclusion/exclusion principle. Hence the error can be kind of funky for this – for a better overview of this, check out Timon’s post here. For this reason, we only focus on how the union works with doubling.

### The Strategy: A Quick Reminder

In my last post we discussed the benefits and drawbacks of many different doubling strategies in the context of recovery time of the HLL after doubling. Eventually we saw that two of our doubling strategies worked significantly better than the others. In this post, instead of testing many different strategies, we’ll focus instead on one strategy, “proportion doubling” (PD), and how to manipulate it to work best in the context of unions. The idea behind PD is to guess the approximate intersection cardinality of the two datasets and to force that estimate to remain after doubling. To be more specific, suppose we have an HLL $A$ and an HLL $B$ with $n$ bins and $2n$ bins, respectively. Then we check what proportion of bins in $A$, call it $p$, agree with the bins in $B$. When we double $A$, we fill in the bins by randomly selecting $p\cdot n$ bins, and filling them in with the value in the corresponding bins in $B$. To fill in the rest of the bins, we fill them in randomly according to the distribution.

### The Naive Approach

To get some idea of how well this would work, I put the most naive strategy to the test. The idea was to run 100 trials where I took two HLLs (one of size $2^5 = 32$ and one of size $2^6 = 64$), ran 200K keys through them, doubled the smaller one (according to Random Estimate), and took a union. I had a hunch that the accuracy of our estimate after doubling would depend on how large the true intersection cardinality of the two datasets would be, so I ran this experiment for overlaps of size 0, 10K, 20K, etc. The graphs below are organized by the true intersection cardinality, and each graph shows the boxplot of the error for the trials.

This graph is a little overwhelming and a bit of a strange way to display the data, but is useful for getting a feel for how the three estimates work in the different regimes.  The graph below is from the same data and just compares the “Small” and “Doubled” HLLs.  The shaded region represents the middle 50% of the data, and the blue dots represent the data points.

The first thing to notice about these graphs is the accuracy of the estimate in the small intersection regime. However, outside of this, the estimates are not very accurate – it is clearly a better choice to just use the estimate from the smaller HLL.

Let’s try a second approach. Above we noticed that the algorithm’s accuracy depended on the cardinality of the intersection. Let’s try to take that into consideration. Let’s use the “Proportion Doubling” (PD) strategy we discussed in our first post. That post goes more in depth into the algorithm, but the take away is that this doubling strategy preserves the proportion of bins in the two HLLs which agree. I ran some trials like I did above to get some data on this. The graphs below represent this.

Here again we show the data in a second graph comparing just the “Doubled” and “Small” HLL estimates. Notice how much tighter the middle 50% region is on the top graph (for the “Doubled” HLL). Hence in the large intersection regime, we get very accurate estimates.

One thing to notice about the second set of graphs is how narrow the error bars are.  Even when the estimate is biased, it still has much smaller error.  Also, notice that this works well in the large intersection regime but horribly in the small intersection regime.  This suggests that we may be able to interpolate our strategies. The next set of graphs is for an attempt at this. The algorithm gets an estimate of the intersection cardinality, then decides to either double using PD, double using RE, or not double depending on whether the intersection is large, small, or medium.

Here, the algorithm works well in the large intersection regime and doesn’t totally crap out outside of this regime (like the second algorithm), but doesn’t sustain the accuracy of the first algorithm in the small intersection regime. This is most likely because the algorithm cannot “know” which regime it is in and thus must make a guess. Eventually, it will guess wrong and severely underestimate the union cardinality. This will introduce a lot of error, and hence, our boxplot looks silly in this regime. The graph below shows the inefficacy of this new strategy. Notice that there are virtually no gains in accuracy in the top graph.

### Conclusion

With some trickery, it is indeed possible to gain some accuracy when estimating the cardinality of the union of two HLLs by doubling one. However, in order for this to be feasible, we need to apply the correct algorithm in the correct regime. This isn’t a major disappointment since for many practical cases, it would be easy to guess which regime the HLLs should fall under and we could build in the necessary safeguards if we guess incorrectly. In any case, our gains were modest but certainly encouraging!

## HyperLogLog++: Google’s Take On Engineering HLL

Matt Abrams recently pointed me to Google’s excellent paper “HyperLogLog in Practice: Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm” [UPDATE: changed the link to the paper version without typos] and I thought I’d share my take on it and explain a few points that I had trouble getting through the first time. The paper offers a few interesting improvements that are worth noting:

1. Move to a 64-bit hash function
2. A new small-cardinality estimation regime
3. A sparse representation

I’ll take a look at these one at a time and share our experience with similar optimizations we’ve developed for a streaming (low latency, high throughput) environment.

### 32-bit vs. 64-bit hash function

I’ll motivate the move to a 64-bit hash function in the context of the original paper a bit more since the Google paper doesn’t really cover it except to note that they wanted to count billions of distinct values.

#### Some math

In the original HLL paper, a 32-bit hash function is required with the caveat that measuring cardinalities in the hundreds of millions or billions would become problematic because of hash collisions. Specifically, Flajolet et al. propose a “large range correction” for when the estimate $E$ is greater than $2^{32}/30$.  In this regime, they replace the usual HLL estimate by the estimate

$\displaystyle E^* := -2^{32} \mbox{log}\Big(1 - \frac{E}{2^{32}}\Big)$.

This reduces to a simple probabilistic argument that can be modeled with balls being dropped into bins. Say we have an $L$-bit hash. Each distinct value is a ball and each bin is designated by a value in the hash space. Hence, you “randomly” drop a ball into a bin if the hashed value of the ball corresponds to the hash value attached to the bin. Then, if we get an estimate $E$ for the cardinality, that means that (approximately) $E$ of our bins have values in them, and so there are $2^L - E$ empty bins. The number of empty bins should be about $2^L e^{ - n/2^{L} }$, where $n$ is the number of balls. Hence $2^{L} - E = 2^{L} e^{-n/2^{L}}$. Solving this for $n$ gives us the formula Flajolet et al. recommend using: $-2^L \log(1 - \frac{E}{2^{L}})$.

Aside:  The empty bins expected value comes from the fact that

$E(\# \text{ of empty bins}) = m(1 - \frac{1}{m})^{n}$,

where $m$ is the number of bins and $n$ the number of balls.  This is pretty quick to show by induction.  Hence,

$\displaystyle E(\#\text{ of empty bins}) \approx m e^{-n/m}$ when $m$ is large.

Again, the general idea is that the $E$ ends up being some number smaller than $n$ because some of the balls are getting hashed to the same value.  The correction essentially doesn’t do anything in the case when $E$ is small compared to $2^L$ as you can see here. (Plotted is $-\log(1 - x)$, where $x$ represents $E / 2^L$, against the line $y = x$. The difference between the two graphs represents the difference between $E$ and $n$.)
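To see the correction at work numerically, here is a small Python sketch of the ball-and-bin model and its inverse. The function names are ours, and `expected_raw_estimate` is only the idealized model from the argument above (it ignores the HLL estimator’s own error).

```python
import math


def expected_raw_estimate(n, hash_bits=32):
    """Distinct hash values expected after hashing n distinct items."""
    space = 2.0 ** hash_bits
    return -space * math.expm1(-n / space)  # space * (1 - e^(-n/space))


def large_range_correction(estimate, hash_bits=32):
    """Invert the ball-and-bin model: E* = -2^L * log(1 - E / 2^L)."""
    space = 2.0 ** hash_bits
    return -space * math.log1p(-estimate / space)


n = 1_000_000_000
raw = expected_raw_estimate(n)
print(round(raw), round(large_range_correction(raw)))
```

At a billion distinct values with a 32-bit hash, the raw figure falls noticeably short of `n` because of collisions, and the correction maps it back. For an estimate that is small relative to $2^L$, the correction returns essentially the same number it was given.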

#### A solution and a rebuttal

The natural move to start estimating cardinalities in the billions is to simply move to a larger hash space where the hash collision probability becomes negligibly small. This is fairly straightforward since most good hash functions give you at least 64 bits of entropy these days and it’s also the size of a machine word. There’s really no downside to moving to the larger hash space, from an engineering perspective. However, the Google researchers go one step further: they increase the register width by one bit (to 6), as well, ostensibly to be able to support the higher possible register values in the 64-bit setting. I contend this is unnecessary. If we look at the distribution of register values for a given cardinality, we see that it takes about a trillion elements before a 5-bit register overflows (at the black line):

The distributions above come from the LogLog paper, on page 611, right before formula 2. Look for $\mathbb{P}_{\nu}(M = k)$.

Consider the setting in the paper where $p = \log_2(m) = 14$. Let’s say we wanted to safely count into the 100 billion range. If we have $L = p + (2^5 - 1) = 14 + 31 = 45$ then our new “large range correction” boundary is roughly one trillion, per the adapted formula above. As you can see from the graph below, even at $p = 10, L = 41$ the large range correction only kicks in at a little under 100 billion!

The black line is the cutoff for a 5-bit register, and the points are plotted when the total number of hash bits required reaches 40, 50, and 60.
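The boundaries quoted above are easy to check. The sketch below assumes the $2^L/30$ threshold carries over unchanged from the original $2^{32}/30$ rule and that a $w$-bit register lets us use $L = p + (2^w - 1)$ hash bits; `large_range_boundary` is a name made up for this example.

```python
def large_range_boundary(p, register_bits=5):
    """Cardinality above which the large range correction applies.

    Assumes L = p + (2**register_bits - 1) hash bits and the 2**L / 30
    threshold adapted from the original 2**32 / 30 rule.
    """
    hash_bits = p + (2 ** register_bits - 1)
    return hash_bits, 2 ** hash_bits / 30


for p in (10, 14):
    hash_bits, boundary = large_range_boundary(p)
    print(f"p={p} L={hash_bits} boundary={boundary:.3g}")
# p=10 L=41 boundary=7.33e+10
# p=14 L=45 boundary=1.17e+12
```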

The real question, though, is whether all this is practically useful. I would argue no: there are no internet phenomena that I know of that are producing more than tens of billions of distinct values, and there’s not even a practical way of empirically testing the accuracy of HLL at cardinalities above 100 billion. (Assuming you could insert 50 million unique, random hashed values per second, it would take about half an hour to fill an HLL to 100 billion elements, and then you’d have to repeat that 5000 times as they do in the paper for a grand total of roughly 4 months of compute time per cardinality in the range.)

[UPDATE: After talking with Marc Nunkesser (one of the authors) it seems that Google may have a legitimate need for both the 100 billion to trillion range right now, and even more later, so I retract my statement! For the rest of us mere mortals, maybe this section will be useful for picking whether or not you need five or six bits.]

At AK we’ve run a few hundred test runs at 1, 1.5, and 2 billion distinct values under the $p = 10-14, L = 41-45$ configuration range and found the relative error to be identical to that of lower cardinalities (in the millions of DVs). There’s really no reason to inflate the storage requirements for large cardinality HLLs by 20% simply because the hash space has expanded. Furthermore, you can do all kinds of simple tricks like storing an offset as metadata (which would only require at most 5 bits) for a whole HLL and storing the register values as the difference from that base offset, in order to make use of a larger hash space.

### Small Cardinality Estimation

Simply put, the paper introduces a second small range correction between the existing one (linear counting) and the “normal” harmonic mean estimator ($E$ in the original paper) in order to eliminate the “large” jump in relative error around the boundary between the two estimators.

They empirically calculate the bias of $E$ and create a lookup table for various $p$, for 200 values less than $5 \cdot 2^p$ with a correction to the overestimate of $E$. They interpolate between the 200 reference points to determine the correction to apply for any given raw $E$ value. Their plots give compelling evidence that this bias correction makes a difference in the $m$ to $5m$ cardinality range (cuts 95th percentile relative error from about 2% to 1.2%).

I’ve been a bit terse about this improvement since sadly it doesn’t help us at AK much because most of our data is Zipfian. Few of our reporting keys live in the narrow cardinality range they are optimizing: they either wallow in the linear counting range or shoot straight up into the normal estimator range.

Nonetheless, if you find you’re doing a lot of DV counting in this range, these corrections are pretty cheap to implement, as they’ve provided numerical values for all the cutoffs and bias corrections in the appendix.

### Sparse representation

The general theme of this optimization isn’t particularly new (our friends at MetaMarkets mentioned it in this post): for smaller cardinality HLLs there’s no need to materialize all the registers. You can get away with just materializing the set registers in a map. The paper proposes a sorted map (by register index) with a hash map off the side to allow for fast insertions. Once the hash map reaches a certain size, its entries are batch-merged into the sorted list, and once the sorted list reaches the size of the materialized HLL, the whole thing is converted to the fully-materialized representation.
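A rough Python sketch of that structure, under some assumptions: the class name `SparseRegisters`, the `buffer_limit` knob, and the method names are all hypothetical, and the decision of when to call `to_dense` is left to the caller.

```python
class SparseRegisters:
    """Sorted list of (index, value) pairs plus a small unsorted insert buffer."""

    def __init__(self, buffer_limit=64):
        self.sorted_entries = []          # [(register_index, value)], sorted by index
        self.buffer = {}                  # register_index -> value
        self.buffer_limit = buffer_limit  # hypothetical tuning knob

    def insert(self, index, value):
        # A register only ever keeps the maximum value it has seen.
        if value > self.buffer.get(index, 0):
            self.buffer[index] = value
        if len(self.buffer) >= self.buffer_limit:
            self.merge()

    def merge(self):
        """Batch-merge the buffer into the sorted list with a two-pointer merge."""
        incoming = sorted(self.buffer.items())
        old = self.sorted_entries
        merged = []
        i = j = 0
        while i < len(old) and j < len(incoming):
            if old[i][0] < incoming[j][0]:
                merged.append(old[i])
                i += 1
            elif old[i][0] > incoming[j][0]:
                merged.append(incoming[j])
                j += 1
            else:
                # Same register on both sides: keep the larger value.
                merged.append((old[i][0], max(old[i][1], incoming[j][1])))
                i += 1
                j += 1
        merged.extend(old[i:])
        merged.extend(incoming[j:])
        self.sorted_entries = merged
        self.buffer = {}

    def to_dense(self, m):
        """Materialize all m registers once the sparse form stops paying off."""
        self.merge()
        registers = [0] * m
        for index, value in self.sorted_entries:
            registers[index] = value
        return registers
```

A caller would compare the size of `sorted_entries` against the size of the dense array and switch over with `to_dense(m)` once the sparse form is no longer smaller.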

Aside: Though the hash map is a clever optimization, you could skip it if you didn’t want the added complexity. We’ve found that the simple sorted list version is extremely fast (hundreds of thousands of inserts per second). Also beware the variability of the batched sort-and-merge cost every time the hash map outgrows its limit and has to be merged into the sorted list. This can add significant latency spikes to a streaming system, whereas a one-by-one insertion sort to a sorted list will be slower but less variable.

The next bit is very clever: they increase $p$ when the HLL is in the sparse representation because of the saved space. Since they’re already storing entries in 32-bit integers, they increase $p$ to $p^{\prime} = 31 - \mbox{regWidth} = 31 - 6 = 25$. (I’ll get to the leftover bit in a second!) This gives them increased precision which they can simply “fold” down when converting from the sparse to fully materialized representation. Even more clever is their trick of not having to always store the full register value as the value of an entry in the map. Instead, if the longer register index encodes enough bits to determine the value, they use the leftover bit I mentioned before to signal as much.
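Here is a small Python sketch of the “fold” step, under stated assumptions: a 64-bit hash, the register index taken from the top bits, and the register value $\rho$ defined as the 1-based position of the leftmost 1-bit in the remaining bits. The function names `index_and_rho` and `fold` are hypothetical and this is not the paper's EncodeHash/DecodeHash.

```python
import random

HASH_BITS = 64  # assumed hash width


def index_and_rho(h, p):
    """Split a hash into (register index, rho) at precision p."""
    rest_bits = HASH_BITS - p
    index = h >> rest_bits
    rest = h & ((1 << rest_bits) - 1)
    # 1-based position of the leftmost 1-bit in the remainder;
    # an all-zero remainder gives rest_bits + 1.
    rho = rest_bits - rest.bit_length() + 1
    return index, rho


def fold(index_prime, rho_prime, p=14, p_prime=25):
    """Convert an entry stored at precision p_prime to precision p."""
    extra = p_prime - p
    index = index_prime >> extra
    low = index_prime & ((1 << extra) - 1)  # index bits that rejoin the remainder
    if low:
        # The longer index alone determines rho at precision p.
        rho = extra - low.bit_length() + 1
    else:
        # Those bits are all zero, so the stored rho_prime is still needed.
        rho = rho_prime + extra
    return index, rho


# Folding a p'=25 entry should agree with computing the p=14 entry directly.
rng = random.Random(0)
for _ in range(1000):
    h = rng.getrandbits(HASH_BITS)
    assert fold(*index_and_rho(h, 25)) == index_and_rho(h, 14)
```

The `if low:` branch is the case where the longer register index already encodes enough bits to determine the value, which is exactly the situation the leftover flag bit is there to signal.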

In the diagram, $p$ and $p^{\prime}$ are as in the Google paper, and $q$ and $q^{\prime}$ are the number of bits that need to be examined to determine $\rho$ for either the $p$ or $p^{\prime}$ regime. I encourage you to read section 5.3.3 as well as EncodeHash and DecodeHash in Figure 8 to see the whole thing. [UPDATE: removed the typo section as it has been fixed in the most recent version of the paper (linked at the top)]

The paper also tacks on a difference encoding (which works very well because it’s a sorted list) and a variable length encoding to the sparse representation to further shrink the storage needed, so that the HLL can use the increased register count, $p^{\prime}$, for longer before reverting to the fully materialized representation at $p$. There’s not much to say about it because it seems to work very well, based on their plots, but I’ll note that in no way is that type of encoding suitable for streaming or “real-time” applications. The encode/decode overhead simply takes an already slow (relative to the fully materialized representation) sparse format and adds more CPU overhead.
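For a feel of what this costs, here is a minimal Python sketch of difference encoding followed by a variable-length encoding. It assumes a byte-oriented varint (7 payload bits per byte, high bit set when more bytes follow); the paper's exact layout is not reproduced here, and the function names are hypothetical.

```python
def encode_varint(n):
    """Encode a non-negative int, 7 bits per byte, high bit = 'more follows'."""
    out = bytearray()
    while True:
        byte = n & 0x7F
        n >>= 7
        if n:
            out.append(byte | 0x80)
        else:
            out.append(byte)
            return bytes(out)


def encode_sorted(values):
    """Difference-encode a sorted list of ints, then varint each gap."""
    out = bytearray()
    previous = 0
    for v in values:
        out += encode_varint(v - previous)
        previous = v
    return bytes(out)


def decode_sorted(data):
    """Inverse of encode_sorted: rebuild the sorted list from the byte string."""
    values = []
    current = gap = shift = 0
    for byte in data:
        gap |= (byte & 0x7F) << shift
        if byte & 0x80:
            shift += 7          # more bytes belong to this gap
        else:
            current += gap      # gap complete: recover the absolute value
            values.append(current)
            gap = shift = 0
    return values
```

Note that reading or updating any single entry means walking the byte string from the start, which is where the extra CPU overhead comes from.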

### Conclusion

The researchers at Google have done a great job with this paper, meaningfully tackling some hard engineering problems and showing some real cleverness. Most of the optimizations proposed work very well in a database context, where the HLLs are either being used as temporary aggregators or are being stored as read-only data. However, some of the optimizations aren’t suitable for streaming/“real-time” work. On a more personal note, it’s very refreshing to see real algorithmic engineering work being published from industry. Rob, Matt, and I just got back from New Orleans and SODA / ALENEX / ANALCO and were hoping to see more work in this area, and Google sure did deliver!

### Appendix

Sebastiano Vigna brought up the point that 6-bit registers are necessary for counting above 4 billion in the comments. I addressed it in the original post (see “A solution and a rebuttal”) but I’ll lay out the math in a bit more detail here to show that you can easily count above 4 billion with only 5-bit registers.

If you examine the original LogLog paper (the same as mentioned above) you’ll see that the register distribution for LogLog (and HyperLogLog consequently) registers is

$\displaystyle \mathbb{P}_{\nu}(M > k) = 1 - \mathbb{P}_{\nu}(M \le k) = 1 - \Big(1 - \frac{1}{2^k}\Big)^{\nu}$

where $k$ is the register value and $\nu$ is the number of (distinct) elements a register has seen.

So, I assert that 5 bits for a register (which allows the maximum value to be 31) is enough to count to ten billion without any special tricks. Let’s fix $p=14$ and say we insert $10^{10}$ distinct elements. That means any given register will see about $\frac{10^{10}}{2^p} = \frac{10^{10}}{2^{14}} \approx 6.1 \times 10^5$ elements, assuming we have a decent hash function. So, the probability that a given register will have a value greater than 31 is (per the LogLog formula given above)

$\displaystyle \mathbb{P}_{\nu}(M > 31) = 1 - \mathbb{P}_{\nu}(M \le 31) = 1 - \Big(1 - \frac{1}{2^{31}}\Big)^{6.1 \times 10^5} \approx 0.00028$

and hence the expected number of registers that would overflow is approximately $2^{14} \times 0.00028 \approx 4.5$. So five registers out of sixteen thousand would overflow. I am skeptical that this would meaningfully affect the cardinality computation. In fact, I ran a few tests to verify this and found that the average number of registers with values greater than 31 was 4.5 and the relative error had the same standard deviation as that predicted by the paper, $1.04/\sqrt{m}$.
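If you want to redo this arithmetic for other configurations, a few lines of Python will do it. This is a sketch that simply plugs numbers into the LogLog formula above and assumes a perfectly uniform hash; the function names are hypothetical.

```python
import math


def prob_register_exceeds(k, nu):
    """P(M > k) = 1 - (1 - 2^-k)^nu, computed stably for very small 2^-k."""
    return -math.expm1(nu * math.log1p(-2.0 ** -k))


def expected_overflows(cardinality, p, max_value):
    """Expected number of registers whose value exceeds max_value."""
    m = 2 ** p
    nu = cardinality / m  # elements per register, assuming a uniform hash
    return m * prob_register_exceeds(max_value, nu)


# Ten billion distinct values, p = 14, 5-bit registers (max value 31).
print(expected_overflows(1e10, 14, 31))
```

Without the intermediate rounding this comes out a touch higher than the hand-computed figure, at roughly 4.7 registers, which doesn't change the conclusion.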

For argument, let’s assume that you find those five overflowed registers to be unacceptable. I argue that you could maintain an offset in 5 bits for the whole HLL that would allow you to still use 5-bit registers but exactly store the value of every register with extremely high probability. I claim that with overwhelmingly high probability, every register in the HLL used above is greater than or equal to 15 and less than or equal to 40. Again, we can use the same distribution as above and we find that the probability of a given register being outside those bounds is

$\mathbb{P}_{\nu}(M < 15) \approx 7 \times 10^{-17}$ and

$\mathbb{P}_{\nu}(M > 40) \approx 6 \times 10^{-7}$.

Effectively, there are no register values outside of $[15,40]$. Now I know that I can just store 15 in my offset and the true value minus the offset (which now fits in 5 bits) in the actual registers.
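As a minimal Python sketch of that offset trick: the function names are hypothetical, and clamping the rare out-of-range value is an assumption on my part rather than something specified above.

```python
REGISTER_BITS = 5
MAX_STORED = (1 << REGISTER_BITS) - 1  # 31, the largest value a 5-bit register holds


def encode_with_offset(true_values, offset):
    """Store each register as (value - offset) so it fits in 5 bits."""
    stored = []
    for v in true_values:
        delta = v - offset
        # Assumption: clamp the (extremely rare) values outside the window.
        stored.append(min(max(delta, 0), MAX_STORED))
    return stored


def decode_with_offset(stored, offset):
    """Recover the true register values from the stored differences."""
    return [s + offset for s in stored]


# Register values in [15, 40] become differences in [0, 25] with offset 15.
registers = [15, 22, 31, 40]
packed = encode_with_offset(registers, offset=15)
assert all(0 <= s <= MAX_STORED for s in packed)
assert decode_with_offset(packed, offset=15) == registers
```

The offset itself is a single small number of metadata for the whole HLL, so the per-register cost stays at 5 bits.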