Introduction
Task allocation is a central problem when many processors work in parallel. Work-stealing is a widely used technique for balancing that load: threads that have finished their own work 'steal' tasks from threads that still have tasks waiting. This keeps work evenly spread across the system and minimizes processor idle time. The approach is used in parallel-processing frameworks such as Intel's Threading Building Blocks (TBB) and Java's ForkJoinPool.
In this document, we first explain how the work-stealing mechanism works and then demonstrate a C++ implementation of it. Finally, we discuss when work-stealing is appropriate and what its limitations are.
Problem Statement:
In a multi-threaded program, some threads often finish their tasks before others, leaving processors idle while work remains elsewhere. The challenge is to distribute work so that no thread sits idle while others are still busy. Conventional methods often rely on a single centralized queue, but every thread must then synchronize on that one queue, which becomes a bottleneck as the number of threads grows. The work-stealing algorithm instead uses decentralized, per-thread task queues: idle threads "steal" tasks from the queues of other threads, which keeps the load balanced and improves the overall efficiency of the system.
Overview of the Work-Stealing Algorithm
The work-stealing algorithm operates by allowing each thread to have its own double-ended queue (deque) for storing tasks. The threads typically follow these rules:
- Each thread operates primarily on its own deque, where it pushes and pops tasks in a last-in-first-out (LIFO) order.
- If a thread completes its tasks and its deque is empty, it attempts to "steal" a task from another thread's deque.
- Thieves take tasks from the opposite end of the victim's deque, so they get the oldest tasks while the victim keeps working on its most recently pushed ones.
- This approach minimizes contention as most accesses are to the thread's own deque.
This decentralized design scales better than a central queue because there is no single point of contention: each thread manages its own deque and interacts with other threads only when it runs out of work.
Implementation in C++
Below is a basic C++ implementation of a work-stealing algorithm that employs a deque for every worker thread. This example is kept simple for explanatory reasons and does not include certain enhancements and error management required in a real-world scenario.
Program 1:
Libraries Required
We will use only the thread support and containers provided by the C++ Standard Library. The code compiles with any compiler that supports C++11 or later.
#include <iostream>
#include <deque>
#include <thread>
#include <mutex>
#include <vector>
#include <memory>   // For std::unique_ptr
#include <atomic>
#include <chrono>   // For std::chrono::seconds
#include <utility>  // For std::forward
// Simple implementation of std::make_unique for C++11 (std::make_unique was added in C++14)
template <typename T, typename... Args>
std::unique_ptr<T> make_unique(Args&&... args) {
    return std::unique_ptr<T>(new T(std::forward<Args>(args)...));
}
// A class representing a task that can be run by a thread
struct Task {
    int task_id;
    Task(int id) : task_id(id) {}
    void execute() const {
        std::cout << "Executing task " << task_id << " on thread " << std::this_thread::get_id() << std::endl;
    }
};
// Class representing the worker thread with a deque and a work-stealing mechanism
class Worker {
public:
    Worker() : stop_flag(false) {}
    // Adds a task to the front of the worker's own deque (the owner's end)
    void push_task(Task task) {
        std::lock_guard<std::mutex> lock(mutex);
        deque.push_front(task);
    }
    // Called by other workers: removes the oldest task from the back of this deque
    std::unique_ptr<Task> steal_task() {
        std::lock_guard<std::mutex> lock(mutex);
        if (!deque.empty()) {
            Task task = deque.back();
            deque.pop_back();
            return make_unique<Task>(task);
        }
        return nullptr;
    }
    // Runs the worker, executing its own tasks or stealing when its deque is empty
    void run() {
        while (!stop_flag) {
            auto task = pop_task();
            if (!task) {
                // Attempt to steal from another worker
                for (Worker* other : *other_workers) {
                    if (other != this) {
                        task = other->steal_task();
                        if (task) break;
                    }
                }
            }
            if (task) {
                task->execute();
            } else {
                // No work available: yield the time slice, then poll again (still a busy-wait)
                std::this_thread::yield();
            }
        }
    }
    // Sets a flag to stop the worker
    void stop() { stop_flag = true; }
    // Must be called before run(); the vector must outlive the worker's thread
    void set_other_workers(const std::vector<Worker*>& workers) {
        other_workers = &workers;
    }
private:
    // Pops the most recently pushed task from the front of the deque (LIFO)
    std::unique_ptr<Task> pop_task() {
        std::lock_guard<std::mutex> lock(mutex);
        if (!deque.empty()) {
            Task task = deque.front();
            deque.pop_front();
            return make_unique<Task>(task);
        }
        return nullptr;
    }
    std::deque<Task> deque;
    std::mutex mutex;
    std::atomic<bool> stop_flag;
    const std::vector<Worker*>* other_workers = nullptr;
};
int main() {
    const int num_threads = 4;
    std::vector<std::thread> threads;
    std::vector<Worker> workers(num_threads);
    // Give every worker the full list of workers so it can choose steal victims
    std::vector<Worker*> worker_ptrs;
    for (auto& worker : workers) worker_ptrs.push_back(&worker);
    for (auto& worker : workers) worker.set_other_workers(worker_ptrs);
    // Launch threads
    for (int i = 0; i < num_threads; ++i) {
        threads.emplace_back([&workers, i] { workers[i].run(); });
    }
    // Distribute tasks to workers round-robin
    for (int i = 0; i < 10; ++i) {
        workers[i % num_threads].push_task(Task(i));
    }
    // Stop workers after a fixed one-second delay (assumes all tasks finish by then)
    std::this_thread::sleep_for(std::chrono::seconds(1));
    for (auto& worker : workers) {
        worker.stop();
    }
    // Join threads
    for (auto& thread : threads) {
        if (thread.joinable()) {
            thread.join();
        }
    }
    return 0;
}
Sample output (task order and thread IDs vary between runs):
Executing task 6 on thread 22639988467392
Executing task 2 on thread 22639988467392
Executing task 0 on thread 22639988467392
Executing task 4 on thread 22639988467392
Executing task 8 on thread 22639988467392
Executing task 1 on thread 22639988467392
Executing task 5 on thread 22639988467392
Executing task 9 on thread 22639988467392
Executing task 3 on thread 22639988467392
Executing task 7 on thread 22639986366144
Explanation of the Code
- Worker Class: Each worker owns a deque of tasks, a mutex that guards it, and methods to add a task (push_task), take its own newest task (pop_task), and hand its oldest task to a thief (steal_task).
- Task Execution: Each worker runs a loop that pops tasks from the front of its own deque and executes them. If its deque is empty, it tries to steal from other workers. The atomic stop_flag tells the loop when to exit.
- Task Stealing: A worker with an empty deque iterates over the other workers and tries to steal a task from the back of each one's deque. If no task is available anywhere, it calls std::this_thread::yield() before trying again. This gives other threads a chance to run, but the loop is still a busy-wait, so an idle worker keeps consuming CPU time.
- Main Function: The main function creates the workers, gives each one a pointer to the full worker list, and starts one thread per worker. It then distributes ten tasks round-robin, waits one second, stops all workers, and joins the threads. The fixed delay assumes every task finishes within that second.
Program 2:
#include <iostream>
#include <deque>
#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <memory>
#include <unordered_map>
#include <functional>
#include <atomic>
#include <future>
#include <chrono>
#include <stdexcept>
#include <utility>
// Represents a task with dependencies and priority
class Task {
public:
    using TaskFunc = std::function<void()>;
    Task(int id, TaskFunc func, int priority = 0)
        : task_id(id), func(std::move(func)), priority(priority), dependencies(0) {}
    void execute() const {
        func();
    }
    void add_dependency() {
        dependencies.fetch_add(1, std::memory_order_relaxed);
    }
    void resolve_dependency() {
        dependencies.fetch_sub(1, std::memory_order_relaxed);
    }
    bool is_ready() const {
        return dependencies.load(std::memory_order_relaxed) == 0;
    }
    // Stored for callers; the scheduler below does not order work by priority
    int get_priority() const {
        return priority;
    }
    int get_id() const {
        return task_id;
    }
private:
    int task_id;
    TaskFunc func;
    int priority;
    std::atomic<int> dependencies;
};
// Represents a worker thread with a deque for task management
class Worker {
public:
    Worker() : stop_flag(false) {}
    // The owner pushes new tasks at the front
    void push_task(const std::shared_ptr<Task>& task) {
        std::lock_guard<std::mutex> lock(mutex);
        task_queue.push_front(task);
    }
    // The owner pops its newest task from the front (LIFO)
    std::shared_ptr<Task> pop_task() {
        std::lock_guard<std::mutex> lock(mutex);
        if (!task_queue.empty()) {
            auto task = task_queue.front();
            task_queue.pop_front();
            return task;
        }
        return nullptr; // No task available
    }
    // Thieves take the oldest task from the back
    std::shared_ptr<Task> steal_task() {
        std::lock_guard<std::mutex> lock(mutex);
        if (!task_queue.empty()) {
            auto task = task_queue.back();
            task_queue.pop_back();
            return task;
        }
        return nullptr; // No task to steal
    }
    void set_other_workers(const std::vector<Worker*>& workers) {
        other_workers = &workers;
    }
    // Executes its own tasks, steals when idle, and exits only after stop()
    // has been requested and no task can be found in any deque
    void run() {
        while (true) {
            // Read the flag before searching, so tasks pushed before stop() are still run
            bool stopping = stop_flag.load();
            auto task = pop_task();
            if (!task) {
                for (Worker* other : *other_workers) {
                    if (other != this) {
                        task = other->steal_task();
                        if (task) break;
                    }
                }
            }
            if (task) {
                task->execute();  // only ready tasks are ever queued
            } else if (stopping) {
                break;
            } else {
                std::this_thread::yield();
            }
        }
    }
    void stop() {
        stop_flag = true;
    }
private:
    std::deque<std::shared_ptr<Task>> task_queue;
    std::mutex mutex;
    const std::vector<Worker*>* other_workers = nullptr;
    std::atomic<bool> stop_flag;
};
// Task Scheduler using Work-Stealing
class TaskScheduler {
public:
    TaskScheduler(size_t num_threads)
        : workers(num_threads), threads(num_threads) {
        initialize_workers();
    }
    ~TaskScheduler() {
        stop();
    }
    // Queues a ready task immediately; a task with unresolved dependencies is
    // held in task_map until resolve_dependency() brings its count to zero
    void submit_task(const std::shared_ptr<Task>& task) {
        if (task->is_ready()) {
            assign_task_to_worker(task);
        } else {
            task_map[task->get_id()] = task;
        }
    }
    void resolve_dependency(int task_id) {
        auto it = task_map.find(task_id);
        if (it != task_map.end()) {
            auto task = it->second;  // copy the shared_ptr before erasing the entry
            task->resolve_dependency();
            if (task->is_ready()) {
                task_map.erase(it);
                assign_task_to_worker(task);
            }
        }
    }
    void run() {
        for (size_t i = 0; i < workers.size(); ++i) {
            threads[i] = std::thread([this, i] { workers[i].run(); });
        }
    }
    void stop() {
        for (auto& worker : workers) {
            worker.stop();
        }
        for (auto& thread : threads) {
            if (thread.joinable()) {
                thread.join();
            }
        }
    }
private:
    std::vector<Worker> workers;
    std::vector<std::thread> threads;
    // Must outlive the worker threads: every Worker keeps a pointer to this vector
    std::vector<Worker*> worker_ptrs;
    std::unordered_map<int, std::shared_ptr<Task>> task_map;
    size_t next_worker = 0;  // round-robin cursor, one per scheduler
    void initialize_workers() {
        for (auto& worker : workers) {
            worker_ptrs.push_back(&worker);
        }
        for (auto& worker : workers) {
            worker.set_other_workers(worker_ptrs);
        }
    }
    void assign_task_to_worker(const std::shared_ptr<Task>& task) {
        workers[next_worker % workers.size()].push_task(task);
        ++next_worker;
    }
};
// Example usage
int main() {
    TaskScheduler scheduler(4);
    auto task1 = std::make_shared<Task>(1, []() {
        std::cout << "Task 1 executed on thread " << std::this_thread::get_id() << std::endl;
    });
    auto task2 = std::make_shared<Task>(2, []() {
        std::cout << "Task 2 executed on thread " << std::this_thread::get_id() << std::endl;
    });
    auto task3 = std::make_shared<Task>(3, []() {
        std::cout << "Task 3 executed on thread " << std::this_thread::get_id() << std::endl;
    });
    auto task4 = std::make_shared<Task>(4, []() {
        std::cout << "Task 4 executed on thread " << std::this_thread::get_id() << std::endl;
    });
    // Tasks 2 and 3 each wait on one outstanding dependency
    task2->add_dependency();
    task3->add_dependency();
    scheduler.submit_task(task1);
    scheduler.submit_task(task2);  // held until its dependency is resolved
    scheduler.submit_task(task3);  // held until its dependency is resolved
    scheduler.submit_task(task4);
    scheduler.run();
    // Resolve the dependencies after one second; tasks 2 and 3 are queued at this point
    std::this_thread::sleep_for(std::chrono::seconds(1));
    scheduler.resolve_dependency(2);
    scheduler.resolve_dependency(3);
    // Workers finish any queued tasks before exiting
    scheduler.stop();
    return 0;
}
Sample output (thread IDs, shown here as <id>, differ on every run; tasks 1 and 4 may print in either order, as may tasks 2 and 3, which appear about one second later):
Task 1 executed on thread <id>
Task 4 executed on thread <id>
Task 2 executed on thread <id>
Task 3 executed on thread <id>
Explanation:
- Task Class: Wraps a callable together with an ID, a priority, and a dependency counter. The counter is a std::atomic<int>, so dependencies can be added and resolved safely from different threads. The priority is stored, but this scheduler does not use it when ordering work.
- Worker Class: Each worker keeps a private deque of tasks. It executes tasks from its own deque first and, when that deque is empty, steals tasks from other workers.
- TaskScheduler Class: Creates the workers and their threads, hands tasks to workers in round-robin order, and uses task_map to find a task when resolve_dependency() is called, queuing it once its dependency count reaches zero.
- Main Function: Builds four tasks, gives tasks 2 and 3 one dependency each, and resolves those dependencies after a one-second delay. A task is queued for execution only after all of its dependencies have been resolved.
Advantages and Trade-offs
Work-stealing balances load dynamically, which raises CPU utilization and reduces idle time. It does, however, come with trade-offs:
- Lock Contention: Most deque operations are local, but a steal must still synchronize with the victim's owner, so some contention remains. In the mutex-based programs above, every push, pop, and steal takes a lock, and idle workers repeatedly lock other workers' deques while searching for work.
- Task Overhead: Creating, scheduling, and stealing tasks has a cost. When tasks are extremely short-lived or do little computation, this overhead can outweigh the benefit of balancing the load, making work-stealing less practical.
Conclusion
In summary, work-stealing is a valuable approach for managing uneven workloads in multi-threaded systems. By giving each thread its own task queue and letting idle threads take work from busy ones, it improves resource utilization and scalability. Despite its overheads, the algorithm is widely used wherever workloads are irregular or hard to predict, and it has become a standard technique in parallel computing.