Commit 9794b72d authored by Sebastian Holtermann's avatar Sebastian Holtermann

cmWorkerPool: Set worker thread count separately to Process()

Don't pass the desired worker thread count to the `cmWorkerPool::Process()`
method but set it separately with the new `cmWorkerPool::SetThreadCount`
method.  This allows calling `cmWorkerPool::Process()` repeatedly without
having to pass the thread count every time.
parent 993dfa89
...@@ -1186,6 +1186,7 @@ bool cmQtAutoMocUic::Init(cmMakefile* makefile) ...@@ -1186,6 +1186,7 @@ bool cmQtAutoMocUic::Init(cmMakefile* makefile)
num = std::min<unsigned long>(num, ParallelMax); num = std::min<unsigned long>(num, ParallelMax);
Base_.NumThreads = static_cast<unsigned int>(num); Base_.NumThreads = static_cast<unsigned int>(num);
} }
WorkerPool_.SetThreadCount(Base_.NumThreads);
} }
// - Files and directories // - Files and directories
...@@ -1482,15 +1483,12 @@ bool cmQtAutoMocUic::Process() ...@@ -1482,15 +1483,12 @@ bool cmQtAutoMocUic::Process()
if (!CreateDirectories()) { if (!CreateDirectories()) {
return false; return false;
} }
if (!WorkerPool_.Process(this)) {
if (!WorkerPool_.Process(Base().NumThreads, this)) {
return false; return false;
} }
if (JobError_) { if (JobError_) {
return false; return false;
} }
return SettingsFileWrite(); return SettingsFileWrite();
} }
......
...@@ -468,6 +468,7 @@ public: ...@@ -468,6 +468,7 @@ public:
// -- Thread pool and job queue // -- Thread pool and job queue
std::mutex Mutex; std::mutex Mutex;
bool Processing = false;
bool Aborting = false; bool Aborting = false;
bool FenceProcessing = false; bool FenceProcessing = false;
unsigned int WorkersRunning = 0; unsigned int WorkersRunning = 0;
...@@ -591,7 +592,8 @@ cmWorkerPoolInternal::~cmWorkerPoolInternal() ...@@ -591,7 +592,8 @@ cmWorkerPoolInternal::~cmWorkerPoolInternal()
bool cmWorkerPoolInternal::Process() bool cmWorkerPoolInternal::Process()
{ {
// Reset state // Reset state flags
Processing = true;
Aborting = false; Aborting = false;
// Initialize libuv asynchronous request // Initialize libuv asynchronous request
UVRequestBegin.init(*UVLoop, &cmWorkerPoolInternal::UVSlotBegin, this); UVRequestBegin.init(*UVLoop, &cmWorkerPoolInternal::UVSlotBegin, this);
...@@ -599,23 +601,27 @@ bool cmWorkerPoolInternal::Process() ...@@ -599,23 +601,27 @@ bool cmWorkerPoolInternal::Process()
// Send begin request // Send begin request
UVRequestBegin.send(); UVRequestBegin.send();
// Run libuv loop // Run libuv loop
return (uv_run(UVLoop.get(), UV_RUN_DEFAULT) == 0); bool success = (uv_run(UVLoop.get(), UV_RUN_DEFAULT) == 0);
// Update state flags
Processing = false;
Aborting = false;
return success;
} }
void cmWorkerPoolInternal::Abort() void cmWorkerPoolInternal::Abort()
{ {
bool firstCall = false; bool notifyThreads = false;
// Clear all jobs and set abort flag // Clear all jobs and set abort flag
{ {
std::lock_guard<std::mutex> guard(Mutex); std::lock_guard<std::mutex> guard(Mutex);
if (!Aborting) { if (Processing && !Aborting) {
// Register abort and clear queue // Register abort and clear queue
Aborting = true; Aborting = true;
Queue.clear(); Queue.clear();
firstCall = true; notifyThreads = true;
} }
} }
if (firstCall) { if (notifyThreads) {
// Wake threads // Wake threads
Condition.notify_all(); Condition.notify_all();
} }
...@@ -627,15 +633,13 @@ inline bool cmWorkerPoolInternal::PushJob(cmWorkerPool::JobHandleT&& jobHandle) ...@@ -627,15 +633,13 @@ inline bool cmWorkerPoolInternal::PushJob(cmWorkerPool::JobHandleT&& jobHandle)
if (Aborting) { if (Aborting) {
return false; return false;
} }
// Append the job to the queue // Append the job to the queue
Queue.emplace_back(std::move(jobHandle)); Queue.emplace_back(std::move(jobHandle));
// Notify an idle worker if there's one // Notify an idle worker if there's one
if (WorkersIdle != 0) { if (WorkersIdle != 0) {
Condition.notify_one(); Condition.notify_one();
} }
// Return success
return true; return true;
} }
...@@ -743,19 +747,22 @@ cmWorkerPool::cmWorkerPool() ...@@ -743,19 +747,22 @@ cmWorkerPool::cmWorkerPool()
cmWorkerPool::~cmWorkerPool() = default; cmWorkerPool::~cmWorkerPool() = default;
bool cmWorkerPool::Process(unsigned int threadCount, void* userData) void cmWorkerPool::SetThreadCount(unsigned int threadCount)
{
if (!Int_->Processing) {
ThreadCount_ = (threadCount > 0) ? threadCount : 1u;
}
}
bool cmWorkerPool::Process(void* userData)
{ {
// Setup user data // Setup user data
UserData_ = userData; UserData_ = userData;
ThreadCount_ = (threadCount > 0) ? threadCount : 1u;
// Run libuv loop // Run libuv loop
bool success = Int_->Process(); bool success = Int_->Process();
// Clear user data // Clear user data
UserData_ = nullptr; UserData_ = nullptr;
ThreadCount_ = 0; // Return
return success; return success;
} }
......
...@@ -50,12 +50,12 @@ public: ...@@ -50,12 +50,12 @@ public:
JobT& operator=(JobT const&) = delete; JobT& operator=(JobT const&) = delete;
/** /**
* @brief Virtual destructor. * Virtual destructor.
*/ */
virtual ~JobT(); virtual ~JobT();
/** /**
* @brief Fence job flag * Fence job flag
* *
* Fence jobs require that: * Fence jobs require that:
* - all jobs before in the queue have been processed * - all jobs before in the queue have been processed
...@@ -66,7 +66,7 @@ public: ...@@ -66,7 +66,7 @@ public:
protected: protected:
/** /**
* @brief Protected default constructor * Protected default constructor
*/ */
JobT(bool fence = false) JobT(bool fence = false)
: Fence_(fence) : Fence_(fence)
...@@ -125,12 +125,12 @@ public: ...@@ -125,12 +125,12 @@ public:
}; };
/** /**
* @brief Job handle type * Job handle type
*/ */
typedef std::unique_ptr<JobT> JobHandleT; typedef std::unique_ptr<JobT> JobHandleT;
/** /**
* @brief Fence job base class * Fence job base class
*/ */
class JobFenceT : public JobT class JobFenceT : public JobT
{ {
...@@ -144,8 +144,9 @@ public: ...@@ -144,8 +144,9 @@ public:
}; };
/** /**
* @brief Fence job that aborts the worker pool. * Fence job that aborts the worker pool.
* This class is useful as the last job in the job queue. *
* Useful as the last job in the job queue.
*/ */
class JobEndT : JobFenceT class JobEndT : JobFenceT
{ {
...@@ -160,23 +161,29 @@ public: ...@@ -160,23 +161,29 @@ public:
~cmWorkerPool(); ~cmWorkerPool();
/** /**
* @brief Blocking function that starts threads to process all Jobs in * Number of worker threads.
* the queue. */
unsigned int ThreadCount() const { return ThreadCount_; }
/**
* Set the number of worker threads.
* *
* This method blocks until a job calls the Abort() method. * Calling this method during Process() has no effect.
* @arg threadCount Number of threads to process jobs.
* @arg userData Common user data pointer available in all Jobs.
*/ */
bool Process(unsigned int threadCount, void* userData = nullptr); void SetThreadCount(unsigned int threadCount);
/** /**
* Number of worker threads passed to Process(). * Blocking function that starts threads to process all Jobs in the queue.
* Only valid during Process(). *
* This method blocks until a job calls the Abort() method.
* @arg threadCount Number of threads to process jobs.
* @arg userData Common user data pointer available in all Jobs.
*/ */
unsigned int ThreadCount() const { return ThreadCount_; } bool Process(void* userData = nullptr);
/** /**
* User data reference passed to Process(). * User data reference passed to Process().
*
* Only valid during Process(). * Only valid during Process().
*/ */
void* UserData() const { return UserData_; } void* UserData() const { return UserData_; }
...@@ -184,14 +191,14 @@ public: ...@@ -184,14 +191,14 @@ public:
// -- Job processing interface // -- Job processing interface
/** /**
* @brief Clears the job queue and aborts all worker threads. * Clears the job queue and aborts all worker threads.
* *
* This method is thread safe and can be called from inside a job. * This method is thread safe and can be called from inside a job.
*/ */
void Abort(); void Abort();
/** /**
* @brief Push job to the queue. * Push job to the queue.
* *
* This method is thread safe and can be called from inside a job or before * This method is thread safe and can be called from inside a job or before
* Process(). * Process().
...@@ -199,7 +206,7 @@ public: ...@@ -199,7 +206,7 @@ public:
bool PushJob(JobHandleT&& jobHandle); bool PushJob(JobHandleT&& jobHandle);
/** /**
* @brief Push job to the queue * Push job to the queue
* *
* This method is thread safe and can be called from inside a job or before * This method is thread safe and can be called from inside a job or before
* Process(). * Process().
...@@ -212,7 +219,7 @@ public: ...@@ -212,7 +219,7 @@ public:
private: private:
void* UserData_ = nullptr; void* UserData_ = nullptr;
unsigned int ThreadCount_ = 0; unsigned int ThreadCount_ = 1;
std::unique_ptr<cmWorkerPoolInternal> Int_; std::unique_ptr<cmWorkerPoolInternal> Int_;
}; };
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment