avtParICAlgorithm.C 38.9 KB
Newer Older
1 2
/*****************************************************************************
*
brugger's avatar
 
brugger committed
3
* Copyright (c) 2000 - 2010, Lawrence Livermore National Security, LLC
4
* Produced at the Lawrence Livermore National Laboratory
brugger's avatar
 
brugger committed
5
* LLNL-CODE-400124
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
* All rights reserved.
*
* This file is  part of VisIt. For  details, see https://visit.llnl.gov/.  The
* full copyright notice is contained in the file COPYRIGHT located at the root
* of the VisIt distribution or at http://www.llnl.gov/visit/copyright.html.
*
* Redistribution  and  use  in  source  and  binary  forms,  with  or  without
* modification, are permitted provided that the following conditions are met:
*
*  - Redistributions of  source code must  retain the above  copyright notice,
*    this list of conditions and the disclaimer below.
*  - Redistributions in binary form must reproduce the above copyright notice,
*    this  list of  conditions  and  the  disclaimer (as noted below)  in  the
*    documentation and/or other materials provided with the distribution.
*  - Neither the name of  the LLNS/LLNL nor the names of  its contributors may
*    be used to endorse or promote products derived from this software without
*    specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT  HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR  IMPLIED WARRANTIES, INCLUDING,  BUT NOT  LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND  FITNESS FOR A PARTICULAR  PURPOSE
* ARE  DISCLAIMED. IN  NO EVENT  SHALL LAWRENCE  LIVERMORE NATIONAL  SECURITY,
* LLC, THE  U.S.  DEPARTMENT OF  ENERGY  OR  CONTRIBUTORS BE  LIABLE  FOR  ANY
* DIRECT,  INDIRECT,   INCIDENTAL,   SPECIAL,   EXEMPLARY,  OR   CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT  LIMITED TO, PROCUREMENT OF  SUBSTITUTE GOODS OR
* SERVICES; LOSS OF  USE, DATA, OR PROFITS; OR  BUSINESS INTERRUPTION) HOWEVER
* CAUSED  AND  ON  ANY  THEORY  OF  LIABILITY,  WHETHER  IN  CONTRACT,  STRICT
* LIABILITY, OR TORT  (INCLUDING NEGLIGENCE OR OTHERWISE)  ARISING IN ANY  WAY
* OUT OF THE  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
* DAMAGE.
*
*****************************************************************************/

// ************************************************************************* //
40
//                              avtParICAlgorithm.C                          //
41 42
// ************************************************************************* //

43
#include "avtParICAlgorithm.h"
44 45 46

#include <avtStateRecorderIntegralCurve.h>

47
#include <TimingsManager.h>
48
#include <VisItException.h>
49 50 51 52 53

using namespace std;

#ifdef PARALLEL

54 55
// MPI message tags used by this class.  STATUS_TAG labels the MPI_INT status
// messages (SendMsg / PostRecvStatusReq); STREAMLINE_TAG labels the
// unsigned char buffers received by PostRecvICReq.
int avtParICAlgorithm::STATUS_TAG =  420000;
int avtParICAlgorithm::STREAMLINE_TAG = 420001;
56 57

// ****************************************************************************
58
//  Method: avtParICAlgorithm::avtParICAlgorithm
59 60
//
//  Purpose:
61
//      avtParICAlgorithm constructor.
62 63 64 65
//
//  Programmer: Dave Pugmire
//  Creation:   January 27, 2009
//
66 67
//   Dave Pugmire, Mon Mar 23 12:48:12 EDT 2009
//   Change how timings are reported/calculated.
68 69 70
//
//   Dave Pugmire, Wed Apr  1 11:21:05 EDT 2009
//   Limit the number of async recvs outstanding.
71
//   
72 73
// ****************************************************************************

74 75 76
avtParICAlgorithm::avtParICAlgorithm(avtPICSFilter *icFilter)
    : avtICAlgorithm(icFilter),
      CommTime("comT"), MsgCnt("msgC"), ICCommCnt("iccC"), BytesCnt("byteC")
{
    // Cache this process's position in the parallel job.
    rank   = PAR_Rank();
    nProcs = PAR_Size();

    // Message bookkeeping.  The -1 values are placeholders; InitializeBuffers
    // assigns the real status message size and number of async receives.
    statusMsgSz   = -1;
    numAsyncRecvs = -1;
    msgID         = 0;

    // Size, in bytes, of each buffer posted for an integral curve receive
    // (10 MB).
    icMsgSz = 10*1024*1024;
}

// ****************************************************************************
87
//  Method: avtParICAlgorithm::~avtParICAlgorithm
88 89
//
//  Purpose:
90
//      avtParICAlgorithm destructor.
91 92 93 94 95 96
//
//  Programmer: Dave Pugmire
//  Creation:   January 27, 2009
//
// ****************************************************************************

97
avtParICAlgorithm::~avtParICAlgorithm()
{
    // Intentionally empty.
}

// ****************************************************************************
102
//  Method: avtParICAlgorithm::InitializeBuffers
103 104 105 106 107 108 109
//
//  Purpose:
//      Initialize the request buffers.
//
//  Programmer: Dave Pugmire
//  Creation:   January 27, 2009
//
110 111 112 113 114
//  Modifications:
//
//   Dave Pugmire, Wed Apr  1 11:21:05 EDT 2009
//   Limit the number of async recvs outstanding.
//
115 116 117
//   Hank Childs, Fri Jun  4 19:58:30 CDT 2010
//   Use avtStreamlines, not avtStreamlineWrappers.
//
118 119 120
//   Hank Childs, Mon Jun  7 14:57:13 CDT 2010
//   Rename to InitializeBuffers to prevent name collision.
//
121 122 123
// ****************************************************************************

void
124 125 126
avtParICAlgorithm::InitializeBuffers(vector<avtIntegralCurve *> &seedPts,
                                     int msgSz,
                                     int numRecvs)
127
{
128 129 130 131 132 133 134
    //Standardmsg + 1(sender rank) +1(msg ID).
    statusMsgSz = msgSz+1+1;
    numAsyncRecvs = numRecvs;
    
    if (statusMsgSz <= 0 || numAsyncRecvs <= 0)
        EXCEPTION0(ImproperUseException);
    
135
    avtICAlgorithm::Initialize(seedPts);
136 137 138
    InitRequests();
}

pugmire's avatar
pugmire committed
139 140

// ****************************************************************************
141
//  Method: avtParICAlgorithm::PostRunAlgorithm
pugmire's avatar
pugmire committed
142 143
//
//  Purpose:
144 145
//      Carry out whatever communication pattern is necessary to get the 
//      integral curves in their intended location.
pugmire's avatar
pugmire committed
146 147 148 149
//
//  Programmer: Dave Pugmire
//  Creation:   September 24, 2009
//
150 151 152 153 154
//  Modifications:
//
//    Hank Childs, Tue Jun  8 09:30:45 CDT 2010
//    Add infrastructure to support new communication patterns.
//
pugmire's avatar
pugmire committed
155 156 157
// ****************************************************************************

void
158
avtParICAlgorithm::PostRunAlgorithm()
pugmire's avatar
pugmire committed
159
{
160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182
    // We are enumerating the possible communication styles and then just
    // calling the correct one.  This is an okay solution if there are a small
    // number of styles (which there are right now).  That said, it would be
    // perfectly fine if someone wanted to make this more extensible to handle
    // a wide array of communication patterns ... it just didn't seem worth
    // the effort when this was implemented.

    avtPICSFilter::CommunicationPattern pattern = 
                                         picsFilter->GetCommunicationPattern();
 
    if (pattern == avtPICSFilter::RestoreSequence)
        RestoreIntegralCurveSequence();
    else if (pattern == avtPICSFilter::LeaveOnCurrentProcessor)
        ;
    else if (pattern == avtPICSFilter::ReturnToOriginatingProcessor)
    { 
        EXCEPTION1(VisItException, 
                   "This communication pattern has not been implemented."); 
    }
    else
    { 
        EXCEPTION1(VisItException, "Undefined communication pattern");
    }
pugmire's avatar
pugmire committed
183 184
}

185
// ****************************************************************************
186
//  Method: avtParICAlgorithm::PostExecute
187 188 189 190 191 192 193 194 195 196
//
//  Purpose:
//      Cleanup.
//
//  Programmer: Dave Pugmire
//  Creation:   January 27, 2009
//
// ****************************************************************************

void
197
avtParICAlgorithm::PostExecute()
198 199
{
    CleanupAsynchronous();
200
    avtICAlgorithm::PostExecute();
201 202 203
}

// ****************************************************************************
204
//  Method: avtParICAlgorithm::InitRequests
205 206 207 208 209 210 211
//
//  Purpose:
//      Initialize the request buffers.
//
//  Programmer: Dave Pugmire
//  Creation:   June 16, 2008
//
212 213 214
//   Dave Pugmire, Wed Apr  1 11:21:05 EDT 2009
//   Limit the number of async recvs outstanding.
//
215 216 217
//   Hank Childs, Fri Jun  4 19:58:30 CDT 2010
//   Use avtStreamlines, not avtStreamlineWrappers.
//
218 219 220
// ****************************************************************************

void
221
avtParICAlgorithm::InitRequests()
222
{
223
    debug5<<"avtParICAlgorithm::InitRequests() sz= "<<numAsyncRecvs<<endl;
224
    statusRecvRequests.resize(numAsyncRecvs, MPI_REQUEST_NULL);
225
    icRecvRequests.resize(numAsyncRecvs, MPI_REQUEST_NULL);
226 227
    
    for (int i = 0; i < statusRecvRequests.size(); i++)
228
    {
229
        PostRecvStatusReq(i);
230
        PostRecvICReq(i);
231 232 233
    }
}

pugmire's avatar
pugmire committed
234
static int
235
CountIDs(list<avtIntegralCurve *> &l, int id)
pugmire's avatar
pugmire committed
236 237
{
    int cnt = 0;
238
    list<avtIntegralCurve*>::const_iterator si = l.begin();
pugmire's avatar
pugmire committed
239 240 241 242 243 244 245 246 247
    for (si = l.begin(); si != l.end(); si++)
    {
        if ((*si)->id == id)
            cnt++;
    }
    return cnt;
}

// ****************************************************************************
248
//  Method: avtParICAlgorithm::RestoreIntegralCurveSequence
pugmire's avatar
pugmire committed
249 250 251 252 253 254 255 256 257 258 259
//
//  Purpose:
//      Communicate streamlines pieces to destinations.
//      When a streamline is communicated, only the state information is sent.
//      All the integration steps need to be sent to the proc that owns the terminated
//      streamline.  This method figures out where each streamline has terminated and
//      sends all the pieces there.
//
//  Programmer: Dave Pugmire
//  Creation:   September 21, 2009
//
260 261 262 263 264
//  Modifications:
//
//   Hank Childs, Fri Jun  4 19:58:30 CDT 2010
//   Use avtStreamlines, not avtStreamlineWrappers.
//
265 266 267
//   Hank Childs, Tue Jun  8 09:30:45 CDT 2010
//   Rename method, as we plan to add more communication methods.
//
pugmire's avatar
pugmire committed
268 269 270
// ****************************************************************************

void
271
avtParICAlgorithm::RestoreIntegralCurveSequence()
pugmire's avatar
pugmire committed
272
{
273 274 275
    debug5<<"RestoreIntegralCurveSequence: communicatedICs: "
          <<communicatedICs.size()
          <<" terminatedICs: "<<terminatedICs.size()<<endl;
pugmire's avatar
pugmire committed
276 277 278 279 280 281 282 283 284 285 286

    //Communicate to everyone where the terminators are located.
    //Do this "N" streamlines at a time, so we don't have a super big buffer.
    int N;
    if (numSeedPoints > 500)
        N = 500;
    else
        N = numSeedPoints;
    
    long *idBuffer = new long[2*N], *myIDs = new long[2*N];

287
    //Sort the terminated/communicated ICs by id.
288 289
    terminatedICs.sort(avtStateRecorderIntegralCurve::IdSeqCompare);
    communicatedICs.sort(avtStateRecorderIntegralCurve::IdSeqCompare);
pugmire's avatar
pugmire committed
290

291
    vector<vector<avtIntegralCurve *> >sendICs(N);
pugmire's avatar
pugmire committed
292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307
    vector<int> owners(N);
    
    int minId = 0;
    int maxId = N-1;
    int nLoops = numSeedPoints/N;
    if (numSeedPoints % N != 0)
        nLoops++;
    for (int l = 0; l < nLoops; l++)
    {
        //Initialize arrays for this round.
        for (int i = 0; i < N; i++)
        {
            idBuffer[i] = 0;
            idBuffer[i+N] = 0;
            myIDs[i] = 0;
            myIDs[i+N] = 0;
308
            sendICs[i].resize(0);
pugmire's avatar
pugmire committed
309 310 311
            owners[i] = 0;
        }

312 313 314
        //Set array for ICs that terminated here. Update sequence counts for communicated
        //ICs.
        list<avtIntegralCurve*>::iterator t = terminatedICs.begin();
315
        while (t != terminatedICs.end() && (*t)->id <= maxId)
pugmire's avatar
pugmire committed
316 317 318 319 320 321
        {
            if ((*t)->id >= minId)
            {
                int idx = (*t)->id % N;
                myIDs[idx] = rank;
                myIDs[idx+N] += 1;
322
                debug5<<"I own id= "<<(*t)->id<<" "<<(((avtStateRecorderIntegralCurve *)*t))->sequenceCnt<<" idx= "<<idx<<endl;
pugmire's avatar
pugmire committed
323 324 325 326 327
            }

            t++;
        }
        
328 329
        list<avtIntegralCurve*>::const_iterator c = communicatedICs.begin();
        while (c != communicatedICs.end() && (*c)->id <= maxId)
pugmire's avatar
pugmire committed
330 331 332 333 334
        {
            if ((*c)->id >= minId)
            {
                int idx = (*c)->id % N;
                myIDs[idx+N] += 1;
335
                debug5<<"I have "<<(*c)->id<<" "<<(((avtStateRecorderIntegralCurve *)*c))->sequenceCnt<<" idx= "<<idx<<endl;
pugmire's avatar
pugmire committed
336 337 338 339 340 341 342 343 344 345 346 347 348 349
            }
            c++;
        }
        
        //Exchange ID owners and sequence counts.
        MPI_Allreduce(myIDs, idBuffer, 2*N, MPI_LONG, MPI_SUM, VISIT_MPI_COMM);
        if (0)
        {
            debug5<<"idBuffer:  [";
            for(int i=0; i<2*N;i++)
                debug5<<idBuffer[i]<<" ";
            debug5<<"]"<<endl;
        }
        
350 351 352
        //Now we know where all ICs belong and how many sequences for each.
        //Send communicatedICs to the owners.
        while (!communicatedICs.empty())
pugmire's avatar
pugmire committed
353
        {
354
            avtIntegralCurve *s = communicatedICs.front();
pugmire's avatar
pugmire committed
355 356 357 358 359
            if (s->id <= maxId)
            {
                int idx = s->id%N;
                int owner = idBuffer[idx];
                if (owner == rank)
360
                    terminatedICs.push_back(s);
pugmire's avatar
pugmire committed
361 362
                else
                {
363
                    ((avtStateRecorderIntegralCurve *)s)->serializeFlags = avtIntegralCurve::SERIALIZE_STEPS; //Write IC steps.
364
                    sendICs[idx].push_back(s);
pugmire's avatar
pugmire committed
365 366
                    owners[idx] = owner;
                }
367
                communicatedICs.pop_front();
pugmire's avatar
pugmire committed
368 369 370 371 372 373 374
            }
            else
                break;
        }
        
        for (int i = 0; i < N; i++)
        {
375
            if (sendICs[i].size() > 0)
pugmire's avatar
pugmire committed
376
            {
377
                DoSendICs(owners[i], sendICs[i]);
pugmire's avatar
pugmire committed
378

379 380
                for (int j = 0; j < sendICs[i].size(); j++)
                    delete sendICs[i][j];
pugmire's avatar
pugmire committed
381 382 383 384
            }
        }
        
        //Wait for all the sequences to arrive. The total number is known for
385
        //each IC, so wait until they all come.
pugmire's avatar
pugmire committed
386 387 388
        bool seqGathered = false;
        while (!seqGathered)
        {
389
            RecvICs(terminatedICs);
pugmire's avatar
pugmire committed
390 391
            
            //See if we have all the sequences we need.
392
            terminatedICs.sort(avtStateRecorderIntegralCurve::IdSeqCompare);
pugmire's avatar
pugmire committed
393 394 395
            bool needMore = false;
            for (int i = 0; i < N && !needMore; i++)
                if (idBuffer[i] == rank)
396
                    needMore = (CountIDs(terminatedICs, i+minId) < idBuffer[i+N]);
pugmire's avatar
pugmire committed
397 398 399 400 401 402 403 404 405 406
            
            //Everyone done.
            seqGathered = !needMore;
        }
        
        //Advance to next N streamlines.
        maxId += N;
        minId += N;
    }

407 408
    //All ICs are distributed, merge the sequences into single streamlines.
    MergeTerminatedICSequences();
pugmire's avatar
pugmire committed
409 410 411 412 413 414
    
    delete [] idBuffer;
    delete [] myIDs;
}

// ****************************************************************************
415
//  Method: avtParICAlgorithm::MergeTerminatedICSequences
pugmire's avatar
pugmire committed
416 417 418 419 420 421 422
//
//  Purpose:
//      Merge streamline sequences.
//
//  Programmer: Dave Pugmire
//  Creation:   Sept 21, 2009
//
423 424 425 426 427
//  Modifications:
//
//   Hank Childs, Fri Jun  4 19:58:30 CDT 2010
//   Use avtStreamlines, not avtStreamlineWrappers.
//
428 429 430
//   Hank Childs, Tue Jun  8 09:30:45 CDT 2010
//   Reflect movement of some routines to state recorder IC class.
//
pugmire's avatar
pugmire committed
431 432 433
// ****************************************************************************

void
434
avtParICAlgorithm::MergeTerminatedICSequences()
pugmire's avatar
pugmire committed
435 436
{
    //Sort them by id and sequence so we can process them one at a time.
437
    terminatedICs.sort(avtStateRecorderIntegralCurve::IdSeqCompare);
pugmire's avatar
pugmire committed
438 439

    //Split them up into sequences.
440
    vector<vector<avtIntegralCurve *> > seqs;
441
    while (!terminatedICs.empty())
pugmire's avatar
pugmire committed
442
    {
443
        avtIntegralCurve *s = terminatedICs.front();
444
        terminatedICs.pop_front();
pugmire's avatar
pugmire committed
445 446 447 448 449
        
        //Empty or new ID, add a new entry.
        if (seqs.size() == 0 ||
            seqs[seqs.size()-1][0]->id != s->id)
        {
450
            vector<avtIntegralCurve *> v;
pugmire's avatar
pugmire committed
451 452 453 454 455 456 457 458
            v.push_back(s);
            seqs.push_back(v);
        }
        else
        {
            seqs[seqs.size()-1].push_back(s);
        }
    }
459
    terminatedICs.clear();
pugmire's avatar
pugmire committed
460 461 462 463
    
    //Merge the sequences together, put them into terminated list.
    for (int i = 0; i < seqs.size(); i++)
    {
464 465
        avtIntegralCurve *s = 
            avtStateRecorderIntegralCurve::MergeIntegralCurveSequence(seqs[i]);
466
        terminatedICs.push_back(s);
pugmire's avatar
pugmire committed
467 468 469
    }
}

470
// ****************************************************************************
471
//  Method: avtParICAlgorithm::CleanupAsynchronous
472 473 474 475 476 477 478 479
//
//  Purpose:
//      Cleanup the buffers used when doing asynchronous processing.
//
//  Programmer: Dave Pugmire
//  Creation:   June 16, 2008
//
// ****************************************************************************
pugmire's avatar
pugmire committed
480

481
void
482
avtParICAlgorithm::CleanupAsynchronous()
483 484 485 486 487 488 489 490
{
    for (int i = 0; i < statusRecvRequests.size(); i++)
    {
        MPI_Request req = statusRecvRequests[i];
        if (req != MPI_REQUEST_NULL)
            MPI_Cancel(&req);
    } 

491
    for (int i = 0; i < icRecvRequests.size(); i++)
492
    {
493
        MPI_Request req = icRecvRequests[i];
494 495 496 497 498 499
        if (req != MPI_REQUEST_NULL)
            MPI_Cancel(&req);
    }

    // Cleanup recv buffers.
    map<MPI_Request, unsigned char*>::const_iterator it;
500
    for (it = recvICBufferMap.begin(); it != recvICBufferMap.end(); ++it)
501 502 503 504 505
    {
        char *buff = (char *)it->second;
        if (it->second != NULL)
            delete [] it->second;
    }
506
    recvICBufferMap.clear();
507 508 509 510 511 512 513 514 515 516 517 518 519

    map<MPI_Request, int*>::const_iterator itt;
    for (itt = recvIntBufferMap.begin(); itt != recvIntBufferMap.end(); ++itt)
    {
        char *buff = (char *)itt->second;
        if (itt->second != NULL)
            delete [] itt->second;
    }
    recvIntBufferMap.clear();
}


// ****************************************************************************
520
//  Method: avtParICAlgorithm::CheckPendingSendRequests
521 522 523 524 525 526 527
//
//  Purpose:
//      Check to see if there are any pending send requests.
//
//  Programmer: Dave Pugmire
//  Creation:   June 16, 2008
//
528 529 530 531 532
//  Modifications:
//
//    Dave Pugmire, Wed Mar 18 17:07:07 EDT 2009
//    Delete entry from map after send is complete.
//
533 534
//   Dave Pugmire, Mon Mar 23 12:48:12 EDT 2009
//   Change how timings are reported/calculated.
535 536 537
//
//   Dave Pugmire, Sat Mar 28 22:21:49 EDT 2009
//   Bug fix. "notCompleted" wasn't in an else clause for the INT messages.
538
//   
539 540 541
//   Hank Childs, Sat Feb 20 16:53:18 CST 2010
//   Don't output timing values to the timing logs.
//
542 543
// ****************************************************************************
void
544
avtParICAlgorithm::CheckPendingSendRequests()
545
{
546
    debug5<<"avtParICAlgorithm::CheckPendingSendRequests()\n";
547 548
    int communicationTimer = visitTimer->StartTimer();
    
549
    if (sendICBufferMap.size() > 0)
550 551 552 553 554
    {
        vector<MPI_Request> req, copy;

        int notCompleted = 0;
        map<MPI_Request, unsigned char*>::const_iterator it;
555
        for (it = sendICBufferMap.begin(); it != sendICBufferMap.end(); ++it)
556 557 558 559 560 561 562 563 564 565
        {
            if (it->first != MPI_REQUEST_NULL && it->second != NULL)
            {
                req.push_back(it->first);
                copy.push_back(it->first);
            }
            else
                notCompleted++;
        }

566
        debug5 << "\tCheckPendingSendRequests() IC completed = "<<req.size()
567 568 569 570 571 572 573 574 575 576 577 578 579
               <<" not completed: "<<notCompleted<<endl;

        if (req.size() > 0)
        {
            // See if any sends have completed. Delete buffers if they have.
            int num = 0, *indices = new int[req.size()];
            MPI_Status *status = new MPI_Status[req.size()];
            int err = MPI_Testsome(req.size(), &req[0], &num, indices, status);
            
            for (int i = 0; i < num; i++)
            {
                int idx = indices[i];
                MPI_Request r = copy[idx];
580
                unsigned char *buff = sendICBufferMap[r];
581 582 583 584
                debug5 << "\tidx = " << idx << " r = " << r << " buff = " 
                       << (void *)buff << endl;
                if (buff)
                    delete [] buff;
585

586 587
                sendICBufferMap[r] = NULL;
                sendICBufferMap.erase(r);
588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607
            }
            
            delete [] indices;
            delete [] status;
        }
    }

    if (sendIntBufferMap.size() > 0)
    {
        vector<MPI_Request> req, copy;
        map<MPI_Request, int*>::const_iterator it;
        int notCompleted = 0;

        for (it = sendIntBufferMap.begin(); it != sendIntBufferMap.end(); ++it)
        {
            if (it->first != MPI_REQUEST_NULL && it->second != NULL)
            {
                req.push_back(it->first);
                copy.push_back(it->first);
            }
608 609
            else
                notCompleted++;
610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630
        }

        debug5 << "\tCheckPendingSendRequests() INT completed = "<<req.size()
               <<" not completed: "<<notCompleted<<endl;
        
        if (req.size() > 0)
        {
            // See if any sends have completed. Delete buffers if they have.
            int num = 0, *indices = new int[req.size()];
            MPI_Status *status = new MPI_Status[req.size()];
            int err = MPI_Testsome(req.size(), &req[0], &num, indices, status);
            
            for (int i = 0; i < num; i++)
            {
                int idx = indices[i];
                MPI_Request r = copy[idx];
                int *buff = sendIntBufferMap[r];
                debug5 << "\tidx = " << idx << " r = " << r << " buff = " 
                       << (void *)buff << endl;
                if (buff)
                    delete [] buff;
631

632
                sendIntBufferMap[r] = NULL;
633
                sendIntBufferMap.erase(r);
634 635 636 637 638 639 640
            }
            
            delete [] indices;
            delete [] status;
        }
    }

641 642
    bool nov = visitTimer->GetNeverOutputValue();
    visitTimer->NeverOutput(true);
643 644
    CommTime.value += visitTimer->StopTimer(communicationTimer, 
                                            "CheckPending");
645
    visitTimer->NeverOutput(nov);
646 647 648 649
    debug5 << "DONE  CheckPendingSendRequests()\n";
}

// ****************************************************************************
650
//  Method: avtParICAlgorithm::PostRecvStatusReq
651 652 653 654 655 656 657
//
//  Purpose:
//      Receives status requests.
//
//  Programmer: Dave Pugmire
//  Creation:   June 16, 2008
//
658 659 660
//   Dave Pugmire, Wed Apr  1 11:21:05 EDT 2009
//   Limit the number of async recvs outstanding.
//
661 662 663
// ****************************************************************************

void
664
avtParICAlgorithm::PostRecvStatusReq(int idx)
665 666 667
{
    MPI_Request req;
    int *buff = new int[statusMsgSz];
668 669

    MPI_Irecv(buff, statusMsgSz, MPI_INT, MPI_ANY_SOURCE,
670
              avtParICAlgorithm::STATUS_TAG,
671
              VISIT_MPI_COMM, &req);
672 673
    debug5 << "Post Statusrecv " <<idx<<" req= "<<req<<endl;
    statusRecvRequests[idx] = req;
674 675 676 677
    recvIntBufferMap[req] = buff;
}

// ****************************************************************************
678
//  Method: avtParICAlgorithm::PostRecvICReq
679 680 681 682 683 684 685
//
//  Purpose:
//      Posts an asynchronous receive for integral curve data.
//
//  Programmer: Dave Pugmire
//  Creation:   June 16, 2008
//
686 687 688 689 690
//  Modifications:
//
//   Dave Pugmire, Wed Apr  1 11:21:05 EDT 2009
//   Limit the number of async recvs outstanding.
//
691 692 693
// ****************************************************************************

void
694
avtParICAlgorithm::PostRecvICReq(int idx)
695 696
{
    MPI_Request req;
697 698
    unsigned char *buff = new unsigned char[icMsgSz];
    MPI_Irecv(buff, icMsgSz,
699
              MPI_UNSIGNED_CHAR, MPI_ANY_SOURCE,
700
              avtParICAlgorithm::STREAMLINE_TAG, 
701 702
              VISIT_MPI_COMM, &req);

703 704 705
    debug5 << "Post ICrecv " <<idx<<" req= "<<req<<endl;
    icRecvRequests[idx] = req;
    recvICBufferMap[req] = buff;
706 707 708 709
}


// ****************************************************************************
710
//  Method: avtParICAlgorithm::SendMsg
711 712 713 714 715 716 717 718 719
//
//  Purpose:
//      Send an asynchronous message.
//
//  Programmer: Dave Pugmire
//  Creation:   Dec 18, 2008
//
// Modifications:
//
720 721
//   Dave Pugmire, Wed Apr  1 11:21:05 EDT 2009
//   Add the senders rank and msgID to the front of the message.
722
//
723 724 725
//   Hank Childs, Sat Feb 20 16:53:18 CST 2010
//   Don't output timing values to the timing logs.
//
726 727 728
// ****************************************************************************

void
729
avtParICAlgorithm::SendMsg(int dst,
730 731 732 733 734 735 736
                           vector<int> &msg)
{
    int communicationTimer = visitTimer->StartTimer();
    if (msg.size() > statusMsgSz)
        EXCEPTION0(ImproperUseException);
    
    int *buff = new int[statusMsgSz];
737 738 739
    buff[0] = msgID;
    msgID++;
    buff[1] = rank;
740 741 742
    
    MPI_Request req;
    for (int i = 0; i < msg.size(); i++)
743 744 745 746 747
        buff[2+i] = msg[i];

    debug5<<"SendMsg to :"<<dst<<" [";
    for(int i = 0; i < statusMsgSz; i++) debug5<<buff[i]<<" ";
    debug5<<"]"<<endl;
748
        
749
    int err = MPI_Isend(buff, statusMsgSz, MPI_INT, dst,
750
                        avtParICAlgorithm::STATUS_TAG,
751
                        VISIT_MPI_COMM, &req);
752 753 754 755 756

    sendIntBufferMap[req] = buff;
    
    BytesCnt.value += (sizeof(int) *statusMsgSz);
    MsgCnt.value++;
757 758
    bool nov = visitTimer->GetNeverOutputValue();
    visitTimer->NeverOutput(true);
759 760
    CommTime.value += visitTimer->StopTimer(communicationTimer, 
                                            "SendMsg");
761 762
    visitTimer->NeverOutput(nov);
    debug5 << "DONE  CheckPendingSendRequests()\n";
763 764 765
}

// ****************************************************************************
766
//  Method: avtParICAlgorithm::SendAllMsg
767 768 769 770 771 772 773 774 775 776 777 778 779
//
//  Purpose:
//      Broadcast a message.
//
//  Programmer: Dave Pugmire
//  Creation:   Dec 18, 2008
//
// Modifications:
//
//
// ****************************************************************************

void
780
avtParICAlgorithm::SendAllMsg(vector<int> &msg)
781 782 783 784 785 786 787
{
    for (int i = 0; i < nProcs; i++)
        if (i != rank)
            SendMsg(i, msg);
}

// ****************************************************************************
788
//  Method: avtParICAlgorithm::RecvMsgs
789 790 791 792 793 794 795 796 797
//
//  Purpose:
//      Receive any pending status messages.
//
//  Programmer: Dave Pugmire
//  Creation:   Dec 18, 2008
//
// Modifications:
//
798 799
//   Dave Pugmire, Mon Mar 23 12:48:12 EDT 2009
//   Change how timings are reported/calculated.
800 801 802
//
//   Dave Pugmire, Wed Apr  1 11:21:05 EDT 2009
//   Senders rank and msgID is in the message now.
803
//   
804 805 806
//   Hank Childs, Sat Feb 20 16:53:18 CST 2010
//   Don't output timing values to the timing logs.
//
807 808 809
// ****************************************************************************

void
810
avtParICAlgorithm::RecvMsgs(std::vector<std::vector<int> > &msgs)
811
{
812
    debug5<<"avtParICAlgorithm::RecvMsgs()\n";
813
    int communicationTimer = visitTimer->StartTimer();
814 815 816 817 818 819 820 821 822 823 824 825
    
    msgs.resize(0);
    while (true)
    {
        int nReq = statusRecvRequests.size();
        MPI_Status *status = new MPI_Status[nReq];
        int *indices = new int[nReq];
        int num = 0, err;

        vector<MPI_Request> copy;
        for (int i = 0; i < statusRecvRequests.size(); i++)
            copy.push_back(statusRecvRequests[i]);
826

827
        err = MPI_Testsome(nReq, &copy[0], &num, indices, status);
828
        debug5<<"::RecvMsgs() err= "<<err<<" Testsome("<<nReq<<"); num= "<<num<<endl;
829 830 831 832 833 834

        if (num > 0)
        {
            for (int i = 0; i < num; i++)
            {
                int idx = indices[i];
835
                debug5<<"RecvMsg from "<<idx<<endl;
836 837 838 839 840 841 842 843 844 845

                MPI_Request req = statusRecvRequests[idx];
                if (req == MPI_REQUEST_NULL)
                    continue;
                
                int *buff = recvIntBufferMap[req];
                recvIntBufferMap.erase(req);
                if (buff == NULL)
                    continue;

846 847 848 849 850
                debug5<<"RecvMsg: [";
                for(int i = 0; i < statusMsgSz; i++) debug5<<buff[i]<<" ";
                debug5<<"]"<<endl;
                
                //Skip msg ID, copy buffer int msg.
851
                vector<int> msg;
852
                for (int i = 1; i < statusMsgSz; i++)
853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869
                    msg.push_back(buff[i]);
                msgs.push_back(msg);

                //Clean up.
                delete [] buff;
            }
        
            //Repost recv requests.
            for (int i = 0; i < num; i++)
                PostRecvStatusReq(indices[i]);
        }
            
        delete [] status;
        delete [] indices;
        if (num == 0)
            break;
    }
870 871 872
    bool nov = visitTimer->GetNeverOutputValue();
    visitTimer->NeverOutput(true);
    debug5 << "DONE  CheckPendingSendRequests()\n";
873 874
    CommTime.value += visitTimer->StopTimer(communicationTimer,
                                            "RecvMsgs");
875
    visitTimer->NeverOutput(nov);
876 877 878
}

// ****************************************************************************
//  Method: avtParICAlgorithm::SendICs
//
//  Purpose:
//      Send integral curves to a destination rank (dst).
//
//  Programmer: Dave Pugmire
//  Creation:   June 16, 2008
//
//  Modifications:
//
//   Dave Pugmire, Fri Aug 22 14:47:11 EST 2008
//   Memory leak fix.
//
//   Dave Pugmire, Thu Sep 24 14:03:46 EDT 2009
//   Call new method, DoSendICs.
//
//   Hank Childs, Fri Jun  4 19:58:30 CDT 2010
//   Use avtStreamlines, not avtStreamlineWrappers.
//
//   Hank Childs, Tue Jun  8 09:30:45 CDT 2010
//   Use virtual methods to reduce dependence on a specific communication
//   pattern.
//
// ****************************************************************************

void
905
avtParICAlgorithm::SendICs(int dst, vector<avtIntegralCurve*> &ics)
906
{
pugmire's avatar
pugmire committed
907

908
    for (int i = 0; i < ics.size(); i++)
pugmire's avatar
pugmire committed
909
    {
910
        avtIntegralCurve *ic = ics[i];
911
        ic->PrepareForSend();
pugmire's avatar
pugmire committed
912 913
    }

914
    if (DoSendICs(dst, ics))
pugmire's avatar
pugmire committed
915
    {
916
        for (int i = 0; i < ics.size(); i++)
pugmire's avatar
pugmire committed
917
        {
918
            avtIntegralCurve *ic = ics[i];
pugmire's avatar
pugmire committed
919 920
            
            //Add if id/seq is unique. (single streamlines can be sent to multiple dst).
921
            list<avtIntegralCurve*>::const_iterator si = communicatedICs.begin();
pugmire's avatar
pugmire committed
922
            bool found = false;
923
            for (si = communicatedICs.begin(); !found && si != communicatedICs.end(); si++)
924
                found = (*si)->SameCurve(ic);
pugmire's avatar
pugmire committed
925 926
        
            if (!found)
927
                communicatedICs.push_back(ic);
pugmire's avatar
pugmire committed
928 929 930
        }
        
        //Empty the array.
931
        ics.resize(0);
pugmire's avatar
pugmire committed
932 933 934 935 936
    }
}


// ****************************************************************************
//  Method: avtParICAlgorithm::DoSendICs
//
//  Purpose:
//      Send integral curves to a destination rank (dst).
//
//  Programmer: Dave Pugmire
//  Creation:   September 24, 2009
//
//  Modifications:
//
//   Hank Childs, Sat Feb 20 16:53:18 CST 2010
//   Don't output timing values to the timing logs.
//
//   Hank Childs, Fri Jun  4 19:58:30 CDT 2010
//   Use avtStreamlines, not avtStreamlineWrappers.
//
// ****************************************************************************

bool
956 957
avtParICAlgorithm::DoSendICs(int dst, 
                             vector<avtIntegralCurve*> &ics)
pugmire's avatar
pugmire committed
958
{
959
    if (dst == rank)
pugmire's avatar
pugmire committed
960
        return false;
961
  
962
    size_t szz = ics.size();
963
    if (szz == 0)
pugmire's avatar
pugmire committed
964
        return false;
965 966 967 968

    int communicationTimer = visitTimer->StartTimer();
    MemStream buff;
    buff.write(&szz, 1);
pugmire's avatar
pugmire committed
969

970
    for (int i = 0; i < ics.size(); i++)
971
    {
972 973 974
        avtIntegralCurve *ic = ics[i];
        ic->Serialize(MemStream::WRITE, buff, GetSolver());
        ICCommCnt.value ++;
975 976 977
    }
    
    // Break it up into multiple messages if needed.
978
    if (buff.buffLen() > icMsgSz)
979 980 981 982 983 984 985 986 987 988
        EXCEPTION0(ImproperUseException);
    
    // Copy it into a byte buffer.
    size_t sz = buff.buffLen();
    unsigned char *msg = new unsigned char[sz];
    memcpy(msg, buff.buff(), sz);

    //Send it along.
    MPI_Request req;
    int err = MPI_Isend(msg, sz, MPI_UNSIGNED_CHAR, dst,
989
                        avtParICAlgorithm::STREAMLINE_TAG,
990
                        VISIT_MPI_COMM, &req);
991
    debug5<<err<<" = MPI_Isend(msg, "<<sz<<", MPI_UNSIGNED_CHAR, to "<<dst<<", req= "<<req<<endl;
992
    sendICBufferMap[req] = msg;
993 994

    BytesCnt.value += sz;
995 996
    bool nov = visitTimer->GetNeverOutputValue();
    visitTimer->NeverOutput(true);
997
    CommTime.value += visitTimer->StopTimer(communicationTimer,
998
                                            "SendICs");
999 1000
    visitTimer->NeverOutput(nov);
    debug5 << "DONE  CheckPendingSendRequests()\n";
pugmire's avatar
pugmire committed