Commit 0e9a0bba authored by pugmire's avatar pugmire
Browse files

Add communicate to originating processor. Bug fix for streamlines going both...

Add communicate to originating processor. Bug fix for streamlines going both directions and tube with variable radius

git-svn-id: http://visit.ilight.com/svn/visit/trunk/src@16064 18c085ea-50e0-402c-830e-de6fd14e8384
parent 8269d19f
......@@ -2448,6 +2448,9 @@ avtPICSFilter::AddSeedPoints(std::vector<avtVector> &pts,
// Don't do initialization of distance for integral curves. That happens in
// the derived type of integral curve now.
//
// Dave Pugmire, Thu Sep 1 07:44:48 EDT 2011
// Keep track of fwd/bwd pair IDs, as they might need to be unified later.
//
// ****************************************************************************
void
......@@ -2507,34 +2510,46 @@ avtPICSFilter::CreateIntegralCurvesFromSeeds(std::vector<avtVector> &pts,
else
seedPt = pts[i];
if (integrationDirection == VTK_INTEGRATE_FORWARD ||
integrationDirection == VTK_INTEGRATE_BOTH_DIRECTIONS)
if (integrationDirection == VTK_INTEGRATE_FORWARD)
{
avtIntegralCurve *ic =
CreateIntegralCurve(solver,
avtIntegralCurve::DIRECTION_FORWARD,
seedTime0, seedPt, seedVel,
GetNextCurveID());
avtIntegralCurve *ic = CreateIntegralCurve(solver,
avtIntegralCurve::DIRECTION_FORWARD,
seedTime0, seedPt, seedVel,
GetNextCurveID());
ic->domain = dom;
curves.push_back(ic);
seedPtIds.push_back(ic->id);
}
if (integrationDirection == VTK_INTEGRATE_BACKWARD ||
integrationDirection == VTK_INTEGRATE_BOTH_DIRECTIONS)
else if (integrationDirection == VTK_INTEGRATE_BACKWARD)
{
avtIntegralCurve *ic =
CreateIntegralCurve(solver,
avtIntegralCurve::DIRECTION_BACKWARD,
seedTime0, seedPt, seedVel,
GetNextCurveID());
avtIntegralCurve *ic = CreateIntegralCurve(solver,
avtIntegralCurve::DIRECTION_BACKWARD,
seedTime0, seedPt, seedVel,
GetNextCurveID());
ic->domain = dom;
curves.push_back(ic);
seedPtIds.push_back(ic->id);
}
else if (integrationDirection == VTK_INTEGRATE_BOTH_DIRECTIONS)
{
avtIntegralCurve *ic0 = CreateIntegralCurve(solver,
avtIntegralCurve::DIRECTION_FORWARD,
seedTime0, seedPt, seedVel,
GetNextCurveID());
ic0->domain = dom;
curves.push_back(ic0);
seedPtIds.push_back(ic0->id);
avtIntegralCurve *ic1 = CreateIntegralCurve(solver,
avtIntegralCurve::DIRECTION_BACKWARD,
seedTime0, seedPt, seedVel,
GetNextCurveID());
ic1->domain = dom;
curves.push_back(ic1);
seedPtIds.push_back(ic1->id);
fwdBwdICPairs.push_back(std::pair<int,int> (ic0->id, ic1->id));
}
}
ids.push_back(seedPtIds);
......
......@@ -176,7 +176,8 @@ class AVTFILTERS_API avtPICSFilter :
virtual std::vector<avtVector> GetInitialLocations() = 0;
virtual std::vector<avtVector> GetInitialVelocities() = 0;
virtual CommunicationPattern GetCommunicationPattern() = 0;
std::vector<std::pair<int,int> > GetFwdBwdICPairs() { return fwdBwdICPairs; }
// Methods to set the filter's attributes.
void SetFieldType(int val);
void SetFieldConstant(double val);
......@@ -238,6 +239,7 @@ class AVTFILTERS_API avtPICSFilter :
std::vector<int> domainToRank;
std::vector<vtkDataSet*>dataSets;
std::map<DomainType, avtCellLocator_p> domainToCellLocatorMap;
std::vector<std::pair<int,int> > fwdBwdICPairs;
std::vector<double> pointList;
......
......@@ -40,7 +40,7 @@
// avtParDomICAlgorithm.C //
// ************************************************************************* //
#include "avtParDomICAlgorithm.h"
#include <avtParDomICAlgorithm.h>
#include <TimingsManager.h>
using namespace std;
......@@ -174,13 +174,17 @@ avtParDomICAlgorithm::AddIntegralCurves(std::vector<avtIntegralCurve*> &ics)
for (int i = 0; i < ics.size(); i++)
{
avtIntegralCurve *s = ics[i];
if (OwnDomain(s->domain))
{
avtVector endPt;
s->CurrentLocation(endPt);
if (PointInDomain(endPt, s->domain))
{
activeICs.push_back(s);
s->originatingRank = rank;
}
else
delete s;
}
......
......@@ -1034,6 +1034,9 @@ avtParICAlgorithm::RecvDS(vector<DSCommData> &ds)
// Added a new communication pattern, RestoreSequenceAssembleUniformly and
// renamed RestoreIntegralCurveSequence to RestoreIntegralCurveSequenceAssembleOnCurrentProcessor
//
// Dave Pugmire, Thu Sep 1 07:44:48 EDT 2011
// Implement RestoreIntegralCurveToOriginatingProcessor.
//
// ****************************************************************************
void
......@@ -1057,10 +1060,7 @@ avtParICAlgorithm::PostRunAlgorithm()
else if (pattern == avtPICSFilter::LeaveOnCurrentProcessor)
;
else if (pattern == avtPICSFilter::ReturnToOriginatingProcessor)
{
EXCEPTION1(VisItException,
"This communication pattern has not been implemented.");
}
RestoreIntegralCurveToOriginatingProcessor();
else
{
EXCEPTION1(VisItException, "Undefined communication pattern");
......@@ -1288,6 +1288,45 @@ avtParICAlgorithm::RestoreIntegralCurveSequenceAssembleOnCurrentProcessor()
void
avtParICAlgorithm::RestoreIntegralCurveSequenceAssembleUniformly()
{
RestoreIntegralCurve(true);
}
// ****************************************************************************
// Method: avtParICAlgorithm::RestoreIntegralCurveToOriginatingProcessor
//
// Purpose: Communicate streamlines pieces to destinations.
// When a streamline is communicated, only the state information is sent.
// All the integration steps need to resassmbled. This method assigns curves
// curves uniformly across all procs, and assembles the pieces.
//
// Programmer: Dave Pugmire
// Creation: Mon Aug 29 15:59:30 EDT 2011
//
// ****************************************************************************
void
avtParICAlgorithm::RestoreIntegralCurveToOriginatingProcessor()
{
RestoreIntegralCurve(false);
}
// ****************************************************************************
// Method: avtParICAlgorithm::RestoreIntegralCurve
//
// Purpose:
//
//
// Arguments:
//
//
// Programmer: Dave Pugmire
// Creation: August 29, 2011
//
// ****************************************************************************
void
avtParICAlgorithm::RestoreIntegralCurve(bool uniformlyDistrubute)
{
if (DebugStream::Level5())
debug5<<"RestoreIntegralCurveSequenceAssembleUniformly: communicatedICs: "
......@@ -1319,7 +1358,8 @@ avtParICAlgorithm::RestoreIntegralCurveSequenceAssembleUniformly()
N = numSeedPoints;
long *idBuffer = new long[N], *myIDs = new long[N];
int *sendList = new int[nProcs], *mySendList = new int[nProcs];
int minId = 0;
int maxId = N-1;
int nLoops = 0;
......@@ -1340,34 +1380,38 @@ avtParICAlgorithm::RestoreIntegralCurveSequenceAssembleUniformly()
idBuffer[i] = 0;
myIDs[i] = 0;
}
for (int i = 0; i < nProcs; i++)
{
sendList[i] = 0;
mySendList[i] = 0;
}
//Count ICs by id (could have multiple IDs).
list<avtIntegralCurve*>::iterator it = allICs.begin();
while (it != allICs.end() && (*it)->id <= maxId)
{
if ((*it)->id >= minId)
avtIntegralCurve *ic = *it;
if (ic->id >= minId)
{
int idx = (*it)->id % N;
int idx = ic->id % N;
myIDs[idx] ++;
//debug1<<"I have id= "<<(*it)->id<<" : "<<(((avtStateRecorderIntegralCurve *)*it))->sequenceCnt<<" idx= "<<idx<<endl;
//debug1<<"I have id= "<<(*it)->id<<" : "<<(((avtStateRecorderIntegralCurve *)*it))->sequenceCnt<<" idx= "<<idx<<" originatingRank= "<<(*it)->originatingRank<<endl;
if (ic->originatingRank != rank)
mySendList[ic->originatingRank] ++;
}
it++;
}
/*
debug1<<"myIDs: [";
for(int i=0; i<N;i++)
debug1<<myIDs[i]<<" ";
debug1<<"mySendList= [";
for(int i=0;i<nProcs;i++) debug1<<mySendList[i]<<" ";
debug1<<"]"<<endl;
*/
//Exchange ID owners and sequence counts.
MPI_Allreduce(myIDs, idBuffer, N, MPI_LONG, MPI_SUM, VISIT_MPI_COMM);
/*
debug1<<"idBuffer: [";
for(int i=0; i<N;i++)
debug1<<idBuffer[i]<<" ";
debug1<<"]"<<endl;
*/
if (!uniformlyDistrubute)
MPI_Allreduce(mySendList, sendList, nProcs, MPI_INT, MPI_SUM, VISIT_MPI_COMM);
//debug1<<"sendList= [";
//for(int i=0;i<nProcs;i++) debug1<<sendList[i]<<" ";
//debug1<<"]"<<endl;
//Now we know where all ICs belong and how many sequences for each.
//Send communicatedICs to the owners.
......@@ -1381,7 +1425,13 @@ avtParICAlgorithm::RestoreIntegralCurveSequenceAssembleUniformly()
break;
allICs.pop_front();
int owner = s->id % nProcs;
int owner = -1;
if (uniformlyDistrubute)
owner = s->id % nProcs;
else
owner = s->originatingRank;
//IC is mine.
if (owner == rank)
{
......@@ -1425,13 +1475,21 @@ avtParICAlgorithm::RestoreIntegralCurveSequenceAssembleUniformly()
//Wait for all the sequences to arrive. The total number is known for
//each IC, so wait until they all come.
int numICsToBeRecvd = 0;
for (int i = minId; i <= maxId; i++)
if (uniformlyDistrubute)
{
if (i % nProcs == rank)
numICsToBeRecvd += idBuffer[i%N];
for (int i = minId; i <= maxId; i++)
{
if (i % nProcs == rank)
numICsToBeRecvd += idBuffer[i%N];
}
}
numICsToBeRecvd -= numSeqAlreadyHere;
else
{
numICsToBeRecvd += sendList[rank];
}
//debug1<<"numICsToBeRecvd= "<<numICsToBeRecvd<<endl;
//numICsToBeRecvd -= numSeqAlreadyHere;
debug1<<"numICsToBeRecvd= "<<numICsToBeRecvd<<endl;
while (numICsToBeRecvd > 0)
{
list<ICCommData> ICs;
......@@ -1439,7 +1497,7 @@ avtParICAlgorithm::RestoreIntegralCurveSequenceAssembleUniformly()
list<ICCommData>::iterator it;
for (it = ICs.begin(); it != ICs.end(); it++)
{
//int seq = ((avtStateRecorderIntegralCurve *)(*it).ic)->sequenceCnt;
int seq = ((avtStateRecorderIntegralCurve *)(*it).ic)->sequenceCnt;
//debug1<<"Recvd id= "<<(*it).ic->id<<" : "<<seq<<endl;
terminatedICs.push_back((*it).ic);
numICsToBeRecvd--;
......@@ -1459,6 +1517,8 @@ avtParICAlgorithm::RestoreIntegralCurveSequenceAssembleUniformly()
delete [] idBuffer;
delete [] myIDs;
delete [] sendList;
delete [] mySendList;
}
......
......@@ -158,11 +158,6 @@ class avtParICAlgorithm : public avtICAlgorithm
std::vector<DSCommData> *ds,
bool blockAndWait);
void RestoreIntegralCurveSequenceAssembleOnCurrentProcessor();
void RestoreIntegralCurveSequenceAssembleUniformly();
void MergeTerminatedICSequences();
virtual void CompileTimingStatistics();
virtual void CompileCounterStatistics();
virtual void CalculateExtraTime();
......@@ -192,6 +187,13 @@ class avtParICAlgorithm : public avtICAlgorithm
void ProcessReceivedBuffers(std::vector<unsigned char*> &incomingBuffers,
std::vector<std::pair<int, MemStream *> > &buffers);
void RestoreIntegralCurve(bool uniformlyDistrubute);
void RestoreIntegralCurveSequenceAssembleOnCurrentProcessor();
void RestoreIntegralCurveSequenceAssembleUniformly();
void RestoreIntegralCurveToOriginatingProcessor();
void MergeTerminatedICSequences();
// Send/Recv buffer management structures.
typedef std::pair<MPI_Request, int> RequestTagPair;
typedef std::pair<int, int> RankIdPair;
......
......@@ -254,6 +254,23 @@ avtStreamlineFilter::~avtStreamlineFilter()
{
}
// ****************************************************************************
// Method: avtStreamlineFilter::GetCommunicationPattern()
//
// Programmer: Dave Pugmire
// Creation: September 1, 2011
//
// ****************************************************************************
avtPICSFilter::CommunicationPattern
avtStreamlineFilter::GetCommunicationPattern()
{
if (! scaleTubeRadiusVariable.empty())
return avtPICSFilter::ReturnToOriginatingProcessor;
return avtPICSFilter::RestoreSequenceAssembleUniformly;
}
// ****************************************************************************
// Method: avtStreamlineFilter::GenerateAttributeFields() const
//
......
......@@ -347,8 +347,7 @@ class AVTFILTERS_API avtStreamlineFilter : virtual public avtPICSFilter
virtual std::vector<avtVector> GetInitialLocations(void);
virtual std::vector<avtVector> GetInitialVelocities(void);
virtual CommunicationPattern GetCommunicationPattern(void)
{ return RestoreSequenceAssembleUniformly; };
virtual CommunicationPattern GetCommunicationPattern(void);
};
#endif
......
......@@ -291,6 +291,9 @@ avtStreamlinePolyDataFilter::CreateIntegralCurveOutput(std::vector<avtIntegralCu
correlationDistAngTolToUse = cos(correlationDistanceAngTol *M_PI/180.0);
}
if ( !scaleTubeRadiusVariable.empty())
ProcessVaryTubeRadiusByScalar(ics);
vtkIdType pIdx = 0, idx = 0;
for (int i = 0; i < numICs; i++)
{
......@@ -303,7 +306,6 @@ avtStreamlinePolyDataFilter::CreateIntegralCurveOutput(std::vector<avtIntegralCu
line->GetPointIds()->SetNumberOfIds(numSamps);
float theta = 0.0, prevT = 0.0;
avtVector lastPos;
//cerr << phiScaling << " " << (phiScaling == 0.0) << endl;
......@@ -402,7 +404,6 @@ avtStreamlinePolyDataFilter::CreateIntegralCurveOutput(std::vector<avtIntegralCu
scaleTubeRad->InsertTuple1(pIdx, s.scalar2);
pIdx++;
lastPos = s.position;
}
lines->InsertNextCell(line);
......@@ -518,3 +519,68 @@ avtStreamlinePolyDataFilter::ComputeCorrelationDistance(int idx, avtStateRecord
return val;
}
static avtStateRecorderIntegralCurve * icFromID(int id, std::vector<avtIntegralCurve *> &ics)
{
for (int i = 0; i < ics.size(); i++)
{
if (ics[i]->id == id)
return dynamic_cast<avtStateRecorderIntegralCurve*>(ics[i]);
}
return NULL;
}
// ****************************************************************************
// Method: avtStreamlinePolyDataFilter::ProcessVaryTubeRadiusByScalar
//
// Purpose: Unify the radius scaling parameter for streamlines that go in both
// directions. Since both dir streamlines are split up, they will
// be treated separately, resulting in different scaling.
//
//
// Programmer: Dave Pugmire
// Creation: August 24, 2011
//
// ****************************************************************************
void
avtStreamlinePolyDataFilter::ProcessVaryTubeRadiusByScalar(std::vector<avtIntegralCurve *> &ics)
{
for (int i = 0; i < fwdBwdICPairs.size(); i++)
{
avtStateRecorderIntegralCurve *ic[2] = {icFromID(fwdBwdICPairs[i].first, ics),
icFromID(fwdBwdICPairs[i].second, ics)};
if (ic[0] == NULL || ic[1] == NULL)
{
EXCEPTION1(ImproperUseException, "Integral curve ID not found.");
}
//Get the min/max for each pair of ICs.
double range[2] = {std::numeric_limits<double>::max(), -std::numeric_limits<double>::max()};
for (int i = 0; i < 2; i++)
{
size_t n = ic[i]->GetNumberOfSamples();
for (int j = 0; j < n; j++)
{
avtStateRecorderIntegralCurve::Sample s = ic[i]->GetSample(j);
if (s.scalar2 < range[0])
range[0] = s.scalar2;
if (s.scalar2 > range[1])
range[1] = s.scalar2;
}
}
double dRange = range[1]-range[0];
//Scale them into the same range.
for (int i = 0; i < 2; i++)
{
size_t n = ic[i]->GetNumberOfSamples();
for (int j = 0; j < n; j++)
{
avtStateRecorderIntegralCurve::Sample s = ic[i]->GetSample(j);
s.scalar2 = (s.scalar2-range[0])/dRange;
}
}
}
}
......@@ -111,6 +111,8 @@ class AVTFILTERS_API avtStreamlinePolyDataFilter : public avtStreamlineFilter
void CreateIntegralCurveOutput(std::vector<avtIntegralCurve *> &streamlines);
float ComputeCorrelationDistance(int idx, avtStateRecorderIntegralCurve *ic,
double angTol, double minDist);
void ProcessVaryTubeRadiusByScalar(std::vector<avtIntegralCurve *> &ics);
int coordinateSystem;
bool phiScalingFlag;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment