Last night, I had an idea for a new lock-free algorithm: a single-producer, single-producer queue with a wait-free push and a lock-free pop. The algorithm in question, which is remarkably efficient for, e.g., logging (which is why I called the project HSL for High-Speed Logging) is prone to what I call “spurious popping” – i.e., the queue can produce the same value more than once if it runs into a race condition, due to the wait-free nature of the push. The details of this are beyond the scope of the C++ for the self-taught column, but suffice it to say that I worked around this limitation (which is not a bug, by the way) by detecting it. An added peculiarity of my solution was that several queues had to be synchronized together by the consumer, to sort the values popped from the queues. Suffice it to say that I decided to kill two birds with one stone and used a heap.

This article will explain why: I’ll first explain what a heap is, how it works, show you the complete implementation of a heap in C and analyze its computational complexity. I will also discuss the alternatives I had and why I chose to use a heap in stead.

## What is a heap?

A heap (in our case) is a binary tree of which each node’s invariant is determined by a strict weak ordering predicate. I’ll explain what that means by explaining each of the terms I just used: *invariant*, *predicate*,*strict weak ordering* and *binary tree*.

### What is an invariant?

An invariant, in its largest definition, is something that must always be the same. In our case, it is an assertion, or a set of assertions, that must always be true in order for our abstract data type (ADT, the heap) to be in a consistent state. For example: a *trapezium* is a shape that has four corners, each of which is connected to the next by a straight line, and those lines cannot cross. The invariants of a trapezium are:

- it has four corners
- each corner is connected to two other corners by a straight line
- none of the lines cross another line

.

The more invariants you add, the more you know about the object you’re adding invariants to. For example: a parallelogram is a trapezium of which each side is parallel to another side, which implies that each corner has the same angle as the opposing corner. Hence, the assertions

- each side is parallel to another side, and
- each corner has the same angle as the opposing corner

are both true, but are also mutually inclusive (if one is true, so is the other). Now, you can see that a parallelogram is really a special case of a trapezium – we’d model that in C++ as two classes, Trapezium and Parallelogram, which derive from each other (Parallelogram is derived from Trapezium). This implies that if you derive one class from another, the derived class must uphold all the invariants of the base class.

Let’s put this notion in pseudo-code:

class Trapezium { void checkInvariants() { assert(this has four corners); assert(each corner is connected to two others by a straight line); assert(none of the lines cross any of the other lines); assert(at least one of the lines is parallel to another line); } }; class Parallelogram : public Trapezium { void checkInvariants() { Trapezium::checkInvariants(); assert(each of the sides is parallel to another side); assert(each of the corners has the same angle as the opposing corner); } };

We can push this notion a bit further by considering lozenges and rectangles: lozenges are parallelograms of which all the sides have the same length; rectangles are parallelograms of which all corners have a 90 degree angle:

class Lozenge : public Parallelogram { void checkInvariants() { Parallelogram::checkInvariants(); assert(all of the sides have the same length); } }; class Rectangle : public Parallelogram { void checkInvariants() { Parallelogram::checkInvariants(); assert(all corners have a 90 degree angle); } };

Now, you can see that by combining sets of invariants, we end up creating classes that don’t have any invariants of their own, but are still very useful:

class Square : Rectangle, Lozenge { void checkInvariants() { Rectangle::checkInvariants(); Lozenge::checkInvariants(); } }

In object-oriented design, invariants are very important, though sometimes they cannot be easily expressed in code. When they *can* be expressed in code, however, it is almost always useful to do so – though it is not always useful to check them during the execution of the code in production. When they are checked, there are a few caveats you should know about:

- Invariants don’t only apply to the
*visible*state of an object - Often, e.g. when implementing abstract data types such as heaps, the implementation can be wildly different from the visible state: the visible state of a heap, for example, is much the same as a stack, but sorted. Internally, unlike a stack, it is a tree which, unlike other trees, may be mapped into an array. In such a case, the invariants of an array apply, as do the invariants of our binary tree, but those of a “sorted stack” actually don’t apply, even though the ADT behaves as one.
Similarly, a circular buffer is usually implemented as an array and two pointers into the array and is often exposed as a contiguous space though an API. The buffer is mapped into the array using the two pointers, meaning the invariants of the buffer (such as the size of the writable space plus the size of the readable space is a constant) are different from the invariants of the implementation (such as both pointers always point within the array or one past the array).

- Invariants may not hold at any moment, but should always hold after public operations have been performed
- When implementing any non-trivial class that has invariants that can be expressed in code, those invariants invariably get broken while operations are in progress. Even externally visible invariants may be broken during such operations – and re-established afterwards. Take the circular buffer as an example (we’ll get to the heap when we start discussing its implementation): one of its invariants is that the total write space size plus the total read space size is a constant (i.e. if you have a circular buffer of N items, the number of items you can write when the buffer is empty is N, the number of items you can read when it is full is also N, and the if there are X items to read, you can still write N – X items). That constant is the capacity of the circular buffer. Now say you want to insert something into the buffer. Often, you’ll first “reserve a bucket” by advancing the write pointer. If at this moment you look at the write space, and there are X items already in the buffer, you get N – X – 1 because you can’t read from the bucket you’re writing to yet, but you can’t write to it from another write either. However, once you’re done with the bucket you were writing to, you release it and it becomes available for writing – thus re-establishing the invariant.
In practice, this means that when you’re implementing a class with an invariant, private members of the class cannot count on the invariant of the class being valid when they are called, while public and protected methods can. Also, private members don’t always re-establish the invariant when done, while protected and public methods do.

Similarly, when you’re working on an abstract data structure that uses locking internally to make it thread-safe, invariants should typically be re-established before a lock is released.

Now, you may remember about the three guarantees: the

*weak, strong and no-fail guarantee*. These guarantees, formulated in terms of invariants are:- The weak guarantee:
- the operation may fail, but will re-establish all invariants if it does so, such that the state of the object remains valid
- the strong guarantee
- the operation may fail, but in that case it has no effect (and therefore the object’s invariants still hold)
- the no-fail guarantee
- the operation cannot fail

You’ll remember that the weak guarantee is a minimum guarantee, so constructors must also obey it. I single out constructors because this means two-part construction (call a constructor, then call and initialization function) is generally a bad idea because the initialization function is typically used to establish the invariants for the first time.

### What is a predicate?

A predicate is a function that returns a boolean. Usually, in C++, we use function objects (functors) for this:

struct Pred : unary_function< C, bool > { bool operator()(C c) const { return /* something or other w.r.t. c */ } }; |

Sorted ADTs usually use binary predicates (predicates that take two arguments) for their sorting – i.e. to compare two values to each other. Our heap will do that too.

### What is strict weak ordering?

See this explanation for an exact definition. Basically, it means that a predicate that provides this ordering provides the same kind of behavior as less-than does on numbers.

### What is a binary tree?

A binary tree is a tree of which each node (except the leaf nodes) have (at most) two children. Trees like this are very common in computing because they are fairly easy to work with. Among other things, they can be stored in a contiguous array in an “Ahnentafel list”. This is a feature we will be using in our implementation as well. Another feature is that from any node, a direct path to the root is at most hops, where n is the number of nodes in the tree. This is called the *depth* of the tree. Due to these two advantages, the heap we are about to implement will have complexity for insertion and for removing an item from the heap, but, due to the invariants we will put in place, search will be in the worst case.

### Putting it all together: the invariants of a heap’s binary tree

In the binary tree we’ll be implementing, each node will hold a value and that value will be smaller than the values of each if the node’s child nodes. I.e., we might model this as follows:

template < typename T > class Node { public : Node(const T & t) : t_(t) { /* no-op */ } void checkInvariants() { assert(!lhs_ || t_ < lhs_->t_); assert(!rhs_ || t_ < rhs_->t_); } private : T t_; Node< T > * lhs_; Node< T > * rhs_; }; |

Another invariant is that if a node has a right-hand-side node, it also has a left-hand-side node:

@@ -10,6 +10,7 @@ { assert(!lhs_ || t_ < lhs_->t_); assert(!rhs_ || t_ < rhs_->t_); + assert(!rhs_ || lhs_); } private : |

This latter invariant allows us to efficiently store the tree in an array – i.e. without losing any space. `T`

, in the case of our first implementation, will be an integer.

## Implementing the Heap

Like I mentioned above, the heap will be implemented as an array. We will need to know the number of elements in the array for almost any operation we might want to perform on the heap as well, so we will wrap the thing into a structure like this:

typedef struct Heap_struct { int values_[HEAP_MAX_SIZE]; unsigned int end_; } Heap; |

As with anything in C, this structure will need to be initialized and, possibly, finalized, so the first two functions of our API are straight-forward:

int Heap_init(Heap * heap) { heap->end_ = 0; return HEAP_S_OK; } void Heap_fini(Heap * heap) { /* no-op */ } |

Our API will consist of three other functions for now:

int Heap_push(Heap * heap, int value); int Heap_pop(Heap * heap, int * value); int Heap_find(Heap * heap, int value); |

We’ll add other functions later.

### Adding unit tests

As we’re writing an abstract data type, we really need to start testing it right away, and make sure our tests both cover as much as possible of our code, and don’t fail when we add new features to the code. We’ll be using Check for this, as we’ll be programming in C. Here’s what those tests look like for now:

#ifdef USING_CHECK Suite * createTestSuite() { Suite * s = suite_create("Heap"); TCase * tc = tcase_create("Heap"); suite_add_tcase(s, tc); return s; } int main() { int fail_count; Suite * s = createTestSuite(); SRunner * sr = srunner_create(s); srunner_run_all(sr, CK_NORMAL); fail_count = srunner_ntests_failed(sr); srunner_free(sr); return (fail_count == 0) ? EXIT_SUCCESS : EXIT_FAILURE; } #endif |

### Navigating the tree

Like I said before, we will be using an array to store our tree, which means that nodes are found as indices in the array. The root is at 0, and the left-hand-side child of any node `N`

is found by `(N * 2) + 1`

; the right hand side by `(N * 2) + 2`

; the parent by `(N - 1) / 2`

. Let’s take a look at the code that that gives:

#define Heap_nodeParent(h,n) (((n) - 1) / 2) #define Heap_nodeLhs(h,n) (((n) * 2) + 1) #define Heap_nodeRhs(h,n) (((n) * 2) + 2) #define Heap_nodeSibling(h,n) (((n) % 2) ? ((n) + 1) : ((n) - 1)) |

There are two things you’ll perhaps note here: one is that these are macros, the second is that they each take a parameter they won’t actually use. The reason for using macros is that there’s really no reason to use functions for this – at least not in C. In C++, these would have been private methods of a class. The reason for the first parameter is that by convention, I pass the object being worked on as the first parameter to any function or anything that looks like a function – which would include macros. There’s no cost to this (except perhaps a bit of typing) and the benefit is that coding becomes more predictable. For the same reason, anything that looks like a function follows the same naming convention as a function, so my macros are not necessary all in uppercase.

So, right now, heap.h looks like this:

/* heap.h: a generic heap implementation in C * Copyright (C) 2010 Ronald Landheer-Cieslak * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see <http://www.gnu.org/licenses/>. */ #ifndef vlinder_heap_h #define vlinder_heap_h #define HEAP_MAX_SIZE 64 #define HEAP_S_OK 0x00000000 #define HEAP_E_FULL 0x80000000 #define HEAP_E_EMPTY 0x80000001 #define HEAP_RESULT_IS_ERROR(s) ((s) & 0x80000000) typedef struct Heap_struct { int values_[HEAP_MAX_SIZE]; unsigned int end_; } Heap; int Heap_init(Heap * heap); void Heap_fini(Heap * heap); int Heap_push(Heap * heap, int value); int Heap_pop(Heap * heap, int * value); int Heap_find(Heap * heap, int value); #endif |

and heap.c looks like this:

/* heap.c: a generic heap implementation in C * Copyright (C) 2010 Ronald Landheer-Cieslak * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see <http://www.gnu.org/licenses/>. */ #include "heap.h" #ifdef USING_CHECK #include <check.h> #endif #define Heap_nodeParent(h,n) (((n) - 1) / 2) #define Heap_nodeLhs(h,n) (((n) * 2) + 1) #define Heap_nodeRhs(h,n) (((n) * 2) + 2) #define Heap_nodeSibling(h,n) (((n) % 2) ? ((n) + 1) : ((n) - 1)) int Heap_init(Heap * heap) { heap->end_ = 0; return HEAP_S_OK; } void Heap_fini(Heap * heap) { /* no-op */ } #ifdef USING_CHECK Suite * createTestSuite() { Suite * s = suite_create("Heap"); TCase * tc = tcase_create("Heap"); suite_add_tcase(s, tc); return s; } int main() { int fail_count; Suite * s = createTestSuite(); SRunner * sr = srunner_create(s); srunner_run_all(sr, CK_NORMAL); fail_count = srunner_ntests_failed(sr); srunner_free(sr); return (fail_count == 0) ? EXIT_SUCCESS : EXIT_FAILURE; } #endif |

As you can see, push, pop and find aren’t implemented yet, so we will start with find, which is perhaps the easiest to understand, though also the most complex, of the three.

As of here, code will be presented as patches whenever that makes sense.

### Searching the heap

In order to search in the heap, we have to implement Heap_find. We’ll walk through the code bit by bit:

@@ -14,14 +14,30 @@ * You should have received a copy of the GNU General Public License * along with this program. If not, see <http://www.gnu.org/licenses/>. */ #include "heap.h" +#include <assert.h> #ifdef USING_CHECK #include <check.h> +#include <stdlib.h> #endif #define Heap_nodeParent(h,n) (((n) - 1) / 2) #define Heap_nodeLhs(h,n) (((n) * 2) + 1) #define Heap_nodeRhs(h,n) (((n) * 2) + 2) #define Heap_nodeSibling(h,n) (((n) % 2) ? ((n) + 1) : ((n) - 1)) +#define Heap_nodeIsLeaf(h, n) ((h)->end_ <= Heap_nodeLhs(h, n)) +#define Heap_nodeIsRhs(h, n) (((n) % 2) == 0) +#define Heap_nodeEq(h,n,v) ((h)->values_[n] == v) +#define Heap_nodeLessV(h,n,v) ((h)->values_[n] < v) +#define Heap_capacity(h) (sizeof((h)->values_) / sizeof((h)->values_[0])) |

These macros allow us to query the heap and its nodes. As you can see, these are very dependent on the stored values being in an array of integers right now. We will revisit this later when the algorithmic parts are all set up.

The interesting part is this:

+ +static unsigned char Heap_depth(Heap * heap) +{ + unsigned int heap_size = Heap_capacity(heap); + unsigned char depth = 1; + while (heap_size /= 2) ++depth; + + return depth; +} int Heap_init(Heap * heap) { |

This bit calculates the depth of the tree, which is where n is the number of nodes. We use this to check (in an assertion) that the stack we use to track our progress through the tree is large enough.

Now for the find function itself: I’ll put the whole code here first, then an annotated version.

@@ -33,12 +50,92 @@ int Heap_init(Heap * heap) void Heap_fini(Heap * heap) { /* no-op */ } +int Heap_find(Heap * heap, int val) +{ + unsigned int root = 0, done = 0, retval = heap->end_; + int stack[HEAP_MAX_DEPTH], stackp = 0; + assert(Heap_depth(heap) <= (sizeof(stack) / sizeof(stack[0]))); + + if (root != heap->end_) while (!done) + { + if (Heap_nodeEq(heap, root, val)) + { + done = 1; + retval = root; + } + else if (!Heap_nodeIsLeaf(heap, root) && + Heap_nodeEq(heap, Heap_nodeLhs(heap, root), val)) + { + retval = Heap_nodeLhs(heap, root); + done = 1; + } + else if (!Heap_nodeIsLeaf(heap, root) && + Heap_nodeLessV(heap, Heap_nodeLhs(heap, root), val)) + { + stack[stackp++] = root; + root = Heap_nodeLhs(heap, root); + } + else if (!Heap_nodeIsRhs(heap, root) && + Heap_nodeSibling(heap, root) < heap->end_) + { // we're in a leaf node, visit the sibling, if one is there + root = Heap_nodeSibling(heap, root); + } + else if (stackp == 0) + { + done = 1; + } + else // go up the stack + { + do + { + root = stack[--stackp]; + } while (root != 0 && (Heap_nodeRhs(heap, root) > heap->end_)); + root = Heap_nodeRhs(heap, root); + } + } + + return retval != heap->end_; +} + #ifdef USING_CHECK +START_TEST(Test_Heap_find) +{ + int i, j; + Heap * heap = calloc(1, sizeof(Heap)); + Heap_init(heap); + for (i = 0; i < HEAP_MAX_SIZE; ++i) + { // a linear sorted set of values is a valid heap, which is good + // because that means we can test with that. + heap->values_[i] = i + 1; + } + // note we haven't changed end_ yet, so the heap is still empty + // k = 0; + for (i = 0; i < (HEAP_MAX_SIZE + 1); ++i) + { + heap->end_ = i; + for (j = 0; j < HEAP_MAX_SIZE; ++j) + { + if (j < i) + { + fail_unless(Heap_find(heap, j + 1)); + } + else + { + fail_if(Heap_find(heap, j + 1)); + } + } + } + Heap_fini(heap); + free(heap); +} +END_TEST + Suite * createTestSuite() { Suite * s = suite_create("Heap"); TCase * tc = tcase_create("Heap"); + tcase_add_test(tc, Test_Heap_find); suite_add_tcase(s, tc); return s; |

Of course, this chunk includes the unit test. The test basically creates a heap without using the insert function, which hasn’t been written yet, and loops over it to find all possible values.

Now let’s walk through the code. The function returns a boolean: non-zero if the value is found, 0 if it isn’t.

int Heap_find(Heap * heap, int val) { unsigned int root = 0, done = 0, retval = heap->end_; int stack[HEAP_MAX_DEPTH], stackp = 0; assert(Heap_depth(heap) <= (sizeof(stack) / sizeof(stack[0]))); |

We use the stack to track the parent we were in. Though we could do that b just calculating the parent of the current node, it also allows us to easily see when we’re done, and it provides information when debugging.

Stacks aren’t all that interesting w.r.t. the algorithms: pushing and popping are both and they’re done in LIFO (last-in first-out) order. This particular implementation is just an array and a pointer (or rather: index) into it. Other algorithms are certainly possible, but the basic idea of a stack is that you can adapt almost any non-sorting linear container to become a stack, which is definitely useful.

if (root != heap->end_) |

Note this clause: we don’t start finding in an empty heap. If we would, the first thing we’d do would be to compare the value at the root with the one we’re looking for – which could result in unexpected behavior if the heap did not contain only integers.

while (!done) { if (Heap_nodeEq(heap, root, val)) { |

Right now, we check for equality. Later, we will check for equivalence in stead. If the value at the root of the current sub-tree is equal (or later: equivalent) to the one we’re looking for, we’re done.

done = 1; retval = root; } else if (!Heap_nodeIsLeaf(heap, root) && Heap_nodeEq(heap, Heap_nodeLhs(heap, root), val)) { |

We take a bit of a short-cut here: if the left-hand-side child node is equal (later: equivalent) to what we’re looking for, we return it immediately. The reason why this is a short cut is that otherwise, we’d have to descend into the left-hand-side subtree if the left hand side is not greater than the value we’re looking for. It comes at the potential cost of an extra comparison, though. We could code an alternate version with one less comparison, if less-than comparison were expensive. It isn’t for integers, and shouldn’t be for any valid key.

retval = Heap_nodeLhs(heap, root); done = 1; } else if (!Heap_nodeIsLeaf(heap, root) && Heap_nodeLessV(heap, Heap_nodeLhs(heap, root), val)) { |

When we get this far, we already know that the left-hand-side leaf is not equal, but it might be less than the value we’re looking for, in which case the value we are looking for might be in one of its children. In that case, we push the current root onto the stack and “dive” into the subtree.

stack[stackp++] = root; root = Heap_nodeLhs(heap, root); } else if (!Heap_nodeIsRhs(heap, root) && Heap_nodeSibling(heap, root) < heap->end_) |

From a left-hand-side leaf node we go to the right, to see if the value is there. Note that `Heap_nodeSibling`

can move either way, so we need to know where we are before we call it, or we will get stuck here.

{ // we're in a leaf node, visit the sibling, if one is there root = Heap_nodeSibling(heap, root); } else if (stackp == 0) { |

If we’ve gotten this far and we don’t have a stack to climb up in, we’re at the root of the tree. That means we’re done.

done = 1; } else // go up the stack { |

Otherwise, we start going up the stack and going right.

do { root = stack[--stackp]; } while (root != 0 && (Heap_nodeRhs(heap, root) > heap->end_)); root = Heap_nodeRhs(heap, root); } } return retval != heap->end_; } |

There’s something missing in this code – something our unit test didn’t pick up. We’ll find it shortly, but don’t use this code as-is just yet!

Now, the tree we’ve created in the test looks like this (in ASCII art):

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64

As you can see, the tree is balanced, but not complete. It would have been the result of multiple calls to insert if insert were written and called with these numbers in sorted order.

Now, let’s say we use ,code>Heap_find to find number 35 in this heap. It would go over the following nodes, in this order: 1, 2, 4, 8, 16 32, 64, 33, 17, 34, 35. That’s 11 steps – which is better than 35 steps which would have been a linear search. In fact, the works-cast scenario would be trying to find 63 in this heap, because it would actually be slightly worse than linear: in a linear search, it would have been found after 63 steps. In our case, it’s found after 64 steps because it’s the last leaf we look at – and 64 is one of the first leafs we look at.

You can also see that we’ve been a bit pessimistic when we allocated our stack: we will actually never have more than 5 roots on the stack (1, 2, 8, 16, 32) but we have seven spaces in the stack. There are two reasons for this: one is that the Heap_depth function is pessimistic: it returns the log2 of the power of two above the given value, so it will return one higher than necessary in our case ( but the function returns 7 because the power of two above 64 is 128). A simple patch can fix that:

@@ -34,7 +34,7 @@ static unsigned char Heap_depth(Heap * heap) { unsigned int heap_size = Heap_capacity(heap); - unsigned char depth = 1; + unsigned char depth = 0; while (heap_size /= 2) ++depth; return depth; |

The second reason is that we never put the entire path on the stack: we only put the parent nodes on the stack when we’ve dived into the left-hand-side child of the node, so the current node is never on the stack. We don’t want `Heap_depth`

to reflect that, though, so if we want our stack to be less pessimistic, we have to fix the assertion:

@@ -54,7 +54,7 @@ int Heap_find(Heap * heap, int val) { unsigned int root = 0, done = 0, retval = heap->end_; int stack[HEAP_MAX_DEPTH], stackp = 0; - assert(Heap_depth(heap) <= (sizeof(stack) / sizeof(stack[0]))); + assert(Heap_depth(heap) - 1 <= (sizeof(stack) / sizeof(stack[0]))); if (root != heap->end_) while (!done) {

### Fixing the bug

Like I said, there’s a bug in the current version of the code that didn’t get caught by our unit test – even though the coverage of the unit test couldn’t be much better and it did perform 4160 iterations, which is a respectable amount of iterations for something like this. However, when describing the code (above), I left something out that may or may not have caught your attention. If it didn’t, read the (annotated) code again before reading on.

The point of this little exercise is to show you that, though unit tests are good and will help you a lot when developing your own code, you should not forget to analyze your code thoroughly and think it through before putting it in production anywhere: you should put as much (relative) effort into your testing as you do in your development, if your testing is to be of the same quality as your development – otherwise, your development will turn out to be of the same quality as your testing.

Let’s take a look at a test that will bring out the problem:

@@ -97,7 +97,7 @@ int Heap_find(Heap * heap, int val) } #ifdef USING_CHECK -START_TEST(Test_Heap_find) +START_TEST(Test_Heap_find_01) { int i, j; Heap * heap = calloc(1, sizeof(Heap)); @@ -129,12 +129,55 @@ START_TEST(Test_Heap_find) } END_TEST +START_TEST(Test_Heap_find_02) +{ + int i, j; + static int heap_values[HEAP_MAX_SIZE] = { + 1, + 3, 2, + 5, 4, 7, 6, + 9, 8, 11, 10, 13, 12, 15, 14, + 17, 16, 19, 18, 21, 20, 23, 22, 25, 24, 27, 26, 29, 28, 31, 30, + 33, 32, 35, 34, 37, 36, 39, 38, 41, 40, 43, 42, 45, 44, 47, 46, 49, 48, 51, 50, 53, 52, 55, 54, 57, 56, 59, 58, 61, 60, 63, 62, +64, + }; + Heap * heap = calloc(1, sizeof(Heap)); + Heap_init(heap); + for (i = 0; i < HEAP_MAX_SIZE; ++i) + { // a linear sorted set of values is a valid heap, which is good + // because that means we can test with that. + heap->values_[i] = heap_values[i]; + } + // note we haven't changed end_ yet, so the heap is still empty + k = 0; + for (i = 0; i < (HEAP_MAX_SIZE + 1); ++i) + { + heap->end_ = i; + for (j = 0; j < HEAP_MAX_SIZE; ++j) + { + if (j < i) + { + int rv = Heap_find(heap, heap_values[j]); + fail_unless(rv); + } + else + { + fail_if(Heap_find(heap, heap_values[j])); + } + } + } + Heap_fini(heap); + free(heap); +} +END_TEST + Suite * createTestSuite() { Suite * s = suite_create("Heap"); TCase * tc = tcase_create("Heap"); - tcase_add_test(tc, Test_Heap_find); + tcase_add_test(tc, Test_Heap_find_01); + tcase_add_test(tc, Test_Heap_find_02); suite_add_tcase(s, tc); return s; |

As you can see, this test implements a heap that is designed to bring out the error: if you were to run the code with this test, the test would fail when trying to find the value 2 in the tree. The reason for that is that the find algorithm is missing a crucial step:

@@ -25,6 +25,7 @@ #define Heap_nodeRhs(h,n) (((n) * 2) + 2) #define Heap_nodeSibling(h,n) (((n) % 2) ? ((n) + 1) : ((n) - 1)) #define Heap_nodeIsLeaf(h, n) ((h)->end_ <= Heap_nodeLhs(h, n)) +#define Heap_nodeHasRhs(h, n) ((h)->end_ > Heap_nodeRhs(h, n)) #define Heap_nodeIsRhs(h, n) (((n) % 2) == 0) #define Heap_nodeEq(h,n,v) ((h)->values_[n] == v) #define Heap_nodeLessV(h,n,v) ((h)->values_[n] < v) @@ -74,6 +76,17 @@ int Heap_find(Heap * heap, int val) stack[stackp++] = root; root = Heap_nodeLhs(heap, root); } + else if (Heap_nodeHasRhs(heap, root) && + Heap_nodeEq(heap, Heap_nodeRhs(heap, root), val)) + { + retval = Heap_nodeRhs(heap, root); + done = 1; + } + else if (Heap_nodeHasRhs(heap, root) && + Heap_nodeLessV(heap, Heap_nodeRhs(heap, root), val)) + { + root = Heap_nodeRhs(heap, root); + } else if (!Heap_nodeIsRhs(heap, root) && Heap_nodeSibling(heap, root) < heap->end_) { // we're in a leaf node, visit the sibling, if one is there |

This chunk of code checks the right-hand-side child node of the current node and handles the case where the value on the left-hand side of the node is greater than the one we’re looking for, but the value on the right-hand side isn’t.

If we take a close look at our invariants, we can see why this is important; the node invariant says that any given node’s value should be smaller than the values of each of its children (if it has children). It *doesn’t* say that the left-hand-side child should also be smaller than the right-hand-side child. The tree in our new test is a valid one, and inverts each of the values of adjacent children w.r.t. the tree in the first test. It brings this bug to light.

### Pushing onto the heap

The algorithm to push a value onto the heap is pretty straight-forward: we use what’s called a “sift-up” method in that we place the new value at a leaf in the tree and move it up (by swapping it with its parent) until the node invariant holds for the node it is in. An alternative method would have been to find a place to put it by “trickling down” from the root but because of the way we store our heap – in an array – we already know where the left-most leaf node is (it’s at the end of the array: `heap->values_[hea->end_]`

), which makes the sift-up method a lot easier to implement.

Let’s take a look at the code:

int Heap_push(Heap * heap, int value) { int retval = HEAP_S_OK; if (heap->end_ == Heap_capacity(heap)) { retval = HEAP_E_FULL; } else { int curr = heap->end_++; heap->values_[curr] = value; while ((curr != 0)) { if (Heap_nodeLess(heap, curr, Heap_nodeParent(heap, curr))) { Heap_nodeSwap(heap, curr, Heap_nodeParent(heap, curr)); curr = Heap_nodeParent(heap, curr); } else { break; } } } #ifndef NDEBUG Heap_checkInvariants(heap); #endif return retval; } |

Of course, the very first thing the function checks is whether it’s actually possible to push anything on the heap – which it isn’t if the heap is full. If it is full, it will return a valuye indicating that it is and do nothing more. Otherwise, it will put the new value at the end of the heap (which is the left-most available leaf node) and swap it up until it is either at the root, or its parent is smaller than it is.

Simple though this algorithm may be, it is useful to add unit tests for it. One test – a “smoke test” if you will – is to push random values onto the heap and make sure all of them can be found in the heap when done. It can be rather tedious to write such a smoke test, especially if you want more than one to be run (otherwise, how do you know that it’s not just the random set of values you chose that happened to work?) but writing a script that generates the test case is easy enough: the following script generates 64 test cases, each with random values to put in the heap.

#! /bin/bash rm -f heap.val.in heap.val.tcase.in for ((i = 0; i < 64; ++i)); do echo "#include \"heap.val.${i}.in\"" >> heap.val.in echo " tcase_add_test(tc, Test_Heap_push_rnd${i});" >> heap.val.tcase.in cat > heap.val.${i}.in <<eof START_TEST(Test_Heap_push_rnd${i}) { Heap * heap = calloc(1, sizeof(*heap)); int i; int heap_values[HEAP_MAX_SIZE] = { EOF dd bs=4 count=64 if=/dev/random of=tt od -A n -t x4 tt | sed -e 's/^ / 0x/' | sed -e 's/ /, 0x/g' | sed -e 's/$/,/g' >> heap.val.${i}.in cat >> heap.val.${i}.in <<eof }; Heap_init(heap); for (i = 0; i < HEAP_MAX_SIZE; ++i) { fail_unless(Heap_push(heap, heap_values[i]) == HEAP_S_OK); } // pushing anything in here now should fail fail_unless(Heap_push(heap, 0) == HEAP_E_FULL); for (i = 0; i < HEAP_MAX_SIZE; ++i) { fail_unless(Heap_find(heap, heap_values[i])); } // Heap_dump(heap, stdout); Heap_fini(heap); free(heap); } END_TEST EOF done |

This is what one of those tests looks like:

START_TEST(Test_Heap_push_rnd54) { Heap * heap = calloc(1, sizeof(*heap)); int i; int heap_values[HEAP_MAX_SIZE] = { 0x3196a3b8, 0xa53e4989, 0x23b36d80, 0xd324e8a7, 0x73d71b10, 0xd5dea692, 0x2faf7ef1, 0x06515cb3, 0xbd8affd4, 0x0eada2a5, 0xf068d6da, 0x5d720625, 0x8077b60c, 0xdbd1fffd, 0x9f357c8c, 0x38ef64eb, 0x4a1e1513, 0x891a7be9, 0xd738d651, 0xa715584c, 0x223a4a78, 0xd02354a7, 0x503c4a3d, 0x5c5deb30, 0xdff69472, 0xa5effd60, 0x2584ef20, 0x41f7f45b, 0x846633f2, 0xf7bce84f, 0x70f1f224, 0xbc10b2ff, 0xc1229e66, 0x67f83a40, 0xa7ef445c, 0xa25d9b97, 0x2b86dea3, 0xb2b37e57, 0x450305ad, 0x3e194d93, 0xe1422d07, 0xaa3c37f6, 0x92540227, 0x849b216a, 0x18958218, 0xea89e7b1, 0x94b2a9e8, 0x7a244559, 0x450879bd, 0x029059c9, 0xc121cb75, 0x73c21ced, 0x09bba9f5, 0xdc1b66fc, 0xb8381cc8, 0xb7484b98, 0xcc60b798, 0xc18653ef, 0x758ad5cc, 0x6c8e7d1d, 0x54378939, 0x1562c20b, 0xe6c64b6d, 0xeb86a57b, }; Heap_init(heap); for (i = 0; i < HEAP_MAX_SIZE; ++i) { fail_unless(Heap_push(heap, heap_values[i]) == HEAP_S_OK); } // pushing anything in here now should fail fail_unless(Heap_push(heap, 0) == HEAP_E_FULL); for (i = 0; i < HEAP_MAX_SIZE; ++i) { fail_unless(Heap_find(heap, heap_values[i])); } // Heap_dump(heap, stdout); Heap_fini(heap); free(heap); } END_TEST |

These tests are generated by the Makefile if they don’t exist.

Like I said, these are smoke tests. We also want to know whether the layout of the tree is as expected, which is why I wrote a function called `Heap_dump`

to manually inspect the contents of a tree (I won’t put the code of that here – it’s a simple for loop). Also, at the end of each execution of the push function, the invariants of the nodes are checked to make sure the heap is still valid. The `Heap_checkInvariants`

function looks like this:

static void Heap_checkInvariants(Heap * heap) { int curr; for (curr = 0; curr < heap->end_; ++curr) { assert(Heap_nodeIsLeaf(heap, curr) || Heap_nodeLess(heap, curr, Heap_nodeLhs(heap, curr))); assert(Heap_nodeHasRhs(heap, curr) || Heap_nodeLess(heap, curr, Heap_nodeRhs(heap, curr))); } } |

Of course, there is no need to follow the tree to check the invariants, as we already know where all the nodes are.

### Popping from the heap

The algorithm we use for popping from the heap is a trickle-down algorithm: we swap the value at the root of the heap with the left-most leaf (which, again, is easy to find when you’re storing the tree in an array as wel are) and trickle what was in the leaf down the tree until we find its final resting place. Here’s the code:

int Heap_pop(Heap * heap, int * value) { int root, child; if (heap->end_ == 0) { return HEAP_E_EMPTY; } else { /* heap is not empty */ } Heap_nodeSwap(heap, 0, heap->end_ - 1); --heap->end_; root = 0; while (Heap_nodeLhs(heap, root) < heap->end_) { child = Heap_nodeLhs(heap, root); if ((Heap_nodeSibling(heap, child) < heap->end_) && Heap_nodeLess( heap, Heap_nodeSibling(heap, child), child)) { child = Heap_nodeSibling(heap, child); } else { /* go left */ } if (Heap_nodeLess(heap, child, root)) { Heap_nodeSwap(heap, child, root); root = child; } else { break; } } *value = heap->values_[heap->end_]; return HEAP_S_OK; } |

Of course, this code comes with its own tests. Those tests are generated like the others. There’s no real need for hand-written tests as we’d just be testing the same thing: regardless of the order in which items are put in the heap, we want them to come out in ascending order. The generating script now looks like this:

#! /bin/bash rm -f heap.val.in heap.val.tcase.in for ((i = 0; i < 64; ++i)); do echo "#include \"heap.val.${i}.in\"" >> heap.val.in echo " tcase_add_test(tc, Test_Heap_push_rnd${i});" >> heap.val.tcase.in echo " tcase_add_test(tc, Test_Heap_pop_rnd${i});" >> heap.val.tcase.in cat > heap.val.${i}.in <<eof #line 9 "mkvals.sh" START_TEST(Test_Heap_push_rnd${i}) { Heap * heap = calloc(1, sizeof(*heap)); int i; int heap_values[HEAP_MAX_SIZE] = { EOF dd bs=4 count=64 if=/dev/random of=tt od -A n -t x4 tt | sed -e 's/^ / 0x/' | sed -e 's/ /, 0x/g' | sed -e 's/$/,/g' >> heap.val.${i}.in cat >> heap.val.${i}.in <<eof #line 19 "mkvals.sh" }; Heap_init(heap); for (i = 0; i < HEAP_MAX_SIZE; ++i) { fail_unless(Heap_push(heap, heap_values[i]) == HEAP_S_OK); } // pushing anything in here now should fail fail_unless(Heap_push(heap, 0) == HEAP_E_FULL); for (i = 0; i < HEAP_MAX_SIZE; ++i) { fail_unless(Heap_find(heap, heap_values[i])); } // Heap_dump(heap, stdout); Heap_fini(heap); free(heap); } END_TEST EOF cat >> heap.val.${i}.in <<eof #line 41 "mkvals.sh" START_TEST(Test_Heap_pop_rnd${i}) { Heap * heap = calloc(1, sizeof(*heap)); int i, prev, curr; int heap_values[HEAP_MAX_SIZE] = { EOF dd bs=4 count=64 if=/dev/random of=tt od -A n -t x4 tt | sed -e 's/^ / 0x/' | sed -e 's/ /, 0x/g' | sed -e 's/$/,/g' >> heap.val.${i}.in cat >> heap.val.${i}.in <<eof #line 51 "mkvals.sh" }; Heap_init(heap); for (i = 0; i < HEAP_MAX_SIZE; ++i) { fail_unless(Heap_push(heap, heap_values[i]) == HEAP_S_OK); } // pushing anything in here now should fail fail_unless(Heap_push(heap, 0) == HEAP_E_FULL); for (i = 0; i < HEAP_MAX_SIZE; ++i) { fail_unless(Heap_find(heap, heap_values[i])); } fail_unless(Heap_pop(heap, &prev) == HEAP_S_OK); for (i = 1; i < HEAP_MAX_SIZE; ++i) { fail_unless(Heap_pop(heap, &curr) == HEAP_S_OK); fail_unless(prev <= curr); prev = curr; } fail_unless(Heap_pop(heap, &curr) == HEAP_E_EMPTY); // Heap_dump(heap, stdout); Heap_fini(heap); free(heap); } END_TEST EOF done |

## Continuing from here: Predicates

With the tests and the heap in its current state, we have 132 tests (128 of which are automatically generated), all of which pass:

Running suite(s): Heap 100%: Checks: 132, Failures: 0, Errors: 0

This version is available here, but we’re not quite done yet: we have that little thing called a predicate to handle.

Up to now, we’ve used three predicates without really naming them: the macros `Heap_nodeEq`

, `Heap_nodeLess`

and `Heap_nodeLessV`

are all predicates. This really tells us only one thing, though: what our predicate should look like. I.e., it should be a function that takes two pointers to otherwise unknown objects (after all: this is C, we don’t have templates to make the type known at compile-time) and return a boolean (nonzero for true, zero for false).

The changes to our code and test cases are really – perhaps surprisingly – limited:

@@ -29,12 +29,14 @@ #define Heap_nodeHasLhs(h,n) !Heap_nodeIsLeaf(h,n) #define Heap_nodeHasRhs(h, n) (Heap_nodeRhs(h, n) < (h)->end_) #define Heap_nodeIsRhs(h, n) (((n) % 2) == 0) -#define Heap_nodeEq(h,n,v) ((h)->values_[n] == v) -#define Heap_nodeLess(h,n1,n2) ((h)->values_[n1] < (h)->values_[n2]) -#define Heap_nodeLessV(h,n,v) ((h)->values_[n] < v) +#define Heap_nodeEq(h,n,v) \ + ((!((h)->pred_((h)->values_[n], v))) && (!((h)->pred_(v, (h)->values_[n])))) +#define Heap_nodeLess(h,n1,n2) \ + ((h)->pred_((h)->values_[n1], (h)->values_[n2])) +#define Heap_nodeLessV(h,n,v) ((h)->pred_((h)->values_[n], v)) #define Heap_nodeSwap(h,n1,n2) \ { \ - int temp = (h)->values_[n1]; \ + void * temp = (h)->values_[n1]; \ (h)->values_[n1] = (h)->values_[n2]; \ (h)->values_[n2] = temp; \ } @@ -47,7 +49,7 @@ void Heap_dump(Heap * heap, FILE * out) fprintf(out, "{ "); for (i = 0; i < heap->end_; ++i) { - fprintf(out, (i == (heap->end_ - 1) ? "%d " : "%d, "), heap->values_[i]); + fprintf(out, (i == (heap->end_ - 1) ? "%p " : "%p, "), heap->values_[i]); } fprintf(out, "}\n"); } @@ -59,8 +61,12 @@ static void Heap_checkInvariants(Heap * for (curr = 0; curr < heap->end_; ++curr) { - assert(Heap_nodeIsLeaf(heap, curr) || Heap_nodeLess(heap, curr, Heap_nodeLhs(heap, curr))); - assert(!Heap_nodeHasRhs(heap, curr) || Heap_nodeLess(heap, curr, Heap_nodeRhs(heap, curr))); + assert( + Heap_nodeIsLeaf(heap, curr) || + Heap_nodeLess(heap, curr, Heap_nodeLhs(heap, curr))); + assert( + !Heap_nodeHasRhs(heap, curr) || + Heap_nodeLess(heap, curr, Heap_nodeRhs(heap, curr))); } } @@ -74,9 +80,10 @@ static unsigned char Heap_depth(Heap * h } #endif -int Heap_init(Heap * heap) +int Heap_init(Heap * heap, Heap_pred pred) { heap->end_ = 0; + heap->pred_ = pred; return HEAP_S_OK; } @@ -84,7 +91,7 @@ int Heap_init(Heap * heap) void Heap_fini(Heap * heap) { /* no-op */ } -int Heap_push(Heap * heap, int value) +int Heap_push(Heap * heap, void * value) { int retval = HEAP_S_OK; if (heap->end_ == Heap_capacity(heap)) @@ -114,7 +121,7 @@ int Heap_push(Heap * heap, int value) return retval; } -int Heap_pop(Heap * heap, int * value) +int Heap_pop(Heap * heap, void ** value) { int root, child; @@ -157,7 +164,7 @@ int Heap_pop(Heap * heap, int * value) return HEAP_S_OK; } -int Heap_find(Heap * heap, int val) +int Heap_find(Heap * heap, void * val) { unsigned int root = 0, done = 0, retval = heap->end_; int stack[HEAP_MAX_DEPTH], stackp = 0; @@ -216,15 +223,19 @@ int Heap_find(Heap * heap, int val) } #ifdef USING_CHECK +static int Test_less(void * lhs, void * rhs) +{ + return (lhs < rhs); +} START_TEST(Test_Heap_find_01) { int i, j; Heap * heap = calloc(1, sizeof(Heap)); - Heap_init(heap); + Heap_init(heap, Test_less); for (i = 0; i < HEAP_MAX_SIZE; ++i) { // a linear sorted set of values is a valid heap, which is good // because that means we can test with that. - heap->values_[i] = i + 1; + heap->values_[i] = (void*)(i + 1); } // note we haven't changed end_ yet, so the heap is still empty for (i = 0; i < (HEAP_MAX_SIZE + 1); ++i) @@ -234,11 +245,11 @@ START_TEST(Test_Heap_find_01) { if (j < i) { - fail_unless(Heap_find(heap, j + 1)); + fail_unless(Heap_find(heap, (void*)(j + 1))); } else { - fail_if(Heap_find(heap, j + 1)); + fail_if(Heap_find(heap, (void*)(j + 1))); } } } @@ -260,11 +271,11 @@ START_TEST(Test_Heap_find_02) 64, }; Heap * heap = calloc(1, sizeof(Heap)); - Heap_init(heap); + Heap_init(heap, Test_less); for (i = 0; i < HEAP_MAX_SIZE; ++i) { // a linear sorted set of values is a valid heap, which is good // because that means we can test with that. - heap->values_[i] = heap_values[i]; + heap->values_[i] = (void*)heap_values[i]; } // note we haven't changed end_ yet, so the heap is still empty for (i = 0; i < (HEAP_MAX_SIZE + 1); ++i) @@ -274,12 +285,12 @@ START_TEST(Test_Heap_find_02) { if (j < i) { - int rv = Heap_find(heap, heap_values[j]); + int rv = Heap_find(heap, (void*)heap_values[j]); fail_unless(rv); } else { - fail_if(Heap_find(heap, heap_values[j])); + fail_if(Heap_find(heap, (void*)heap_values[j])); } } } @@ -294,16 +305,16 @@ START_TEST(Test_Heap_push_01) Heap * heap = calloc(1, sizeof(*heap)); int i; - Heap_init(heap); + Heap_init(heap, Test_less); for (i = 0; i < HEAP_MAX_SIZE; ++i) { - fail_unless(Heap_push(heap, i + 1) == HEAP_S_OK); + fail_unless(Heap_push(heap, (void*)(i + 1)) == HEAP_S_OK); } // pushing anything in here now should fail fail_unless(Heap_push(heap, 0) == HEAP_E_FULL); for (i = 0; i < HEAP_MAX_SIZE; ++i) { - fail_unless(heap->values_[i] == i + 1); + fail_unless(heap->values_[i] == (void*)(i + 1)); } Heap_fini(heap); free(heap); @@ -325,16 +336,16 @@ START_TEST(Test_Heap_push_02) 64 }; - Heap_init(heap); + Heap_init(heap, Test_less); for (i = 0; i < HEAP_MAX_SIZE; ++i) { - fail_unless(Heap_push(heap, HEAP_MAX_SIZE - i) == HEAP_S_OK); + fail_unless(Heap_push(heap, (void*)(HEAP_MAX_SIZE - i)) == HEAP_S_OK); } // pushing anything in here now should fail fail_unless(Heap_push(heap, 0) == HEAP_E_FULL); for (i = 0; i < HEAP_MAX_SIZE; ++i) { - fail_unless(heap->values_[i] == heap_values[i]); + fail_unless(heap->values_[i] == (void*)heap_values[i]); } Heap_fini(heap); free(heap); |

Our predicate-wielding macros have become a bit more complicated because they call functions now, and the “equals” macro has become an “equivalent” macro (predicate fails on the two values in both directions), but that’s really all there is to it.

## Final details: duplicate values

There is one thing we haven’t explicitly tested yet, but we should: we haven’t tried putting two or more equivalent values on the heap. Our random tests could conceivably run into the case from time to time, but if the system’s random number generator (in /dev/random) is any good, the likelihood of that happening is pretty slim (one in about sixty-eight million). So, what happens if we do this:

START_TEST(Test_heapPush_duplicates) { Heap * heap = calloc(1, sizeof(*heap)); int i; Heap_init(heap, Test_less); for (i = 0; i < HEAP_MAX_SIZE; ++i) { fail_unless(Heap_push(heap, 0) == HEAP_S_OK); } // pushing anything in here now should fail fail_unless(Heap_push(heap, 0) == HEAP_E_FULL); for (i = 0; i < HEAP_MAX_SIZE; ++i) { fail_unless(heap->values_[i] == 0); } for (i = 0; i < HEAP_MAX_SIZE; ++i) { void * val; fail_unless(Heap_pop(heap, &val) == HEAP_S_OK); fail_unless(val == 0); } fail_unless(Heap_pop(heap, 0) == HEAP_E_EMPTY); Heap_fini(heap); free(heap); } END_TEST |

The answer is this:

Running suite(s): Heap assertion "Heap_nodeIsLeaf(heap, curr) || Heap_nodeLess(heap, curr, Heap_nodeLhs(heap, curr))" failed: file "heap.c", line 66, function: Heap_checkInvariants 99%: Checks: 133, Failures: 0, Errors: 1 heap.c:365:E:Heap:Test_heapPush_duplicates:0: (after this point) Received signal 6 (Aborted)

meaning one of the invariants failed.

There are two ways to handle this: either we change the invariant to allow duplicate values, in which case the invariant becomes “the parent is smaller than *or equivalent to* each of its children”, or we detect the duplicate value when it’s inserted into the heap and fail at that point, with an error. In order to decide which one we’ll do, we’ll consider three things: the requirements our heap has to meet, the risk of not meeting those requirements in either implementation, and the consequences of both implementations if executed correctly.

In terms of requirements, the basic requirements for our heap are pretty straight-forward: we want a push onto the heap, and a pop from the heap, to have complexity, and a search in the heap to have complexity on average, and complexity in the worst case. In our current implementation, pops are slightly more expensive that pushes because pushes only perform one comparison per node encountered, and comparisons in the worst case. The pop performs two comparisons per node (one with the right-hand-side child, one with the left-hand-side child) and comparisons in the worst case. The find algorithm does three comparisons in the worst case, but may go through the whole tree before it gives up, so it’s in the worst case. This is still within our complexity requirements, but we should avoid making it any more expensive if we can.

In terms of guarantees, we need both push and pop to give the strong guarantee: neither should break the invariants and neither should have any effect on the visible state of the heap if they fail: a pop should always either remove a single object from the heap – and that object should be the smallest object on the heap – or fail if the heap is empty; and a push should always either put a single object on the heap or fail if the heap is full. Find should never fail, nor change the state of the heap (nor break any invariants).

Let’s consider the way the push algorithm works: it puts the new value in the left-most vacant leaf and propagates it up the tree toward to root. In doing so, it only considers the nodes on the path from the left-most vacant leaf to the root, disregarding any other nodes. This is why the complexity of push is . The most naive implementation of push that would attempt to detect duplicate values would simply detect a duplicate value on that path an report an error if one is found, rolling back the changes it made up to that point. Such an implementation would retain its complexity – but would it reliably detect duplicates? Let’s consider the following bit of code (and imagine that we’re using `int`

s in stead of `void*`

s again):

Heap_push(heap, 0); Heap_push(heap, 1); Heap_push(heap, 1); |

This would construct a tree with three nodes: zero at the root, one in each of its children. More importantly, this would not fail because the second time we insert 1, the left-most vacant leaf is the right-hand-side leaf of the root node, so the algorithm would only compare with the root. If we’d want it to compare with the other node containing 1, we’d have to compare with the new node’s sibling as well. This is all fine and dandy, until the tree becomes a bit more complex, e.g. by inserting the following values in an empty heap:

Heap_push(heap, 0); Heap_push(heap, 10); Heap_push(heap, 12); Heap_push(heap, 20); Heap_push(heap, 15); Heap_push(heap, 17); Heap_push(heap, 15);

This results in the following tree:

0 10 12 20 15 17 15

Note that this tree is still perfectly valid according to the current invariants, but would cease to be so at the first pop, which would result in the following tree:

10 15 12 20 15 17

According to our current invariants, this tree is not valid, because the upper node with value 15 has a child that is not greater than itself (i.e. it is not less than both of its children). That means that, in this case, *pop* would break the invariant – not push.

The only way to reliably avoid this would be to search the heap before inserting the new value but, as we’ve seen, searching the heap is costlier than inserting a value into it: this would make insertion . Another option that would work for integers is to detect duplicate values at the moment they are popped, by keeping the latest popped value and comparing with it before giving the value back. That won’t work for real objects, though, for two reasons: first of all, our implementation doesn’t own the objects in the heap but only has pointers to them. This could be handled by having the heap own its contents, but that would bring all sorts of problems with it, which I won’t go into right now. The second reason is that for real objects we don’t check equality, but equivalence. As the heap does not provide FIFO order for equivalent values (even in our current implementation) there is really no way to predict the order in which two equivalent values would come out of the heap, so *which* pop would be quenched by this approach would also be unpredictable – and we would still have to alter the invariants to allow for such an approach.

Suffice it to say that remaining within our performance requirements, this feature cannot be implemented reliably.

Simply not implementing the feature and leaving the state as-is is not really an option: any self-respecting implementation of an abstract data type should not delegate the responsibility of its own integrity – the validity of its internal invariants – to its users. Not changing the current state of affairs and keeping the invariants as they are would allow the client code to break the invariants simply by inserting the same value into the heap twice. Forcing the client code to detect that before they push a value onto the heap would mean forcing them to perform a find before each push, effectively creating a situation in which pushing is both error-prone and .

So, our only remaining option is to change the invariant of the heap, allowing duplicate values and, therefore, allowing equivalence as well as the less-than relationship between parent and child. This patch will do that:

@@ -63,10 +63,14 @@ static void Heap_checkInvariants(Heap * { assert( Heap_nodeIsLeaf(heap, curr) || - Heap_nodeLess(heap, curr, Heap_nodeLhs(heap, curr))); + Heap_nodeLess(heap, curr, Heap_nodeLhs(heap, curr)) || + (!Heap_nodeLess(heap, curr, Heap_nodeLhs(heap, curr)) && + !Heap_nodeLess(heap, Heap_nodeLhs(heap, curr), curr))); assert( !Heap_nodeHasRhs(heap, curr) || - Heap_nodeLess(heap, curr, Heap_nodeRhs(heap, curr))); + Heap_nodeLess(heap, curr, Heap_nodeRhs(heap, curr)) || + (!Heap_nodeLess(heap, curr, Heap_nodeRhs(heap, curr)) && + !Heap_nodeLess(heap, Heap_nodeRhs(heap, curr), curr))); } }

Now, if the client code does not want to allow duplicate values, it will have to perform a find on the heap before inserting but if it *does* want to allow duplicate values, it can simply insert them. If wants to go for the cheaper option of having duplicate values filtered at the pop-end of the heap, it can do that with full knowledge of what equivalence means (as the client code provides the predicate) and of how element life-time is handled (as that is also handled by the client code). Our heap makes no assumptions on either.

## Conclusion

We’ve created a monster!

Actually, no. We’ve created a fairly efficient implementation of a heap with all the bells and whistles, that also happens to be fairly generic. It’s written in C, but the same thing would certainly have been possible in C++ (and I might be tempted to write a version in C++ at some point). We’ve seen invariants in action, analyzed the computational complexity of our algorithm and solved a few problems along the way.

The generic version of the heap implementation is available as version 1.1.00, here.