diff --git a/Common/Core/SMP/STDThread/vtkSMPThreadLocalBackend.cxx b/Common/Core/SMP/STDThread/vtkSMPThreadLocalBackend.cxx index 60eaac541ca4f52060bf5f2d52b47f085e6ee361..06d9d4062aac5fa2e87e57bd82865af19d4cb33b 100644 --- a/Common/Core/SMP/STDThread/vtkSMPThreadLocalBackend.cxx +++ b/Common/Core/SMP/STDThread/vtkSMPThreadLocalBackend.cxx @@ -15,6 +15,8 @@ #include "SMP/STDThread/vtkSMPThreadLocalBackend.h" +#include "SMP/STDThread/vtkSMPThreadPool.h" + #include <algorithm> #include <cmath> // For std::floor & std::log2 #include <functional> // For std::hash @@ -32,7 +34,7 @@ VTK_ABI_NAMESPACE_BEGIN static ThreadIdType GetThreadId() { - return std::hash<std::thread::id>{}(std::this_thread::get_id()); + return vtkSMPThreadPool::GetInstance().GetThreadId(); } // 32 bit FNV-1a hash function diff --git a/Common/Core/SMP/STDThread/vtkSMPThreadPool.cxx b/Common/Core/SMP/STDThread/vtkSMPThreadPool.cxx index 08ac09170d1d1d336a30c2b749e42b65ce3695dd..ec91c39c04a93bd015c3348fc0f36802979eed52 100644 --- a/Common/Core/SMP/STDThread/vtkSMPThreadPool.cxx +++ b/Common/Core/SMP/STDThread/vtkSMPThreadPool.cxx @@ -15,6 +15,12 @@ #include "SMP/STDThread/vtkSMPThreadPool.h" +#include <vtkObject.h> + +#include <algorithm> +#include <cassert> +#include <condition_variable> +#include <future> #include <iostream> namespace vtk @@ -25,66 +31,410 @@ namespace smp { VTK_ABI_NAMESPACE_BEGIN -vtkSMPThreadPool::vtkSMPThreadPool(int threadNumber) +static constexpr std::size_t NoRunningJob = (std::numeric_limits<std::size_t>::max)(); + +struct vtkSMPThreadPool::ThreadJob +{ + // This construtor is needed because aggregate initialization can not have default value + // (prior to C++14) + // also because emplace_back can not use aggregate initialization (prior to C++20) + ThreadJob(ProxyData* proxy = nullptr, std::function<void()> function = nullptr) + : Proxy{ proxy } + , Function{ std::move(function) } + { + } + + ProxyData* Proxy{}; // Proxy that allocated this job + std::function<void()> Function{}; // Actual user job + std::promise<void> Promise{}; // Set when job is done +}; + +struct vtkSMPThreadPool::ThreadData +{ + // stack of jobs, any thread can push, and only push, jobs (and Mutex must be locked) + std::vector<ThreadJob> Jobs{}; + // Current job (used to map thread to Proxy), using an index is okay as only this thread can + // erase the job and other threads can only push back new jobs not insert. This constaint could + // be relaxed by using unique ids instead. + std::size_t RunningJob{ NoRunningJob }; + std::thread SystemThread{}; // the system thread, not really used + std::mutex Mutex{}; // thread mutex, used for Jobs manipulation + std::condition_variable ConditionVariable{}; // thread cv, used to wake up the thread +}; + +struct vtkSMPThreadPool::ProxyThreadData +{ + // This construtor is needed because aggregate initialization can not have default value + // (prior to C++14) + // also because emplace_back can not use aggregate initialization (prior to C++20) + ProxyThreadData(ThreadData* threadData = nullptr, std::size_t id = 0) + : Thread{ threadData } + , Id{ id } + { + } + + ThreadData* Thread{}; // The thread data from the pool + std::size_t Id{}; // Virtual thread ID, mainly used for thread local variables +}; + +struct vtkSMPThreadPool::ProxyData +{ + vtkSMPThreadPool* Pool{}; // Pool that created this proxy + ProxyData* Parent{}; // either null (for top level) or the parent + std::vector<ProxyThreadData> Threads{}; // Threads used by this + std::size_t NextThread{}; // Round-robin thread for jobs + std::vector<std::future<void>> JobsFutures{}; // Used to know when job is done + std::mutex Mutex{}; // Used to synchronize +}; + +void vtkSMPThreadPool::RunJob( + ThreadData& data, std::size_t jobIndex, std::unique_lock<std::mutex>& lock) +{ + assert(lock.owns_lock() && "Caller must have locked mutex"); + assert(jobIndex < data.Jobs.size() && "jobIndex out of range"); + + const auto oldRunningJob = data.RunningJob; // store old running job for nested threads + data.RunningJob = jobIndex; // Set thread running job + auto function = std::move(data.Jobs[data.RunningJob].Function); + lock.unlock(); // MSVC: warning C26110 is a false positive + + try + { + function(); // run the function + } + catch (const std::exception& e) + { + vtkErrorWithObjectMacro(nullptr, + "Function called by " << vtkSMPThreadPool::GetInstance().GetThreadId() + << " has thrown an exception. The exception is ignored. what():\n" + << e.what()); + } + catch (...) + { + vtkErrorWithObjectMacro(nullptr, + "Function called by " << vtkSMPThreadPool::GetInstance().GetThreadId() + << " has thrown an unknown exception. The exception is ignored."); + } + + lock.lock(); + data.Jobs[data.RunningJob].Promise.set_value(); + data.Jobs.erase(data.Jobs.begin() + jobIndex); + data.RunningJob = oldRunningJob; +} + +vtkSMPThreadPool::Proxy::Proxy(std::unique_ptr<ProxyData>&& data) + : Data{ std::move(data) } +{ +} + +vtkSMPThreadPool::Proxy::~Proxy() +{ + if (!this->Data->JobsFutures.empty()) + { + vtkErrorWithObjectMacro(nullptr, "Proxy not joined. Terminating."); + std::terminate(); + } +} + +vtkSMPThreadPool::Proxy::Proxy(Proxy&&) noexcept = default; +vtkSMPThreadPool::Proxy& vtkSMPThreadPool::Proxy::operator=(Proxy&&) noexcept = default; + +void vtkSMPThreadPool::Proxy::Join() +{ + if (this->IsTopLevel()) // wait for all futures, all jobs are done by other threads + { + for (auto& future : this->Data->JobsFutures) + { + future.wait(); + } + } + else // nested run code in calling thread too + { + // Run jobs associated with this thread and proxy + ThreadData& threadData = *this->Data->Threads[0].Thread; + assert(threadData.SystemThread.get_id() == std::this_thread::get_id()); + + while (true) + { + // protect access in case other thread push work for current thread + std::unique_lock<std::mutex> lock{ threadData.Mutex }; + + auto it = std::find_if(threadData.Jobs.begin(), threadData.Jobs.end(), + [this](ThreadJob& job) { return job.Proxy == this->Data.get(); }); + + if (it == threadData.Jobs.end()) // no remaining job associated to this proxy + { + break; + } + + const auto jobIndex = static_cast<std::size_t>(std::distance(threadData.Jobs.begin(), it)); + RunJob(threadData, jobIndex, lock); + } + + for (auto& future : this->Data->JobsFutures) + { + future.wait(); + } + } + + this->Data->JobsFutures.clear(); +} + +void vtkSMPThreadPool::Proxy::DoJob(std::function<void()> job) +{ + this->Data->NextThread = (this->Data->NextThread + 1) % this->Data->Threads.size(); + auto& proxyThread = this->Data->Threads[this->Data->NextThread]; + + if (!this->IsTopLevel() && this->Data->NextThread == 0) // when nested, thread 0 is "this_thread" + { + assert(std::this_thread::get_id() == proxyThread.Thread->SystemThread.get_id()); + + std::unique_lock<std::mutex> lock{ proxyThread.Thread->Mutex }; + proxyThread.Thread->Jobs.emplace_back(this->Data.get(), std::move(job)); + } + else + { + std::unique_lock<std::mutex> lock{ proxyThread.Thread->Mutex }; + + auto& jobs = proxyThread.Thread->Jobs; + jobs.emplace_back(this->Data.get(), std::move(job)); + this->Data->JobsFutures.emplace_back(jobs.back().Promise.get_future()); + + lock.unlock(); + + proxyThread.Thread->ConditionVariable.notify_one(); + } +} + +std::vector<std::reference_wrapper<std::thread>> vtkSMPThreadPool::Proxy::GetThreads() const +{ + std::vector<std::reference_wrapper<std::thread>> output; + + for (auto& proxyThread : this->Data->Threads) + { + output.emplace_back(proxyThread.Thread->SystemThread); + } + + return output; +} + +bool vtkSMPThreadPool::Proxy::IsTopLevel() const noexcept +{ + return this->Data->Parent == nullptr; +} + +vtkSMPThreadPool::vtkSMPThreadPool() { - this->Threads.reserve(threadNumber); - for (int i = 0; i < threadNumber; ++i) + const auto threadCount = static_cast<std::size_t>(std::thread::hardware_concurrency()); + + this->Threads.reserve(threadCount); + for (std::size_t i{}; i < threadCount; ++i) { - this->Threads.emplace_back(std::bind(&vtkSMPThreadPool::ThreadJob, this)); + std::unique_ptr<ThreadData> data{ new ThreadData{} }; + data->SystemThread = this->MakeThread(); + this->Threads.emplace_back(std::move(data)); } + + this->Initialized.store(true, std::memory_order_release); } -void vtkSMPThreadPool::Join() +vtkSMPThreadPool::~vtkSMPThreadPool() { + this->Joining.store(true, std::memory_order_release); + + for (auto& threadData : this->Threads) { - std::unique_lock<std::mutex> lock(this->Mutex); + threadData->ConditionVariable.notify_one(); + } - this->Joining = true; - this->ConditionVariable.notify_all(); + for (auto& threadData : this->Threads) + { + threadData->SystemThread.join(); } +} - for (auto& it : this->Threads) +vtkSMPThreadPool::Proxy vtkSMPThreadPool::AllocateThreads(std::size_t threadCount) +{ + if (threadCount == 0 || threadCount > this->ThreadCount()) { - it.join(); + threadCount = this->ThreadCount(); + } + + std::unique_ptr<ProxyData> proxy{ new ProxyData{} }; + proxy->Pool = this; + proxy->Threads.reserve(threadCount); + + // Check if we are in the pool + ThreadData* threadData = this->GetCallerThreadData(); + if (threadData) + { + // Don't lock since we are in the running job, in this thread + proxy->Parent = threadData->Jobs[threadData->RunningJob].Proxy; + // First thread is always current thread + proxy->Threads.emplace_back(threadData, this->GetNextThreadId()); + this->FillThreadsForNestedProxy(proxy.get(), threadCount); + } + else + { + proxy->Parent = nullptr; + for (std::size_t i{}; i < threadCount; ++i) + { + proxy->Threads.emplace_back(this->Threads[i].get(), this->GetNextThreadId()); + } } + + return Proxy{ std::move(proxy) }; } -void vtkSMPThreadPool::DoJob(std::function<void(void)> job) +std::size_t vtkSMPThreadPool::GetThreadId() const noexcept { - std::unique_lock<std::mutex> lock(this->Mutex); + auto* threadData = this->GetCallerThreadData(); + + if (threadData) + { + std::unique_lock<std::mutex> lock{ threadData->Mutex }; // protect threadData->Jobs access + assert(threadData->RunningJob != NoRunningJob && "Invalid state"); + auto& proxyThreads = threadData->Jobs[threadData->RunningJob].Proxy->Threads; + lock.unlock(); + + for (const auto& proxyThread : proxyThreads) + { + if (proxyThread.Thread == threadData) + { + return proxyThread.Id; + } + } + } - this->JobQueue.emplace(std::move(job)); - this->ConditionVariable.notify_one(); + // Use 1 for any thread outside the pool and 2+ for ids of proxy thread because thread local + // implementation uses ID "0" for invalid state + return ExternalThreadID; } -void vtkSMPThreadPool::ThreadJob() +bool vtkSMPThreadPool::IsParallelScope() const noexcept { - std::function<void(void)> job; + return GetCallerThreadData() != nullptr; +} + +bool vtkSMPThreadPool::GetSingleThread() const +{ + // Return true if the caller is the thread[0] of the current running proxy - while (true) + auto* threadData = GetCallerThreadData(); + if (threadData) { + std::lock_guard<std::mutex> lock{ threadData->Mutex }; + assert(threadData->RunningJob != NoRunningJob && "Invalid state"); + return threadData->Jobs[threadData->RunningJob].Proxy->Threads[0].Thread == threadData; + } + + return false; +} + +std::size_t vtkSMPThreadPool::ThreadCount() const noexcept +{ + return this->Threads.size(); +} + +vtkSMPThreadPool::ThreadData* vtkSMPThreadPool::GetCallerThreadData() const noexcept +{ + for (const auto& threadData : this->Threads) + { + if (threadData->SystemThread.get_id() == std::this_thread::get_id()) { - std::unique_lock<std::mutex> lock(this->Mutex); + return threadData.get(); + } + } - this->ConditionVariable.wait( - lock, [this] { return (!this->JobQueue.empty() || this->Joining); }); + return nullptr; +} + +std::thread vtkSMPThreadPool::MakeThread() +{ + return std::thread{ [this]() { + while (!this->Initialized.load(std::memory_order_acquire)) + { + } - if (this->JobQueue.empty()) + ThreadData& threadData = *this->GetCallerThreadData(); + + // Main loop for threads of the pool + // When they are woke up, they check for new job and stop if "this->Joining" is true + // and no more jobs are running + while (true) + { + std::unique_lock<std::mutex> lock{ threadData.Mutex }; + + // Job stealing could be implemented but it will requires some changes in the process + // A thread that as no longer work to do could look at other threads jobs to "steal" a job + // from them and thus increase parallelism. This must take care of not generating deadlocks + // and should not increase Proxy parallelism above requested thread count. + // This goes out of the scope of current implementation. + threadData.ConditionVariable.wait(lock, [this, &threadData] { + return !threadData.Jobs.empty() || this->Joining.load(std::memory_order_acquire); + }); + + if (threadData.Jobs.empty()) { - return; + break; // joining } - job = std::move(this->JobQueue.front()); - this->JobQueue.pop(); + RunJob(threadData, threadData.Jobs.size() - 1, lock); + } + } }; +} + +void vtkSMPThreadPool::FillThreadsForNestedProxy(ProxyData* proxy, std::size_t maxCount) +{ + // This function assigns thread for proxies, this function assumes that the calling thread is + // already part of the assigned thread for the proxy. + // Otherwise it will assign thread pool threads that are not already used by any of proxy parents + + if (proxy->Parent->Threads.size() == this->Threads.size()) + { + return; // No thread will be available + } + + const auto isFree = [proxy](ThreadData* threadData) { + for (auto* parent = proxy->Parent; parent != nullptr; parent = parent->Parent) + { + for (auto& proxyThread : parent->Threads) + { + if (proxyThread.Thread == threadData) + { + return false; + } + } + } + + return true; + }; + + for (auto& threadData : this->Threads) + { + if (isFree(threadData.get())) + { + proxy->Threads.emplace_back(threadData.get(), this->GetNextThreadId()); + } + + if (proxy->Threads.size() == maxCount) + { + break; } - job(); } } -std::vector<std::thread>* vtk::detail::smp::vtkSMPThreadPool::GetThreads() +std::size_t vtkSMPThreadPool::GetNextThreadId() noexcept { - return &(this->Threads); + return this->NextProxyThreadId.fetch_add(1, std::memory_order_relaxed) + 1; } + +vtkSMPThreadPool& vtkSMPThreadPool::GetInstance() +{ + static vtkSMPThreadPool instance{}; + return instance; +} + VTK_ABI_NAMESPACE_END } // namespace smp } // namespace detail diff --git a/Common/Core/SMP/STDThread/vtkSMPThreadPool.h b/Common/Core/SMP/STDThread/vtkSMPThreadPool.h index 5ed90001573550fcb3254906a46ff67b7afb8dd4..713462fc9e6f9177d20b9bd71a83348cea41b20e 100644 --- a/Common/Core/SMP/STDThread/vtkSMPThreadPool.h +++ b/Common/Core/SMP/STDThread/vtkSMPThreadPool.h @@ -27,11 +27,11 @@ #include "vtkCommonCoreModule.h" // For export macro #include "vtkSystemIncludes.h" -#include <condition_variable> // For std::condition_variable -#include <functional> // For std::function -#include <mutex> // For std::mutex -#include <queue> // For std::queue -#include <thread> // For std::thread +#include <atomic> // For std::atomic +#include <functional> // For std::function +#include <mutex> // For std::unique_lock +#include <thread> // For std::thread +#include <vector> // For std::vector namespace vtk { @@ -41,24 +41,147 @@ namespace smp { VTK_ABI_NAMESPACE_BEGIN +/** + * @brief Internal thread pool implementation used in SMP functions + * + * This class is designed to be a Singleton thread pool, but local pool can be allocated too. + * This thread pool use a Proxy system that is used to allocate a certain amount of threads from + * the pool, which enable support for SMP local scopes. + * You need to have a Proxy to submit job to the pool. + */ class VTKCOMMONCORE_EXPORT vtkSMPThreadPool { + // Internal data structures + struct ThreadJob; + struct ThreadData; + struct ProxyThreadData; + struct ProxyData; + public: - explicit vtkSMPThreadPool(int ThreadNumber); + /** + * @brief Proxy class used to submit work to the thread pool. + * + * A proxy act like a single thread pool, but it submits work to its parent thread pool. + * Using a proxy from multiple threads at the same time is undefined behaviour. + * + * Note: Even if nothing prevent a proxy to be moved around threads, it should either be used in + * the creating thread or in a thread that does not belong to the pool, otherwise it may create a + * deadlock when joining. + */ + class VTKCOMMONCORE_EXPORT Proxy final + { + public: + /** + * @brief Destructor + * + * Join must have been called since the last DoJob before destroying the proxy. + */ + ~Proxy(); + Proxy(const Proxy&) = delete; + Proxy& operator=(const Proxy&) = delete; + Proxy(Proxy&&) noexcept; + Proxy& operator=(Proxy&&) noexcept; - void Join(); - void DoJob(std::function<void(void)> job); - std::vector<std::thread>* GetThreads(); + /** + * @brief Blocks calling thread until all jobs are done. + * + * Note: nested proxies may execute jobs on calling thread during this function to maximize + * parallelism. + */ + void Join(); -private: - void ThreadJob(); + /** + * @brief Add a job to the thread pool queue + */ + void DoJob(std::function<void()> job); + + /** + * @brief Get a reference on all system threads used by this proxy + */ + std::vector<std::reference_wrapper<std::thread>> GetThreads() const; + + /** + * @brief Return true is this proxy is allocated from a thread that does not belong to the pool + */ + bool IsTopLevel() const noexcept; + + private: + friend class vtkSMPThreadPool; // Only the thread pool can construct this object + + Proxy(std::unique_ptr<ProxyData>&& data); + + std::unique_ptr<ProxyData> Data; + }; + + vtkSMPThreadPool(); + ~vtkSMPThreadPool(); + vtkSMPThreadPool(const vtkSMPThreadPool&) = delete; + vtkSMPThreadPool& operator=(const vtkSMPThreadPool&) = delete; + + /** + * @brief Create a proxy + * + * Create a proxy that will use at most threadCount thread of the thread pool. + * Proxy act as a thread pool on its own, but will in practice submit its work to this pool, + * this prevent threads to be created everytime a SMP function is called. + * + * If the current thread not in the pool, it will create a "top-level" proxy, otherwise it will + * create a nested proxy. A nested proxy will never use a thread that is already in use by its + * "parent" proxies to prevent deadlocks. It means that nested paralism may have a more limited + * amount of threads. + * + * @param threadCount max amount of thread to use. If 0, uses the number of thread of the pool. + * If greater than the number of thread of the pool, uses the number of thread of the pool. + * @return A proxy. + */ + Proxy AllocateThreads(std::size_t threadCount = 0); + + /** + * Value returned by `GetThreadID` when called by a thread that does not belong to the pool. + */ + static constexpr std::size_t ExternalThreadID = 1; + + /** + * @brief Get caller proxy thread virtual ID + * + * This function must be called from a proxy thread. + * If this function is called from non proxy thread, returns `ExternalThreadID`. + * Valid proxy thread virtual ID are always >= 2 + */ + std::size_t GetThreadId() const noexcept; + + /** + * @brief Returns true when called from a proxy thread, false otherwise. + */ + bool IsParallelScope() const noexcept; + + /** + * @brief Returns true for a single proxy thread, false for the others. + */ + bool GetSingleThread() const; + + /** + * @brief Returns number of system thread used by the thread pool. + */ + std::size_t ThreadCount() const noexcept; private: - std::mutex Mutex; - bool Joining = false; - std::condition_variable ConditionVariable; - std::queue<std::function<void(void)>> JobQueue; - std::vector<std::thread> Threads; + // static because also used by proxy + static void RunJob(ThreadData& data, std::size_t jobIndex, std::unique_lock<std::mutex>& lock); + + ThreadData* GetCallerThreadData() const noexcept; + + std::thread MakeThread(); + void FillThreadsForNestedProxy(ProxyData* proxy, std::size_t maxCount); + std::size_t GetNextThreadId() noexcept; + + std::atomic<bool> Initialized{}; + std::atomic<bool> Joining{}; + std::vector<std::unique_ptr<ThreadData>> Threads; // Thread pool, fixed size + std::atomic<std::size_t> NextProxyThreadId{ 1 }; + +public: + static vtkSMPThreadPool& GetInstance(); }; VTK_ABI_NAMESPACE_END diff --git a/Common/Core/SMP/STDThread/vtkSMPToolsImpl.cxx b/Common/Core/SMP/STDThread/vtkSMPToolsImpl.cxx index c6d1c3d4c3810fdf51dcbd4aab5738b503f3691c..7ebda7378aa06ca0544d39ff03a692bb48d82fb5 100644 --- a/Common/Core/SMP/STDThread/vtkSMPToolsImpl.cxx +++ b/Common/Core/SMP/STDThread/vtkSMPToolsImpl.cxx @@ -17,7 +17,6 @@ #include "SMP/STDThread/vtkSMPToolsImpl.txx" #include <cstdlib> // For std::getenv() -#include <stack> // For std::stack #include <thread> // For std::thread::hardware_concurrency() namespace vtk @@ -28,8 +27,12 @@ namespace smp { VTK_ABI_NAMESPACE_BEGIN static int specifiedNumThreads = 0; -static std::stack<std::thread::id> threadIdStack; -static std::mutex threadIdStackLock; + +//------------------------------------------------------------------------------ +int GetNumberOfThreadsSTDThread() +{ + return specifiedNumThreads ? specifiedNumThreads : std::thread::hardware_concurrency(); +} //------------------------------------------------------------------------------ template <> @@ -56,45 +59,24 @@ void vtkSMPToolsImpl<BackendType::STDThread>::Initialize(int numThreads) } //------------------------------------------------------------------------------ -int GetNumberOfThreadsSTDThread() -{ - return specifiedNumThreads ? specifiedNumThreads : std::thread::hardware_concurrency(); -} - -//------------------------------------------------------------------------------ -void PushThreadId(std::thread::id id) -{ - threadIdStackLock.lock(); - threadIdStack.emplace(id); - threadIdStackLock.unlock(); -} - -//------------------------------------------------------------------------------ -void PopThreadId() -{ - threadIdStackLock.lock(); - threadIdStack.pop(); - threadIdStackLock.unlock(); -} - -//------------------------------------------------------------------------------ -bool GetSingleThreadSTDThread() +template <> +int vtkSMPToolsImpl<BackendType::STDThread>::GetEstimatedNumberOfThreads() { - return threadIdStack.top() == std::this_thread::get_id(); + return specifiedNumThreads > 0 ? specifiedNumThreads : std::thread::hardware_concurrency(); } //------------------------------------------------------------------------------ template <> -int vtkSMPToolsImpl<BackendType::STDThread>::GetEstimatedNumberOfThreads() +bool vtkSMPToolsImpl<BackendType::STDThread>::GetSingleThread() { - return GetNumberOfThreadsSTDThread(); + return vtkSMPThreadPool::GetInstance().GetSingleThread(); } //------------------------------------------------------------------------------ template <> -bool vtkSMPToolsImpl<BackendType::STDThread>::GetSingleThread() +bool vtkSMPToolsImpl<BackendType::STDThread>::IsParallelScope() { - return GetSingleThreadSTDThread(); + return vtkSMPThreadPool::GetInstance().IsParallelScope(); } VTK_ABI_NAMESPACE_END diff --git a/Common/Core/SMP/STDThread/vtkSMPToolsImpl.txx b/Common/Core/SMP/STDThread/vtkSMPToolsImpl.txx index f25557441af9b1ab500ec6d55d9a4d5c7e6714ce..d62dcb39f62b392fc8938473ddb1f609362733d9 100644 --- a/Common/Core/SMP/STDThread/vtkSMPToolsImpl.txx +++ b/Common/Core/SMP/STDThread/vtkSMPToolsImpl.txx @@ -33,19 +33,6 @@ namespace smp VTK_ABI_NAMESPACE_BEGIN int VTKCOMMONCORE_EXPORT GetNumberOfThreadsSTDThread(); -bool VTKCOMMONCORE_EXPORT GetSingleThreadSTDThread(); -void VTKCOMMONCORE_EXPORT PushThreadId(std::thread::id id); -void VTKCOMMONCORE_EXPORT PopThreadId(); - -//-------------------------------------------------------------------------------- -template <typename FunctorInternal> -void ExecuteFunctorSTDThread(void* functor, vtkIdType from, vtkIdType grain, vtkIdType last) -{ - const vtkIdType to = std::min(from + grain, last); - - FunctorInternal& fi = *reinterpret_cast<FunctorInternal*>(functor); - fi.Execute(from, to); -} //-------------------------------------------------------------------------------- template <> @@ -59,7 +46,7 @@ void vtkSMPToolsImpl<BackendType::STDThread>::For( return; } - if (grain >= n || (!this->NestedActivated && this->IsParallel)) + if (grain >= n || (!this->NestedActivated && vtkSMPThreadPool::GetInstance().IsParallelScope())) { fi.Execute(first, last); } @@ -73,32 +60,15 @@ void vtkSMPToolsImpl<BackendType::STDThread>::For( grain = (estimateGrain > 0) ? estimateGrain : 1; } - // /!\ This behaviour should be changed if we want more control on nested - // (e.g only the 2 first nested For are in parallel) - bool fromParallelCode = this->IsParallel.exchange(true); - - vtkSMPThreadPool pool(threadNumber); - PushThreadId((pool.GetThreads())->at(0).get_id()); + auto proxy = vtkSMPThreadPool::GetInstance().AllocateThreads(threadNumber); for (vtkIdType from = first; from < last; from += grain) { - auto job = std::bind(ExecuteFunctorSTDThread<FunctorInternal>, &fi, from, grain, last); - pool.DoJob(job); + const auto to = (std::min)(from + grain, last); + proxy.DoJob([&fi, from, to] { fi.Execute(from, to); }); } - pool.Join(); - - PopThreadId(); - // Atomic contortion to achieve this->IsParallel &= fromParallelCode. - // This compare&exchange basically boils down to: - // if (IsParallel == trueFlag) - // IsParallel = fromParallelCode; - // else - // trueFlag = IsParallel; - // Which either leaves IsParallel as false or sets it to fromParallelCode (i.e. &=). - // Note that the return value of compare_exchange_weak() is not needed, - // and that no looping is necessary. - bool trueFlag = true; - this->IsParallel.compare_exchange_weak(trueFlag, fromParallelCode); + + proxy.Join(); } } @@ -169,6 +139,10 @@ int vtkSMPToolsImpl<BackendType::STDThread>::GetEstimatedNumberOfThreads(); template <> bool vtkSMPToolsImpl<BackendType::STDThread>::GetSingleThread(); +//-------------------------------------------------------------------------------- +template <> +bool vtkSMPToolsImpl<BackendType::STDThread>::IsParallelScope(); + VTK_ABI_NAMESPACE_END } // namespace smp } // namespace detail diff --git a/Documentation/release/dev/imporve-smp-std-thread-backend.md b/Documentation/release/dev/imporve-smp-std-thread-backend.md new file mode 100644 index 0000000000000000000000000000000000000000..9b7898507a90694d76ef5af72b08532f7fb1ca4c --- /dev/null +++ b/Documentation/release/dev/imporve-smp-std-thread-backend.md @@ -0,0 +1,6 @@ +# Improve VTK SMP STDThread backend + +A common, global, thread pool is shared between all SMP calls, so they no longer create threads. +Nested SMP calls no longer spawn additional threads. + +These changes lead to much better performances for small SMP calls and a better scalability for nested calls!