Skip to content
Snippets Groups Projects
Commit 0c4cdaf1 authored by Yohann Bearzi (Kitware)'s avatar Yohann Bearzi (Kitware)
Browse files

vtkThreadedCallbackQueue: changing SetNumberOfThreads behavior

`vtkThreadedCallbackQueue::SetNumberOfThreads` used to stop all running
threads, then allocate new threads but not run them. The user would have
had to restart the threads. As a consequence, we go from a state of `n`
threads to a state of zero threads to a state of `m` threads. Ideally,
we'd like the transition to be smoother so we just directly go from `n`
to `m` threads.

The method has been updated to behave like the latter scenario.
`SerialStop` is not needed anymore either so it is removed.
parent 3c6bdc69
No related branches found
No related tags found
No related merge requests found
......@@ -21,10 +21,13 @@
#include <atomic>
int TestThreadedCallbackQueue(int, char*[])
namespace
{
//-----------------------------------------------------------------------------
void RunThreads(int nthreadsBegin, int nthreadsEnd)
{
vtkNew<vtkThreadedCallbackQueue> queue;
queue->SetNumberOfThreads(2);
queue->SetNumberOfThreads(nthreadsBegin);
queue->Start();
std::atomic_int count(0);
int N = 100000;
......@@ -33,6 +36,10 @@ int TestThreadedCallbackQueue(int, char*[])
for (vtkIdType i = 0; i < N; ++i)
{
if (i == N / 2)
{
queue->Start();
}
if (i == N / 4)
{
queue->Stop();
}
......@@ -46,12 +53,22 @@ int TestThreadedCallbackQueue(int, char*[])
i, 0, vtkNew<vtkIntArray>(), array);
}
queue->SetNumberOfThreads(16);
queue->Start();
queue->SetNumberOfThreads(nthreadsEnd);
// If the jobs are not run, this test will do an infinite loop
while (count != N)
;
}
} // anonymous namespace
int TestThreadedCallbackQueue(int, char*[])
{
vtkLog(INFO, "Testing expanding from 2 to 8 threads");
// Testing expanding the number of threads
::RunThreads(2, 8);
vtkLog(INFO, "Testing shrinking from 8 to 2 threads");
// Testing shrinking the number of threads
::RunThreads(8, 2);
return EXIT_SUCCESS;
}
......@@ -16,6 +16,9 @@
#include "vtkThreadedCallbackQueue.h"
#include "vtkObjectFactory.h"
#include <algorithm>
#include <functional>
VTK_ABI_NAMESPACE_BEGIN
vtkStandardNewMacro(vtkThreadedCallbackQueue);
......@@ -98,48 +101,87 @@ vtkThreadedCallbackQueue::~vtkThreadedCallbackQueue()
}
}
//-----------------------------------------------------------------------------
void vtkThreadedCallbackQueue::ThreadRoutine(int threadId)
{
while (threadId < this->NumberOfThreads && this->Running && (!this->Destroying || !this->Empty))
{
this->Pop(threadId);
}
}
//-----------------------------------------------------------------------------
void vtkThreadedCallbackQueue::SetNumberOfThreads(int numberOfThreads)
{
::Execute(
this->Controller,
[](vtkThreadedCallbackQueue* self, int n) {
if (static_cast<std::size_t>(n) == self->Threads.size())
int size = static_cast<int>(self->Threads.size());
if (size == n)
{
// Nothing to do
return;
}
// We only need to protect the shared atomic NumberOfThreads if we are shrinking.
else if (size < n || !self->Running)
{
self->NumberOfThreads = n;
}
else
{
std::lock_guard<std::mutex> lock(self->Mutex);
self->NumberOfThreads = n;
}
// If there are no threads running, we can just allocate the vector of threads.
if (!self->Running)
{
self->Threads.resize(n);
return;
}
// We need to use SerialStop so it is not pushed into the controller's queue.
// If someone calls Start after SetNumberOfThreads and Start is inserted before Stop,
// then we enter a deadlock. Serializing the Stop call prevents that.
self->SerialStop(self);
self->NumberOfThreads = n;
self->Threads.resize(n);
// If we are expanding the number of threads, then we just need to spawn
// the missing threads.
if (size < n)
{
std::generate_n(std::back_inserter(self->Threads), n - size, [&self] {
return std::thread(
std::bind(&vtkThreadedCallbackQueue::ThreadRoutine, self, self->Threads.size() - 1));
});
}
// If we are shrinking the number of threads, let's notify all threads
// so the threads whose id is more than the updated NumberOfThreads terminate.
else
{
self->ConditionVariable.notify_all();
self->Sync(self->NumberOfThreads);
self->Threads.resize(n);
}
},
this, numberOfThreads);
}
//-----------------------------------------------------------------------------
void vtkThreadedCallbackQueue::SerialStop(vtkThreadedCallbackQueue* self)
void vtkThreadedCallbackQueue::Stop()
{
if (!self->Running)
{
return;
}
{
std::lock_guard<std::mutex> lock(self->Mutex);
self->Running = false;
}
::Execute(
this->Controller,
[](vtkThreadedCallbackQueue* self) {
if (!self->Running)
{
return;
}
self->ConditionVariable.notify_all();
self->Sync();
}
{
std::lock_guard<std::mutex> lock(self->Mutex);
self->Running = false;
}
//-----------------------------------------------------------------------------
void vtkThreadedCallbackQueue::Stop()
{
::Execute(this->Controller, this->SerialStop, this);
self->ConditionVariable.notify_all();
self->Sync();
},
this);
}
//-----------------------------------------------------------------------------
......@@ -155,40 +197,35 @@ void vtkThreadedCallbackQueue::Start()
self->Running = true;
for (std::thread& thread : self->Threads)
{
thread = std::thread([self] {
while (self->Running && (!self->Destroying || !self->Empty))
{
self->Pop();
}
});
}
int threadId = -1;
std::generate(self->Threads.begin(), self->Threads.end(), [&self, &threadId] {
return std::thread(std::bind(&vtkThreadedCallbackQueue::ThreadRoutine, self, ++threadId));
});
},
this);
}
//-----------------------------------------------------------------------------
void vtkThreadedCallbackQueue::Sync()
void vtkThreadedCallbackQueue::Sync(int startId)
{
for (std::thread& thread : this->Threads)
{
thread.join();
}
std::for_each(this->Threads.begin() + startId, this->Threads.end(),
[](std::thread& thread) { thread.join(); });
}
//-----------------------------------------------------------------------------
void vtkThreadedCallbackQueue::Pop()
void vtkThreadedCallbackQueue::Pop(int threadId)
{
std::unique_lock<std::mutex> lock(this->Mutex);
if (!this->Destroying && this->Workers.empty())
if (threadId < this->NumberOfThreads && !this->Destroying && this->Workers.empty())
{
this->ConditionVariable.wait(
lock, [this] { return !this->Workers.empty() || !this->Running || this->Destroying; });
this->ConditionVariable.wait(lock, [this, &threadId] {
return threadId >= this->NumberOfThreads || !this->Workers.empty() || !this->Running ||
this->Destroying;
});
}
if (!this->Running || this->Workers.empty())
if (threadId >= this->NumberOfThreads || !this->Running || this->Workers.empty())
{
return;
}
......
......@@ -71,9 +71,7 @@ public:
void Push(FT&& f, ArgsT&&... args);
/**
* Sets the number of threads. If the new number of threads is different than the current one,
* this method calls `Stop()`, so one needs to call `Start()` after the number of threads are set
* in order to make this queue running.
* Sets the number of threads. The running state of the queue is not impacted by this method.
*
* This method is executed by the `Controller` on a different thread, so this method may terminate
* before the threads were allocated. Nevertheless, this method is thread-safe. Other calls to
......@@ -143,18 +141,24 @@ private:
/**
* This method terminates when all threads have finished. If `Destroying` is not true or `Running`
* is not false, then calling this method results in a deadlock.
*
* @param startId The thread id from which we synchronize the threads.
*/
void Sync();
void Sync(int startId = 0);
/**
* Pops the queue and runs the stored worker.
*
* @param The id of the thread currently calling this method.
*/
void Pop();
void Pop(int threadId);
/**
* Stop routine forced to be run serially. This function doesn't use the `Controller`.
* Main function that each thread runs. It pops the callback queue until notified otherwise.
*
* @param threadId The thread id assigned to the thread running this routine.
*/
static void SerialStop(vtkThreadedCallbackQueue* self);
void ThreadRoutine(int threadId);
/**
* Queue of workers responsible for running the jobs that are inserted.
......@@ -189,9 +193,6 @@ private:
*/
std::atomic_int NumberOfThreads;
/**
* Collection of threads running the jobs.
*/
std::vector<std::thread> Threads;
/**
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment