Commit 7f291935 authored by pugmire's avatar pugmire
Browse files

Put an upper limit on the number of async recv buffers that are posted.

Fixes to the way that latency is reported in the masterslave algorithm.
Allow latency hiding of a variable number of streamlines.




git-svn-id: http://visit.ilight.com/svn/visit/trunk/src@6757 18c085ea-50e0-402c-830e-de6fd14e8384
parent 0253e000
......@@ -49,7 +49,8 @@ using namespace std;
#ifdef PARALLEL
static int MAX_DOMAIN_PRINT = 30;
static bool MASTER_BALANCING = true;
static bool MASTER_BALANCING = false;
static int LATENCY_SEND_CNT = 2;
int avtMasterSlaveSLAlgorithm::MSG_STATUS = 420003;
int avtMasterSlaveSLAlgorithm::MSG_DONE = 420004;
......@@ -184,6 +185,9 @@ avtMasterSlaveSLAlgorithm::Create(avtStreamlineFilter *slFilter,
//
// Dave Pugmire, Mon Mar 23 12:48:12 EDT 2009
// Change how timings are reported/calculated.
//
// Dave Pugmire, Wed Apr 1 11:21:05 EDT 2009
// Message size and number of receives put in Initialize().
//
// ****************************************************************************
......@@ -197,9 +201,6 @@ avtMasterSlaveSLAlgorithm::avtMasterSlaveSLAlgorithm(avtStreamlineFilter *slFilt
maxCnt = maxCount;
sleepMicroSec = 1;
latencyTimer = -1;
// Msg type, numTerminated, domain vector.
statusMsgSz = 2 + NUM_DOMAINS;
}
// ****************************************************************************
......@@ -226,12 +227,21 @@ avtMasterSlaveSLAlgorithm::~avtMasterSlaveSLAlgorithm()
// Programmer: Dave Pugmire
// Creation: January 27, 2009
//
// Modifications:
//
// Dave Pugmire, Wed Apr 1 11:21:05 EDT 2009
// Message size and number of receives put in Initialize().
//
// ****************************************************************************
void
avtMasterSlaveSLAlgorithm::Initialize(std::vector<avtStreamlineWrapper *> &seedPts)
{
avtParSLAlgorithm::Initialize(seedPts);
int numRecvs = nProcs-1;
if (numRecvs > 64)
numRecvs = 64;
avtParSLAlgorithm::Initialize(seedPts, 2+NUM_DOMAINS, numRecvs);
}
......@@ -253,7 +263,7 @@ avtMasterSlaveSLAlgorithm::Sleep()
{
//debug1<<"Sleep for "<<sleepMicroSec<<" microSec\n";
int sleepTimer = visitTimer->StartTimer();
struct timespec ts = {0, sleepMicroSec*1000};
struct timespec ts = {0, 0};
nanosleep(&ts, 0);
SleepTime.value += visitTimer->StopTimer(sleepTimer, "SleepTimer");
SleepCnt.value++;
......@@ -684,6 +694,12 @@ avtMasterSLAlgorithm::UpdateSlaveStatus(vector<int> &status)
int nTerm = status[2];
workGroupActiveSLs -= nTerm;
if (workGroupActiveSLs < 0)
{
debug1<<"HACK: Need to figure out how the count got messed up!"<<endl;
workGroupActiveSLs = 0;
}
debug1<<"SlaveStatus: "<<src<<" ";
if (NUM_DOMAINS < MAX_DOMAIN_PRINT) debug1<<status;
debug1<<endl;
......@@ -782,8 +798,8 @@ avtMasterSLAlgorithm::PostLoopProcessing()
if (!haveActiveSlaves)
{
debug1<<"HACK: Need to figure out how the count got messed up!"<<endl;
workGroupActiveSLs = 0;
SendStatus(true);
//workGroupActiveSLs = 0;
//SendStatus(true);
}
}
//end hack....
......@@ -1128,10 +1144,12 @@ avtMasterSLAlgorithm::ManageWorkgroup()
domLoaded[d]++;
}
PrintStatus();
//PrintStatus();
if (!slaveUpdate && !masterUpdate)
{
Sleep();
}
else
{
if (slaveUpdate)
......@@ -1141,7 +1159,7 @@ avtMasterSLAlgorithm::ManageWorkgroup()
}
debug1<<endl<<"Post-Mortem"<<endl;
PrintStatus();
//PrintStatus();
slaveUpdate = false;
masterUpdate = false;
......@@ -1331,6 +1349,9 @@ avtMasterSLAlgorithm::SendAllSlavesMsg(vector<int> &msg)
// Latency saving sends can leave us a count of 2 or less. So, consider these
// as slackers.
//
// Dave Pugmire, Wed Apr 1 11:21:05 EDT 2009
// Use LATENCY_SEND_CNT to determine who is out of work.
//
// ****************************************************************************
void
......@@ -1343,7 +1364,7 @@ avtMasterSLAlgorithm::FindSlackers(int oobFactor,
//if oobFactor != -1, find slaves with between 0 and oobFactor OOB streamlines.
for (int i = 0; i < slaveInfo.size(); i++)
if (slaveInfo[i].slLoadedCount <= 2 ||
if (slaveInfo[i].slLoadedCount <= LATENCY_SEND_CNT ||
(slaveInfo[i].justUpdated && checkJustUpdated))
{
if ( oobFactor != -1 &&
......@@ -1764,6 +1785,9 @@ avtMasterSLAlgorithm::Case4(int oobThreshold,
// Dave Pugmire, Sat Mar 28 10:04:01 EDT 2009
// Finish implementation.
//
// Dave Pugmire, Wed Apr 1 11:21:05 EDT 2009
// Limit the number of case5 messages that are sent.
//
// ****************************************************************************
void
......@@ -1855,6 +1879,8 @@ avtMasterSLAlgorithm::Case5(int overworkThreshold, bool domainCheck, int &counte
debug1<<"Case 5: "<<senders[i]<<" ===> "<<receivers[i]<<" "<<doms[i]<<endl;
SendMsg(senders[i], msg);
counter++;
if (i > 1)
break;
}
}
......@@ -1899,6 +1925,39 @@ avtSlaveSLAlgorithm::~avtSlaveSLAlgorithm()
{
}
// ****************************************************************************
// Method: avtSlaveSLAlgorithm::HandleLatencyTimer
//
// Purpose:
// Handle latency timer.
//
// Programmer: Dave Pugmire
// Creation: April 2, 2009
//
// ****************************************************************************
void
avtSlaveSLAlgorithm::HandleLatencyTimer(int activeSLCnt, bool checkMaxLatency)
{
if (latencyTimer == -1 && activeSLCnt == 0)
{
latencyTimer = visitTimer->StartTimer();
debug1<<"++++++++++++++++++++++++++++++++++++++++++Begin latency!\n";
}
else if (latencyTimer != -1 && activeSLCnt > 0)
{
double t = visitTimer->StopTimer(latencyTimer, "Latency");
debug1<<"------------------------------------------End latency: time= "<<t<<endl;
LatencyTime.value += t;
if (checkMaxLatency && t > MaxLatencyTime.value)
MaxLatencyTime.value = t;
latencyTimer = -1;
}
}
// ****************************************************************************
// Method: avtSlaveSLAlgorithm::Initialize
//
......@@ -2059,6 +2118,10 @@ avtSlaveSLAlgorithm::SendStatus(bool forceSend)
// Dave Pugmire, Sat Mar 28 10:04:01 EDT 2009
// Resend status if getting no work from master. This is related to the tmp fix
// above in the master.
//
// Dave Pugmire, Wed Apr 1 11:21:05 EDT 2009
// Latency time was not always being reported accurately. Only send the latency
// saving status update once.
//
// ****************************************************************************
......@@ -2071,10 +2134,9 @@ avtSlaveSLAlgorithm::Execute()
//Send initial status.
SendStatus(true);
Barrier();
int slackerCount = 0;
while ( 1 )
HandleLatencyTimer(activeSLs.size());
while (1)
{
//Fill oobSLs list.
list<avtStreamlineWrapper *>::iterator si = activeSLs.begin();
......@@ -2088,32 +2150,31 @@ avtSlaveSLAlgorithm::Execute()
else
++si;
}
bool done = false, newMsgs = false, forceSend = false;
bool sentLatencySavingStatus = false;
if (latencyTimer != -1 && !activeSLs.empty())
{
double t = visitTimer->StopTimer(latencyTimer, "Latency");
debug1<<"End latency: time= "<<t<<endl;
LatencyTime.value += t;
if (t > MaxLatencyTime.value)
MaxLatencyTime.value = t;
latencyTimer = -1;
}
bool done = false, newMsgs = false;
slackerCount++;
HandleLatencyTimer(activeSLs.size());
while (!activeSLs.empty() && !done)
{
slackerCount = 0;
avtStreamlineWrapper *s = activeSLs.front();
activeSLs.pop_front();
if (activeSLs.size() <= 1)
forceSend = false;
if (activeSLs.size() <= LATENCY_SEND_CNT)
{
debug1<<"Latency saving sendStatus"<<endl;
LatencySavingCnt.value++;
SendStatus();
if( !sentLatencySavingStatus)
{
SendStatus(true);
LatencySavingCnt.value++;
debug1<<"Latency saving sendStatus"<<endl;
}
forceSend = true;
sentLatencySavingStatus = true;
}
else
sentLatencySavingStatus = false;
activeSLs.pop_front();
debug1<<"Integrate "<<s->domain<<".....";
IntegrateStreamline(s);
if (s->status == avtStreamlineWrapper::TERMINATE)
......@@ -2149,32 +2210,17 @@ avtSlaveSLAlgorithm::Execute()
if (done)
break;
SendStatus();
SendStatus(forceSend);
//Nothing to do, take a snooze....
if (!workToDo)
{
if (slackerCount > 100)
{
SendStatus(true);
slackerCount = 0;
}
if (latencyTimer == -1)
{
latencyTimer = visitTimer->StartTimer();
debug1<<"Begin latency!\n";
}
Sleep();
}
}
if (latencyTimer != -1)
{
double t = visitTimer->StopTimer(latencyTimer, "Latency");
debug1<<"End latency: time= "<<t<<endl;
LatencyTime.value += t;
latencyTimer = -1;
}
HandleLatencyTimer(0,false);
CheckPendingSendRequests();
debug1<<"Slave done: activeSLs= "<<activeSLs.size()<<" oobSLs= "<<oobSLs.size()<<endl;
if (!activeSLs.empty())
......@@ -2519,7 +2565,10 @@ SlaveInfo::Update( vector<int> &status, bool debug )
//
// Dave Pugmire, Wed Mar 25 10:04:29 EDT 2009
// Control print information for large domain problems.
//
//
// Dave Pugmire, Wed Apr 1 11:21:05 EDT 2009
// Print slave status based on LATENCY_SEND_CNT.
//
// ****************************************************************************
void
......@@ -2547,7 +2596,7 @@ SlaveInfo::Debug()
if (justUpdated)
{
debug1<<" ***";
if (slLoadedCount == 0)
if (slLoadedCount <= LATENCY_SEND_CNT)
debug1<<" SLACKER: "<<rank;
else
debug1<<" UPDATE: "<<rank;
......
......@@ -93,15 +93,15 @@ class avtMasterSlaveSLAlgorithm : public avtParSLAlgorithm
int DomToIdx(const DomainType &dom) const
{
int n = dom.domain/numTimeSteps + dom.timeStep;
debug5<<"numTS= "<<numTimeSteps<<endl;
debug5<<"dom "<<dom<<" ==> "<<n<<endl;
//debug5<<"numTS= "<<numTimeSteps<<endl;
//debug5<<"dom "<<dom<<" ==> "<<n<<endl;
return n;
}
DomainType IdxToDom(const int &num) const
{
DomainType d(num/numTimeSteps + num%numTimeSteps);
debug5<<"idx "<<num<<" ==> "<<d<<endl;
//debug5<<"idx "<<num<<" ==> "<<d<<endl;
return d;
}
......@@ -218,7 +218,10 @@ class avtMasterSLAlgorithm : public avtMasterSlaveSLAlgorithm
// Modifications:
//
// Dave Pugmire, Mon Feb 23 13:38:49 EST 2009
// Add timeout counter for slaves.
// Add timeout counter for slaves.
//
// Dave Pugmire, Wed Apr 1 11:21:05 EDT 2009
// Add HandleLatencyTimer method.
//
// ****************************************************************************
......@@ -242,6 +245,8 @@ class avtSlaveSLAlgorithm : public avtMasterSlaveSLAlgorithm
std::list<avtStreamlineWrapper *> activeSLs, oobSLs;
void ProcessMessages(bool &done, bool &newMsg);
void HandleLatencyTimer(int activeSLCnt,
bool checkMaxLatency=true);
};
......
......@@ -61,6 +61,9 @@ using namespace std;
// Dave Pugmire, Fri Feb 6 08:43:00 EST 2009
// Change numTerminated to numSLChange.
//
// Dave Pugmire, Wed Apr 1 11:21:05 EDT 2009
// Message size and number of receives moved to Initialize().
//
// ****************************************************************************
avtParDomSLAlgorithm::avtParDomSLAlgorithm(avtStreamlineFilter *slFilter,
......@@ -69,7 +72,6 @@ avtParDomSLAlgorithm::avtParDomSLAlgorithm(avtStreamlineFilter *slFilter,
{
numSLChange = 0;
totalNumStreamlines = 0;
statusMsgSz = 1;
maxCnt = maxCount;
}
......@@ -105,13 +107,20 @@ avtParDomSLAlgorithm::~avtParDomSLAlgorithm()
// Dave Pugmire, Tue Mar 10 12:41:11 EDT 2009
// Generalized domain to include domain/time. Pathine cleanup.
//
// Dave Pugmire, Wed Apr 1 11:21:05 EDT 2009
// Message size and number of receives moved to Initialize().
//
// ****************************************************************************
void
avtParDomSLAlgorithm::Initialize(vector<avtStreamlineWrapper *> &seedPts)
{
avtParSLAlgorithm::Initialize(seedPts);
int numRecvs = nProcs-1;
if (numRecvs > 64)
numRecvs = 64;
avtParSLAlgorithm::Initialize(seedPts, 1, numRecvs);
totalNumStreamlines = seedPts.size();
numSLChange = 0;
......@@ -188,32 +197,6 @@ avtParDomSLAlgorithm::ExchangeTermination()
}
}
// ****************************************************************************
// Method: avtParDomSLAlgorithm::ExchangeSLs
//
// Purpose:
// Send/recv streamlines.
//
// Programmer: Dave Pugmire
// Creation: January 27, 2009
//
// Modifications:
//
// Dave Pugmire, Fri Feb 6 08:43:00 EST 2009
// Change numTerminated to numSLChange.
//
// ****************************************************************************
void
avtParDomSLAlgorithm::ExchangeSLs(
vector<vector<avtStreamlineWrapper *> > &distributeSLs)
{
int earlyTerminations = 0;
avtParSLAlgorithm::ExchangeSLs(activeSLs, distributeSLs, earlyTerminations);
numSLChange -= earlyTerminations;
}
// ****************************************************************************
// Method: avtParDomSLAlgorithm::Execute
//
......@@ -233,6 +216,9 @@ avtParDomSLAlgorithm::ExchangeSLs(
// Dave Pugmire, Fri Feb 6 08:43:00 EST 2009
// Change numTerminated to numSLChange.
//
// Dave Pugmire, Wed Apr 1 11:21:05 EDT 2009
// Remove ExchangeSLs() method.
//
// ****************************************************************************
void
......@@ -241,8 +227,6 @@ avtParDomSLAlgorithm::Execute()
debug1<<"avtParDomSLAlgorithm::Execute()\n";
int timer = visitTimer->StartTimer();
vector< vector< avtStreamlineWrapper *> > distributeStreamlines(nProcs);
while (totalNumStreamlines > 0)
{
//Integrate upto maxCnt streamlines.
......@@ -260,12 +244,16 @@ avtParDomSLAlgorithm::Execute()
numSLChange--;
}
else
HandleOOBSL(s, distributeStreamlines);
HandleOOBSL(s);
cnt++;
}
ExchangeSLs(distributeStreamlines);
//Check for new SLs.
int earlyTerminations = 0;
RecvSLs(activeSLs, earlyTerminations);
numSLChange -= earlyTerminations;
ExchangeTermination();
CheckPendingSendRequests();
}
......@@ -290,11 +278,13 @@ avtParDomSLAlgorithm::Execute()
// Change numTerminated to numSLChange. Account for SLs 'created' when
// passing one SL multiple times.
//
// Dave Pugmire, Wed Apr 1 11:21:05 EDT 2009
// Send SLs using SendSLs.
//
// ****************************************************************************
void
avtParDomSLAlgorithm::HandleOOBSL(avtStreamlineWrapper *s,
std::vector< std::vector< avtStreamlineWrapper *> > &d)
avtParDomSLAlgorithm::HandleOOBSL(avtStreamlineWrapper *s)
{
MemStream buff;
bool deleteSL = true;
......@@ -324,7 +314,10 @@ avtParDomSLAlgorithm::HandleOOBSL(avtStreamlineWrapper *s,
buff.rewind();
newS->Serialize(MemStream::READ, buff, GetSolver());
newS->domain = s->seedPtDomainList[i];
d[domRank].push_back(newS);
vector<avtStreamlineWrapper *> sls;
sls.push_back(newS);
SendSLs(domRank, sls);
}
}
......
......@@ -65,6 +65,9 @@
// Dave Pugmire, Fri Feb 6 08:43:00 EST 2009
// Change numTerminated to numSLChange.
//
// Dave Pugmire, Wed Apr 1 11:21:05 EDT 2009
// Remove ExchangeSLs.
//
// ****************************************************************************
class avtParDomSLAlgorithm : public avtParSLAlgorithm
......@@ -80,10 +83,8 @@ class avtParDomSLAlgorithm : public avtParSLAlgorithm
protected:
void ExchangeTermination();
void HandleOOBSL(avtStreamlineWrapper *s,
std::vector< std::vector< avtStreamlineWrapper *> >&);
void ExchangeSLs(
std::vector<std::vector<avtStreamlineWrapper *> >&);
void HandleOOBSL(avtStreamlineWrapper *s);
int numSLChange, totalNumStreamlines;
std::list<avtStreamlineWrapper *> activeSLs;
......
......@@ -61,6 +61,9 @@ int avtParSLAlgorithm::STREAMLINE_TAG = 420001;
//
// Dave Pugmire, Mon Mar 23 12:48:12 EDT 2009
// Change how timings are reported/calculated.
//
// Dave Pugmire, Wed Apr 1 11:21:05 EDT 2009
// Limit the number of async recvs outstanding.
//
// ****************************************************************************
......@@ -70,7 +73,9 @@ avtParSLAlgorithm::avtParSLAlgorithm(avtStreamlineFilter *slFilter)
{
nProcs = PAR_Size();
rank = PAR_Rank();
statusMsgSz = 1;
msgID = 0;
statusMsgSz = -1;
numAsyncRecvs = -1;
slMsgSz = 10*1024*1024;
}
......@@ -98,11 +103,25 @@ avtParSLAlgorithm::~avtParSLAlgorithm()
// Programmer: Dave Pugmire
// Creation: January 27, 2009
//
// Modifications:
//
// Dave Pugmire, Wed Apr 1 11:21:05 EDT 2009
// Limit the number of async recvs outstanding.
//
// ****************************************************************************
void
avtParSLAlgorithm::Initialize(vector<avtStreamlineWrapper *> &seedPts)
avtParSLAlgorithm::Initialize(vector<avtStreamlineWrapper *> &seedPts,
int msgSz,
int numRecvs)
{
//Standardmsg + 1(sender rank) +1(msg ID).
statusMsgSz = msgSz+1+1;
numAsyncRecvs = numRecvs;
if (statusMsgSz <= 0 || numAsyncRecvs <= 0)
EXCEPTION0(ImproperUseException);
avtSLAlgorithm::Initialize(seedPts);
InitRequests();
}
......@@ -134,24 +153,23 @@ avtParSLAlgorithm::PostExecute()
// Programmer: Dave Pugmire
// Creation: June 16, 2008
//
// Dave Pugmire, Wed Apr 1 11:21:05 EDT 2009
// Limit the number of async recvs outstanding.
//
// ****************************************************************************
void
avtParSLAlgorithm::InitRequests()
{
debug5<<"avtParSLAlgorithm::InitRequests()\n";
statusRecvRequests.resize(nProcs, MPI_REQUEST_NULL);
slRecvRequests.resize(nProcs, MPI_REQUEST_NULL);
for (int i = 0; i < nProcs; i++)
debug5<<"avtParSLAlgorithm::InitRequests() sz= "<<numAsyncRecvs<<endl;
statusRecvRequests.resize(numAsyncRecvs, MPI_REQUEST_NULL);
slRecvRequests.resize(numAsyncRecvs, MPI_REQUEST_NULL);
for (int i = 0; i < statusRecvRequests.size(); i++)
{
if (i != rank)
{
PostRecvStatusReq(i);
PostRecvSLReq(i);
}
PostRecvStatusReq(i);
PostRecvSLReq(i);
}
}
// ****************************************************************************
......@@ -335,18 +353,22 @@ avtParSLAlgorithm::CheckPendingSendRequests()
// Programmer: Dave Pugmire
// Creation: June 16, 2008
//
// Dave Pugmire, Wed Apr 1 11:21:05 EDT 2009
// Limit the number of async recvs outstanding.
//
// ****************************************************************************
void
avtParSLAlgorithm::PostRecvStatusReq(int proc)
avtParSLAlgorithm::PostRecvStatusReq(int idx)
{
MPI_Request req;
int *buff = new int[statusMsgSz];
MPI_Irecv(buff, statusMsgSz, MPI_INT, proc,
MPI_Irecv(buff, statusMsgSz, MPI_INT, MPI_ANY_SOURCE,
avtParSLAlgorithm::STATUS_TAG,
VISIT_MPI_COMM, &req);
debug5 << "Post Statusrecv from " << proc<<" req= "<<req<<endl;
statusRecvRequests[proc] = req;
debug5 << "Post Statusrecv " <<idx<<" req= "<<req<<endl;
statusRecvRequests[idx] = req;
recvIntBufferMap[req] = buff;
}
......@@ -359,20 +381,25 @@ avtParSLAlgorithm::PostRecvStatusReq(int proc)
// Programmer: Dave Pugmire
// Creation: June 16, 2008
//
// Modifications:
//
// Dave Pugmire, Wed Apr 1 11:21:05 EDT 2009
// Limit the number of async recvs outstanding.
//
// ****************************************************************************