Commit f8e2dbff authored by Utkarsh Ayachit's avatar Utkarsh Ayachit

Adding support for streaming in client/server configurations.

"AMR Blocks" streaming now works in client-server with local or remote
rendering. This updates the logic for delivering streamed pieces to rendering
nodes. Although both remote/local rendering are supported for streaming, if the
rendering mode is changed in between, the streaming is not restarted (currently)
and hence the blocks already delivered will not be available on the other
processes.

Tile display configuration also works.
parent 52c7392f
......@@ -77,6 +77,7 @@ set (Module_SRCS
vtkPVRenderView.cxx
vtkPVRepresentedDataInformation.cxx
vtkPVSelectionInformation.cxx
vtkPVStreamingPiecesInformation.cxx
vtkPVSynchronizedRenderer.cxx
vtkPVSynchronizedRenderWindows.cxx
vtkPVView.cxx
......
......@@ -783,40 +783,65 @@ void vtkPVDataDeliveryManager::ClearStreamedPieces()
}
//----------------------------------------------------------------------------
bool vtkPVDataDeliveryManager::DeliverStreamedPieces()
bool vtkPVDataDeliveryManager::GetRepresentationsReadyToStreamPieces(
std::vector<unsigned int>& keys)
{
// This method gets called on all processes to deliver any streamed pieces
// currently available.
// Representations can provide overrides, e.g. though the view says data is
// merely "pass-through", some representation says we need to clone the data
// everywhere. That makes it critical that this method is called on all
// processes at the same time to avoid deadlocks and other complications.
//
// This method will be implemented in "view-specific" subclasses since how the
// data is delivered is very view specific.
//bool using_remote_rendering =
// this->RenderView->GetUseDistributedRenderingForStillRender();
//int mode = this->RenderView->GetDataDistributionMode(using_remote_rendering);
// FIXME: This only support built-in operation for now. We will have to add
// support for client-server modes. The challenge is how do we deliver data
// "gracefully".
bool something_delivered = false;
// I am not too sure if I want to do this. Right now I am thinking once a
// piece is delivered, the delivery manager should no longer bother about it.
vtkInternals::ItemsMapType::iterator iter;
for (iter = this->Internals->ItemsMap.begin();
iter != this->Internals->ItemsMap.end(); ++iter)
{
vtkInternals::vtkItem& item = iter->second.first;
if (item.Representation && item.Streamable && item.GetStreamedPiece())
if (item.Representation &&
item.Representation->GetVisibility() &&
item.Streamable &&
item.GetStreamedPiece())
{
keys.push_back(iter->first);
}
}
return (keys.size() > 0);
}
//----------------------------------------------------------------------------
void vtkPVDataDeliveryManager::DeliverStreamedPieces(
unsigned int size, unsigned int *values)
{
// This method gets called on all processes to deliver any streamed pieces
// currently available. This is similar to Deliver(...) except that this deals
// with only delivering pieces for streaming.
bool using_remote_rendering =
this->RenderView->GetUseDistributedRenderingForStillRender();
int mode = this->RenderView->GetDataDistributionMode(using_remote_rendering);
for (unsigned int cc=0; cc < size; cc++)
{
vtkInternals::vtkItem* item = this->Internals->GetItem(values[cc], false);
// FIXME: we need information about the datatype on all processes. For now
// we assume that the data type is same as the full-data (which is not
// really necessary). We can API to allow representations to be able to
// specify the data type.
vtkDataObject* data = item->GetDataObject();
vtkDataObject* piece = item->GetStreamedPiece();
vtkNew<vtkMPIMoveData> dataMover;
dataMover->InitializeForCommunicationForParaView();
dataMover->SetOutputDataType(data->GetDataObjectType());
dataMover->SetMoveMode(mode);
if (item->AlwaysClone)
{
dataMover->SetMoveModeToClone();
}
dataMover->SetInputData(piece);
dataMover->Update();
if (dataMover->GetOutputGeneratedOnProcess())
{
// FIXME: do data delivery.
something_delivered = true;
item->SetNextStreamedPiece(dataMover->GetOutputDataObject(0));
}
}
return something_delivered;
}
//----------------------------------------------------------------------------
......
......@@ -147,7 +147,14 @@ public:
// Deliver streamed pieces. Unlike regular data, streamed pieces are delivered
// and released. Representations are expected to manage the pieces once they
// are delivered to them.
bool DeliverStreamedPieces();
void DeliverStreamedPieces(unsigned int size, unsigned int *keys);
//BTX
// Description:
// Fills up the vector with the keys for representations that have non-null
// streaming pieces.
bool GetRepresentationsReadyToStreamPieces(std::vector<unsigned int>& keys);
//ETX
// *******************************************************************
......
......@@ -659,6 +659,7 @@ void vtkPVRenderView::SynchronizeGeometryBounds()
//----------------------------------------------------------------------------
bool vtkPVRenderView::GetLocalProcessDoesRendering(bool using_distributed_rendering)
{
switch (vtkProcessModule::GetProcessType())
{
case vtkProcessModule::PROCESS_DATA_SERVER:
......@@ -668,7 +669,9 @@ bool vtkPVRenderView::GetLocalProcessDoesRendering(bool using_distributed_render
return true;
default:
return using_distributed_rendering;
return using_distributed_rendering ||
this->InTileDisplayMode() ||
this->SynchronizedWindows->GetIsInCave();
}
}
......@@ -1439,22 +1442,29 @@ void vtkPVRenderView::StreamingUpdate(const double view_planes[24])
}
//----------------------------------------------------------------------------
bool vtkPVRenderView::DeliverStreamedPieces()
void vtkPVRenderView::DeliverStreamedPieces(
unsigned int size, unsigned int *representation_ids)
{
// the plan now is to fetch the piece and then simply give it to the
// representation as "next piece". Representation can decide what to do with
// it, including adding to the existing datastructure.
vtkTimerLog::MarkStartEvent("vtkPVRenderView::DeliverStreamedPieces");
bool ret_val = this->Internals->DeliveryManager->DeliverStreamedPieces();
this->Internals->DeliveryManager->DeliverStreamedPieces(
size, representation_ids);
// tell representations to "deal with" the newly streamed piece.
this->CallProcessViewRequest(vtkPVRenderView::REQUEST_PROCESS_STREAMED_PIECE(),
this->RequestInformation, this->ReplyInformationVector);
if (this->GetLocalProcessDoesRendering(
this->GetUseDistributedRenderingForStillRender()))
{
// tell representations to "deal with" the newly streamed piece on the
// rendering nodes.
this->CallProcessViewRequest(vtkPVRenderView::REQUEST_PROCESS_STREAMED_PIECE(),
this->RequestInformation, this->ReplyInformationVector);
}
this->Internals->DeliveryManager->ClearStreamedPieces();
// ^--- the most dubious part of this code.
vtkTimerLog::MarkEndEvent("vtkPVRenderView::DeliverStreamedPieces");
return ret_val;
}
//----------------------------------------------------------------------------
......
......@@ -205,7 +205,7 @@ public:
// Description:
void StreamingUpdate(const double view_planes[24]);
bool DeliverStreamedPieces();
void DeliverStreamedPieces(unsigned int size, unsigned int *representation_ids);
// Description:
// USE_LOD indicates if LOD is being used for the current render/update.
......
/*=========================================================================
Program: ParaView
Module: $RCSfile$
Copyright (c) Kitware, Inc.
All rights reserved.
See Copyright.txt or http://www.paraview.org/HTML/Copyright.html for details.
This software is distributed WITHOUT ANY WARRANTY; without even
the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
PURPOSE. See the above copyright notice for more information.
=========================================================================*/
#include "vtkPVStreamingPiecesInformation.h"
#include "vtkClientServerStream.h"
#include "vtkObjectFactory.h"
#include "vtkPVDataDeliveryManager.h"
#include "vtkPVRenderView.h"
#include <set>
#include <vector>
#include <algorithm>
class vtkPVStreamingPiecesInformation::vtkInternals :
public std::set<unsigned int>
{
};
vtkStandardNewMacro(vtkPVStreamingPiecesInformation);
//----------------------------------------------------------------------------
vtkPVStreamingPiecesInformation::vtkPVStreamingPiecesInformation()
: Internals (new vtkInternals())
{
}
//----------------------------------------------------------------------------
vtkPVStreamingPiecesInformation::~vtkPVStreamingPiecesInformation()
{
delete this->Internals;
this->Internals = NULL;
}
//----------------------------------------------------------------------------
void vtkPVStreamingPiecesInformation::CopyFromObject(vtkObject* object)
{
vtkPVRenderView* view = vtkPVRenderView::SafeDownCast(object);
if (!view)
{
vtkErrorMacro("Incorrect object: " <<
(object? object->GetClassName() : "(null)"));
return;
}
this->Internals->clear();
vtkPVDataDeliveryManager* mgr = view->GetDeliveryManager();
std::vector<unsigned int> keys;
mgr->GetRepresentationsReadyToStreamPieces(keys);
this->Internals->insert(keys.begin(), keys.end());
}
//----------------------------------------------------------------------------
void vtkPVStreamingPiecesInformation::AddInformation(vtkPVInformation* info)
{
vtkPVStreamingPiecesInformation* other =
vtkPVStreamingPiecesInformation::SafeDownCast(info);
if (!other)
{
return;
}
this->Internals->insert(
other->Internals->begin(), other->Internals->end());
}
//----------------------------------------------------------------------------
void vtkPVStreamingPiecesInformation::CopyToStream(vtkClientServerStream* css)
{
css->Reset();
*css << vtkClientServerStream::Reply
<< static_cast<int>(this->Internals->size());
for (vtkInternals::iterator iter = this->Internals->begin();
iter != this->Internals->end(); ++iter)
{
*css << (*iter);
}
*css << vtkClientServerStream::End;
}
//----------------------------------------------------------------------------
void vtkPVStreamingPiecesInformation::CopyFromStream(
const vtkClientServerStream* css)
{
this->Internals->clear();
int num_items=0;
css->GetArgument(0, 0, &num_items);
for (int cc=0; cc < num_items; cc++)
{
unsigned int value;
css->GetArgument(0, 1 + cc, &value);
this->Internals->insert(value);
}
}
//----------------------------------------------------------------------------
void vtkPVStreamingPiecesInformation::GetKeys(
std::vector<unsigned int>& keys) const
{
keys.insert(keys.end(), this->Internals->begin(), this->Internals->end());
}
//----------------------------------------------------------------------------
void vtkPVStreamingPiecesInformation::PrintSelf(ostream& os, vtkIndent indent)
{
this->Superclass::PrintSelf(os, indent);
os << indent << "Keys (" << this->Internals->size() << "): " << endl;
for (vtkInternals::iterator iter = this->Internals->begin();
iter != this->Internals->end(); ++iter)
{
os << indent.GetNextIndent() << (*iter) << endl;
}
}
/*=========================================================================
Program: ParaView
Module: $RCSfile$
Copyright (c) Kitware, Inc.
All rights reserved.
See Copyright.txt or http://www.paraview.org/HTML/Copyright.html for details.
This software is distributed WITHOUT ANY WARRANTY; without even
the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
PURPOSE. See the above copyright notice for more information.
=========================================================================*/
// .NAME vtkPVStreamingPiecesInformation - information object used by
// vtkSMDataDeliveryManager to get information about representations that have
// pieces to stream from the data-server.
// .SECTION Description
// vtkPVStreamingPiecesInformation is an information object used by
// vtkSMDataDeliveryManager to get information about representations that have
// pieces to stream from the data-server.
#ifndef __vtkPVStreamingPiecesInformation_h
#define __vtkPVStreamingPiecesInformation_h
#include "vtkPVInformation.h"
#include "vtkPVClientServerCoreRenderingModule.h"
//BTX
#include <vector>
//ETX
class VTKPVCLIENTSERVERCORERENDERING_EXPORT vtkPVStreamingPiecesInformation : public vtkPVInformation
{
public:
static vtkPVStreamingPiecesInformation* New();
vtkTypeMacro(vtkPVStreamingPiecesInformation, vtkPVInformation);
void PrintSelf(ostream& os, vtkIndent indent);
// Description:
// Transfer information about a single object into this object.
virtual void CopyFromObject(vtkObject*);
// Description:
// Merge another information object. Calls AddInformation(info, 0).
virtual void AddInformation(vtkPVInformation* info);
// Description:
// Manage a serialized version of the information.
virtual void CopyToStream(vtkClientServerStream*);
virtual void CopyFromStream(const vtkClientServerStream*);
//BTX
// Description:
// API to access the internal keys.
void GetKeys(std::vector<unsigned int>& keys) const;
//ETX
//BTX
protected:
vtkPVStreamingPiecesInformation();
~vtkPVStreamingPiecesInformation();
private:
vtkPVStreamingPiecesInformation(const vtkPVStreamingPiecesInformation&); // Not implemented
void operator=(const vtkPVStreamingPiecesInformation&); // Not implemented
class vtkInternals;
vtkInternals* Internals;
//ETX
};
#endif
......@@ -789,6 +789,28 @@ bool vtkSMProxy::GatherInformation(vtkPVInformation* information)
return false;
}
//---------------------------------------------------------------------------
bool vtkSMProxy::GatherInformation(
vtkPVInformation* information, vtkTypeUInt32 location)
{
assert(information);
vtkTypeUInt32 realLocation = (this->Location & location);
if (this->GetSession() && realLocation != 0)
{
// ensure that the proxy is created.
this->CreateVTKObjects();
return this->GetSession()->GatherInformation(
realLocation, information, this->GetGlobalID());
}
if ((this->Location != 0) && (realLocation == 0) && (location != 0))
{
vtkWarningMacro("GatherInformation was called with location "
"on which the proxy does not exist. Ignoring.");
}
return false;
}
//---------------------------------------------------------------------------
bool vtkSMProxy::WarnIfDeprecated()
{
......
......@@ -391,6 +391,7 @@ public:
// On success, the \c information object is filled up with details about the
// VTK object.
bool GatherInformation(vtkPVInformation* information);
bool GatherInformation(vtkPVInformation* information, vtkTypeUInt32 location);
// Description:
// Saves the state of the proxy. This state can be reloaded
......
......@@ -16,9 +16,11 @@
#include "vtkCamera.h"
#include "vtkClientServerStream.h"
#include "vtkNew.h"
#include "vtkObjectFactory.h"
#include "vtkPVDataDeliveryManager.h"
#include "vtkPVRenderView.h"
#include "vtkPVStreamingPiecesInformation.h"
#include "vtkRenderer.h"
#include "vtkSMSession.h"
#include "vtkSMViewProxy.h"
......@@ -109,38 +111,43 @@ void vtkSMDataDeliveryManager::Deliver(bool interactive)
bool vtkSMDataDeliveryManager::DeliverStreamedPieces()
{
// Deliver() relies on the client telling the server about the representation
// that need data from the server-side. This is done primarily for the
// multi-clients (aka collaboration) mode since the clients are in a better
// position to know the state of the representations.
// Streaming is not supported in multi-clients mode. Also, when streaming, the
// server has more up-to-date information about whether something was
// streamed.
// that need data from the server-side. Deliver() can reliably determine that
// since representation objects on client side are indeed updated correctly
// when data changes (that's to the update-suppressing behaviour of
// vtkPVDataRepresentation subclasses).
// However for streaming, only the server-side representations update, hence
// client has no information about representations that have pieces to stream.
// Hence we do the following:
// 1. Ask data-server information about what representations have some pieces
// reader.
// 2. Request streamed pieces for those representations.
vtkTimerLog::MarkStartEvent(
"vtkSMDataDeliveryManager: Deliver Geometry (streaming)");
vtkNew<vtkPVStreamingPiecesInformation> info;
this->ViewProxy->GatherInformation(info.GetPointer(), vtkPVSession::DATA_SERVER);
vtkSMSession* session = this->ViewProxy->GetSession();
vtkClientServerStream stream;
stream << vtkClientServerStream::Invoke
<< VTKOBJECT(this->ViewProxy)
<< "DeliverStreamedPieces"
<< vtkClientServerStream::End;
session->ExecuteStream(this->ViewProxy->GetLocation(), stream, false);
// get the status from the local process whether something was delivered.
const vtkClientServerStream& result = session->GetLastResult(
vtkPVSession::DATA_SERVER_ROOT);
//result.Print(cout);
bool something_delivered = false;
result.GetArgument(0, 0, &something_delivered);
std::vector<unsigned int> keys_to_deliver;
info->GetKeys(keys_to_deliver);
if (keys_to_deliver.size() != 0)
{
vtkSMSession* session = this->ViewProxy->GetSession();
vtkClientServerStream stream;
stream << vtkClientServerStream::Invoke
<< VTKOBJECT(this->ViewProxy)
<< "DeliverStreamedPieces"
<< static_cast<unsigned int>(keys_to_deliver.size())
<< vtkClientServerStream::InsertArray(
&keys_to_deliver[0], keys_to_deliver.size())
<< vtkClientServerStream::End;
session->ExecuteStream(this->ViewProxy->GetLocation(), stream, false);
}
vtkTimerLog::MarkEndEvent(
"vtkSMDataDeliveryManager: Deliver Geometry (streaming)");
return something_delivered;
return keys_to_deliver.size() > 0;
}
//----------------------------------------------------------------------------
......
......@@ -233,6 +233,8 @@ void vtkSMRenderViewProxy::UpdateLOD()
//-----------------------------------------------------------------------------
bool vtkSMRenderViewProxy::StreamingUpdate(bool render_if_needed)
{
// FIXME: add a check to not do anything when in multi-client mode. We don't
// support streaming in multi-client mode.
this->GetSession()->PrepareProgress();
vtkPVRenderView* view = vtkPVRenderView::SafeDownCast(this->GetClientSideObject());
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment