Commit 8cb26a0a authored by Sebastian Holtermann's avatar Sebastian Holtermann

Autogen: Factor out concurrency framework to cmWorkerPool class

This factors out the concurrency framework in `cmQtAutoGeneratorMocUic` to a
dedicated class `cmWorkerPool` which might be reused in other places.

`cmWorkerPool` supports fence jobs that require that
- all other jobs before in the queue have been processed before the fence
  job processing gets started,
- no jobs later in the queue will be processed before the fence job processing
  has been completed.
Fence jobs are needed where the completion of all previous jobs in the queue
is a requirement for further processing.  E.g. in `cmQtAutoGeneratorMocUic`
the generation of `mocs_compilation.cpp` requires that all previous
source file parse jobs have been completed.
parent 7f83e803
......@@ -391,6 +391,8 @@ set(SRCS
cmVariableWatch.h
cmVersion.cxx
cmVersion.h
cmWorkerPool.cxx
cmWorkerPool.h
cmWorkingDirectory.cxx
cmWorkingDirectory.h
cmXMLParser.cxx
......
......@@ -14,12 +14,6 @@
#include "cmSystemTools.h"
#include "cmake.h"
#include <algorithm>
#include <sstream>
#include <utility>
// -- Class methods
cmQtAutoGenerator::Logger::Logger()
{
// Initialize logger
......@@ -431,232 +425,6 @@ bool cmQtAutoGenerator::FileSystem::MakeParentDirectory(
return cmQtAutoGenerator::MakeParentDirectory(filename);
}
int cmQtAutoGenerator::ReadOnlyProcessT::PipeT::init(uv_loop_t* uv_loop,
ReadOnlyProcessT* process)
{
Process_ = process;
Target_ = nullptr;
return UVPipe_.init(*uv_loop, 0, this);
}
int cmQtAutoGenerator::ReadOnlyProcessT::PipeT::startRead(std::string* target)
{
Target_ = target;
return uv_read_start(uv_stream(), &PipeT::UVAlloc, &PipeT::UVData);
}
void cmQtAutoGenerator::ReadOnlyProcessT::PipeT::reset()
{
Process_ = nullptr;
Target_ = nullptr;
UVPipe_.reset();
Buffer_.clear();
Buffer_.shrink_to_fit();
}
void cmQtAutoGenerator::ReadOnlyProcessT::PipeT::UVAlloc(uv_handle_t* handle,
size_t suggestedSize,
uv_buf_t* buf)
{
auto& pipe = *reinterpret_cast<PipeT*>(handle->data);
pipe.Buffer_.resize(suggestedSize);
buf->base = pipe.Buffer_.data();
buf->len = pipe.Buffer_.size();
}
void cmQtAutoGenerator::ReadOnlyProcessT::PipeT::UVData(uv_stream_t* stream,
ssize_t nread,
const uv_buf_t* buf)
{
auto& pipe = *reinterpret_cast<PipeT*>(stream->data);
if (nread > 0) {
// Append data to merged output
if ((buf->base != nullptr) && (pipe.Target_ != nullptr)) {
pipe.Target_->append(buf->base, nread);
}
} else if (nread < 0) {
// EOF or error
auto* proc = pipe.Process_;
// Check it this an unusual error
if (nread != UV_EOF) {
if (!proc->Result()->error()) {
proc->Result()->ErrorMessage =
"libuv reading from pipe failed with error code ";
proc->Result()->ErrorMessage += std::to_string(nread);
}
}
// Clear libuv pipe handle and try to finish
pipe.reset();
proc->UVTryFinish();
}
}
void cmQtAutoGenerator::ProcessResultT::reset()
{
ExitStatus = 0;
TermSignal = 0;
if (!StdOut.empty()) {
StdOut.clear();
StdOut.shrink_to_fit();
}
if (!StdErr.empty()) {
StdErr.clear();
StdErr.shrink_to_fit();
}
if (!ErrorMessage.empty()) {
ErrorMessage.clear();
ErrorMessage.shrink_to_fit();
}
}
void cmQtAutoGenerator::ReadOnlyProcessT::setup(
ProcessResultT* result, bool mergedOutput,
std::vector<std::string> const& command, std::string const& workingDirectory)
{
Setup_.WorkingDirectory = workingDirectory;
Setup_.Command = command;
Setup_.Result = result;
Setup_.MergedOutput = mergedOutput;
}
static std::string getUVError(const char* prefixString, int uvErrorCode)
{
std::ostringstream ost;
ost << prefixString << ": " << uv_strerror(uvErrorCode);
return ost.str();
}
bool cmQtAutoGenerator::ReadOnlyProcessT::start(
uv_loop_t* uv_loop, std::function<void()>&& finishedCallback)
{
if (IsStarted() || (Result() == nullptr)) {
return false;
}
// Reset result before the start
Result()->reset();
// Fill command string pointers
if (!Setup().Command.empty()) {
CommandPtr_.reserve(Setup().Command.size() + 1);
for (std::string const& arg : Setup().Command) {
CommandPtr_.push_back(arg.c_str());
}
CommandPtr_.push_back(nullptr);
} else {
Result()->ErrorMessage = "Empty command";
}
if (!Result()->error()) {
if (UVPipeOut_.init(uv_loop, this) != 0) {
Result()->ErrorMessage = "libuv stdout pipe initialization failed";
}
}
if (!Result()->error()) {
if (UVPipeErr_.init(uv_loop, this) != 0) {
Result()->ErrorMessage = "libuv stderr pipe initialization failed";
}
}
if (!Result()->error()) {
// -- Setup process stdio options
// stdin
UVOptionsStdIO_[0].flags = UV_IGNORE;
UVOptionsStdIO_[0].data.stream = nullptr;
// stdout
UVOptionsStdIO_[1].flags =
static_cast<uv_stdio_flags>(UV_CREATE_PIPE | UV_WRITABLE_PIPE);
UVOptionsStdIO_[1].data.stream = UVPipeOut_.uv_stream();
// stderr
UVOptionsStdIO_[2].flags =
static_cast<uv_stdio_flags>(UV_CREATE_PIPE | UV_WRITABLE_PIPE);
UVOptionsStdIO_[2].data.stream = UVPipeErr_.uv_stream();
// -- Setup process options
std::fill_n(reinterpret_cast<char*>(&UVOptions_), sizeof(UVOptions_), 0);
UVOptions_.exit_cb = &ReadOnlyProcessT::UVExit;
UVOptions_.file = CommandPtr_[0];
UVOptions_.args = const_cast<char**>(CommandPtr_.data());
UVOptions_.cwd = Setup_.WorkingDirectory.c_str();
UVOptions_.flags = UV_PROCESS_WINDOWS_HIDE;
UVOptions_.stdio_count = static_cast<int>(UVOptionsStdIO_.size());
UVOptions_.stdio = UVOptionsStdIO_.data();
// -- Spawn process
int uvErrorCode = UVProcess_.spawn(*uv_loop, UVOptions_, this);
if (uvErrorCode != 0) {
Result()->ErrorMessage =
getUVError("libuv process spawn failed ", uvErrorCode);
}
}
// -- Start reading from stdio streams
if (!Result()->error()) {
if (UVPipeOut_.startRead(&Result()->StdOut) != 0) {
Result()->ErrorMessage = "libuv start reading from stdout pipe failed";
}
}
if (!Result()->error()) {
if (UVPipeErr_.startRead(Setup_.MergedOutput ? &Result()->StdOut
: &Result()->StdErr) != 0) {
Result()->ErrorMessage = "libuv start reading from stderr pipe failed";
}
}
if (!Result()->error()) {
IsStarted_ = true;
FinishedCallback_ = std::move(finishedCallback);
} else {
// Clear libuv handles and finish
UVProcess_.reset();
UVPipeOut_.reset();
UVPipeErr_.reset();
CommandPtr_.clear();
}
return IsStarted();
}
void cmQtAutoGenerator::ReadOnlyProcessT::UVExit(uv_process_t* handle,
int64_t exitStatus,
int termSignal)
{
auto& proc = *reinterpret_cast<ReadOnlyProcessT*>(handle->data);
if (proc.IsStarted() && !proc.IsFinished()) {
// Set error message on demand
proc.Result()->ExitStatus = exitStatus;
proc.Result()->TermSignal = termSignal;
if (!proc.Result()->error()) {
if (termSignal != 0) {
proc.Result()->ErrorMessage = "Process was terminated by signal ";
proc.Result()->ErrorMessage +=
std::to_string(proc.Result()->TermSignal);
} else if (exitStatus != 0) {
proc.Result()->ErrorMessage = "Process failed with return value ";
proc.Result()->ErrorMessage +=
std::to_string(proc.Result()->ExitStatus);
}
}
// Reset process handle and try to finish
proc.UVProcess_.reset();
proc.UVTryFinish();
}
}
void cmQtAutoGenerator::ReadOnlyProcessT::UVTryFinish()
{
// There still might be data in the pipes after the process has finished.
// Therefore check if the process is finished AND all pipes are closed
// before signaling the worker thread to continue.
if (UVProcess_.get() == nullptr) {
if (UVPipeOut_.uv_pipe() == nullptr) {
if (UVPipeErr_.uv_pipe() == nullptr) {
IsFinished_ = true;
FinishedCallback_();
}
}
}
}
cmQtAutoGenerator::cmQtAutoGenerator() = default;
cmQtAutoGenerator::~cmQtAutoGenerator() = default;
......
......@@ -7,14 +7,8 @@
#include "cmFilePathChecksum.h"
#include "cmQtAutoGen.h"
#include "cmUVHandlePtr.h"
#include "cm_uv.h"
#include <array>
#include <functional>
#include <mutex>
#include <stddef.h>
#include <stdint.h>
#include <string>
#include <vector>
......@@ -137,102 +131,6 @@ public:
cmFilePathChecksum FilePathChecksum_;
};
/// @brief Return value and output of an external process
struct ProcessResultT
{
void reset();
bool error() const
{
return (ExitStatus != 0) || (TermSignal != 0) || !ErrorMessage.empty();
}
std::int64_t ExitStatus = 0;
int TermSignal = 0;
std::string StdOut;
std::string StdErr;
std::string ErrorMessage;
};
/// @brief External process management class
struct ReadOnlyProcessT
{
// -- Types
/// @brief libuv pipe buffer class
class PipeT
{
public:
int init(uv_loop_t* uv_loop, ReadOnlyProcessT* process);
int startRead(std::string* target);
void reset();
// -- Libuv casts
uv_pipe_t* uv_pipe() { return UVPipe_.get(); }
uv_stream_t* uv_stream()
{
return reinterpret_cast<uv_stream_t*>(uv_pipe());
}
uv_handle_t* uv_handle()
{
return reinterpret_cast<uv_handle_t*>(uv_pipe());
}
// -- Libuv callbacks
static void UVAlloc(uv_handle_t* handle, size_t suggestedSize,
uv_buf_t* buf);
static void UVData(uv_stream_t* stream, ssize_t nread,
const uv_buf_t* buf);
private:
ReadOnlyProcessT* Process_ = nullptr;
std::string* Target_ = nullptr;
std::vector<char> Buffer_;
cm::uv_pipe_ptr UVPipe_;
};
/// @brief Process settings
struct SetupT
{
std::string WorkingDirectory;
std::vector<std::string> Command;
ProcessResultT* Result = nullptr;
bool MergedOutput = false;
};
// -- Const accessors
const SetupT& Setup() const { return Setup_; }
ProcessResultT* Result() const { return Setup_.Result; }
bool IsStarted() const { return IsStarted_; }
bool IsFinished() const { return IsFinished_; }
// -- Runtime
void setup(ProcessResultT* result, bool mergedOutput,
std::vector<std::string> const& command,
std::string const& workingDirectory = std::string());
bool start(uv_loop_t* uv_loop, std::function<void()>&& finishedCallback);
private:
// -- Friends
friend class PipeT;
// -- Libuv callbacks
static void UVExit(uv_process_t* handle, int64_t exitStatus,
int termSignal);
void UVTryFinish();
// -- Setup
SetupT Setup_;
// -- Runtime
bool IsStarted_ = false;
bool IsFinished_ = false;
std::function<void()> FinishedCallback_;
std::vector<const char*> CommandPtr_;
std::array<uv_stdio_container_t, 3> UVOptionsStdIO_;
uv_process_options_t UVOptions_;
cm::uv_process_ptr UVProcess_;
PipeT UVPipeOut_;
PipeT UVPipeErr_;
};
public:
// -- Constructors
cmQtAutoGenerator();
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
/* Distributed under the OSI-approved BSD 3-Clause License. See accompanying
file Copyright.txt or https://cmake.org/licensing for details. */
#ifndef cmWorkerPool_h
#define cmWorkerPool_h
#include "cmConfigure.h" // IWYU pragma: keep
#include "cmAlgorithms.h" // IWYU pragma: keep
#include <memory> // IWYU pragma: keep
#include <stdint.h>
#include <string>
#include <utility>
#include <vector>
// -- Types
class cmWorkerPoolInternal;
/** @class cmWorkerPool
* @brief Thread pool with job queue
*/
class cmWorkerPool
{
public:
/**
* Return value and output of an external process.
*/
struct ProcessResultT
{
void reset();
bool error() const
{
return (ExitStatus != 0) || (TermSignal != 0) || !ErrorMessage.empty();
}
std::int64_t ExitStatus = 0;
int TermSignal = 0;
std::string StdOut;
std::string StdErr;
std::string ErrorMessage;
};
/**
* Abstract job class for concurrent job processing.
*/
class JobT
{
public:
JobT(JobT const&) = delete;
JobT& operator=(JobT const&) = delete;
/**
* @brief Virtual destructor.
*/
virtual ~JobT();
/**
* @brief Fence job flag
*
* Fence jobs require that:
* - all jobs before in the queue have been processed
* - no jobs later in the queue will be processed before this job was
* processed
*/
bool IsFence() const { return Fence_; }
protected:
/**
* @brief Protected default constructor
*/
JobT(bool fence = false)
: Fence_(fence)
{
}
/**
* Abstract processing interface that must be implement in derived classes.
*/
virtual void Process() = 0;
/**
* Get the worker pool.
* Only valid during the JobT::Process() call!
*/
cmWorkerPool* Pool() const { return Pool_; }
/**
* Get the user data.
* Only valid during the JobT::Process() call!
*/
void* UserData() const { return Pool_->UserData(); };
/**
* Get the worker index.
* This is the index of the thread processing this job and is in the range
* [0..ThreadCount).
* Concurrently processing jobs will never have the same WorkerIndex().
* Only valid during the JobT::Process() call!
*/
unsigned int WorkerIndex() const { return WorkerIndex_; }
/**
* Run an external read only process.
* Use only during JobT::Process() call!
*/
bool RunProcess(ProcessResultT& result,
std::vector<std::string> const& command,
std::string const& workingDirectory);
private:
//! Needs access to Work()
friend class cmWorkerPoolInternal;
//! Worker thread entry method.
void Work(cmWorkerPool* pool, unsigned int workerIndex)
{
Pool_ = pool;
WorkerIndex_ = workerIndex;
this->Process();
}
private:
cmWorkerPool* Pool_ = nullptr;
unsigned int WorkerIndex_ = 0;
bool Fence_ = false;
};
/**
* @brief Job handle type
*/
typedef std::unique_ptr<JobT> JobHandleT;
/**
* @brief Fence job base class
*/
class JobFenceT : public JobT
{
public:
JobFenceT()
: JobT(true)
{
}
//! Does nothing
void Process() override{};
};
/**
* @brief Fence job that aborts the worker pool.
* This class is useful as the last job in the job queue.
*/
class JobEndT : JobFenceT
{
public:
//! Does nothing
void Process() override { Pool()->Abort(); }
};
public:
// -- Methods
cmWorkerPool();
~cmWorkerPool();
/**
* @brief Blocking function that starts threads to process all Jobs in
* the queue.
*
* This method blocks until a job calls the Abort() method.
* @arg threadCount Number of threads to process jobs.
* @arg userData Common user data pointer available in all Jobs.
*/
bool Process(unsigned int threadCount, void* userData = nullptr);
/**
* Number of worker threads passed to Process().
* Only valid during Process().
*/
unsigned int ThreadCount() const { return ThreadCount_; }
/**
* User data reference passed to Process().
* Only valid during Process().
*/
void* UserData() const { return UserData_; }
// -- Job processing interface
/**
* @brief Clears the job queue and aborts all worker threads.
*
* This method is thread safe and can be called from inside a job.
*/
void Abort();
/**
* @brief Push job to the queue.
*
* This method is thread safe and can be called from inside a job or before
* Process().
*/
bool PushJob(JobHandleT&& jobHandle);
/**
* @brief Push job to the queue
*
* This method is thread safe and can be called from inside a job or before
* Process().
*/
template <class T, typename... Args>
bool EmplaceJob(Args&&... args)
{
return PushJob(cm::make_unique<T>(std::forward<Args>(args)...));
}
private:
void* UserData_ = nullptr;
unsigned int ThreadCount_ = 0;
std::unique_ptr<cmWorkerPoolInternal> Int_;
};
#endif
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment