Engine.C 57.9 KB
Newer Older
hrchilds's avatar
hrchilds committed
1
2
#include <Engine.h>
#include <Executors.h>
hrchilds's avatar
hrchilds committed
3

hrchilds's avatar
hrchilds committed
4
5
#if !defined(_WIN32)
#include <strings.h>
hrchilds's avatar
hrchilds committed
6
#include <sys/types.h>   // for getpid()
hrchilds's avatar
hrchilds committed
7
8
#include <unistd.h>      // for alarm()
#endif
hrchilds's avatar
hrchilds committed
9
#include <new.h>
hrchilds's avatar
hrchilds committed
10

hrchilds's avatar
hrchilds committed
11
#include <visitstream.h>
hrchilds's avatar
hrchilds committed
12
#include <snprintf.h>
hrchilds's avatar
hrchilds committed
13
14
15
16
17

#include <BufferConnection.h>
#include <CouldNotConnectException.h>
#include <DefineVirtualDatabaseRPC.h>
#include <IncompatibleVersionException.h>
hrchilds's avatar
hrchilds committed
18
#include <ExpressionList.h>
hrchilds's avatar
hrchilds committed
19
#include <ExprParser.h>
hrchilds's avatar
hrchilds committed
20
#include <ParsingExprList.h>
hrchilds's avatar
hrchilds committed
21
#include <avtExprNodeFactory.h>
hrchilds's avatar
hrchilds committed
22
23
#include <Init.h>
#include <InitVTK.h>
hrchilds's avatar
hrchilds committed
24
25
#include <LoadBalancer.h>
#include <LostConnectionException.h>
hrchilds's avatar
hrchilds committed
26
#include <Netnodes.h>
hrchilds's avatar
hrchilds committed
27
#include <ParentProcess.h>
hrchilds's avatar
hrchilds committed
28
#include <QueryAttributes.h>
hrchilds's avatar
hrchilds committed
29
#include <SILAttributes.h>
hrchilds's avatar
hrchilds committed
30
#include <SocketConnection.h>
hrchilds's avatar
hrchilds committed
31

hrchilds's avatar
hrchilds committed
32
#include <avtDatabaseMetaData.h>
hrchilds's avatar
hrchilds committed
33
34
35
#include <avtDataObjectReader.h>
#include <avtDataObjectString.h>
#include <avtDataObjectWriter.h>
hrchilds's avatar
hrchilds committed
36
#include <avtDataset.h>
hrchilds's avatar
hrchilds committed
37
38
#include <avtFilter.h>
#include <avtOriginatingSink.h>
hrchilds's avatar
hrchilds committed
39
#include <avtStreamer.h>
hrchilds's avatar
hrchilds committed
40
#include <avtTerminatingSource.h>
hrchilds's avatar
hrchilds committed
41
#include <avtVariableMapper.h>
hrchilds's avatar
hrchilds committed
42
43
44
45
46
47
48
#include <vtkDataSetWriter.h>

#include <string>
using std::string;

#ifdef PARALLEL
#include <parallel.h>
hrchilds's avatar
hrchilds committed
49
#include <avtParallel.h>
hrchilds's avatar
hrchilds committed
50
51
52
53
#else
#include <Xfer.h>
#endif

hrchilds's avatar
hrchilds committed
54
55
// Static data: the single Engine object handed out by Engine::Instance().
// It starts out NULL and is allocated on the first call to Instance().
Engine *Engine::instance = NULL;
hrchilds's avatar
hrchilds committed
56

hrchilds's avatar
hrchilds committed
57
58
59
60
// Forward declaration of a file-scope static helper function (it is not a
// member of Engine).
static void WriteByteStreamToSocket(NonBlockingRPC *, Connection *,
                                    avtDataObjectString &);

hrchilds's avatar
hrchilds committed
61
62
// Initial connection timeout (presumably in seconds).
// NOTE(review): this comment used to say "5 minutes (300 seconds)", but the
// value defined below is 60 -- confirm which of the two is intended.
#define INITIAL_CONNECTION_TIMEOUT 60
hrchilds's avatar
hrchilds committed
63

hrchilds's avatar
hrchilds committed
64
65
// message tag for interrupt messages used in static abort callback function
#ifdef PARALLEL
hrchilds's avatar
hrchilds committed
66
// The tag value comes from GetUniqueStaticMessageTag() when this file's
// statics are initialized; this definition only exists in PARALLEL builds.
const int INTERRUPT_MESSAGE_TAG = GetUniqueStaticMessageTag();
hrchilds's avatar
hrchilds committed
67
68
#endif

hrchilds's avatar
hrchilds committed
69
// ****************************************************************************
hrchilds's avatar
hrchilds committed
70
//  Constructor:  Engine::Engine
hrchilds's avatar
hrchilds committed
71
//
hrchilds's avatar
hrchilds committed
72
73
//  Programmer:  Jeremy Meredith
//  Creation:    July 10, 2003
hrchilds's avatar
hrchilds committed
74
//
hrchilds's avatar
hrchilds committed
75
76
77
78
//  Modifications:
//
//    Mark C. Miller, Tue Mar  8 17:59:40 PST 2005
//    Added procAtts
hrchilds's avatar
hrchilds committed
79
// ****************************************************************************
hrchilds's avatar
hrchilds committed
80
Engine::Engine()
{
    //
    // Put the object into a known state.  Every pointer that the destructor
    // deletes must start out as NULL: xfer is only allocated in Initialize()
    // and netmgr/lb only in SetUpViewerInterface(), so destroying an Engine
    // on which those were never called would otherwise delete garbage.
    //
    vtkConnection = 0;
    noFatalExceptions = true;
    timeout = 0;
    xfer = NULL;
    netmgr = NULL;
    lb = NULL;
    procAtts = NULL;
}

// ****************************************************************************
hrchilds's avatar
hrchilds committed
91
//  Destructor:  Engine::~Engine
hrchilds's avatar
hrchilds committed
92
//
hrchilds's avatar
hrchilds committed
93
94
//  Programmer:  Jeremy Meredith
//  Creation:    July 10, 2003
hrchilds's avatar
hrchilds committed
95
96
//
// ****************************************************************************
hrchilds's avatar
hrchilds committed
97
Engine::~Engine()
{
    // Release the objects this class allocated with new.  Deleting a NULL
    // pointer is a no-op, so this is safe for members that were never set up.
    delete netmgr;
    delete xfer;
    delete lb;

    // The RPC executors were allocated one at a time and stored in
    // rpcExecutors; free each of them.  The index is a size_t so that it is
    // compared against size() without mixing signed and unsigned values.
    for (size_t i = 0; i < rpcExecutors.size(); ++i)
        delete rpcExecutors[i];
}

// ****************************************************************************
hrchilds's avatar
hrchilds committed
107
//  Method:  Engine::Instance
hrchilds's avatar
hrchilds committed
108
//
hrchilds's avatar
hrchilds committed
109
110
//  Programmer:  Jeremy Meredith
//  Creation:    July 10, 2003
hrchilds's avatar
hrchilds committed
111
112
//
// ****************************************************************************
hrchilds's avatar
hrchilds committed
113
Engine *Engine::Instance()
{
    // Lazily build the one-and-only engine the first time it is asked for;
    // every later call hands back that same object.
    if (instance == NULL)
    {
        instance = new Engine;
    }

    return instance;
}

// ****************************************************************************
hrchilds's avatar
hrchilds committed
121
//  Method:  Engine::Initialize
hrchilds's avatar
hrchilds committed
122
123
//
//  Purpose:
hrchilds's avatar
hrchilds committed
124
125
126
127
128
//    Do all the initialization needed first.
//
//  Arguments:
//    argc
//    argv
hrchilds's avatar
hrchilds committed
129
//
hrchilds's avatar
hrchilds committed
130
131
132
133
//  Note: Broken off from old main().  See main.C for comment history.
//
//  Programmer:  Jeremy Meredith
//  Creation:    July 10, 2003
hrchilds's avatar
hrchilds committed
134
//
hrchilds's avatar
hrchilds committed
135
136
137
138
139
//  Modifications:
//
//    Mark C. Miller, Wed Jul  7 11:42:09 PDT 2004
//    Added code to override the new handler for the engine
//
hrchilds's avatar
hrchilds committed
140
141
142
//    Mark C. Miller, Mon Jul 12 19:46:32 PDT 2004
//    Wrapped call to set_new_handler with WIN32 conditional compilation
//
hrchilds's avatar
hrchilds committed
143
144
145
146
//    Eric Brugger, Tue Aug 31 10:45:57 PDT 2004
//    Added a call to PAR_CreateTypes since it is no longer called from
//    PAR_Init.
//
hrchilds's avatar
hrchilds committed
147
148
149
150
151
152
//    Jeremy Meredith, Mon Nov  1 13:26:23 PST 2004
//    Use a buffer connection that is visible at class scope for parallel
//    communication.  Before, it was an automatic variable in PAR_EventLoop
//    but I needed a separate PAR_ProcessInput method that used the same
//    buffer connection.
//
hrchilds's avatar
hrchilds committed
153
154
// ****************************************************************************
void
hrchilds's avatar
hrchilds committed
155
Engine::Initialize(int *argc, char **argv[])
hrchilds's avatar
hrchilds committed
156
{
hrchilds's avatar
hrchilds committed
157
#ifdef PARALLEL
hrchilds's avatar
hrchilds committed
158

hrchilds's avatar
hrchilds committed
159
    xfer = new MPIXfer;
hrchilds's avatar
hrchilds committed
160
    //
hrchilds's avatar
hrchilds committed
161
162
163
    // Initialize for MPI and get the process rank & size.
    //
    PAR_Init(*argc, *argv);
hrchilds's avatar
hrchilds committed
164

hrchilds's avatar
hrchilds committed
165
166
167
168
169
170
    //
    // Create the derived types and operators for sending messages
    // and collective operations.
    //
    PAR_CreateTypes();

hrchilds's avatar
hrchilds committed
171
172
173
174
175
176
177
178
179
180
    //
    // Initialize error logging
    //
    Init::Initialize(*argc, *argv, PAR_Rank(), PAR_Size());
#else
    xfer = new Xfer;
    Init::Initialize(*argc, *argv);
#endif
    Init::SetComponentName("engine");

hrchilds's avatar
hrchilds committed
181
182
183
    //
    // Set a different new handler for the engine
    //
hrchilds's avatar
hrchilds committed
184
#if !defined(_WIN32)
hrchilds's avatar
hrchilds committed
185
    set_new_handler(Engine::NewHandler);
hrchilds's avatar
hrchilds committed
186
#endif
hrchilds's avatar
hrchilds committed
187

hrchilds's avatar
hrchilds committed
188
189
190
191
192
193
194
195
196
#ifdef PARALLEL
    if (!PAR_UIProcess())
    {
        // Set the xfer object's input connection to be the buffer connection
        // of the object itself
        xfer->SetInputConnection(&par_conn);
    }
#endif

hrchilds's avatar
hrchilds committed
197
    debug1 << "ENGINE started\n";
hrchilds's avatar
hrchilds committed
198
199
}

hrchilds's avatar
hrchilds committed
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
// ****************************************************************************
//  Method:  Engine::Finalize
//
//  Purpose:
//      Do the finalization for the engine.
//
//  Programmer:  Hank Childs
//  Creation:    June 1, 2004
//
// ****************************************************************************
void
Engine::Finalize(void)
{
    // The only finalization step performed here is Init::Finalize().
    Init::Finalize();
}

hrchilds's avatar
hrchilds committed
216
// ****************************************************************************
hrchilds's avatar
hrchilds committed
217
//  Method:  Engine::SetUpViewerInterface
hrchilds's avatar
hrchilds committed
218
//
hrchilds's avatar
hrchilds committed
219
220
//  Purpose:
//    Do all the initialization needed after we connect to the viewer.
hrchilds's avatar
hrchilds committed
221
//
hrchilds's avatar
hrchilds committed
222
223
224
//  Arguments:
//    argc
//    argv
hrchilds's avatar
hrchilds committed
225
//
hrchilds's avatar
hrchilds committed
226
//  Note: Broken off from old main().  See main.C for comment history.
hrchilds's avatar
hrchilds committed
227
//
hrchilds's avatar
hrchilds committed
228
229
//  Programmer:  Jeremy Meredith
//  Creation:    July 10, 2003
hrchilds's avatar
hrchilds committed
230
//
hrchilds's avatar
hrchilds committed
231
//  Modifications:
hrchilds's avatar
hrchilds committed
232
233
234
235
236
//    Sean Ahern, Fri Nov 22 16:09:26 PST 2002
//    Removed ApplyNamedFunction.
//
//    Jeremy Meredith, Mon Sep 15 17:14:07 PDT 2003
//    Removed SetFinalVariableName.
hrchilds's avatar
hrchilds committed
237
//
hrchilds's avatar
hrchilds committed
238
239
240
//    Hank Childs, Fri Mar  5 11:46:09 PST 2004
//    Made the database plugins be loaded on demand.
//
hrchilds's avatar
hrchilds committed
241
242
243
//    Brad Whitlock, Fri Mar 12 11:20:58 PDT 2004
//    I added keepAliveRPC.
//
hrchilds's avatar
hrchilds committed
244
245
246
//    Kathleen Bonnell, Wed Mar 31 16:53:03 PST 2004 
//    Added cloneNetworkRPC.
//
hrchilds's avatar
hrchilds committed
247
248
249
//    Jeremy Meredith, Wed Aug 25 10:09:14 PDT 2004
//    Added ability to send metadata and SIL atts back to the viewer.
//
hrchilds's avatar
hrchilds committed
250
251
252
//    Jeremy Meredith, Wed Nov 24 11:42:40 PST 2004
//    Renamed EngineExprNode to avtExprNode.
//
hrchilds's avatar
hrchilds committed
253
254
255
//    Hank Childs, Mon Feb 28 17:03:06 PST 2005
//    Added StartQueryRPC.
//
hrchilds's avatar
hrchilds committed
256
257
258
//    Mark C. Miller, Tue Mar  8 17:59:40 PST 2005
//    Added procInfoRPC 
//
hrchilds's avatar
hrchilds committed
259
// ****************************************************************************
hrchilds's avatar
hrchilds committed
260

hrchilds's avatar
hrchilds committed
261
262
void
Engine::SetUpViewerInterface(int *argc, char **argv[])
hrchilds's avatar
hrchilds committed
263
264
{
    //
hrchilds's avatar
hrchilds committed
265
    // Initialize the plugin managers.
hrchilds's avatar
hrchilds committed
266
    //
hrchilds's avatar
hrchilds committed
267
268
269
270
271
272
273
274
275
#ifdef PARALLEL
    PlotPluginManager::Initialize(PlotPluginManager::Engine, true);
    OperatorPluginManager::Initialize(OperatorPluginManager::Engine, true);
    DatabasePluginManager::Initialize(DatabasePluginManager::Engine, true);
#else
    PlotPluginManager::Initialize(PlotPluginManager::Engine, false);
    OperatorPluginManager::Initialize(OperatorPluginManager::Engine, false);
    DatabasePluginManager::Initialize(DatabasePluginManager::Engine, false);
#endif
hrchilds's avatar
hrchilds committed
276

hrchilds's avatar
hrchilds committed
277
278
279
    InitVTK::Initialize();
    InitVTK::ForceMesa();
    avtCallback::SetNowinMode(true);
hrchilds's avatar
hrchilds committed
280
281

    //
hrchilds's avatar
hrchilds committed
282
    // Load plugins
hrchilds's avatar
hrchilds committed
283
    //
hrchilds's avatar
hrchilds committed
284
285
    PlotPluginManager::Instance()->LoadPluginsOnDemand();
    OperatorPluginManager::Instance()->LoadPluginsOnDemand();
hrchilds's avatar
hrchilds committed
286
    DatabasePluginManager::Instance()->LoadPluginsOnDemand();
hrchilds's avatar
hrchilds committed
287

hrchilds's avatar
hrchilds committed
288
    vtkConnection = theViewer.GetReadConnection(1);
hrchilds's avatar
hrchilds committed
289
290

    //
hrchilds's avatar
hrchilds committed
291
    // Create the network manager.
hrchilds's avatar
hrchilds committed
292
    //
hrchilds's avatar
hrchilds committed
293
    netmgr = new NetworkManager;
hrchilds's avatar
hrchilds committed
294

hrchilds's avatar
hrchilds committed
295
296
    // Parse the command line.
    ProcessCommandLine(*argc, *argv);
hrchilds's avatar
hrchilds committed
297
298

#if !defined(_WIN32)
hrchilds's avatar
hrchilds committed
299
300
    // Set up the alarm signal handler.
    signal(SIGALRM, Engine::AlarmHandler);
hrchilds's avatar
hrchilds committed
301
302
#endif

hrchilds's avatar
hrchilds committed
303
304
    // Create some RPC objects and make Xfer observe them.
    quitRPC                         = new QuitRPC;
hrchilds's avatar
hrchilds committed
305
    keepAliveRPC                    = new KeepAliveRPC;
hrchilds's avatar
hrchilds committed
306
307
308
309
310
311
312
    readRPC                         = new ReadRPC;
    applyOperatorRPC                = new ApplyOperatorRPC;
    makePlotRPC                     = new MakePlotRPC;
    useNetworkRPC                   = new UseNetworkRPC;
    updatePlotAttsRPC               = new UpdatePlotAttsRPC;
    pickRPC                         = new PickRPC;
    startPickRPC                    = new StartPickRPC;
hrchilds's avatar
hrchilds committed
313
    startQueryRPC                   = new StartQueryRPC;
hrchilds's avatar
hrchilds committed
314
315
316
317
318
319
320
    executeRPC                      = new ExecuteRPC;
    clearCacheRPC                   = new ClearCacheRPC;
    queryRPC                        = new QueryRPC;
    releaseDataRPC                  = new ReleaseDataRPC;
    openDatabaseRPC                 = new OpenDatabaseRPC;
    defineVirtualDatabaseRPC        = new DefineVirtualDatabaseRPC;
    renderRPC                       = new RenderRPC;
hrchilds's avatar
hrchilds committed
321
    setWinAnnotAttsRPC              = new SetWinAnnotAttsRPC;
hrchilds's avatar
hrchilds committed
322
    cloneNetworkRPC                 = new CloneNetworkRPC;
hrchilds's avatar
hrchilds committed
323
    procInfoRPC                     = new ProcInfoRPC;
hrchilds's avatar
hrchilds committed
324
325

    xfer->Add(quitRPC);
hrchilds's avatar
hrchilds committed
326
    xfer->Add(keepAliveRPC);
hrchilds's avatar
hrchilds committed
327
328
329
330
331
332
333
    xfer->Add(readRPC);
    xfer->Add(applyOperatorRPC);
    xfer->Add(makePlotRPC);
    xfer->Add(useNetworkRPC);
    xfer->Add(updatePlotAttsRPC);
    xfer->Add(pickRPC);
    xfer->Add(startPickRPC);
hrchilds's avatar
hrchilds committed
334
    xfer->Add(startQueryRPC);
hrchilds's avatar
hrchilds committed
335
336
337
338
339
340
341
    xfer->Add(executeRPC);
    xfer->Add(clearCacheRPC);
    xfer->Add(queryRPC);
    xfer->Add(releaseDataRPC);
    xfer->Add(openDatabaseRPC);
    xfer->Add(defineVirtualDatabaseRPC);
    xfer->Add(renderRPC);
hrchilds's avatar
hrchilds committed
342
    xfer->Add(setWinAnnotAttsRPC);
hrchilds's avatar
hrchilds committed
343
    xfer->Add(cloneNetworkRPC);
hrchilds's avatar
hrchilds committed
344
    xfer->Add(procInfoRPC);
hrchilds's avatar
hrchilds committed
345
346
347

    // Create an object to implement the RPCs
    rpcExecutors.push_back(new RPCExecutor<QuitRPC>(quitRPC));
hrchilds's avatar
hrchilds committed
348
    rpcExecutors.push_back(new RPCExecutor<KeepAliveRPC>(keepAliveRPC));
hrchilds's avatar
hrchilds committed
349
350
351
352
353
354
355
356
357
358
    rpcExecutors.push_back(new RPCExecutor<ReadRPC>(readRPC));
    rpcExecutors.push_back(new RPCExecutor<ApplyOperatorRPC>(applyOperatorRPC));
    rpcExecutors.push_back(new RPCExecutor<PrepareOperatorRPC>(&applyOperatorRPC->GetPrepareOperatorRPC()));
    rpcExecutors.push_back(new RPCExecutor<MakePlotRPC>(makePlotRPC));
    rpcExecutors.push_back(new RPCExecutor<PreparePlotRPC>(&makePlotRPC->GetPreparePlotRPC()));
    rpcExecutors.push_back(new RPCExecutor<UseNetworkRPC>(useNetworkRPC));
    rpcExecutors.push_back(new RPCExecutor<UpdatePlotAttsRPC>(updatePlotAttsRPC));
    rpcExecutors.push_back(new RPCExecutor<PrepareUpdatePlotAttsRPC>(&updatePlotAttsRPC->GetPrepareUpdatePlotAttsRPC()));
    rpcExecutors.push_back(new RPCExecutor<PickRPC>(pickRPC));
    rpcExecutors.push_back(new RPCExecutor<StartPickRPC>(startPickRPC));
hrchilds's avatar
hrchilds committed
359
    rpcExecutors.push_back(new RPCExecutor<StartQueryRPC>(startQueryRPC));
hrchilds's avatar
hrchilds committed
360
361
362
363
364
365
366
    rpcExecutors.push_back(new RPCExecutor<ExecuteRPC>(executeRPC));
    rpcExecutors.push_back(new RPCExecutor<ClearCacheRPC>(clearCacheRPC));
    rpcExecutors.push_back(new RPCExecutor<QueryRPC>(queryRPC));
    rpcExecutors.push_back(new RPCExecutor<ReleaseDataRPC>(releaseDataRPC));
    rpcExecutors.push_back(new RPCExecutor<OpenDatabaseRPC>(openDatabaseRPC));
    rpcExecutors.push_back(new RPCExecutor<DefineVirtualDatabaseRPC>(defineVirtualDatabaseRPC));
    rpcExecutors.push_back(new RPCExecutor<RenderRPC>(renderRPC));
hrchilds's avatar
hrchilds committed
367
    rpcExecutors.push_back(new RPCExecutor<SetWinAnnotAttsRPC>(setWinAnnotAttsRPC));
hrchilds's avatar
hrchilds committed
368
    rpcExecutors.push_back(new RPCExecutor<CloneNetworkRPC>(cloneNetworkRPC));
hrchilds's avatar
hrchilds committed
369
    rpcExecutors.push_back(new RPCExecutor<ProcInfoRPC>(procInfoRPC));
hrchilds's avatar
hrchilds committed
370

hrchilds's avatar
hrchilds committed
371
    // Hook up the expression list as an observed object.
hrchilds's avatar
hrchilds committed
372
    Parser *p = new ExprParser(new avtExprNodeFactory());
hrchilds's avatar
hrchilds committed
373
374
375
    ParsingExprList *l = new ParsingExprList(p);
    xfer->Add(l->GetList());

hrchilds's avatar
hrchilds committed
376
377
378
379
380
381
382
    // Hook up metadata and SIL to be send back to the viewer.
    // This is intended to only be used for simulations.
    metaData = new avtDatabaseMetaData;
    silAtts = new SILAttributes;
    xfer->Add(metaData);
    xfer->Add(silAtts);

hrchilds's avatar
hrchilds committed
383
384
385
    //
    // Hook up the viewer connections to Xfer
    //
hrchilds's avatar
hrchilds committed
386
#ifdef PARALLEL
hrchilds's avatar
hrchilds committed
387
388
    if (PAR_UIProcess())
        xfer->SetInputConnection(theViewer.GetWriteConnection());
hrchilds's avatar
hrchilds committed
389
#else
hrchilds's avatar
hrchilds committed
390
    xfer->SetInputConnection(theViewer.GetWriteConnection());
hrchilds's avatar
hrchilds committed
391
#endif
hrchilds's avatar
hrchilds committed
392
    xfer->SetOutputConnection(theViewer.GetReadConnection());
hrchilds's avatar
hrchilds committed
393

hrchilds's avatar
hrchilds committed
394
395
396
397
398
399
    //
    // Set the global destination format. This only happens on the UI-Process
    // when running in parallel since non-UI processes have no SocketConnections.
    //
    if (theViewer.GetReadConnection() != 0)
        destinationFormat = theViewer.GetReadConnection()->GetDestinationFormat();
hrchilds's avatar
hrchilds committed
400

hrchilds's avatar
hrchilds committed
401
402
403
    //
    // Create the network manager and the load balancer.
    //
hrchilds's avatar
hrchilds committed
404
#ifdef PARALLEL
hrchilds's avatar
hrchilds committed
405
    lb = new LoadBalancer(PAR_Size(), PAR_Rank());
hrchilds's avatar
hrchilds committed
406
#else
hrchilds's avatar
hrchilds committed
407
    lb = new LoadBalancer(1, 0);
hrchilds's avatar
hrchilds committed
408
#endif
hrchilds's avatar
hrchilds committed
409
    netmgr->SetLoadBalancer(lb);
hrchilds's avatar
hrchilds committed
410

hrchilds's avatar
hrchilds committed
411
412
413
414
415
416
417
418
419
420
421
    //
    // Initialize some callback functions.
    //
    avtDataObjectSource::RegisterAbortCallback(Engine::EngineAbortCallback, xfer);
    avtDataObjectSource::RegisterProgressCallback(Engine::EngineUpdateProgressCallback,
                                                  NULL);
    LoadBalancer::RegisterAbortCallback(Engine::EngineAbortCallbackParallel, xfer);
    LoadBalancer::RegisterProgressCallback(Engine::EngineUpdateProgressCallback,
                                           NULL);
    avtTerminatingSource::RegisterInitializeProgressCallback(
                                       Engine::EngineInitializeProgressCallback, NULL);
hrchilds's avatar
hrchilds committed
422
423
}

hrchilds's avatar
hrchilds committed
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
// ****************************************************************************
//  Method:  Engine::GetInputSocket
//
//  Purpose:
//    Return the file descriptor for the input socket.
//
//  Arguments:
//    none
//
//  Programmer:  Jeremy Meredith
//  Creation:    January 12, 2004
//
// ****************************************************************************

int
Engine::GetInputSocket()
{
    // The viewer's write connection is the one SetUpViewerInterface installs
    // as xfer's input connection; this returns that connection's descriptor.
    // NOTE(review): the pointer is dereferenced without a check.  A comment
    // in SetUpViewerInterface says non-UI processes have no
    // SocketConnections, so presumably this must only be called on the UI
    // process -- confirm against the callers.
    return theViewer.GetWriteConnection()->GetDescriptor();
}

hrchilds's avatar
hrchilds committed
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
// ****************************************************************************
// Function: ConnectViewer
//
// Purpose: 
//   Connects to the viewer.
//
// Arguments:
//   viewer : The viewer object that we want to connect.
//
// Returns:    True if we connect to the viewer, false otherwise.
//
// Programmer: Brad Whitlock
// Creation:   Mon Sep 30 08:47:46 PDT 2002
//
// Modifications:
hrchilds's avatar
hrchilds committed
459
460
461
//    Jeremy Meredith, Thu Jul 10 11:37:48 PDT 2003
//    Made the engine an object.
//
hrchilds's avatar
hrchilds committed
462
463
464
//    Brad Whitlock, Tue Jul 29 11:21:22 PDT 2003
//    I updated the interface to ParentProcess::Connect.
//
hrchilds's avatar
hrchilds committed
465
466
467
// ****************************************************************************

bool
hrchilds's avatar
hrchilds committed
468
Engine::ConnectViewer(int *argc, char **argv[])
hrchilds's avatar
hrchilds committed
469
470
471
472
473
{
    // Connect to the viewer.
    TRY
    {
#ifdef PARALLEL
hrchilds's avatar
hrchilds committed
474
        theViewer.Connect(1, 2, argc, argv, PAR_UIProcess());
hrchilds's avatar
hrchilds committed
475
#else
hrchilds's avatar
hrchilds committed
476
        theViewer.Connect(1, 2, argc, argv, true);
hrchilds's avatar
hrchilds committed
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
#endif
    }
    CATCH(IncompatibleVersionException)
    {
        debug1 << "The engine has a different version than its client." << endl;
        noFatalExceptions = false;
    }
    CATCH(CouldNotConnectException)
    {
        noFatalExceptions = false;
    }
    ENDTRY

#ifdef PARALLEL
    //
    // Tell the other processes if they should exit if we had an error when
    // connecting to the viewer.
    //
    int shouldExit = noFatalExceptions ? 0 : 1;
    MPI_Bcast((void *)&shouldExit, 1, MPI_INT, 0, MPI_COMM_WORLD);
    noFatalExceptions = (shouldExit == 0);
#endif

    return noFatalExceptions;
}

#ifdef PARALLEL
// ****************************************************************************
// Function: PAR_EventLoop
//
// Purpose:
//   This is the main event loop for the engine. The master process executes
//   the serial event loop and its xfer object broadcasts the viewer's control
//   data to the xfer objects on the other processors.
//
// Notes:      
//   The MPI_BCast call for the UI process happens in the MPIXfer
//   object's process method. This was done so broadcasts are done
//   only when there are complete messages. This avoids LOTS of
//   extra broadcasts when the socket read tends to happen in small
//   increments.
//
// Programmer: Brad Whitlock
// Creation:   Wed Jul 12 15:58:20 PST 2000
//
// Modifications:
//    Jeremy Meredith, Thu Sep 21 22:42:19 PDT 2000
//    Added an extern for the appropriate socket to read from.
//    Note that this is a small hack.   
//
//    Brad Whitlock, Thu Mar 15 14:32:33 PST 2001
//    Rewrote it so the master process uses the new & improved serial
//    event loop. It is also set up to tell the other processes to die if
//    we lost the connection to the viewer.
//
//    Sean Ahern, Thu Feb 21 16:18:03 PST 2002
//    Added timeout support.
//
//    Brad Whitlock, Tue Apr 9 13:43:12 PST 2002
//    Ported to Windows.
//
hrchilds's avatar
hrchilds committed
538
539
540
//    Hank Childs, Tue Jun 24 18:02:01 PDT 2003
//    Allow for timeouts during network executions.
//
hrchilds's avatar
hrchilds committed
541
542
543
//    Jeremy Meredith, Thu Jul 10 11:37:48 PDT 2003
//    Made the engine an object.
//
hrchilds's avatar
hrchilds committed
544
545
546
547
548
549
//    Jeremy Meredith, Mon Nov  1 13:26:23 PST 2004
//    Use a buffer connection that is visible at class scope for parallel
//    communication.  Before, it was an automatic variable in this method
//    but I needed a separate PAR_ProcessInput method that used the same
//    buffer connection.
//
hrchilds's avatar
hrchilds committed
550
551
552
// ****************************************************************************

void
hrchilds's avatar
hrchilds committed
553
Engine::PAR_EventLoop()
hrchilds's avatar
hrchilds committed
554
{
hrchilds's avatar
hrchilds committed
555
    if (PAR_UIProcess())
hrchilds's avatar
hrchilds committed
556
557
558
    {
        // The master process executes the serial event loop since it
        // communicates with the viewer.
hrchilds's avatar
hrchilds committed
559
        bool errFlag = EventLoop();
hrchilds's avatar
hrchilds committed
560
561
562
563

        // If the errFlag is set, we exited the event loop because we lost
        // the connection to the viewer. We need to send a quit signal
        // to all other processes.
hrchilds's avatar
hrchilds committed
564
        if (errFlag || !noFatalExceptions)
hrchilds's avatar
hrchilds committed
565
        {
hrchilds's avatar
hrchilds committed
566
567
            quitRPC->Write(par_conn);
            xfer->SetInputConnection(&par_conn);
hrchilds's avatar
hrchilds committed
568
569
            xfer->SetEnableReadHeader(false);
            xfer->Process();
hrchilds's avatar
hrchilds committed
570
571
572
573
574
        }
    }
    else
    {
        // Non-UI Process
hrchilds's avatar
hrchilds committed
575
        while(!quitRPC->GetQuit() && noFatalExceptions)
hrchilds's avatar
hrchilds committed
576
577
578
579
580
        {
            // Reset the alarm
            ResetTimeout(timeout * 60);

            // Get state information from the UI process.
hrchilds's avatar
hrchilds committed
581
            MPI_Bcast((void *)&par_buf, 1, PAR_STATEBUFFER, 0, MPI_COMM_WORLD);
hrchilds's avatar
hrchilds committed
582
583

            // We have work to do, so cancel the alarm.
hrchilds's avatar
hrchilds committed
584
585
            int num_seconds_in_half_hour = 30*60;
            ResetTimeout(num_seconds_in_half_hour);
hrchilds's avatar
hrchilds committed
586
587

            // Add the state information to the connection.
hrchilds's avatar
hrchilds committed
588
            par_conn.Append((unsigned char *)par_buf.buffer, par_buf.nbytes);
hrchilds's avatar
hrchilds committed
589
590

            // Process the state information.
hrchilds's avatar
hrchilds committed
591
            xfer->Process();
hrchilds's avatar
hrchilds committed
592
593

            ResetTimeout(timeout * 60);
hrchilds's avatar
hrchilds committed
594
595
596
        }
    }
}
hrchilds's avatar
hrchilds committed
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624

// ****************************************************************************
//  Method:  Engine::PAR_ProcessInput
//
//  Purpose:
//    Same purpose as ProcessInput, but parallel-aware.
//
//  Arguments:
//    none
//
//  Programmer:  Jeremy Meredith
//  Creation:    November  1, 2004
//
// ****************************************************************************
void
Engine::PAR_ProcessInput()
{
    // The UI process handles its input exactly as the serial engine does.
    if (PAR_UIProcess())
    {
        ProcessInput();
        return;
    }

    // Every other process receives the state buffer broadcast from rank 0,
    // appends its bytes to the parallel buffer connection, and has xfer
    // process them.
    MPI_Bcast((void *)&par_buf, 1, PAR_STATEBUFFER, 0, MPI_COMM_WORLD);
    par_conn.Append((unsigned char *)par_buf.buffer, par_buf.nbytes);
    xfer->Process();
}
hrchilds's avatar
hrchilds committed
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
#endif

// ****************************************************************************
// Function: EventLoop
//
// Purpose:
//   This is the serial event loop for the engine. It reads from the
//   socket attached to the viewer and processes the state information
//   that it reads.
//
// Notes:      Executes the event loop until the quit State object
//             says to quit.
//
// Programmer: Brad Whitlock
// Creation:   Wed Jul 12 16:08:27 PST 2000
//
// Modifications:
//    Jeremy Meredith, Wed Aug  9 14:42:44 PDT 2000
//    Changed the read socket to a real variable.
//
//    Brad Whitlock, Thu Mar 15 13:44:41 PST 2001
//    I rewrote the event loop so it uses a call to select. This allowed me
//    to add code that tests to see if we've lost a connection. If we lost
//    the connection, quit the program.
//
//    Brad Whitlock, Mon Oct 22 18:33:37 PST 2001
//    Changed the exception keywords to macros.
//
//    Sean Ahern, Thu Feb 21 16:14:52 PST 2002
//    Added timeout support.
//
//    Brad Whitlock, Mon Mar 25 16:03:54 PST 2002
//    Made the connection and timeout code more general.
//
hrchilds's avatar
hrchilds committed
659
660
661
//    Hank Childs, Tue Jun 24 18:02:01 PDT 2003
//    Allow for timeouts during network executions.
//
hrchilds's avatar
hrchilds committed
662
663
664
//    Jeremy Meredith, Thu Jul 10 11:37:48 PDT 2003
//    Made the engine an object.
//
hrchilds's avatar
hrchilds committed
665
666
667
// ****************************************************************************

bool
hrchilds's avatar
hrchilds committed
668
Engine::EventLoop()
hrchilds's avatar
hrchilds committed
669
670
671
672
{
    bool errFlag = false;

    // The application's main loop
hrchilds's avatar
hrchilds committed
673
    while(!quitRPC->GetQuit() && noFatalExceptions)
hrchilds's avatar
hrchilds committed
674
675
676
677
678
679
680
681
    {
        // Reset the timeout alarm
        ResetTimeout(timeout * 60);

        //
        // Block until the connection needs to be read. Then process its
        // new input.
        //
hrchilds's avatar
hrchilds committed
682
        if (xfer->GetInputConnection()->NeedsRead(true))
hrchilds's avatar
hrchilds committed
683
684
685
686
        {
            TRY
            {
                // We've got some work to do.  Disable the alarm.
hrchilds's avatar
hrchilds committed
687
                const int num_seconds_in_half_hour = 30*60;
hrchilds's avatar
hrchilds committed
688
                ResetTimeout(num_seconds_in_half_hour);
hrchilds's avatar
hrchilds committed
689
690

                // Process input.
hrchilds's avatar
hrchilds committed
691
                ProcessInput();
hrchilds's avatar
hrchilds committed
692
693

                ResetTimeout(timeout * 60);
hrchilds's avatar
hrchilds committed
694
695
696
697
            }
            CATCH(LostConnectionException)
            {
                // Indicate that we want to quit the application.
hrchilds's avatar
hrchilds committed
698
                quitRPC->SetQuit(true);
hrchilds's avatar
hrchilds committed
699
700
701
702
703
704
705
706
707
708
                errFlag = true;
            }
            ENDTRY
        }
    }

    return errFlag;
}

// ****************************************************************************
hrchilds's avatar
hrchilds committed
709
//  Function: ProcessInput
hrchilds's avatar
hrchilds committed
710
//
hrchilds's avatar
hrchilds committed
711
712
713
714
//  Purpose:
//    Reads socket input from the viewer and adds it to the xfer object's
//    input. After doing that, the xfer object is called upon to process its
//    input.
hrchilds's avatar
hrchilds committed
715
//
hrchilds's avatar
hrchilds committed
716
//  Notes:      
hrchilds's avatar
hrchilds committed
717
//
hrchilds's avatar
hrchilds committed
718
719
//  Programmer: Brad Whitlock
//  Creation:   Thu Mar 15 14:08:30 PST 2001
hrchilds's avatar
hrchilds committed
720
//
hrchilds's avatar
hrchilds committed
721
722
723
724
725
726
//  Modifications:
//    Brad Whitlock, Wed Mar 20 17:53:20 PST 2002
//    I abstracted the read code.
//
//    Jeremy Meredith, Thu Jul 10 11:37:48 PDT 2003
//    Made the engine an object.
hrchilds's avatar
hrchilds committed
727
728
729
730
//
// ****************************************************************************

void
hrchilds's avatar
hrchilds committed
731
Engine::ProcessInput()
hrchilds's avatar
hrchilds committed
732
733
{    
    // Try reading from the viewer.  
hrchilds's avatar
hrchilds committed
734
    int amountRead = xfer->GetInputConnection()->Fill();
hrchilds's avatar
hrchilds committed
735
736

    // If we got input, process it. Otherwise, start counting errors.
hrchilds's avatar
hrchilds committed
737
    if (amountRead > 0)
hrchilds's avatar
hrchilds committed
738
739
    {
        // Process the new information.
hrchilds's avatar
hrchilds committed
740
        xfer->Process();
hrchilds's avatar
hrchilds committed
741
742
743
744
    }
}

// ****************************************************************************
hrchilds's avatar
hrchilds committed
745
//  Function: ProcessCommandLine
hrchilds's avatar
hrchilds committed
746
//
hrchilds's avatar
hrchilds committed
747
748
//  Purpose:
//    Reads the command line arguments for the engine.
hrchilds's avatar
hrchilds committed
749
//
hrchilds's avatar
hrchilds committed
750
751
//  Programmer: Jeremy Meredith
//  Creation:   September 21, 2001
hrchilds's avatar
hrchilds committed
752
//
hrchilds's avatar
hrchilds committed
753
754
755
756
757
758
//  Modifications:
//    Eric Brugger, Wed Nov  7 12:40:56 PST 2001
//    I added the command line argument -timing.
//
//    Sean Ahern, Thu Feb 21 16:12:43 PST 2002
//    Added timeout support.
hrchilds's avatar
hrchilds committed
759
//
hrchilds's avatar
hrchilds committed
760
761
//    Sean Ahern, Tue Dec  3 09:58:28 PST 2002
//    Added -dump support for streamer debugging.
hrchilds's avatar
hrchilds committed
762
//
hrchilds's avatar
hrchilds committed
763
764
//    Hank Childs, Mon May 12 19:44:50 PDT 2003
//    Add support for -lb-block, -lb-stride, and -lb-random.
hrchilds's avatar
hrchilds committed
765
//
hrchilds's avatar
hrchilds committed
766
767
//    Jeremy Meredith, Thu Jul 10 11:37:48 PDT 2003
//    Made the engine an object.
hrchilds's avatar
hrchilds committed
768
769
770
// 
//    Mark C. Miller, Thu Mar  4 12:07:04 PST 2004
//    Told NetworkManager to dump rendered images
hrchilds's avatar
hrchilds committed
771
//
hrchilds's avatar
hrchilds committed
772
773
774
//    Hank Childs, Sun Mar  6 08:42:50 PST 2005
//    Renamed -forcedynamic to -allowdynamic.  Removed -forcestatic argument.
//
hrchilds's avatar
hrchilds committed
775
776
// ****************************************************************************
void
hrchilds's avatar
hrchilds committed
777
Engine::ProcessCommandLine(int argc, char **argv)
hrchilds's avatar
hrchilds committed
778
779
780
781
{
    // process arguments.
    for (int i=1; i<argc; i++)
    {
hrchilds's avatar
hrchilds committed
782
783
        if (strcmp(argv[i], "-allowdynamic") == 0)
            LoadBalancer::AllowDynamic();
hrchilds's avatar
hrchilds committed
784
785
786
787
788
789
790
791
792
793
        else if (strcmp(argv[i], "-timing") == 0)
            visitTimer->Enable();
        else if (strcmp(argv[i], "-timeout") == 0)
        {
            timeout = atol(argv[i+1]);
            i++;
        }
        else if (strcmp(argv[i], "-dump") == 0)
        {
            avtStreamer::DebugDump(true);
hrchilds's avatar
hrchilds committed
794
            netmgr->DumpRenders();
hrchilds's avatar
hrchilds committed
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
        }
        else if (strcmp(argv[i], "-lb-block") == 0)
        {
            LoadBalancer::SetScheme(LOAD_BALANCE_CONTIGUOUS_BLOCKS_TOGETHER);
        }
        else if (strcmp(argv[i], "-lb-stride") == 0)
        {
            LoadBalancer::SetScheme(LOAD_BALANCE_STRIDE_ACROSS_BLOCKS);
        }
        else if (strcmp(argv[i], "-lb-random") == 0)
        {
            LoadBalancer::SetScheme(LOAD_BALANCE_RANDOM_ASSIGNMENT);
        }
    }
}

// ****************************************************************************
hrchilds's avatar
hrchilds committed
812
//  Function: AlarmHandler
hrchilds's avatar
hrchilds committed
813
//
hrchilds's avatar
hrchilds committed
814
815
//  Purpose:
//    Gracefully exits the engine if an SIGALRM signal was received.
hrchilds's avatar
hrchilds committed
816
//
hrchilds's avatar
hrchilds committed
817
818
//  Programmer: Sean Ahern
//  Creation:   Thu Feb 21 16:13:43 PST 2002
hrchilds's avatar
hrchilds committed
819
//
hrchilds's avatar
hrchilds committed
820
821
822
823
824
825
//  Modifications:
//    Brad Whitlock, Tue Apr 9 13:46:32 PST 2002
//    Disabled on Windows.
//
//    Jeremy Meredith, Thu Jul 10 11:37:48 PDT 2003
//    Made the engine an object.
hrchilds's avatar
hrchilds committed
826
//
hrchilds's avatar
hrchilds committed
827
828
829
//    Mark C. Miller, Wed Jul  7 11:42:09 PDT 2004
//    Made it PAR_Exit() in parallel and call Init::Finalize()
//
hrchilds's avatar
hrchilds committed
830
// ****************************************************************************
hrchilds's avatar
hrchilds committed
831

hrchilds's avatar
hrchilds committed
832
void
hrchilds's avatar
hrchilds committed
833
Engine::AlarmHandler(int signal)
hrchilds's avatar
hrchilds committed
834
835
{
    debug1 << "ENGINE exited due to an inactivity timeout of "
hrchilds's avatar
hrchilds committed
836
           << Engine::Instance()->timeout << " minutes." << endl;
hrchilds's avatar
hrchilds committed
837
838

    Init::Finalize();
hrchilds's avatar
hrchilds committed
839
840
841
#ifdef PARALLEL
    PAR_Exit();
#endif
hrchilds's avatar
hrchilds committed
842
843
    exit(0);

hrchilds's avatar
hrchilds committed
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
}

// ****************************************************************************
//  Function: NewHandler
//
//  Purpose: Issue warning message when memory has run out, then abort.
//
//  Notes:
//    Parallel builds log a message that also suggests using more
//    processors and abort through MPI_Abort with error code 18; serial
//    builds log the shorter message and call abort().
//
//  Programmer: Mark C. Miller 
//  Creation:   Tue Jun 29 17:34:19 PDT 2004
// ****************************************************************************

void
Engine::NewHandler(void)
{
    // The message goes to the debug log only; writing it to cerr was
    // deliberately left disabled in the original code.
#ifdef PARALLEL
    debug1 << "VisIt: engine out of memory, try more processors" << endl;
    MPI_Abort(MPI_COMM_WORLD, 18);
#else
    debug1 << "VisIt: engine out of memory" << endl;
    abort();
#endif
}

hrchilds's avatar
hrchilds committed
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
// ****************************************************************************
//  Method: WriteByteStreamToSocket
//
//  Purpose:
//      Writes a byte stream to a socket.
//
//  Arguments:
//      rpc           : Used to send the total byte count as the reply before
//                      any data is written.
//      vtkConnection : The connection the bytes are written to.
//      do_str        : The collection of strings that make up the stream.
//
//  Notes:
//      Strings are coalesced in a 4096 byte staging buffer so that many
//      small strings result in few direct writes. A string larger than the
//      staging buffer is written directly. The staging buffer is always
//      flushed before such a direct write so that bytes leave in order, but
//      it is only flushed when it actually holds data; no zero-length writes
//      are issued.
//
//  Programmer: Hank Childs
//  Creation:   March 19, 2004
//
// ****************************************************************************

static void
WriteByteStreamToSocket(NonBlockingRPC *rpc, Connection *vtkConnection,
                        avtDataObjectString &do_str)
{
    int totalSize = do_str.GetTotalLength();
    rpc->SendReply(totalSize);
    int writeData = visitTimer->StartTimer();
    int nStrings = do_str.GetNStrings();
    debug5 << "sending " << totalSize << " bytes to the viewer " << nStrings
           << " from strings." << endl;

    const int buff_size = 4096;
    unsigned char buffer[buff_size];
    int buff_cur = 0;
    int strings_written = 0;
    for (int i = 0 ; i < nStrings ; i++)
    {
        int size;
        char *str;
        do_str.GetString(i, str, size);

        // Same test as "buff_cur + size < buff_size", written so that the
        // addition cannot overflow for very large strings.
        if (size < (buff_size - buff_cur))
        {
            // Put this message into the buffer.
            memcpy(buffer + buff_cur, str, size*sizeof(char));
            buff_cur += size;
            continue;
        }

        // We can't put this message into "buffer", because
        // that would exceed buffer's size.  Write "buffer"
        // first, or else we would be sending messages out of
        // order.  Skip the write when the buffer is empty.
        if (buff_cur > 0)
        {
            vtkConnection->DirectWrite(buffer, long(buff_cur));
            strings_written++;
            buff_cur = 0;
        }

        if (size > buff_size)
        {
            // It's big. Just write this string directly.
            vtkConnection->DirectWrite((const unsigned char *)str,
                                       long(size));
            strings_written++;
        }
        else
        {
            // It fits into the (now empty) buffer.
            memcpy(buffer, str, size*sizeof(char));
            buff_cur = size;
        }
    }

    // We have no more strings, so just write what we have.
    if (buff_cur > 0)
    {
        vtkConnection->DirectWrite(buffer, long(buff_cur));
        strings_written++;
        buff_cur = 0;
    }

    debug5 << "Number of actual direct writes = " << strings_written << endl;

    char info[124];
    SNPRINTF(info, 124, "Writing %d bytes to socket", totalSize);     
    visitTimer->StopTimer(writeData, info);
}


hrchilds's avatar
hrchilds committed
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
// ****************************************************************************
// Function: WriteData
//
// Purpose:
//   Writes a vtkDataSet object back to the viewer
//
// Notes:      
//
// Programmer: Jeremy Meredith
// Creation:   September 5, 2000
//
// Modifications:
//    Jeremy Meredith, Thu Sep 21 22:17:42 PDT 2000
//    I made this append the output from all processors in parallel
//    before sending it back to the viewer.
//
//    Jeremy Meredith, Tue Sep 26 16:41:09 PDT 2000
//    Made this use an avtDataSetWriter in serial.  Parallel is
//    temporarily broken.
//
//    Jeremy Meredith, Thu Sep 28 12:44:22 PDT 2000
//    Fixed for parallel.
//
//    Hank Childs, Tue Oct 17 09:38:12 PDT 2000
//    Made argument be a reference counted avtDataset instead of a normal
//    avtDataset.
//
//    Brad Whitlock, Wed Oct 18 14:48:29 PST 2000
//    I fixed a bug with using the ref_ptr. I also added code to set
//    the destination format in the avtDataSetWriter used by the UI-process
//    so the avtDataSet it sends back is in the right format. Note that
//    I didn't set the destination format in the non-UI processes because
//    the string created by non-UI processes is sent to the UI process
//    where it is read and resent to the viewer.
//
//    Hank Childs, Wed Oct 18 16:57:03 PDT 2000
//    Cleaned up memory leak.
//
//    Hank Childs, Wed Jan 17 11:37:36 PST 2001
//    Made input be a data object writer rather than data.
//
//    Hank Childs, Mon Feb 12 07:55:47 PST 2001
//    Fix logic for parallel.
//
//    Brad Whitlock, Tue May 1 13:45:31 PST 2001
//    Added code to send back status.
//
//    Hank Childs, Sat May 26 10:31:14 PDT 2001
//    Made use of avtDataObjectString to prevent bottlenecking at proc 0.
//
//    Jeremy Meredith, Fri Jun 29 14:50:18 PDT 2001
//    Added progress reporting, even in parallel.
//
//    Jeremy Meredith, Tue Aug 14 14:45:12 PDT 2001
//    Made the final progress update occur before the SendReply because
//    SendStatus is for reporting *incomplete* progress.  If it got sent
//    too soon, it overwrote the complete status in the viewer and 
//    the viewer never realized it was done.  Thus, it hung.
//
//    Hank Childs, Sun Sep 16 14:55:48 PDT 2001
//    Add timing information.
//
//    Hank Childs, Sun Sep 16 17:30:43 PDT 2001
//    Reflect new interface for data object strings.
//
//    Hank Childs, Mon Sep 17 11:20:10 PDT 2001
//    Have the data object reader handle deleting data strings.
//
//    Hank Childs, Mon Oct  1 11:31:27 PDT 2001
//    Stop assumptions about data objects.
//
//    Eric Brugger, Mon Nov  5 13:50:49 PST 2001
//    Modified to always compile the timing code.
//
//    Hank Childs, Mon Jan  7 19:39:32 PST 2002
//    Fix memory leak.
//
//    Brad Whitlock, Tue Mar 26 10:56:38 PDT 2002
//    Changed the communication code so it uses connection objects.
//
//    Sean Ahern, Thu Jun 13 10:51:17 PDT 2002
//    Removed broken, unused code.
//
//    Hank Childs, Sun Aug 18 21:41:29 PDT 2002
//    Removed progress message that said "Transferring Data Set" as we were
//    really waiting for the processors to synchronize.
//
//    Hank Childs (Mark Miller), Mon May 12 18:04:51 PDT 2003
//    Corrected slightly misleading debug statement.
//
//    Hank Childs, Fri May 16 16:26:03 PDT 2003
//    Detect if there was a failure in the pipeline and send a message to
//    the viewer if so.
//
hrchilds's avatar
hrchilds committed
1048
1049
1050
//    Kathleen Bonnell, Thu Jun 12 10:57:11 PDT 2003 
//    Split timing code to time Serialization separately from write. 
//    
hrchilds's avatar
hrchilds committed
1051
1052
1053
//    Jeremy Meredith, Thu Jul 10 11:37:48 PDT 2003
//    Made the engine an object.
//
hrchilds's avatar
hrchilds committed
1054
1055
1056
//    Mark C. Miller, Wed Feb  4 19:45:35 PST 2004
//    Made the ui_dob a 'clone' of the writer's input
//
hrchilds's avatar
hrchilds committed
1057
1058
1059
1060
//    Hank Childs, Wed Feb 25 11:40:21 PST 2004
//    Fix mis-spelling of cumulative (the function we were calling changed
//    names).
//
hrchilds's avatar
hrchilds committed
1061
1062
1063
//    Hank Childs, Fri Mar 19 21:20:12 PST 2004
//    Use a helper routine (that's more efficient) to write to a socket.
//
hrchilds's avatar
hrchilds committed
1064
1065
1066
1067
1068
1069
//    Mark C. Miller, Mon May 24 18:36:13 PDT 2004
//    Added arguments to support checking of scalable threshold is exceeded
//    Modified communication algorithm to have each processor handshake with
//    UI proc and check if ok before actually sending data. When scalable
//    threshold is exceeded, UI proc tells everyone to stop sending data.
//
hrchilds's avatar
hrchilds committed
1070
1071
1072
//    Mark C. Miller, Thu Jun 10 09:08:18 PDT 2004
//    Modified to use unique MPI message tags
//
hrchilds's avatar
hrchilds committed
1073
1074
1075
1076
1077
1078
//    Mark C. Miller, Wed Jul  7 11:42:09 PDT 2004
//    Added explicit const bool polysOnly to document the fact that we
//    are counting polygons only during this phase. Also added code to
//    set processor 0's cell count and test if its count alone causes the
//    scalable threshold to be exceeded.
//
hrchilds's avatar
hrchilds committed
1079
1080
1081
1082
//    Mark C. Miller, Wed Aug 11 23:42:18 PDT 2004
//    Added argument for cellCountMultiplier. Used cellCountMultiplier
//    to adjust cell counting for SR threshold
//
hrchilds's avatar
hrchilds committed
1083
1084
1085
//    Hank Childs, Wed Dec  1 14:57:22 PST 2004
//    Automatically transition to SR mode with image based plots.
//
hrchilds's avatar
hrchilds committed
1086
1087
// ****************************************************************************
void
hrchilds's avatar
hrchilds committed
1088
1089
Engine::WriteData(NonBlockingRPC *rpc, avtDataObjectWriter_p &writer,
    bool respondWithNull, int scalableThreshold, bool* scalableThresholdExceeded,
hrchilds's avatar
hrchilds committed
1090
1091
    int currentTotalGlobalCellCount, float cellCountMultiplier,
    int* currentNetworkGlobalCellCount)
hrchilds's avatar
hrchilds committed
1092
{
hrchilds's avatar
hrchilds committed
1093

hrchilds's avatar
hrchilds committed
1094
#ifdef PARALLEL
hrchilds's avatar
hrchilds committed
1095

hrchilds's avatar
hrchilds committed
1096
1097
    static const bool polysOnly = true;

hrchilds's avatar
hrchilds committed
1098
1099
1100
1101
    // set up MPI message tags
    int mpiCellCountTag   = GetUniqueMessageTag();
    int mpiSendDataTag    = GetUniqueMessageTag();
    int mpiDataObjSizeTag = GetUniqueMessageTag();
hrchilds's avatar
hrchilds committed
1102
    int mpiDataObjDataTag = GetUniqueMessageTag();
hrchilds's avatar
hrchilds committed
1103

hrchilds's avatar
hrchilds committed
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
    //
    // When respond with null is true, this routine still has an obligation
    // to receive the dummied-up data tree from each processor, regardless of
    // whether or not the scalable threshold has been exceeded. So, we capture
    // that fact in the 'sendDataAnyway' bool. Likewise, when scalableThreshold
    // is -1, it means also to send the data anyway. 
    //
    bool sendDataAnyway = respondWithNull || scalableThreshold==-1;
    bool thresholdExceeded = false;
    int  currentCellCount = 0;

hrchilds's avatar
hrchilds committed
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
    if (PAR_UIProcess())
    {
        int collectAndWriteData = visitTimer->StartTimer();
        int collectData = visitTimer->StartTimer();

        // Send a second stage for the RPC.
        rpc->SendStatus(0,
                        rpc->GetCurStageNum(),
                        "Synchronizing",
                        rpc->GetMaxStageNum());

        avtDataObject_p ui_dob = writer->GetInput();
hrchilds's avatar
hrchilds committed
1127

hrchilds's avatar
hrchilds committed
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
        if (cellCountMultiplier > INT_MAX/2.)
            currentCellCount = INT_MAX;
        else
            currentCellCount = (int) 
                   (ui_dob->GetNumberOfCells(polysOnly) * cellCountMultiplier);

        // test if we've exceeded the scalable threshold already with proc 0's
        // output
        if (currentTotalGlobalCellCount == INT_MAX ||
            currentCellCount == INT_MAX ||
            (currentTotalGlobalCellCount + currentCellCount 
                  > scalableThreshold))
hrchilds's avatar
hrchilds committed
1140
        {
hrchilds's avatar
hrchilds committed
1141
1142
            debug5 << "exceeded scalable threshold of " << scalableThreshold 
                   << endl;
hrchilds's avatar
hrchilds committed
1143
1144
            thresholdExceeded = true; 
        }
hrchilds's avatar
hrchilds committed
1145

hrchilds's avatar
hrchilds committed
1146
1147
        if (writer->MustMergeParallelStreams())
        {
hrchilds's avatar
hrchilds committed
1148
1149
1150
1151
            // we clone here to preserve this processor's orig network output
            // while we merge other proc's output into the cloned dob
            ui_dob = ui_dob->Clone();

hrchilds's avatar
hrchilds committed
1152
1153
1154
            for (int i=1; i<PAR_Size(); i++)
            {
                MPI_Status stat;
hrchilds's avatar
hrchilds committed
1155
1156
1157
1158
1159
1160
1161
                int size, proc_i_localCellCount;

                int shouldGetData = 1;
                int mpiSource = MPI_ANY_SOURCE; 

                // recv the "num cells I have" message from any proc
                MPI_Recv(&proc_i_localCellCount, 1, MPI_INT, MPI_ANY_SOURCE,
hrchilds's avatar
hrchilds committed
1162
                    mpiCellCountTag, MPI_COMM_WORLD, &stat);
hrchilds's avatar
hrchilds committed
1163
1164
1165
1166
1167
1168
1169

                mpiSource = stat.MPI_SOURCE;

                debug5 << "recievied the \"num cells I have\" (=" << proc_i_localCellCount
                       << ") message from processor " << mpiSource << endl;

                // accumulate this processors cell count in the total for this network
hrchilds's avatar
hrchilds committed
1170
1171
                if (currentCellCount != INT_MAX)
                    currentCellCount += proc_i_localCellCount;
hrchilds's avatar
hrchilds committed
1172
1173

                // test if we've exceeded the scalable threshold
hrchilds's avatar
hrchilds committed
1174
1175
1176
1177
                if (currentTotalGlobalCellCount == INT_MAX ||
                    currentCellCount == INT_MAX ||
                    (currentTotalGlobalCellCount + currentCellCount 
                          > scalableThreshold))
hrchilds's avatar
hrchilds committed
1178
                {
hrchilds's avatar
hrchilds committed
1179
1180
                    if (!thresholdExceeded)
                        debug5 << "exceeded scalable threshold of " << scalableThreshold << endl;
hrchilds's avatar
hrchilds committed
1181
1182
1183
1184
1185
1186
1187
                    shouldGetData = sendDataAnyway;
                    thresholdExceeded = true; 
                }

                // tell source processor whether or not to send data with
                // the "should send data" message
                MPI_Send(&shouldGetData, 1, MPI_INT, mpiSource, 
hrchilds's avatar
hrchilds committed
1188
                    mpiSendDataTag, MPI_COMM_WORLD);
hrchilds's avatar
hrchilds committed
1189
1190
1191
1192
1193
1194
                debug5 << "told processor " << mpiSource << (shouldGetData==1?" to":" NOT to")
                       << " send data" << endl;

                if (shouldGetData)
                {
                    MPI_Recv(&size, 1, MPI_INT, mpiSource, 
hrchilds's avatar
hrchilds committed
1195
                             mpiDataObjSizeTag, MPI_COMM_WORLD, &stat);
hrchilds's avatar
hrchilds committed
1196
1197
1198
1199
1200
1201
1202
                    debug5 << "recieving size=" << size << endl;

                    debug5 << "receiving " << size << " bytes from MPI_SOURCE "
                           << mpiSource << endl;

                    char *str = new char[size];
                    MPI_Recv(str, size, MPI_CHAR, mpiSource, 
hrchilds's avatar
hrchilds committed
1203
                             mpiDataObjDataTag, MPI_COMM_WORLD, &stat);
hrchilds's avatar
hrchilds committed
1204
                    debug5 << "recieving data" << endl;
hrchilds's avatar
hrchilds committed
1205
    
hrchilds's avatar
hrchilds committed
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
                    // The data object reader will delete the string.
                    avtDataObjectReader *avtreader = new avtDataObjectReader;
                    avtreader->Read(size, str);
                    avtDataObject_p proc_i_dob = avtreader->GetOutput();

                    // We can't tell the reader to read (Update) unless we tell it
                    // what we want it to read.  Fortunately, we can just ask it
                    // for a general specification.
                    avtTerminatingSource *src = proc_i_dob->GetTerminatingSource();
                    avtPipelineSpecification_p spec
                        = src->GetGeneralPipelineSpecification();
                    proc_i_dob->Update(spec);

                    ui_dob->Merge(*proc_i_dob);
                    delete avtreader;
                }
hrchilds's avatar
hrchilds committed
1222
1223
1224
1225
1226
1227
1228
1229
1230

                rpc->SendStatus(100. * float(i)/float(PAR_Size()),
                                rpc->GetCurStageNum(),
                                "Synchronizing",
                                rpc->GetMaxStageNum());
            }
        }
        visitTimer->StopTimer(collectData, "Collecting data");

hrchilds's avatar
hrchilds committed
1231
        // indicate that cumulative extents in data object now as good as true extents
hrchilds's avatar
hrchilds committed
1232
        ui_dob->GetInfo().GetAttributes().SetCanUseCumulativeAsTrueOrCurrent(true);
hrchilds's avatar
hrchilds committed
1233

hrchilds's avatar
hrchilds committed
1234
1235
1236
1237
1238
1239
        //
        // See if there was an error on another processor.
        //
        avtDataValidity &v = ui_dob->GetInfo().GetValidity();
        if (!v.HasErrorOccurred())
        {
hrchilds's avatar
hrchilds committed
1240
            int serializeData = visitTimer->StartTimer();
hrchilds's avatar
hrchilds committed
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251

            if (thresholdExceeded && !sendDataAnyway)
            {
                // dummy a null data object message to send to viewer
                avtNullData_p nullData = new avtNullData(NULL,AVT_NULL_DATASET_MSG);
                CopyTo(ui_dob, nullData);
            }

            // Create a writer to write across the network.
            avtDataObjectWriter_p networkwriter = ui_dob->InstantiateWriter();
            networkwriter->SetDestinationFormat(destinationFormat);
hrchilds's avatar
hrchilds committed
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
            networkwriter->SetInput(ui_dob);
    
            avtDataObjectString do_str;
            networkwriter->Write(do_str);
    
            rpc->SendStatus(100,
                            rpc->GetCurStageNum(),
                            "Synchronizing",
                            rpc->GetMaxStageNum());
    
hrchilds's avatar
hrchilds committed
1262
1263
            visitTimer->StopTimer(serializeData, "Serializing data for writer");

hrchilds's avatar
hrchilds committed
1264
            WriteByteStreamToSocket(rpc, vtkConnection, do_str);
hrchilds's avatar
hrchilds committed
1265

hrchilds's avatar
hrchilds committed
1266
1267
1268
1269
1270
1271
1272
1273
1274
        }
        else
        {
            rpc->SendError(v.GetErrorMessage());
        }

        char *descStr = "Collecting data and writing it to viewer";
        visitTimer->StopTimer(collectAndWriteData, descStr);
    }
hrchilds's avatar
hrchilds committed
1275
    else // non-UI processes
hrchilds's avatar
hrchilds committed
1276
1277
1278
1279
1280
1281
1282
1283
1284
    {
        if (writer->MustMergeParallelStreams())
        {
            char *str;
            int   size;
            avtDataObjectString do_str;
            writer->Write(do_str);
            do_str.GetWholeString(str, size);

hrchilds's avatar
hrchilds committed
1285
1286
1287
1288
            int shouldSendData = 1;
            MPI_Status stat;

            // send the "num cells I have" message to proc 0
hrchilds's avatar
hrchilds committed
1289
1290
1291
1292
1293
1294
            int numCells;
            if (cellCountMultiplier > INT_MAX/2.)
                numCells = INT_MAX;
            else
                numCells = (int) 
                             (writer->GetInput()->GetNumberOfCells(polysOnly) *
hrchilds's avatar
hrchilds committed
1295
                                                      cellCountMultiplier);
hrchilds's avatar
hrchilds committed
1296
            debug5 << "sending \"num cells I have\" message (=" << numCells << ")" << endl;
hrchilds's avatar
hrchilds committed
1297
            MPI_Send(&numCells, 1, MPI_INT, 0, mpiCellCountTag, MPI_COMM_WORLD);
hrchilds's avatar
hrchilds committed
1298
1299

            // recv the "should send data" message from proc 0
hrchilds's avatar
hrchilds committed
1300
            MPI_Recv(&shouldSendData, 1, MPI_INT, 0, mpiSendDataTag, MPI_COMM_WORLD, &stat);
hrchilds's avatar
hrchilds committed
1301
1302
1303
1304

            if (shouldSendData)
            {
               debug5 << "sending size=" << size << endl; 
hrchilds's avatar
hrchilds committed
1305
               MPI_Send(&size, 1, MPI_INT, 0, mpiDataObjSizeTag, MPI_COMM_WORLD);
hrchilds's avatar
hrchilds committed
1306
1307
               debug5 << "sending " << size << " bytes to proc 0" << endl;
               debug5 << "sending data" << endl; 
hrchilds's avatar
hrchilds committed
1308
               MPI_Send(str, size, MPI_CHAR, 0, mpiDataObjDataTag, MPI_COMM_WORLD);
hrchilds's avatar
hrchilds committed
1309
1310
1311
1312
1313
1314
            }
            else
            {
                debug5 << "not sending data to proc 0 because the scalable"
                       << "threshold has been exceeded." << endl;
            }
hrchilds's avatar
hrchilds committed
1315
1316
1317
1318
1319
1320
1321
1322
        }
        else
        {
            debug5 << "not sending data to proc 0 because the data object "
                   << "does not require parallel streams." << endl;
        }
    }

hrchilds's avatar
hrchilds committed
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
    //
    // all processors need to know the network's cell count and whether
    // scalable threshold was exceeded
    //
    int tmp[2] = {currentCellCount, thresholdExceeded?1:0};
    MPI_Bcast(tmp, 2, MPI_INT, 0, MPI_COMM_WORLD);
    currentCellCount  = tmp[0];
    thresholdExceeded = tmp[1]==1;

    // return requested arguments
    if (currentNetworkGlobalCellCount != 0)
        *currentNetworkGlobalCellCount = currentCellCount;
    if (scalableThresholdExceeded != 0)
        *scalableThresholdExceeded = thresholdExceeded;

hrchilds's avatar
hrchilds committed
1338
1339
#else // serial
    avtDataObject_p dob = writer->GetInput();
hrchilds's avatar
hrchilds committed
1340
    dob->GetInfo().GetAttributes().SetCanUseCumulativeAsTrueOrCurrent(true);
hrchilds's avatar
hrchilds committed
1341
1342
1343
    avtDataValidity &v = dob->GetInfo().GetValidity();
    if (!v.HasErrorOccurred())
    {
hrchilds's avatar
hrchilds committed
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
        avtDataObjectWriter_p writer_to_use = writer;
        if (cellCountMultiplier > INT_MAX/2.) // div2 for float precision
        {
            // dummy a null data object message to send to viewer
            avtNullData_p nullData =new avtNullData(NULL,AVT_NULL_DATASET_MSG);
            nullData->GetInfo().Copy(dob->GetInfo());
            CopyTo(dob, nullData);
            avtDataObjectWriter_p nullwriter = dob->InstantiateWriter();
            nullwriter->SetInput(dob);
            writer_to_use = nullwriter;
            *scalableThresholdExceeded = true;
            *currentNetworkGlobalCellCount = INT_MAX;
        }

hrchilds's avatar
hrchilds committed
1358
1359
1360
1361
1362
1363
        // Send a second stage for the RPC.
        rpc->SendStatus(0,
                        rpc->GetCurStageNum(),
                        "Transferring Data Set",
                        rpc->GetMaxStageNum());

hrchilds's avatar
hrchilds committed
1364
        writer_to_use->SetDestinationFormat(destinationFormat);
hrchilds's avatar
hrchilds committed
1365
        avtDataObjectString  do_str;
hrchilds's avatar
hrchilds committed
1366
        writer_to_use->Write(do_str);
hrchilds's avatar
hrchilds committed
1367
1368
1369
1370
1371
1372

        rpc->SendStatus(100,
                        rpc->GetCurStageNum(),
                        "Transferring Data Set",
                        rpc->GetMaxStageNum());

hrchilds's avatar
hrchilds committed
1373
        WriteByteStreamToSocket(rpc, vtkConnection, do_str);
hrchilds's avatar
hrchilds committed
1374
1375
1376
1377
1378
1379
1380
1381
    }
    else
    {
        rpc->SendError(v.GetErrorMessage());
    }
#endif
}

