Aussie AI
Chapter 32. Thread Pools
-
Book Excerpt from "C++ Ultra-Low Latency: Multithreading and Low-Level Optimizations"
-
by David Spuler, Ph.D.
Chapter 32. Thread Pools
What are Thread Pools?
Threads going swimming in warm ocean water. Who doesn’t love the beach?
Thread pools are a design pattern in C++ multithreading that avoids the cost of creating and destroying threads by using long-running threads. Instead of incurring this thread overhead, a “pool” of available threads have been pre-created, which sit there until work is available to be done. The main characteristics are:
- Idle threads wait for work (e.g., off a task queue).
- Threads are not destroyed after completing a chunk of work.
Thread pools are mostly used in a “producer-consumer” design pattern, although thread pools can also be used in other ways. There are effectively two thread pools in this design pattern:
- Producer thread pool — or sometimes a single producer.
- Consumer thread pool — always multiple, or what’s the point?
Typically, one or more producer threads adds work items to a queue, such as when it receives new data from a network source. Another group of consumer threads is idle while waiting to pull work off the queue. Consumers do the work, return the results, and then add themselves back to the pool of idle consumer threads awaiting more work.
Work Queue Implementation
The typical features of the thread-safe queue used in a producer-consumer work queue include:
- Vector of worker threads
- Queue of arbitrary tasks (e.g., usually lambdas, functors or
std::functionwrappers) - Stop flag for shutting down
The main interfaces are:
- Enqueue work (push) — by the producer.
- Deque work (pop) — by the consumer worker threads.
- Shutdown — tell all the threads to stop.
For a more advanced thread pool, some extra convenience features of the work submission interface to consider include:
- Work functions with arbitrary arguments (via parameter packs, variadic functions)
- Perfect forwarding of function arguments (e.g.,
std::forward)
The work queue can be implemented in various ways:
- Use
std::queueorstd::dequeinside the thread pool object. - Hand-coded locking queue with mutex and condition variable.
- Lock-free queue with atomics and “Compare-And-Set” (CAS) primitives.
Thread Pool Example
I tried hard to make this example simpler; I really did! In fact, my aim was to use only explicit function names, and avoid any uses of the syntactic sugar for:
- Lambda functions
std::function- Functor mechanics
However, it was a triple fail. Perhaps the last point was unavoidable,
since a worker task is a function object.
But I also had to add a little lambda function just to get the worker thread function to run
and another one for the predicate in the condition variable wait.
I also used std::function for the type of the function objects.
Anyway, here’s the first attempt at a “simple” thread pool with these features:
- Wraps around a
std::queueof tasks — not anything home-grown. - Each task is a function object — so they can be put on a queue.
- Vector of threads — each one waits forever for a task.
- Mutex and condition variable for synchronization — i.e., basic locking, not lock-free).
- Stop flag — only used when shutting down the entire thread pool.
And here’s the code of the basic interface and private data members:
class ThreadPool {
using TaskType = std::function<void()>; // Type alias
private:
std::vector<std::thread> threads_; // Threads in pool
std::queue<TaskType> qtasks_; // Queue of tasks to run
std::mutex mtx_;
std::condition_variable cv_;
bool stopflag_; // Shutdown flag (set in destructor)
public:
....
};
For safety, I’ve deleted some of the whole-thread-pool methods:
// Safety preventions
ThreadPool(const ThreadPool&) = delete;
ThreadPool(ThreadPool&&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
ThreadPool& operator=(ThreadPool&) = delete;
Here’s the basic constructor with the number of threads to create in the pool, by adding them to a vector of threads:
ThreadPool(size_t nthreads) : stopflag_(false) {
for (int i = 0; i < nthreads; i++) {
// Create new thread
auto tobj = [this]() { worker_thread(); };
threads_.emplace_back(tobj);
}
}
Here’s the worker function that each thread runs, with an infinite loop waiting for work tasks.
void worker_thread() {
for (;;) { // forever
std::unique_lock<std::mutex> lock(mtx_);
cv_.wait(lock, [this] { return !qtasks_.empty() || stopflag_; });
if (!qtasks_.empty()) {
TaskType t = qtasks_.front();
qtasks_.pop();
lock.unlock(); // Unlock before running task!
t(); // Run the task!
}
else { // Empty queue
if (stopflag_) {
return; // Quit!
}
}
}
}
Each thread will only exit if (a) the destructor sets the stop flag, and (b) there’s no more tasks still on the work queue. This ensures the whole thread pool gracefully shuts down by first finishing all jobs.
And here’s the destructor, which sets a global stop flag, notifies all the threads, and then waits for each one to stop.
~ThreadPool() {
std::unique_lock<std::mutex> lock(mtx_);
stopflag_ = true; // Set the shutdown flag
lock.unlock();
cv_.notify_all(); // Tell everyone to stop
for (auto &t : threads_) {
t.join(); // Wait for all threads
}
threads_.clear();
}
Here’s the enqueue function to add a work task for a thread to run:
void enqueue_task(TaskType t) {
std::unique_lock<std::mutex> lock(mtx_);
qtasks_.emplace(t);
lock.unlock();
cv_.notify_one(); // Wake one worker
}
Here are some of the ways to call the enqueue function to submit work to run:
p.enqueue_task(my_test_task); // Pointer-to-function
p.enqueue_task(std::function<void()>(my_test_task));
p.enqueue_task([]() { /*lambda function*/ });
auto functor = []() { /*lambda function*/ };
p.enqueue_task(functor);
The whole thread pool is far from perfect, and I’m sure you can see some areas needing work.
Problems to Avoid
There are a lot of little fiddly problems to overcome in the thread pool implementation, even with a wrapper around a standard queue object.
- Fiddly to get the scope right so that the worker function can access the queue object, but is also able to be put into a function object.
- Ensure that we unlock before running any task (otherwise, all jobs are serialized!).
- Lambda function for the predicate function on the wait of the condition variable.
The above code needs some fixes:
- Enqueue should warn or throw if the thread pool is already stopped.
- Should use move semantics fully to avoid copying any task or thread objects.
- Call
joinable()beforejoin(), just in case.
Various fixes to move semantics are needed here.
enqueue_task()should usestd::move()to move a new task onto the queue.worker_thread()should use std::move() to pull a new task off the front of the queue.
Advanced Thread Pool Features
Some of the features that can be added to a more advanced thread pool implementation:
- Dynamically increase and decrease the number of workers.
- Priorities for the work jobs to run important work faster.
- Scheduling of jobs to run with a delay or at a specific time.
- Work stealing and thread-specific work queues.
- Support for a graph of interdependent jobs (i.e., a “compute graph” or “task graph”).
The interface to the thread pool job submission could also need these capabilities:
- Arguments for tasks (e.g., via parameter packs and
std::forward). - Status results indicating success or failure (e.g., non-
voidfunctions). - General capabilities to return answer objects to the work submitter.
- Interface for the work submitter to query job status.
Some additional devops infrastructure would be desirable for these thread pool classes:
- Monitoring support via logging, and instrumentation for production usage.
- Self-monitoring to detect straggler/hang jobs (e.g., never-finishing).
- Self-test capabilities for use while regression testing (non-production).
- Timing features for performance measurement (non-production).
- Statistics reporting for production or testing usage.
It’s just a small matter of coding.
Task Graphs
Thread pools are mostly designed on the assumption that each piece of work is independent. Hence, the worker threads don’t depend on each other in any way, but only on the producer thread that’s adding work to the queue. This is the simplest and also the most common requirement.
However, work jobs that depend on each other are not uncommon. The network of dependencies between concurrent jobs can create a “graph” of work to be done, with additional synchronization required between the individual workers. An example of a more generalized thread pool that supports a work graph is listed in the references; see Puyda (2024).
References
- Emily Dawson, April 2025, Multithreading with C++: Parallel Programming Guide, https://www.amazon.com/dp/B0F494Z76L/
- Geeks for Geeks, 3 Jan, 2024, Thread Pool in C++, https://www.geeksforgeeks.org/thread-pool-in-cpp/
- Ishan Tripathi, Dec 11, 2023, Making a Thread Pool in C++ from scratch, https://dev.to/ish4n10/making-a-thread-pool-in-c-from-scratch-bnm
- Matheus Gomes, July 5, 2023, Thread Pool In C++ – Writing a Concurrent Task Queue, https://matgomes.com/thread-pools-cpp-with-queues/
- Barak Shoshany, 27 Dec 2023 (v4), A C++17 Thread Pool for High-Performance Scientific Computing, https://arxiv.org/abs/2105.00613v2, Code: https://github.com/bshoshany/thread-pool
- Dmytro Puyda, 23 Jul 2024 (v2), A simple and fast C++ thread pool implementation capable of running task graphs, https://arxiv.org/abs/2407.15805, Code: https://github.com/dpuyda/scheduling
|
• Online: Table of Contents • PDF: Free PDF book download • Buy: C++ Ultra-Low Latency |
|
C++ Ultra-Low Latency: Multithreading and Low-Level Optimizations:
Get your copy from Amazon: C++ Ultra-Low Latency |