Commit 53450c13 authored by pugmire's avatar pugmire

New communication pattern.

git-svn-id: http://visit.ilight.com/svn/visit/trunk/src@13524 18c085ea-50e0-402c-830e-de6fd14e8384
parent 45e32b2c
......@@ -83,6 +83,7 @@ avtCommDSOnDemandICAlgorithm::avtCommDSOnDemandICAlgorithm(avtPICSFilter *picsFi
: avtParICAlgorithm(picsFilter), DSLatencyTime("dsLat")
{
domainCacheSizeLimit = cacheSize;
numDone = 0;
}
// ****************************************************************************
......@@ -209,7 +210,7 @@ avtCommDSOnDemandICAlgorithm::RunAlgorithm()
SortIntegralCurves(activeICs);
SortIntegralCurves(oobICs);
int numDone = 0;
numDone = 0;
int numParticlesTotal = activeICs.size() + oobICs.size();
int numParticlesResolved = 0;
......@@ -217,9 +218,9 @@ avtCommDSOnDemandICAlgorithm::RunAlgorithm()
CheckCacheVacancy(true);
Barrier();
Sleep(40);
HandleMessages(numDone);
HandleMessages();
Sleep(40);
HandleMessages(numDone);
HandleMessages();
while (numDone < nProcs)
{
......@@ -268,7 +269,7 @@ avtCommDSOnDemandICAlgorithm::RunAlgorithm()
}
CheckCacheVacancy(true);
HandleMessages(numDone);
HandleMessages();
}
debug1<<"All done. terminated sz= "<<terminatedICs.size()<<endl;
......@@ -353,13 +354,13 @@ avtCommDSOnDemandICAlgorithm::RequestDataset(DomainType &d)
// ****************************************************************************
void
avtCommDSOnDemandICAlgorithm::HandleMessages(int &numDone)
avtCommDSOnDemandICAlgorithm::HandleMessages()
{
vector<MsgCommData> msgs;
vector<DSCommData> ds;
bool blockAndWait = activeICs.empty() && (numDone < nProcs);
blockAndWait = false;
//blockAndWait = false;
RecvAny(&msgs, NULL, &ds, blockAndWait);
......@@ -768,4 +769,51 @@ avtCommDSOnDemandICAlgorithm::CheckCacheVacancy(bool makeReq)
}
}
// ****************************************************************************
// Method: avtCommDSOnDemandICAlgorithm::PostStepCallback
//
// Purpose: Do any processing needed after integration step has taken place.
//
//
// Programmer: Dave Pugmire
// Creation: January 14, 2011
//
// ****************************************************************************
bool
avtCommDSOnDemandICAlgorithm::PostStepCallback()
{
vector<MsgCommData> msgs;
RecvAny(&msgs, NULL, NULL, false);
for (int i = 0; i < msgs.size(); i++)
{
int sendRank = msgs[i].rank;
vector<int> &m = msgs[i].message;
int msgType = m[0];
if (msgType == DONE)
numDone++;
else if (msgType == DATASET_REQUEST)
{
int dom = m[1];
vtkDataSet *d = GetDomain(dom);
if (d)
{
vector<vtkDataSet *> ds;
vector<DomainType> doms;
doms.push_back(DomainType(dom, 0));
ds.push_back(d);
SendDS(sendRank, ds, doms);
}
else
{
//Why am I being asked for a domain I don't own?
EXCEPTION0(ImproperUseException);
}
}
}
}
#endif
......@@ -71,6 +71,9 @@ class domainCacheEntry
// Dave Pugmire, Fri Dec 17 12:15:04 EST 2010
// Override PostRunAlgorithm as a no-op.
//
// Dave Pugmire, Fri Jan 14 11:06:09 EST 2011
// Added PostStepCallback() method, numDone as member data.
//
// ****************************************************************************
class avtCommDSOnDemandICAlgorithm : public avtParICAlgorithm
......@@ -84,6 +87,7 @@ class avtCommDSOnDemandICAlgorithm : public avtParICAlgorithm
virtual void ResetIntegralCurvesForContinueExecute();
virtual bool CheckNextTimeStepNeeded(int curTimeSlice);
virtual void AddIntegralCurves(std::vector<avtIntegralCurve*> &ics);
virtual bool PostStepCallback();
protected:
vtkDataSet *GetDataset(const DomainType &dom);
......@@ -93,7 +97,7 @@ class avtCommDSOnDemandICAlgorithm : public avtParICAlgorithm
virtual void SortIntegralCurves(std::list<avtIntegralCurve *> &);
virtual void HandleOOBIC(avtIntegralCurve *s);
virtual bool RequestDataset(DomainType &d);
virtual void HandleMessages(int &numDone);
virtual void HandleMessages();
void AddRef(const DomainType &dom);
void DelRef(const DomainType &dom);
......@@ -109,6 +113,7 @@ class avtCommDSOnDemandICAlgorithm : public avtParICAlgorithm
virtual vtkDataSet *GetDSFromDomainCache(const DomainType &dom);
virtual void AddDSToDomainCache(std::vector<DSCommData> &ds);
int numDone;
std::list<domainCacheEntry> domainCache;
int domainCacheSizeLimit;
......
......@@ -105,6 +105,9 @@
// Dave Pugmire, Mon Dec 20 15:01:14 EST 2010
// Added Sleep() method.
//
// Dave Pugmire, Fri Jan 14 11:06:09 EST 2011
// Added PostStepCallback() method.
//
// ****************************************************************************
class avtICAlgorithm
......@@ -122,6 +125,7 @@ class avtICAlgorithm
virtual void GetTerminatedICs(vector<avtIntegralCurve *> &v);
virtual void AddIntegralCurves(std::vector<avtIntegralCurve*> &ics) = 0;
virtual void DeleteIntegralCurves(std::vector<int> &icIDs);
virtual bool PostStepCallback() { return false; }
protected:
virtual void RunAlgorithm() = 0;
......
......@@ -109,6 +109,15 @@ Consider the leaveDomains ICs and the balancing at the same time.
#include <mpi.h>
#endif
avtPICSFilter *pcFilter = NULL;
bool PostStepCB(void)
{
if (pcFilter)
return pcFilter->PostStepCallback();
return false;
}
// ****************************************************************************
// Method: avtPICSFilter constructor
//
......@@ -1825,7 +1834,7 @@ void
avtPICSFilter::IntegrateDomain(avtIntegralCurve *ic,
vtkDataSet *ds,
double *extents,
int maxSteps )
int maxSteps)
{
int t0 = visitTimer->StartTimer();
if (DebugStream::Level4())
......@@ -1834,7 +1843,10 @@ avtPICSFilter::IntegrateDomain(avtIntegralCurve *ic,
if (ic->status == avtIntegralCurve::STATUS_OK)
{
avtIVPField* field = GetFieldForDomain(ic->domain, ds);
ic->Advance( field );
//pcFilter = this;
//ic->SetPostStepCallback(&PostStepCB);
ic->Advance(field);
delete field;
}
......@@ -2581,3 +2593,20 @@ avtPICSFilter::CacheLocators(void)
return true;
#endif
}
// ****************************************************************************
// Method: avtPICSFilter::PostStepCallback()
//
// Purpose: Callback after each integration step is taken.
//
//
// Programmer: Dave Pugmire
// Creation: January 14, 2011
//
// ****************************************************************************
bool
avtPICSFilter::PostStepCallback()
{
return (icAlgo ? icAlgo->PostStepCallback() : false);
}
......@@ -124,6 +124,10 @@ class avtICAlgorithm;
// Hank Childs, Sun Nov 28 12:20:12 PST 2010
// Add support for caching locators in the database.
//
// Dave Pugmire, Fri Jan 14 11:09:59 EST 2011
// Add new communication pattern: RestoreSequenceAssembleUniformly, and
// PostStepCallback()
//
// ****************************************************************************
class AVTFILTERS_API avtPICSFilter :
......@@ -133,7 +137,8 @@ class AVTFILTERS_API avtPICSFilter :
public:
enum CommunicationPattern
{
RestoreSequence = 0,
RestoreSequenceAssembleOnCurrentProcessor = 0,
RestoreSequenceAssembleUniformly,
ReturnToOriginatingProcessor,
LeaveOnCurrentProcessor,
UndefinedCommunicationPattern
......@@ -175,6 +180,7 @@ class AVTFILTERS_API avtPICSFilter :
virtual void UpdateDataObjectInfo(void);
void ConvertToCartesian(bool val) { convertToCartesian = val; };
bool PostStepCallback();
protected:
......
......@@ -237,14 +237,17 @@ avtParICAlgorithm::PostRecv(int tag)
}
void
avtParICAlgorithm::PostRecv(int tag, int sz)
avtParICAlgorithm::PostRecv(int tag, int sz, int src)
{
sz += sizeof(avtParICAlgorithm::Header);
unsigned char *buff = new unsigned char[sz];
memset(buff, 0, sz);
MPI_Request req;
MPI_Irecv(buff, sz, MPI_BYTE, MPI_ANY_SOURCE, tag, VISIT_MPI_COMM, &req);
if (src == -1)
MPI_Irecv(buff, sz, MPI_BYTE, MPI_ANY_SOURCE, tag, VISIT_MPI_COMM, &req);
else
MPI_Irecv(buff, sz, MPI_BYTE, src, tag, VISIT_MPI_COMM, &req);
RequestTagPair entry(req, tag);
recvBuffers[entry] = buff;
......@@ -1014,6 +1017,10 @@ avtParICAlgorithm::RecvDS(vector<DSCommData> &ds)
// Hank Childs, Tue Jun 8 09:30:45 CDT 2010
// Add infrastructure to support new communication patterns.
//
// Dave Pugmire, Fri Jan 14 11:07:41 EST 2011
// Added a new communication pattern, RestoreSequenceAssembleUniformly and
// renamed RestoreIntegralCurveSequence to RestoreIntegralCurveSequenceAssembleOnCurrentProcessor
//
// ****************************************************************************
void
......@@ -1030,8 +1037,10 @@ avtParICAlgorithm::PostRunAlgorithm()
avtPICSFilter::CommunicationPattern pattern =
picsFilter->GetCommunicationPattern();
if (pattern == avtPICSFilter::RestoreSequence)
RestoreIntegralCurveSequence();
if (pattern == avtPICSFilter::RestoreSequenceAssembleOnCurrentProcessor)
RestoreIntegralCurveSequenceAssembleOnCurrentProcessor();
else if (pattern == avtPICSFilter::RestoreSequenceAssembleUniformly)
RestoreIntegralCurveSequenceAssembleUniformly();
else if (pattern == avtPICSFilter::LeaveOnCurrentProcessor)
;
else if (pattern == avtPICSFilter::ReturnToOriginatingProcessor)
......@@ -1061,7 +1070,7 @@ CountIDs(list<avtIntegralCurve *> &l, int id)
}
// ****************************************************************************
// Method: avtParICAlgorithm::RestoreIntegralCurveSequence
// Method: avtParICAlgorithm::RestoreIntegralCurveSequenceAssembleOnCurrentProcessor
//
// Purpose:
// Communicate streamlines pieces to destinations.
......@@ -1084,10 +1093,13 @@ CountIDs(list<avtIntegralCurve *> &l, int id)
// Dave Pugmire, Mon Nov 29 09:23:01 EST 2010
// Cleanup only the STREAMLINE_TAG requests.
//
// Dave Pugmire, Fri Jan 14 11:07:41 EST 2011
// Renamed RestoreIntegralCurveSequence to RestoreIntegralCurveSequenceAssembleOnCurrentProcessor
//
// ****************************************************************************
void
avtParICAlgorithm::RestoreIntegralCurveSequence()
avtParICAlgorithm::RestoreIntegralCurveSequenceAssembleOnCurrentProcessor()
{
debug5<<"RestoreIntegralCurveSequence: communicatedICs: "
<<communicatedICs.size()
......@@ -1235,6 +1247,7 @@ avtParICAlgorithm::RestoreIntegralCurveSequence()
//Advance to next N streamlines.
maxId += N;
minId += N;
CheckPendingSendRequests();
}
//All ICs are distributed, merge the sequences into single streamlines.
......@@ -1244,6 +1257,193 @@ avtParICAlgorithm::RestoreIntegralCurveSequence()
delete [] myIDs;
}
// ****************************************************************************
// Method: avtParICAlgorithm::RestoreIntegralCurveSequenceAssembleUniformly
//
// Purpose: Communicate streamlines pieces to destinations.
// When a streamline is communicated, only the state information is sent.
// All the integration steps need to resassmbled. This method assigns curves
// curves uniformly across all procs, and assembles the pieces.
//
// Programmer: Dave Pugmire
// Creation: January 14, 2011
//
// ****************************************************************************
void
avtParICAlgorithm::RestoreIntegralCurveSequenceAssembleUniformly()
{
debug5<<"RestoreIntegralCurveSequenceAssembleUniformly: communicatedICs: "
<<communicatedICs.size()
<<" terminatedICs: "<<terminatedICs.size()<<endl;
//Create larger streamline buffers.
CleanupRequests(avtParICAlgorithm::STREAMLINE_TAG);
messageTagInfo[avtParICAlgorithm::STREAMLINE_TAG] = pair<int,int>(numSLRecvs, 512*1024);
for (int i = 0; i < numSLRecvs; i++)
PostRecv(avtParICAlgorithm::STREAMLINE_TAG);
//Stuff all ICs into one list, and sort.
std::list<avtIntegralCurve *> allICs;
allICs.insert(allICs.end(), terminatedICs.begin(), terminatedICs.end());
allICs.insert(allICs.end(), communicatedICs.begin(), communicatedICs.end());
allICs.sort(avtStateRecorderIntegralCurve::IdSeqCompare);
terminatedICs.clear();
communicatedICs.clear();
//Communicate to everyone where the pieces are located.
//Do this "N" streamlines at a time, so we don't have a super big buffer.
int N;
if (numSeedPoints > 500)
N = 500;
else
N = numSeedPoints;
long *idBuffer = new long[N], *myIDs = new long[N];
int minId = 0;
int maxId = N-1;
int nLoops = 0;
if( N > 0 )
{
nLoops = numSeedPoints/N;
if (numSeedPoints % N != 0)
nLoops++;
}
for (int l = 0; l < nLoops; l++)
{
//Initialize arrays for this round.
for (int i = 0; i < N; i++)
{
idBuffer[i] = 0;
myIDs[i] = 0;
}
//Count ICs by id (could have multiple IDs).
list<avtIntegralCurve*>::iterator it = allICs.begin();
while (it != allICs.end() && (*it)->id <= maxId)
{
if ((*it)->id >= minId)
{
int idx = (*it)->id % N;
myIDs[idx] ++;
//debug1<<"I have id= "<<(*it)->id<<" : "<<(((avtStateRecorderIntegralCurve *)*it))->sequenceCnt<<" idx= "<<idx<<endl;
}
it++;
}
/*
debug1<<"myIDs: [";
for(int i=0; i<N;i++)
debug1<<myIDs[i]<<" ";
debug1<<"]"<<endl;
*/
//Exchange ID owners and sequence counts.
MPI_Allreduce(myIDs, idBuffer, N, MPI_LONG, MPI_SUM, VISIT_MPI_COMM);
/*
debug1<<"idBuffer: [";
for(int i=0; i<N;i++)
debug1<<idBuffer[i]<<" ";
debug1<<"]"<<endl;
*/
//Now we know where all ICs belong and how many sequences for each.
//Send communicatedICs to the owners.
map<int, vector<avtIntegralCurve *> > sendICs;
int numSeqAlreadyHere = 0;
while (!allICs.empty())
{
avtIntegralCurve *s = allICs.front();
if (s->id > maxId)
break;
allICs.pop_front();
int owner = s->id % nProcs;
//IC is mine.
if (owner == rank)
{
terminatedICs.push_back(s);
numSeqAlreadyHere++;
}
else
{
((avtStateRecorderIntegralCurve *)s)->serializeFlags = avtIntegralCurve::SERIALIZE_STEPS;
map<int, vector<avtIntegralCurve *> >::iterator it;
it = sendICs.find(owner);
if (it == sendICs.end())
{
vector<avtIntegralCurve*> v(1);
v[0] = s;
sendICs[owner] = v;
}
else
{
it->second.push_back(s);
}
}
}
//Send all the ICs.
map<int, vector<avtIntegralCurve *> >::iterator s_it;
for (s_it = sendICs.begin(); s_it != sendICs.end(); s_it++)
{
if (s_it->second.size() > 0)
{
//debug1<<"SendIC : "<<s_it->second[0]->id<<" to "<<s_it->first<<" num= "<<s_it->second.size()<<endl;
DoSendICs(s_it->first, s_it->second);
for (int i = 0; i < s_it->second.size(); i++)
delete s_it->second[i];
}
}
sendICs.clear();
//Wait for all the sequences to arrive. The total number is known for
//each IC, so wait until they all come.
int numICsToBeRecvd = 0;
for (int i = minId; i <= maxId; i++)
{
if (i % nProcs == rank)
numICsToBeRecvd += idBuffer[i%N];
}
numICsToBeRecvd -= numSeqAlreadyHere;
while (numICsToBeRecvd > 0)
{
list<ICCommData> ICs;
RecvAny(NULL, &ICs, NULL, true);
list<ICCommData>::iterator it;
for (it = ICs.begin(); it != ICs.end(); it++)
{
//int seq = ((avtStateRecorderIntegralCurve *)(*it).ic)->sequenceCnt;
//debug1<<"Recvd id= "<<(*it).ic->id<<" : "<<seq<<endl;
terminatedICs.push_back((*it).ic);
numICsToBeRecvd--;
}
CheckPendingSendRequests();
}
//Advance to next N curves.
maxId += N;
minId += N;
CheckPendingSendRequests();
}
//All ICs are distributed, merge the sequences into single curves.
MergeTerminatedICSequences();
delete [] idBuffer;
delete [] myIDs;
}
// ****************************************************************************
// Method: avtParICAlgorithm::MergeTerminatedICSequences
//
......
......@@ -111,6 +111,10 @@ class DSCommData;
// Dave Pugmire, Wed Jan 5 07:57:21 EST 2011
// New datastructures for msg/ic/ds.
//
// Dave Pugmire, Fri Jan 14 11:07:41 EST 2011
// Added a new communication pattern, RestoreSequenceAssembleUniformly and
// renamed RestoreIntegralCurveSequence to RestoreIntegralCurveSequenceAssembleOnCurrentProcessor
//
// ****************************************************************************
class avtParICAlgorithm : public avtICAlgorithm
......@@ -154,7 +158,8 @@ class avtParICAlgorithm : public avtICAlgorithm
std::vector<DSCommData> *ds,
bool blockAndWait);
void RestoreIntegralCurveSequence();
void RestoreIntegralCurveSequenceAssembleOnCurrentProcessor();
void RestoreIntegralCurveSequenceAssembleUniformly();
void MergeTerminatedICSequences();
......@@ -170,7 +175,7 @@ class avtParICAlgorithm : public avtICAlgorithm
private:
void PostRecv(int tag);
void PostRecv(int tag, int sz);
void PostRecv(int tag, int sz, int src=-1);
void SendData(int dst, int tag, MemStream *buff);
bool RecvData(std::set<int> &tags,
std::vector<std::pair<int,MemStream *> > &buffers,
......
......@@ -220,6 +220,9 @@ class vtkAppendPolyData;
// Hank Childs, Sun Dec 5 10:43:57 PST 2010
// Add new data members for warnings.
//
// Dave Pugmire, Fri Jan 14 11:10:44 EST 2011
// Set default communication pattern to RestoreSequenceAssembleUniformly.
//
// ****************************************************************************
class AVTFILTERS_API avtStreamlineFilter : virtual public avtPICSFilter
......@@ -327,7 +330,7 @@ class AVTFILTERS_API avtStreamlineFilter : virtual public avtPICSFilter
virtual std::vector<avtVector> GetInitialLocations(void);
virtual CommunicationPattern GetCommunicationPattern(void)
{ return RestoreSequence; };
{ return RestoreSequenceAssembleUniformly; };
};
......
......@@ -260,7 +260,7 @@ avtStreamlinePolyDataFilter::CreateIntegralCurveOutput(vector<avtIntegralCurve *
float theta = 0.0, prevT = 0.0;
avtVector lastPos;
cerr << phiFactor << " " << (phiFactor == 0.0) << endl;
//cerr << phiFactor << " " << (phiFactor == 0.0) << endl;
for (int j = 0; j < numSamps; j++)
{
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment