Skip to content
Snippets Groups Projects
Commit e72a25fd authored by Alexy Pellegrini's avatar Alexy Pellegrini Committed by Kitware Robot
Browse files

Merge topic 'smp-stdthread-rework'

9829465a Rework vtkSMPThreadPool of the STDThread SMP backend

Acked-by: default avatarKitware Robot <>
Tested-by: default avatarbuildbot <>
Acked-by: default avatarMathieu Westphal <>
Merge-request: !10027
parents bbc76cda 9829465a
No related branches found
No related tags found
No related merge requests found
......@@ -15,6 +15,8 @@
#include "SMP/STDThread/vtkSMPThreadLocalBackend.h"
#include "SMP/STDThread/vtkSMPThreadPool.h"
#include <algorithm>
#include <cmath> // For std::floor & std::log2
#include <functional> // For std::hash
......@@ -32,7 +34,7 @@ VTK_ABI_NAMESPACE_BEGIN
static ThreadIdType GetThreadId()
return std::hash<std::thread::id>{}(std::this_thread::get_id());
return vtkSMPThreadPool::GetInstance().GetThreadId();
// 32 bit FNV-1a hash function
......@@ -15,6 +15,12 @@
#include "SMP/STDThread/vtkSMPThreadPool.h"
#include <vtkObject.h>
#include <algorithm>
#include <cassert>
#include <condition_variable>
#include <future>
#include <iostream>
namespace vtk
......@@ -25,66 +31,410 @@ namespace smp
vtkSMPThreadPool::vtkSMPThreadPool(int threadNumber)
static constexpr std::size_t NoRunningJob = (std::numeric_limits<std::size_t>::max)();
struct vtkSMPThreadPool::ThreadJob
// This construtor is needed because aggregate initialization can not have default value
// (prior to C++14)
// also because emplace_back can not use aggregate initialization (prior to C++20)
ThreadJob(ProxyData* proxy = nullptr, std::function<void()> function = nullptr)
: Proxy{ proxy }
, Function{ std::move(function) }
ProxyData* Proxy{}; // Proxy that allocated this job
std::function<void()> Function{}; // Actual user job
std::promise<void> Promise{}; // Set when job is done
struct vtkSMPThreadPool::ThreadData
// stack of jobs, any thread can push, and only push, jobs (and Mutex must be locked)
std::vector<ThreadJob> Jobs{};
// Current job (used to map thread to Proxy), using an index is okay as only this thread can
// erase the job and other threads can only push back new jobs not insert. This constaint could
// be relaxed by using unique ids instead.
std::size_t RunningJob{ NoRunningJob };
std::thread SystemThread{}; // the system thread, not really used
std::mutex Mutex{}; // thread mutex, used for Jobs manipulation
std::condition_variable ConditionVariable{}; // thread cv, used to wake up the thread
struct vtkSMPThreadPool::ProxyThreadData
// This construtor is needed because aggregate initialization can not have default value
// (prior to C++14)
// also because emplace_back can not use aggregate initialization (prior to C++20)
ProxyThreadData(ThreadData* threadData = nullptr, std::size_t id = 0)
: Thread{ threadData }
, Id{ id }
ThreadData* Thread{}; // The thread data from the pool
std::size_t Id{}; // Virtual thread ID, mainly used for thread local variables
struct vtkSMPThreadPool::ProxyData
vtkSMPThreadPool* Pool{}; // Pool that created this proxy
ProxyData* Parent{}; // either null (for top level) or the parent
std::vector<ProxyThreadData> Threads{}; // Threads used by this
std::size_t NextThread{}; // Round-robin thread for jobs
std::vector<std::future<void>> JobsFutures{}; // Used to know when job is done
std::mutex Mutex{}; // Used to synchronize
void vtkSMPThreadPool::RunJob(
ThreadData& data, std::size_t jobIndex, std::unique_lock<std::mutex>& lock)
assert(lock.owns_lock() && "Caller must have locked mutex");
assert(jobIndex < data.Jobs.size() && "jobIndex out of range");
const auto oldRunningJob = data.RunningJob; // store old running job for nested threads
data.RunningJob = jobIndex; // Set thread running job
auto function = std::move(data.Jobs[data.RunningJob].Function);
lock.unlock(); // MSVC: warning C26110 is a false positive
function(); // run the function
catch (const std::exception& e)
"Function called by " << vtkSMPThreadPool::GetInstance().GetThreadId()
<< " has thrown an exception. The exception is ignored. what():\n"
<< e.what());
catch (...)
"Function called by " << vtkSMPThreadPool::GetInstance().GetThreadId()
<< " has thrown an unknown exception. The exception is ignored.");
data.Jobs.erase(data.Jobs.begin() + jobIndex);
data.RunningJob = oldRunningJob;
vtkSMPThreadPool::Proxy::Proxy(std::unique_ptr<ProxyData>&& data)
: Data{ std::move(data) }
if (!this->Data->JobsFutures.empty())
vtkErrorWithObjectMacro(nullptr, "Proxy not joined. Terminating.");
vtkSMPThreadPool::Proxy::Proxy(Proxy&&) noexcept = default;
vtkSMPThreadPool::Proxy& vtkSMPThreadPool::Proxy::operator=(Proxy&&) noexcept = default;
void vtkSMPThreadPool::Proxy::Join()
if (this->IsTopLevel()) // wait for all futures, all jobs are done by other threads
for (auto& future : this->Data->JobsFutures)
else // nested run code in calling thread too
// Run jobs associated with this thread and proxy
ThreadData& threadData = *this->Data->Threads[0].Thread;
assert(threadData.SystemThread.get_id() == std::this_thread::get_id());
while (true)
// protect access in case other thread push work for current thread
std::unique_lock<std::mutex> lock{ threadData.Mutex };
auto it = std::find_if(threadData.Jobs.begin(), threadData.Jobs.end(),
[this](ThreadJob& job) { return job.Proxy == this->Data.get(); });
if (it == threadData.Jobs.end()) // no remaining job associated to this proxy
const auto jobIndex = static_cast<std::size_t>(std::distance(threadData.Jobs.begin(), it));
RunJob(threadData, jobIndex, lock);
for (auto& future : this->Data->JobsFutures)
void vtkSMPThreadPool::Proxy::DoJob(std::function<void()> job)
this->Data->NextThread = (this->Data->NextThread + 1) % this->Data->Threads.size();
auto& proxyThread = this->Data->Threads[this->Data->NextThread];
if (!this->IsTopLevel() && this->Data->NextThread == 0) // when nested, thread 0 is "this_thread"
assert(std::this_thread::get_id() == proxyThread.Thread->SystemThread.get_id());
std::unique_lock<std::mutex> lock{ proxyThread.Thread->Mutex };
proxyThread.Thread->Jobs.emplace_back(this->Data.get(), std::move(job));
std::unique_lock<std::mutex> lock{ proxyThread.Thread->Mutex };
auto& jobs = proxyThread.Thread->Jobs;
jobs.emplace_back(this->Data.get(), std::move(job));
std::vector<std::reference_wrapper<std::thread>> vtkSMPThreadPool::Proxy::GetThreads() const
std::vector<std::reference_wrapper<std::thread>> output;
for (auto& proxyThread : this->Data->Threads)
return output;
bool vtkSMPThreadPool::Proxy::IsTopLevel() const noexcept
return this->Data->Parent == nullptr;
for (int i = 0; i < threadNumber; ++i)
const auto threadCount = static_cast<std::size_t>(std::thread::hardware_concurrency());
for (std::size_t i{}; i < threadCount; ++i)
this->Threads.emplace_back(std::bind(&vtkSMPThreadPool::ThreadJob, this));
std::unique_ptr<ThreadData> data{ new ThreadData{} };
data->SystemThread = this->MakeThread();
this->, std::memory_order_release);
void vtkSMPThreadPool::Join()
this->, std::memory_order_release);
for (auto& threadData : this->Threads)
std::unique_lock<std::mutex> lock(this->Mutex);
this->Joining = true;
for (auto& threadData : this->Threads)
for (auto& it : this->Threads)
vtkSMPThreadPool::Proxy vtkSMPThreadPool::AllocateThreads(std::size_t threadCount)
if (threadCount == 0 || threadCount > this->ThreadCount())
threadCount = this->ThreadCount();
std::unique_ptr<ProxyData> proxy{ new ProxyData{} };
proxy->Pool = this;
// Check if we are in the pool
ThreadData* threadData = this->GetCallerThreadData();
if (threadData)
// Don't lock since we are in the running job, in this thread
proxy->Parent = threadData->Jobs[threadData->RunningJob].Proxy;
// First thread is always current thread
proxy->Threads.emplace_back(threadData, this->GetNextThreadId());
this->FillThreadsForNestedProxy(proxy.get(), threadCount);
proxy->Parent = nullptr;
for (std::size_t i{}; i < threadCount; ++i)
proxy->Threads.emplace_back(this->Threads[i].get(), this->GetNextThreadId());
return Proxy{ std::move(proxy) };
void vtkSMPThreadPool::DoJob(std::function<void(void)> job)
std::size_t vtkSMPThreadPool::GetThreadId() const noexcept
std::unique_lock<std::mutex> lock(this->Mutex);
auto* threadData = this->GetCallerThreadData();
if (threadData)
std::unique_lock<std::mutex> lock{ threadData->Mutex }; // protect threadData->Jobs access
assert(threadData->RunningJob != NoRunningJob && "Invalid state");
auto& proxyThreads = threadData->Jobs[threadData->RunningJob].Proxy->Threads;
for (const auto& proxyThread : proxyThreads)
if (proxyThread.Thread == threadData)
return proxyThread.Id;
// Use 1 for any thread outside the pool and 2+ for ids of proxy thread because thread local
// implementation uses ID "0" for invalid state
return ExternalThreadID;
void vtkSMPThreadPool::ThreadJob()
bool vtkSMPThreadPool::IsParallelScope() const noexcept
std::function<void(void)> job;
return GetCallerThreadData() != nullptr;
bool vtkSMPThreadPool::GetSingleThread() const
// Return true if the caller is the thread[0] of the current running proxy
while (true)
auto* threadData = GetCallerThreadData();
if (threadData)
std::lock_guard<std::mutex> lock{ threadData->Mutex };
assert(threadData->RunningJob != NoRunningJob && "Invalid state");
return threadData->Jobs[threadData->RunningJob].Proxy->Threads[0].Thread == threadData;
return false;
std::size_t vtkSMPThreadPool::ThreadCount() const noexcept
return this->Threads.size();
vtkSMPThreadPool::ThreadData* vtkSMPThreadPool::GetCallerThreadData() const noexcept
for (const auto& threadData : this->Threads)
if (threadData->SystemThread.get_id() == std::this_thread::get_id())
std::unique_lock<std::mutex> lock(this->Mutex);
return threadData.get();
lock, [this] { return (!this->JobQueue.empty() || this->Joining); });
return nullptr;
std::thread vtkSMPThreadPool::MakeThread()
return std::thread{ [this]() {
while (!this->Initialized.load(std::memory_order_acquire))
if (this->JobQueue.empty())
ThreadData& threadData = *this->GetCallerThreadData();
// Main loop for threads of the pool
// When they are woke up, they check for new job and stop if "this->Joining" is true
// and no more jobs are running
while (true)
std::unique_lock<std::mutex> lock{ threadData.Mutex };
// Job stealing could be implemented but it will requires some changes in the process
// A thread that as no longer work to do could look at other threads jobs to "steal" a job
// from them and thus increase parallelism. This must take care of not generating deadlocks
// and should not increase Proxy parallelism above requested thread count.
// This goes out of the scope of current implementation.
threadData.ConditionVariable.wait(lock, [this, &threadData] {
return !threadData.Jobs.empty() || this->Joining.load(std::memory_order_acquire);
if (threadData.Jobs.empty())
break; // joining
job = std::move(this->JobQueue.front());
RunJob(threadData, threadData.Jobs.size() - 1, lock);
} };
void vtkSMPThreadPool::FillThreadsForNestedProxy(ProxyData* proxy, std::size_t maxCount)
// This function assigns thread for proxies, this function assumes that the calling thread is
// already part of the assigned thread for the proxy.
// Otherwise it will assign thread pool threads that are not already used by any of proxy parents
if (proxy->Parent->Threads.size() == this->Threads.size())
return; // No thread will be available
const auto isFree = [proxy](ThreadData* threadData) {
for (auto* parent = proxy->Parent; parent != nullptr; parent = parent->Parent)
for (auto& proxyThread : parent->Threads)
if (proxyThread.Thread == threadData)
return false;
return true;
for (auto& threadData : this->Threads)
if (isFree(threadData.get()))
proxy->Threads.emplace_back(threadData.get(), this->GetNextThreadId());
if (proxy->Threads.size() == maxCount)
std::vector<std::thread>* vtk::detail::smp::vtkSMPThreadPool::GetThreads()
std::size_t vtkSMPThreadPool::GetNextThreadId() noexcept
return &(this->Threads);
return this->NextProxyThreadId.fetch_add(1, std::memory_order_relaxed) + 1;
vtkSMPThreadPool& vtkSMPThreadPool::GetInstance()
static vtkSMPThreadPool instance{};
return instance;
} // namespace smp
} // namespace detail
......@@ -27,11 +27,11 @@
#include "vtkCommonCoreModule.h" // For export macro
#include "vtkSystemIncludes.h"
#include <condition_variable> // For std::condition_variable
#include <functional> // For std::function
#include <mutex> // For std::mutex
#include <queue> // For std::queue
#include <thread> // For std::thread
#include <atomic> // For std::atomic
#include <functional> // For std::function
#include <mutex> // For std::unique_lock
#include <thread> // For std::thread
#include <vector> // For std::vector
namespace vtk
......@@ -41,24 +41,147 @@ namespace smp
* @brief Internal thread pool implementation used in SMP functions
* This class is designed to be a Singleton thread pool, but local pool can be allocated too.
* This thread pool use a Proxy system that is used to allocate a certain amount of threads from
* the pool, which enable support for SMP local scopes.
* You need to have a Proxy to submit job to the pool.
// Internal data structures
struct ThreadJob;
struct ThreadData;
struct ProxyThreadData;
struct ProxyData;
explicit vtkSMPThreadPool(int ThreadNumber);
* @brief Proxy class used to submit work to the thread pool.
* A proxy act like a single thread pool, but it submits work to its parent thread pool.
* Using a proxy from multiple threads at the same time is undefined behaviour.
* Note: Even if nothing prevent a proxy to be moved around threads, it should either be used in
* the creating thread or in a thread that does not belong to the pool, otherwise it may create a
* deadlock when joining.
* @brief Destructor
* Join must have been called since the last DoJob before destroying the proxy.
Proxy(const Proxy&) = delete;
Proxy& operator=(const Proxy&) = delete;
Proxy(Proxy&&) noexcept;
Proxy& operator=(Proxy&&) noexcept;
void Join();
void DoJob(std::function<void(void)> job);
std::vector<std::thread>* GetThreads();
* @brief Blocks calling thread until all jobs are done.
* Note: nested proxies may execute jobs on calling thread during this function to maximize
* parallelism.
void Join();
void ThreadJob();
* @brief Add a job to the thread pool queue
void DoJob(std::function<void()> job);
* @brief Get a reference on all system threads used by this proxy
std::vector<std::reference_wrapper<std::thread>> GetThreads() const;
* @brief Return true is this proxy is allocated from a thread that does not belong to the pool
bool IsTopLevel() const noexcept;
friend class vtkSMPThreadPool; // Only the thread pool can construct this object
Proxy(std::unique_ptr<ProxyData>&& data);
std::unique_ptr<ProxyData> Data;
vtkSMPThreadPool(const vtkSMPThreadPool&) = delete;
vtkSMPThreadPool& operator=(const vtkSMPThreadPool&) = delete;
* @brief Create a proxy
* Create a proxy that will use at most threadCount thread of the thread pool.
* Proxy act as a thread pool on its own, but will in practice submit its work to this pool,
* this prevent threads to be created everytime a SMP function is called.
* If the current thread not in the pool, it will create a "top-level" proxy, otherwise it will
* create a nested proxy. A nested proxy will never use a thread that is already in use by its
* "parent" proxies to prevent deadlocks. It means that nested paralism may have a more limited
* amount of threads.
* @param threadCount max amount of thread to use. If 0, uses the number of thread of the pool.
* If greater than the number of thread of the pool, uses the number of thread of the pool.
* @return A proxy.
Proxy AllocateThreads(std::size_t threadCount = 0);
* Value returned by `GetThreadID` when called by a thread that does not belong to the pool.
static constexpr std::size_t ExternalThreadID = 1;
* @brief Get caller proxy thread virtual ID
* This function must be called from a proxy thread.
* If this function is called from non proxy thread, returns `ExternalThreadID`.
* Valid proxy thread virtual ID are always >= 2
std::size_t GetThreadId() const noexcept;
* @brief Returns true when called from a proxy thread, false otherwise.
bool IsParallelScope() const noexcept;
* @brief Returns true for a single proxy thread, false for the others.
bool GetSingleThread() const;
* @brief Returns number of system thread used by the thread pool.
std::size_t ThreadCount() const noexcept;
std::mutex Mutex;
bool Joining = false;
std::condition_variable ConditionVariable;
std::queue<std::function<void(void)>> JobQueue;
std::vector<std::thread> Threads;
// static because also used by proxy
static void RunJob(ThreadData& data, std::size_t jobIndex, std::unique_lock<std::mutex>& lock);
ThreadData* GetCallerThreadData() const noexcept;
std::thread MakeThread();
void FillThreadsForNestedProxy(ProxyData* proxy, std::size_t maxCount);
std::size_t GetNextThreadId() noexcept;
std::atomic<bool> Initialized{};
std::atomic<bool> Joining{};
std::vector<std::unique_ptr<ThreadData>> Threads; // Thread pool, fixed size
std::atomic<std::size_t> NextProxyThreadId{ 1 };
static vtkSMPThreadPool& GetInstance();
......@@ -17,7 +17,6 @@
#include "SMP/STDThread/vtkSMPToolsImpl.txx"
#include <cstdlib> // For std::getenv()
#include <stack> // For std::stack
#include <thread> // For std::thread::hardware_concurrency()
namespace vtk
......@@ -28,8 +27,12 @@ namespace smp
static int specifiedNumThreads = 0;
static std::stack<std::thread::id> threadIdStack;
static std::mutex threadIdStackLock;
int GetNumberOfThreadsSTDThread()
return specifiedNumThreads ? specifiedNumThreads : std::thread::hardware_concurrency();
template <>
......@@ -56,45 +59,24 @@ void vtkSMPToolsImpl<BackendType::STDThread>::Initialize(int numThreads)
int GetNumberOfThreadsSTDThread()
return specifiedNumThreads ? specifiedNumThreads : std::thread::hardware_concurrency();
void PushThreadId(std::thread::id id)
void PopThreadId()
bool GetSingleThreadSTDThread()
template <>
int vtkSMPToolsImpl<BackendType::STDThread>::GetEstimatedNumberOfThreads()
return == std::this_thread::get_id();
return specifiedNumThreads > 0 ? specifiedNumThreads : std::thread::hardware_concurrency();
template <>
int vtkSMPToolsImpl<BackendType::STDThread>::GetEstimatedNumberOfThreads()
bool vtkSMPToolsImpl<BackendType::STDThread>::GetSingleThread()
return GetNumberOfThreadsSTDThread();
return vtkSMPThreadPool::GetInstance().GetSingleThread();
template <>
bool vtkSMPToolsImpl<BackendType::STDThread>::GetSingleThread()
bool vtkSMPToolsImpl<BackendType::STDThread>::IsParallelScope()
return GetSingleThreadSTDThread();
return vtkSMPThreadPool::GetInstance().IsParallelScope();
......@@ -33,19 +33,6 @@ namespace smp
int VTKCOMMONCORE_EXPORT GetNumberOfThreadsSTDThread();
bool VTKCOMMONCORE_EXPORT GetSingleThreadSTDThread();
void VTKCOMMONCORE_EXPORT PushThreadId(std::thread::id id);
template <typename FunctorInternal>
void ExecuteFunctorSTDThread(void* functor, vtkIdType from, vtkIdType grain, vtkIdType last)
const vtkIdType to = std::min(from + grain, last);
FunctorInternal& fi = *reinterpret_cast<FunctorInternal*>(functor);
fi.Execute(from, to);
template <>
......@@ -59,7 +46,7 @@ void vtkSMPToolsImpl<BackendType::STDThread>::For(
if (grain >= n || (!this->NestedActivated && this->IsParallel))
if (grain >= n || (!this->NestedActivated && vtkSMPThreadPool::GetInstance().IsParallelScope()))
fi.Execute(first, last);
......@@ -73,32 +60,15 @@ void vtkSMPToolsImpl<BackendType::STDThread>::For(
grain = (estimateGrain > 0) ? estimateGrain : 1;
// /!\ This behaviour should be changed if we want more control on nested
// (e.g only the 2 first nested For are in parallel)
bool fromParallelCode = this->;
vtkSMPThreadPool pool(threadNumber);
auto proxy = vtkSMPThreadPool::GetInstance().AllocateThreads(threadNumber);
for (vtkIdType from = first; from < last; from += grain)
auto job = std::bind(ExecuteFunctorSTDThread<FunctorInternal>, &fi, from, grain, last);
const auto to = (std::min)(from + grain, last);
proxy.DoJob([&fi, from, to] { fi.Execute(from, to); });
// Atomic contortion to achieve this->IsParallel &= fromParallelCode.
// This compare&exchange basically boils down to:
// if (IsParallel == trueFlag)
// IsParallel = fromParallelCode;
// else
// trueFlag = IsParallel;
// Which either leaves IsParallel as false or sets it to fromParallelCode (i.e. &=).
// Note that the return value of compare_exchange_weak() is not needed,
// and that no looping is necessary.
bool trueFlag = true;
this->IsParallel.compare_exchange_weak(trueFlag, fromParallelCode);
......@@ -169,6 +139,10 @@ int vtkSMPToolsImpl<BackendType::STDThread>::GetEstimatedNumberOfThreads();
template <>
bool vtkSMPToolsImpl<BackendType::STDThread>::GetSingleThread();
template <>
bool vtkSMPToolsImpl<BackendType::STDThread>::IsParallelScope();
} // namespace smp
} // namespace detail
# Improve VTK SMP STDThread backend
A common, global, thread pool is shared between all SMP calls, so they no longer create threads.
Nested SMP calls no longer spawn additional threads.
These changes lead to much better performances for small SMP calls and a better scalability for nested calls!
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment