Introduction
Task distribution becomes critical when multiple processors are available in a parallel computing system. The work-stealing algorithm is an effective load-balancing approach for this environment: threads that have exhausted their own work "steal" tasks from threads that still have unfinished tasks, keeping the system balanced and reducing processor idle time. It is widely adopted in frameworks that support parallelism, such as Intel's Threading Building Blocks (TBB) and Java's ForkJoinPool.
In this paper, we first give a conceptual overview of the work-stealing algorithm and then present C++ code implementing it. We also discuss where work-stealing is appropriate and what its constraints are.
Problem Statement
In a multi-threaded program, some threads may finish their share of the work earlier than others, leaving resources unused. The challenge is to balance load across threads efficiently, so that no thread sits idle while other threads still have work. Traditional work-distribution methods often use a centralized queue, which scales poorly because the shared queue becomes a bottleneck. The goal of the work-stealing algorithm is to create a distributed task-queue system where idle threads can "steal" tasks from other threads' queues to maintain balance and improve overall efficiency.
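For contrast, the following minimal sketch (all names are illustrative) shows the centralized alternative: every worker pops from one shared, mutex-protected queue, so every access serializes on the same lock, and the queue becomes the bottleneck as the thread count grows.
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

std::queue<int> shared_queue;
std::mutex queue_mutex;

void worker() {
    for (;;) {
        int task;
        {
            std::lock_guard<std::mutex> lock(queue_mutex); // every pop serializes here
            if (shared_queue.empty()) return;
            task = shared_queue.front();
            shared_queue.pop();
        }
        std::cout << "task " << task << " done\n"; // interleaving varies by run
    }
}

int main() {
    for (int i = 0; i < 8; ++i) shared_queue.push(i);
    std::vector<std::thread> threads;
    for (int i = 0; i < 4; ++i) threads.emplace_back(worker);
    for (auto& t : threads) t.join();
    return 0;
}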
Overview of the Work-Stealing Algorithm
The work-stealing algorithm operates by allowing each thread to have its own double-ended queue (deque) for storing tasks. The threads typically follow these rules:
- Each thread operates primarily on its own deque, where it pushes and pops tasks in a last-in-first-out (LIFO) order.
- If a thread completes its tasks and its deque is empty, it attempts to "steal" a task from another thread's deque.
- Tasks are stolen from the opposite end (the "bottom") of the victim's deque, so the victim keeps its most recently pushed tasks for itself, which also preserves the victim's cache locality.
- This approach minimizes contention as most accesses are to the thread's own deque.
This decentralized approach allows better scalability and avoids a single point of contention, as each thread manages its own task queue and only interacts with others when it's out of tasks.
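The deque discipline can be illustrated with a short single-threaded sketch (locking omitted; this mirrors the front/back convention used in Program 1 below):
#include <deque>
#include <iostream>

int main() {
    std::deque<int> dq;
    for (int t = 0; t < 4; ++t) dq.push_front(t); // owner pushes tasks 0..3 at the front

    int own = dq.front(); dq.pop_front();   // owner pops the newest task (3), LIFO
    int stolen = dq.back(); dq.pop_back();  // a thief steals the oldest task (0), FIFO

    std::cout << "owner ran " << own << ", thief stole " << stolen << std::endl;
    return 0;
}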
Implementation in C++
Below is a simple C++ implementation of a work-stealing algorithm using a deque for each worker thread. This is a minimal example for illustration purposes and lacks some optimizations and error handling that would be necessary in a production environment.
Program 1:
Libraries Required
We will use the C++ Standard Library's thread support and containers for the implementation. You can compile this code with a C++11 or newer compiler.
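For example, with GCC or Clang (the source file name is an assumption):
g++ -std=c++11 -pthread work_stealing.cpp -o work_stealing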
#include <iostream>
#include <deque>
#include <thread>
#include <mutex>
#include <vector>
#include <memory> // For std::unique_ptr
#include <atomic>
#include <chrono> // For std::chrono::seconds
// Simple implementation of std::make_unique for C++11
template <typename T, typename... Args>
std::unique_ptr<T> make_unique(Args&&... args) {
return std::unique_ptr<T>(new T(std::forward<Args>(args)...));
}
// A class representing a task that can be run by a thread
struct Task {
int task_id;
Task(int id) : task_id(id) {}
void execute() const {
std::cout << "Executing task " << task_id << " on thread " << std::this_thread::get_id() << std::endl;
}
};
// Class representing the worker thread with a deque and a work-stealing mechanism
class Worker {
public:
Worker() : stop_flag(false) {}
// Adds a task to the worker's deque
void push_task(Task task) {
std::lock_guard<std::mutex> lock(mutex);
deque.push_front(task);
}
// Attempts to steal a task from another worker
std::unique_ptr<Task> steal_task() {
std::lock_guard<std::mutex> lock(mutex);
if (!deque.empty()) {
Task task = deque.back();
deque.pop_back();
return make_unique<Task>(task);
}
return nullptr;
}
// Runs the worker, executing tasks or stealing if the deque is empty
void run() {
while (!stop_flag) {
auto task = pop_task();
if (!task) {
// Attempt to steal from another worker
for (Worker* other : *other_workers) {
if (other != this) {
task = other->steal_task();
if (task) break;
}
}
}
if (task) {
task->execute();
} else {
// No work available, so yield briefly to reduce CPU load
std::this_thread::yield();
}
}
}
// Sets a flag to stop the worker
void stop() { stop_flag = true; }
void set_other_workers(const std::vector<Worker*>& workers) {
other_workers = &workers;
}
private:
// Pops a task from the deque
std::unique_ptr<Task> pop_task() {
std::lock_guard<std::mutex> lock(mutex);
if (!deque.empty()) {
Task task = deque.front();
deque.pop_front();
return make_unique<Task>(task);
}
return nullptr;
}
std::deque<Task> deque;
std::mutex mutex;
std::atomic<bool> stop_flag;
    const std::vector<Worker*>* other_workers = nullptr;
};
int main() {
const int num_threads = 4;
std::vector<std::thread> threads;
std::vector<Worker> workers(num_threads);
// Set the other_workers vector for each worker for stealing
std::vector<Worker*> worker_ptrs;
for (auto& worker : workers) worker_ptrs.push_back(&worker);
for (auto& worker : workers) worker.set_other_workers(worker_ptrs);
// Launch threads
for (int i = 0; i < num_threads; ++i) {
threads.emplace_back([&workers, i] { workers[i].run(); });
}
// Distribute tasks to workers
for (int i = 0; i < 10; ++i) {
workers[i % num_threads].push_task(Task(i));
}
// Stop workers after a short delay
std::this_thread::sleep_for(std::chrono::seconds(1));
for (auto& worker : workers) {
worker.stop();
}
// Join threads
for (auto& thread : threads) {
if (thread.joinable()) {
thread.join();
}
}
return 0;
}
Output (task order and thread IDs will vary between runs):
Executing task 6 on thread 22639988467392
Executing task 2 on thread 22639988467392
Executing task 0 on thread 22639988467392
Executing task 4 on thread 22639988467392
Executing task 8 on thread 22639988467392
Executing task 1 on thread 22639988467392
Executing task 5 on thread 22639988467392
Executing task 9 on thread 22639988467392
Executing task 3 on thread 22639988467392
Executing task 7 on thread 22639986366144
Explanation of the Code
- Worker Class: Each worker has a deque for its tasks, a mutex to manage concurrent access, and a method to add tasks (push_task). Each worker can attempt to steal tasks from other workers when its own deque is empty.
- Task Execution: Each worker runs in a loop, where it pops tasks from its deque and executes them. If no tasks are found, it attempts to steal tasks from other workers. The stop_flag is used to control when the worker should cease operation.
- Task Stealing: When a worker has no tasks in its own deque, it iterates over the other workers, attempting to steal a task from the back of their deques. If no tasks are available anywhere, it yields control briefly to reduce CPU load.
- Main Function: Initializes the workers and starts a set of threads, each running a worker. It also distributes a few initial tasks among the workers. After a brief delay, all workers are stopped and the threads are joined.
Program 2:
#include <iostream>
#include <deque>
#include <vector>
#include <thread>
#include <mutex>
#include <memory>
#include <unordered_map>
#include <functional>
#include <atomic>
#include <chrono>    // For std::chrono::seconds and milliseconds
#include <stdexcept> // For std::runtime_error
// Represents a task with dependencies and priority
class Task {
public:
using TaskFunc = std::function<void()>;
Task(int id, TaskFunc func, int priority = 0)
: task_id(id), func(std::move(func)), priority(priority), dependencies(0) {}
void execute() const {
func();
}
void add_dependency() {
dependencies.fetch_add(1, std::memory_order_relaxed);
}
void resolve_dependency() {
dependencies.fetch_sub(1, std::memory_order_relaxed);
}
bool is_ready() const {
return dependencies.load(std::memory_order_relaxed) == 0;
}
int get_priority() const {
return priority;
}
int get_id() const {
return task_id;
}
private:
int task_id;
TaskFunc func;
int priority;
std::atomic<int> dependencies;
};
// Represents a worker thread with a deque for task management
class Worker {
public:
Worker() : stop_flag(false) {}
void push_task(const std::shared_ptr<Task>& task) {
std::lock_guard<std::mutex> lock(mutex);
task_queue.push_front(task);
}
std::shared_ptr<Task> pop_task() {
std::lock_guard<std::mutex> lock(mutex);
if (!task_queue.empty()) {
auto task = task_queue.front();
task_queue.pop_front();
return task;
}
return nullptr; // No task available
}
std::shared_ptr<Task> steal_task() {
std::lock_guard<std::mutex> lock(mutex);
if (!task_queue.empty()) {
auto task = task_queue.back();
task_queue.pop_back();
return task;
}
return nullptr; // No task to steal
}
void set_other_workers(const std::vector<Worker*>& workers) {
other_workers = &workers;
}
void run() {
while (!stop_flag) {
auto task = pop_task();
if (!task) {
for (Worker* other : *other_workers) {
if (other != this) {
task = other->steal_task();
if (task) break;
}
}
}
        if (task) {
            if (task->is_ready()) {
                task->execute();
            } else {
                // Not ready yet: re-queue it instead of silently dropping it
                push_task(task);
            }
        } else {
            std::this_thread::yield();
        }
}
}
void stop() {
stop_flag = true;
}
private:
std::deque<std::shared_ptr<Task>> task_queue;
std::mutex mutex;
const std::vector<Worker*>* other_workers = nullptr;
std::atomic<bool> stop_flag;
};
// Task Scheduler using Work-Stealing
class TaskScheduler {
public:
TaskScheduler(size_t num_threads)
: workers(num_threads), threads(num_threads) {
initialize_workers();
}
~TaskScheduler() {
stop();
}
void submit_task(const std::shared_ptr<Task>& task) {
if (!task->is_ready()) {
throw std::runtime_error("Task has unresolved dependencies!");
}
assign_task_to_worker(task);
    }
    // Registers a task that still has unresolved dependencies so that
    // resolve_dependency() can find and schedule it once it becomes ready
    void register_task(const std::shared_ptr<Task>& task) {
        task_map[task->get_id()] = task;
    }
void resolve_dependency(int task_id) {
auto it = task_map.find(task_id);
if (it != task_map.end()) {
auto& task = it->second;
task->resolve_dependency();
if (task->is_ready()) {
assign_task_to_worker(task);
}
}
}
void run() {
for (size_t i = 0; i < workers.size(); ++i) {
threads[i] = std::thread([this, i] { workers[i].run(); });
}
}
void stop() {
for (auto& worker : workers) {
worker.stop();
}
for (auto& thread : threads) {
if (thread.joinable()) {
thread.join();
}
}
}
private:
std::vector<Worker> workers;
std::vector<std::thread> threads;
std::unordered_map<int, std::shared_ptr<Task>> task_map;
void initialize_workers() {
std::vector<Worker*> worker_ptrs;
for (auto& worker : workers) {
worker_ptrs.push_back(&worker);
}
for (auto& worker : workers) {
worker.set_other_workers(worker_ptrs);
}
}
void assign_task_to_worker(const std::shared_ptr<Task>& task) {
static size_t current_worker = 0;
workers[current_worker % workers.size()].push_task(task);
current_worker++;
}
};
// Example usage
int main() {
TaskScheduler scheduler(4);
auto task1 = std::make_shared<Task>(1, []() {
std::cout << "Task 1 executed on thread " << std::this_thread::get_id() << std::endl;
});
auto task2 = std::make_shared<Task>(2, []() {
std::cout << "Task 2 executed on thread " << std::this_thread::get_id() << std::endl;
});
auto task3 = std::make_shared<Task>(3, []() {
std::cout << "Task 3 executed on thread " << std::this_thread::get_id() << std::endl;
});
auto task4 = std::make_shared<Task>(4, []() {
std::cout << "Task 4 executed on thread " << std::this_thread::get_id() << std::endl;
});
    // Adding dependencies: tasks 2 and 3 must wait until they are released
    task2->add_dependency();
    task3->add_dependency();
    // Register the pending tasks so resolve_dependency() can find them later
    scheduler.register_task(task2);
    scheduler.register_task(task3);
    scheduler.submit_task(task1);
    scheduler.submit_task(task4);
    scheduler.run();
    // Resolving dependencies
    std::this_thread::sleep_for(std::chrono::seconds(1));
    scheduler.resolve_dependency(2);
    scheduler.resolve_dependency(3);
    // Give the workers a moment to pick up the released tasks before stopping
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    scheduler.stop();
return 0;
}
Output (thread IDs will vary between runs):
Task 1 executed on thread 140491566859968
Task 4 executed on thread 140491556370112
Task 2 executed on thread 140491566859968
Task 3 executed on thread 140491556370112
Explanation:
- Task Class: Wraps a callable together with an ID, a priority, and an atomic dependency counter. The atomic counter lets dependencies be tracked and decremented safely from multiple threads.
- Worker Class: Each worker maintains a private deque of tasks, popping from the front of its own deque and stealing from the back of other workers' deques when it runs out of work.
- TaskScheduler Class: Manages the pool of worker threads, assigns submitted tasks to workers round-robin, keeps pending tasks in a map, and schedules them once their dependencies are resolved. (The priority field is stored but not yet used by this simple round-robin assignment.)
- Main Function: Shows how to use the scheduler with a small task graph: tasks 2 and 3 carry dependencies and are not executed until those dependencies are resolved.
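In practice, the upstream task itself usually releases its dependents instead of the main thread. The fragment below is a hedged sketch of that pattern; it assumes Program 2's Task and TaskScheduler definitions (including the register_task helper) appear earlier in the same file, and the task IDs 10 and 20 are illustrative.
int main() {
    TaskScheduler scheduler(2);

    // Task 20 cannot run until its one dependency is resolved
    auto dependent = std::make_shared<Task>(20, [] {
        std::cout << "dependent task 20 executed" << std::endl;
    });
    dependent->add_dependency();
    scheduler.register_task(dependent);

    // Task 10 releases task 20 as its last action (called from a worker thread;
    // safe here because no other thread touches the task map after run())
    auto producer = std::make_shared<Task>(10, [&scheduler] {
        std::cout << "producer task 10 executed" << std::endl;
        scheduler.resolve_dependency(20);
    });
    scheduler.submit_task(producer);

    scheduler.run();
    std::this_thread::sleep_for(std::chrono::milliseconds(200));
    scheduler.stop();
    return 0;
}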
Advantages and Trade-offs
The work-stealing algorithm balances workloads across threads dynamically, which increases CPU utilization and reduces idle time. These benefits come with trade-offs:
- Lock Contention: Although most deque accesses are local to the owning thread, stealing still requires synchronization on the victim's deque, so contention can build up when many threads run out of work and attempt to steal at once.
- Task Overhead: Pushing, popping, and stealing tasks all carry a cost, so work-stealing is a poor fit for very short-lived tasks whose useful computation is smaller than the scheduling overhead; a common mitigation is to coarsen task granularity, as sketched below.
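The following is a minimal, self-contained sketch of that mitigation (the make_batch helper and its values are illustrative, not part of the programs above): many tiny work items are folded into one task body, so the per-task push/pop/steal overhead is paid once per batch.
#include <functional>
#include <iostream>
#include <vector>

// Hypothetical helper: fold many tiny work items into one coarse task body
std::function<void()> make_batch(std::vector<int> items) {
    return [items]() {
        long long sum = 0;
        for (int x : items) sum += x; // the fine-grained work, done in one task
        std::cout << "batch of " << items.size() << " items, sum = " << sum << std::endl;
    };
}

int main() {
    std::function<void()> batch = make_batch(std::vector<int>{1, 2, 3, 4, 5, 6, 7, 8});
    batch(); // in Program 2, this closure would be wrapped in a Task and submitted
    return 0;
}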
Conclusion
Work-stealing is an effective method for handling dynamic workloads in multi-threaded applications. By distributing work across per-thread queues and letting idle threads take over work from overloaded ones, it produces solutions that are resource-efficient and highly scalable. Despite its trade-offs, the algorithm is widely adopted wherever workloads are diverse and unpredictable, making it one of the most heavily used scheduling techniques in parallel computing.