Commit 53bc6530 authored by pugmire's avatar pugmire

optimize the exchange of data.

git-svn-id: http://visit.ilight.com/svn/visit/trunk/src@21795 18c085ea-50e0-402c-830e-de6fd14e8384
parent 52d1dffe
......@@ -229,7 +229,7 @@ avtRPOTFilter::CalculateThresholds(int loc)
thresholds.resize(nArrs);
vtkRInterface *RI = vtkRInterface::New();
cout<<"CalculateThresholds("<<loc<<") nArrs= "<<nArrs<<endl;
//cout<<"CalculateThresholds("<<loc<<") nArrs= "<<nArrs<<endl;
for (int i = 0; i < nArrs; i++)
{
......@@ -490,27 +490,9 @@ avtRPOTFilter::Initialize()
//cout<<"values:"<<values.size()<<" x "<<values[0].size()<<" x "<<values[0][0].size()<<endl;
//cout<<"values["<<numTuples<<"]["<<numBins<<"][numTimes]"<<endl;
idx0 = 0;
idxN = numTuples;
#ifdef PARALLEL
int rank = PAR_Rank();
int nProcs = PAR_Size();
int nSamplesPerProc = (numTuples / nProcs);
int oneExtraUntil = (numTuples % nProcs);
if (rank < oneExtraUntil)
{
idx0 = (rank)*(nSamplesPerProc+1);
idxN = (rank+1)*(nSamplesPerProc+1);
}
else
{
idx0 = (rank)*(nSamplesPerProc) + oneExtraUntil;
idxN = (rank+1)*(nSamplesPerProc) + oneExtraUntil;
}
debug1<<"I have: ["<<idx0<<" "<<idxN<<"]"<<endl;
#endif
pair<int,int> locs = LocationsForRank(PAR_Rank());
idx0 = locs.first;
idxN = locs.second;
if (atts.GetDumpData())
{
......@@ -613,30 +595,14 @@ avtRPOTFilter::Execute()
}
}
debug1<<"Processing "<<timeNow<<" index= "<<index<<" loc0 sz= "<<values[0][0].size()<<endl;
visitTimer->StopTimer(t1, "RPOT::Execute");
}
// ****************************************************************************
// Method: avtRPOTFilter::CreateFinalOutput
//
// Programmer: Dave Pugmire
// Creation: February 7, 2012
//
// Modifications:
//
// Dave Pugmire, Wed Apr 11 17:07:26 EDT 2012
// Remove the processing of locations in rounds. No longer needed.
//
// ****************************************************************************
void
avtRPOTFilter::CreateFinalOutput()
avtRPOTFilter::ExchangeData0()
{
//avtCallback::ResetTimeout(0);
int t1 = visitTimer->StartTimer();
//Exchange data....
#ifdef PARALLEL
#if PARALLEL
float *tmp = new float[numTimes];
float *res = new float[numTimes];
int *flags = new int[numTimes];
......@@ -688,8 +654,218 @@ avtRPOTFilter::CreateFinalOutput()
delete [] flags;
delete [] flagsRes;
#endif
visitTimer->StopTimer(t1, "RPOT::FinalOutput exchange data 1");
}
void
avtRPOTFilter::ExchangeData()
{
#if PARALLEL
//Determine how many samples need to be communicated.
int nRanks = PAR_Size();
int rank = PAR_Rank();
int *localSenders = new int[nRanks];
int *localMaxCnt = new int[nRanks];
for (int i = 0; i < nRanks; i++)
localSenders[i] = localMaxCnt[i] = 0;
//Figure out the buffer sizes.
for (int i = 0; i < nRanks; i++)
{
if (rank == i)
continue;
pair<int,int> locs = LocationsForRank(i);
int cnt = 1; //For number of locs.
for (int j = locs.first; j < locs.second; j++)
{
cnt++; // location.
for (int b = 0; b < numBins; b++)
{
cnt++; //number in this bin.
cnt += 2*values[j][b].size(); // num of val/cycles.
}
}
if (cnt > 1)
{
localSenders[i] = 1;
localMaxCnt[i] = cnt;
}
}
// This tells each rank how many senders, and the max send buffer size.
int *globalSenders = new int[nRanks];
int *globalMaxCnt = new int[nRanks];
MPI_Allreduce(localSenders, globalSenders, nRanks, MPI_INT, MPI_SUM, VISIT_MPI_COMM);
MPI_Allreduce(localMaxCnt, globalMaxCnt, nRanks, MPI_INT, MPI_MAX, VISIT_MPI_COMM);
if (rank == 0)
for (int i = 0; i < nRanks; i++)
debug1<<i<<": "<<globalSenders[i]<<" maxSz= "<<globalMaxCnt[i]<<endl;
//Post recvs.
int tag = 10001;
int numToRecv = globalSenders[rank];
MPI_Request *rReqs = new MPI_Request[numToRecv];
float **rBuffs = new float*[numToRecv];
int buffSz = globalMaxCnt[rank];
for (int i = 0; i < numToRecv; i++)
{
rBuffs[i] = new float[buffSz];
int x = MPI_Irecv(rBuffs[i], buffSz, MPI_FLOAT, MPI_ANY_SOURCE, tag, VISIT_MPI_COMM, &rReqs[i]);
debug1<<rank<<": postRecv of sz= "<<buffSz<<" ret= "<<x<<endl;
}
//Post sends.
int numToSend = 0;
for (int i = 0; i < nRanks; i++)
if (localSenders[i] > 0)
numToSend++;
MPI_Request *sReqs = new MPI_Request[numToSend];
float **sBuffs = new float*[numToSend];
int sCnt = 0;
for (int r = 0; r < nRanks; r++)
{
if (r == rank || localSenders[r] == 0)
continue;
int buffSz = localMaxCnt[r];
sBuffs[sCnt] = new float[buffSz];
pair<int,int> locs = LocationsForRank(r);
int cnt = 0;
sBuffs[sCnt][cnt++] = (locs.second-locs.first);
debug1<<rank<<": Send nLocs= "<<sBuffs[sCnt][0];
for (int i = locs.first; i < locs.second; i++)
{
sBuffs[sCnt][cnt++] = i; //location.
int numVals = 0;
for (int b = 0; b < numBins; b++)
numVals += values[i][b].size();
sBuffs[sCnt][cnt++] = numVals;
debug1<<" ("<<i<<","<<numVals<<") ";
for (int b = 0; b < numBins; b++)
{
int nt = values[i][b].size();
for (int t = 0; t < nt; t++)
{
sBuffs[sCnt][cnt++] = values[i][b][t].Cycle;
sBuffs[sCnt][cnt++] = values[i][b][t].val;
}
}
}
debug1<<"CNT= "<<cnt<<endl;
int x = MPI_Isend(sBuffs[sCnt], cnt, MPI_FLOAT, r, tag, VISIT_MPI_COMM, &sReqs[sCnt]);
debug1<<"send "<<cnt<<" buffSz= "<<buffSz<<endl;
debug1<<rank<<" ==> "<<r<<" buffSz= "<<buffSz<<" ret= "<<x<<endl;
sCnt++;
}
MPI_Status *sendStats = new MPI_Status[numToSend];
MPI_Status *recvStats = new MPI_Status[numToRecv];
int ns = numToSend, nr = numToRecv;
debug1<<endl<<endl;
debug1<<"while: ns= "<<ns<<" nr= "<<nr<<endl;
while (ns > 0 || nr > 0)
{
if (nr > 0)
{
int *idx = new int[numToRecv];
MPI_Status *status = new MPI_Status[numToRecv];
int num;
MPI_Testsome(numToRecv, rReqs, &num, idx, status);
if (num > 0)
{
debug1<<"RECV COMPLETE num= "<<num<<endl;
nr -= num;
for (int i = 0; i < num; i++)
{
float *buff = rBuffs[idx[i]];
int nLocs = (int)buff[0];
debug1<<rank<<": Recv "<<num<<" nLocs= "<<nLocs<<" sz= "<<status[i]._count/4;
int cnt = 1;
for (int j = 0; j < nLocs; j++)
{
int loc = (int)buff[cnt++];
int nt = (int)buff[cnt++];
debug1<<" ("<<loc<<","<<nt<<") ";
for (int t = 0; t < nt; t++)
{
int cycle = buff[cnt++];
float val = buff[cnt++];
int index = GetIndexFromDay(cycle);
values[loc][index].push_back(sample(val, cycle));
//if (cnt < 200)
//debug1<<" "<<cycle<<" "<<val<<" idx= "<<index<<endl;
}
}
delete [] buff;
rBuffs[idx[i]] = NULL;
}
debug1<<endl;
}
delete [] idx;
delete [] status;
}
if (ns > 0)
{
int *idx = new int[numToSend];
MPI_Status *status = new MPI_Status[numToSend];
int num;
MPI_Testsome(numToSend, sReqs, &num, idx, status);
if (num > 0)
{
debug1<<"SEND COMPLETE num= "<<num<<endl;
ns -= num;
for (int i = 0; i < num; i++)
{
float *buff = sBuffs[idx[i]];
delete [] buff;
sBuffs[idx[i]] = NULL;
}
}
delete [] idx;
delete [] status;
}
}
delete [] sendStats;
delete [] recvStats;
delete [] sReqs;
delete [] sBuffs;
delete [] rReqs;
delete [] rBuffs;
delete [] localSenders;
delete [] localMaxCnt;
#endif
}
// ****************************************************************************
// Method: avtRPOTFilter::CreateFinalOutput
//
// Programmer: Dave Pugmire
// Creation: February 7, 2012
//
// Modifications:
//
// Dave Pugmire, Wed Apr 11 17:07:26 EDT 2012
// Remove the processing of locations in rounds. No longer needed.
//
// ****************************************************************************
void
avtRPOTFilter::CreateFinalOutput()
{
//avtCallback::ResetTimeout(0);
int t1 = visitTimer->StartTimer();
ExchangeData();
visitTimer->StopTimer(t1, "RPOT::FinalOutput exchange data 1");
int t2 = visitTimer->StartTimer();
vtkRInterface *RI = vtkRInterface::New();
......@@ -750,6 +926,41 @@ avtRPOTFilter::CreateFinalOutput()
int t3 = visitTimer->StartTimer();
#if PARALLEL
#if 1
int nVals = (idxN-idx0)*outputValsPerLoc;
if (PAR_Rank() == 0)
{
int sz = numTuples*outputValsPerLoc;
double *result = new double[sz];
int *recvCnts = new int[PAR_Size()];
int *displs = new int[PAR_Size()];
int disp = 0;
for (int r = 0; r < PAR_Size(); r++)
{
pair<int,int> locs = LocationsForRank(r);
recvCnts[r] = (locs.second-locs.first)*outputValsPerLoc;
displs[r] = disp;
disp += recvCnts[r];
}
MPI_Gatherv(outputData, nVals, MPI_DOUBLE, result, recvCnts, displs, MPI_DOUBLE, 0, VISIT_MPI_COMM);
delete [] recvCnts;
delete [] displs;
delete [] outputData;
outputData = result;
}
else
{
MPI_Gatherv(outputData, nVals, MPI_DOUBLE, NULL, NULL, NULL, MPI_DOUBLE, 0, VISIT_MPI_COMM);
}
#endif
#if 0
int sz = numTuples*outputValsPerLoc;
double *in = new double[sz], *sum = new double[sz];
......@@ -774,6 +985,7 @@ avtRPOTFilter::CreateFinalOutput()
outputData[i] = sum[i];
delete [] in;
delete [] sum;
#endif
#endif
if (PAR_Rank() == 0 && atts.GetDumpData())
......@@ -1517,7 +1729,7 @@ avtRPOTFilter::SetExceedenceData(int loc,
monthIndices->SetNumberOfTuples(numExceedences);
yearIndices->SetNumberOfComponents(1);
yearIndices->SetNumberOfTuples(numExceedences);
int idx = 0;
for (int b = 0; b < numBins; b++)
{
......@@ -1539,7 +1751,7 @@ avtRPOTFilter::SetExceedenceData(int loc,
else
monthIndices->SetValue(idx, -1);
yearIndices->SetValue(idx, GetYearFromDay(values[loc][b][t].Cycle)+atts.GetDataYearBegin());
idx++;
if (idx == numExceedences)
break;
......@@ -1771,3 +1983,51 @@ avtRPOTFilter::DebugData(int loc, std::string nm)
}
int
avtRPOTFilter::RankForLocation(int loc)
{
#ifdef PARALLEL
int rank = PAR_Rank();
int nProcs = PAR_Size();
int nSamplesPerProc = (numTuples / nProcs);
int oneExtraUntil = (numTuples % nProcs);
if (loc < (oneExtraUntil*(nSamplesPerProc+1)))
return loc/(nSamplesPerProc+1);
else
{
loc -= (oneExtraUntil*(nSamplesPerProc+1));
return loc/nSamplesPerProc + oneExtraUntil;
}
#else
return 0;
#endif
}
std::pair<int,int>
avtRPOTFilter::LocationsForRank(int rank)
{
int i0 = 0;
int i1 = numTuples;
#ifdef PARALLEL
int nProcs = PAR_Size();
int nSamplesPerProc = (numTuples / nProcs);
int oneExtraUntil = (numTuples % nProcs);
if (rank < oneExtraUntil)
{
i0 = (rank)*(nSamplesPerProc+1);
i1 = (rank+1)*(nSamplesPerProc+1);
}
else
{
i0 = (rank)*(nSamplesPerProc) + oneExtraUntil;
i1 = (rank+1)*(nSamplesPerProc) + oneExtraUntil;
}
#else
#endif
pair<int, int> locs(i0,i1);
return locs;
}
......@@ -139,6 +139,12 @@ class AVTFILTERS_API avtRPOTFilter : virtual public avtDatasetToDatasetFilter,
void DebugData(int loc, std::string nm);
int daysPerYear, dayCountAtMonthEnd[12];
int RankForLocation(int loc);
std::pair<int,int> LocationsForRank(int rank);
void ExchangeData();
void ExchangeData0();
};
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment