Skip to content
Snippets Groups Projects
Commit 8acfd6b1 authored by Ben Boeckel's avatar Ben Boeckel
Browse files

Merge branch 'upstream-diy2' into HEAD

* upstream-diy2:
  diy2 2018-10-08 (2ea6ec38)
parents aed127dc e55f5f9f
No related branches found
No related tags found
No related merge requests found
if (NOT VTK_INSTALL_NO_DEVELOPMENT) vtk_module_install_headers(
install(DIRECTORY include/vtkdiy2/ SUBDIR "vtkdiy2"
DESTINATION "${VTK_INSTALL_INCLUDE_DIR}/vtkdiy2" DIRECTORIES "include")
COMPONENT Development)
endif()
#ifndef DIY_IO_SHARED_HPP
#define DIY_IO_SHARED_HPP
#include <sstream>
#include <fstream>
#include "../mpi.hpp"
namespace diy
{
namespace io
{
class SharedOutFile: public std::ostringstream
{
public:
SharedOutFile(std::string filename, diy::mpi::communicator world, int root = 0):
filename_(filename),
world_(world),
root_(root) {}
~SharedOutFile() { close(); }
void close()
{
auto str = this->str();
std::vector<char> contents(str.begin(), str.end());
if (world_.rank() == root_)
{
std::vector<std::vector<char>> all_contents;
diy::mpi::gather(world_, contents, all_contents, root_);
// write the file serially
std::ofstream out(filename_);
for (auto& contents : all_contents)
out.write(contents.data(), contents.size());
} else
diy::mpi::gather(world_, contents, root_);
}
private:
std::string filename_;
diy::mpi::communicator world_;
int root_;
};
}
}
#endif
...@@ -670,6 +670,8 @@ void ...@@ -670,6 +670,8 @@ void
diy::Master:: diy::Master::
iexchange_(const ICallback<Block>& f) iexchange_(const ICallback<Block>& f)
{ {
auto scoped = prof.scoped("iexchange");
// prepare for next round // prepare for next round
incoming_.erase(exchange_round_); incoming_.erase(exchange_round_);
++exchange_round_; ++exchange_round_;
...@@ -684,10 +686,13 @@ iexchange_(const ICallback<Block>& f) ...@@ -684,10 +686,13 @@ iexchange_(const ICallback<Block>& f)
{ {
for (size_t i = 0; i < size(); i++) // for all blocks for (size_t i = 0; i < size(); i++) // for all blocks
{ {
icommunicate(&iexchange); // TODO: separate comm thread std::thread t(icommunicate); icommunicate(&iexchange); // TODO: separate comm thread std::thread t(icommunicate);
ProxyWithLink cp = proxy(i, &iexchange); ProxyWithLink cp = proxy(i, &iexchange);
prof << "callback";
bool done = f(block<Block>(i), cp); bool done = f(block<Block>(i), cp);
prof >> "callback";
int nundeq_after = 0; int nundeq_after = 0;
int nunenq_after = 0; int nunenq_after = 0;
...@@ -731,6 +736,7 @@ void ...@@ -731,6 +736,7 @@ void
diy::Master:: diy::Master::
comm_exchange(GidSendOrder& gid_order, IExchangeInfo* iexchange) comm_exchange(GidSendOrder& gid_order, IExchangeInfo* iexchange)
{ {
auto scoped = prof.scoped("comm-exchange");
send_outgoing_queues(gid_order, false, iexchange); send_outgoing_queues(gid_order, false, iexchange);
while(nudge()); // kick requests while(nudge()); // kick requests
check_incoming_queues(iexchange); check_incoming_queues(iexchange);
...@@ -832,6 +838,8 @@ void ...@@ -832,6 +838,8 @@ void
diy::Master:: diy::Master::
icommunicate(IExchangeInfo* iexchange) icommunicate(IExchangeInfo* iexchange)
{ {
auto scoped = prof.scoped("icommunicate");
log->debug("Entering icommunicate()"); log->debug("Entering icommunicate()");
// lock out other threads // lock out other threads
...@@ -861,6 +869,8 @@ send_outgoing_queues(GidSendOrder& gid_order, ...@@ -861,6 +869,8 @@ send_outgoing_queues(GidSendOrder& gid_order,
bool remote, // TODO: are remote and iexchange mutually exclusive? If so, use single enum? bool remote, // TODO: are remote and iexchange mutually exclusive? If so, use single enum?
IExchangeInfo* iexchange) IExchangeInfo* iexchange)
{ {
auto scoped = prof.scoped("send-outgoing-queues");
while (inflight_sends().size() < gid_order.limit && !gid_order.empty()) while (inflight_sends().size() < gid_order.limit && !gid_order.empty())
{ {
int from = gid_order.pop(); int from = gid_order.pop();
...@@ -1071,6 +1081,8 @@ void ...@@ -1071,6 +1081,8 @@ void
diy::Master:: diy::Master::
check_incoming_queues(IExchangeInfo* iexchange) check_incoming_queues(IExchangeInfo* iexchange)
{ {
auto scoped = prof.scoped("check-incoming-queues");
mpi::optional<mpi::status> ostatus = comm_.iprobe(mpi::any_source, mpi::any_tag); mpi::optional<mpi::status> ostatus = comm_.iprobe(mpi::any_source, mpi::any_tag);
while (ostatus) while (ostatus)
{ {
......
...@@ -74,7 +74,9 @@ file(const communicator& comm__, const std::string& filename, int mode) ...@@ -74,7 +74,9 @@ file(const communicator& comm__, const std::string& filename, int mode)
: comm_(comm__) : comm_(comm__)
{ {
#ifndef DIY_NO_MPI #ifndef DIY_NO_MPI
MPI_File_open(comm__, const_cast<char*>(filename.c_str()), mode, MPI_INFO_NULL, &fh); int ret = MPI_File_open(comm__, const_cast<char*>(filename.c_str()), mode, MPI_INFO_NULL, &fh);
if (ret)
throw std::runtime_error("DIY cannot open file: " + filename);
#else #else
DIY_UNUSED(comm__); DIY_UNUSED(comm__);
DIY_UNUSED(filename); DIY_UNUSED(filename);
......
...@@ -167,6 +167,51 @@ namespace diy ...@@ -167,6 +167,51 @@ namespace diy
void* block_; void* block_;
Link* link_; Link* link_;
IExchangeInfo* iexchange_; // not used for iexchange presently, but later could trigger some special behavior IExchangeInfo* iexchange_; // not used for iexchange presently, but later could trigger some special behavior
public:
template<class T>
void enqueue(const BlockID& to,
const T& x,
void (*save)(BinaryBuffer&, const T&) = &::diy::save<T>) const
{
diy::Master::Proxy::enqueue(to, x, save);
if (iexchange_)
master()->icommunicate(iexchange_);
}
template<class T>
void enqueue(const BlockID& to,
const T* x,
size_t n,
void (*save)(BinaryBuffer&, const T&) = &::diy::save<T>) const
{
diy::Master::Proxy::enqueue(to, x, n, save);
if (iexchange_)
master()->icommunicate(iexchange_);
}
template<class T>
void dequeue(int from,
T& x,
void (*load)(BinaryBuffer&, T&) = &::diy::load<T>) const
{
// TODO: uncomment if necessary, try first without icommunicating on dequeue
// if (iexchange_)
// master()->icommunicate(iexchange_);
diy::Master::Proxy::dequeue(from, x, load);
}
template<class T>
void dequeue(int from,
T* x,
size_t n,
void (*load)(BinaryBuffer&, T&) = &::diy::load<T>) const
{
// TODO: uncomment if necessary, try first without icommunicating on dequeue
// if (iexchange_)
// master()->icommunicate(iexchange_);
diy::Master::Proxy::dequeue(from, x, n, load);
}
}; };
} // diy namespace } // diy namespace
......
...@@ -65,14 +65,18 @@ struct Profiler ...@@ -65,14 +65,18 @@ struct Profiler
void enter(std::string name) { events.push_back(Event(name, true)); } void enter(std::string name) { events.push_back(Event(name, true)); }
void exit(std::string name) { events.push_back(Event(name, false)); } void exit(std::string name) { events.push_back(Event(name, false)); }
void output(std::ostream& out) void output(std::ostream& out, std::string prefix = "")
{ {
if (!prefix.empty())
prefix += " ";
for (size_t i = 0; i < events.size(); ++i) for (size_t i = 0; i < events.size(); ++i)
{ {
const Event& e = events[i]; const Event& e = events[i];
auto time = std::chrono::duration_cast<std::chrono::microseconds>(e.stamp - start).count(); auto time = std::chrono::duration_cast<std::chrono::microseconds>(e.stamp - start).count();
fmt::print(out, "{:02d}:{:02d}:{:02d}.{:06d} {}{}\n", fmt::print(out, "{}{:02d}:{:02d}:{:02d}.{:06d} {}{}\n",
prefix,
time/1000000/60/60, time/1000000/60/60,
time/1000000/60 % 60, time/1000000/60 % 60,
time/1000000 % 60, time/1000000 % 60,
...@@ -103,7 +107,7 @@ struct Profiler ...@@ -103,7 +107,7 @@ struct Profiler
void enter(const std::string&) {} void enter(const std::string&) {}
void exit(const std::string&) {} void exit(const std::string&) {}
void output(std::ostream&) {} void output(std::ostream&, std::string = "") {}
void clear() {} void clear() {}
Scoped scoped(std::string) { return Scoped(); } Scoped scoped(std::string) { return Scoped(); }
......
#ifndef DIY_VERSION_HPP
#define DIY_VERSION_HPP
#define DIY_VERSION_MAJOR 3
#define DIY_VERSION_MINOR 5
#define DIY_VERSION_PATCH 0
#endif
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment