diff --git a/Source/CMakeLists.txt b/Source/CMakeLists.txt index 37d407ba4d709717d7a1729305a56cb6a4ea365c..3ab6d2a5db2f3db32cc74ecd26e72c286bfcde51 100644 --- a/Source/CMakeLists.txt +++ b/Source/CMakeLists.txt @@ -1071,6 +1071,7 @@ add_library( CTest/cmCTestUpdateHandler.cxx CTest/cmCTestUploadCommand.cxx CTest/cmCTestUploadHandler.cxx + CTest/cmCTestJobServerClient.cxx CTest/cmCTestVC.cxx CTest/cmCTestVC.h @@ -1093,6 +1094,17 @@ add_library( LexerParser/cmCTestResourceGroupsLexer.h LexerParser/cmCTestResourceGroupsLexer.in.l ) + +if(WIN32 AND NOT CYGWIN) + # target_sources(CTestLib PRIVATE + # CTest/cmCTestJobServerWindows.cxx + # ) +else() + target_sources(CTestLib PRIVATE + CTest/cmCTestJobServerPosix.cxx + ) +endif() + target_include_directories( CTestLib PUBLIC diff --git a/Source/CTest/cmCTestJobServerClient.cxx b/Source/CTest/cmCTestJobServerClient.cxx new file mode 100644 index 0000000000000000000000000000000000000000..de02c3e65a9efd8cb9565028716a8d68377085cf --- /dev/null +++ b/Source/CTest/cmCTestJobServerClient.cxx @@ -0,0 +1,26 @@ +/* Distributed under the OSI-approved BSD 3-Clause License. See accompanying + file Copyright.txt or https://cmake.org/licensing for details. */ +#include "cmCTestJobServerClient.h" + +#include + +#include +#include + +#include "cmCTestJobServerPosix.h" + +cm::optional> +cmCTestJobServerClient::Connect(uv_loop_t* loop, size_t maxJobs) +{ +#if defined(_WIN32) + return cm::nullopt; +#else + std::unique_ptr client = + cm::make_unique(maxJobs); + if (client->Connect(loop)) { + return cm::optional>( + std::move(client)); + } +#endif + return cm::nullopt; +} diff --git a/Source/CTest/cmCTestJobServerClient.h b/Source/CTest/cmCTestJobServerClient.h new file mode 100644 index 0000000000000000000000000000000000000000..8fa905472df4147ede5362ebfc810665c16f977d --- /dev/null +++ b/Source/CTest/cmCTestJobServerClient.h @@ -0,0 +1,83 @@ +/* Distributed under the OSI-approved BSD 3-Clause License. 
See accompanying + file Copyright.txt or https://cmake.org/licensing for details. */ +#pragma once + +#include "cmConfigure.h" // IWYU pragma: keep + +#include +#include + +#include +#include + +#include +#include + +// Interface for a jobserver client. +// +// Users enqueue jobs be calling Enqueue(std::function f, size_t +// slots) And the jobserver client will automatically handle acquiring tokens +// and solving deadlocks. +// +// Once enough tokens are available (or the jobserver decides it's deadlocked) +// f(size_t tokens) is called with the number of tokens allocated to that task +// The user should eventually call Release(tokens) to return the resources +class cmCTestJobServerClient +{ +protected: + struct Task + { + std::function run; + size_t slots; + }; + +public: + virtual ~cmCTestJobServerClient() = default; + virtual cm::string_view Name() const = 0; + + /** + * @brief Enqueues a task to be executed when enough slots are available. + */ + virtual void Enqueue(std::function task, size_t slots) = 0; + + /** + * @brief Releases a specified number of slots on the job server. + * + * This function releases the specified number of slots on the job server, + * allowing other jobs to use them. + * + * @param slots The number of slots to release. + */ + virtual void Release(size_t slots) = 0; + + /** + * @brief Blocking call that force releases all jobslots and stops reading + * from the jobserver. + * + * @param force + */ + virtual void Close(bool force) = 0; + + /** + * @brief Adds a callback function to be called if the jobserver client + * encounters a fatal error. + * + * It should be assumed that no new jobs will be executed after this + * callback. The object should just be deleted. 
+ * + * @param f The callback function + */ + virtual void SetFatalCallback(std::function f) = 0; + + /** + * @brief Connect to a jobserver by reading the system environment + * + * Returns a Windows or POSIX jobserver client if a valid definition exists + * + * @param loop The libuv event loop to use + * @param maxJobs The maximum number of jobs that can be run simultaneously + * @return A unique_ptr to the jobserver client implementation + */ + static cm::optional> Connect( + uv_loop_t* loop, size_t maxJobs); +}; diff --git a/Source/CTest/cmCTestJobServerPosix.cxx b/Source/CTest/cmCTestJobServerPosix.cxx new file mode 100644 index 0000000000000000000000000000000000000000..7e180686e502021be7ff35a9c0191d3253277633 --- /dev/null +++ b/Source/CTest/cmCTestJobServerPosix.cxx @@ -0,0 +1,333 @@ +#include "cmCTestJobServerPosix.h" + +#include +#include +#include +#include +#include + +#include + +#include + +#include "cmStringAlgorithms.h" +#include "cmSystemTools.h" + +uv_stream_t* cmCTestJobServerPosix::_get_writer() +{ + switch (connection_type) { + case ConnectionType::Pipe: + return reinterpret_cast(&write_pipe); + case ConnectionType::Fifo: + return reinterpret_cast(&fifo); + default: + return nullptr; + } +} + +uv_stream_t* cmCTestJobServerPosix::_get_reader() +{ + switch (connection_type) { + case ConnectionType::Pipe: + return reinterpret_cast(&read_pipe); + case ConnectionType::Fifo: + return reinterpret_cast(&fifo); + default: + return nullptr; + } +} + +void cmCTestJobServerPosix::_on_read(uv_stream_t* stream, ssize_t nread, + const uv_buf_t* buf) +{ + cmCTestJobServerPosix* self = + reinterpret_cast(stream->data); + + if (nread < 0) { + if (self->FatalCallback.has_value()) { + cmSystemTools::Error( + cmStrCat("Failed to read jobserver tokens: ", uv_strerror(nread))); + self->FatalCallback.value()(); + } + return; + } + + if (nread > 0) { + self->tokens.insert(self->tokens.end(), buf->base, buf->base + nread); + self->_process_queue(); + } + + delete 
buf->base; +} + +// Marks tokens as used, this is solely bookkeeping and doesn't actually read +void cmCTestJobServerPosix::_acquire(size_t slots) +{ + if (slots == 0) { + return; + } + + // Prefer to use the free token, if available + if (this->freeTokenAvailable) { + this->freeTokenAvailable = false; + slots--; + } + + this->usedTokens += slots; +} + +// Starts as many jobs as possible, given the number of tokens available +// +// If we exhaust the queue, we release any unused tokens +// If we were unable to start the next job, and no jobs are running, we might +// have entered a deadlock. So we start the next job with as many tokens as we +// have available +void cmCTestJobServerPosix::_process_queue() +{ + size_t slots = freeTokenAvailable + this->tokens.size() - this->usedTokens; + size_t started = 0; + while (slots > 0 && !queue.empty()) { + size_t slotsNeeded = std::min(queue.front().slots, maxSlots); + if (slotsNeeded > slots) { + break; + } + + cmCTestJobServerClient::Task task = std::move(queue.front()); + queue.pop_front(); + slots -= task.slots; + _acquire(task.slots); + task.run(task.slots); + + started++; + } + + if (queue.empty()) { + // Be a friend and release any tokens we didn't use + _write(this->tokens.size() - this->usedTokens, true); + } else if (started == 0 && this->freeTokenAvailable && + this->usedTokens == 0) { + // If no jobs were started, and we're not running any jobs, then we start + // the next job with as many tokens as we have available + cmCTestJobServerClient::Task task = std::move(queue.front()); + queue.pop_front(); + _acquire(slots); + task.run(slots); + } +} + +// Synchronously write the tokens to the pipe, blocking here is convenient +// for implementing `Close()`. There shouldn't be much of a performance +// impact because there are always other writers on the pipe, and the number +// of tokens is small. 
+// +// If `retry` is false then we only attempt to write once and return +void cmCTestJobServerPosix::_write(size_t n, bool retry) +{ + if (n == 0) { + return; + } + + // Takes n tokens from the queue and copies them into a buffer + char buf[n]; + std::memcpy(buf, this->tokens.data(), n); + this->tokens.erase(this->tokens.begin(), this->tokens.begin() + n); + + uv_stream_t* writer = _get_writer(); + uv_buf_t uvbuf = uv_buf_init(buf, n); + size_t total = 0; + do { + int written = uv_try_write(writer, &uvbuf, 1); + if (written < 0 && written != UV_EAGAIN) { + cmSystemTools::Error( + cmStrCat("Failed to write jobserver tokens: ", uv_strerror(written))); + if (FatalCallback.has_value()) { + FatalCallback.value()(); + } + return; + } + + total += written; + uvbuf.base += written; + uvbuf.len -= written; + } while (retry && total < n); +} + +bool cmCTestJobServerPosix::Connect(uv_loop_t* loop, int rfd, int wfd) +{ + this->Loop = loop; + this->connection_type = ConnectionType::Pipe; + + if (uv_pipe_init(loop, &read_pipe, 0) != 0 || + uv_pipe_init(loop, &write_pipe, 0) != 0) { + return false; + } + + if (uv_pipe_open(&read_pipe, rfd) != 0 || + uv_pipe_open(&write_pipe, wfd) != 0) { + return false; + } + + // Verify that the read side is readable, and the write side is writable + if (!uv_is_readable(reinterpret_cast(&read_pipe)) || + !uv_is_writable(reinterpret_cast(&write_pipe))) { + return false; + } + + read_pipe.data = this; + write_pipe.data = this; + + uv_read_start( + (uv_stream_t*)&read_pipe, + [](uv_handle_t* /*handle*/, size_t suggested_size, uv_buf_t* buf) { + buf->base = new char[suggested_size]; + buf->len = suggested_size; + }, + _on_read); + + return true; +} + +bool cmCTestJobServerPosix::Connect(uv_loop_t* loop, const char* path) +{ + this->Loop = loop; + this->connection_type = ConnectionType::Fifo; + + if (uv_pipe_init(loop, &fifo, 0) != 0) { + return false; + } + + int fd = open(path, O_RDWR); + if (fd == -1) { + return false; + } + + if 
(uv_pipe_open(&fifo, fd) != 0) { + return false; + } + + uv_read_start( + (uv_stream_t*)&fifo, + [](uv_handle_t* /*handle*/, size_t suggested_size, uv_buf_t* buf) { + buf->base = new char[suggested_size]; + buf->len = suggested_size; + }, + _on_read); + + fifo.data = this; + return true; +} + +bool cmCTestJobServerPosix::Connect(uv_loop_t* loop) +{ + // --jobserver-auth= for gnu make versions >= 4.2 + // --jobserver-fds= for gnu make versions < 4.2 + // -J for bsd make + const std::vector prefixes = { "--jobserver-auth=", + "--jobserver-fds=", "-J" }; + + cm::optional maxJobs = cm::nullopt; + cm::optional auth = cm::nullopt; + + std::string makeflags; + if (!cmSystemTools::GetEnv("MAKEFLAGS", makeflags)) { + return false; + } + + std::vector args; + cmSystemTools::ParseUnixCommandLine(makeflags.c_str(), args); + for (const std::string& arg : args) { + cm::string_view arg_view(arg); + + if (cmHasLiteralPrefix(arg_view, "-j")) { + auth = arg_view.substr(cmStrLen("-j")); + } else { + for (const cm::string_view& prefix : prefixes) { + if (cmHasPrefix(arg_view, prefix)) { + auth = cmTrimWhitespace(arg_view.substr(prefix.length())); + break; + } + } + } + } + + if (maxJobs) { + try { + this->maxSlots = std::min(this->maxSlots, std::stoul(*maxJobs)); + } catch (...) { + } + } + + if (!auth) { + return false; + } + + // fifo:PATH + if (cmHasLiteralPrefix(*auth, "fifo:")) { + return Connect(loop, auth->substr(cmStrLen("fifo:")).c_str()); + } + + // reader,writer + size_t reader; + size_t writer; + try { + size_t comma = auth->find(','); + if (comma == std::string::npos) { + return false; + } + + reader = std::stoul(auth->substr(0, comma)); + writer = std::stoul(auth->substr(comma + 1)); + } catch (...) 
{ + return false; + } + + return Connect(loop, reader, writer); +} + +void cmCTestJobServerPosix::Enqueue(std::function task, + size_t slots) +{ + this->queue.push_back({ task, slots }); +} + +void cmCTestJobServerPosix::Release(size_t n) +{ + if (n == 0) { + return; + } + + if (n > this->usedTokens) { + assert(n == this->usedTokens + 1); + this->freeTokenAvailable = true; + n--; + } + this->usedTokens -= n; + + _write(n, true); + _process_queue(); +} + +void cmCTestJobServerPosix::Close(bool force) +{ + uv_read_stop(_get_reader()); + _write(this->tokens.size(), !force); + + switch (this->connection_type) { + case ConnectionType::Fifo: + uv_close(reinterpret_cast(&fifo), + [](uv_handle_t* /*handle*/) {}); + break; + case ConnectionType::Pipe: + uv_close(reinterpret_cast(&read_pipe), + [](uv_handle_t* /*handle*/) {}); + + uv_close(reinterpret_cast(&write_pipe), + [](uv_handle_t* /*handle*/) {}); + break; + } +} + +void cmCTestJobServerPosix::SetFatalCallback(std::function f) +{ + this->FatalCallback = std::move(f); +} diff --git a/Source/CTest/cmCTestJobServerPosix.h b/Source/CTest/cmCTestJobServerPosix.h new file mode 100644 index 0000000000000000000000000000000000000000..73ca7accfffae8dd61f52a6cceb573bc83aceb95 --- /dev/null +++ b/Source/CTest/cmCTestJobServerPosix.h @@ -0,0 +1,97 @@ +/* Distributed under the OSI-approved BSD 3-Clause License. See accompanying + file Copyright.txt or https://cmake.org/licensing for details. 
*/ +#pragma once + +#include "cmConfigure.h" // IWYU pragma: keep + +#include +#include +#include + +#include +#include + +#include +#include + +#include "cmCTestJobServerClient.h" + +// POSIX jobserver client implementation +// https://www.gnu.org/software/make/manual/html_node/POSIX-Jobserver.html +class cmCTestJobServerPosix : public cmCTestJobServerClient +{ +private: + union + { + struct + { + uv_pipe_t read_pipe; + uv_pipe_t write_pipe; + }; + uv_pipe_t fifo; + }; + + enum class ConnectionType + { + // read_pipe and write_pipe are valid + Pipe, + // fifo is valid + Fifo + }; + ConnectionType connection_type; + + // min(local number of processes, upstream number of processes) + size_t maxSlots; + + uv_loop_t* Loop = nullptr; + uv_stream_t* _get_writer(); + uv_stream_t* _get_reader(); + static void _on_read(uv_stream_t* stream, ssize_t nread, + const uv_buf_t* buf); + + // A callback to be called if the jobserver client encounters a fatal error + cm::optional> FatalCallback; + + // Stack of tokens that have been acquired from the jobserver + std::vector tokens = std::vector(); + size_t usedTokens = 0; + bool freeTokenAvailable = true; + + std::list queue = + std::list(); + + void _acquire(size_t slots); + void _write(size_t slots, bool noRetry); + + void _process_queue(); + +public: + cmCTestJobServerPosix(size_t maxJobs) + : maxSlots(maxJobs) + { + } + ~cmCTestJobServerPosix() override = default; + + cm::string_view Name() const override { return cm::string_view("posix"); } + + /** + * @brief Connect to jobserver using a pair of file descriptors + * @return true if the connection seems successful + */ + bool Connect(uv_loop_t* loop, int rfd, int wfd); + /** + * @brief Connect to jobserver using a path to a fifo + * @return true if the connection seems successful + */ + bool Connect(uv_loop_t* loop, const char* path); + /** + * @brief Connect to jobserver using the environment variables + * @return true if the connection seems successful + */ + bool 
Connect(uv_loop_t* loop); + + void Enqueue(std::function task, size_t slots) override; + void Release(size_t slots) override; + void Close(bool force) override; + void SetFatalCallback(std::function f) override; +}; diff --git a/Source/CTest/cmCTestJobServerWindows.cxx b/Source/CTest/cmCTestJobServerWindows.cxx new file mode 100644 index 0000000000000000000000000000000000000000..0bfc16a17d8af1c5fd7bd1f67c9b2d6412ea9e0b --- /dev/null +++ b/Source/CTest/cmCTestJobServerWindows.cxx @@ -0,0 +1,155 @@ +#include "cmCTestJobServerWindows.h" + +#include + +void cmCTestJobServerWindows::_wait_for_semaphore(uv_idle_t* handle) +{ + cmCTestJobServerWindows* self = + reinterpret_cast(handle->data); + + // Repeatedly wait until we get a timeout + size_t slots = 0; + while (true) { + DWORD r = WaitForSingleObject(self->semephore, 0); + + if (r == WAIT_OBJECT_0) { + slots++; + } else if (r == WAIT_TIMEOUT) { + break; + } else { + if (self->FatalCallback.has_value()) { + self->FatalCallback.value()(); + } + return; + } + } + + if (slots > 0) { + self->acquiredTokens += slots; + self->_process_queue(); + } +} + +void cmCTestJobServerWindows::_process_queue() +{ + size_t slots = this->acquiredTokens - this->usedTokens; + size_t started = 0; + while (slots > 0 && !queue.empty()) { + size_t slotsNeeded = std::min(queue.front().slots, maxSlots); + if (slotsNeeded > slots) { + break; + } + + cmCTestJobServerClient::Task task = std::move(queue.front()); + queue.pop_front(); + slots -= task.slots; + this->usedTokens += task.slots; + task.run(task.slots); + + started++; + } + + if (queue.empty()) { + // Be a friend and release any tokens we didn't use + _write(this->acquiredTokens - this->usedTokens); + } else if (started == 0 && this->usedTokens == 0) { + // If no jobs were started, and we're not running any jobs, then we start + // the next job with as many tokens as we have available + cmCTestJobServerClient::Task task = std::move(queue.front()); + queue.pop_front(); + this->usedTokens 
+= slots; + task.run(slots); + } +} + +void cmCTestJobServerWindows::_write(size_t slots) +{ + DWORD r = ReleaseSemaphore(this->semephore, slots, nullptr); + if (r == 0) { + if (this->FatalCallback.has_value()) { + this->FatalCallback.value()(); + } + } + + this->acquiredTokens -= slots; +} + +bool cmCTestJobServerWindows::Connect(uv_loop_t* loop, const wchar_t* name) +{ + this->Loop = loop; + this->semephore = OpenSemaphore(SEMAPHORE_ALL_ACCESS, FALSE, name); + if (this->semephore == nullptr) { + return false; + } + + // Start idle task to wait for semaphore + uv_idle_init(loop, &this->idle); + this->idle.data = this; + uv_idle_start(&this->idle, _wait_for_semaphore); + + return true; +} + +bool cmCTestJobServerWindows::Connect(uv_loop_t* loop) +{ + const cm::string_view prefix = { "--jobserver-auth=" }; + + cm::optional maxJobs = cm::nullopt; + cm::optional auth = cm::nullopt; + + std::string makeflags; + if (!cmSystemTools::GetEnv("MAKEFLAGS", makeflags)) { + return false; + } + + std::vector args; + cmSystemTools::ParseUnixCommandLine(makeflags.c_str(), args); + for (const std::string& arg : args) { + cm::string_view arg_view(arg); + + if (cmHasLiteralPrefix(arg_view, "-j")) { + auth = arg_view.substr(cmStrLen("-j")); + } else if (cmHasPrefix(arg_view, prefix)) { + auth = cmTrimWhitespace(arg_view.substr(prefix.length())); + break; + } + } + + if (maxJobs) { + try { + this->maxSlots = std::min(this->maxSlots, std::stoul(*maxJobs)); + } catch (...) 
{ + } + } + + if (!auth) { + return false; + } + + return Connect(loop, auth); +} + +void cmCTestJobServerWindows::Enqueue(std::function task, + size_t slots) +{ + this->queue.push_back({ task, slots }); +} + +void cmCTestJobServerWindows::Release(size_t slots) +{ + this->usedTokens -= slots; + _write(slots); +} + +void cmCTestJobServerWindows::Close(bool force) +{ + _write(this->acquiredTokens); + CloseHandle(this->semephore); + this->semephore = nullptr; + uv_idle_stop(&this->idle); +} + +void cmCTestJobServerWindows::SetFatalCallback(std::function f) +{ + this->FatalCallback = f; +} diff --git a/Source/CTest/cmCTestJobServerWindows.h b/Source/CTest/cmCTestJobServerWindows.h new file mode 100644 index 0000000000000000000000000000000000000000..80f25970ee88b56a3c4a7e4cf1a1bd6191d2dfeb --- /dev/null +++ b/Source/CTest/cmCTestJobServerWindows.h @@ -0,0 +1,70 @@ +/* Distributed under the OSI-approved BSD 3-Clause License. See accompanying + file Copyright.txt or https://cmake.org/licensing for details. 
*/ +#pragma once + +#include "cmConfigure.h" // IWYU pragma: keep + +#include +#include +#include +#include + +#include +#include + +#include +#include + +#include "cmCTestJobServerClient.h" + +// Windows jobserver client implementation +// https://www.gnu.org/software/make/manual/html_node/Windows-Jobserver.html +class cmCTestJobServerWindows : public cmCTestJobServerClient +{ +private: + // Semaphore object + HANDLE semaphore = nullptr; + // As an idle task we wait on the semaphore, and if we get more slots we + // trigger the queue processing + uv_idle_t idle; + static void _wait_for_semaphore(uv_idle_t* handle); + + std::list queue = + std::list(); + + uv_mutex_t mutex; + int usedTokens = 0; + int acquiredTokens = 0; + + int maxSlots = 0; + void _process_queue(); + void _write(size_t slots); + + // A callback to be called if the jobserver client encounters a fatal error + cm::optional> FatalCallback; + +public: + cmCTestJobServerWindows(size_t maxJobs) + : maxSlots(maxJobs) + { + } + ~cmCTestJobServerWindows() override = default; + + cm::string_view Name() const override { return cm::string_view("windows"); } + + /** + * @brief Connect to jobserver using a named semaphore + * @return true if the connection seems successful + */ + bool Connect(uv_loop_t* loop, const wchar_t* name); + /** + * @brief Connect to jobserver using the environment variables + * @return true if the connection seems successful + */ + bool Connect(uv_loop_t* loop); + + void Enqueue(std::function task, size_t slots) override; + void Release(size_t slots) override; + void Close(bool force) override; + void SetFatalCallback(std::function f) override; +}; diff --git a/Source/CTest/cmCTestMultiProcessHandler.cxx b/Source/CTest/cmCTestMultiProcessHandler.cxx index ca07a081eafced40697d82b08c0e2a504939fc4d..c0826f8bb979c5ca853ca1cd1c2ee1a5003c042d 100644 --- a/Source/CTest/cmCTestMultiProcessHandler.cxx +++ b/Source/CTest/cmCTestMultiProcessHandler.cxx @@ -6,6 +6,7 @@ #include #include #include 
+#include #include // IWYU pragma: keep #include #include @@ -20,6 +21,7 @@ #include #include +#include #include #include @@ -32,6 +34,7 @@ #include "cmAffinity.h" #include "cmCTest.h" #include "cmCTestBinPacker.h" +#include "cmCTestJobServerClient.h" #include "cmCTestRunTest.h" #include "cmCTestTestHandler.h" #include "cmDuration.h" @@ -112,6 +115,49 @@ void cmCTestMultiProcessHandler::SetParallelLevel(size_t level) this->ParallelLevel = level < 1 ? 1 : level; } +void cmCTestMultiProcessHandler::ConnectJobServer() +{ + this->JobServerClient = + cmCTestJobServerClient::Connect(&this->Loop, this->ParallelLevel); + + if (this->JobServerClient.has_value()) { + this->JobServerClient.value()->SetFatalCallback([this]() { + cmCTestLog(this->CTest, ERROR_MESSAGE, + "Fatal error from jobserver" << std::endl); + this->JobServerClient = cm::nullopt; + }); + +// Release jobslots if this process is killed. +#ifdef _WIN32 + const int signals[1] = { SIGINT }; +#else + const int signals[4] = { SIGINT, SIGQUIT, SIGABRT, SIGTERM }; +#endif + + for (int sig : signals) { + cm::uv_signal_ptr signal; + signal.init(this->Loop, this); + signal.start(&cmCTestMultiProcessHandler::SignalHandler, sig); + this->Signals.push_back(std::move(signal)); + } + + cmCTestLog(this->CTest, HANDLER_VERBOSE_OUTPUT, + "Connected to " << this->JobServerClient.value()->Name() + << " jobserver" << std::endl); + } +} + +void cmCTestMultiProcessHandler::DisconnectJobServer() +{ + if (this->JobServerClient.has_value()) { + this->JobServerClient.value()->Close(false); + this->JobServerClient = cm::nullopt; + for (auto& signal : this->Signals) { + signal.stop(); + } + } +} + void cmCTestMultiProcessHandler::SetTestLoad(unsigned long load) { this->TestLoad = load; @@ -126,6 +172,21 @@ void cmCTestMultiProcessHandler::SetTestLoad(unsigned long load) } } +void cmCTestMultiProcessHandler::SignalHandler(uv_signal_t* handle, int signum) +{ + auto* self = static_cast(handle->data); + + cmCTestLog(self->CTest, 
ERROR_MESSAGE, + "Received signal " << signum << std::endl); + + if (self->JobServerClient.has_value()) { + self->JobServerClient.value()->Close(true); + } + + // Exit with the signal number. + std::exit(signum); +} + void cmCTestMultiProcessHandler::RunTests() { this->CheckResume(); @@ -138,6 +199,7 @@ void cmCTestMultiProcessHandler::RunTests() this->TestHandler->SetMaxIndex(this->FindMaxIndex()); uv_loop_init(&this->Loop); + this->ConnectJobServer(); this->StartNextTests(); uv_run(&this->Loop, UV_RUN_DEFAULT); uv_loop_close(&this->Loop); @@ -156,9 +218,6 @@ bool cmCTestMultiProcessHandler::StartTestProcess(int test) { if (this->HaveAffinity && this->Properties[test]->WantAffinity) { size_t needProcessors = this->GetProcessorsUsed(test); - if (needProcessors > this->ProcessorsAvailable.size()) { - return false; - } std::vector affinity; affinity.reserve(needProcessors); for (size_t i = 0; i < needProcessors; ++i) { @@ -249,6 +308,22 @@ bool cmCTestMultiProcessHandler::StartTestProcess(int test) return false; } + // If there is a jobserver we must wait for a token before starting the test. + if (this->JobServerClient.has_value()) { + // TODO: C++17 supports moving into lambda captures. + cmCTestRunTest* testRunPtr = testRun.release(); + auto task = [this, testRunPtr](size_t slots) { + auto testRunner = std::unique_ptr(testRunPtr); + testRunner->SetJobSlots(slots); + cmCTestRunTest::StartTest(std::move(testRunner), this->Completed, + this->Total); + }; + + this->JobServerClient.value()->Enqueue(task, + this->GetProcessorsUsed(test)); + return true; + } + // Ownership of 'testRun' has moved to another structure. // When the test finishes, FinishTestProcess will be called. 
return cmCTestRunTest::StartTest(std::move(testRun), this->Completed, @@ -636,6 +711,7 @@ void cmCTestMultiProcessHandler::FinishTestProcess( this->Completed++; int test = runner->GetIndex(); + int slots = runner->GetJobSlots(); auto* properties = runner->GetTestProperties(); cmCTestRunTest::EndTestResult testResult = @@ -667,12 +743,20 @@ void cmCTestMultiProcessHandler::FinishTestProcess( this->DeallocateResources(test); this->UnlockResources(test); this->RunningCount -= this->GetProcessorsUsed(test); + if (this->JobServerClient.has_value()) { + this->JobServerClient.value()->Release(slots); + } for (auto p : properties->Affinity) { this->ProcessorsAvailable.insert(p); } properties->Affinity.clear(); + // If this was the last test, close the jobserver connection + if (this->Completed == this->Total) { + this->DisconnectJobServer(); + } + runner.reset(); if (started) { this->StartNextTests(); diff --git a/Source/CTest/cmCTestMultiProcessHandler.h b/Source/CTest/cmCTestMultiProcessHandler.h index 3b4e9c59ad1871168d8528be0586831e2416ae36..cd2371897c9944704e1bd2872e7bdb8b51b2c608 100644 --- a/Source/CTest/cmCTestMultiProcessHandler.h +++ b/Source/CTest/cmCTestMultiProcessHandler.h @@ -16,6 +16,7 @@ #include #include "cmCTest.h" +#include "cmCTestJobServerClient.h" #include "cmCTestResourceAllocator.h" #include "cmCTestResourceSpec.h" #include "cmCTestTestHandler.h" @@ -60,6 +61,11 @@ public: void SetTests(TestMap& tests, PropertiesMap& properties); // Set the max number of tests that can be run at the same time. 
void SetParallelLevel(size_t); + + void ConnectJobServer(); + void DisconnectJobServer(); + static void SignalHandler(uv_signal_t* handle, int signum); + void SetTestLoad(unsigned long load); virtual void RunTests(); void PrintOutputAsJson(); @@ -107,6 +113,7 @@ protected: void StartNextTests(); bool StartTestProcess(int test); bool StartTest(int test); + // Mark the checkpoint for the given test void WriteCheckpoint(int index); @@ -172,7 +179,9 @@ protected: // map from test number to set of depend tests TestMap Tests; + // list of tests that are ready to run, sorted by cost TestList SortedTests; + // Total number of tests we'll be running size_t Total; // Number of tests that are complete @@ -181,6 +190,7 @@ protected: std::set ProcessorsAvailable; size_t HaveAffinity; bool StopTimePassed = false; + // list of test properties (indices concurrent to the test map) PropertiesMap Properties; std::map TestRunningMap; @@ -198,10 +208,16 @@ protected: cmCTestResourceAllocator ResourceAllocator; std::vector* TestResults; size_t ParallelLevel; // max number of process that can be run at once + + // GNU Make jobserver client. 
If there's a valid jobserver connection then + // tests require jobslots before they can be started + cm::optional> JobServerClient; + unsigned long TestLoad; unsigned long FakeLoadForTesting; uv_loop_t Loop; cm::uv_timer_ptr TestLoadRetryTimer; + std::vector Signals; cmCTestTestHandler* TestHandler; cmCTest* CTest; bool HasCycles; diff --git a/Source/CTest/cmCTestRunTest.h b/Source/CTest/cmCTestRunTest.h index 34f23c4ccf79ccf581a728d3b5b75e84271116dc..100fdbd101e1cb1379ed2cb021be231d12ec6487 100644 --- a/Source/CTest/cmCTestRunTest.h +++ b/Source/CTest/cmCTestRunTest.h @@ -47,6 +47,9 @@ public: int GetIndex() { return this->Index; } + void SetJobSlots(size_t slots) { this->JobSlots = slots; } + size_t GetJobSlots() { return this->JobSlots; } + void AddFailedDependency(const std::string& failedTest) { this->FailedDependencies.insert(failedTest); @@ -147,6 +150,8 @@ private: int NumberOfRunsTotal = 1; // default to 1 run of the test bool RunAgain = false; // default to not having to run again size_t TotalNumberOfTests; + // The number of jobserver slots that were used to start this test + size_t JobSlots; }; inline int getNumWidth(size_t n) diff --git a/Tests/RunCMake/CMakeLists.txt b/Tests/RunCMake/CMakeLists.txt index a4f61410392e7205324a5c93c552eb196b785325..bce4ac0e92baf4260204ecd4dbd7177da7834470 100644 --- a/Tests/RunCMake/CMakeLists.txt +++ b/Tests/RunCMake/CMakeLists.txt @@ -178,8 +178,13 @@ if(NOT CMAKE_GENERATOR MATCHES "Visual Studio|Xcode") add_RunCMake_test(CMP0065 -DCMAKE_SYSTEM_NAME=${CMAKE_SYSTEM_NAME}) endif() add_executable(detect_jobserver detect_jobserver.c) +add_executable(sleepy_echo sleepy_echo.c) if(CMAKE_GENERATOR MATCHES "Make") - add_RunCMake_test(Make -DMAKE_IS_GNU=${MAKE_IS_GNU} -DDETECT_JOBSERVER=$) + add_RunCMake_test(Make + -DMAKE_IS_GNU=${MAKE_IS_GNU} + -DDETECT_JOBSERVER=$ + -DSLEEPY_ECHO=$ + ) endif() unset(ninja_test_with_qt_version) unset(ninja_qt_args) diff --git a/Tests/RunCMake/Make/CTestJobServer.cmake 
enable_testing()

# Register one test named test_<LEVEL>_<NUMBER> that runs ${SLEEPY_ECHO} with
# NUMBER as its argument, must print NUMBER to pass, and declares that it
# occupies LEVEL processors.
function(add_parallel_test NUMBER LEVEL)
  set(test_name test_${LEVEL}_${NUMBER})
  add_test(NAME ${test_name} COMMAND ${SLEEPY_ECHO} ${NUMBER})
  set_tests_properties(${test_name} PROPERTIES
    PASS_REGULAR_EXPRESSION ${NUMBER}
    PROCESSORS ${LEVEL}
  )
endfunction()

set(test_count 24)

# One batch of tests per processor level, levels in ascending order.
# foreach(RANGE <stop>) runs from 0 to <stop> inclusive.
foreach(level 1 2 3 4)
  foreach(i RANGE ${test_count})
    add_parallel_test(${i} ${level})
  endforeach()
endforeach()
# Runs CTest in a jobserver environment
#
# Configures and builds the CTestJobServer project once, then drives the
# rules of CTestJobServer.make with different make -j levels.
function(run_ctest_jobserver)
  set(RunCMake_TEST_BINARY_DIR ${RunCMake_BINARY_DIR}/CTestJobServer-build)
  set(RunCMake_TEST_OPTIONS "-DSLEEPY_ECHO=${SLEEPY_ECHO}")
  run_cmake(CTestJobServer)
  set(RunCMake_TEST_NO_CLEAN 1)
  set(RunCMake_TEST_TIMEOUT 300)
  run_cmake_command(CTestJobServer-build ${CMAKE_COMMAND} --build .)

  set(MAKE_ENV ${CMAKE_COMMAND} -E env CMAKE_COMMAND="${CMAKE_COMMAND}" CMAKE_CTEST_COMMAND="${CMAKE_CTEST_COMMAND}")

  # Invoke make on one rule of CTestJobServer.make with -j<job_count>.
  function(run_make_rule rule job_count)
    run_cmake_command(${rule}-j${job_count} ${MAKE_ENV} ${RunCMake_MAKE_PROGRAM} -f "${RunCMake_SOURCE_DIR}/CTestJobServer.make" ${rule} -j${job_count})
  endfunction()

  # 2 ctest processes, -j4
  foreach(jobs 1 2 3 4 5 8)
    run_make_rule(test2-4 ${jobs})
  endforeach()

  # 3 ctest processes, -j4
  foreach(jobs 1 2 3 4 5 8 12)
    run_make_rule(test3-4 ${jobs})
  endforeach()

  # 4 ctest processes, -j4
  foreach(jobs 1 2 3 4 5 8 12 16)
    run_make_rule(test4-4 ${jobs})
  endforeach()
endfunction()
/* Sleep for a pseudo-random time below half a second, then echo the single
   command-line argument on its own line.  Returns 1 (after printing a usage
   line) when not called with exactly one argument, 0 otherwise. */
int main(int argc, char* argv[])
{
  int delay_ms;

  if (argc != 2) {
    printf("Usage: %s \n", argv[0]);
    return 1;
  }

  /* Seed from the wall clock, then pick a delay in [0, 500) milliseconds. */
  srand(time(NULL));
  delay_ms = rand() % 500;

#ifdef _WIN32
  Sleep(delay_ms);
#else
  usleep(delay_ms * 1000);
#endif

  printf("%s\n", argv[1]);
  return 0;
}