Commit 6217a332 authored by Burlen Loring's avatar Burlen Loring

memory profiler

* add a sampling memory profiler. uses RSS. outputs using
  MPI I/O in csv format
* add MPI I/O csv output to timer log, and include start
  and end times.
* add config from environment variables
parent d88d2377
......@@ -65,8 +65,10 @@ int main(int argc, char **argv)
return showHelp ? 0 : 1;
}
timer::SetLogging(log || shortlog);
timer::SetTrackSummariesOverTime(shortlog);
if (log | shortlog)
timer::Enable(shortlog);
timer::Initialize();
SENSEI_STATUS("Opening: \"" << input.c_str() << "\" using method \""
<< readmethod.c_str() << "\"")
......@@ -134,7 +136,7 @@ int main(int argc, char **argv)
dataAdaptor = nullptr;
analysisAdaptor = nullptr;
timer::PrintLog(std::cout, comm);
timer::Finalize();
MPI_Finalize();
......
......@@ -18,6 +18,7 @@
#include "senseiConfig.h"
#ifdef ENABLE_SENSEI
#include "bridge.h"
#include "MemoryProfiler.h"
#else
#include "analysis.h"
#endif
......@@ -45,7 +46,7 @@ inline bool IsVertexInsideBounds(const Vertex& v, const Bounds& b)
struct Block
{
Block(int gid_, const Bounds& bounds_, const Bounds& domain_, const Oscillators& oscillators_, float velocity_scale_):
Block(int gid_, const Bounds& bounds_, const Bounds& domain_, const Oscillators& oscillators_, float velocity_scale_):
gid(gid_),
velocity_scale(velocity_scale_),
bounds(bounds_),
......@@ -55,7 +56,7 @@ struct Block
{
}
void advance(float t)
void advance(float t)
{
grid::for_each(grid.shape(), [&](const Vertex& v)
{
......@@ -253,9 +254,10 @@ int main(int argc, char** argv)
std::default_random_engine rng(static_cast<RandomSeedType>(seed));
for (int i = 0; i < world.rank(); ++i) rng(); // different seed for each rank
if (log || shortlog)
timer::Enable(shortlog);
timer::SetLogging(log || shortlog);
timer::SetTrackSummariesOverTime(shortlog);
timer::Initialize();
timer::MarkStartEvent("oscillators::initialize");
Oscillators oscillators;
......@@ -434,10 +436,11 @@ int main(int argc, char** argv)
#endif
timer::MarkEndEvent("oscillators::finalize");
timer::Finalize();
auto duration = std::chrono::duration_cast<ms>(Time::now() - start);
if (world.rank() == 0)
{
fmt::print("Total run time: {}.{} s\n", duration.count() / 1000, duration.count() % 1000);
}
timer::PrintLog(std::cout, world);
}
......@@ -6,7 +6,7 @@
#include <vtkDataObject.h>
#include <vector>
#include <timer/Timer.h>
#include "Timer.h"
namespace BridgeInternals
{
......@@ -20,6 +20,8 @@ void bridge_initialize(MPI_Comm comm, int g_x, int g_y, int g_z,
uint64_t start_extents_z, int tot_blocks_x, int tot_blocks_y, int tot_blocks_z,
int block_id_x, int block_id_y, int block_id_z, const char* config_file)
{
timer::Initialize();
BridgeInternals::GlobalDataAdaptor = vtkSmartPointer<parallel3d::DataAdaptor>::New();
BridgeInternals::GlobalDataAdaptor->SetCommunicator(comm);
......@@ -45,12 +47,12 @@ void bridge_update(int tstep, double time, double *pressure, double* temperature
}
//-----------------------------------------------------------------------------
void bridge_finalize(MPI_Comm comm)
void bridge_finalize()
{
BridgeInternals::GlobalAnalysisAdaptor->Finalize();
BridgeInternals::GlobalAnalysisAdaptor = nullptr;
BridgeInternals::GlobalDataAdaptor = nullptr;
timer::PrintLog(std::cout, comm);
timer::Finalize();
}
......@@ -7,6 +7,7 @@
#ifdef __cplusplus
extern "C" {
#endif
/// This defines the analysis bridge for parallel_3d miniapp.
/// Called before simulation loop
......@@ -19,7 +20,7 @@ void bridge_initialize(MPI_Comm comm, int g_x, int g_y, int g_z,
void bridge_update(int tstep, double time, double *pressure, double* temperature, double* density);
/// Called just before simulation terminates.
void bridge_finalize(MPI_Comm comm);
void bridge_finalize();
#ifdef __cplusplus
} // extern "C"
......
......@@ -250,7 +250,7 @@ int main(int argc, char **argv)
/////////////////////////////
#ifdef ENABLE_SENSEI
bridge_finalize(MPI_COMM_WORLD);
bridge_finalize();
if (config_file) {
free(config_file);
......
......@@ -190,23 +190,28 @@ int ConfigurableAnalysis::InternalsType::TimeInitialization(
AnalysisAdaptorPtr adaptor, std::function<int()> initializer)
{
const char* analysisName = nullptr;
if (timer::GetLogging())
bool logEnabled = timer::Enabled();
if (logEnabled)
{
std::ostringstream initName;
std::ostringstream execName;
std::ostringstream finiName;
auto analysisNumber = this->Analyses.size();
initName << adaptor->GetClassName() << "::" << analysisNumber << "::initialize";
execName << adaptor->GetClassName() << "::" << analysisNumber << "::execute";
finiName << adaptor->GetClassName() << "::" << analysisNumber << "::finalize";
initName << adaptor->GetClassName() << "::" << analysisNumber << "::Initialize";
execName << adaptor->GetClassName() << "::" << analysisNumber << "::Execute";
finiName << adaptor->GetClassName() << "::" << analysisNumber << "::Finalize";
this->LogEventNames.push_back(initName.str());
this->LogEventNames.push_back(execName.str());
this->LogEventNames.push_back(finiName.str());
analysisName = this->LogEventNames[3 * analysisNumber].c_str();
timer::MarkStartEvent(analysisName);
}
timer::MarkStartEvent(analysisName);
int result = initializer();
timer::MarkEndEvent(analysisName);
if (logEnabled)
timer::MarkEndEvent(analysisName);
return result;
}
......@@ -243,8 +248,8 @@ int ConfigurableAnalysis::InternalsType::AddHistogram(pugi::xml_node node)
this->Analyses.push_back(histogram.GetPointer());
SENSEI_STATUS("Configured histogram with " << bins
<< " " << assocStr << "data array " << array
<< " on mesh " << mesh)
<< " bins on " << assocStr << " data array \"" << array
<< "\" on mesh \"" << mesh << "\"")
return 0;
}
......@@ -905,6 +910,8 @@ int ConfigurableAnalysis::SetCommunicator(MPI_Comm comm)
//----------------------------------------------------------------------------
int ConfigurableAnalysis::Initialize(const std::string& filename)
{
timer::MarkEvent("ConfigurableAnalysis::Initialize");
int rank = 0;
MPI_Comm_rank(this->GetCommunicator(), &rank);
......@@ -918,6 +925,7 @@ int ConfigurableAnalysis::Initialize(const std::string& filename)
int rv = 0;
pugi::xml_node root = doc.child("sensei");
for (pugi::xml_node node = root.child("analysis");
node; node = node.next_sibling("analysis"))
{
......@@ -949,28 +957,34 @@ int ConfigurableAnalysis::Initialize(const std::string& filename)
//----------------------------------------------------------------------------
bool ConfigurableAnalysis::Execute(DataAdaptor* data)
{
timer::MarkEvent("ConfigurableAnalysis::Execute");
int rank = 0;
MPI_Comm_rank(this->GetCommunicator(), &rank);
int rv = 0;
int ai = 0;
const char* analysisName = nullptr;
AnalysisAdaptorVector::iterator iter = this->Internals->Analyses.begin();
AnalysisAdaptorVector::iterator end = this->Internals->Analyses.end();
for (; iter != end; ++iter, ++ai)
{
if (timer::GetLogging())
const char* analysisName = nullptr;
bool logEnabled = timer::Enabled();
if (logEnabled)
{
analysisName = this->Internals->LogEventNames[3 * ai + 1].c_str();
timer::MarkStartEvent(analysisName);
}
timer::MarkStartEvent(analysisName);
if (!(*iter)->Execute(data))
{
if (rank == 0)
SENSEI_ERROR("Failed to execute " << (*iter)->GetClassName())
rv -= 1;
}
timer::MarkEndEvent(analysisName);
if (logEnabled)
timer::MarkEndEvent(analysisName);
}
return rv < 0 ? false : true;
......@@ -979,33 +993,34 @@ bool ConfigurableAnalysis::Execute(DataAdaptor* data)
//----------------------------------------------------------------------------
int ConfigurableAnalysis::Finalize()
{
timer::MarkEvent("ConfigurableAnalysis::Finalize");
int rank = 0;
MPI_Comm_rank(this->GetCommunicator(), &rank);
int rv = 0;
int ai = 0;
const char* analysisName = nullptr;
AnalysisAdaptorVector::iterator iter = this->Internals->Analyses.begin();
AnalysisAdaptorVector::iterator end = this->Internals->Analyses.end();
for (; iter != end; ++iter, ++ai)
{
if (timer::GetLogging())
bool logEnabled = timer::Enabled();
const char* analysisName = nullptr;
if (logEnabled)
{
analysisName = this->Internals->LogEventNames[3 * ai + 2].c_str();
timer::MarkStartEvent(analysisName);
}
timer::MarkStartEvent(analysisName);
if ((*iter)->Finalize())
{
SENSEI_ERROR("Failed to finalize " << (*iter)->GetClassName())
rv -= 1;
}
timer::MarkEndEvent(analysisName);
}
if (timer::GetLogging())
{
timer::PrintLog(std::cout, this->GetCommunicator());
}
if (logEnabled)
timer::MarkEndEvent(analysisName);
}
return rv;
}
......
add_library(timer STATIC Timer.cxx)
add_library(timer STATIC MemoryProfiler.cxx Timer.cxx)
target_include_directories(timer SYSTEM INTERFACE
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>
$<INSTALL_INTERFACE:include/timer>)
target_link_libraries(timer mpi)
target_link_libraries(timer vtk mpi)
install(DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}/"
DESTINATION include/timer FILES_MATCHING PATTERN "*.h")
......
#include "MemoryProfiler.h"
#include <vtksys/SystemInformation.hxx>
#include <vector>
#include <deque>
#include <sstream>
#include <sys/time.h>
#include <cstring>
#include <errno.h>
#include <iostream>
#include <cmath>
#include <iomanip>
#include <limits>
namespace timer
{
struct MemoryProfiler::InternalsType
{
InternalsType() : Comm(MPI_COMM_WORLD), FileName("MemProf.csv"),
Interval(60.0), DataMutex(PTHREAD_MUTEX_INITIALIZER)
{
}
MPI_Comm Comm;
std::string FileName;
double Interval;
std::deque<long long> MemUse;
std::deque<double> TimePt;
pthread_t Thread;
pthread_mutex_t DataMutex;
vtksys::SystemInformation SysInfo;
};
extern "C" void *Profile(void *argp)
{
timer::MemoryProfiler::InternalsType *internals =
reinterpret_cast<timer::MemoryProfiler::InternalsType*>(argp);
while (1)
{
// capture the current time and memory usage.
struct timeval tv;
gettimeofday(&tv, nullptr);
double curTime = tv.tv_sec + tv.tv_usec/1.0e6;
long long curMem = internals->SysInfo.GetProcMemoryUsed();
pthread_mutex_lock(&internals->DataMutex);
// log time and mem use
internals->TimePt.push_back(curTime);
internals->MemUse.push_back(curMem);
// get next interval
double interval = internals->Interval;
pthread_mutex_unlock(&internals->DataMutex);
// check for shut down code
if (interval < 0)
pthread_exit(nullptr);
// suspend the thread for the requested interval
long long secs = floor(interval);
long nsecs = (interval - secs)*1e9;
struct timespec sleepTime = {secs, nsecs};
int ierr = 0;
int tries = 0;
while ((ierr = nanosleep(&sleepTime, &sleepTime)) && (errno == EINTR) && (++tries < 1000));
if (ierr)
{
const char *estr = strerror(errno);
std::cerr << "Error: nanosleep had an error \"" << estr << "\"" << std::endl;
abort();
}
}
return nullptr;
}
// --------------------------------------------------------------------------
MemoryProfiler::MemoryProfiler()
{
this->Internals = new InternalsType;
}
// --------------------------------------------------------------------------
MemoryProfiler::~MemoryProfiler()
{
delete this->Internals;
}
// --------------------------------------------------------------------------
int MemoryProfiler::Initialize()
{
if (pthread_create(&this->Internals->Thread,
nullptr, Profile, this->Internals))
{
const char *estr = strerror(errno);
std::cerr << "Error: Failed to create memory profiler. "
<< estr << std::endl;
return -1;
}
return 0;
}
// --------------------------------------------------------------------------
int MemoryProfiler::Finalize()
{
int rank = 0;
int nRanks = 1;
MPI_Comm_rank(this->Internals->Comm, &rank);
MPI_Comm_size(this->Internals->Comm, &nRanks);
pthread_mutex_lock(&this->Internals->DataMutex);
// tell the thread to quit
this->Internals->Interval = -1;
// create the ascii buffer
// use ascii in the file as a convenince
std::ostringstream oss;
oss.precision(std::numeric_limits<double>::digits10 + 2);
oss.setf(std::ios::scientific, std::ios::floatfield);
if (rank == 0)
oss << "# rank, time, memory kiB" << std::endl;
long nElem = this->Internals->MemUse.size();
for (long i = 0; i < nElem; ++i)
{
oss << rank << ", " << this->Internals->TimePt[i]
<< ", " << this->Internals->MemUse[i] << std::endl;
}
// free resources
this->Internals->TimePt.clear();
this->Internals->MemUse.clear();
pthread_mutex_unlock(&this->Internals->DataMutex);
// cancle the profiler thread
pthread_cancel(this->Internals->Thread);
// compute the file offset
long nBytes = oss.str().size();
std::vector<long> gsizes(nRanks);
gsizes[rank] = nBytes;
MPI_Allgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL,
gsizes.data(), 1, MPI_LONG, this->Internals->Comm);
long offset = 0;
for (int i = 0; i < rank; ++i)
offset += gsizes[i];
long fileSize = 0;
for (int i = 0; i < nRanks; ++i)
fileSize += gsizes[i];
// write the buffer
MPI_File fh;
MPI_File_open(this->Internals->Comm, this->Internals->FileName.c_str(),
MPI_MODE_CREATE|MPI_MODE_WRONLY, MPI_INFO_NULL, &fh);
MPI_File_set_view(fh, offset, MPI_BYTE, MPI_BYTE,
"native", MPI_INFO_NULL);
MPI_File_write(fh, oss.str().c_str(), nBytes,
MPI_BYTE, MPI_STATUS_IGNORE);
MPI_File_set_size(fh, fileSize);
MPI_File_close(&fh);
// wait for the proiler thread to finish
pthread_join(this->Internals->Thread, nullptr);
return 0;
}
// --------------------------------------------------------------------------
double MemoryProfiler::GetInterval() const
{
return this->Internals->Interval;
}
// --------------------------------------------------------------------------
void MemoryProfiler::SetInterval(double interval)
{
pthread_mutex_lock(&this->Internals->DataMutex);
this->Internals->Interval = interval;
pthread_mutex_unlock(&this->Internals->DataMutex);
}
// --------------------------------------------------------------------------
void MemoryProfiler::SetCommunicator(MPI_Comm comm)
{
this->Internals->Comm = comm;
}
// --------------------------------------------------------------------------
void MemoryProfiler::SetFileName(const std::string &fileName)
{
this->Internals->FileName = fileName;
}
// --------------------------------------------------------------------------
const char *MemoryProfiler::GetFileName() const
{
return this->Internals->FileName.c_str();
}
}
#ifndef MemoryProfiler_h
#define MemoryProfiler_h
#include <mpi.h>
#include <string>
namespace timer
{
extern "C" void *Profile(void *argp);
/// MemoryProfiler - A sampling memory use profiler
/**
The class samples process memory usage at the specified interval
given in seconds. For each sample the time is aquired. Calling
Initialize starts profiling, and Finalize ends it. During
Finaliziation the buffers are written using MPI-I/O to the
file name provided
*/
class MemoryProfiler
{
public:
MemoryProfiler(const MemoryProfiler &) = delete;
void operator=(const MemoryProfiler &) = delete;
MemoryProfiler();
~MemoryProfiler();
int Initialize();
int Finalize();
/// Set the interval in seconds between querrying
/// the processes memory use.
void SetInterval(double interval);
double GetInterval() const;
/// Set the comunicator for parallel I/O
void SetCommunicator(MPI_Comm comm);
/// Set the file name to write the data to
void SetFileName(const std::string &fileName);
const char *GetFileName() const;
friend void *timer::Profile(void *argp);
private:
struct InternalsType;
InternalsType *Internals;
};
}
#endif
#include "Timer.h"
#include "TimerC.h"
#include "MemoryProfiler.h"
#include <vtksys/SystemInformation.hxx>
#include <sys/time.h>
#include <fstream>
......@@ -8,10 +10,14 @@
#include <sstream>
#include <stdint.h>
#include <strings.h>
#include <cstdlib>
#include <cstdlib>
#include <map>
#include <list>
#include <vector>
#include <iomanip>
#include <limits>
using std::endl;
using std::cerr;
......@@ -20,166 +26,467 @@ namespace timer
{
namespace impl
{
static double get_mark()
// return high res system time relative to system epoch
static double getSystemTime()
{
struct timeval tv;
gettimeofday(&tv, nullptr);
return tv.tv_sec + tv.tv_usec/1.0e6;
}
// helper for pretty printing events
struct Indent
{
int Count;
explicit Indent(int indent=0): Count(indent) {}
Indent GetNextIndent() const
{ return Indent(this->Count+1); }
};
std::ostream &operator<<(std::ostream &os, const Indent &i)
{
for (int cc=0; cc < i.Count; ++cc)
os << " ";
return os;
}
// container for data captured in a timing event
struct Event
{
Event();
bool Empty() const
{ return this->Count < 1; }
// merges or computes summary of the two events
void AddToSummary(const Event& other);
// prints the log in a human readbale format
void PrettyPrint(std::ostream& stream, Indent indent) const;
// serializes the event in CSV format into the stream.
void ToStream(std::ostream &str) const;
enum { START=0, END=1, DELTA=2 }; // record fields
enum { MIN=0, MAX=1, SUM=2 }; // summary fields
// user provided identifier for the record
std::string Name;
// for nesting of events
std::list<Event> SubEvents;
// event duration, initially start time, end time and duration
// are recorded. when summarizing this contains min,max and sum
// of the summariezed set of events
double Time[3];
// number of events in the summary, or 1 for a single event
int Count;
};
// timer controls and data
static MPI_Comm Comm = MPI_COMM_NULL;
static bool LoggingEnabled = false;
static bool Summarize = false;
static int SummaryModulus = 100000000;
static std::string TimerLogFile = "Timer.csv";
static std::list<Event> Mark;
static std::list<Event> GlobalEvents;
static int ActiveTimeStep = -1;
static double ActiveTime = 0.0;
// memory profiler
static MemoryProfiler MemProf;
// --------------------------------------------------------------------------
Event::Event() : Count(0)
{
this->Time[0] = this->Time[1] = this->Time[2] = 0;
}
// --------------------------------------------------------------------------
void Event::AddToSummary(const Event& other)
{
// convert or add to summary
// first three cases handle conversion of this or other or both
// into the summary form. the last case merges the summaries
if ((this->Count == 1) && (other.Count == 1))
{
this->Time[MIN] = std::min(this->Time[DELTA], other.Time[DELTA]);
this->Time[MAX] = std::max(this->Time[DELTA], other.Time[DELTA]);
this->Time[SUM] = this->Time[DELTA] + other.Time[DELTA];
}
else if (this->Count == 1)
{
struct timeval tv;
gettimeofday(&tv, NULL);
return tv.tv_sec + tv.tv_usec/1.0e6;
this->Time[MIN] = std::min(this->Time[DELTA], other.Time[MIN]);
this->Time[MAX] = std::max(this->Time[DELTA], other.Time[MAX]);
this->Time[SUM] = this->Time[DELTA] + other.Time[SUM];
}
else if (other.Count == 1)
{
this->Time[MIN] = std::min(this->Time[MIN], other.Time[DELTA]);
this->Time[MAX] = std::max(this->Time[MAX], other.Time[DELTA]);
this->Time[SUM] = this->Time[SUM] + other.Time[DELTA];
}
else
{
this->Time[MIN] = std::min(this->Time[MIN], other.Time[MIN]);
this->Time[MAX] = std::max(this->Time[MAX], other.Time[MAX]);
this->Time[SUM] += other.Time[SUM];
}
this->Count += other.Count;
// process nested events
if (this->SubEvents.size() == other.SubEvents.size())
{
auto it = this->SubEvents.begin();
auto end = this->SubEvents.end();
auto oit = other.SubEvents.begin();
for (; it != end; ++it, ++oit)
it->AddToSummary(*oit);
}
}
static uint64_t get_vmhwm()
//-----------------------------------------------------------------------------
void Event::ToStream(std::ostream &str) const
{
#ifndef NDEBUG