#include #include #include #include #include #include #include #include #include "Common/Log.h" #include "Common/Thread/ThreadUtil.h" #include "Common/Thread/ThreadManager.h" // Threads and task scheduling // // * The threadpool should contain a number of threads that's the the number of cores, // plus a fixed number more for I/O-limited background tasks. // * Parallel compute-limited loops should use as many threads as there are cores. // They should always be scheduled to the first N threads. // * For some tasks, splitting the input values up linearly between the threads // is not fair. However, we ignore that for now. const int MAX_CORES_TO_USE = 16; const int EXTRA_THREADS = 4; // For I/O limited tasks struct GlobalThreadContext { std::mutex mutex; // associated with each respective condition variable std::deque queue; std::atomic queue_size; std::vector threads_; std::atomic roundRobin; }; struct ThreadContext { std::thread thread; // the worker thread std::condition_variable cond; // used to signal new work std::mutex mutex; // protects the local queue. std::atomic queue_size; int index; std::atomic cancelled; std::atomic private_single; std::deque private_queue; }; ThreadManager::ThreadManager() : global_(new GlobalThreadContext()) { global_->queue_size = 0; global_->roundRobin = 0; } ThreadManager::~ThreadManager() { delete global_; } void ThreadManager::Teardown() { for (size_t i = 0; i < global_->threads_.size(); i++) { global_->threads_[i]->cancelled = true; global_->threads_[i]->cond.notify_one(); } for (size_t i = 0; i < global_->threads_.size(); i++) { global_->threads_[i]->thread.join(); delete global_->threads_[i]; } global_->threads_.clear(); } static void WorkerThreadFunc(GlobalThreadContext *global, ThreadContext *thread) { char threadName[16]; snprintf(threadName, sizeof(threadName), "PoolWorker %d", thread->index); SetCurrentThreadName(threadName); while (!thread->cancelled) { Task *task = thread->private_single.exchange(nullptr); // Check the global queue first, then check the private queue and wait if there's nothing to do. if (!task) { // Grab one from the global queue if there is any. std::unique_lock lock(global->mutex); if (!global->queue.empty()) { task = global->queue.front(); global->queue.pop_front(); global->queue_size--; // We are processing one now, so mark that. thread->queue_size++; } } if (!task) { std::unique_lock lock(thread->mutex); // We must check both queue and single again, while locked. if (!thread->private_queue.empty()) { task = thread->private_queue.front(); thread->private_queue.pop_front(); } else if (thread->private_single.load() == nullptr && global->queue_size.load() == 0) { thread->cond.wait(lock); } } // The task itself takes care of notifying anyone waiting on it. Not the // responsibility of the ThreadManager (although it could be!). if (task) { task->Run(); delete task; // Reduce the queue size once complete. thread->queue_size--; } } } void ThreadManager::Init(int numRealCores, int numLogicalCoresPerCpu) { if (IsInitialized()) { Teardown(); } numComputeThreads_ = std::min(numRealCores * numLogicalCoresPerCpu, MAX_CORES_TO_USE); int numThreads = numComputeThreads_ + EXTRA_THREADS; numThreads_ = numThreads; INFO_LOG(SYSTEM, "ThreadManager::Init(compute threads: %d, all: %d)", numComputeThreads_, numThreads_); for (int i = 0; i < numThreads; i++) { ThreadContext *thread = new ThreadContext(); thread->cancelled.store(false); thread->private_single.store(nullptr); thread->thread = std::thread(&WorkerThreadFunc, global_, thread); thread->index = i; global_->threads_.push_back(thread); } } void ThreadManager::EnqueueTask(Task *task, TaskType taskType) { _assert_msg_(IsInitialized(), "ThreadManager not initialized"); int maxThread; int threadOffset = 0; if (taskType == TaskType::CPU_COMPUTE) { // only the threads reserved for heavy compute. maxThread = numComputeThreads_; threadOffset = 0; } else { // any free thread maxThread = numThreads_; threadOffset = numComputeThreads_; } // Find a thread with no outstanding work. int threadNum = threadOffset; for (int i = 0; i < maxThread; i++, threadNum++) { if (threadNum >= global_->threads_.size()) { threadNum = 0; } ThreadContext *thread = global_->threads_[threadNum]; if (thread->queue_size.load() == 0) { std::unique_lock lock(thread->mutex); thread->private_queue.push_back(task); thread->queue_size++; thread->cond.notify_one(); // Found it - done. return; } } // Still not scheduled? Put it on the global queue and notify a thread chosen by round-robin. // Not particularly scientific, but hopefully we should not run into this too much. { std::unique_lock lock(global_->mutex); global_->queue.push_back(task); global_->queue_size++; } // Lock the thread to ensure it gets the message. int chosenIndex = global_->roundRobin++; ThreadContext *&chosenThread = global_->threads_[chosenIndex % maxThread]; std::unique_lock lock(chosenThread->mutex); chosenThread->cond.notify_one(); } void ThreadManager::EnqueueTaskOnThread(int threadNum, Task *task, TaskType taskType) { _assert_msg_(threadNum >= 0 && threadNum < (int)global_->threads_.size(), "Bad threadnum or not initialized"); ThreadContext *thread = global_->threads_[threadNum]; // Try first atomically, as highest priority. Task *expected = nullptr; thread->private_single.compare_exchange_weak(expected, task); // Whether we got that or will have to wait, increase the queue counter. thread->queue_size++; if (expected == nullptr) { std::unique_lock lock(thread->mutex); thread->cond.notify_one(); } else { std::unique_lock lock(thread->mutex); thread->private_queue.push_back(task); thread->cond.notify_one(); } } int ThreadManager::GetNumLooperThreads() const { return numComputeThreads_; } void ThreadManager::TryCancelTask(uint64_t taskID) { // Do nothing for now, just let it finish. } bool ThreadManager::IsInitialized() const { return !global_->threads_.empty(); }