Commit 9713dee3 authored by pugmire's avatar pugmire
Browse files

Modify the masterSlave streamline algorithm to allow master to master balancing.


git-svn-id: http://visit.ilight.com/svn/visit/trunk/src@6570 18c085ea-50e0-402c-830e-de6fd14e8384
parent 7c2d01cf
This diff is collapsed.
......@@ -63,6 +63,9 @@ class SlaveInfo;
// Dave Pugmire, Tue Mar 10 12:41:11 EDT 2009
// Generalized domain to include domain/time. Pathine cleanup.
//
// Dave Pugmire, Wed Mar 18 17:17:40 EDT 2009
// Allow masters to share work loads.
//
// ****************************************************************************
class avtMasterSlaveSLAlgorithm : public avtParSLAlgorithm
......@@ -102,8 +105,10 @@ class avtMasterSlaveSLAlgorithm : public avtParSLAlgorithm
int sleepMicroSec;
void Sleep();
static int MSG_STATUS, MSG_DONE, MSG_SEND_SL, MSG_LOAD_DOMAIN,
MSG_SEND_SL_HINT, MSG_FORCE_SEND_STATUS;
static int MSG_STATUS, MSG_DONE, MSG_SEND_SL,
MSG_LOAD_DOMAIN, MSG_SEND_SL_HINT,
MSG_FORCE_SEND_STATUS, MSG_MASTER_STATUS,
MSG_OFFLOAD_SL;
//Statistics and coutners.
int latencyTimer;
......@@ -133,7 +138,9 @@ class avtMasterSLAlgorithm : public avtMasterSlaveSLAlgorithm
avtMasterSLAlgorithm(avtStreamlineFilter *slFilter,
int maxCount,
int workGrpSz,
std::vector<int> &slaves);
std::vector<int> &slaves,
int master,
std::vector<int> &masters);
virtual ~avtMasterSLAlgorithm();
virtual const char* AlgoName() const {return "MasterSlave";}
......@@ -141,11 +148,25 @@ class avtMasterSLAlgorithm : public avtMasterSlaveSLAlgorithm
virtual void Execute();
protected:
virtual void ProcessMessages();
virtual void PostLoopProcessing();
std::vector<int> status, prevStatus;
virtual void UpdateStatus();
virtual void SendStatus(bool forceSend=false);
virtual void ProcessSlaveUpdate(std::vector<int> &);
virtual void ProcessMasterUpdate(std::vector<int> &);
virtual void ProcessOffloadSL(std::vector<int> &);
virtual void ProcessNewStreamlines();
virtual void ManageWorkgroup();
virtual void ManageSlaves();
virtual void ManageMasters();
virtual void CalculateStatistics();
int totalNumStreamlines, workGroupSz;
int workGroupActiveSLs, workGroupSz;
bool done, slaveUpdate, masterUpdate;
int case1Cnt, case2Cnt, case3Cnt, case4Cnt, case5Cnt;
std::vector<SlaveInfo> slaveInfo;
int master;
std::vector<SlaveInfo> slaveInfo, masterInfo;
std::vector<int> slDomCnts, domLoaded, slackers;
std::list<avtStreamlineWrapper *> activeSLs;
......@@ -155,7 +176,6 @@ class avtMasterSLAlgorithm : public avtMasterSlaveSLAlgorithm
void FindSlackers(int oobFactor=-1,
bool randomize= true,
bool checkJustUpdated=false);
bool UpdateStatus();
bool UpdateSlaveStatus(std::vector<int> &);
void PrintStatus();
......@@ -217,6 +237,11 @@ class avtSlaveSLAlgorithm : public avtMasterSlaveSLAlgorithm
// Programmer: Dave Pugmire
// Creation: Mon Jan 26 13:25:58 EST 2009
//
// Modifications:
//
// Dave Pugmire, Wed Mar 18 17:17:40 EDT 2009
// Allow masters to share work loads.
//
// ****************************************************************************
class SlaveInfo
{
......@@ -231,7 +256,7 @@ class SlaveInfo
void Reset() { justUpdated = false; }
void Debug();
bool justUpdated;
bool justUpdated, initialized;
int canGive, canAccept, slCount, slLoadedCount, slOOBCount, rank;
int domLoadedCount;
vector<int> domainCnt;
......@@ -239,6 +264,24 @@ class SlaveInfo
vector<int> domainHistory;
};
template<class T>
inline std::ostream& operator<<(std::ostream& out, const std::vector<T> &v);
template<class T>
inline std::ostream& operator<<(std::ostream& out, const std::vector<T> &v)
{
out<<"[";
for (int i = 0; i < v.size(); i++)
{
out<<v[i];
if (i != (v.size()-1))
out<<" ";
}
out << "]";
return out;
}
#endif
#endif
......@@ -207,6 +207,11 @@ avtParSLAlgorithm::CleanupAsynchronous()
// Programmer: Dave Pugmire
// Creation: June 16, 2008
//
// Modifications:
//
// Dave Pugmire, Wed Mar 18 17:07:07 EDT 2009
// Delete entry from map after send is complete.
//
// ****************************************************************************
void
avtParSLAlgorithm::CheckPendingSendRequests()
......@@ -249,8 +254,9 @@ avtParSLAlgorithm::CheckPendingSendRequests()
<< (void *)buff << endl;
if (buff)
delete [] buff;
debug5 << "Delete done!\n";
sendSLBufferMap[r] = NULL;
sendSLBufferMap.erase(r);
}
delete [] indices;
......@@ -293,7 +299,9 @@ avtParSLAlgorithm::CheckPendingSendRequests()
<< (void *)buff << endl;
if (buff)
delete [] buff;
sendIntBufferMap[r] = NULL;
sendIntBufferMap.erase(r);
}
delete [] indices;
......@@ -554,7 +562,7 @@ avtParSLAlgorithm::SendSLs(int dst,
int err = MPI_Isend(msg, sz, MPI_UNSIGNED_CHAR, dst,
avtParSLAlgorithm::STREAMLINE_TAG,
VISIT_MPI_COMM, &req);
debug5<<err<<" = MPI_Isend(msg, "<<sz<<", MPI_UNSIGNED_CHAR, "<<dst<<", req= "<<req<<endl;
debug5<<err<<" = MPI_Isend(msg, "<<sz<<", MPI_UNSIGNED_CHAR, to "<<dst<<", req= "<<req<<endl;
sendSLBufferMap[req] = msg;
BytesCnt.value += sz;
......@@ -570,16 +578,18 @@ avtParSLAlgorithm::SendSLs(int dst,
// Recv streamlines.
//
// Programmer: Dave Pugmire
// Creation: June 16, 2008
// Creation: Mon Mar 16 15:45:11 EDT 2009
//
// Modifications:
//
// Dave Pugmire, Wed Mar 18 17:17:40 EDT 2009
// RecvSLs broken into two methods.
//
// ****************************************************************************
int
avtParSLAlgorithm::RecvSLs(list<avtStreamlineWrapper *> &streamlines,
int &earlyTerminations )
avtParSLAlgorithm::RecvSLs(list<avtStreamlineWrapper *> &recvSLs)
{
int slCount = 0;
earlyTerminations = 0;
while (true)
{
......@@ -598,11 +608,11 @@ avtParSLAlgorithm::RecvSLs(list<avtStreamlineWrapper *> &streamlines,
for (int i = 0; i < num; i++)
{
int idx = indices[i];
MPI_Request req = slRecvRequests[idx];
if (req == MPI_REQUEST_NULL)
continue;
//Grab the bytes, unserialize them, add to list.
unsigned char *msg = recvSLBufferMap[req];
recvSLBufferMap.erase(req);
if (msg == NULL)
......@@ -615,33 +625,12 @@ avtParSLAlgorithm::RecvSLs(list<avtStreamlineWrapper *> &streamlines,
size_t numSLs;
buff.read(numSLs);
vector<avtStreamlineWrapper *> recvSLs;
for (int j = 0; j < numSLs; j++)
{
avtStreamlineWrapper *slSeg = new avtStreamlineWrapper;
slSeg->Serialize(MemStream::READ, buff, GetSolver());
recvSLs.push_back(slSeg);
}
// Make sure the streamline is one one of my domains.
for (int j = 0; j < recvSLs.size(); j++)
{
avtStreamlineWrapper *slSeg = recvSLs[j];
avtVector pt;
slSeg->GetEndPoint(pt);
if (PointInDomain(pt, slSeg->domain))
{
streamlines.push_back(slSeg);
slCount++;
}
else
{
// Point not in domain.
delete slSeg;
earlyTerminations++;
}
slCount++;
}
}
......@@ -659,6 +648,56 @@ avtParSLAlgorithm::RecvSLs(list<avtStreamlineWrapper *> &streamlines,
return slCount;
}
// ****************************************************************************
// Method: avtParSLAlgorithm::RecvSLs
//
// Purpose:
// Recv streamlines.
//
// Programmer: Dave Pugmire
// Creation: June 16, 2008
//
// Modifications:
//
// Dave Pugmire, Mon Mar 16 15:45:11 EDT 2009
// Call the other RecvSLs and then check for domain inclusion.
//
// Dave Pugmire, Tue Mar 17 12:02:10 EDT 2009
// Use new new RecvSLs method, then check for terminations.
//
// ****************************************************************************
int
avtParSLAlgorithm::RecvSLs(list<avtStreamlineWrapper *> &streamlines,
int &earlyTerminations )
{
list<avtStreamlineWrapper *> recvSLs;
RecvSLs(recvSLs);
earlyTerminations = 0;
int slCount = 0;
//Check to see if they in this domain.
list<avtStreamlineWrapper *>::iterator s;
for (s = recvSLs.begin(); s != recvSLs.end(); ++s)
{
avtVector pt;
(*s)->GetEndPoint(pt);
if (PointInDomain(pt, (*s)->domain))
{
streamlines.push_back(*s);
slCount++;
}
else
{
// Point not in domain.
delete *s;
earlyTerminations++;
}
}
return slCount;
}
// ****************************************************************************
// Method: avtParSLAlgorithm::ExchangeSLs
......
......@@ -60,6 +60,9 @@
// Dave Pugmire, Thu Feb 12 08:43:01 EST 2009
// Removed ComputeStatistics. (Moved to avtSLAlgorithm)
//
// Dave Pugmire, Tue Mar 17 12:02:10 EDT 2009
// Create a new RecvSLs method that doesn't check for domain inclusion.
//
// ****************************************************************************
class avtParSLAlgorithm : public avtSLAlgorithm
......@@ -72,21 +75,22 @@ class avtParSLAlgorithm : public avtSLAlgorithm
virtual void PostExecute();
protected:
void InitRequests();
void CheckPendingSendRequests();
void CleanupAsynchronous();
void PostRecvStatusReq( int proc );
void PostRecvSLReq( int proc );
void SendMsg(int dest, std::vector<int> &msg);
void SendAllMsg(std::vector<int> &msg);
void RecvMsgs(std::vector<std::vector<int> > &msgs);
void SendSLs(int dst,
std::vector<avtStreamlineWrapper*> &);
int RecvSLs(std::list<avtStreamlineWrapper*> &,
int &earlyTerminations);
bool ExchangeSLs( std::list<avtStreamlineWrapper *> &,
std::vector<std::vector<avtStreamlineWrapper *> >&,
int &earlyTerminations );
void InitRequests();
void CheckPendingSendRequests();
void CleanupAsynchronous();
void PostRecvStatusReq( int proc );
void PostRecvSLReq( int proc );
void SendMsg(int dest, std::vector<int> &msg);
void SendAllMsg(std::vector<int> &msg);
void RecvMsgs(std::vector<std::vector<int> > &msgs);
void SendSLs(int dst,
std::vector<avtStreamlineWrapper*> &);
int RecvSLs(std::list<avtStreamlineWrapper*> &);
int RecvSLs(std::list<avtStreamlineWrapper*> &,
int &earlyTerminations);
bool ExchangeSLs( std::list<avtStreamlineWrapper *> &,
std::vector<std::vector<avtStreamlineWrapper *> >&,
int &earlyTerminations );
int rank, nProcs;
std::map<MPI_Request, unsigned char*> sendSLBufferMap, recvSLBufferMap;
......
......@@ -476,7 +476,8 @@ avtSLAlgorithm::ReportStatistics(ostream &os)
os<<"Method= "<<AlgoName()<<" nCPUs= "<<nCPUs<<" nDom= "<<numDomains;
os<<" nPts= "<<numSeedPoints<<endl;
os<<" maxCount= "<<streamlineFilter->maxCount;
os<<" domCache= "<<streamlineFilter->cacheQLen<<endl;
os<<" domCache= "<<streamlineFilter->cacheQLen;
os<<" workGrp= "<<streamlineFilter->workGroupSz<<endl;
os<<endl;
ReportTimings(os, true);
......@@ -486,6 +487,7 @@ avtSLAlgorithm::ReportStatistics(ostream &os)
os<<endl<<"Per Proccess:"<<endl;
ReportTimings(os, false);
ReportCounters(os, false);
os<<endl;
}
// ****************************************************************************
......
......@@ -2133,10 +2133,10 @@ avtStreamlineFilter::GetSeedPoints(std::vector<avtStreamlineWrapper *> &pts)
continue;
}
debug1<<"Candidate pt: "<<i<<" "<<candidatePts[i];
debug1<<" id= "<<i<<" dom =[";
for (int j = 0; j < dl.size();j++)debug1<<dl[j]<<", ";
debug1<<"]\n";
debug5<<"Candidate pt: "<<i<<" "<<candidatePts[i];
debug5<<" id= "<<i<<" dom =[";
for (int j = 0; j < dl.size();j++)debug5<<dl[j]<<", ";
debug5<<"]\n";
// Add seed for each domain/pt. At this point, we don't know where
// the pt belongs....
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment