diff --git a/include/diy/master.hpp b/include/diy/master.hpp index 09fee1df5a402c58a103878680a67bba7a2ad67f..dab2ee0d185abfcc663290a295aab1c253cfc648 100644 --- a/include/diy/master.hpp +++ b/include/diy/master.hpp @@ -137,8 +137,8 @@ namespace diy using IncomingQueues = concurrent_map<int, RecordQueue>; // gid -> [(size, external, buffer), ...] using OutgoingQueues = concurrent_map<BlockID, RecordQueue>; // bid -> [(size, external, buffer), ...] - using IncomingQueuesMap = concurrent_map<int, IncomingQueues>; // gid -> { gid -> [(size, external, buffer), ...]} - using OutgoingQueuesMap = concurrent_map<int, OutgoingQueues>; // gid -> { bid -> [(size, external, buffer), ...]} + using IncomingQueuesMap = std::map<int, IncomingQueues>; // gid -> { gid -> [(size, external, buffer), ...]} + using OutgoingQueuesMap = std::map<int, OutgoingQueues>; // gid -> { bid -> [(size, external, buffer), ...]} struct IncomingRound { @@ -670,6 +670,13 @@ iexchange_(const ICallback<Block>& f, ++exchange_round_; exchange_round_annotation.set(exchange_round_); + // touch the outgoing and incoming queues to make sure they exist + for (unsigned i = 0; i < size(); ++i) + { + outgoing(gid(i)); + incoming(gid(i)); + } + //IExchangeInfoDUD iexchange(comm_, min_queue_size, max_hold_time, fine, prof); IExchangeInfoCollective iexchange(comm_, min_queue_size, max_hold_time, fine, prof); iexchange.add_work(size()); // start with one work unit for each block @@ -715,18 +722,18 @@ iexchange_(const ICallback<Block>& f, { ProxyWithLink cp = proxy(i, &iexchange); done = f(block<Block>(i), cp); + if (done_result[gid] ^ done) // status changed + { + if (done) + iexchange.dec_work(); + else + iexchange.inc_work(); + } } // NB: we need cp to go out of scope and copy out its queues before we can decrement the work iexchange.dec_work(); prof >> "callback"; ++work_done; } - if (done_result[gid] ^ done) // status changed - { - if (done) - iexchange.dec_work(); - else - iexchange.inc_work(); - } done_result[gid] = done; log->debug("Done: {}", done); } @@ -877,16 +884,6 @@ icommunicate(IExchangeInfo* iexchange) log->debug("Entering icommunicate()"); - // lock out other threads - // TODO: not threaded yet - // if (!CAS(comm_flag, 0, 1)) - // return; - - // debug -// log->info("out_queues_limit: {}", out_queues_limit); - - // order gids - auto gid_order = order_gids(); // exchange @@ -916,8 +913,12 @@ send_queue(int from_gid, stats::Annotation::Guard gq( stats::Annotation("diy.q-size").set(stats::Variant(static_cast<uint64_t>(qr.size()))) ); // skip empty queues and hold queues shorter than some limit for some time - if ( iexchange && (qr.size() == 0 || iexchange->hold(qr.size())) ) - return; + if (iexchange) + { + assert(qr.size() != 0); + if (iexchange->hold(qr.size())) + return; + } log->debug("[{}] Sending queue: {} <- {} of size {}, iexchange = {}", comm_.rank(), to_gid, from_gid, qr.size(), iexchange ? 1 : 0); if (iexchange) @@ -940,23 +941,26 @@ send_outgoing_queues(GidSendOrder& gid_order, if (iexchange) // for iexchange, send queues from a single block { - OutgoingQueues& outgoing = outgoing_[iexchange->from_gid]; - for (auto& x : outgoing) + for (int from : gid_order.list) { - BlockID to_block = x.first; - int to_gid = to_block.gid; - int to_proc = to_block.proc; - - auto access = x.second.access(); - while (!access->empty()) + OutgoingQueues& outgoing = this->outgoing(from); + for (auto& x : outgoing) { - auto qr = std::move(access->front()); - access->pop_front(); - access.unlock(); // others can push on this queue, while we are working - assert(!qr.external()); - log->debug("Processing queue: {} <- {} of size {}", to_gid, iexchange->from_gid, qr.size()); - send_queue(iexchange->from_gid, to_gid, to_proc, qr, remote, iexchange); - access.lock(); + BlockID to_block = x.first; + int to_gid = to_block.gid; + int to_proc = to_block.proc; + + auto access = x.second.access(); + while (!access->empty()) + { + auto qr = std::move(access->front()); + access->pop_front(); + access.unlock(); // others can push on this queue, while we are working + assert(!qr.external()); + log->debug("Processing queue: {} <- {} of size {}", to_gid, from, qr.size()); + send_queue(from, to_gid, to_proc, qr, remote, iexchange); + access.lock(); + } } } }