Commit c89f633d authored by Robert Maynard, committed by Kitware Robot

Merge topic 'fix_diy2_unused_call_result_warning'

5fa086f7 Merge branch 'upstream-diy' into fix_diy2_unused_call_result_warning
c445f969 diy 2019-01-08 (839fd11e)
Acked-by: Kitware Robot <kwrobot@kitware.com>
Acked-by: Sujin Philip <sujin.philip@kitware.com>
Merge-request: !1511
parents ca737437 5fa086f7
......@@ -121,7 +121,7 @@ namespace diy
inline std::tuple<bool,int>
get_rank(int& rk, int gid) const;
inline void set_rank(int rk, int gid, bool flush = true);
inline void set_rank(const int& rk, int gid, bool flush = true);
inline void set_ranks(const std::vector<std::tuple<int,int>>& rank_gids);
std::tuple<int,int>
......@@ -223,7 +223,7 @@ diy::DynamicAssigner::
ranks(const std::vector<int>& gids) const
{
bool all_cached = true;
std::vector<int> result(gids.size());
std::vector<int> result(gids.size(), -1);
for (size_t i = 0; i < gids.size(); ++i)
{
auto cached_gidrk = get_rank(result[i], gids[i]);
......@@ -239,7 +239,7 @@ ranks(const std::vector<int>& gids) const
void
diy::DynamicAssigner::
set_rank(int rk, int gid, bool flush)
set_rank(const int& rk, int gid, bool flush)
{
// TODO: update cache
......
......@@ -3,20 +3,14 @@
namespace diy
{
// TODO: when not running under C++11, i.e., when lock_guard is TinyThread's
// lock_guard, and not C++11's unique_lock, this implementation might
// be buggy since the copy constructor is invoked when
// critical_resource::access() returns an instance of this class. Once
// the temporary is destroyed the mutex is unlocked. I'm not 100%
// certain of this because I'd expect a deadlock on copy constructor,
// but it's clearly not happening -- so I may be missing something.
// (This issue will take care of itself in DIY3 once we switch to C++11 completely.)
template<class T, class Mutex>
class resource_accessor
{
public:
resource_accessor(T& x, Mutex& m):
x_(x), lock_(m) {}
resource_accessor(resource_accessor&&) = default;
resource_accessor(const resource_accessor&) = delete;
T& operator*() { return x_; }
T* operator->() { return &x_; }
......
#ifndef DIY_COLLECTIVES_HPP
#define DIY_COLLECTIVES_HPP
namespace diy
{
namespace detail
{
// Abstract interface for a per-block collective operation.
// Lifecycle: init() starts a fresh pass, update() folds in another block's
// contribution (same process), global() performs the cross-process (MPI)
// step, copy_from() copies a finished result, and result_out() writes the
// result to caller-provided memory.
struct CollectiveOp
{
// Reset internal state before a new reduction pass.
virtual void init() =0;
// Combine another block's contribution into this one.
virtual void update(const CollectiveOp& other) =0;
// Perform the cross-process step of the collective.
virtual void global(const mpi::communicator& comm) =0;
// Copy the finished result from another instance of the same concrete type.
virtual void copy_from(const CollectiveOp& other) =0;
// Write the result into dest; dest must point to storage of the concrete
// result type (see derived classes).
virtual void result_out(void* dest) const =0;
virtual ~CollectiveOp() {}
};
// All-reduce collective: each block contributes an input value; update()
// folds other blocks' *inputs* together locally, then global() all-reduces
// the locally-combined value across ranks.
template<class T, class Op>
struct AllReduceOp: public CollectiveOp
{
    AllReduceOp(const T& x, Op op):
        in_(x), op_(op)                                     {}

    // Seed the running result with this block's own input.
    void init() override                                    { out_ = in_; }
    // NB: reads other.in_ (not other.out_) so every block's input is
    // counted exactly once during the local combining pass.
    void update(const CollectiveOp& other) override         { out_ = op_(out_, static_cast<const AllReduceOp&>(other).in_); }
    // Cross-rank reduction of the locally-combined value.
    void global(const mpi::communicator& comm) override     { T res; mpi::all_reduce(comm, out_, res, op_); out_ = res; }
    void copy_from(const CollectiveOp& other) override      { out_ = static_cast<const AllReduceOp&>(other).out_; }
    // dest must point to a T.
    void result_out(void* dest) const override              { *reinterpret_cast<T*>(dest) = out_; }

private:
    T in_, out_;
    Op op_;
};
// Pass-through "collective": carries a fixed value through the collectives
// machinery without any reduction; result_out() simply returns it.
template<class T>
struct Scratch: public CollectiveOp
{
    Scratch(const T& x):
        x_(x)                                           {}

    void init() override                                {}
    void update(const CollectiveOp&) override           {}
    void global(const mpi::communicator&) override      {}
    void copy_from(const CollectiveOp&) override        {}
    // dest must point to a T.
    void result_out(void* dest) const override          { *reinterpret_cast<T*>(dest) = x_; }

private:
    T x_;
};
}
}
#endif
namespace diy
{
namespace detail
{
// Abstract interface for a per-block collective operation.
// Lifecycle: init() starts a fresh pass, update() folds in another block's
// contribution (same process), global() performs the cross-process (MPI)
// step, copy_from() copies a finished result, and result_out() writes the
// result to caller-provided memory.
struct CollectiveOp
{
// Reset internal state before a new reduction pass.
virtual void init() =0;
// Combine another block's contribution into this one.
virtual void update(const CollectiveOp& other) =0;
// Perform the cross-process step of the collective.
virtual void global(const mpi::communicator& comm) =0;
// Copy the finished result from another instance of the same concrete type.
virtual void copy_from(const CollectiveOp& other) =0;
// Write the result into dest; dest must point to storage of the concrete
// result type (see derived classes).
virtual void result_out(void* dest) const =0;
virtual ~CollectiveOp() {}
};
// All-reduce collective: each block contributes an input value; update()
// folds other blocks' *inputs* together locally, then global() all-reduces
// the locally-combined value across ranks.
template<class T, class Op>
struct AllReduceOp: public CollectiveOp
{
    AllReduceOp(const T& x, Op op):
        in_(x), op_(op)                                     {}

    // Seed the running result with this block's own input.
    void init() override                                    { out_ = in_; }
    // NB: reads other.in_ (not other.out_) so every block's input is
    // counted exactly once during the local combining pass.
    void update(const CollectiveOp& other) override         { out_ = op_(out_, static_cast<const AllReduceOp&>(other).in_); }
    // Cross-rank reduction of the locally-combined value.
    void global(const mpi::communicator& comm) override     { T res; mpi::all_reduce(comm, out_, res, op_); out_ = res; }
    void copy_from(const CollectiveOp& other) override      { out_ = static_cast<const AllReduceOp&>(other).out_; }
    // dest must point to a T.
    void result_out(void* dest) const override              { *reinterpret_cast<T*>(dest) = out_; }

private:
    T in_, out_;
    Op op_;
};
// Pass-through "collective": carries a fixed value through the collectives
// machinery without any reduction; result_out() simply returns it.
template<class T>
struct Scratch: public CollectiveOp
{
    Scratch(const T& x):
        x_(x)                                           {}

    void init() override                                {}
    void update(const CollectiveOp&) override           {}
    void global(const mpi::communicator&) override      {}
    void copy_from(const CollectiveOp&) override        {}
    // dest must point to a T.
    void result_out(void* dest) const override          { *reinterpret_cast<T*>(dest) = x_; }

private:
    T x_;
};
}
// Owning wrapper around a heap-allocated CollectiveOp. Movable but not
// copyable, so each pending collective has exactly one owner; the op is
// deleted on destruction.
struct Master::Collective
{
    Collective():
        cop_(nullptr)                               {}
    Collective(detail::CollectiveOp* cop):
        cop_(cop)                                   {}
    // Move: steal the source's op via swap. other is a non-const rvalue,
    // so no const_cast is needed; noexcept lets containers move freely.
    Collective(Collective&& other) noexcept:
        cop_(nullptr)                               { swap(other); }
    ~Collective()                                   { delete cop_; }

    Collective& operator=(const Collective& other) = delete;
    Collective(Collective& other) = delete;

    void init()                                     { cop_->init(); }
    void swap(Collective& other)                    { std::swap(cop_, other.cop_); }
    void update(const Collective& other)            { cop_->update(*other.cop_); }
    void global(const mpi::communicator& c)         { cop_->global(c); }
    // const is shallow here: mutates the pointed-to op, not the pointer.
    void copy_from(Collective& other) const         { cop_->copy_from(*other.cop_); }
    void result_out(void* x) const                  { cop_->result_out(x); }

    detail::CollectiveOp* cop_;
};
// FIFO of pending collectives for one block (gid).
struct Master::CollectivesList: public std::list<Collective>
{};
// gid -> list of pending collectives for that block.
struct Master::CollectivesMap: public std::map<int, CollectivesList>
{};
}
// Accessor for the full gid -> pending-collectives map owned by this Master.
diy::Master::CollectivesMap&
diy::Master::
collectives()
{
return *collectives_;
}
// Accessor for the pending-collectives list of a single block; creates the
// entry if gid__ is not present (std::map::operator[] semantics).
diy::Master::CollectivesList&
diy::Master::
collectives(int gid__)
{
return (*collectives_)[gid__];
}
// Execute all pending collectives across the local blocks: for each "row"
// of collectives (one per block), combine the blocks' contributions locally,
// perform the MPI step once, and copy the result back to every block.
// NOTE(review): iteration is driven by the first block's list only, so this
// assumes every block queued the same number of collectives, in the same
// order — confirm against the call sites that enqueue collectives.
void
diy::Master::
process_collectives()
{
auto scoped = prof.scoped("collectives");
DIY_UNUSED(scoped);
if (collectives().empty())
return;
using CollectivesIterator = CollectivesList::iterator;
std::vector<CollectivesIterator> iters;
std::vector<int> gids;
// One iterator per local block's collectives list, advanced in lockstep.
for (auto& x : collectives())
{
gids.push_back(x.first);
iters.push_back(x.second.begin());
}
while (iters[0] != collectives().begin()->second.end())
{
// Combine all blocks' contributions into the first block's op.
iters[0]->init();
for (unsigned j = 1; j < iters.size(); ++j)
{
// NB: this assumes that the operations are commutative
iters[0]->update(*iters[j]);
}
iters[0]->global(comm_); // do the mpi collective
// Broadcast the finished result back to the other blocks' ops.
for (unsigned j = 1; j < iters.size(); ++j)
{
iters[j]->copy_from(*iters[0]);
++iters[j];
}
++iters[0];
}
}
namespace diy
{
// Type-erased callback queued by foreach(); executed per block by
// ProcessBlock. skip() decides whether a block's callback body runs.
struct Master::BaseCommand
{
virtual ~BaseCommand() {} // to delete derived classes
// b is the block pointer (may be 0 when the block is skipped).
virtual void execute(void* b, const ProxyWithLink& cp) const =0;
// True if block at local index i should be skipped for this command.
virtual bool skip(int i, const Master& master) const =0;
};
// Concrete command binding a typed user callback and a skip predicate;
// execute() casts the erased block pointer back to Block*.
template<class Block>
struct Master::Command: public BaseCommand
{
Command(Callback<Block> f_, const Skip& s_):
f(f_), s(s_) {}
void execute(void* b, const ProxyWithLink& cp) const override { f(static_cast<Block*>(b), cp); }
bool skip(int i, const Master& m) const override { return s(i,m); }
Callback<Block> f;
Skip s;
};
}
namespace diy
{
// MPI message tags: `queue` marks the final (or only) piece of a queue
// message; `piece` marks a header/intermediate piece (see InFlightRecv::recv).
struct Master::tags { enum { queue, piece }; };
// Source/destination gids and exchange round a message belongs to.
struct Master::MessageInfo
{
int from, to;
int round;
};
// An outstanding non-blocking send: keeps the buffer alive (shared_ptr)
// until the MPI request completes.
struct Master::InFlightSend
{
std::shared_ptr<MemoryBuffer> message;
mpi::request request;
MessageInfo info; // for debug purposes
};
// A message being assembled from one or more incoming pieces.
// info.from == -1 means no header has been received yet (see recv());
// done becomes true once the piece tagged tags::queue arrives.
struct Master::InFlightRecv
{
MemoryBuffer message;
MessageInfo info { -1, -1, -1 };
bool done = false;
// Receive the next piece described by status into `message`.
inline void recv(mpi::communicator& comm, const mpi::status& status);
// Deliver the completed message to storage or an incoming queue.
inline void place(IncomingRound* in, bool unload, ExternalStorage* storage, IExchangeInfo* iexchange);
// Return to the uninitialized state, ready for the next message.
void reset() { *this = InFlightRecv(); }
};
// source rank -> in-progress receive (one per peer at a time).
struct Master::InFlightRecvsMap: public std::map<int, InFlightRecv>
{};
// All currently outstanding non-blocking sends.
struct Master::InFlightSendsList: public std::list<InFlightSend>
{};
// FIFO of gids whose queues remain to be sent; `limit` caps how many may
// be in flight at once (0 appears to mean unlimited — TODO confirm at the
// call sites).
struct Master::GidSendOrder
{
size_t size() const { return list.size(); }
bool empty() const { return list.empty(); }
// Precondition: !empty() — pops and returns the front gid.
int pop() { int x = list.front(); list.pop_front(); return x; }
std::list<int> list;
size_t limit = 0;
};
// Shared bookkeeping for iexchange: per-gid done flags plus a global
// outstanding-work counter kept in a one-element MPI one-sided window
// (locked for the lifetime of this object with MPI_MODE_NOCHECK).
struct Master::IExchangeInfo
{
  IExchangeInfo():
    n(0)                                            {}
  IExchangeInfo(size_t n_, mpi::communicator comm_):
    n(n_),
    comm(comm_),
    global_work_(new mpi::window<int>(comm, 1))     { global_work_->lock_all(MPI_MODE_NOCHECK); }
  // The default constructor leaves global_work_ null; only unlock the
  // window if one was actually created (avoids a null dereference).
  ~IExchangeInfo()                                  { if (global_work_) global_work_->unlock_all(); }

  inline void           not_done(int gid);

  inline int            global_work();              // get global work status (for debugging)
  inline bool           all_done();                 // get global all done status
  inline void           reset_work();               // reset global work counter
  inline int            add_work(int work);         // add work to global work counter
  int                   inc_work()                  { return add_work(1); }   // increment global work counter
  int                   dec_work()                  { return add_work(-1); }  // decrement global work counter

  size_t                n;
  mpi::communicator     comm;
  std::unordered_map<int, bool>     done;           // gid -> done
  std::unique_ptr<mpi::window<int>> global_work_;   // global work to do
  std::shared_ptr<spd::logger>      log = get_logger();
};
// VectorWindow is used to send and receive subsets of a contiguous array in-place
namespace detail
{
// Non-owning view (pointer + element count) into caller-owned storage;
// registered as an MPI datatype below so MPI reads/writes it directly.
template <typename T>
struct VectorWindow
{
T *begin;
size_t count;
};
} // namespace detail
namespace mpi
{
namespace detail
{
// Teach diy's MPI layer to treat VectorWindow<T> as a native datatype:
// address() exposes the underlying buffer and count() its length, so the
// window's memory is transferred in place with no intermediate copy.
template<typename T> struct is_mpi_datatype< diy::detail::VectorWindow<T> > { typedef true_type type; };
template <typename T>
struct mpi_datatype< diy::detail::VectorWindow<T> >
{
using VecWin = diy::detail::VectorWindow<T>;
static MPI_Datatype datatype() { return get_mpi_datatype<T>(); }
static const void* address(const VecWin& x) { return x.begin; }
static void* address(VecWin& x) { return x.begin; }
static int count(const VecWin& x) { return static_cast<int>(x.count); }
};
}
} // namespace mpi::detail
} // namespace diy
// Mark gid as not-done; the global work counter is incremented only on the
// done -> not-done transition, so repeated calls are cheap no-ops.
void
diy::Master::IExchangeInfo::
not_done(int gid)
{
  bool& was_done = done[gid];
  if (!was_done)
  {
    log->debug("[{}] Not done, no need to increment work\n", gid);
    return;
  }
  was_done = false;
  int work = inc_work();
  log->debug("[{}] Incrementing work when switching done (on receipt): work = {}\n", gid, work);
}
// In-progress receive state for messages from the given rank; creates an
// empty entry on first use (std::map::operator[] semantics).
diy::Master::InFlightRecv&
diy::Master::
inflight_recv(int proc)
{
return (*inflight_recvs_)[proc];
}
// Accessor for the list of outstanding non-blocking sends.
diy::Master::InFlightSendsList&
diy::Master::inflight_sends()
{
return *inflight_sends_;
}
// receive message described by status
//
// Two-phase protocol: the first piece from a peer either IS the whole
// message (tag == tags::queue) or is a header (tag == tags::piece) that
// carries the total size + MessageInfo; subsequent pieces are then received
// directly into the preallocated buffer via a VectorWindow (no extra copy).
void
diy::Master::InFlightRecv::
recv(mpi::communicator& comm, const mpi::status& status)
{
if (info.from == -1) // uninitialized
{
MemoryBuffer bb;
comm.recv(status.source(), status.tag(), bb.buffer);
if (status.tag() == tags::piece) // first piece is the header
{
size_t msg_size;
diy::load(bb, msg_size);
diy::load(bb, info);
// Reserve the full message size up front so later pieces append
// without reallocation.
message.buffer.reserve(msg_size);
}
else // tags::queue
{
// Single-piece message: info was appended at the end of the buffer.
diy::load_back(bb, info);
message.swap(bb);
}
}
else
{
// Follow-up piece: grow the buffer and receive in place at the tail.
size_t start_idx = message.buffer.size();
size_t count = status.count<char>();
message.buffer.resize(start_idx + count);
detail::VectorWindow<char> window;
window.begin = &message.buffer[start_idx];
window.count = count;
comm.recv(status.source(), status.tag(), window);
}
// The tags::queue piece is always the last one.
if (status.tag() == tags::queue)
done = true;
}
// Once the InFlightRecv is done, place it either out of core or in the
// appropriate incoming queue. Three destinations:
//  - unload:      write the message straight to external storage;
//  - exchange:    move it into the incoming queue (overwrite);
//  - iexchange:   append to the incoming queue and update work accounting.
void
diy::Master::InFlightRecv::
place(IncomingRound* in, bool unload, ExternalStorage* storage, IExchangeInfo* iexchange)
{
    size_t size     = message.size();
    int from        = info.from;
    int to          = info.to;
    int external    = -1;

    if (unload)
    {
        get_logger()->debug("Directly unloading queue {} <- {}", to, from);
        external = storage->put(message); // unload directly
    }
    else if (!iexchange)
    {
        in->map[to].queues[from].swap(message);
        in->map[to].queues[from].reset(); // buffer position = 0
    }
    else // iexchange
    {
        auto log = get_logger();
        iexchange->not_done(to);
        // Index first so the queue entry exists even for an empty message;
        // use data() rather than &buffer[0], which is undefined behavior
        // on an empty vector.
        auto& queue = in->map[to].queues[from];
        if (!message.buffer.empty())
            queue.append_binary(message.buffer.data(), message.size()); // append instead of overwrite
        int work = iexchange->dec_work();
        log->debug("[{}] Decrementing work after receiving: work = {}\n", to, work);
    }
    // Record size and external-storage handle (-1 when kept in memory).
    in->map[to].records[from] = QueueRecord(size, external);
    ++(in->received);
}
// Worker functor run by each thread in Master::execute(): repeatedly claims
// the next block index from the shared counter `idx`, ensures the block
// (and its queues) are in memory, runs every queued command on it, then
// drops the block's incoming queues. `local` tracks blocks this thread has
// loaded so it can unload them when `local_limit` is reached.
struct diy::Master::ProcessBlock
{
ProcessBlock(Master& master_,
const std::deque<int>& blocks__,
int local_limit_,
critical_resource<int>& idx_):
master(master_),
blocks(blocks__),
local_limit(local_limit_),
idx(idx_)
{}
ProcessBlock(const ProcessBlock&) = delete;
ProcessBlock(ProcessBlock&&) = default;
// Main loop: exits when the shared counter passes the end of `blocks`.
void operator()()
{
master.log->debug("Processing with thread: {}", this_thread::get_id());
std::vector<int> local;
do
{
// Atomically claim the next block index (idx is a critical_resource).
int cur = (*idx.access())++;
if ((size_t)cur >= blocks.size())
return;
int i = blocks[cur];
// Block already in memory: count it against this thread's local limit.
// NOTE(review): limit bookkeeping also happens below for the unloaded
// case — confirm a block can't be pushed to `local` twice.
if (master.block(i))
{
if (local.size() == (size_t)local_limit)
master.unload(local);
local.push_back(i);
}
master.log->debug("Processing block: {}", master.gid(i));
bool skip = all_skip(i);
IncomingQueuesMap &current_incoming = master.incoming_[master.exchange_round_].map;
if (master.block(i) == 0) // block unloaded
{
if (skip)
master.load_queues(i); // even though we are skipping the block, the queues might be necessary
else
{
if (local.size() == (size_t)local_limit) // reached the local limit
master.unload(local);
master.load(i);
local.push_back(i);
}
}
// Run every queued command; skipped blocks get a null block pointer.
for (auto& cmd : master.commands_)
{
cmd->execute(skip ? 0 : master.block(i), master.proxy(i));
// no longer need them, so get rid of them
current_incoming[master.gid(i)].queues.clear();
current_incoming[master.gid(i)].records.clear();
}
if (skip && master.block(i) == 0)
master.unload_queues(i); // even though we are skipping the block, the queues might be necessary
} while(true);
}
// True iff every queued command's skip predicate says to skip block i.
bool all_skip(int i) const
{
bool skip = true;
for (auto& cmd : master.commands_)
{
if (!cmd->skip(i, master))
{
skip = false;
break;
}
}
return skip;
}
Master& master;
const std::deque<int>& blocks;
int local_limit;
critical_resource<int>& idx;
};
// Run all queued commands (from foreach()) over the local blocks, possibly
// on multiple threads, honoring the in-memory block limit. Clears the
// incoming queues and the command list when done.
void
diy::Master::
execute()
{
log->debug("Entered execute()");
auto scoped = prof.scoped("execute");
DIY_UNUSED(scoped);
//show_incoming_records();
// touch the outgoing and incoming queues as well as collectives to make sure they exist
for (unsigned i = 0; i < size(); ++i)
{
outgoing(gid(i));
incoming(gid(i)); // implicitly touches queue records
collectives(gid(i));
}
if (commands_.empty())
return;
// Order the blocks, so the loaded ones come first
std::deque<int> blocks;
for (unsigned i = 0; i < size(); ++i)
if (block(i) == 0)
blocks.push_back(i);
else
blocks.push_front(i);
// don't use more threads than we can have blocks in memory
// (limit_ == -1 means no memory limit).
int num_threads;
int blocks_per_thread;
if (limit_ == -1)
{
num_threads = threads_;
blocks_per_thread = size();
}
else
{
num_threads = std::min(threads_, limit_);
blocks_per_thread = limit_/num_threads;
}
// idx is shared
critical_resource<int> idx(0);
if (num_threads > 1)
{
// launch the threads
std::list<thread> threads;
for (unsigned i = 0; i < (unsigned)num_threads; ++i)
threads.emplace_back(ProcessBlock(*this, blocks, blocks_per_thread, idx));
for (auto& t : threads)
t.join();
} else
{
// Single-threaded: run the functor inline on this thread.
ProcessBlock(*this, blocks, blocks_per_thread, idx)();
}
// clear incoming queues
incoming_[exchange_round_].map.clear();
// Sanity check: the memory limit must be respected after execution.
if (limit() != -1 && in_memory() > limit())
throw std::runtime_error(fmt::format("Fatal: {} blocks in memory, with limit {}", in_memory(), limit()));
// clear commands
commands_.clear();
}
......@@ -47,7 +47,14 @@ struct GridRef
GridRef& operator=(C value) { Index s = size(); for (Index i = 0; i < s; ++i) data_[i] = value; return *this; }