Skip to content
Snippets Groups Projects
Commit 01ad9025 authored by Joerg Sonnenberger's avatar Joerg Sonnenberger Committed by Brad King
Browse files

Autogen: Redo locking and state machine for fence handling and the worker pool

(1) All CV use must hold the corresponding mutex, otherwise race
conditions happen. This is mandated by the C++ standard.

(2) Introduce a separate CV for the thread waiting for other jobs to
finish before running a fence. This avoids waking up all other workers
blindly. Correctly wake that thread up when the processing of outstanding
jobs is done.

(3) Split the waiting for a fence to become runnable from a fence is
pending. This avoids problems if more than one fence can end up on the
queue. The thread that took a fence off the queue is responsible for
clearing the fence processing flag.
parent 9ec09f22
No related branches found
No related tags found
No related merge requests found
...@@ -469,11 +469,9 @@ void cmWorkerPoolWorker::UVProcessStart(uv_async_t* handle) ...@@ -469,11 +469,9 @@ void cmWorkerPoolWorker::UVProcessStart(uv_async_t* handle)
void cmWorkerPoolWorker::UVProcessFinished() void cmWorkerPoolWorker::UVProcessFinished()
{ {
{ std::lock_guard<std::mutex> lock(Proc_.Mutex);
std::lock_guard<std::mutex> lock(Proc_.Mutex); if (Proc_.ROP && (Proc_.ROP->IsFinished() || !Proc_.ROP->IsStarted())) {
if (Proc_.ROP && (Proc_.ROP->IsFinished() || !Proc_.ROP->IsStarted())) { Proc_.ROP.reset();
Proc_.ROP.reset();
}
} }
// Notify idling thread // Notify idling thread
Proc_.Condition.notify_one(); Proc_.Condition.notify_one();
...@@ -532,6 +530,7 @@ public: ...@@ -532,6 +530,7 @@ public:
unsigned int JobsProcessing = 0; unsigned int JobsProcessing = 0;
std::deque<cmWorkerPool::JobHandleT> Queue; std::deque<cmWorkerPool::JobHandleT> Queue;
std::condition_variable Condition; std::condition_variable Condition;
std::condition_variable ConditionFence;
std::vector<std::unique_ptr<cmWorkerPoolWorker>> Workers; std::vector<std::unique_ptr<cmWorkerPoolWorker>> Workers;
// -- References // -- References
...@@ -593,19 +592,12 @@ bool cmWorkerPoolInternal::Process() ...@@ -593,19 +592,12 @@ bool cmWorkerPoolInternal::Process()
void cmWorkerPoolInternal::Abort() void cmWorkerPoolInternal::Abort()
{ {
bool notifyThreads = false;
// Clear all jobs and set abort flag // Clear all jobs and set abort flag
{ std::lock_guard<std::mutex> guard(Mutex);
std::lock_guard<std::mutex> guard(Mutex); if (!Aborting) {
if (Processing && !Aborting) { // Register abort and clear queue
// Register abort and clear queue Aborting = true;
Aborting = true; Queue.clear();
Queue.clear();
notifyThreads = true;
}
}
if (notifyThreads) {
// Wake threads
Condition.notify_all(); Condition.notify_all();
} }
} }
...@@ -669,7 +661,7 @@ void cmWorkerPoolInternal::Work(unsigned int workerIndex) ...@@ -669,7 +661,7 @@ void cmWorkerPoolInternal::Work(unsigned int workerIndex)
if (Aborting) { if (Aborting) {
break; break;
} }
// Wait for new jobs // Wait for new jobs on the main CV
if (Queue.empty()) { if (Queue.empty()) {
++WorkersIdle; ++WorkersIdle;
Condition.wait(uLock); Condition.wait(uLock);
...@@ -677,20 +669,34 @@ void cmWorkerPoolInternal::Work(unsigned int workerIndex) ...@@ -677,20 +669,34 @@ void cmWorkerPoolInternal::Work(unsigned int workerIndex)
continue; continue;
} }
// Check for fence jobs // If there is a fence currently active or waiting,
if (FenceProcessing || Queue.front()->IsFence()) { // sleep on the main CV and try again.
if (JobsProcessing != 0) { if (FenceProcessing) {
Condition.wait(uLock); Condition.wait(uLock);
continue; continue;
}
// No jobs get processed. Set the fence job processing flag.
FenceProcessing = true;
} }
// Pop next job from queue // Pop next job from queue
jobHandle = std::move(Queue.front()); jobHandle = std::move(Queue.front());
Queue.pop_front(); Queue.pop_front();
// Check for fence jobs
bool raisedFence = false;
if (jobHandle->IsFence()) {
FenceProcessing = true;
raisedFence = true;
// Wait on the Fence CV until all pending jobs are done.
while (JobsProcessing != 0 && !Aborting) {
ConditionFence.wait(uLock);
}
// When aborting, explicitly kick all threads alive once more.
if (Aborting) {
FenceProcessing = false;
Condition.notify_all();
break;
}
}
// Unlocked scope for job processing // Unlocked scope for job processing
++JobsProcessing; ++JobsProcessing;
{ {
...@@ -701,11 +707,18 @@ void cmWorkerPoolInternal::Work(unsigned int workerIndex) ...@@ -701,11 +707,18 @@ void cmWorkerPoolInternal::Work(unsigned int workerIndex)
} }
--JobsProcessing; --JobsProcessing;
// Was this a fence job? // If this was the thread that entered fence processing
if (FenceProcessing) { // originally, notify all idling workers that the fence
// is done.
if (raisedFence) {
FenceProcessing = false; FenceProcessing = false;
Condition.notify_all(); Condition.notify_all();
} }
// If fence processing is still not done, notify the
// the fencing worker when all active jobs are done.
if (FenceProcessing && JobsProcessing == 0) {
ConditionFence.notify_all();
}
} }
// Decrement running workers count // Decrement running workers count
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment