vtkPVDataDeliveryManager.cxx 22.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*=========================================================================

  Program:   ParaView
  Module:    $RCSfile$

  Copyright (c) Kitware, Inc.
  All rights reserved.
  See Copyright.txt or http://www.paraview.org/HTML/Copyright.html for details.

     This software is distributed WITHOUT ANY WARRANTY; without even
     the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
     PURPOSE.  See the above copyright notice for more information.

=========================================================================*/
15
#include "vtkPVDataDeliveryManager.h"
16 17

#include "vtkAlgorithmOutput.h"
18
#include "vtkBSPCutsGenerator.h"
19
#include "vtkDataObject.h"
20
#include "vtkMPIMoveData.h"
21 22 23 24 25
#include "vtkMultiProcessController.h"
#include "vtkNew.h"
#include "vtkObjectFactory.h"
#include "vtkOrderedCompositeDistributor.h"
#include "vtkPKdTree.h"
26 27
#include "vtkPVDataRepresentation.h"
#include "vtkPVDataRepresentationPipeline.h"
28
#include "vtkPVRenderView.h"
29
#include "vtkPVStreamingMacros.h"
30 31
#include "vtkPVTrivialProducer.h"
#include "vtkSmartPointer.h"
32
#include "vtkStreamingDemandDrivenPipeline.h"
Utkarsh Ayachit's avatar
Utkarsh Ayachit committed
33
#include "vtkTimerLog.h"
34
#include "vtkWeakPointer.h"
35 36

#include <assert.h>
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
#include <map>
#include <queue>
#include <utility>

//*****************************************************************************
// Private bookkeeping for vtkPVDataDeliveryManager.
//
// For every registered representation the ItemsMap holds a pair of vtkItem
// records keyed by the representation's unique identifier. Throughout this
// file `first` is used for the full-resolution data and `second` for the
// low-resolution (LOD) data.
class vtkPVDataDeliveryManager::vtkInternals
{
public:

  // Element type of PriorityQueue. Items are ordered by Priority alone, so the
  // std::priority_queue below exposes the item with the largest Priority at
  // its top.
  class vtkPriorityQueueItem
    {
  public:
    unsigned int RepresentationId;
    unsigned int BlockId;
    unsigned int Level;
    unsigned int Index;
    double Priority;

    vtkPriorityQueueItem() :
      RepresentationId(0), BlockId(0),
      Level(0), Index(0), Priority(0)
    {
    }

    bool operator < (const vtkPriorityQueueItem& other) const
      {
      return this->Priority < other.Priority;
      }
    };

  typedef std::priority_queue<vtkPriorityQueueItem> PriorityQueueType;
  PriorityQueueType PriorityQueue;

  // Per-representation, per-resolution record of the data as it moves through
  // delivery: produced -> delivered -> (optionally) redistributed.
  class vtkItem
    {
    // Trivial producer handed out by GetProducer(); its output is re-pointed
    // on every GetProducer() call.
    vtkSmartPointer<vtkPVTrivialProducer> Producer;

    // Data object produced by the representation. Held weakly, so it may be
    // NULL when queried.
    vtkWeakPointer<vtkDataObject> DataObject;

    // Data object available after delivery to the "rendering" node.
    vtkSmartPointer<vtkDataObject> DeliveredDataObject;

    // Data object after re-distributing when using ordered compositing, for
    // example.
    vtkSmartPointer<vtkDataObject> RedistributedDataObject;

    // Data object for a streamed piece.
    vtkSmartPointer<vtkDataObject> StreamedPiece;

    // Modification stamp taken when SetDataObject() was last called.
    unsigned long TimeStamp;

    // GetActualMemorySize() of the data passed to the last SetDataObject()
    // call (0 when that data was NULL).
    unsigned long ActualMemorySize;
  public:
    vtkWeakPointer<vtkAlgorithmOutput> ImageDataProducer;
      // <-- HACK for image data volume rendering.

    vtkWeakPointer<vtkPVDataRepresentation> Representation;
    bool AlwaysClone;
    bool Redistributable;
    bool Streamable;

    vtkItem() :
      Producer(vtkSmartPointer<vtkPVTrivialProducer>::New()),
      TimeStamp(0),
      ActualMemorySize(0),
      AlwaysClone(false),
      Redistributable(false),
      Streamable(false)
    { }

    // Records the representation's data object, caches its memory size and
    // stamps the item with a fresh modification time.
    void SetDataObject(vtkDataObject* data)
      {
      this->DataObject = data;
      this->ActualMemorySize = data? data->GetActualMemorySize() : 0;

      vtkTimeStamp ts; ts.Modified();
      this->TimeStamp = ts;
      }

    void SetDeliveredDataObject(vtkDataObject* data)
      {
      this->DeliveredDataObject = data;
      }

    void SetRedistributedDataObject(vtkDataObject* data)
      {
      this->RedistributedDataObject = data;
      }

    vtkDataObject* GetDeliveredDataObject()
      {
      return this->DeliveredDataObject.GetPointer();
      }

    vtkDataObject* GetRedistributedDataObject()
      {
      return this->RedistributedDataObject.GetPointer();
      }

    // Returns the item's producer after pointing its output at the
    // redistributed data (when requested and the item is redistributable) or
    // at the delivered data otherwise.
    vtkPVTrivialProducer* GetProducer(bool use_redistributed_data)
      {
      if (use_redistributed_data && this->Redistributable)
        {
        this->Producer->SetOutput(this->RedistributedDataObject);
        }
      else
        {
        this->Producer->SetOutput(this->DeliveredDataObject);
        }
      return this->Producer.GetPointer();
      }

    vtkDataObject* GetDataObject() const
      { return this->DataObject.GetPointer(); }
    unsigned long GetTimeStamp() const
      { return this->TimeStamp; }

    // Cached memory size if the owning representation still exists and is
    // visible, 0 otherwise.
    unsigned long GetVisibleDataSize()
      {
      if (this->Representation && this->Representation->GetVisibility())
        {
        return this->ActualMemorySize;
        }
      return 0;
      }

    void SetNextStreamedPiece(vtkDataObject* data)
      {
      this->StreamedPiece = data;
      }
    vtkDataObject* GetStreamedPiece()
      {
      return this->StreamedPiece;
      }
    };

  typedef std::map<unsigned int, std::pair<vtkItem, vtkItem> > ItemsMapType;

  // Looks up the item registered under `index`; `use_second` selects the
  // second (low-resolution) slot of the pair. Returns NULL for an unknown
  // index. A single find() is used so the map is searched only once.
  vtkItem* GetItem(unsigned int index, bool use_second)
    {
    ItemsMapType::iterator iter = this->ItemsMap.find(index);
    if (iter == this->ItemsMap.end())
      {
      return NULL;
      }
    return use_second? &(iter->second.second) : &(iter->second.first);
    }

  // Convenience overload keyed by representation. A NULL representation
  // yields NULL (instead of being dereferenced) so callers can report it the
  // same way as an unregistered one.
  vtkItem* GetItem(vtkPVDataRepresentation* repr, bool use_second)
    {
    if (repr == NULL)
      {
      return NULL;
      }
    return this->GetItem(repr->GetUniqueIdentifier(), use_second);
    }

  // Sums GetVisibleDataSize() over all entries. For each entry the second
  // slot is used when `use_second_if_available` is set and that slot
  // currently has a data object; otherwise the first slot is used.
  unsigned long GetVisibleDataSize(bool use_second_if_available)
    {
    unsigned long size = 0;
    ItemsMapType::iterator iter;
    for (iter = this->ItemsMap.begin(); iter != this->ItemsMap.end(); ++iter)
      {
      if (use_second_if_available && iter->second.second.GetDataObject())
        {
        size += iter->second.second.GetVisibleDataSize();
        }
      else
        {
        size += iter->second.first.GetVisibleDataSize();
        }
      }
    return size;
    }

  ItemsMapType ItemsMap;
};
210

211 212 213
//*****************************************************************************

vtkStandardNewMacro(vtkPVDataDeliveryManager);
214
//----------------------------------------------------------------------------
215
// Allocates the private vtkInternals bookkeeping object owned by this manager.
vtkPVDataDeliveryManager::vtkPVDataDeliveryManager()
  : Internals(new vtkInternals())
{
}

//----------------------------------------------------------------------------
221
// Frees the private vtkInternals object and clears the pointer to it.
vtkPVDataDeliveryManager::~vtkPVDataDeliveryManager()
{
  delete this->Internals;
  this->Internals = NULL;
}

227
//----------------------------------------------------------------------------
228
// Stores the render view that the delivery methods of this manager query.
void vtkPVDataDeliveryManager::SetRenderView(vtkPVRenderView* view)
{
  this->RenderView = view;
}

233
//----------------------------------------------------------------------------
234
// Returns the render view previously given to SetRenderView().
vtkPVRenderView* vtkPVDataDeliveryManager::GetRenderView()
{
  return this->RenderView;
}

Utkarsh Ayachit's avatar
Utkarsh Ayachit committed
239
//----------------------------------------------------------------------------
240
unsigned long vtkPVDataDeliveryManager::GetVisibleDataSize(bool low_res)
Utkarsh Ayachit's avatar
Utkarsh Ayachit committed
241 242 243
{
  return this->Internals->GetVisibleDataSize(low_res);
}
244 245

//----------------------------------------------------------------------------
246
// Creates (or resets) the full-resolution and low-resolution items for
// `repr`, keyed by the representation's unique identifier, which must be
// non-zero.
void vtkPVDataDeliveryManager::RegisterRepresentation(vtkPVDataRepresentation* repr)
{
  const unsigned int id = repr->GetUniqueIdentifier();
  assert("A representation must have a valid UniqueIdentifier" && id);

  // One map access yields the pair holding both resolution slots.
  vtkInternals::ItemsMapType::mapped_type& slots =
    this->Internals->ItemsMap[id];

  // Each slot is overwritten with a freshly constructed item so that
  // re-registering an identifier discards any previous state.
  slots.first = vtkInternals::vtkItem();
  slots.first.Representation = repr;

  slots.second = vtkInternals::vtkItem();
  slots.second.Representation = repr;
}

//----------------------------------------------------------------------------
261
void vtkPVDataDeliveryManager::UnRegisterRepresentation(
262 263
  vtkPVDataRepresentation* repr)
{
264
  this->Internals->ItemsMap.erase(repr->GetUniqueIdentifier());
265 266 267 268 269 270 271 272 273 274
}

//----------------------------------------------------------------------------
// Returns the representation registered under `index`, or NULL when nothing
// is registered under that identifier.
vtkPVDataRepresentation* vtkPVDataDeliveryManager::GetRepresentation(
  unsigned int index)
{
  vtkInternals::vtkItem* fullResItem = this->Internals->GetItem(index, false);
  if (fullResItem == NULL)
    {
    return NULL;
    }
  return fullResItem->Representation;
}

275
//----------------------------------------------------------------------------
276
void vtkPVDataDeliveryManager::SetDeliverToAllProcesses(
Utkarsh Ayachit's avatar
Utkarsh Ayachit committed
277
  vtkPVDataRepresentation* repr, bool mode, bool low_res)
278
{
Utkarsh Ayachit's avatar
Utkarsh Ayachit committed
279
  vtkInternals::vtkItem* item = this->Internals->GetItem(repr, low_res);
280 281
  if (item)
    {
Utkarsh Ayachit's avatar
Utkarsh Ayachit committed
282
    item->AlwaysClone = mode;
283 284 285 286 287 288 289
    }
  else
    {
    vtkErrorMacro("Invalid argument.");
    }
}

290
//----------------------------------------------------------------------------
291
void vtkPVDataDeliveryManager::MarkAsRedistributable(
292 293 294 295 296 297 298 299 300 301 302 303 304 305
  vtkPVDataRepresentation* repr)
{
  vtkInternals::vtkItem* item = this->Internals->GetItem(repr, false);
  vtkInternals::vtkItem* low_item = this->Internals->GetItem(repr, true);
  if (item)
    {
    item->Redistributable = true;
    low_item->Redistributable = true;
    }
  else
    {
    vtkErrorMacro("Invalid argument.");
    }
}
Utkarsh Ayachit's avatar
Utkarsh Ayachit committed
306

307
//----------------------------------------------------------------------------
308
void vtkPVDataDeliveryManager::SetStreamable(
309 310 311 312 313 314 315 316 317 318 319 320 321 322 323
  vtkPVDataRepresentation* repr, bool val)
{
  vtkInternals::vtkItem* item = this->Internals->GetItem(repr, false);
  vtkInternals::vtkItem* low_item = this->Internals->GetItem(repr, true);
  if (item)
    {
    item->Streamable = val;
    low_item->Streamable = val;
    }
  else
    {
    vtkErrorMacro("Invalid argument.");
    }
}

324
//----------------------------------------------------------------------------
325
void vtkPVDataDeliveryManager::SetPiece(
Utkarsh Ayachit's avatar
Utkarsh Ayachit committed
326
  vtkPVDataRepresentation* repr, vtkDataObject* data, bool low_res)
327
{
Utkarsh Ayachit's avatar
Utkarsh Ayachit committed
328
  vtkInternals::vtkItem* item = this->Internals->GetItem(repr, low_res);
329 330
  if (item)
    {
331 332
    vtkPVDataRepresentationPipeline* executive =
      vtkPVDataRepresentationPipeline::SafeDownCast(repr->GetExecutive());
333 334

    // SetPiece() is called in every REQUEST_UPDATE() or REQUEST_UPDATE_LOD()
335
    // pass irrespective of whether the data has actually changed.
336 337 338 339 340 341 342 343 344 345
    // (I think that's a mistake, but the fact that representations can be
    // updated without view makes it tricky since we cannot set the data to
    // deliver in vtkPVDataRepresentation::RequestData() easily). Hence we need
    // to ensure that the data we are getting is newer than what we have.
    unsigned long data_time = executive? executive->GetDataTime() : 0;
    if (data && (data->GetMTime() > data_time))
      {
      data_time = data->GetMTime();
      }
    if (data_time > item->GetTimeStamp())
346 347 348
      {
      item->SetDataObject(data);
      }
349 350
    }
  else
Utkarsh Ayachit's avatar
Utkarsh Ayachit committed
351 352 353 354 355 356
    {
    vtkErrorMacro("Invalid argument.");
    }
}

//----------------------------------------------------------------------------
357
// Returns output port 0 of the producer for `repr` at the requested
// resolution. The producer serves the redistributed data when the render
// view uses ordered compositing and the item is redistributable; otherwise it
// serves the delivered data. Reports an error and returns NULL when `repr` is
// not registered.
vtkAlgorithmOutput* vtkPVDataDeliveryManager::GetProducer(
  vtkPVDataRepresentation* repr, bool low_res)
{
  vtkInternals::vtkItem* item = this->Internals->GetItem(repr, low_res);
  if (item == NULL)
    {
    vtkErrorMacro("Invalid arguments.");
    return NULL;
    }

  vtkPVTrivialProducer* producer =
    item->GetProducer(this->RenderView->GetUseOrderedCompositing());
  return producer->GetOutputPort(0);
}

//----------------------------------------------------------------------------
372
// Identifier-based variant of SetPiece(): unconditionally stores `data` on
// the item registered under `id` for the requested resolution (no timestamp
// comparison is made here). Reports an error for an unknown `id`.
void vtkPVDataDeliveryManager::SetPiece(unsigned int id, vtkDataObject* data, bool
  low_res)
{
  vtkInternals::vtkItem* item = this->Internals->GetItem(id, low_res);
  if (item == NULL)
    {
    vtkErrorMacro("Invalid argument.");
    return;
    }
  item->SetDataObject(data);
}

386 387 388 389 390 391 392 393 394 395 396 397 398 399 400
//----------------------------------------------------------------------------
// Stores `producer` on the full-resolution item of `repr`. The kd-tree
// generation in RedistributeDataForOrderedCompositing() connects this port as
// an input for visible, non-redistributable items. Reports an error when
// `repr` is not registered.
void vtkPVDataDeliveryManager::SetImageDataProducer(
  vtkPVDataRepresentation* repr, vtkAlgorithmOutput *producer)
{
  vtkInternals::vtkItem* fullResItem = this->Internals->GetItem(repr, false);
  if (fullResItem == NULL)
    {
    vtkErrorMacro("Invalid argument.");
    return;
    }
  fullResItem->ImageDataProducer = producer;
}

401
//----------------------------------------------------------------------------
402
// Identifier-based variant of GetProducer(): returns output port 0 of the
// producer for the item registered under `id` at the requested resolution.
// The producer serves redistributed data when the render view uses ordered
// compositing and the item is redistributable, delivered data otherwise.
// Reports an error and returns NULL for an unknown `id`.
vtkAlgorithmOutput* vtkPVDataDeliveryManager::GetProducer(
  unsigned int id, bool low_res)
{
  vtkInternals::vtkItem* item = this->Internals->GetItem(id, low_res);
  if (item == NULL)
    {
    vtkErrorMacro("Invalid arguments.");
    return NULL;
    }

  vtkPVTrivialProducer* producer =
    item->GetProducer(this->RenderView->GetUseOrderedCompositing());
  return producer->GetOutputPort(0);
}

416
//----------------------------------------------------------------------------
417
bool vtkPVDataDeliveryManager::NeedsDelivery(
418
  unsigned long timestamp,
419
  std::vector<unsigned int> &keys_to_deliver, bool use_low)
420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436
{
  vtkInternals::ItemsMapType::iterator iter;
  for (iter = this->Internals->ItemsMap.begin();
    iter != this->Internals->ItemsMap.end(); ++iter)
    {
    vtkInternals::vtkItem& item = use_low? iter->second.second : iter->second.first;
    if (item.Representation &&
      item.Representation->GetVisibility() &&
      item.GetTimeStamp() > timestamp)
      {
      keys_to_deliver.push_back(iter->first);
      }
    }
  return keys_to_deliver.size() > 0;
}

//----------------------------------------------------------------------------
437
void vtkPVDataDeliveryManager::Deliver(int use_lod, unsigned int size, unsigned int *values)
438 439

{
Utkarsh Ayachit's avatar
Utkarsh Ayachit committed
440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455
  // This method gets called on all processes with the list of representations
  // to "deliver". We check with the view what mode we're operating in and
  // decide where the data needs to be delivered.
  //
  // Representations can provide overrides, e.g. though the view says data is
  // merely "pass-through", some representation says we need to clone the data
  // everywhere. That makes it critical that this method is called on all
  // processes at the same time to avoid deadlocks and other complications.
  //
  // This method will be implemented in "view-specific" subclasses since how the
  // data is delivered is very view specific.

  vtkTimerLog::MarkStartEvent(use_lod?
    "LowRes Data Migration" : "FullRes Data Migration");

  bool using_remote_rendering =
456 457 458
    use_lod? this->RenderView->GetUseDistributedRenderingForInteractiveRender() :
    this->RenderView->GetUseDistributedRenderingForStillRender();
  int mode = this->RenderView->GetDataDistributionMode(using_remote_rendering);
Utkarsh Ayachit's avatar
Utkarsh Ayachit committed
459

460 461 462 463 464 465
  for (unsigned int cc=0; cc < size; cc++)
    {
    vtkInternals::vtkItem* item = this->Internals->GetItem(values[cc], use_lod !=0);

    vtkDataObject* data = item->GetDataObject();

466
    if (data->IsA("vtkUniformGridAMR"))
467
      {
468 469
      // we are dealing with AMR datasets.
      // We assume for now we're not running in render-server mode. We can
470
      // ensure that at some point in future.
471 472 473
      // So we are either in pass-through or collect mode.

      // FIXME: check that the mode flags are "suitable" for AMR.
474
      }
475

476 477 478 479 480
    vtkNew<vtkMPIMoveData> dataMover;
    dataMover->InitializeForCommunicationForParaView();
    dataMover->SetOutputDataType(data->GetDataObjectType());
    dataMover->SetMoveMode(mode);
    if (item->AlwaysClone)
481
      {
482
      dataMover->SetMoveModeToClone();
483
      }
484 485
    dataMover->SetInputData(data);

486 487 488 489 490 491
    if (dataMover->GetOutputGeneratedOnProcess())
      {
      // release old memory (not necessarily, but try).
      item->SetDeliveredDataObject(NULL);
      }
    dataMover->Update();
492
    if (item->GetDeliveredDataObject() == NULL)
493 494 495
      {
      item->SetDeliveredDataObject(dataMover->GetOutputDataObject(0));
      }
496
    }
497

498 499 500
  vtkTimerLog::MarkEndEvent(use_lod?
    "LowRes Data Migration" : "FullRes Data Migration");
}
501

502 503 504 505 506
//----------------------------------------------------------------------------
void vtkPVDataDeliveryManager::RedistributeDataForOrderedCompositing(
  bool use_lod)
{
  if (this->RenderView->GetUpdateTimeStamp() > this->RedistributionTimeStamp)
507
    {
508 509 510 511
    vtkTimerLog::MarkStartEvent("Regenerate Kd-Tree");
    // need to re-generate the kd-tree.
    this->RedistributionTimeStamp.Modified();

512 513 514 515 516 517 518
    vtkNew<vtkBSPCutsGenerator> cutsGenerator;
    vtkInternals::ItemsMapType::iterator iter;
    for (iter = this->Internals->ItemsMap.begin();
      iter != this->Internals->ItemsMap.end(); ++iter)
      {
      vtkInternals::vtkItem& item =  iter->second.first;
      if (item.Representation &&
519
        item.Representation->GetVisibility())
520
        {
521 522 523 524 525 526 527 528
        if (item.Redistributable)
          {
          cutsGenerator->AddInputData(item.GetDeliveredDataObject());
          }
        else if (item.ImageDataProducer)
          {
          cutsGenerator->AddInputConnection(item.ImageDataProducer);
          }
529 530 531 532 533 534 535 536 537 538 539 540
        }
      }

    vtkMultiProcessController* controller =
      vtkMultiProcessController::GetGlobalController();
    vtkStreamingDemandDrivenPipeline *sddp = vtkStreamingDemandDrivenPipeline::
      SafeDownCast(cutsGenerator->GetExecutive());
    sddp->SetUpdateExtent
      (0,controller->GetLocalProcessId(),controller->GetNumberOfProcesses(),0);
    sddp->Update(0);

    this->KdTree = cutsGenerator->GetPKdTree();
541
    vtkTimerLog::MarkEndEvent("Regenerate Kd-Tree");
542
    }
543 544

  if (this->KdTree == NULL)
545
    {
546
    return;
547 548
    }

549 550 551 552
  vtkTimerLog::MarkStartEvent("Redistributing Data for Ordered Compositing");
  vtkInternals::ItemsMapType::iterator iter;
  for (iter = this->Internals->ItemsMap.begin();
    iter != this->Internals->ItemsMap.end(); ++iter)
553
    {
554 555 556 557 558 559 560 561
    vtkInternals::vtkItem& item = use_lod? iter->second.second : iter->second.first;

    if (!item.Redistributable ||
      item.Representation == NULL ||
      item.Representation->GetVisibility() == false ||

      // delivered object can be null in case we're updating lod and the
      // representation doeesn't have any LOD data.
562
      item.GetDeliveredDataObject() == NULL)
563
      {
564 565
      continue;
      }
566

567 568 569 570 571 572 573 574 575 576 577 578
    if (item.GetRedistributedDataObject() &&

      // input-data didn't change
      (item.GetDeliveredDataObject()->GetMTime() <
       item.GetRedistributedDataObject()->GetMTime()) &&

      // kd-tree didn't change
      (item.GetRedistributedDataObject()->GetMTime() >
       this->KdTree->GetMTime()))
      {
      // skip redistribution.
      continue;
579 580
      }

581 582
    // release old memory (not necessarily, but try).
    item.SetRedistributedDataObject(NULL);
583

584 585 586 587 588 589 590 591 592
    vtkNew<vtkOrderedCompositeDistributor> redistributor;
    redistributor->SetController(vtkMultiProcessController::GetGlobalController());
    redistributor->SetInputData(item.GetDeliveredDataObject());
    redistributor->SetPKdTree(this->KdTree);
    redistributor->SetPassThrough(0);
    redistributor->Update();
    item.SetRedistributedDataObject(redistributor->GetOutputDataObject(0));
    }
  vtkTimerLog::MarkEndEvent("Redistributing Data for Ordered Compositing");
593 594
}

595
//----------------------------------------------------------------------------
596
// Returns the kd-tree last stored by RedistributeDataForOrderedCompositing().
vtkPKdTree* vtkPVDataDeliveryManager::GetKdTree()
{
  return this->KdTree;
}
600 601

//----------------------------------------------------------------------------
602 603
// Stores `data` as the pending streamed piece on the full-resolution item of
// `repr`. Reports an error when `repr` is not registered.
void vtkPVDataDeliveryManager::SetNextStreamedPiece(
  vtkPVDataRepresentation* repr, vtkDataObject* data)
{
  vtkInternals::vtkItem* fullResItem =
    this->Internals->GetItem(repr, /*low_res=*/false);
  if (fullResItem != NULL)
    {
    // Kept deliberately simple for now: the piece is delivered to the
    // representation separately, and that is all that happens with it.
    fullResItem->SetNextStreamedPiece(data);
    return;
    }
  vtkErrorMacro("Invalid argument.");
}
616

617 618 619 620 621 622 623 624 625 626 627 628
//----------------------------------------------------------------------------
// Returns the streamed piece currently stored on the full-resolution item of
// `repr` (which may be NULL). Reports an error and returns NULL when `repr`
// is not registered.
vtkDataObject* vtkPVDataDeliveryManager::GetCurrentStreamedPiece(
  vtkPVDataRepresentation* repr)
{
  vtkInternals::vtkItem* fullResItem =
    this->Internals->GetItem(repr, /*low_res=*/false);
  if (fullResItem != NULL)
    {
    return fullResItem->GetStreamedPiece();
    }
  vtkErrorMacro("Invalid argument.");
  return NULL;
}
629

630 631 632 633 634
//----------------------------------------------------------------------------
void vtkPVDataDeliveryManager::ClearStreamedPieces()
{
  // I am not too sure if I want to do this. Right now I am thinking once a
  // piece is delivered, the delivery manager should no longer bother about it.
635 636 637 638 639
  vtkInternals::ItemsMapType::iterator iter;
  for (iter = this->Internals->ItemsMap.begin();
    iter != this->Internals->ItemsMap.end(); ++iter)
    {
    vtkInternals::vtkItem& item = iter->second.first;
640
    item.SetNextStreamedPiece(NULL);
641 642 643 644
    }
}

//----------------------------------------------------------------------------
645 646
bool vtkPVDataDeliveryManager::GetRepresentationsReadyToStreamPieces(
  std::vector<unsigned int>& keys)
647
{
648 649
  // I am not too sure if I want to do this. Right now I am thinking once a
  // piece is delivered, the delivery manager should no longer bother about it.
650 651 652
  vtkInternals::ItemsMapType::iterator iter;
  for (iter = this->Internals->ItemsMap.begin();
    iter != this->Internals->ItemsMap.end(); ++iter)
653
    {
654
    vtkInternals::vtkItem& item = iter->second.first;
655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699
    if (item.Representation &&
      item.Representation->GetVisibility() &&
      item.Streamable &&
      item.GetStreamedPiece())
      {
      keys.push_back(iter->first);
      }
    }
  return (keys.size() > 0);
}

//----------------------------------------------------------------------------
void vtkPVDataDeliveryManager::DeliverStreamedPieces(
  unsigned int size, unsigned int *values)
{
  // This method gets called on all processes to deliver any streamed pieces
  // currently available. This is similar to Deliver(...) except that this deals
  // with only delivering pieces for streaming. 

  bool using_remote_rendering =
    this->RenderView->GetUseDistributedRenderingForStillRender();
  int mode = this->RenderView->GetDataDistributionMode(using_remote_rendering);

  for (unsigned int cc=0; cc < size; cc++)
    {
    vtkInternals::vtkItem* item = this->Internals->GetItem(values[cc], false);

    // FIXME: we need information about the datatype on all processes. For now
    // we assume that the data type is same as the full-data (which is not
    // really necessary). We can API to allow representations to be able to
    // specify the data type.
    vtkDataObject* data = item->GetDataObject();
    vtkDataObject* piece = item->GetStreamedPiece();

    vtkNew<vtkMPIMoveData> dataMover;
    dataMover->InitializeForCommunicationForParaView();
    dataMover->SetOutputDataType(data->GetDataObjectType());
    dataMover->SetMoveMode(mode);
    if (item->AlwaysClone)
      {
      dataMover->SetMoveModeToClone();
      }
    dataMover->SetInputData(piece);
    dataMover->Update();
    if (dataMover->GetOutputGeneratedOnProcess())
700
      {
701
      item->SetNextStreamedPiece(dataMover->GetOutputDataObject(0));
702
      }
703 704 705
    }
}

706
//----------------------------------------------------------------------------
707
// Prints the superclass state; this class adds no output of its own.
void vtkPVDataDeliveryManager::PrintSelf(ostream& os, vtkIndent indent)
{
  this->Superclass::PrintSelf(os, indent);
}
711 712 713 714 715 716 717 718 719 720 721 722 723 724 725

//----------------------------------------------------------------------------
int vtkPVDataDeliveryManager::GetSynchronizationMagicNumber()
{
  const int prime = 31;
  int result = 1;
  result = prime * result + static_cast<int>(this->Internals->ItemsMap.size());
  vtkInternals::ItemsMapType::iterator iter = this->Internals->ItemsMap.begin();
  for(;iter != this->Internals->ItemsMap.end(); iter++)
    {
    result = prime * result + static_cast<int>(iter->first);
    }

  return result;
}