hrchilds's avatar
hrchilds committed
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
// ****************************************************************************
// Method: Engine::SendKeepAliveReply
//
// Purpose: 
//   Answers a keep-alive request. Sends the reply for keepAliveRPC and then
//   writes a small, fixed-size message on the engine's data socket
//   (vtkConnection). In a PARALLEL build this is only done when
//   PAR_UIProcess() is true.
//
// Notes:
//   The message is the text "VisIt!!!" followed by zero padding, for a total
//   of 10 bytes. The byte count is kept at 10; presumably the reader on the
//   other end of the data socket expects exactly that many bytes -- confirm
//   against the client before changing it.
//
// Programmer: Brad Whitlock
// Creation:   Fri Mar 12 11:32:25 PDT 2004
//
// Modifications:
//   Backed the message with a buffer that is as large as the number of
//   bytes written. The string literal "VisIt!!!" is only 9 bytes long
//   (8 characters plus the terminating NUL), so writing 10 bytes straight
//   from the literal read one byte past its end.
//
// ****************************************************************************

void
Engine::SendKeepAliveReply()
{
#ifdef PARALLEL
    if(PAR_UIProcess())
    {
#endif
        //
        // Send a reply on the command socket.
        //
        keepAliveRPC->SendReply();

        //
        // Send a little data on the data socket. The array is sized to the
        // number of bytes written; elements after the string's terminating
        // NUL are zero-initialized, so every byte sent is well defined.
        //
        const int keepAliveMsgSize = 10;
        const unsigned char keepAliveMsg[keepAliveMsgSize] = "VisIt!!!";
        vtkConnection->DirectWrite(keepAliveMsg, keepAliveMsgSize);
#ifdef PARALLEL
    }
#endif
}
hrchilds's avatar
hrchilds committed
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435

// ****************************************************************************
//  Function: Engine::EngineAbortCallbackParallel
//
//  Purpose:
//      Tells filters whether or not they should abort execution.
//
//  Arguments:
//      data     the xfer object to use
//
//  Programmer:   Jeremy Meredith
//  Creation:     July 3, 2001
//
//  Modifications:
//    Jeremy Meredith, Thu Sep 20 18:28:22 PDT 2001
//    Added support for parallel interruption.
//
//    Brad Whitlock, Mon Mar 25 15:51:39 PST 2002
//    Made it more general.
//
hrchilds's avatar
hrchilds committed
1436
1437
1438
//    Jeremy Meredith, Thu Jul 10 11:37:48 PDT 2003
//    Made the engine an object.
//
hrchilds's avatar
hrchilds committed
1439
1440
1441
//    Mark C. Miller, Thu Jun 10 10:05:09 PDT 2004
//    Modified to use a unique message tag for the interrupt message
//
hrchilds's avatar
hrchilds committed
1442
1443
1444
1445
1446
1447
//    Mark C. Miller, Fri Jun 11 09:39:11 PDT 2004
//    Made xfer local variable conditionally defined as MPIXfer or Xfer
//
//    Mark C. Miller, Fri Jun 11 13:21:42 PDT 2004
//    Made it use a static, file-scope const int as the message tag
//
hrchilds's avatar
hrchilds committed
1448
1449
1450
// ****************************************************************************

bool
hrchilds's avatar
hrchilds committed
1451
Engine::EngineAbortCallbackParallel(void *data, bool informSlaves)
hrchilds's avatar
hrchilds committed
1452
{