Updates will be applied April 15th at 12pm EDT (UTC-0400). GitLab could be a little slow between 12 - 12:45pm EDT.

vtkPVDataDeliveryManager.cxx 23.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*=========================================================================

  Program:   ParaView
  Module:    $RCSfile$

  Copyright (c) Kitware, Inc.
  All rights reserved.
  See Copyright.txt or http://www.paraview.org/HTML/Copyright.html for details.

     This software is distributed WITHOUT ANY WARRANTY; without even
     the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
     PURPOSE.  See the above copyright notice for more information.

=========================================================================*/
15
#include "vtkPVDataDeliveryManager.h"
16 17

#include "vtkAlgorithmOutput.h"
18
#include "vtkDataObject.h"
19 20
#include "vtkExtentTranslator.h"
#include "vtkKdTreeManager.h"
21
#include "vtkMPIMoveData.h"
22 23 24 25 26
#include "vtkMultiProcessController.h"
#include "vtkNew.h"
#include "vtkObjectFactory.h"
#include "vtkOrderedCompositeDistributor.h"
#include "vtkPKdTree.h"
27
#include "vtkPVDataRepresentation.h"
28
#include "vtkPVRenderView.h"
29
#include "vtkPVStreamingMacros.h"
30 31
#include "vtkPVTrivialProducer.h"
#include "vtkSmartPointer.h"
32
#include "vtkStreamingDemandDrivenPipeline.h"
Utkarsh Ayachit's avatar
Utkarsh Ayachit committed
33
#include "vtkTimerLog.h"
34
#include "vtkWeakPointer.h"
35 36

#include <assert.h>
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
#include <map>
#include <queue>
#include <utility>

//*****************************************************************************
class vtkPVDataDeliveryManager::vtkInternals
{
public:

  class vtkPriorityQueueItem
    {
  public:
    unsigned int RepresentationId;
    unsigned int BlockId;
    unsigned int Level;
    unsigned int Index;
    double Priority;

    vtkPriorityQueueItem() :
      RepresentationId(0), BlockId(0),
      Level(0), Index(0), Priority(0)
    {
    }

    bool operator < (const vtkPriorityQueueItem& other) const
      {
      return this->Priority < other.Priority;
      }
    };

  typedef std::priority_queue<vtkPriorityQueueItem> PriorityQueueType;
  PriorityQueueType PriorityQueue;

70 71 72 73 74 75 76 77 78
  class vtkOrderedCompositingInfo
    {
  public:
    vtkSmartPointer<vtkExtentTranslator> Translator;
    double Origin[3];
    double Spacing[3];
    int WholeExtent[6];
    };

79 80 81
  class vtkItem
    {
    vtkSmartPointer<vtkPVTrivialProducer> Producer;
82 83

    // Data object produced by the representation.
84
    vtkWeakPointer<vtkDataObject> DataObject;
85 86 87 88 89 90 91 92

    // Data object available after delivery to the "rendering" node.
    vtkSmartPointer<vtkDataObject> DeliveredDataObject;

    // Data object after re-distributing when using ordered compositing, for
    // example.
    vtkSmartPointer<vtkDataObject> RedistributedDataObject;

93 94 95
    // Data object for a streamed piece.
    vtkSmartPointer<vtkDataObject> StreamedPiece;

96 97 98
    unsigned long TimeStamp;
    unsigned long ActualMemorySize;
  public:
99
    vtkOrderedCompositingInfo OrderedCompositingInfo;
100

101
    vtkWeakPointer<vtkPVDataRepresentation> Representation;
102 103 104
    bool CloneDataToAllNodes;
    bool DeliverToClientAndRenderingProcesses;
    bool GatherBeforeDeliveringToClient;
105 106 107 108 109 110 111
    bool Redistributable;
    bool Streamable;

    vtkItem() :
      Producer(vtkSmartPointer<vtkPVTrivialProducer>::New()),
      TimeStamp(0),
      ActualMemorySize(0),
112 113 114
      CloneDataToAllNodes(false),
      DeliverToClientAndRenderingProcesses(false),
      GatherBeforeDeliveringToClient(false),
115 116 117 118 119
      Redistributable(false),
      Streamable(false)
    { }

    void SetDataObject(vtkDataObject* data)
120
      {
121 122
      this->DataObject = data;
      this->ActualMemorySize = data? data->GetActualMemorySize() : 0;
123 124 125

      vtkTimeStamp ts; ts.Modified();
      this->TimeStamp = ts;
126 127
      }

128 129
    void SetDeliveredDataObject(vtkDataObject* data)
      {
130 131 132 133 134 135
      this->DeliveredDataObject = data;
      }

    void SetRedistributedDataObject(vtkDataObject* data)
      {
      this->RedistributedDataObject = data;
136 137 138 139
      }

    vtkDataObject* GetDeliveredDataObject()
      {
140 141 142 143 144 145 146 147 148
      return this->DeliveredDataObject.GetPointer();
      }

    vtkDataObject* GetRedistributedDataObject()
      {
      return this->RedistributedDataObject.GetPointer();
      }

    vtkPVTrivialProducer* GetProducer(bool use_redistributed_data)
149
      {
150 151 152 153 154 155 156 157 158
      if (use_redistributed_data && this->Redistributable)
        {
        this->Producer->SetOutput(this->RedistributedDataObject);
        }
      else
        {
        this->Producer->SetOutput(this->DeliveredDataObject);
        }
      return this->Producer.GetPointer();
159 160
      }

161 162 163 164
    vtkDataObject* GetDataObject() const
      { return this->DataObject.GetPointer(); }
    unsigned long GetTimeStamp() const
      { return this->TimeStamp; }
165

166 167 168 169 170 171 172 173
    unsigned long GetVisibleDataSize()
      {
      if (this->Representation && this->Representation->GetVisibility())
        {
        return this->ActualMemorySize;
        }
      return 0;
      }
174 175 176 177 178 179 180 181 182

    void SetNextStreamedPiece(vtkDataObject* data)
      {
      this->StreamedPiece = data;
      }
    vtkDataObject* GetStreamedPiece()
      {
      return this->StreamedPiece;
      }
183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198
    };

  typedef std::map<unsigned int, std::pair<vtkItem, vtkItem> > ItemsMapType;

  vtkItem* GetItem(unsigned int index, bool use_second)
    {
    if (this->ItemsMap.find(index) != this->ItemsMap.end())
      {
      return use_second? &(this->ItemsMap[index].second) :
        &(this->ItemsMap[index].first);
      }
    return NULL;
    }

  vtkItem* GetItem(vtkPVDataRepresentation* repr, bool use_second)
    {
199
    return this->GetItem(repr->GetUniqueIdentifier(), use_second);
200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218
    }

  unsigned long GetVisibleDataSize(bool use_second_if_available)
    {
    unsigned long size = 0;
    ItemsMapType::iterator iter;
    for (iter = this->ItemsMap.begin(); iter != this->ItemsMap.end(); ++iter)
      {
      if (use_second_if_available && iter->second.second.GetDataObject())
        {
        size += iter->second.second.GetVisibleDataSize();
        }
      else
        {
        size += iter->second.first.GetVisibleDataSize();
        }
      }
    return size;
    }
219

220 221
  ItemsMapType ItemsMap;
};
222

223 224 225
//*****************************************************************************

vtkStandardNewMacro(vtkPVDataDeliveryManager);
226
//----------------------------------------------------------------------------
227
vtkPVDataDeliveryManager::vtkPVDataDeliveryManager()
228 229 230 231 232
  : Internals(new vtkInternals())
{
}

//----------------------------------------------------------------------------
233
vtkPVDataDeliveryManager::~vtkPVDataDeliveryManager()
234 235 236 237 238
{
  delete this->Internals;
  this->Internals = 0;
}

239
//----------------------------------------------------------------------------
240
void vtkPVDataDeliveryManager::SetRenderView(vtkPVRenderView* view)
241
{
242
  this->RenderView = view;
243 244
}

245
//----------------------------------------------------------------------------
246
vtkPVRenderView* vtkPVDataDeliveryManager::GetRenderView()
247
{
248
  return this->RenderView;
249 250
}

Utkarsh Ayachit's avatar
Utkarsh Ayachit committed
251
//----------------------------------------------------------------------------
252
unsigned long vtkPVDataDeliveryManager::GetVisibleDataSize(bool low_res)
Utkarsh Ayachit's avatar
Utkarsh Ayachit committed
253 254 255
{
  return this->Internals->GetVisibleDataSize(low_res);
}
256 257

//----------------------------------------------------------------------------
258
void vtkPVDataDeliveryManager::RegisterRepresentation(vtkPVDataRepresentation* repr)
259
{
260 261
  assert( "A representation must have a valid UniqueIdentifier"
          && repr->GetUniqueIdentifier());
262 263 264

  vtkInternals::vtkItem item;
  item.Representation = repr;
265
  this->Internals->ItemsMap[repr->GetUniqueIdentifier()].first = item;
Utkarsh Ayachit's avatar
Utkarsh Ayachit committed
266 267 268

  vtkInternals::vtkItem item2;
  item2.Representation = repr;
269
  this->Internals->ItemsMap[repr->GetUniqueIdentifier()].second= item2;
270 271 272
}

//----------------------------------------------------------------------------
273
void vtkPVDataDeliveryManager::UnRegisterRepresentation(
274 275
  vtkPVDataRepresentation* repr)
{
276
  this->Internals->ItemsMap.erase(repr->GetUniqueIdentifier());
277 278 279 280 281 282 283 284 285 286
}

//----------------------------------------------------------------------------
vtkPVDataRepresentation* vtkPVDataDeliveryManager::GetRepresentation(
  unsigned int index)
{
  vtkInternals::vtkItem* item = this->Internals->GetItem(index, false);
  return item? item->Representation : NULL;
}

287
//----------------------------------------------------------------------------
288
void vtkPVDataDeliveryManager::SetDeliverToAllProcesses(
Utkarsh Ayachit's avatar
Utkarsh Ayachit committed
289
  vtkPVDataRepresentation* repr, bool mode, bool low_res)
290
{
Utkarsh Ayachit's avatar
Utkarsh Ayachit committed
291
  vtkInternals::vtkItem* item = this->Internals->GetItem(repr, low_res);
292 293
  if (item)
    {
294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311
    item->CloneDataToAllNodes = mode;
    }
  else
    {
    vtkErrorMacro("Invalid argument.");
    }
}

//----------------------------------------------------------------------------
void vtkPVDataDeliveryManager::SetDeliverToClientAndRenderingProcesses(
  vtkPVDataRepresentation* repr, bool deliver_to_client,
  bool gather_before_delivery, bool low_res)
{
  vtkInternals::vtkItem* item = this->Internals->GetItem(repr, low_res);
  if (item)
    {
    item->DeliverToClientAndRenderingProcesses = deliver_to_client;
    item->GatherBeforeDeliveringToClient = gather_before_delivery;
312 313 314 315 316 317 318
    }
  else
    {
    vtkErrorMacro("Invalid argument.");
    }
}

319
//----------------------------------------------------------------------------
320
void vtkPVDataDeliveryManager::MarkAsRedistributable(
321
  vtkPVDataRepresentation* repr, bool value/*=true*/)
322 323 324 325 326
{
  vtkInternals::vtkItem* item = this->Internals->GetItem(repr, false);
  vtkInternals::vtkItem* low_item = this->Internals->GetItem(repr, true);
  if (item)
    {
327 328
    item->Redistributable = value;
    low_item->Redistributable = value;
329 330 331 332 333 334
    }
  else
    {
    vtkErrorMacro("Invalid argument.");
    }
}
Utkarsh Ayachit's avatar
Utkarsh Ayachit committed
335

336
//----------------------------------------------------------------------------
337
void vtkPVDataDeliveryManager::SetStreamable(
338 339 340 341 342 343 344 345 346 347 348 349 350 351 352
  vtkPVDataRepresentation* repr, bool val)
{
  vtkInternals::vtkItem* item = this->Internals->GetItem(repr, false);
  vtkInternals::vtkItem* low_item = this->Internals->GetItem(repr, true);
  if (item)
    {
    item->Streamable = val;
    low_item->Streamable = val;
    }
  else
    {
    vtkErrorMacro("Invalid argument.");
    }
}

353
//----------------------------------------------------------------------------
354
void vtkPVDataDeliveryManager::SetPiece(
Utkarsh Ayachit's avatar
Utkarsh Ayachit committed
355
  vtkPVDataRepresentation* repr, vtkDataObject* data, bool low_res)
356
{
Utkarsh Ayachit's avatar
Utkarsh Ayachit committed
357
  vtkInternals::vtkItem* item = this->Internals->GetItem(repr, low_res);
358 359
  if (item)
    {
360
    unsigned long data_time = 0;
361 362 363 364
    if (data && (data->GetMTime() > data_time))
      {
      data_time = data->GetMTime();
      }
365 366
    if (data_time > item->GetTimeStamp() ||
      item->GetDataObject() != data)
367 368 369
      {
      item->SetDataObject(data);
      }
370 371
    }
  else
Utkarsh Ayachit's avatar
Utkarsh Ayachit committed
372 373 374 375 376 377
    {
    vtkErrorMacro("Invalid argument.");
    }
}

//----------------------------------------------------------------------------
378
vtkAlgorithmOutput* vtkPVDataDeliveryManager::GetProducer(
Utkarsh Ayachit's avatar
Utkarsh Ayachit committed
379 380 381 382
  vtkPVDataRepresentation* repr, bool low_res)
{
  vtkInternals::vtkItem* item = this->Internals->GetItem(repr, low_res);
  if (!item)
383 384
    {
    vtkErrorMacro("Invalid arguments.");
Utkarsh Ayachit's avatar
Utkarsh Ayachit committed
385 386 387
    return NULL;
    }

388 389
  return item->GetProducer(
    this->RenderView->GetUseOrderedCompositing())->GetOutputPort(0);
Utkarsh Ayachit's avatar
Utkarsh Ayachit committed
390 391 392
}

//----------------------------------------------------------------------------
393
void vtkPVDataDeliveryManager::SetPiece(unsigned int id, vtkDataObject* data, bool
Utkarsh Ayachit's avatar
Utkarsh Ayachit committed
394 395 396 397 398 399 400 401 402 403
  low_res)
{
  vtkInternals::vtkItem* item = this->Internals->GetItem(id, low_res);
  if (item)
    {
    item->SetDataObject(data);
    }
  else
    {
    vtkErrorMacro("Invalid argument.");
404 405 406
    }
}

407
//----------------------------------------------------------------------------
408 409 410
void vtkPVDataDeliveryManager::SetOrderedCompositingInformation(
  vtkPVDataRepresentation* repr, vtkExtentTranslator* translator,
  const int whole_extents[6], const double origin[3], const double spacing[3])
411 412 413 414
{
  vtkInternals::vtkItem* item = this->Internals->GetItem(repr, false);
  if (item)
    {
415 416 417 418 419 420 421
    vtkInternals::vtkOrderedCompositingInfo info;
    info.Translator = translator;
    memcpy(info.WholeExtent, whole_extents, sizeof(int)*6);
    memcpy(info.Origin, origin, sizeof(double)*3);
    memcpy(info.Spacing, spacing, sizeof(double)*3);

    item->OrderedCompositingInfo = info;
422 423 424 425 426 427 428
    }
  else
    {
    vtkErrorMacro("Invalid argument.");
    }
}

429
//----------------------------------------------------------------------------
430
vtkAlgorithmOutput* vtkPVDataDeliveryManager::GetProducer(
431
  unsigned int id, bool low_res)
432
{
Utkarsh Ayachit's avatar
Utkarsh Ayachit committed
433
  vtkInternals::vtkItem* item = this->Internals->GetItem(id, low_res);
434 435 436 437 438 439
  if (!item)
    {
    vtkErrorMacro("Invalid arguments.");
    return NULL;
    }

440 441
  return item->GetProducer(
    this->RenderView->GetUseOrderedCompositing())->GetOutputPort(0);
442 443
}

444
//----------------------------------------------------------------------------
445
bool vtkPVDataDeliveryManager::NeedsDelivery(
446
  unsigned long timestamp,
447
  std::vector<unsigned int> &keys_to_deliver, bool use_low)
448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464
{
  vtkInternals::ItemsMapType::iterator iter;
  for (iter = this->Internals->ItemsMap.begin();
    iter != this->Internals->ItemsMap.end(); ++iter)
    {
    vtkInternals::vtkItem& item = use_low? iter->second.second : iter->second.first;
    if (item.Representation &&
      item.Representation->GetVisibility() &&
      item.GetTimeStamp() > timestamp)
      {
      keys_to_deliver.push_back(iter->first);
      }
    }
  return keys_to_deliver.size() > 0;
}

//----------------------------------------------------------------------------
465
void vtkPVDataDeliveryManager::Deliver(int use_lod, unsigned int size, unsigned int *values)
466 467

{
Utkarsh Ayachit's avatar
Utkarsh Ayachit committed
468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483
  // This method gets called on all processes with the list of representations
  // to "deliver". We check with the view what mode we're operating in and
  // decide where the data needs to be delivered.
  //
  // Representations can provide overrides, e.g. though the view says data is
  // merely "pass-through", some representation says we need to clone the data
  // everywhere. That makes it critical that this method is called on all
  // processes at the same time to avoid deadlocks and other complications.
  //
  // This method will be implemented in "view-specific" subclasses since how the
  // data is delivered is very view specific.

  vtkTimerLog::MarkStartEvent(use_lod?
    "LowRes Data Migration" : "FullRes Data Migration");

  bool using_remote_rendering =
484 485 486
    use_lod? this->RenderView->GetUseDistributedRenderingForInteractiveRender() :
    this->RenderView->GetUseDistributedRenderingForStillRender();
  int mode = this->RenderView->GetDataDistributionMode(using_remote_rendering);
Utkarsh Ayachit's avatar
Utkarsh Ayachit committed
487

488 489 490 491 492 493
  for (unsigned int cc=0; cc < size; cc++)
    {
    vtkInternals::vtkItem* item = this->Internals->GetItem(values[cc], use_lod !=0);

    vtkDataObject* data = item->GetDataObject();

494 495 496 497 498 499
//    if (data != NULL && data->IsA("vtkUniformGridAMR"))
//      {
//      // we are dealing with AMR datasets.
//      // We assume for now we're not running in render-server mode. We can
//      // ensure that at some point in future.
//      // So we are either in pass-through or collect mode.
500

501 502
//      // FIXME: check that the mode flags are "suitable" for AMR.
//      }
503

504 505
    vtkNew<vtkMPIMoveData> dataMover;
    dataMover->InitializeForCommunicationForParaView();
506
    dataMover->SetOutputDataType(data ? data->GetDataObjectType() : VTK_POLY_DATA);
507
    dataMover->SetMoveMode(mode);
508
    if (item->CloneDataToAllNodes)
509
      {
510
      dataMover->SetMoveModeToClone();
511
      }
512 513 514 515 516 517 518 519 520 521 522 523 524 525
    else if (item->DeliverToClientAndRenderingProcesses)
      {
      if (mode == vtkMPIMoveData::PASS_THROUGH)
        {
        dataMover->SetMoveMode(vtkMPIMoveData::COLLECT_AND_PASS_THROUGH);
        }
      else
        {
        // nothing to do, since the data is going to be delivered to the client
        // anyways.
        }
      dataMover->SetSkipDataServerGatherToZero(
        item->GatherBeforeDeliveringToClient == false);
      }
526 527
    dataMover->SetInputData(data);

528 529 530 531 532 533
    if (dataMover->GetOutputGeneratedOnProcess())
      {
      // release old memory (not necessarily, but try).
      item->SetDeliveredDataObject(NULL);
      }
    dataMover->Update();
534
    if (item->GetDeliveredDataObject() == NULL)
535 536 537
      {
      item->SetDeliveredDataObject(dataMover->GetOutputDataObject(0));
      }
538
    }
539

540 541 542
  vtkTimerLog::MarkEndEvent(use_lod?
    "LowRes Data Migration" : "FullRes Data Migration");
}
543

544 545 546 547 548
//----------------------------------------------------------------------------
void vtkPVDataDeliveryManager::RedistributeDataForOrderedCompositing(
  bool use_lod)
{
  if (this->RenderView->GetUpdateTimeStamp() > this->RedistributionTimeStamp)
549
    {
550 551 552 553
    vtkTimerLog::MarkStartEvent("Regenerate Kd-Tree");
    // need to re-generate the kd-tree.
    this->RedistributionTimeStamp.Modified();

554
    vtkNew<vtkKdTreeManager> cutsGenerator;
555 556 557 558 559 560
    vtkInternals::ItemsMapType::iterator iter;
    for (iter = this->Internals->ItemsMap.begin();
      iter != this->Internals->ItemsMap.end(); ++iter)
      {
      vtkInternals::vtkItem& item =  iter->second.first;
      if (item.Representation &&
561
        item.Representation->GetVisibility())
562
        {
563
        if (item.OrderedCompositingInfo.Translator)
564
          {
565 566 567 568 569 570
          // implies that the representation is providing us with means to
          // override how the ordered compositing happens.
          const vtkInternals::vtkOrderedCompositingInfo &info =
            item.OrderedCompositingInfo;
          cutsGenerator->SetStructuredDataInformation(info.Translator,
            info.WholeExtent, info.Origin, info.Spacing);
571
          }
572
        else if (item.Redistributable)
573
          {
574
          cutsGenerator->AddDataObject(item.GetDeliveredDataObject());
575
          }
576 577
        }
      }
578 579
    cutsGenerator->GenerateKdTree();
    this->KdTree = cutsGenerator->GetKdTree();
580

581
    vtkTimerLog::MarkEndEvent("Regenerate Kd-Tree");
582
    }
583 584

  if (this->KdTree == NULL)
585
    {
586
    return;
587 588
    }

589 590 591 592
  vtkTimerLog::MarkStartEvent("Redistributing Data for Ordered Compositing");
  vtkInternals::ItemsMapType::iterator iter;
  for (iter = this->Internals->ItemsMap.begin();
    iter != this->Internals->ItemsMap.end(); ++iter)
593
    {
594 595 596 597 598 599 600 601
    vtkInternals::vtkItem& item = use_lod? iter->second.second : iter->second.first;

    if (!item.Redistributable ||
      item.Representation == NULL ||
      item.Representation->GetVisibility() == false ||

      // delivered object can be null in case we're updating lod and the
      // representation doeesn't have any LOD data.
602
      item.GetDeliveredDataObject() == NULL)
603
      {
604 605
      continue;
      }
606

607 608 609 610 611 612 613 614 615 616 617 618
    if (item.GetRedistributedDataObject() &&

      // input-data didn't change
      (item.GetDeliveredDataObject()->GetMTime() <
       item.GetRedistributedDataObject()->GetMTime()) &&

      // kd-tree didn't change
      (item.GetRedistributedDataObject()->GetMTime() >
       this->KdTree->GetMTime()))
      {
      // skip redistribution.
      continue;
619 620
      }

621 622
    // release old memory (not necessarily, but try).
    item.SetRedistributedDataObject(NULL);
623

624 625 626 627 628 629 630 631 632
    vtkNew<vtkOrderedCompositeDistributor> redistributor;
    redistributor->SetController(vtkMultiProcessController::GetGlobalController());
    redistributor->SetInputData(item.GetDeliveredDataObject());
    redistributor->SetPKdTree(this->KdTree);
    redistributor->SetPassThrough(0);
    redistributor->Update();
    item.SetRedistributedDataObject(redistributor->GetOutputDataObject(0));
    }
  vtkTimerLog::MarkEndEvent("Redistributing Data for Ordered Compositing");
633 634
}

635
//----------------------------------------------------------------------------
636
vtkPKdTree* vtkPVDataDeliveryManager::GetKdTree()
637 638 639
{
  return this->KdTree;
}
640 641

//----------------------------------------------------------------------------
642 643
void vtkPVDataDeliveryManager::SetNextStreamedPiece(
  vtkPVDataRepresentation* repr, vtkDataObject* data)
644
{
645 646 647 648 649 650 651 652 653 654 655
  vtkInternals::vtkItem* item = this->Internals->GetItem(repr, /*low_res=*/false);
  if (item == NULL)
    {
    vtkErrorMacro("Invalid argument.");
    return;
    }
  
  // For now, I am going to keep things simple. Piece is delivered to the
  // representation separately. That's it.
  item->SetNextStreamedPiece(data);
}
656

657 658 659 660 661 662 663 664 665 666 667 668
//----------------------------------------------------------------------------
vtkDataObject* vtkPVDataDeliveryManager::GetCurrentStreamedPiece(
  vtkPVDataRepresentation* repr)
{
  vtkInternals::vtkItem* item = this->Internals->GetItem(repr, /*low_res=*/false);
  if (item == NULL)
    {
    vtkErrorMacro("Invalid argument.");
    return NULL;
    }
  return item->GetStreamedPiece();
}
669

670 671 672 673 674
//----------------------------------------------------------------------------
void vtkPVDataDeliveryManager::ClearStreamedPieces()
{
  // I am not too sure if I want to do this. Right now I am thinking once a
  // piece is delivered, the delivery manager should no longer bother about it.
675 676 677 678 679
  vtkInternals::ItemsMapType::iterator iter;
  for (iter = this->Internals->ItemsMap.begin();
    iter != this->Internals->ItemsMap.end(); ++iter)
    {
    vtkInternals::vtkItem& item = iter->second.first;
680
    item.SetNextStreamedPiece(NULL);
681 682 683 684
    }
}

//----------------------------------------------------------------------------
685 686
bool vtkPVDataDeliveryManager::GetRepresentationsReadyToStreamPieces(
  std::vector<unsigned int>& keys)
687
{
688 689
  // I am not too sure if I want to do this. Right now I am thinking once a
  // piece is delivered, the delivery manager should no longer bother about it.
690 691 692
  vtkInternals::ItemsMapType::iterator iter;
  for (iter = this->Internals->ItemsMap.begin();
    iter != this->Internals->ItemsMap.end(); ++iter)
693
    {
694
    vtkInternals::vtkItem& item = iter->second.first;
695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732
    if (item.Representation &&
      item.Representation->GetVisibility() &&
      item.Streamable &&
      item.GetStreamedPiece())
      {
      keys.push_back(iter->first);
      }
    }
  return (keys.size() > 0);
}

//----------------------------------------------------------------------------
void vtkPVDataDeliveryManager::DeliverStreamedPieces(
  unsigned int size, unsigned int *values)
{
  // This method gets called on all processes to deliver any streamed pieces
  // currently available. This is similar to Deliver(...) except that this deals
  // with only delivering pieces for streaming. 

  bool using_remote_rendering =
    this->RenderView->GetUseDistributedRenderingForStillRender();
  int mode = this->RenderView->GetDataDistributionMode(using_remote_rendering);

  for (unsigned int cc=0; cc < size; cc++)
    {
    vtkInternals::vtkItem* item = this->Internals->GetItem(values[cc], false);

    // FIXME: we need information about the datatype on all processes. For now
    // we assume that the data type is same as the full-data (which is not
    // really necessary). We can API to allow representations to be able to
    // specify the data type.
    vtkDataObject* data = item->GetDataObject();
    vtkDataObject* piece = item->GetStreamedPiece();

    vtkNew<vtkMPIMoveData> dataMover;
    dataMover->InitializeForCommunicationForParaView();
    dataMover->SetOutputDataType(data->GetDataObjectType());
    dataMover->SetMoveMode(mode);
733
    if (item->CloneDataToAllNodes)
734 735 736 737 738 739
      {
      dataMover->SetMoveModeToClone();
      }
    dataMover->SetInputData(piece);
    dataMover->Update();
    if (dataMover->GetOutputGeneratedOnProcess())
740
      {
741
      item->SetNextStreamedPiece(dataMover->GetOutputDataObject(0));
742
      }
743 744 745
    }
}

746
//----------------------------------------------------------------------------
747
void vtkPVDataDeliveryManager::PrintSelf(ostream& os, vtkIndent indent)
748 749 750
{
  this->Superclass::PrintSelf(os, indent);
}
751 752 753 754 755 756 757 758 759 760 761 762 763 764 765

//----------------------------------------------------------------------------
int vtkPVDataDeliveryManager::GetSynchronizationMagicNumber()
{
  const int prime = 31;
  int result = 1;
  result = prime * result + static_cast<int>(this->Internals->ItemsMap.size());
  vtkInternals::ItemsMapType::iterator iter = this->Internals->ItemsMap.begin();
  for(;iter != this->Internals->ItemsMap.end(); iter++)
    {
    result = prime * result + static_cast<int>(iter->first);
    }

  return result;
}