avtParICAlgorithm.C 37.1 KB
Newer Older
1
2
/*****************************************************************************
*
brugger's avatar
 
brugger committed
3
* Copyright (c) 2000 - 2010, Lawrence Livermore National Security, LLC
4
* Produced at the Lawrence Livermore National Laboratory
brugger's avatar
 
brugger committed
5
* LLNL-CODE-400124
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
* All rights reserved.
*
* This file is  part of VisIt. For  details, see https://visit.llnl.gov/.  The
* full copyright notice is contained in the file COPYRIGHT located at the root
* of the VisIt distribution or at http://www.llnl.gov/visit/copyright.html.
*
* Redistribution  and  use  in  source  and  binary  forms,  with  or  without
* modification, are permitted provided that the following conditions are met:
*
*  - Redistributions of  source code must  retain the above  copyright notice,
*    this list of conditions and the disclaimer below.
*  - Redistributions in binary form must reproduce the above copyright notice,
*    this  list of  conditions  and  the  disclaimer (as noted below)  in  the
*    documentation and/or other materials provided with the distribution.
*  - Neither the name of  the LLNS/LLNL nor the names of  its contributors may
*    be used to endorse or promote products derived from this software without
*    specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT  HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR  IMPLIED WARRANTIES, INCLUDING,  BUT NOT  LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND  FITNESS FOR A PARTICULAR  PURPOSE
* ARE  DISCLAIMED. IN  NO EVENT  SHALL LAWRENCE  LIVERMORE NATIONAL  SECURITY,
* LLC, THE  U.S.  DEPARTMENT OF  ENERGY  OR  CONTRIBUTORS BE  LIABLE  FOR  ANY
* DIRECT,  INDIRECT,   INCIDENTAL,   SPECIAL,   EXEMPLARY,  OR   CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT  LIMITED TO, PROCUREMENT OF  SUBSTITUTE GOODS OR
* SERVICES; LOSS OF  USE, DATA, OR PROFITS; OR  BUSINESS INTERRUPTION) HOWEVER
* CAUSED  AND  ON  ANY  THEORY  OF  LIABILITY,  WHETHER  IN  CONTRACT,  STRICT
* LIABILITY, OR TORT  (INCLUDING NEGLIGENCE OR OTHERWISE)  ARISING IN ANY  WAY
* OUT OF THE  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
* DAMAGE.
*
*****************************************************************************/

// ************************************************************************* //
40
//                              avtParICAlgorithm.C                          //
41
42
// ************************************************************************* //

43
#include "avtParICAlgorithm.h"
44
45
46
47
48
49
#include <TimingsManager.h>

using namespace std;

#ifdef PARALLEL

50
51
// MPI tags separating the two asynchronous message streams used by this
// algorithm: fixed-size integer status messages vs. serialized integral
// curve (streamline) payloads.
int avtParICAlgorithm::STATUS_TAG =  420000;
int avtParICAlgorithm::STREAMLINE_TAG = 420001;
52
53

// ****************************************************************************
54
//  Method: avtParICAlgorithm::avtParICAlgorithm
55
56
//
//  Purpose:
57
//      avtParICAlgorithm constructor.
58
59
60
61
//
//  Programmer: Dave Pugmire
//  Creation:   January 27, 2009
//
62
63
//   Dave Pugmire, Mon Mar 23 12:48:12 EDT 2009
//   Change how timings are reported/calculated.
64
65
66
//
//   Dave Pugmire, Wed Apr  1 11:21:05 EDT 2009
//   Limit the number of async recvs outstanding.
67
//   
68
69
// ****************************************************************************

70
71
72
avtParICAlgorithm::avtParICAlgorithm(avtPICSFilter *icFilter)
    : avtICAlgorithm(icFilter),
      CommTime("comT"), MsgCnt("msgC"), ICCommCnt("iccC"), BytesCnt("byteC")
{
    // Cache the MPI geometry for this run.
    rank = PAR_Rank();
    nProcs = PAR_Size();

    // Message sizing and receive counts are unknown until
    // InitializeBuffers() is called; mark them as unset.
    msgID = 0;
    statusMsgSz = -1;
    numAsyncRecvs = -1;

    // Fixed 10 MB receive buffer for each integral curve message.
    icMsgSz = 10*1024*1024;
}

// ****************************************************************************
83
//  Method: avtParICAlgorithm::~avtParICAlgorithm
84
85
//
//  Purpose:
86
//      avtParICAlgorithm destructor.
87
88
89
90
91
92
//
//  Programmer: Dave Pugmire
//  Creation:   January 27, 2009
//
// ****************************************************************************

93
avtParICAlgorithm::~avtParICAlgorithm()
{
    // Nothing to release here; MPI requests and buffers are torn down by
    // CleanupAsynchronous(), which PostExecute() invokes.
}

// ****************************************************************************
98
//  Method: avtParICAlgorithm::Initialize
99
100
101
102
103
104
105
//
//  Purpose:
//      Initialize the request buffers.
//
//  Programmer: Dave Pugmire
//  Creation:   January 27, 2009
//
106
107
108
109
110
//  Modifications:
//
//   Dave Pugmire, Wed Apr  1 11:21:05 EDT 2009
//   Limit the number of async recvs outstanding.
//
111
112
113
//   Hank Childs, Fri Jun  4 19:58:30 CDT 2010
//   Use avtStreamlines, not avtStreamlineWrappers.
//
114
115
116
//   Hank Childs, Mon Jun  7 14:57:13 CDT 2010
//   Rename to InitializeBuffers to prevent name collision.
//
117
118
119
// ****************************************************************************

void
120
121
122
avtParICAlgorithm::InitializeBuffers(vector<avtIntegralCurve *> &seedPts,
                                     int msgSz,
                                     int numRecvs)
123
{
124
125
126
127
128
129
130
    //Standardmsg + 1(sender rank) +1(msg ID).
    statusMsgSz = msgSz+1+1;
    numAsyncRecvs = numRecvs;
    
    if (statusMsgSz <= 0 || numAsyncRecvs <= 0)
        EXCEPTION0(ImproperUseException);
    
131
    avtICAlgorithm::Initialize(seedPts);
132
133
134
    InitRequests();
}

pugmire's avatar
pugmire committed
135
136

// ****************************************************************************
137
//  Method: avtParICAlgorithm::PostRunAlgorithm
pugmire's avatar
pugmire committed
138
139
140
141
142
143
144
145
146
147
//
//  Purpose:
//      Cleanup.
//
//  Programmer: Dave Pugmire
//  Creation:   September 24, 2009
//
// ****************************************************************************

void
148
avtParICAlgorithm::PostRunAlgorithm()
pugmire's avatar
pugmire committed
149
{
150
    ExchangeICSteps();
pugmire's avatar
pugmire committed
151
152
}

153
// ****************************************************************************
154
//  Method: avtParICAlgorithm::PostExecute
155
156
157
158
159
160
161
162
163
164
//
//  Purpose:
//      Cleanup.
//
//  Programmer: Dave Pugmire
//  Creation:   January 27, 2009
//
// ****************************************************************************

void
165
avtParICAlgorithm::PostExecute()
166
167
{
    CleanupAsynchronous();
168
    avtICAlgorithm::PostExecute();
169
170
171
}

// ****************************************************************************
172
//  Method: avtParICAlgorithm::InitRequests
173
174
175
176
177
178
179
//
//  Purpose:
//      Initialize the request buffers.
//
//  Programmer: Dave Pugmire
//  Creation:   June 16, 2008
//
180
181
182
//   Dave Pugmire, Wed Apr  1 11:21:05 EDT 2009
//   Limit the number of async recvs outstanding.
//
183
184
185
//   Hank Childs, Fri Jun  4 19:58:30 CDT 2010
//   Use avtStreamlines, not avtStreamlineWrappers.
//
186
187
188
// ****************************************************************************

void
189
avtParICAlgorithm::InitRequests()
190
{
191
    debug5<<"avtParICAlgorithm::InitRequests() sz= "<<numAsyncRecvs<<endl;
192
    statusRecvRequests.resize(numAsyncRecvs, MPI_REQUEST_NULL);
193
    icRecvRequests.resize(numAsyncRecvs, MPI_REQUEST_NULL);
194
195
    
    for (int i = 0; i < statusRecvRequests.size(); i++)
196
    {
197
        PostRecvStatusReq(i);
198
        PostRecvICReq(i);
199
200
201
    }
}

pugmire's avatar
pugmire committed
202
static int
203
CountIDs(list<avtIntegralCurve *> &l, int id)
pugmire's avatar
pugmire committed
204
205
{
    int cnt = 0;
206
    list<avtIntegralCurve*>::const_iterator si = l.begin();
pugmire's avatar
pugmire committed
207
208
209
210
211
212
213
214
215
    for (si = l.begin(); si != l.end(); si++)
    {
        if ((*si)->id == id)
            cnt++;
    }
    return cnt;
}

// ****************************************************************************
216
//  Method: avtParICAlgorithm::ExchangeICSteps
pugmire's avatar
pugmire committed
217
218
219
220
221
222
223
224
225
226
227
//
//  Purpose:
//      Communicate streamlines pieces to destinations.
//      When a streamline is communicated, only the state information is sent.
//      All the integration steps need to sent to the proc that owns the terminated
//      streamline.  This method figures out where each streamline has terminated and
//      sends all the pieces there.
//
//  Programmer: Dave Pugmire
//  Creation:   September 21, 2009
//
228
229
230
231
232
//  Modifications:
//
//   Hank Childs, Fri Jun  4 19:58:30 CDT 2010
//   Use avtStreamlines, not avtStreamlineWrappers.
//
pugmire's avatar
pugmire committed
233
234
235
// ****************************************************************************

