avtParICAlgorithm.C 37 KB
Newer Older
1
2
/*****************************************************************************
*
brugger's avatar
 
brugger committed
3
* Copyright (c) 2000 - 2010, Lawrence Livermore National Security, LLC
4
* Produced at the Lawrence Livermore National Laboratory
brugger's avatar
 
brugger committed
5
* LLNL-CODE-400124
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
* All rights reserved.
*
* This file is  part of VisIt. For  details, see https://visit.llnl.gov/.  The
* full copyright notice is contained in the file COPYRIGHT located at the root
* of the VisIt distribution or at http://www.llnl.gov/visit/copyright.html.
*
* Redistribution  and  use  in  source  and  binary  forms,  with  or  without
* modification, are permitted provided that the following conditions are met:
*
*  - Redistributions of  source code must  retain the above  copyright notice,
*    this list of conditions and the disclaimer below.
*  - Redistributions in binary form must reproduce the above copyright notice,
*    this  list of  conditions  and  the  disclaimer (as noted below)  in  the
*    documentation and/or other materials provided with the distribution.
*  - Neither the name of  the LLNS/LLNL nor the names of  its contributors may
*    be used to endorse or promote products derived from this software without
*    specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT  HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR  IMPLIED WARRANTIES, INCLUDING,  BUT NOT  LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND  FITNESS FOR A PARTICULAR  PURPOSE
* ARE  DISCLAIMED. IN  NO EVENT  SHALL LAWRENCE  LIVERMORE NATIONAL  SECURITY,
* LLC, THE  U.S.  DEPARTMENT OF  ENERGY  OR  CONTRIBUTORS BE  LIABLE  FOR  ANY
* DIRECT,  INDIRECT,   INCIDENTAL,   SPECIAL,   EXEMPLARY,  OR   CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT  LIMITED TO, PROCUREMENT OF  SUBSTITUTE GOODS OR
* SERVICES; LOSS OF  USE, DATA, OR PROFITS; OR  BUSINESS INTERRUPTION) HOWEVER
* CAUSED  AND  ON  ANY  THEORY  OF  LIABILITY,  WHETHER  IN  CONTRACT,  STRICT
* LIABILITY, OR TORT  (INCLUDING NEGLIGENCE OR OTHERWISE)  ARISING IN ANY  WAY
* OUT OF THE  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
* DAMAGE.
*
*****************************************************************************/

// ************************************************************************* //
40
//                              avtParICAlgorithm.C                          //
41
42
// ************************************************************************* //

43
#include "avtParICAlgorithm.h"
44
45
46
47
48
49
#include <TimingsManager.h>

using namespace std;

#ifdef PARALLEL

50
51
int avtParICAlgorithm::STATUS_TAG =  420000;
int avtParICAlgorithm::STREAMLINE_TAG = 420001;
52
53

// ****************************************************************************
54
//  Method: avtParICAlgorithm::avtParICAlgorithm
55
56
//
//  Purpose:
57
//      avtParICAlgorithm constructor.
58
59
60
61
//
//  Programmer: Dave Pugmire
//  Creation:   January 27, 2009
//
62
63
//   Dave Pugmire, Mon Mar 23 12:48:12 EDT 2009
//   Change how timings are reported/calculated.
64
65
66
//
//   Dave Pugmire, Wed Apr  1 11:21:05 EDT 2009
//   Limit the number of async recvs outstanding.
67
//   
68
69
// ****************************************************************************

70
71
72
avtParICAlgorithm::avtParICAlgorithm(avtPICSFilter *icFilter)
    : avtICAlgorithm(icFilter),
      CommTime("comT"), MsgCnt("msgC"), ICCommCnt("iccC"), BytesCnt("byteC")
73
74
75
{
    nProcs = PAR_Size();
    rank = PAR_Rank();
76
77
78
    msgID = 0;
    statusMsgSz = -1;
    numAsyncRecvs = -1;
79
    icMsgSz = 10*1024*1024;
80
81
82
}

// ****************************************************************************
83
//  Method: avtParICAlgorithm::~avtParICAlgorithm
84
85
//
//  Purpose:
86
//      avtParICAlgorithm destructor.
87
88
89
90
91
92
//
//  Programmer: Dave Pugmire
//  Creation:   January 27, 2009
//
// ****************************************************************************

93
avtParICAlgorithm::~avtParICAlgorithm()
94
95
96
97
{
}

// ****************************************************************************
98
//  Method: avtParICAlgorithm::Initialize
99
100
101
102
103
104
105
//
//  Purpose:
//      Initialize the request buffers.
//
//  Programmer: Dave Pugmire
//  Creation:   January 27, 2009
//
106
107
108
109
110
//  Modifications:
//
//   Dave Pugmire, Wed Apr  1 11:21:05 EDT 2009
//   Limit the number of async recvs outstanding.
//
111
112
113
//   Hank Childs, Fri Jun  4 19:58:30 CDT 2010
//   Use avtStreamlines, not avtStreamlineWrappers.
//
114
115
116
// ****************************************************************************

void
117
avtParICAlgorithm::Initialize(vector<avtIntegralCurve *> &seedPts,
118
119
                              int msgSz,
                              int numRecvs)
120
{
121
122
123
124
125
126
127
    //Standardmsg + 1(sender rank) +1(msg ID).
    statusMsgSz = msgSz+1+1;
    numAsyncRecvs = numRecvs;
    
    if (statusMsgSz <= 0 || numAsyncRecvs <= 0)
        EXCEPTION0(ImproperUseException);
    
128
    avtICAlgorithm::Initialize(seedPts);
129
130
131
    InitRequests();
}

pugmire's avatar
pugmire committed
132
133

// ****************************************************************************
134
//  Method: avtParICAlgorithm::PostRunAlgorithm
pugmire's avatar
pugmire committed
135
136
137
138
139
140
141
142
143
144
//
//  Purpose:
//      Cleanup.
//
//  Programmer: Dave Pugmire
//  Creation:   September 24, 2009
//
// ****************************************************************************

void
145
avtParICAlgorithm::PostRunAlgorithm()
pugmire's avatar
pugmire committed
146
{
147
    ExchangeICSteps();
pugmire's avatar
pugmire committed
148
149
}

150
// ****************************************************************************
151
//  Method: avtParICAlgorithm::PostExecute
152
153
154
155
156
157
158
159
160
161
//
//  Purpose:
//      Cleanup.
//
//  Programmer: Dave Pugmire
//  Creation:   January 27, 2009
//
// ****************************************************************************

void
162
avtParICAlgorithm::PostExecute()
163
164
{
    CleanupAsynchronous();
165
    avtICAlgorithm::PostExecute();
166
167
168
}

// ****************************************************************************
169
//  Method: avtParICAlgorithm::InitRequests
170
171
172
173
174
175
176
//
//  Purpose:
//      Initialize the request buffers.
//
//  Programmer: Dave Pugmire
//  Creation:   June 16, 2008
//
177
178
179
//   Dave Pugmire, Wed Apr  1 11:21:05 EDT 2009
//   Limit the number of async recvs outstanding.
//
180
181
182
//   Hank Childs, Fri Jun  4 19:58:30 CDT 2010
//   Use avtStreamlines, not avtStreamlineWrappers.
//
183
184
185
// ****************************************************************************

void
186
avtParICAlgorithm::InitRequests()
187
{
188
    debug5<<"avtParICAlgorithm::InitRequests() sz= "<<numAsyncRecvs<<endl;
189
    statusRecvRequests.resize(numAsyncRecvs, MPI_REQUEST_NULL);
190
    icRecvRequests.resize(numAsyncRecvs, MPI_REQUEST_NULL);
191
192
    
    for (int i = 0; i < statusRecvRequests.size(); i++)
193
    {
194
        PostRecvStatusReq(i);
195
        PostRecvICReq(i);
196
197
198
    }
}

pugmire's avatar
pugmire committed
199
static int
200
CountIDs(list<avtIntegralCurve *> &l, int id)
pugmire's avatar
pugmire committed
201
202
{
    int cnt = 0;
203
    list<avtIntegralCurve*>::const_iterator si = l.begin();
pugmire's avatar
pugmire committed
204
205
206
207
208
209
210
211
212
    for (si = l.begin(); si != l.end(); si++)
    {
        if ((*si)->id == id)
            cnt++;
    }
    return cnt;
}

// ****************************************************************************
213
//  Method: avtParICAlgorithm::ExchangeICSteps
pugmire's avatar
pugmire committed
214
215
216
217
218
219
220
221
222
223
224
//
//  Purpose:
//      Communicate streamlines pieces to destinations.
//      When a streamline is communicated, only the state information is sent.
//      All the integration steps need to sent to the proc that owns the terminated
//      streamline.  This method figures out where each streamline has terminated and
//      sends all the pieces there.
//
//  Programmer: Dave Pugmire
//  Creation:   September 21, 2009
//
225
226
227
228
229
//  Modifications:
//
//   Hank Childs, Fri Jun  4 19:58:30 CDT 2010
//   Use avtStreamlines, not avtStreamlineWrappers.
//
pugmire's avatar
pugmire committed
230
231
232
// ****************************************************************************

void
233
avtParICAlgorithm::ExchangeICSteps()
pugmire's avatar
pugmire committed
234
{
235
    debug5<<"ExchangeICSteps: communicatedICs: "<<communicatedICs.size();
236
    debug5<<" terminatedICs: "<<terminatedICs.size()<<endl;
pugmire's avatar
pugmire committed
237
238
239
240
241
242
243
244
245
246
247

    //Communicate to everyone where the terminators are located.
    //Do this "N" streamlines at a time, so we don't have a super big buffer.
    int N;
    if (numSeedPoints > 500)
        N = 500;
    else
        N = numSeedPoints;
    
    long *idBuffer = new long[2*N], *myIDs = new long[2*N];

248
249
250
    //Sort the terminated/communicated ICs by id.
    terminatedICs.sort(avtIntegralCurve::IdSeqCompare);
    communicatedICs.sort(avtIntegralCurve::IdSeqCompare);
pugmire's avatar
pugmire committed
251

252
    vector<vector<avtIntegralCurve *> >sendICs(N);
pugmire's avatar
pugmire committed
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
    vector<int> owners(N);
    
    int minId = 0;
    int maxId = N-1;
    int nLoops = numSeedPoints/N;
    if (numSeedPoints % N != 0)
        nLoops++;
    for (int l = 0; l < nLoops; l++)
    {
        //Initialize arrays for this round.
        for (int i = 0; i < N; i++)
        {
            idBuffer[i] = 0;
            idBuffer[i+N] = 0;
            myIDs[i] = 0;
            myIDs[i+N] = 0;
269
            sendICs[i].resize(0);
pugmire's avatar
pugmire committed
270
271
272
            owners[i] = 0;
        }

273
274
275
        //Set array for ICs that terminated here. Update sequence counts for communicated
        //ICs.
        list<avtIntegralCurve*>::iterator t = terminatedICs.begin();
276
        while (t != terminatedICs.end() && (*t)->id <= maxId)
pugmire's avatar
pugmire committed
277
278
279
280
281
282
        {
            if ((*t)->id >= minId)
            {
                int idx = (*t)->id % N;
                myIDs[idx] = rank;
                myIDs[idx+N] += 1;
pugmire's avatar
pugmire committed
283
                debug5<<"I own id= "<<(*t)->id<<" "<<(*t)->sequenceCnt<<" idx= "<<idx<<endl;
pugmire's avatar
pugmire committed
284
285
286
287
288
            }

            t++;
        }
        
289
290
        list<avtIntegralCurve*>::const_iterator c = communicatedICs.begin();
        while (c != communicatedICs.end() && (*c)->id <= maxId)
pugmire's avatar
pugmire committed
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
        {
            if ((*c)->id >= minId)
            {
                int idx = (*c)->id % N;
                myIDs[idx+N] += 1;
                debug5<<"I have "<<(*c)->id<<" "<<(*c)->sequenceCnt<<" idx= "<<idx<<endl;
            }
            c++;
        }
        
        //Exchange ID owners and sequence counts.
        MPI_Allreduce(myIDs, idBuffer, 2*N, MPI_LONG, MPI_SUM, VISIT_MPI_COMM);
        if (0)
        {
            debug5<<"idBuffer:  [";
            for(int i=0; i<2*N;i++)
                debug5<<idBuffer[i]<<" ";
            debug5<<"]"<<endl;
        }
        
311
312
313
        //Now we know where all ICs belong and how many sequences for each.
        //Send communicatedICs to the owners.
        while (!communicatedICs.empty())
pugmire's avatar
pugmire committed
314
        {
315
            avtIntegralCurve *s = communicatedICs.front();
pugmire's avatar
pugmire committed
316
317
318
319
320
            if (s->id <= maxId)
            {
                int idx = s->id%N;
                int owner = idBuffer[idx];
                if (owner == rank)
321
                    terminatedICs.push_back(s);
pugmire's avatar
pugmire committed
322
323
                else
                {
324
325
                    s->serializeFlags = avtIntegralCurve::SERIALIZE_STEPS; //Write IC steps.
                    sendICs[idx].push_back(s);
pugmire's avatar
pugmire committed
326
327
                    owners[idx] = owner;
                }
328
                communicatedICs.pop_front();
pugmire's avatar
pugmire committed
329
330
331
332
333
334
335
            }
            else
                break;
        }
        
        for (int i = 0; i < N; i++)
        {
336
            if (sendICs[i].size() > 0)
pugmire's avatar
pugmire committed
337
            {
338
                DoSendICs(owners[i], sendICs[i]);
pugmire's avatar
pugmire committed
339

340
341
                for (int j = 0; j < sendICs[i].size(); j++)
                    delete sendICs[i][j];
pugmire's avatar
pugmire committed
342
343
344
345
            }
        }
        
        //Wait for all the sequences to arrive. The total number is known for
346
        //each IC, so wait until they all come.
pugmire's avatar
pugmire committed
347
348
349
        bool seqGathered = false;
        while (!seqGathered)
        {
350
            RecvICs(terminatedICs);
pugmire's avatar
pugmire committed
351
352
            
            //See if we have all the sequences we need.
353
            terminatedICs.sort(avtIntegralCurve::IdSeqCompare);
pugmire's avatar
pugmire committed
354
355
356
            bool needMore = false;
            for (int i = 0; i < N && !needMore; i++)
                if (idBuffer[i] == rank)
357
                    needMore = (CountIDs(terminatedICs, i+minId) < idBuffer[i+N]);
pugmire's avatar
pugmire committed
358
359
360
361
362
363
364
365
366
367
            
            //Everyone done.
            seqGathered = !needMore;
        }
        
        //Advance to next N streamlines.
        maxId += N;
        minId += N;
    }

368
369
    //All ICs are distributed, merge the sequences into single streamlines.
    MergeTerminatedICSequences();
pugmire's avatar
pugmire committed
370
371
372
373
374
375
    
    delete [] idBuffer;
    delete [] myIDs;
}

// ****************************************************************************
376
//  Method: avtParICAlgorithm::MergeTerminatedICSequences
pugmire's avatar
pugmire committed
377
378
379
380
381
382
383
//
//  Purpose:
//      Merge streamline sequences.
//
//  Programmer: Dave Pugmire
//  Creation:   Sept 21, 2009
//
384
385
386
387
388
//  Modifications:
//
//   Hank Childs, Fri Jun  4 19:58:30 CDT 2010
//   Use avtStreamlines, not avtStreamlineWrappers.
//
pugmire's avatar
pugmire committed
389
390
391
// ****************************************************************************

void
392
avtParICAlgorithm::MergeTerminatedICSequences()
pugmire's avatar
pugmire committed
393
394
{
    //Sort them by id and sequence so we can process them one at a time.
395
    terminatedICs.sort(avtIntegralCurve::IdSeqCompare);
pugmire's avatar
pugmire committed
396
397

    //Split them up into sequences.
398
    vector<vector<avtIntegralCurve *> > seqs;
399
    while (!terminatedICs.empty())
pugmire's avatar
pugmire committed
400
    {
401
        avtIntegralCurve *s = terminatedICs.front();
402
        terminatedICs.pop_front();
pugmire's avatar
pugmire committed
403
404
405
406
407
        
        //Empty or new ID, add a new entry.
        if (seqs.size() == 0 ||
            seqs[seqs.size()-1][0]->id != s->id)
        {
408
            vector<avtIntegralCurve *> v;
pugmire's avatar
pugmire committed
409
410
411
412
413
414
415
416
            v.push_back(s);
            seqs.push_back(v);
        }
        else
        {
            seqs[seqs.size()-1].push_back(s);
        }
    }
417
    terminatedICs.clear();
pugmire's avatar
pugmire committed
418
419
420
421
    
    //Merge the sequences together, put them into terminated list.
    for (int i = 0; i < seqs.size(); i++)
    {
422
        avtIntegralCurve *s = seqs[i][0]->MergeIntegralCurveSequence(seqs[i]);
423
        terminatedICs.push_back(s);
pugmire's avatar
pugmire committed
424
425
426
    }
}

427
// ****************************************************************************
428
//  Method: avtParICAlgorithm::CleanupAsynchronous
429
430
431
432
433
434
435
436
//
//  Purpose:
//      Claenup the buffers used when doing asynchronous processing.
//
//  Programmer: Dave Pugmire
//  Creation:   June 16, 2008
//
// ****************************************************************************
pugmire's avatar
pugmire committed
437

438
void
439
avtParICAlgorithm::CleanupAsynchronous()
440
441
442
443
444
445
446
447
{
    for (int i = 0; i < statusRecvRequests.size(); i++)
    {
        MPI_Request req = statusRecvRequests[i];
        if (req != MPI_REQUEST_NULL)
            MPI_Cancel(&req);
    } 

448
    for (int i = 0; i < icRecvRequests.size(); i++)
449
    {
450
        MPI_Request req = icRecvRequests[i];
451
452
453
454
455
456
        if (req != MPI_REQUEST_NULL)
            MPI_Cancel(&req);
    }

    // Cleanup recv buffers.
    map<MPI_Request, unsigned char*>::const_iterator it;
457
    for (it = recvICBufferMap.begin(); it != recvICBufferMap.end(); ++it)
458
459
460
461
462
    {
        char *buff = (char *)it->second;
        if (it->second != NULL)
            delete [] it->second;
    }
463
    recvICBufferMap.clear();
464
465
466
467
468
469
470
471
472
473
474
475
476

    map<MPI_Request, int*>::const_iterator itt;
    for (itt = recvIntBufferMap.begin(); itt != recvIntBufferMap.end(); ++itt)
    {
        char *buff = (char *)itt->second;
        if (itt->second != NULL)
            delete [] itt->second;
    }
    recvIntBufferMap.clear();
}


// ****************************************************************************
477
//  Method: avtParICAlgorithm::CheckPendingSendRequests
478
479
480
481
482
483
484
//
//  Purpose:
//      Check to see if there are any pending send requests.
//
//  Programmer: Dave Pugmire
//  Creation:   June 16, 2008
//
485
486
487
488
489
//  Modifications:
//
//    Dave Pugmire, Wed Mar 18 17:07:07 EDT 2009
//    Delete entry from map after send is complete.
//
490
491
//   Dave Pugmire, Mon Mar 23 12:48:12 EDT 2009
//   Change how timings are reported/calculated.
492
493
494
//
//   Dave Pugmire, Sat Mar 28 22:21:49 EDT 2009
//   Bug fix. "notCompleted" wasn't in an else clause for the INT messages.
495
//   
496
497
498
//   Hank Childs, Sat Feb 20 16:53:18 CST 2010
//   Don't output timing values to the timing logs.
//
499
500
// ****************************************************************************
void
501
avtParICAlgorithm::CheckPendingSendRequests()
502
{
503
    debug5<<"avtParICAlgorithm::CheckPendingSendRequests()\n";
504
505
    int communicationTimer = visitTimer->StartTimer();
    
506
    if (sendICBufferMap.size() > 0)
507
508
509
510
511
    {
        vector<MPI_Request> req, copy;

        int notCompleted = 0;
        map<MPI_Request, unsigned char*>::const_iterator it;
512
        for (it = sendICBufferMap.begin(); it != sendICBufferMap.end(); ++it)
513
514
515
516
517
518
519
520
521
522
        {
            if (it->first != MPI_REQUEST_NULL && it->second != NULL)
            {
                req.push_back(it->first);
                copy.push_back(it->first);
            }
            else
                notCompleted++;
        }

523
        debug5 << "\tCheckPendingSendRequests() IC completed = "<<req.size()
524
525
526
527
528
529
530
531
532
533
534
535
536
               <<" not completed: "<<notCompleted<<endl;

        if (req.size() > 0)
        {
            // See if any sends have completed. Delete buffers if they have.
            int num = 0, *indices = new int[req.size()];
            MPI_Status *status = new MPI_Status[req.size()];
            int err = MPI_Testsome(req.size(), &req[0], &num, indices, status);
            
            for (int i = 0; i < num; i++)
            {
                int idx = indices[i];
                MPI_Request r = copy[idx];
537
                unsigned char *buff = sendICBufferMap[r];
538
539
540
541
                debug5 << "\tidx = " << idx << " r = " << r << " buff = " 
                       << (void *)buff << endl;
                if (buff)
                    delete [] buff;
542

543
544
                sendICBufferMap[r] = NULL;
                sendICBufferMap.erase(r);
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
            }
            
            delete [] indices;
            delete [] status;
        }
    }

    if (sendIntBufferMap.size() > 0)
    {
        vector<MPI_Request> req, copy;
        map<MPI_Request, int*>::const_iterator it;
        int notCompleted = 0;

        for (it = sendIntBufferMap.begin(); it != sendIntBufferMap.end(); ++it)
        {
            if (it->first != MPI_REQUEST_NULL && it->second != NULL)
            {
                req.push_back(it->first);
                copy.push_back(it->first);
            }
565
566
            else
                notCompleted++;
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
        }

        debug5 << "\tCheckPendingSendRequests() INT completed = "<<req.size()
               <<" not completed: "<<notCompleted<<endl;
        
        if (req.size() > 0)
        {
            // See if any sends have completed. Delete buffers if they have.
            int num = 0, *indices = new int[req.size()];
            MPI_Status *status = new MPI_Status[req.size()];
            int err = MPI_Testsome(req.size(), &req[0], &num, indices, status);
            
            for (int i = 0; i < num; i++)
            {
                int idx = indices[i];
                MPI_Request r = copy[idx];
                int *buff = sendIntBufferMap[r];
                debug5 << "\tidx = " << idx << " r = " << r << " buff = " 
                       << (void *)buff << endl;
                if (buff)
                    delete [] buff;
588

589
                sendIntBufferMap[r] = NULL;
590
                sendIntBufferMap.erase(r);
591
592
593
594
595
596
597
            }
            
            delete [] indices;
            delete [] status;
        }
    }

598
599
    bool nov = visitTimer->GetNeverOutputValue();
    visitTimer->NeverOutput(true);
600
601
    CommTime.value += visitTimer->StopTimer(communicationTimer, 
                                            "CheckPending");
602
    visitTimer->NeverOutput(nov);
603
604
605
606
    debug5 << "DONE  CheckPendingSendRequests()\n";
}

// ****************************************************************************
607
//  Method: avtParICAlgorithm::PostRecvStatusReq
608
609
610
611
612
613
614
//
//  Purpose:
//      Receives status requests.
//
//  Programmer: Dave Pugmire
//  Creation:   June 16, 2008
//
615
616
617
//   Dave Pugmire, Wed Apr  1 11:21:05 EDT 2009
//   Limit the number of async recvs outstanding.
//
618
619
620
// ****************************************************************************

void
621
avtParICAlgorithm::PostRecvStatusReq(int idx)
622
623
624
{
    MPI_Request req;
    int *buff = new int[statusMsgSz];
625
626

    MPI_Irecv(buff, statusMsgSz, MPI_INT, MPI_ANY_SOURCE,
627
              avtParICAlgorithm::STATUS_TAG,
628
              VISIT_MPI_COMM, &req);
629
630
    debug5 << "Post Statusrecv " <<idx<<" req= "<<req<<endl;
    statusRecvRequests[idx] = req;
631
632
633
634
    recvIntBufferMap[req] = buff;
}

// ****************************************************************************
635
//  Method: avtParICAlgorithm::PostRecvICReq
636
637
638
639
640
641
642
//
//  Purpose:
//      Receives status requests.
//
//  Programmer: Dave Pugmire
//  Creation:   June 16, 2008
//
643
644
645
646
647
//  Modifications:
//
//   Dave Pugmire, Wed Apr  1 11:21:05 EDT 2009
//   Limit the number of async recvs outstanding.
//
648
649
650
// ****************************************************************************

void
651
avtParICAlgorithm::PostRecvICReq(int idx)
652
653
{
    MPI_Request req;
654
655
    unsigned char *buff = new unsigned char[icMsgSz];
    MPI_Irecv(buff, icMsgSz,
656
              MPI_UNSIGNED_CHAR, MPI_ANY_SOURCE,
657
              avtParICAlgorithm::STREAMLINE_TAG, 
658
659
              VISIT_MPI_COMM, &req);

660
661
662
    debug5 << "Post ICrecv " <<idx<<" req= "<<req<<endl;
    icRecvRequests[idx] = req;
    recvICBufferMap[req] = buff;
663
664
665
666
}


// ****************************************************************************
667
//  Method: avtParICAlgorithm::SendMsg
668
669
670
671
672
673
674
675
676
//
//  Purpose:
//      Send an asynchronous message.
//
//  Programmer: Dave Pugmire
//  Creation:   Dec 18, 2008
//
// Modifications:
//
677
678
//   Dave Pugmire, Wed Apr  1 11:21:05 EDT 2009
//   Add the senders rank and msgID to the front of the message.
679
//
680
681
682
//   Hank Childs, Sat Feb 20 16:53:18 CST 2010
//   Don't output timing values to the timing logs.
//
683
684
685
// ****************************************************************************

void
686
avtParICAlgorithm::SendMsg(int dst,
687
688
689
690
691
692
693
                           vector<int> &msg)
{
    int communicationTimer = visitTimer->StartTimer();
    if (msg.size() > statusMsgSz)
        EXCEPTION0(ImproperUseException);
    
    int *buff = new int[statusMsgSz];
694
695
696
    buff[0] = msgID;
    msgID++;
    buff[1] = rank;
697
698
699
    
    MPI_Request req;
    for (int i = 0; i < msg.size(); i++)
700
701
702
703
704
        buff[2+i] = msg[i];

    debug5<<"SendMsg to :"<<dst<<" [";
    for(int i = 0; i < statusMsgSz; i++) debug5<<buff[i]<<" ";
    debug5<<"]"<<endl;
705
        
706
    int err = MPI_Isend(buff, statusMsgSz, MPI_INT, dst,
707
                        avtParICAlgorithm::STATUS_TAG,
708
                        VISIT_MPI_COMM, &req);
709
710
711
712
713

    sendIntBufferMap[req] = buff;
    
    BytesCnt.value += (sizeof(int) *statusMsgSz);
    MsgCnt.value++;
714
715
    bool nov = visitTimer->GetNeverOutputValue();
    visitTimer->NeverOutput(true);
716
717
    CommTime.value += visitTimer->StopTimer(communicationTimer, 
                                            "SendMsg");
718
719
    visitTimer->NeverOutput(nov);
    debug5 << "DONE  CheckPendingSendRequests()\n";
720
721
722
}

// ****************************************************************************
723
//  Method: avtParICAlgorithm::SendAllMsg
724
725
726
727
728
729
730
731
732
733
734
735
736
//
//  Purpose:
//      Broadcast a message.
//
//  Programmer: Dave Pugmire
//  Creation:   Dec 18, 2008
//
// Modifications:
//
//
// ****************************************************************************

void
737
avtParICAlgorithm::SendAllMsg(vector<int> &msg)
738
739
740
741
742
743
744
{
    for (int i = 0; i < nProcs; i++)
        if (i != rank)
            SendMsg(i, msg);
}

// ****************************************************************************
745
//  Method: avtParICAlgorithm::RecvMsgs
746
747
748
749
750
751
752
753
754
//
//  Purpose:
//      Recieve any messages.
//
//  Programmer: Dave Pugmire
//  Creation:   Dec 18, 2008
//
// Modifications:
//
755
756
//   Dave Pugmire, Mon Mar 23 12:48:12 EDT 2009
//   Change how timings are reported/calculated.
757
758
759
//
//   Dave Pugmire, Wed Apr  1 11:21:05 EDT 2009
//   Senders rank and msgID is in the message now.
760
//   
761
762
763
//   Hank Childs, Sat Feb 20 16:53:18 CST 2010
//   Don't output timing values to the timing logs.
//
764
765
766
// ****************************************************************************

void
767
avtParICAlgorithm::RecvMsgs(std::vector<std::vector<int> > &msgs)
768
{
769
    debug5<<"avtParICAlgorithm::RecvMsgs()\n";
770
    int communicationTimer = visitTimer->StartTimer();
771
772
773
774
775
776
777
778
779
780
781
782
    
    msgs.resize(0);
    while (true)
    {
        int nReq = statusRecvRequests.size();
        MPI_Status *status = new MPI_Status[nReq];
        int *indices = new int[nReq];
        int num = 0, err;

        vector<MPI_Request> copy;
        for (int i = 0; i < statusRecvRequests.size(); i++)
            copy.push_back(statusRecvRequests[i]);
783

784
        err = MPI_Testsome(nReq, &copy[0], &num, indices, status);
785
        debug5<<"::RecvMsgs() err= "<<err<<" Testsome("<<nReq<<"); num= "<<num<<endl;
786
787
788
789
790
791

        if (num > 0)
        {
            for (int i = 0; i < num; i++)
            {
                int idx = indices[i];
792
                debug5<<"RecvMsg from "<<idx<<endl;
793
794
795
796
797
798
799
800
801
802

                MPI_Request req = statusRecvRequests[idx];
                if (req == MPI_REQUEST_NULL)
                    continue;
                
                int *buff = recvIntBufferMap[req];
                recvIntBufferMap.erase(req);
                if (buff == NULL)
                    continue;

803
804
805
806
807
                debug5<<"RecvMsg: [";
                for(int i = 0; i < statusMsgSz; i++) debug5<<buff[i]<<" ";
                debug5<<"]"<<endl;
                
                //Skip msg ID, copy buffer int msg.
808
                vector<int> msg;
809
                for (int i = 1; i < statusMsgSz; i++)
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
                    msg.push_back(buff[i]);
                msgs.push_back(msg);

                //Clean up.
                delete [] buff;
            }
        
            //Repost recv requests.
            for (int i = 0; i < num; i++)
                PostRecvStatusReq(indices[i]);
        }
            
        delete [] status;
        delete [] indices;
        if (num == 0)
            break;
    }
827
828
829
    bool nov = visitTimer->GetNeverOutputValue();
    visitTimer->NeverOutput(true);
    debug5 << "DONE  CheckPendingSendRequests()\n";
830
831
    CommTime.value += visitTimer->StopTimer(communicationTimer,
                                            "RecvMsgs");
832
    visitTimer->NeverOutput(nov);
833
834
835
}

// ****************************************************************************
836
//  Method: avtParICAlgorithm::SendICs
837
838
839
840
841
842
843
844
845
846
847
848
//
//  Purpose:
//      Send streamlines to a dst.
//
//  Programmer: Dave Pugmire
//  Creation:   June 16, 2008
//
//  Modifications:
//
//   Dave Pugmire, Fri Aug 22 14:47:11 EST 2008
//   Memory leak fix.
//
pugmire's avatar
pugmire committed
849
//   Dave Pugmire, Thu Sep 24 14:03:46 EDT 2009
850
//   Call new method, DoSendICs.
pugmire's avatar
pugmire committed
851
//
852
853
854
//   Hank Childs, Fri Jun  4 19:58:30 CDT 2010
//   Use avtStreamlines, not avtStreamlineWrappers.
//
855
856
857
// ****************************************************************************

void
858
859
avtParICAlgorithm::SendICs(int dst, 
                           vector<avtIntegralCurve*> &ics)
860
{
pugmire's avatar
pugmire committed
861

862
    for (int i = 0; i < ics.size(); i++)
pugmire's avatar
pugmire committed
863
    {
864
865
        avtIntegralCurve *ic = ics[i];
        ic->serializeFlags |= avtIntegralCurve::SERIALIZE_INC_SEQ;
pugmire's avatar
pugmire committed
866
867
    }

868
    if (DoSendICs(dst, ics))
pugmire's avatar
pugmire committed
869
    {
870
        for (int i = 0; i < ics.size(); i++)
pugmire's avatar
pugmire committed
871
        {
872
            avtIntegralCurve *ic = ics[i];
pugmire's avatar
pugmire committed
873
874
            
            //Add if id/seq is unique. (single streamlines can be sent to multiple dst).
875
            list<avtIntegralCurve*>::const_iterator si = communicatedICs.begin();
pugmire's avatar
pugmire committed
876
            bool found = false;
877
878
            for (si = communicatedICs.begin(); !found && si != communicatedICs.end(); si++)
                found = ((*si)->id == ic->id && (*si)->sequenceCnt == ic->sequenceCnt);
pugmire's avatar
pugmire committed
879
880
        
            if (!found)
881
                communicatedICs.push_back(ic);
pugmire's avatar
pugmire committed
882
883
884
        }
        
        //Empty the array.
885
        ics.resize(0);
pugmire's avatar
pugmire committed
886
887
888
889
890
    }
}


// ****************************************************************************
891
//  Method: avtParICAlgorithm::DoSendICs
pugmire's avatar
pugmire committed
892
893
894
895
896
897
898
899
900
//
//  Purpose:
//      Send streamlines to a dst.
//
//  Programmer: Dave Pugmire
//  Creation:   September 24, 2009
//
//  Modifications:
//
901
902
903
//   Hank Childs, Sat Feb 20 16:53:18 CST 2010
//   Don't output timing values to the timing logs.
//
904
905
906
//   Hank Childs, Fri Jun  4 19:58:30 CDT 2010
//   Use avtStreamlines, not avtStreamlineWrappers.
//
pugmire's avatar
pugmire committed
907
908
909
// ****************************************************************************

bool
910
911
avtParICAlgorithm::DoSendICs(int dst, 
                             vector<avtIntegralCurve*> &ics)
pugmire's avatar
pugmire committed
912
{
913
    if (dst == rank)
pugmire's avatar
pugmire committed
914
        return false;
915
  
916
    size_t szz = ics.size();
917
    if (szz == 0)
pugmire's avatar
pugmire committed
918
        return false;
919
920
921
922

    int communicationTimer = visitTimer->StartTimer();
    MemStream buff;
    buff.write(&szz, 1);
pugmire's avatar
pugmire committed
923

924
    for (int i = 0; i < ics.size(); i++)
925
    {
926
927
928
        avtIntegralCurve *ic = ics[i];
        ic->Serialize(MemStream::WRITE, buff, GetSolver());
        ICCommCnt.value ++;
929
930
931
    }
    
    // Break it up into multiple messages if needed.
932
    if (buff.buffLen() > icMsgSz)
933
934
935
936
937
938
939
940
941
942
        EXCEPTION0(ImproperUseException);
    
    // Copy it into a byte buffer.
    size_t sz = buff.buffLen();
    unsigned char *msg = new unsigned char[sz];
    memcpy(msg, buff.buff(), sz);

    //Send it along.
    MPI_Request req;
    int err = MPI_Isend(msg, sz, MPI_UNSIGNED_CHAR, dst,
943
                        avtParICAlgorithm::STREAMLINE_TAG,
944
                        VISIT_MPI_COMM, &req);
945
    debug5<<err<<" = MPI_Isend(msg, "<<sz<<", MPI_UNSIGNED_CHAR, to "<<dst<<", req= "<<req<<endl;
946
    sendICBufferMap[req] = msg;
947
948

    BytesCnt.value += sz;
949
950
    bool nov = visitTimer->GetNeverOutputValue();
    visitTimer->NeverOutput(true);
951
    CommTime.value += visitTimer->StopTimer(communicationTimer,
952
                                            "SendICs");
953
954
    visitTimer->NeverOutput(nov);
    debug5 << "DONE  CheckPendingSendRequests()\n";
pugmire's avatar
pugmire committed
955
    return true;
956
957
958
}

// ****************************************************************************
959
//  Method: avtParICAlgorithm::RecvICs
960
961
962
963
964
//
//  Purpose:
//      Recv streamlines.
//
//  Programmer: Dave Pugmire
965
//  Creation:   Mon Mar 16 15:45:11 EDT 2009
966
//
967
968
969
970
//  Modifications:
//
//  Dave Pugmire, Wed Mar 18 17:17:40 EDT 2009
//  RecvSLs broken into two methods.
971
972
973
//  
//  Dave Pugmire, Mon Mar 23 12:48:12 EDT 2009
//  Change how timings are reported/calculated.
974
//
975
976
977
//  Hank Childs, Sat Feb 20 16:53:18 CST 2010
//  Don't output timing values to the timing logs.
//
978
979
980
//  Hank Childs, Fri Jun  4 19:58:30 CDT 2010
//  Use avtStreamlines, not avtStreamlineWrappers.
//
981
982
983
984
//  Hank Childs, Sat Jun  5 16:21:27 CDT 2010
//  Use the PICS filter to instantiate integral curves, as this is now
//  an abstract type.
//
985
// ****************************************************************************
986

987
int
988
avtParICAlgorithm::RecvICs(list<avtIntegralCurve *> &recvICs)
989
{
990
    int communicationTimer = visitTimer->StartTimer();
991
    int icCount = 0;
992
993
994

    while (true)
    {
995
        int nReq = icRecvRequests.size();
996
997
998
999
1000
        MPI_Status *status = new MPI_Status[nReq];
        int *indices = new int[nReq];
        int num = 0, err;

        vector<MPI_Request> copy;
1001
1002
        for (int i = 0; i < icRecvRequests.size(); i++)
            copy.push_back(icRecvRequests[i]);
1003
1004
1005
1006
1007
1008
1009
        err = MPI_Testsome(nReq, &copy[0], &num, indices, status);

        if (num > 0)
        {
            for (int i = 0; i < num; i++)
            {
                int idx = indices[i];
1010
                MPI_Request req = icRecvRequests[idx];
1011
1012
1013
                if (req == MPI_REQUEST_NULL)
                    continue;
                
1014
                //Grab the bytes, unserialize them, add to list.
1015
1016
                unsigned char *msg = recvICBufferMap[req];
                recvICBufferMap.erase(req);
1017
1018
1019
                if (msg == NULL)
                    continue;
        
1020
                MemStream buff(icMsgSz, msg);
1021
1022
1023
                delete [] msg;
                msg = NULL;

1024
1025
                size_t numICs;
                buff.read(numICs);
1026

1027
                for (int j = 0; j < numICs; j++)
1028
                {
1029
1030
1031
1032
                    avtIntegralCurve *ic = picsFilter->CreateIntegralCurve();
                    ic->Serialize(MemStream::READ, buff, GetSolver());
                    recvICs.push_back(ic);
                    icCount++;
1033
1034
1035
1036
                }
            }

            for (int i = 0; i < num; i++)
1037
                PostRecvICReq(indices[i]);
1038
1039
1040
1041
1042
1043
1044
1045
        }

        delete [] status;
        delete [] indices;
        
        if (num == 0)
            break;
    }
1046
    
1047
1048
    bool nov = visitTimer->GetNeverOutputValue();
    visitTimer->NeverOutput(true);
1049
    CommTime.value += visitTimer->StopTimer(communicationTimer,
1050
                                            "RecvICs");
1051
1052
    visitTimer->NeverOutput(nov);
    debug5 << "DONE  CheckPendingSendRequests()\n";
1053
    return icCount;
1054
1055
}

1056
// ****************************************************************************
1057
//  Method: avtParICAlgorithm::RecvICs
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
//
//  Purpose:
//      Recv streamlines.
//
//  Programmer: Dave Pugmire
//  Creation:   June 16, 2008
//
//  Modifications:
//
//    Dave Pugmire, Mon Mar 16 15:45:11 EDT 2009
//    Call the other RecvSLs and then check for domain inclusion.
//
//    Dave Pugmire, Tue Mar 17 12:02:10 EDT 2009
//    Use new new RecvSLs method, then check for terminations.
//
1073
1074
1075
//    Hank Childs, Fri Jun  4 03:52:48 PDT 2010
//    Rename GetEndPt to GetCurrentLocation.
//
1076
1077
1078
//    Hank Childs, Fri Jun  4 19:58:30 CDT 2010
//    Use avtStreamlines, not avtStreamlineWrappers.
//
1079
1080
// ****************************************************************************
int
1081
avtParICAlgorithm::RecvICs(list<avtIntegralCurve *> &streamlines,