
Solving the problems with the “futures” in C++

October 15, 2020.

Today’s high core-count processors require parallel algorithms for programs to run fast. On their own, however, asynchronous calls and future values are not a reliable way to achieve this.

Many modern programming languages allow you to achieve this acceleration with the help of async code and future values. The basic principle behind async and futures is that called functions are run on another thread, and the return values are converted to what is called a future-value. Such future-values don’t hold a real value until the asynchronous function ends. The function runs concurrently and, when it eventually returns with a value, the future-value variable is updated behind the scenes to hold that return value. No need for an explicit mutex or messaging: all the synchronization between the initial call and the background thread is performed behind the scenes. When the initial thread accesses the future-value, it is automatically paused until the value is ready.

The main benefit of this design is that making a given function run asynchronously is ridiculously easy. Of course, it is still up to the programmer to make sure the function can actually be run on another thread safely, and that there are no data races; async and futures solely provide an easy way to spawn threads and receive a result.
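
To make this concrete, here is a minimal sketch of the principle in standard C++, using std::async and std::future; the compute_sum function is purely illustrative.

    #include <future>
    #include <numeric>
    #include <vector>

    // A purely illustrative function; it takes its data by value,
    // so nothing is shared with the calling thread.
    int compute_sum(std::vector<int> values)
    {
       return std::accumulate(values.begin(), values.end(), 0);
    }

    int main()
    {
       std::vector<int> data = { 1, 2, 3, 4 };

       // std::async runs the function on another thread and immediately
       // returns a std::future holding the eventual result.
       std::future<int> result = std::async(std::launch::async, compute_sum, data);

       // get() pauses this thread until the value is ready.
       return result.get() == 10 ? 0 : 1;
    }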

The Non-Issues

My goal here is not to discuss how to design race-free algorithms, nor how to design data to make it easy to run multi-threaded algorithms. I’ll merely mention that a possible way to achieve that requires avoiding all global variables and passing all data to the asynchronous function by value. This way, nothing is shared between threads and thus no race can occur.

The Issues

While async and futures make it easy to turn a function into a thread, this very simplicity is what causes problems. Simplicity means complete lack of control. You have no control over:

  • how many asynchronous functions are being run,
  • how many threads are being created to run those functions,
  • how many threads are waiting for results.

This requires a fine balancing act between maximizing processor usage versus maintaining some control. On the one hand, you want as many asynchronous functions to run as possible, to ensure the processor is fully occupied; on the other hand, you don’t want to overload the processor with too many threads.

The Solution

The best solution to this problem is to introduce some complexity. The additional complexity allows you to regain control over all of the issues listed above.

First Step: Thread Pool

The first step is to forego async and futures for processor usage maximization. They can still be used for starting the core of parallel algorithms, but not to create multiple threads. Instead, it is best to use a thread pool.

A thread pool gives you control over the number of threads created to run parallel algorithms. You can create exactly as many threads as there are cores in the processor, ensuring maximum throughput without overloading the processor.
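
The number of threads to create can be queried directly from the standard library, as this small sketch shows:

    #include <cstdio>
    #include <thread>

    int main()
    {
       // The number of hardware threads the processor supports is the
       // natural size for the thread pool.
       const unsigned int thread_count = std::thread::hardware_concurrency();
       std::printf("The thread pool should create %u threads.\n", thread_count);
    }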

Second Step: Work Queue

While the thread pool controls how many threads are used, it does not control how functions are run by those threads. This is the job of the work queue. Asynchronous functions are added to the queue, and the thread pool takes functions from this queue to execute them and produce results.

Third Step: Results

While the work queue takes care of the input of the parallel algorithms, we need another mechanism to handle waiting for results. While a classic solution is to use a result queue, we have a better option: futures! Synchronizing the producer of a result and the consumer of that result is exactly what futures are for. The main difference here is that they are created by the thread pool.
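
For example, in standard C++ a future can be created on the producer side through std::packaged_task, which is in essence what the work queue will do for each submitted work item. Here is a minimal sketch, with a purely illustrative producer function:

    #include <future>

    // A purely illustrative producer function.
    int produce_value()
    {
       return 42;
    }

    int main()
    {
       // The packaged task wraps the function and creates the future
       // that will receive its result.
       std::packaged_task<int()> task(produce_value);
       std::future<int> result = task.get_future();

       // The producer side (here, the same thread; in our design, a pool
       // thread) executes the task, which fills the future.
       task();

       // The consumer side reads the now-ready value.
       return result.get() == 42 ? 0 : 1;
    }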

Fourth Step: Thread Stealing

One problem with this design, as it stands, is that if the parallel algorithm submits sub-algorithms to the work queue and waits for their results, we could run out of threads! Each thread could be waiting for results to be produced while no threads are available to produce these results.

The solution to this is the concept of thread stealing while waiting for a result. Basically, you create a function that tells the thread to execute work from the work queue while waiting for its own result. We no longer directly access the values produced by the futures returned by the work queue. That would block the thread. Instead, we give the future-value back to the work queue, which can execute work items while waiting for the future to become ready.

Concrete Code Example

I’ve implemented such a scheme multiple times in the past. I’ve re-implemented it recently in an open-source application, written in C++. The application is called Tantrix Solver and it solves Tantrix puzzles. The application code is available on GitHub and contains multiple git branches:

  • One branch shows an example using pure async and futures.
  • Another branch shows the same algorithm using the suggested design.

The git repo is available on GitHub.

Pure Async and Futures

The git branch containing the pure async and futures code design is called “thread-by-futures”.

The code design in this branch is simple. After all, that’s the selling point of async and futures. It uses the C++ std::async function with the std::launch::async mode to create threads. However, the problems we mentioned materialize as predicted, with an uncontrolled number of threads. A simple Tantrix puzzle can create a couple of dozen threads, which is probably too many, but still manageable. Complex Tantrix puzzles, on the other hand, can create many hundreds of threads, which can badly bog down most computers.
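
To illustrate the effect (this is not the Tantrix code, just a sketch of the pattern), consider a recursive search where every branch is explored with std::async: each call spawns a new thread, so the number of live threads grows with the branching of the search.

    #include <future>
    #include <vector>

    // Illustrative only: a recursive search where every branch is
    // explored with std::async, spawning one thread per branch.
    int explore(int depth)
    {
       if (depth == 0)
          return 1;

       std::vector<std::future<int>> branches;
       for (int i = 0; i < 4; ++i)
          branches.push_back(std::async(std::launch::async, explore, depth - 1));

       int total = 0;
       for (auto& branch : branches)
          total += branch.get();
       return total;
    }

    int main()
    {
       // With four branches per level and four levels, this can keep
       // a few hundred threads alive at the same time.
       return explore(4) > 0 ? 0 : 1;
    }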

Thread Pool and Work Queue

The git branch containing the thread pool and work queue code design is called “thread-pool”. I will describe the code design more thoroughly, as it is more complex, although I’ve tried to keep it as simple as possible.

Code Design: The Easy Bits

In this section, I will present the more straightforward elements of the design.

The first part of the design is the thread pool class. You only need to give it a provider of work and the number of threads to create:

    // A pool of threads of execution.

    struct thread_pool_t
    {
      // Create a thread pool with a given number of threads
      // that will take its work from the given work provider.
      thread_pool_t(work_provider_t& a_work_provider, size_t a_thread_count = 0);

      // Wait for all threads to end.
      ~thread_pool_t();

    private:
      // The internal function that executes queued functions in a loop.
      static void execution_loop(thread_pool_t* self);
    };

The work provider tells the threads what to do: it can request that they stop, or hand them work through a wait-or-execute function that entirely encapsulates executing one work item or waiting for an item to become available. We will see how this is done below, with a concrete implementation; but for now, here is the design of the provider:

    // The provider of work for the pool.

    struct work_provider_t
    {
      // Request that the threads stop.
      virtual void stop() = 0;

      // Check if stop was requested.
      virtual bool is_stopped() const = 0;

      // The wait-or-execute implementation, called in a loop
      // by the threads in the thread pool.
      virtual void wait_or_execute() = 0;
    };
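
To see how the pool and the provider fit together, here is a sketch of the execution loop that each pool thread runs. It assumes the pool keeps a reference to its provider in a member, named my_work_provider here purely for illustration; the actual code in the repo may differ in its details.

    // Sketch of a possible execution loop for the pool threads.
    void thread_pool_t::execution_loop(thread_pool_t* self)
    {
       // Each pool thread loops until a stop is requested, letting the
       // provider either put it to sleep or hand it one unit of work.
       while (!self->my_work_provider.is_stopped())
          self->my_work_provider.wait_or_execute();
    }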

These two previous classes are hidden inside the work queue, to the point that they can actually be completely ignored by the users of the design. That’s why we won’t be discussing them further.

Code Design: The Common Bits

The work queue is the more complex piece. Its implementation is templated to make it easy to use for a given algorithm that produces a specific type of results.

Since this is the central part of the design, I will show it in detail, including its implementation details. I will divide the class presentation into multiple parts to make it easier to understand.

The first part of the design is the template parameters:

    template <class WORK_ITEM, class RESULT>
    struct threaded_work_t : work_provider_t
    {
      using result_t = RESULT;
      using work_item_t = WORK_ITEM;
      using function_t = std::function<result_t(work_item_t, size_t)>;

The work_item_t (WORK_ITEM) is the input data of the algorithm. The result_t (RESULT) is the output of the algorithm. The function_t is the actual algorithm. This allows us to support a family of algorithms with the same input and output. When a work item is submitted, the caller also provides the function to run that conforms to this family.

The second part of the design of the work queue encompasses all the internal implementation data types and member variables. Here they are:

      using task_t = std::packaged_task<result_t(work_item_t, size_t)>;

      // How the function and work item are kept internally.
      struct work_t
      {
         task_t      task;
         work_item_t item;
      };

      std::mutex                    my_mutex;
      std::condition_variable       my_cond;
      std::atomic<bool>             my_stop = false;
      std::vector<work_t>           my_work_items;
      const size_t                  my_max_recursion;

      // Note: the thread pool must be the last variable so that it gets
      //       destroyed first while the mutex, etc are still valid.  
      thread_pool_t                 my_thread_pool;

The task_t type holds the algorithm function in a C++ type that can call it while producing a C++ std::future. This is how futures are created. The work_t type is the unit of work that can be executed by a thread.

The first two member variables in the work queue are the mutex and condition variable, both used to protect the data shared between the threads and the caller.

The atomic my_stop variable is used to signal that all execution should stop (surprise!). The vector of work_t holds the units of work to be executed. It is the concrete work queue. The max recursion is an implementation detail used to avoid deep stack recursion due to thread stealing. This will be explained in more detail later. The thread pool is where the threads of execution are held, obviously.

The third part of the design includes the creation of the work queue and the implementation of the work_provider_t interface. This is all straightforward. We create the internal thread pool with the exact number of cores in the processor. We also pass the work queue itself as the work provider of the thread pool.

      // Create a threaded work using the given thread pool.
      threaded_work_t(size_t a_max_recursion = 3)
         : my_max_recursion(a_max_recursion), my_thread_pool(*this, std::thread::hardware_concurrency()) {}

      ~threaded_work_t() { stop(); }

      // Stop all waiters.
      void stop() override
      {
         my_stop = true;
         my_cond.notify_all();
      }

      // Check if it is stopped.
      bool is_stopped() const override { return my_stop; }

      // Wait for something to execute or execute something already in queue.
      void wait_or_execute() override
      {
         std::unique_lock lock(my_mutex);
         return internal_wait_or_execute(lock, 0);
      }

The destructor and stop function implementation merely use the stop flag and condition variable to signal all the threads to stop. The wait-or-execute implementation... is deferred to an internal function that will be described in the next section setting out the more complex details.

Code Design: The Hard Bits

In this section we finally get to the heart of the design, to the more complex implementation details.

First, let’s look at the function to wait for a given result. This part is still quite simple: as long as the awaited future value is not ready, we keep looking for new results or for new work to execute. This is when we do work for other queued algorithms, instead of snoozing and losing a thread. If the whole threaded work is stopped, we exit promptly with an empty result.

      // Wait for a particular result, execute work while waiting.
      result_t wait_for(std::future<result_t>& a_token, size_t a_recusion_depth)
      {
         while (!is_stopped())
         {
            std::unique_lock lock(my_mutex);

            if (a_token.wait_for(std::chrono::seconds(0)) == std::future_status::ready)
               return a_token.get();

            internal_wait_or_execute(lock, a_recusion_depth);
         }

         return {};
      }

Second, let’s look at the function that really executes the unit of work. When there is nothing to execute, it does nothing. On the other hand, when there is at least one unit of work queued, it executes its function, which will produce a new result.

    private:
      // Wait for something to execute or execute something already in queue.
      void internal_wait_or_execute(std::unique_lock<std::mutex>& a_lock, size_t a_recursion_depth)
      {
         if (my_stop)
            return;

         if (my_work_items.size() <= 0)
         {
            my_cond.wait(a_lock);
            return;
         }

         work_t work = std::move(my_work_items.back());
         my_work_items.pop_back();
         a_lock.unlock();

         work.task(work.item, a_recursion_depth + 1);

         my_cond.notify_all();
      }

The only subtle thing going on is that when the function is awakened from waiting, it returns immediately instead of trying to execute some work. There is a good reason for returning immediately: the awakening can be due either to a result becoming available or to a unit of work being added. Since we don’t know which is the case, and since the caller might be interested in new results, we return to the caller so it can check. Maybe the future value it was waiting for is ready!

Finally, here is the function to submit work for execution:

      // Queue the given function and work item to be executed in a thread.
      std::future<result_t> add_work(work_item_t a_work_item, size_t a_recusion_depth, function_t a_function)
      {
         if (my_stop)
            return {};

         // Only queue the work item if we've recursed into the threaded work only a few times.
         // Otherwise, we can end-up with too-deep stack recursion and crash.
         if (a_recusion_depth < my_max_recursion)
         {
            // Shallow: queue the function to be called by any thread.
            work_t work;
            work.task = std::move(task_t(a_function));
            work.item = std::move(a_work_item);

            auto result = work.task.get_future();

            {
               std::unique_lock lock(my_mutex);
               my_work_items.emplace_back(std::move(work));
            }

            my_cond.notify_all();

            return result;
         }
         else
         {
            // Too deep: call the function directly instead.
            std::promise<result_t> result;
            result.set_value(a_function(a_work_item, a_recusion_depth + 1));
            return result.get_future();
         }
      }

The main unexpected thing to notice is the check of the recursion depth. The subtle problem this seeks to avoid concerns the implementation of the functions wait_for() and wait_or_execute(). Since waiting can cause another unit of work to be executed, and that unit of work could also end up waiting, in turn executing another unit... this could snowball into very deep recursion.

Unfortunately, we cannot simply refuse to execute new work once a thread gets too deep: if every thread did so, all threads would stop executing work. The system would cease to do any work and come to a standstill! So, instead, when the maximum recursion depth is reached within a thread, any work queued by this thread is executed immediately.

While this seems equivalent to queuing the work item, it is not. You see, the amount of work required to evaluate one branch of an algorithm is limited. In contrast, the number of units of work that can be in the queue due to all the branches of the algorithm can be extremely large. So we can safely assume that the algorithm was designed so that one branch will not recurse so deeply that it leads to a crash. We cannot assume the same thing about the total of all the work items waiting in the queue.

That is why it’s also a good idea to check the recursion depth in the algorithm itself and, once the recursion gets deep, not even queue those work items but call the function directly within the algorithm, which makes everything more efficient.

Aside from this subtlety, the rest of the code simply queues the work unit and wakes up any thread that was waiting to execute work.

Conclusion

As shown, this implementation of a work queue replaces async-spawned threads with a thread pool. The caller only needs two functions: add_work() and wait_for(). This is still a simple interface to use, but internally, it does give additional control over multi-threading to avoid the pitfalls of async and futures.
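
Here is a minimal usage sketch, assuming the threaded_work_t class shown above; the vector-of-ints work item, the int result and the summing function are chosen purely for illustration.

    int main()
    {
       // Work items are vectors of ints, results are ints.
       threaded_work_t<std::vector<int>, int> threaded_work;

       // A function conforming to function_t: it receives the work item
       // and the current recursion depth, and returns the result.
       auto sum_function = [](std::vector<int> a_values, size_t a_depth) -> int
       {
          int total = 0;
          for (int value : a_values)
             total += value;
          return total;
       };

       // Submit a work item at recursion depth zero and keep the future.
       std::future<int> token = threaded_work.add_work({ 1, 2, 3, 4 }, 0, sum_function);

       // Wait for the result; while waiting, this thread executes other
       // queued work instead of sleeping.
       int total = threaded_work.wait_for(token, 0);
       return total == 10 ? 0 : 1;
    }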

I hope that one day, the C++ standard will come with a built-in design for work queues and thread pools, so that we don’t have to roll our own. In the meantime, feel free to reuse my design.