void
236
avtParICAlgorithm::ExchangeICSteps()
pugmire's avatar
pugmire committed
237
{
238
    debug5<<"ExchangeICSteps: communicatedICs: "<<communicatedICs.size();
239
    debug5<<" terminatedICs: "<<terminatedICs.size()<<endl;
pugmire's avatar
pugmire committed
240
241
242
243
244
245
246
247
248
249
250

    //Communicate to everyone where the terminators are located.
    //Do this "N" streamlines at a time, so we don't have a super big buffer.
    int N;
    if (numSeedPoints > 500)
        N = 500;
    else
        N = numSeedPoints;
    
    long *idBuffer = new long[2*N], *myIDs = new long[2*N];

251
252
253
    //Sort the terminated/communicated ICs by id.
    terminatedICs.sort(avtIntegralCurve::IdSeqCompare);
    communicatedICs.sort(avtIntegralCurve::IdSeqCompare);
pugmire's avatar
pugmire committed
254

255
    vector<vector<avtIntegralCurve *> >sendICs(N);
pugmire's avatar
pugmire committed
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
    vector<int> owners(N);
    
    int minId = 0;
    int maxId = N-1;
    int nLoops = numSeedPoints/N;
    if (numSeedPoints % N != 0)
        nLoops++;
    for (int l = 0; l < nLoops; l++)
    {
        //Initialize arrays for this round.
        for (int i = 0; i < N; i++)
        {
            idBuffer[i] = 0;
            idBuffer[i+N] = 0;
            myIDs[i] = 0;
            myIDs[i+N] = 0;
272
            sendICs[i].resize(0);
pugmire's avatar
pugmire committed
273
274
275
            owners[i] = 0;
        }

276
277
278
        //Set array for ICs that terminated here. Update sequence counts for communicated
        //ICs.
        list<avtIntegralCurve*>::iterator t = terminatedICs.begin();
279
        while (t != terminatedICs.end() && (*t)->id <= maxId)
pugmire's avatar
pugmire committed
280
281
282
283
284
285
        {
            if ((*t)->id >= minId)
            {
                int idx = (*t)->id % N;
                myIDs[idx] = rank;
                myIDs[idx+N] += 1;
pugmire's avatar
pugmire committed
286
                debug5<<"I own id= "<<(*t)->id<<" "<<(*t)->sequenceCnt<<" idx= "<<idx<<endl;
pugmire's avatar
pugmire committed
287
288
289
290
291
            }

            t++;
        }
        
292
293
        list<avtIntegralCurve*>::const_iterator c = communicatedICs.begin();
        while (c != communicatedICs.end() && (*c)->id <= maxId)
pugmire's avatar
pugmire committed
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
        {
            if ((*c)->id >= minId)
            {
                int idx = (*c)->id % N;
                myIDs[idx+N] += 1;
                debug5<<"I have "<<(*c)->id<<" "<<(*c)->sequenceCnt<<" idx= "<<idx<<endl;
            }
            c++;
        }
        
        //Exchange ID owners and sequence counts.
        MPI_Allreduce(myIDs, idBuffer, 2*N, MPI_LONG, MPI_SUM, VISIT_MPI_COMM);
        if (0)
        {
            debug5<<"idBuffer:  [";
            for(int i=0; i<2*N;i++)
                debug5<<idBuffer[i]<<" ";
            debug5<<"]"<<endl;
        }
        
314
315
316
        //Now we know where all ICs belong and how many sequences for each.
        //Send communicatedICs to the owners.
        while (!communicatedICs.empty())
pugmire's avatar
pugmire committed
317
        {
318
            avtIntegralCurve *s = communicatedICs.front();
pugmire's avatar
pugmire committed
319
320
321
322
323
            if (s->id <= maxId)
            {
                int idx = s->id%N;
                int owner = idBuffer[idx];
                if (owner == rank)
324
                    terminatedICs.push_back(s);
pugmire's avatar
pugmire committed
325
326
                else
                {
327
328
                    s->serializeFlags = avtIntegralCurve::SERIALIZE_STEPS; //Write IC steps.
                    sendICs[idx].push_back(s);
pugmire's avatar
pugmire committed
329
330
                    owners[idx] = owner;
                }
331
                communicatedICs.pop_front();
pugmire's avatar
pugmire committed
332
333
334
335
336
337
338
            }
            else
                break;
        }
        
        for (int i = 0; i < N; i++)
        {
339
            if (sendICs[i].size() > 0)
pugmire's avatar
pugmire committed
340
            {
341
                DoSendICs(owners[i], sendICs[i]);
pugmire's avatar
pugmire committed
342

343
344
                for (int j = 0; j < sendICs[i].size(); j++)
                    delete sendICs[i][j];
pugmire's avatar
pugmire committed
345
346
347
348
            }
        }
        
        //Wait for all the sequences to arrive. The total number is known for
349
        //each IC, so wait until they all come.
pugmire's avatar
pugmire committed
350
351
352
        bool seqGathered = false;
        while (!seqGathered)
        {
353
            RecvICs(terminatedICs);
pugmire's avatar
pugmire committed
354
355
            
            //See if we have all the sequences we need.
356
            terminatedICs.sort(avtIntegralCurve::IdSeqCompare);
pugmire's avatar
pugmire committed
357
358
359
            bool needMore = false;
            for (int i = 0; i < N && !needMore; i++)
                if (idBuffer[i] == rank)
360
                    needMore = (CountIDs(terminatedICs, i+minId) < idBuffer[i+N]);
pugmire's avatar
pugmire committed
361
362
363
364
365
366
367
368
369
370
            
            //Everyone done.
            seqGathered = !needMore;
        }
        
        //Advance to next N streamlines.
        maxId += N;
        minId += N;
    }

371
372
    //All ICs are distributed, merge the sequences into single streamlines.
    MergeTerminatedICSequences();
pugmire's avatar
pugmire committed
373
374
375
376
377
378
    
    delete [] idBuffer;
    delete [] myIDs;
}

// ****************************************************************************
379
//  Method: avtParICAlgorithm::MergeTerminatedICSequences
pugmire's avatar
pugmire committed
380
381
382
383
384
385
386
//
//  Purpose:
//      Merge streamline sequences.
//
//  Programmer: Dave Pugmire
//  Creation:   Sept 21, 2009
//
387
388
389
390
391
//  Modifications:
//
//   Hank Childs, Fri Jun  4 19:58:30 CDT 2010
//   Use avtStreamlines, not avtStreamlineWrappers.
//
pugmire's avatar
pugmire committed
392
393
394
// ****************************************************************************

void
395
avtParICAlgorithm::MergeTerminatedICSequences()
pugmire's avatar
pugmire committed
396
397
{
    //Sort them by id and sequence so we can process them one at a time.
398
    terminatedICs.sort(avtIntegralCurve::IdSeqCompare);
pugmire's avatar
pugmire committed
399
400

    //Split them up into sequences.
401
    vector<vector<avtIntegralCurve *> > seqs;
402
    while (!terminatedICs.empty())
pugmire's avatar
pugmire committed
403
    {
404
        avtIntegralCurve *s = terminatedICs.front();
405
        terminatedICs.pop_front();
pugmire's avatar
pugmire committed
406
407
408
409
410
        
        //Empty or new ID, add a new entry.
        if (seqs.size() == 0 ||
            seqs[seqs.size()-1][0]->id != s->id)
        {
411
            vector<avtIntegralCurve *> v;
pugmire's avatar
pugmire committed
412
413
414
415
416
417
418
419
            v.push_back(s);
            seqs.push_back(v);
        }
        else
        {
            seqs[seqs.size()-1].push_back(s);
        }
    }
420
    terminatedICs.clear();
pugmire's avatar
pugmire committed
421
422
423
424
    
    //Merge the sequences together, put them into terminated list.
    for (int i = 0; i < seqs.size(); i++)
    {
425
        avtIntegralCurve *s = seqs[i][0]->MergeIntegralCurveSequence(seqs[i]);
426
        terminatedICs.push_back(s);
pugmire's avatar
pugmire committed
427
428
429
    }
}

430
// ****************************************************************************
431
//  Method: avtParICAlgorithm::CleanupAsynchronous
432
433
434
435
436
437
438
439
//
//  Purpose:
//      Claenup the buffers used when doing asynchronous processing.
//
//  Programmer: Dave Pugmire
//  Creation:   June 16, 2008
//
// ****************************************************************************
pugmire's avatar
pugmire committed
440

441
void
442
avtParICAlgorithm::CleanupAsynchronous()
443
444
445
446
447
448
449
450
{
    for (int i = 0; i < statusRecvRequests.size(); i++)
    {
        MPI_Request req = statusRecvRequests[i];
        if (req != MPI_REQUEST_NULL)
            MPI_Cancel(&req);
    } 

451
    for (int i = 0; i < icRecvRequests.size(); i++)
452
    {
453
        MPI_Request req = icRecvRequests[i];
454
455
456
457
458
459
        if (req != MPI_REQUEST_NULL)
            MPI_Cancel(&req);
    }

    // Cleanup recv buffers.
    map<MPI_Request, unsigned char*>::const_iterator it;
460
    for (it = recvICBufferMap.begin(); it != recvICBufferMap.end(); ++it)
461
462
463
464
465
    {
        char *buff = (char *)it->second;
        if (it->second != NULL)
            delete [] it->second;
    }
466
    recvICBufferMap.clear();
467
468
469
470
471
472
473
474
475
476
477
478
479

    map<MPI_Request, int*>::const_iterator itt;
    for (itt = recvIntBufferMap.begin(); itt != recvIntBufferMap.end(); ++itt)
    {
        char *buff = (char *)itt->second;
        if (itt->second != NULL)
            delete [] itt->second;
    }
    recvIntBufferMap.clear();
}


// ****************************************************************************
480
//  Method: avtParICAlgorithm::CheckPendingSendRequests
481
482
483
484
485
486
487
//
//  Purpose:
//      Check to see if there are any pending send requests.
//
//  Programmer: Dave Pugmire
//  Creation:   June 16, 2008
//
488
489
490
491
492
//  Modifications:
//
//    Dave Pugmire, Wed Mar 18 17:07:07 EDT 2009
//    Delete entry from map after send is complete.
//
493
494
//   Dave Pugmire, Mon Mar 23 12:48:12 EDT 2009
//   Change how timings are reported/calculated.
495
496
497
//
//   Dave Pugmire, Sat Mar 28 22:21:49 EDT 2009
//   Bug fix. "notCompleted" wasn't in an else clause for the INT messages.
498
//   
499
500
501
//   Hank Childs, Sat Feb 20 16:53:18 CST 2010
//   Don't output timing values to the timing logs.
//
502
503
// ****************************************************************************
void
504
avtParICAlgorithm::CheckPendingSendRequests()
505
{
506
    debug5<<"avtParICAlgorithm::CheckPendingSendRequests()\n";
507
508
    int communicationTimer = visitTimer->StartTimer();
    
509
    if (sendICBufferMap.size() > 0)
510
511
512
513
514
    {
        vector<MPI_Request> req, copy;

        int notCompleted = 0;
        map<MPI_Request, unsigned char*>::const_iterator it;
515
        for (it = sendICBufferMap.begin(); it != sendICBufferMap.end(); ++it)
516
517
518
519
520
521
522
523
524
525
        {
            if (it->first != MPI_REQUEST_NULL && it->second != NULL)
            {
                req.push_back(it->first);
                copy.push_back(it->first);
            }
            else
                notCompleted++;
        }

526
        debug5 << "\tCheckPendingSendRequests() IC completed = "<<req.size()
527
528
529
530
531
532
533
534
535
536
537
538
539
               <<" not completed: "<<notCompleted<<endl;

        if (req.size() > 0)
        {
            // See if any sends have completed. Delete buffers if they have.
            int num = 0, *indices = new int[req.size()];
            MPI_Status *status = new MPI_Status[req.size()];
            int err = MPI_Testsome(req.size(), &req[0], &num, indices, status);
            
            for (int i = 0; i < num; i++)
            {
                int idx = indices[i];
                MPI_Request r = copy[idx];
540
                unsigned char *buff = sendICBufferMap[r];
541
542
543
544
                debug5 << "\tidx = " << idx << " r = " << r << " buff = " 
                       << (void *)buff << endl;
                if (buff)
                    delete [] buff;
545

546
547
                sendICBufferMap[r] = NULL;
                sendICBufferMap.erase(r);
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
            }
            
            delete [] indices;
            delete [] status;
        }
    }

    if (sendIntBufferMap.size() > 0)
    {
        vector<MPI_Request> req, copy;
        map<MPI_Request, int*>::const_iterator it;
        int notCompleted = 0;

        for (it = sendIntBufferMap.begin(); it != sendIntBufferMap.end(); ++it)
        {
            if (it->first != MPI_REQUEST_NULL && it->second != NULL)
            {
                req.push_back(it->first);
                copy.push_back(it->first);
            }
568
569
            else
                notCompleted++;
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
        }

        debug5 << "\tCheckPendingSendRequests() INT completed = "<<req.size()
               <<" not completed: "<<notCompleted<<endl;
        
        if (req.size() > 0)
        {
            // See if any sends have completed. Delete buffers if they have.
            int num = 0, *indices = new int[req.size()];
            MPI_Status *status = new MPI_Status[req.size()];
            int err = MPI_Testsome(req.size(), &req[0], &num, indices, status);
            
            for (int i = 0; i < num; i++)
            {
                int idx = indices[i];
                MPI_Request r = copy[idx];
                int *buff = sendIntBufferMap[r];
                debug5 << "\tidx = " << idx << " r = " << r << " buff = " 
                       << (void *)buff << endl;
                if (buff)
                    delete [] buff;
591

592
                sendIntBufferMap[r] = NULL;
593
                sendIntBufferMap.erase(r);
594
595
596
597
598
599
600
            }
            
            delete [] indices;
            delete [] status;
        }
    }

601
602
    bool nov = visitTimer->GetNeverOutputValue();
    visitTimer->NeverOutput(true);
603
604
    CommTime.value += visitTimer->StopTimer(communicationTimer, 
                                            "CheckPending");
605
    visitTimer->NeverOutput(nov);
606
607
608
609
    debug5 << "DONE  CheckPendingSendRequests()\n";
}

// ****************************************************************************
610
//  Method: avtParICAlgorithm::PostRecvStatusReq
611
612
613
614
615
616
617
//
//  Purpose:
//      Receives status requests.
//
//  Programmer: Dave Pugmire
//  Creation:   June 16, 2008
//
618
619
620
//   Dave Pugmire, Wed Apr  1 11:21:05 EDT 2009
//   Limit the number of async recvs outstanding.
//
621
622
623
// ****************************************************************************

void
624
avtParICAlgorithm::PostRecvStatusReq(int idx)
625
626
627
{
    MPI_Request req;
    int *buff = new int[statusMsgSz];
628
629

    MPI_Irecv(buff, statusMsgSz, MPI_INT, MPI_ANY_SOURCE,
630
              avtParICAlgorithm::STATUS_TAG,
631
              VISIT_MPI_COMM, &req);
632
633
    debug5 << "Post Statusrecv " <<idx<<" req= "<<req<<endl;
    statusRecvRequests[idx] = req;
634
635
636
637
    recvIntBufferMap[req] = buff;
}

// ****************************************************************************
638
//  Method: avtParICAlgorithm::PostRecvICReq
639
640
641
642
643
644
645
//
//  Purpose:
//      Receives status requests.
//
//  Programmer: Dave Pugmire
//  Creation:   June 16, 2008
//
646
647
648
649
650
//  Modifications:
//
//   Dave Pugmire, Wed Apr  1 11:21:05 EDT 2009
//   Limit the number of async recvs outstanding.
//
651
652
653
// ****************************************************************************

void
654
avtParICAlgorithm::PostRecvICReq(int idx)
655
656
{
    MPI_Request req;
657
658
    unsigned char *buff = new unsigned char[icMsgSz];
    MPI_Irecv(buff, icMsgSz,
659
              MPI_UNSIGNED_CHAR, MPI_ANY_SOURCE,
660
              avtParICAlgorithm::STREAMLINE_TAG, 
661
662
              VISIT_MPI_COMM, &req);

663
664
665
    debug5 << "Post ICrecv " <<idx<<" req= "<<req<<endl;
    icRecvRequests[idx] = req;
    recvICBufferMap[req] = buff;
666
667
668
669
}


// ****************************************************************************
670
//  Method: avtParICAlgorithm::SendMsg
671
672
673
674
675
676
677
678
679
//
//  Purpose:
//      Send an asynchronous message.
//
//  Programmer: Dave Pugmire
//  Creation:   Dec 18, 2008
//
// Modifications:
//
680
681
//   Dave Pugmire, Wed Apr  1 11:21:05 EDT 2009
//   Add the senders rank and msgID to the front of the message.
682
//
683
684
685
//   Hank Childs, Sat Feb 20 16:53:18 CST 2010
//   Don't output timing values to the timing logs.
//
686
687
688
// ****************************************************************************

void
689
avtParICAlgorithm::SendMsg(int dst,
690
691
692
693
694
695
696
                           vector<int> &msg)
{
    int communicationTimer = visitTimer->StartTimer();
    if (msg.size() > statusMsgSz)
        EXCEPTION0(ImproperUseException);
    
    int *buff = new int[statusMsgSz];
697
698
699
    buff[0] = msgID;
    msgID++;
    buff[1] = rank;
700
701
702
    
    MPI_Request req;
    for (int i = 0; i < msg.size(); i++)
703
704
705
706
707
        buff[2+i] = msg[i];

    debug5<<"SendMsg to :"<<dst<<" [";
    for(int i = 0; i < statusMsgSz; i++) debug5<<buff[i]<<" ";
    debug5<<"]"<<endl;
708
        
709
    int err = MPI_Isend(buff, statusMsgSz, MPI_INT, dst,
710
                        avtParICAlgorithm::STATUS_TAG,
711
                        VISIT_MPI_COMM, &req);
712
713
714
715
716

    sendIntBufferMap[req] = buff;
    
    BytesCnt.value += (sizeof(int) *statusMsgSz);
    MsgCnt.value++;
717
718
    bool nov = visitTimer->GetNeverOutputValue();
    visitTimer->NeverOutput(true);
719
720
    CommTime.value += visitTimer->StopTimer(communicationTimer, 
                                            "SendMsg");
721
722
    visitTimer->NeverOutput(nov);
    debug5 << "DONE  CheckPendingSendRequests()\n";
723
724
725
}

// ****************************************************************************
726
//  Method: avtParICAlgorithm::SendAllMsg
727
728
729
730
731
732
733
734
735
736
737
738
739
//
//  Purpose:
//      Broadcast a message.
//
//  Programmer: Dave Pugmire
//  Creation:   Dec 18, 2008
//
// Modifications:
//
//
// ****************************************************************************

void
740
avtParICAlgorithm::SendAllMsg(vector<int> &msg)
741
742
743
744
745
746
747
{
    for (int i = 0; i < nProcs; i++)
        if (i != rank)
            SendMsg(i, msg);
}

// ****************************************************************************
748
//  Method: avtParICAlgorithm::RecvMsgs
749
750
751
752
753
754
755
756
757
//
//  Purpose:
//      Recieve any messages.
//
//  Programmer: Dave Pugmire
//  Creation:   Dec 18, 2008
//
// Modifications:
//
758
759
//   Dave Pugmire, Mon Mar 23 12:48:12 EDT 2009
//   Change how timings are reported/calculated.
760
761
762
//
//   Dave Pugmire, Wed Apr  1 11:21:05 EDT 2009
//   Senders rank and msgID is in the message now.
763
//   
764
765
766
//   Hank Childs, Sat Feb 20 16:53:18 CST 2010
//   Don't output timing values to the timing logs.
//
767
768
769
// ****************************************************************************

void
770
avtParICAlgorithm::RecvMsgs(std::vector<std::vector<int> > &msgs)
771
{
772
    debug5<<"avtParICAlgorithm::RecvMsgs()\n";
773
    int communicationTimer = visitTimer->StartTimer();
774
775
776
777
778
779
780
781
782
783
784
785
    
    msgs.resize(0);
    while (true)
    {
        int nReq = statusRecvRequests.size();
        MPI_Status *status = new MPI_Status[nReq];
        int *indices = new int[nReq];
        int num = 0, err;

        vector<MPI_Request> copy;
        for (int i = 0; i < statusRecvRequests.size(); i++)
            copy.push_back(statusRecvRequests[i]);
786

787
        err = MPI_Testsome(nReq, &copy[0], &num, indices, status);
788
        debug5<<"::RecvMsgs() err= "<<err<<" Testsome("<<nReq<<"); num= "<<num<<endl;
789
790
791
792
793
794

        if (num > 0)
        {
            for (int i = 0; i < num; i++)
            {
                int idx = indices[i];
795
                debug5<<"RecvMsg from "<<idx<<endl;
796
797
798
799
800
801
802
803
804
805

                MPI_Request req = statusRecvRequests[idx];
                if (req == MPI_REQUEST_NULL)
                    continue;
                
                int *buff = recvIntBufferMap[req];
                recvIntBufferMap.erase(req);
                if (buff == NULL)
                    continue;

806
807
808
809
810
                debug5<<"RecvMsg: [";
                for(int i = 0; i < statusMsgSz; i++) debug5<<buff[i]<<" ";
                debug5<<"]"<<endl;
                
                //Skip msg ID, copy buffer int msg.
811
                vector<int> msg;
812
                for (int i = 1; i < statusMsgSz; i++)
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
                    msg.push_back(buff[i]);
                msgs.push_back(msg);

                //Clean up.
                delete [] buff;
            }
        
            //Repost recv requests.
            for (int i = 0; i < num; i++)
                PostRecvStatusReq(indices[i]);
        }
            
        delete [] status;
        delete [] indices;
        if (num == 0)
            break;
    }
830
831
832
    bool nov = visitTimer->GetNeverOutputValue();
    visitTimer->NeverOutput(true);
    debug5 << "DONE  CheckPendingSendRequests()\n";
833
834
    CommTime.value += visitTimer->StopTimer(communicationTimer,
                                            "RecvMsgs");
835
    visitTimer->NeverOutput(nov);
836
837
838
}

// ****************************************************************************
//  Method: avtParICAlgorithm::SendICs
//
//  Purpose:
//      Send streamlines to a dst.
//
//  Programmer: Dave Pugmire
//  Creation:   June 16, 2008
//
//  Modifications:
//
//   Dave Pugmire, Fri Aug 22 14:47:11 EST 2008
//   Memory leak fix.
//
//   Dave Pugmire, Thu Sep 24 14:03:46 EDT 2009
//   Call new method, DoSendICs.
//
//   Hank Childs, Fri Jun  4 19:58:30 CDT 2010
//   Use avtStreamlines, not avtStreamlineWrappers.
//
// ****************************************************************************

void
861
862
avtParICAlgorithm::SendICs(int dst, 
                           vector<avtIntegralCurve*> &ics)
863
{
pugmire's avatar
pugmire committed
864

865
    for (int i = 0; i < ics.size(); i++)
pugmire's avatar
pugmire committed
866
    {
867
868
        avtIntegralCurve *ic = ics[i];
        ic->serializeFlags |= avtIntegralCurve::SERIALIZE_INC_SEQ;
pugmire's avatar
pugmire committed
869
870
    }

871
    if (DoSendICs(dst, ics))
pugmire's avatar
pugmire committed
872
    {
873
        for (int i = 0; i < ics.size(); i++)
pugmire's avatar
pugmire committed
874
        {
875
            avtIntegralCurve *ic = ics[i];
pugmire's avatar
pugmire committed
876
877
            
            //Add if id/seq is unique. (single streamlines can be sent to multiple dst).
878
            list<avtIntegralCurve*>::const_iterator si = communicatedICs.begin();
pugmire's avatar
pugmire committed
879
            bool found = false;
880
881
            for (si = communicatedICs.begin(); !found && si != communicatedICs.end(); si++)
                found = ((*si)->id == ic->id && (*si)->sequenceCnt == ic->sequenceCnt);
pugmire's avatar
pugmire committed
882
883
        
            if (!found)
884
                communicatedICs.push_back(ic);
pugmire's avatar
pugmire committed
885
886
887
        }
        
        //Empty the array.
888
        ics.resize(0);
pugmire's avatar
pugmire committed
889
890
891
892
893
    }
}


// ****************************************************************************
//  Method: avtParICAlgorithm::DoSendICs
//
//  Purpose:
//      Send streamlines to a dst.
//
//  Programmer: Dave Pugmire
//  Creation:   September 24, 2009
//
//  Modifications:
//
//   Hank Childs, Sat Feb 20 16:53:18 CST 2010
//   Don't output timing values to the timing logs.
//
//   Hank Childs, Fri Jun  4 19:58:30 CDT 2010
//   Use avtStreamlines, not avtStreamlineWrappers.
//
// ****************************************************************************

bool
913
914
avtParICAlgorithm::DoSendICs(int dst, 
                             vector<avtIntegralCurve*> &ics)
pugmire's avatar
pugmire committed
915
{
916
    if (dst == rank)
pugmire's avatar
pugmire committed
917
        return false;
918
  
919
    size_t szz = ics.size();
920
    if (szz == 0)
pugmire's avatar
pugmire committed
921
        return false;
922
923
924
925

    int communicationTimer = visitTimer->StartTimer();
    MemStream buff;
    buff.write(&szz, 1);
pugmire's avatar
pugmire committed
926

927
    for (int i = 0; i < ics.size(); i++)
928
    {
929
930
931
        avtIntegralCurve *ic = ics[i];
        ic->Serialize(MemStream::WRITE, buff, GetSolver());
        ICCommCnt.value ++;
932
933
934
    }
    
    // Break it up into multiple messages if needed.
935
    if (buff.buffLen() > icMsgSz)
936
937
938
939
940
941
942
943
944
945
        EXCEPTION0(ImproperUseException);
    
    // Copy it into a byte buffer.
    size_t sz = buff.buffLen();
    unsigned char *msg = new unsigned char[sz];
    memcpy(msg, buff.buff(), sz);

    //Send it along.
    MPI_Request req;
    int err = MPI_Isend(msg, sz, MPI_UNSIGNED_CHAR, dst,
946
                        avtParICAlgorithm::STREAMLINE_TAG,
947
                        VISIT_MPI_COMM, &req);
948
    debug5<<err<<" = MPI_Isend(msg, "<<sz<<", MPI_UNSIGNED_CHAR, to "<<dst<<", req= "<<req<<endl;
949
    sendICBufferMap[req] = msg;
950
951

    BytesCnt.value += sz;
952
953
    bool nov = visitTimer->GetNeverOutputValue();
    visitTimer->NeverOutput(true);
954
    CommTime.value += visitTimer->StopTimer(communicationTimer,
955
                                            "SendICs");
956
957
    visitTimer->NeverOutput(nov);
    debug5 << "DONE  CheckPendingSendRequests()\n";
pugmire's avatar
pugmire committed
958
    return true;
959
960
961
}

// ****************************************************************************
//  Method: avtParICAlgorithm::RecvICs
//
//  Purpose:
//      Recv streamlines.
//
//  Programmer: Dave Pugmire
//  Creation:   Mon Mar 16 15:45:11 EDT 2009
//
//  Modifications:
//
//  Dave Pugmire, Wed Mar 18 17:17:40 EDT 2009
//  RecvSLs broken into two methods.
//
//  Dave Pugmire, Mon Mar 23 12:48:12 EDT 2009
//  Change how timings are reported/calculated.
//
//  Hank Childs, Sat Feb 20 16:53:18 CST 2010
//  Don't output timing values to the timing logs.
//
//  Hank Childs, Fri Jun  4 19:58:30 CDT 2010
//  Use avtStreamlines, not avtStreamlineWrappers.
//
//  Hank Childs, Sat Jun  5 16:21:27 CDT 2010
//  Use the PICS filter to instantiate integral curves, as this is now
//  an abstract type.
//
// ****************************************************************************

int
991
avtParICAlgorithm::RecvICs(list<avtIntegralCurve *> &recvICs)
992
{
993
    int communicationTimer = visitTimer->StartTimer();
994
    int icCount = 0;
995
996
997

    while (true)
    {
998
        int nReq = icRecvRequests.size();
999
1000
1001
1002
1003
        MPI_Status *status = new MPI_Status[nReq];
        int *indices = new int[nReq];
        int num = 0, err;

        vector<MPI_Request> copy;
1004
1005
        for (int i = 0; i < icRecvRequests.size(); i++)
            copy.push_back(icRecvRequests[i]);
1006
1007
1008
1009
1010
1011
1012
        err = MPI_Testsome(nReq, &copy[0], &num, indices, status);

        if (num > 0)
        {
            for (int i = 0; i < num; i++)
            {
                int idx = indices[i];
1013
                MPI_Request req = icRecvRequests[idx];
1014
1015
1016
                if (req == MPI_REQUEST_NULL)
                    continue;
                
1017
                //Grab the bytes, unserialize them, add to list.
1018
1019
                unsigned char *msg = recvICBufferMap[req];
                recvICBufferMap.erase(req);
1020
1021
1022
                if (msg == NULL)
                    continue;
        
1023
                MemStream buff(icMsgSz, msg);
1024
1025
1026
                delete [] msg;
                msg = NULL;

1027
1028
                size_t numICs;
                buff.read(numICs);
1029

1030
                for (int j = 0; j < numICs; j++)
1031
                {
1032
1033
1034
1035
                    avtIntegralCurve *ic = picsFilter->CreateIntegralCurve();
                    ic->Serialize(MemStream::READ, buff, GetSolver());
                    recvICs.push_back(ic);
                    icCount++;
1036
1037
1038
1039
                }
            }

            for (int i = 0; i < num; i++)
1040
                PostRecvICReq(indices[i]);
1041
1042
1043
1044
1045
1046
1047
1048
        }

        delete [] status;
        delete [] indices;
        
        if (num == 0)
            break;
    }
1049
    
1050
1051
    bool nov = visitTimer->GetNeverOutputValue();
    visitTimer->NeverOutput(true);
1052
    CommTime.value += visitTimer->StopTimer(communicationTimer,
1053
                                            "RecvICs");
1054
1055
    visitTimer->NeverOutput(nov);
    debug5 << "DONE  CheckPendingSendRequests()\n";
1056
    return icCount;
1057
1058
}

// ****************************************************************************
//  Method: avtParICAlgorithm::RecvICs
//
//  Purpose:
//      Recv streamlines.
//
//  Programmer: Dave Pugmire
//  Creation:   June 16, 2008
//
//  Modifications:
//
//    Dave Pugmire, Mon Mar 16 15:45:11 EDT 2009
//    Call the other RecvSLs and then check for domain inclusion.
//
//    Dave Pugmire, Tue Mar 17 12:02:10 EDT 2009
//    Use new new RecvSLs method, then check for terminations.
//
//    Hank Childs, Fri Jun  4 03:52:48 PDT 2010
//    Rename GetEndPt to GetCurrentLocation.
//
//    Hank Childs, Fri Jun  4 19:58:30 CDT 2010
//    Use avtStreamlines, not avtStreamlineWrappers.
//
// ****************************************************************************
int
1084
avtParICAlgorithm::RecvICs(list<avtIntegralCurve *> &streamlines,