diff --git a/include/diy/master.hpp b/include/diy/master.hpp index f568018e2a6b35f32febccbea1d7cdf8f3d3cc3b..0ea42fd35be43135a1a86d491135321f5c2aea35 100644 --- a/include/diy/master.hpp +++ b/include/diy/master.hpp @@ -134,11 +134,11 @@ namespace diy using RecordQueue = critical_resource<std::deque<QueueRecord>>; - using IncomingQueues = std::map<int, RecordQueue>; // gid -> [(size, external, buffer), ...] - using OutgoingQueues = std::map<BlockID, RecordQueue>; // bid -> [(size, external, buffer), ...] + using IncomingQueues = concurrent_map<int, RecordQueue>; // gid -> [(size, external, buffer), ...] + using OutgoingQueues = concurrent_map<BlockID, RecordQueue>; // bid -> [(size, external, buffer), ...] - using IncomingQueuesMap = std::map<int, IncomingQueues>; // gid -> { gid -> [(size, external, buffer), ...]} - using OutgoingQueuesMap = std::map<int, OutgoingQueues>; // gid -> { bid -> [(size, external, buffer), ...]} + using IncomingQueuesMap = concurrent_map<int, IncomingQueues>; // gid -> { gid -> [(size, external, buffer), ...]} + using OutgoingQueuesMap = concurrent_map<int, OutgoingQueues>; // gid -> { bid -> [(size, external, buffer), ...]} struct IncomingRound { @@ -674,19 +674,24 @@ iexchange_(const ICallback<Block>& f, IExchangeInfoCollective iexchange(comm_, min_queue_size, max_hold_time, fine, prof); iexchange.add_work(size()); // start with one work unit for each block + fast_mutex m; // FIXME #if !defined(DIY_NO_THREADS) - auto comm_thread = std::thread([this,&iexchange]() + auto comm_thread = std::thread([this,&iexchange,&m]() { while(!iexchange.all_done()) { + lock_guard<fast_mutex> lock(m); // FIXME icommunicate(&iexchange); iexchange.control(); } }); #endif - auto empty_incoming = [this](int gid) + auto empty_incoming = [this,&m](int gid) { +#if !defined(DIY_NO_THREADS) + lock_guard<fast_mutex> lock(m); // FIXME +#endif for (auto& x : incoming(gid)) if (!x.second.access()->empty()) return false; @@ -710,8 +715,13 @@ iexchange_(const ICallback<Block>& f, { prof << "callback"; iexchange.inc_work(); // even if we remove the queues, when constructing the proxy, we still have work to do - ProxyWithLink cp = proxy(i, &iexchange); - done = f(block<Block>(i), cp); + { +#if !defined(DIY_NO_THREADS) + lock_guard<fast_mutex> lock(m); // FIXME +#endif + ProxyWithLink cp = proxy(i, &iexchange); + done = f(block<Block>(i), cp); + } // NB: we need cp to go out of scope and copy out its queues before we can decrement the work iexchange.dec_work(); prof >> "callback"; } diff --git a/include/diy/no-thread.hpp b/include/diy/no-thread.hpp index 219d6ce30288d303e2a47a9231820ad22c983d22..ee8c59febc7aff6218eb0ff67667985c4ae9aab5 100644 --- a/include/diy/no-thread.hpp +++ b/include/diy/no-thread.hpp @@ -35,6 +35,9 @@ namespace diy void unlock() {} }; + template<class T, class U> + using concurrent_map = std::map<T,U>; + namespace this_thread { inline unsigned long int get_id() { return 0; } diff --git a/include/diy/thread.hpp b/include/diy/thread.hpp index 1c9149a42e31715d53d3eff7ce4739d9af05c525..f80476bf6ffff6e00d170d75090f71b673710c8c 100644 --- a/include/diy/thread.hpp +++ b/include/diy/thread.hpp @@ -22,10 +22,61 @@ namespace diy template<class Mutex> using lock_guard = std::unique_lock<Mutex>; + + template<class T, class U> + struct concurrent_map; } #endif #include "critical-resource.hpp" +#if !defined(DIY_NO_THREADS) +template<class T, class U> +struct diy::concurrent_map +{ + using Map = std::map<T,U>; + using SharedPtr = std::shared_ptr<lock_guard<fast_mutex>>; + + template<class MapIterator> + struct iterator_ + { + MapIterator it; + SharedPtr lock_ptr; + + iterator_(const MapIterator& it_, const SharedPtr& lock_ptr_ = SharedPtr()): + it(it_), lock_ptr(lock_ptr_) {} + + iterator_& operator++() { ++it; return *this; } + iterator_ operator++(int) { iterator_ retval = *this; ++(*this); return retval; } + + bool operator==(const iterator_& other) const { return it == other.it;} + bool operator!=(const iterator_& other) const { return !(*this == other); } + + decltype(*it) operator*() const { return *it; } + decltype(it.operator->()) operator->() const { return it.operator->(); } + }; + + using iterator = iterator_<typename Map::iterator>; + using const_iterator = iterator_<typename Map::const_iterator>; + + U& operator[](const T& x) { lock_guard<fast_mutex> l(mutex_); return map_[x]; } + + iterator begin() { auto p = std::make_shared<lock_guard<fast_mutex>>(mutex_); return iterator(map_.begin(), p); } + iterator end() { return iterator(map_.end()); } + + const_iterator begin() const { auto p = std::make_shared<lock_guard<fast_mutex>>(mutex_); return const_iterator(map_.begin(), p); } + const_iterator end() const { return const_iterator(map_.end()); } + + iterator find(const T& x) { auto p = std::make_shared<lock_guard<fast_mutex>>(mutex_); return iterator(map_.find(x), p); } + const_iterator find(const T& x) const { auto p = std::make_shared<lock_guard<fast_mutex>>(mutex_); return const_iterator(map_.find(x), p); } + + void clear() { lock_guard<fast_mutex> l(mutex_); map_.clear(); } + bool empty() { lock_guard<fast_mutex> l(mutex_); return map_.empty(); } + + Map map_; + mutable fast_mutex mutex_; +}; +#endif + #endif