
*
This is the second post in a series discussing the architecture and
implementation of massively parallel databases, such as Vertica [0],
BigQuery [1] or EventQL [2]. The target audience is
software and system engineers with an interest in databases and distributed systems.
*

In the last post we saw that in order to execute interactive queries on a large data set we have to split the data up into smaller partitions and put each partition on its own server. This way we can utilize the combined processing power of all servers to answer the query rapidly.

The problem we'll discuss today is how exactly we're going to split up a given dataset into partitions and distribute them among servers.

Say we have a table containing a large number of rows, a couple billion or so. The total size of the table is around 100TB. Our task is to distribute the rows uniformly among 20 servers, i.e. put roughly 5TB of the table on each server.

Of course, solving that task is trivial: We read in our 100TB source table, write the first 5TB of rows to the first server, the next 5TB to the second server and so on.

While this simplistic scheme works well for a static dataset, we'll have to be more clever if we are to implement an entire database that supports adding and modifying rows.

Why? Consider this: If we want to modify a row in our naively partitioned table,
we first have to figure out on which server we have put the row when splitting
the table into pieces. Now, to find the row, we have to search through all rows
until we hit the correct one. In the worst case we would have to examine *all*
rows on *all* servers to find any single row - the whole 100TB of data.

In more technical terms: Locating a row has *linear complexity* [3]: Finding
the row in a table containing one thousand rows would take one thousand times
longer than finding the row in a table containing just one row. It gets slower
and slower as we add more rows.
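To make the cost concrete, here is a minimal Python sketch of the naive lookup. The names `find_row_naive` and `servers` are made up for this illustration, and each server is modeled as a plain list of row identifiers rather than real storage.

```python
def find_row_naive(servers, row_id):
    """Scan every server in order until the row is found.

    Returns (server_index, rows_examined); server_index is None
    if the row does not exist anywhere.
    """
    examined = 0
    for server_index, rows in enumerate(servers):
        for candidate in rows:
            examined += 1
            if candidate == row_id:
                return server_index, examined
    return None, examined

# Toy data: 20 rows split over 4 servers in load order, 5 rows each.
all_rows = [17, 4, 99, 23, 8, 42, 15, 16, 77, 3,
            61, 5, 12, 90, 31, 2, 58, 64, 29, 70]
servers = [all_rows[i:i + 5] for i in range(0, len(all_rows), 5)]
```

In this toy setup, looking up the last row (`70`) or a row that does not exist examines all 20 rows, which is exactly the worst case described above.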

Clearly, our simplistic solution will not scale: We need a more efficient way to figure out on which server a given row is stored.

To quickly tell the location of a specific row, we could store an index file somewhere that records the location of each row. We could then do a quick lookup into our index to find the correct server instead of searching through all the data.

Sadly, this doesn't solve the problem. The index file would still have linear complexity. Although this time it wouldn't be linear in time, but in space: If we were storing a bazillion rows in our table, our index file would also have a bazillion rows.

Essentially, we would just be rephrasing the problem statement from
*"How do we partition a large table?"* to *"How do we partition a large index
file?".*

We'll have to come up with a partitioning scheme that allows us to find rows with less than linear complexity. I.e. we have to find an algorithm that can correctly compute the location of any single row, but doesn't get slower and slower as we add more rows to the table.

One such algorithm is called *modulo hashing*. The good thing about modulo
hashing is that it's not only very efficient but also extremely simple to
implement.

If we want to partition our input table using modulo hashing, we first have to
assign an identifier `ID` to every row in the table. This `ID` is usually
derived from the row itself, for example by designating one of the table's columns
as the primary key. For illustration purposes, we will use numeric identifiers,
but the same works with strings. [4]

The only piece of information that modulo hashing keeps is a single variable `N`.
This variable `N` contains the number of servers among which the table should
be partitioned.

Now, to figure out on which server a given row belongs, we simply compute the
remainder of the division of the row's `ID` by `N`. This use of the modulo
operation is also where the algorithm derives its name from.

```
find_row(row_id) {
  server_id := row_id % N;
  return server_id;
}
```

If, for example, we wanted to locate the row with `ID=123` in a table partitioned
among 8 servers (`N=8`), the row would be stored on server number 3
(`123 % 8 = 3`).

Of course, this is assuming we have also used the algorithm to decide on which server to put each row while loading the input table in the first place.
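As a rough sketch of how loading and lookup fit together, the following Python snippet uses the same modulo rule for both. The helper names `find_server` and `load_table` are invented for this example, and servers are again modeled as in-memory lists.

```python
N = 8  # number of servers, as in the example above

def find_server(row_id, n=N):
    """Return the index of the server responsible for row_id."""
    return row_id % n

def load_table(row_ids, n=N):
    """Distribute rows over n servers using the same rule that lookups use."""
    servers = [[] for _ in range(n)]
    for row_id in row_ids:
        servers[find_server(row_id, n)].append(row_id)
    return servers

# Load a toy table with the ids 0..999 and locate one row again.
servers = load_table(range(1000))
server_id = find_server(123)  # 123 % 8 == 3
```

Because the lookup never touches the data itself, its cost does not depend on how many rows were loaded.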

Modulo hashing works out so that every possible `ID` is consistently mapped
to a single server. The distribution of the rows among servers will be
approximately uniform, i.e. every server will get roughly the same number of
rows. [5]
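For string identifiers, one common approach (an assumption on our part, not something this post prescribes) is to first turn the string into a number with a stable hash function and then apply the modulo. The sketch below uses SHA-1 from Python's `hashlib`; the helper names and the `user-…` keys are made up for illustration.

```python
import hashlib
from collections import Counter

def numeric_id(key):
    """Map a string key to a non-negative integer.

    SHA-1 is used because it gives the same result in every process,
    unlike Python's built-in hash() for strings.
    """
    digest = hashlib.sha1(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big")

def find_server(key, n):
    """Modulo hashing on top of the hashed string key."""
    return numeric_id(key) % n

# Count how many of 10,000 made-up keys land on each of 8 servers.
keys = [f"user-{i}" for i in range(10_000)]
rows_per_server = Counter(find_server(key, 8) for key in keys)
```

With a well-mixing hash function, each of the 8 servers should end up with roughly 1,250 of the 10,000 keys.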

The modulo hashing algorithm is a huge improvement over our naive approach as it
is constant in space and time: Locating a row takes the same amount of time
regardless of the total number of rows in the table. And the only pieces of
information we need to tell where a row goes are the row's `ID` and the total
number of servers `N`, which will only take a few bytes to store. Not bad.

So are we done? I'm afraid we're not.

One thing modulo hashing can't handle well is a growing dataset. If we're
continually adding more rows to the table, we will eventually have to add more
capacity and increase the number of servers `N`. However, once we do that the
locations of almost all of the rows would change, since changing `N` also changes
the result of all modulo operations.

This means that in order to increase the number of servers `N` and still keep the
rows where they belong, we would have to copy almost every row in the table to
its new location every time we add or remove a server.

That's not exactly ideal: Even if we did not care about the massive overhead of copying every single row, we would eventually reach a point where our table grows faster than we can rebalance it.
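To get a feeling for how bad this is, we can count the rows whose server changes when going from 8 to 9 servers. This is a small Python sketch; `moved_fraction` is a hypothetical helper and the row ids are simply the integers from 0 to 71,999.

```python
def moved_fraction(row_ids, old_n, new_n):
    """Fraction of rows whose server changes when N goes from old_n to new_n."""
    row_ids = list(row_ids)
    moved = sum(1 for row_id in row_ids if row_id % old_n != row_id % new_n)
    return moved / len(row_ids)

# 72,000 is a multiple of 8 * 9, so every combination of remainders
# occurs equally often in this range.
fraction = moved_fraction(range(72_000), old_n=8, new_n=9)
```

For this range of ids, 8 out of every 9 rows (roughly 89%) end up on a different server after adding a single machine.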

Consistent hashing is a more involved version of modulo hashing. The main improvement of consistent hashing is that it allows us to add and remove servers without affecting the locations of all rows.

At the heart of the consistent hashing algorithm is a so-called *circular
keyspace*. Before we discuss what exactly that means, let's define the term
*keyspace*:

As with modulo hashing, we need to assign an identifier `ID` to each row. Now,
the keyspace is the range of all valid `ID` values. If the `ID` is numeric, the
keyspace is the range from negative infinity to positive infinity.

A keyspace and the positions of three row identifiers within the keyspace.

Within the keyspace, identifiers are well-ordered [6]. That means each `ID` has a
*successor* and a *predecessor*. The successor of an `ID` is the `ID` that goes
immediately after it and the predecessor is the one that goes immediately before it.

In the illustration above, the successor of green (`ID=123`) is red (`ID=856`), the
successor of red (`ID=856`) is yellow (`ID=923`) and so on.

But what is the successor of yellow (`ID=923`)? Does it have one? The answer is
not entirely clear. However, we will later have to come up with a successor for each
possible position in the keyspace, so we will have to define what the successor
of the last position in our keyspace is.

Imagine we glued one end of our keyspace to the other end:

A circular keyspace and the positions of three row identifiers within the keyspace.

Now we can clearly say that, in clockwise order, yellow's (`ID=923`) successor
is green (`ID=123`). Also, it finally looks like a circular keyspace!

Back to consistent hashing. Like we did with modulo hashing, we will choose
an initial number of servers `N`. For each of the `N` servers, we will put a marker
at a random position in the circular keyspace.

To decide on which server a given row `ID` belongs, we first locate the position
of the `ID` in the circular keyspace and then search clockwise for the next
server marker. In other words: each row goes onto the server whose marker
immediately succeeds the row-id's position in the keyspace.

The illustration below shows a circular keyspace with eight server markers. In
the illustration, the succeeding server marker for row `123` is `server 1`,
while the succeeding server marker for rows `856` and `923` is `server 3`.

So row `123` gets stored on `server 1` while rows `856` and `923` get stored on
`server 3`.

A circular keyspace with three row identifiers and eight server markers.
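The clockwise search can be sketched in a few lines of Python using a sorted list of marker positions and a binary search. The `Ring` class and the marker positions below are hypothetical; the positions were picked by hand so that the result matches the illustration, whereas a real deployment would place them randomly. We also assume that a row sitting exactly on a marker belongs to that marker's server.

```python
import bisect

class Ring:
    """Circular keyspace with one marker per server."""

    def __init__(self, markers):
        # markers maps a keyspace position to the server placed there
        self._positions = sorted(markers)
        self._servers = [markers[p] for p in self._positions]

    def find_server(self, row_id):
        # Index of the first marker at or after the row (clockwise search)
        i = bisect.bisect_left(self._positions, row_id)
        if i == len(self._positions):
            i = 0  # walked past the last marker: wrap around to the first
        return self._servers[i]

# Hand-picked marker positions, for illustration only.
ring = Ring({
    60: "server 0", 200: "server 1", 310: "server 2", 950: "server 3",
    450: "server 4", 540: "server 5", 650: "server 6", 780: "server 7",
})
```

With these markers, `ring.find_server(123)` returns `"server 1"`, while `ring.find_server(856)` and `ring.find_server(923)` both return `"server 3"`. A row past the last marker, say `990`, wraps around to `"server 0"`.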

As with the modulo hashing scheme, we still end up with a uniform distribution of rows among servers [7] and can quickly locate each row. The only information we have to store is the position of each server's marker in the keyspace.

Additionally, any server marker that we add or remove will only affect the rows immediately between it and the previous marker: The locations of all other rows remain consistent, hence the name consistent hashing.

This means we can add or remove servers and only have to move a small subset of the
rows into their new locations. The fraction of rows that needs to be moved is roughly `1/N`
where `N` is the number of servers. So as we add more servers, the percentage of
rows that need to be moved actually gets smaller.
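We can check this property with a small experiment: build a ring with 8 servers, add a ninth, and see which rows change their location. The sketch below is self-contained Python with made-up helper names. As a simplification, marker and row positions are derived from SHA-1 hashes of names such as `server-3` or `row-17`, so that the "random" positions are reproducible.

```python
import bisect
import hashlib

def position(name):
    """Reproducible pseudo-random position in a 32-bit keyspace."""
    digest = hashlib.sha1(name.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big")

def build_ring(server_names):
    """Sorted list of (marker position, server name) pairs."""
    return sorted((position(name), name) for name in server_names)

def find_server(ring, row_position):
    # First marker at or after the row; the modulo wraps around the circle
    i = bisect.bisect_left(ring, (row_position, ""))
    return ring[i % len(ring)][1]

rows = [position(f"row-{i}") for i in range(20_000)]
old_ring = build_ring([f"server-{i}" for i in range(8)])
new_ring = build_ring([f"server-{i}" for i in range(9)])

# Rows whose server differs between the old and the new ring
moved = [r for r in rows if find_server(old_ring, r) != find_server(new_ring, r)]
moved_to = {find_server(new_ring, r) for r in moved}
```

Every row in `moved` ends up on the newly added `server-8`; no rows are shuffled between the existing servers. Note that with a single marker per server the moved share is only `1/N` on average, so any individual run can deviate from it noticeably.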

Can we still do better? It depends on your use case. Consistent hashing is successfully employed in a number of popular key/value databases [8] such as DynamoDB [9], Cassandra [10] or memcache [11]. Nevertheless, here are two things that we could improve about consistent hashing:

Firstly, consistent hashing only supports an exact lookup operation. That is, we
can only find a row quickly if we already know its `ID`. If we want to find the
locations of all rows in a range of `IDs`, for example all rows with an `ID`
between `100` and `200`, we're back to scanning the full table.

Because of this, consistent hashing is particularly well suited for key/value databases where range scans are not required but less so for OLAP [12] systems like EventQL.

The other possible improvement is that we still have to copy roughly `1/N` of the
table's rows after changing the number of servers. If we had 100TB on 20 servers,
that would mean we're - realistically - still copying at least 15TB for every
server addition [13]. Not too bad, but still a lot of overhead network traffic.

The last algorithm we will look at today is best known for its publication in Google's BigTable paper [14].

The bigtable algorithm takes a completely different approach to the problem, but it also starts by defining a keyspace. Except this time it's not a circular keyspace, but a linear one.

The illustration below shows a bigtable keyspace and the position of three rows
with the identifiers `123`, `856` and `923` within the keyspace.

The next thing bigtable does is to split up the keyspace into a number of
*partitions* that are defined by their start and end positions, i.e. by the
lowest and highest row identifiers that will still be contained in the partition.

The illustration below shows a keyspace that is split into five partitions
A-E. In the illustration, the row with `ID=123` goes into partition `B` and
the rows with `ID=856` and `ID=923` both go into partition `D`.

Three record identifiers are mapped onto five partitions
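Finding the partition for a given row boils down to a binary search over the partition boundaries. Here is a minimal Python sketch; the split points are hypothetical values chosen to match the illustration, and we assume that each partition includes its start position and excludes its end position.

```python
import bisect

# Hypothetical split points: A covers everything below 100, B covers
# 100 <= id < 400, C covers 400 <= id < 700, D covers 700 <= id < 1000
# and E covers everything from 1000 upwards.
SPLITS = [100, 400, 700, 1000]
NAMES = ["A", "B", "C", "D", "E"]

def find_partition(row_id):
    """Return the name of the partition whose range contains row_id."""
    return NAMES[bisect.bisect_right(SPLITS, row_id)]
```

With these boundaries, `find_partition(123)` returns `"B"` and both `find_partition(856)` and `find_partition(923)` return `"D"`. Since the first and last partitions are open-ended, every possible identifier maps to exactly one partition.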

Now, the clever bit about the bigtable algorithm is how it comes up with the partition boundaries. To see why it's clever we have to understand why we can't simply divide the keyspace into equal parts without knowing the exact distribution of the input data:

One reason for that is that if you split up the range from negative to positive infinity into a list of discrete partitions, you end up with an infinite number of partitions.

The other reason is that it's highly likely that the row identifiers will all be piled up in a small area of the keyspace. After all, the identifiers might be user-supplied so we can't necessarily guarantee anything about their distribution.

Realistic distribution of record identifiers in the keyspace

So it could be that even though we have split up the keyspace into a large number of partitions, all rows actually end up in the same partition. And we can't solve the problem by making the partitions infinitesimally small either - that would be like going back to keeping track of every row's location individually, just a bit worse.

Here's how bigtable solves the problem: Initially the table starts out with a single partition that covers the whole keyspace - from negative infinity to positive infinity.

As soon as this first partition has become too large, it will be split in two. The split point will be chosen so that it divides the data in the partition into two roughly equal halves. This continues recursively as partitions become too large. At a basic level, it's an application of the classic divide and conquer principle [15].

Partition D is splitting into partitions D1 and D2
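The splitting logic can be sketched as follows. This is a simplified, in-memory Python illustration and not how bigtable or EventQL implement it: partitions are `(start, end, rows)` tuples, the threshold `MAX_ROWS` is a made-up and deliberately tiny value, row identifiers are assumed to be unique, and the split point is simply the median identifier.

```python
import math

MAX_ROWS = 4  # hypothetical split threshold, tiny for illustration

def insert(partitions, row_id):
    """Insert row_id into the partition covering it; split it if it grows too large.

    partitions is a list of (start, end, rows) tuples sorted by start,
    where each partition covers start <= id < end.
    """
    for i, (start, end, rows) in enumerate(partitions):
        if start <= row_id < end:
            rows = sorted(rows + [row_id])
            if len(rows) > MAX_ROWS:
                split_point = rows[len(rows) // 2]  # median id
                left = [r for r in rows if r < split_point]
                right = [r for r in rows if r >= split_point]
                partitions[i:i + 1] = [(start, split_point, left),
                                       (split_point, end, right)]
            else:
                partitions[i] = (start, end, rows)
            return

# Start with one partition covering the whole keyspace.
partitions = [(-math.inf, math.inf, [])]
for row_id in [500, 510, 505, 520, 515, 501, 502, 503, 900, 100]:
    insert(partitions, row_id)
```

After these ten inserts the keyspace is covered by three partitions with the boundaries 502 and 510. The boundaries cluster where the identifiers pile up, without us having known the distribution in advance.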

This way, you always end up with a number of partitions that are roughly equal in size, even though the distribution of row identifiers in the keyspace is initially unknown.

And since each partition is defined in terms of its lowest and highest contained row identifier, we can easily implement efficient range scans: To find all rows in a given range of identifiers, we only have to scan the partitions with overlapping ranges.
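As a small illustration of the overlap check, the following Python sketch selects the partitions that a range scan has to visit. The partition layout and the helper name `partitions_for_range` are hypothetical; each partition is assumed to include its start and exclude its end, while the scanned range includes both of its bounds.

```python
import math

def partitions_for_range(partitions, low, high):
    """Names of all partitions that may contain ids in the range low..high."""
    return [name for name, start, end in partitions
            if start <= high and low < end]

# Hypothetical partition map: (name, start, end)
partitions = [
    ("A", -math.inf, 100),
    ("B", 100, 400),
    ("C", 400, 700),
    ("D", 700, 1000),
    ("E", 1000, math.inf),
]
```

With this layout, a scan for all rows with an `ID` between `100` and `200` only needs to read partition `B`, while a scan from `350` to `750` touches `B`, `C` and `D`. The sketch checks every partition in turn for simplicity; since the partitions are sorted by their start position, a binary search would work as well.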

Lastly, the bigtable scheme does require a second allocation layer to assign partitions to servers that we didn't discuss here. Suffice it to say that this second allocation layer allows us to add new servers to a cluster without physically moving a single row. Of course, we still need to move around some rows every time we split a partition.

So is the bigtable algorithm really "better" than consistent hashing? Again, it depends on the use case.

The upsides of the bigtable scheme are that it supports range scans and that we can add capacity to a cluster without copying rows. The major downside is that implementing the algorithm in a masterless system requires a fair amount of code and synchronization.

For EventQL, we still chose the bigtable algorithm as the clear winner. After
reading this post, go have a look at the debug interface of EventQL where you
can see the *partition map* for a given table. Hopefully it will make a lot more
sense now:

Partition map for an EventQL table with a DATETIME primary key

That's all for today. In the next post we will discuss how to handle streaming updates on columnar files. You can subscribe to email updates for upcoming posts or the RSS feed in the sidebar.
