Commit c88c1d3b authored by T.J. Corona's avatar T.J. Corona Committed by Kitware Robot

Merge topic 'thread-pool'

267c7a52 Enable multithreading for operation execution
cc355b49 Add thread pool for operation execution
Acked-by: Kitware Robot's avatarKitware Robot <kwrobot@kitware.com>
Acked-by: John Tourtellott's avatarJohn Tourtellott <john.tourtellott@kitware.com>
Merge-request: !1495
parents 23b10f4b 267c7a52
Pipeline #132027 running with stage
......@@ -343,6 +343,11 @@ if(SMTK_ENABLE_PARAVIEW_SUPPORT)
find_package(ParaView)
endif()
################################################################################
# Thread Related Settings
################################################################################
find_package(Threads REQUIRED)
################################################################################
# VTK Related Settings
################################################################################
......
## Introduce a thread pool for multithreaded operation execution
To prevent applications from appearing to "hang" when long-running operations
are executed, a thread pool has been introduced for the managed
launching of operation tasks. The use of a thread pool allows for a
finite number of threads to be continuously reused for subsequent
operations, eliminating the overhead and potential bottleneck of
spawning a new thread for each operation.
......@@ -94,6 +94,7 @@ if(SMTK_ENABLE_PYTHON_WRAPPING)
list(APPEND smtkCore_private_link_libraries
pybind11::embed
Threads::Threads
)
add_subdirectory(pybind11)
endif()
......
......@@ -34,6 +34,7 @@ set(commonHeaders
Registry.h
Singleton.h
StringUtil.h
ThreadPool.h
TimeZone.h
TypeName.h
UUID.h
......
//=========================================================================
// Copyright (c) Kitware, Inc.
// All rights reserved.
// See LICENSE.txt for details.
//
// This software is distributed WITHOUT ANY WARRANTY; without even
// the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE. See the above copyright notice for more information.
//=========================================================================
#ifndef smtk_common_ThreadPool_h
#define smtk_common_ThreadPool_h
#include "smtk/CoreExports.h"
#include <atomic>
#include <condition_variable>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <type_traits>
#include <vector>
namespace smtk
{
namespace common
{
/// A basic thread pool that executes functors in separate threads. It accepts
/// the number of threads to build at construction, and waits for all tasks to
/// complete before being destroyed. The <ReturnType> is a default-constructible
/// type that tasks return via std::future.
template <typename ReturnType = void>
class ThreadPool
{
static_assert(
std::is_same<ReturnType, void>::value || std::is_default_constructible<ReturnType>::value,
"Templated return type must be void or a default constructible type");
public:
ThreadPool(unsigned int maxThreads = 0);
~ThreadPool();
/// Add a task to be performed by the thread queue. Once a thread becomes
/// available, it will pop the task from the queue and execute it. The return
/// value can be accessed from the returned future.
template <typename Function, typename... Types>
std::future<ReturnType> operator()(Function&& function, Types&&... args)
{
return appendToQueue(std::bind(function, std::forward<Types>(args)...));
}
private:
/// Append a functor with no inputs to the task queue. This is used in tandem
/// with std::bind to construct the class's call method.
std::future<ReturnType> appendToQueue(std::function<ReturnType()>&& task);
/// Run by a worker thread: poll the task queue for tasks to perform.
void exec();
std::condition_variable m_condition;
std::mutex m_queueMutex;
std::vector<std::thread> m_threads;
std::queue<std::packaged_task<ReturnType()> > m_queue;
std::atomic<bool> m_active;
};
template <typename ReturnType>
ThreadPool<ReturnType>::ThreadPool(unsigned int maxThreads)
: m_active(true)
{
// The maximum number of threads is either defined at construction or (by
// default) the maximum number of threads the hardware is configured to run in
// parallel.
unsigned int nThreads = (maxThreads == 0 ? std::thread::hardware_concurrency() : maxThreads);
for (unsigned int i = 0; i < nThreads; ++i)
{
m_threads.push_back(std::thread(&ThreadPool::exec, this));
}
}
template <typename ReturnType>
ThreadPool<ReturnType>::~ThreadPool()
{
// Change the state of the thread pool to signify that threads should no
// longer run.
m_active = false;
// For each thread in the pool, send a task that will cause it to exit its
// infinite loop.
for (std::size_t i = 0; i < m_threads.size(); i++)
{
(*this)([] { return ReturnType(); });
}
// Now that all of the threads have exited, join each thread with the parent
// thread.
for (auto& thread : m_threads)
{
thread.join();
}
}
template <typename ReturnType>
std::future<ReturnType> ThreadPool<ReturnType>::appendToQueue(std::function<ReturnType()>&& task)
{
std::future<ReturnType> future;
// Scope access to the queue.
{
std::unique_lock<std::mutex> queueLock(m_queueMutex);
// Construct a packaged_task to launch the input task and access its
// future.
m_queue.emplace(task);
future = m_queue.back().get_future();
}
// Signal to the next thread that the queue is ready for access.
m_condition.notify_one();
// Return the future associated with the promise created above.
return future;
}
template <typename ReturnType>
void ThreadPool<ReturnType>::exec()
{
// Always check if the containing class has signaled that the thread pool
// should no longer be active.
while (m_active)
{
std::packaged_task<ReturnType()> task;
// Scope access to the queue.
{
std::unique_lock<std::mutex> queueLock(m_queueMutex);
// Access a task from the queue.
m_condition.wait(queueLock, [this] { return !m_queue.empty(); });
task = std::move(m_queue.front());
m_queue.pop();
}
// Execute the task.
task();
}
}
}
}
#endif // smtk_common_ThreadPool_h
......@@ -35,10 +35,12 @@ set(unit_tests
UnitTestDateTimeZonePair.cxx
UnitTestLinks
UnitTestRegistry.cxx
UnitTestThreadPool
)
smtk_unit_tests(
Label "Common"
SOURCES ${unit_tests}
LIBRARIES smtkCore ${Boost_LIBRARIES}
LIBRARIES smtkCore ${Boost_LIBRARIES} Threads::Threads
)
//=========================================================================
// Copyright (c) Kitware, Inc.
// All rights reserved.
// See LICENSE.txt for details.
//
// This software is distributed WITHOUT ANY WARRANTY; without even
// the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE. See the above copyright notice for more information.
//=========================================================================
#include "smtk/common/ThreadPool.h"
#include "smtk/common/testing/cxx/helpers.h"
#include <chrono>
#include <iostream>
namespace
{
void task3(int i)
{
std::cout << "Hello from task 3, worker thread " << i << std::endl;
}
}
int UnitTestThreadPool(int, char** const)
{
{
smtk::common::ThreadPool<void>* pool = new smtk::common::ThreadPool<void>();
(*pool)([] { std::cout << "Hello from one of the worker threads" << std::endl; });
auto task2 = [](int i) { std::cout << "Hello from worker thread " << i << std::endl; };
(*pool)(std::bind(task2, 3));
(*pool)(task3, 5);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
(*pool)([] {
std::cout << "forcing worker to outlive pool" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
});
delete pool;
}
{
smtk::common::ThreadPool<int>* pool = new smtk::common::ThreadPool<int>();
std::future<int> result = (*pool)([] { return 1; });
smtkTest(result.get() == 1, "Returned result doesn't match input");
auto task2 = [](int i) {
std::cout << "Hello from worker thread " << i << std::endl;
return i;
};
(*pool)(std::bind(task2, 3));
std::this_thread::sleep_for(std::chrono::milliseconds(100));
(*pool)([] {
std::cout << "forcing worker to outlive pool" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
return 2;
});
delete pool;
}
return 0;
}
......@@ -10,7 +10,7 @@
#include "smtk/extension/qt/qtOperationLauncher.h"
#define SINGLE_THREAD
// #define SINGLE_THREAD
namespace smtk
{
......@@ -19,6 +19,8 @@ namespace extension
std::future<smtk::operation::Operation::Result> qtOperationLauncher::operator()(
const smtk::operation::Operation::Ptr& op)
{
#ifdef SINGLE_THREAD
// Construct a promise to pass to the subthread. Its associated future is the
// output of this method.
std::promise<smtk::operation::Operation::Result> promise;
......@@ -27,8 +29,6 @@ std::future<smtk::operation::Operation::Result> qtOperationLauncher::operator()(
// subthread.
std::future<smtk::operation::Operation::Result> future = promise.get_future();
#ifdef SINGLE_THREAD
// Execute the operation in the subthread.
auto result = op->operate();
......@@ -40,15 +40,6 @@ std::future<smtk::operation::Operation::Result> qtOperationLauncher::operator()(
#else
// Construct a thread to execute the operation. It takes:
//
// 1. a pointer to this class so its emitted signal can be accessed by our
// associated slot,
// 2. a shared pointer to the operation by value to prevent the underlying
// operation from being changed before the operation completes, and
// 3. the associated promise to this method's returned future.
std::thread thread(&qtOperationLauncher::run, this, op, std::move(promise));
// Construct a copy of the operation shared pointer to pass to the subsequent
// lambda to prevent the underlying operation from being changed before the
// operation completes.
......@@ -73,9 +64,8 @@ std::future<smtk::operation::Operation::Result> qtOperationLauncher::operator()(
delete connection;
});
// We have the future result and we have made the one-shot connection to the
// output. We no longer need to track this thread, so we detach it.
thread.detach();
std::future<smtk::operation::Operation::Result> future =
m_threadPool(&qtOperationLauncher::run, this, op);
#endif
......@@ -83,18 +73,17 @@ std::future<smtk::operation::Operation::Result> qtOperationLauncher::operator()(
return future;
}
void qtOperationLauncher::run(smtk::operation::Operation::Ptr operation,
std::promise<smtk::operation::Operation::Result>&& promise)
smtk::operation::Operation::Result qtOperationLauncher::run(
smtk::operation::Operation::Ptr operation)
{
// Execute the operation in the subthread.
auto result = operation->operate();
// Set the promise to the output result.
promise.set_value(result);
// Privately emit the name of the output result so the contents of this class
// that reside on the original thread can access it.
emit operationHasResult(QString::fromStdString(result->name()), QPrivateSignal());
return result;
}
}
}
......@@ -16,6 +16,8 @@
#include "smtk/attribute/Attribute.h"
#include "smtk/attribute/Resource.h"
#include "smtk/common/ThreadPool.h"
#include "smtk/operation/Operation.h"
#include <QObject>
......@@ -55,8 +57,9 @@ signals:
private:
/// Internal method run on a subthread to invoke the operation.
void run(smtk::operation::Operation::Ptr operation,
std::promise<smtk::operation::Operation::Result>&& promise);
smtk::operation::Operation::Result run(smtk::operation::Operation::Ptr operation);
smtk::common::ThreadPool<smtk::operation::Operation::Result> m_threadPool;
};
namespace qt
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment