Commit 348b7cbf authored by David C. Lonie's avatar David C. Lonie

Add port information to DataDeliveryManager API.

parent cf261a61
......@@ -172,20 +172,24 @@ public:
vtkDataObject* GetStreamedPiece() { return this->StreamedPiece; }
};
typedef std::map<unsigned int, std::pair<vtkItem, vtkItem> > ItemsMapType;
// First is repr unique id, second is the input port.
typedef std::pair<unsigned int, int> ReprPortType;
typedef std::map<ReprPortType, std::pair<vtkItem, vtkItem> > ItemsMapType;
vtkItem* GetItem(unsigned int index, bool use_second)
vtkItem* GetItem(unsigned int index, bool use_second, int port = 0)
{
if (this->ItemsMap.find(index) != this->ItemsMap.end())
ReprPortType key(index, port);
ItemsMapType::iterator items = this->ItemsMap.find(key);
if (items != this->ItemsMap.end())
{
return use_second ? &(this->ItemsMap[index].second) : &(this->ItemsMap[index].first);
return use_second ? &(items->second.second) : &(items->second.first);
}
return NULL;
}
vtkItem* GetItem(vtkPVDataRepresentation* repr, bool use_second)
vtkItem* GetItem(vtkPVDataRepresentation* repr, bool use_second, int port = 0)
{
return this->GetItem(repr->GetUniqueIdentifier(), use_second);
return this->GetItem(repr->GetUniqueIdentifier(), use_second, port);
}
unsigned long GetVisibleDataSize(bool use_second_if_available)
......@@ -248,19 +252,32 @@ void vtkPVDataDeliveryManager::RegisterRepresentation(vtkPVDataRepresentation* r
{
assert("A representation must have a valid UniqueIdentifier" && repr->GetUniqueIdentifier());
vtkInternals::vtkItem item;
item.Representation = repr;
this->Internals->ItemsMap[repr->GetUniqueIdentifier()].first = item;
int numPorts = repr->GetNumberOfInputPorts();
vtkInternals::vtkItem item2;
item2.Representation = repr;
this->Internals->ItemsMap[repr->GetUniqueIdentifier()].second = item2;
for (int port = 0; port < numPorts; ++port)
{
vtkInternals::ReprPortType key(repr->GetUniqueIdentifier(), port);
vtkInternals::vtkItem item;
item.Representation = repr;
this->Internals->ItemsMap[key].first = item;
vtkInternals::vtkItem item2;
item2.Representation = repr;
this->Internals->ItemsMap[key].second = item2;
}
}
//----------------------------------------------------------------------------
void vtkPVDataDeliveryManager::UnRegisterRepresentation(vtkPVDataRepresentation* repr)
{
this->Internals->ItemsMap.erase(repr->GetUniqueIdentifier());
int numPorts = repr->GetNumberOfInputPorts();
for (int port = 0; port < numPorts; ++port)
{
vtkInternals::ReprPortType key(repr->GetUniqueIdentifier(), port);
this->Internals->ItemsMap.erase(key);
}
}
//----------------------------------------------------------------------------
......@@ -272,9 +289,9 @@ vtkPVDataRepresentation* vtkPVDataDeliveryManager::GetRepresentation(unsigned in
//----------------------------------------------------------------------------
void vtkPVDataDeliveryManager::SetDeliverToAllProcesses(
vtkPVDataRepresentation* repr, bool mode, bool low_res)
vtkPVDataRepresentation* repr, bool mode, bool low_res, int port)
{
vtkInternals::vtkItem* item = this->Internals->GetItem(repr, low_res);
vtkInternals::vtkItem* item = this->Internals->GetItem(repr, low_res, port);
if (item)
{
item->CloneDataToAllNodes = mode;
......@@ -287,9 +304,10 @@ void vtkPVDataDeliveryManager::SetDeliverToAllProcesses(
//----------------------------------------------------------------------------
void vtkPVDataDeliveryManager::SetDeliverToClientAndRenderingProcesses(
vtkPVDataRepresentation* repr, bool deliver_to_client, bool gather_before_delivery, bool low_res)
vtkPVDataRepresentation* repr, bool deliver_to_client, bool gather_before_delivery, bool low_res,
int port)
{
vtkInternals::vtkItem* item = this->Internals->GetItem(repr, low_res);
vtkInternals::vtkItem* item = this->Internals->GetItem(repr, low_res, port);
if (item)
{
item->DeliverToClientAndRenderingProcesses = deliver_to_client;
......@@ -303,10 +321,10 @@ void vtkPVDataDeliveryManager::SetDeliverToClientAndRenderingProcesses(
//----------------------------------------------------------------------------
void vtkPVDataDeliveryManager::MarkAsRedistributable(
vtkPVDataRepresentation* repr, bool value /*=true*/)
vtkPVDataRepresentation* repr, bool value /*=true*/, int port)
{
vtkInternals::vtkItem* item = this->Internals->GetItem(repr, false);
vtkInternals::vtkItem* low_item = this->Internals->GetItem(repr, true);
vtkInternals::vtkItem* item = this->Internals->GetItem(repr, false, port);
vtkInternals::vtkItem* low_item = this->Internals->GetItem(repr, true, port);
if (item)
{
item->Redistributable = value;
......@@ -319,10 +337,10 @@ void vtkPVDataDeliveryManager::MarkAsRedistributable(
}
//----------------------------------------------------------------------------
void vtkPVDataDeliveryManager::SetStreamable(vtkPVDataRepresentation* repr, bool val)
void vtkPVDataDeliveryManager::SetStreamable(vtkPVDataRepresentation* repr, bool val, int port)
{
vtkInternals::vtkItem* item = this->Internals->GetItem(repr, false);
vtkInternals::vtkItem* low_item = this->Internals->GetItem(repr, true);
vtkInternals::vtkItem* item = this->Internals->GetItem(repr, false, port);
vtkInternals::vtkItem* low_item = this->Internals->GetItem(repr, true, port);
if (item)
{
item->Streamable = val;
......@@ -335,10 +353,10 @@ void vtkPVDataDeliveryManager::SetStreamable(vtkPVDataRepresentation* repr, bool
}
//----------------------------------------------------------------------------
void vtkPVDataDeliveryManager::SetPiece(
vtkPVDataRepresentation* repr, vtkDataObject* data, bool low_res, unsigned long trueSize)
void vtkPVDataDeliveryManager::SetPiece(vtkPVDataRepresentation* repr, vtkDataObject* data,
bool low_res, unsigned long trueSize, int port)
{
vtkInternals::vtkItem* item = this->Internals->GetItem(repr, low_res);
vtkInternals::vtkItem* item = this->Internals->GetItem(repr, low_res, port);
if (item)
{
unsigned long data_time = 0;
......@@ -363,9 +381,9 @@ void vtkPVDataDeliveryManager::SetPiece(
//----------------------------------------------------------------------------
vtkAlgorithmOutput* vtkPVDataDeliveryManager::GetProducer(
vtkPVDataRepresentation* repr, bool low_res)
vtkPVDataRepresentation* repr, bool low_res, int port)
{
vtkInternals::vtkItem* item = this->Internals->GetItem(repr, low_res);
vtkInternals::vtkItem* item = this->Internals->GetItem(repr, low_res, port);
if (!item)
{
vtkErrorMacro("Invalid arguments.");
......@@ -376,9 +394,10 @@ vtkAlgorithmOutput* vtkPVDataDeliveryManager::GetProducer(
}
//----------------------------------------------------------------------------
void vtkPVDataDeliveryManager::SetPiece(unsigned int id, vtkDataObject* data, bool low_res)
void vtkPVDataDeliveryManager::SetPiece(
unsigned int id, vtkDataObject* data, bool low_res, int port)
{
vtkInternals::vtkItem* item = this->Internals->GetItem(id, low_res);
vtkInternals::vtkItem* item = this->Internals->GetItem(id, low_res, port);
if (item)
{
item->SetDataObject(data);
......@@ -392,9 +411,9 @@ void vtkPVDataDeliveryManager::SetPiece(unsigned int id, vtkDataObject* data, bo
//----------------------------------------------------------------------------
void vtkPVDataDeliveryManager::SetOrderedCompositingInformation(vtkPVDataRepresentation* repr,
vtkExtentTranslator* translator, const int whole_extents[6], const double origin[3],
const double spacing[3])
const double spacing[3], int port)
{
vtkInternals::vtkItem* item = this->Internals->GetItem(repr, false);
vtkInternals::vtkItem* item = this->Internals->GetItem(repr, false, port);
if (item)
{
vtkInternals::vtkOrderedCompositingInfo info;
......@@ -412,9 +431,9 @@ void vtkPVDataDeliveryManager::SetOrderedCompositingInformation(vtkPVDataReprese
}
//----------------------------------------------------------------------------
vtkAlgorithmOutput* vtkPVDataDeliveryManager::GetProducer(unsigned int id, bool low_res)
vtkAlgorithmOutput* vtkPVDataDeliveryManager::GetProducer(unsigned int id, bool low_res, int port)
{
vtkInternals::vtkItem* item = this->Internals->GetItem(id, low_res);
vtkInternals::vtkItem* item = this->Internals->GetItem(id, low_res, port);
if (!item)
{
vtkErrorMacro("Invalid arguments.");
......@@ -435,7 +454,7 @@ bool vtkPVDataDeliveryManager::NeedsDelivery(
if (item.Representation && item.Representation->GetVisibility() &&
item.GetTimeStamp() > timestamp)
{
keys_to_deliver.push_back(iter->first);
keys_to_deliver.push_back(iter->first.first);
}
}
return keys_to_deliver.size() > 0;
......@@ -466,52 +485,66 @@ void vtkPVDataDeliveryManager::Deliver(int use_lod, unsigned int size, unsigned
for (unsigned int cc = 0; cc < size; cc++)
{
vtkInternals::vtkItem* item = this->Internals->GetItem(values[cc], use_lod != 0);
// Get the repr so we can loop over input ports:
// TODO Both this method and NeedsDelivery should be updated to use
// port information so we don't copy unneccesarily.
vtkPVDataRepresentation* repr = this->GetRepresentation(values[cc]);
int numPorts = repr->GetNumberOfInputPorts();
vtkDataObject* data = item->GetDataObject();
for (int port = 0; port < numPorts; ++port)
{
vtkInternals::vtkItem* item = this->Internals->GetItem(values[cc], use_lod != 0, port);
// if (data != NULL && data->IsA("vtkUniformGridAMR"))
// {
// // we are dealing with AMR datasets.
// // We assume for now we're not running in render-server mode. We can
// // ensure that at some point in future.
// // So we are either in pass-through or collect mode.
vtkDataObject* data = item ? item->GetDataObject() : NULL;
// // FIXME: check that the mode flags are "suitable" for AMR.
// }
if (!data)
{
continue;
}
vtkNew<vtkMPIMoveData> dataMover;
dataMover->InitializeForCommunicationForParaView();
dataMover->SetOutputDataType(data ? data->GetDataObjectType() : VTK_POLY_DATA);
dataMover->SetMoveMode(mode);
if (item->CloneDataToAllNodes)
{
dataMover->SetMoveModeToClone();
}
else if (item->DeliverToClientAndRenderingProcesses)
{
if (mode == vtkMPIMoveData::PASS_THROUGH)
// if (data != NULL && data->IsA("vtkUniformGridAMR"))
// {
// // we are dealing with AMR datasets.
// // We assume for now we're not running in render-server mode. We can
// // ensure that at some point in future.
// // So we are either in pass-through or collect mode.
// // FIXME: check that the mode flags are "suitable" for AMR.
// }
vtkNew<vtkMPIMoveData> dataMover;
dataMover->InitializeForCommunicationForParaView();
dataMover->SetOutputDataType(data ? data->GetDataObjectType() : VTK_POLY_DATA);
dataMover->SetMoveMode(mode);
if (item->CloneDataToAllNodes)
{
dataMover->SetMoveMode(vtkMPIMoveData::COLLECT_AND_PASS_THROUGH);
dataMover->SetMoveModeToClone();
}
else
else if (item->DeliverToClientAndRenderingProcesses)
{
// nothing to do, since the data is going to be delivered to the client
// anyways.
if (mode == vtkMPIMoveData::PASS_THROUGH)
{
dataMover->SetMoveMode(vtkMPIMoveData::COLLECT_AND_PASS_THROUGH);
}
else
{
// nothing to do, since the data is going to be delivered to the client
// anyways.
}
dataMover->SetSkipDataServerGatherToZero(item->GatherBeforeDeliveringToClient == false);
}
dataMover->SetSkipDataServerGatherToZero(item->GatherBeforeDeliveringToClient == false);
}
dataMover->SetInputData(data);
dataMover->SetInputData(data);
if (dataMover->GetOutputGeneratedOnProcess())
{
// release old memory (not necessarily, but try).
item->SetDeliveredDataObject(NULL);
}
dataMover->Update();
if (item->GetDeliveredDataObject() == NULL)
{
item->SetDeliveredDataObject(dataMover->GetOutputDataObject(0));
if (dataMover->GetOutputGeneratedOnProcess())
{
// release old memory (not necessarily, but try).
item->SetDeliveredDataObject(NULL);
}
dataMover->Update();
if (item->GetDeliveredDataObject() == NULL)
{
item->SetDeliveredDataObject(dataMover->GetOutputDataObject(0));
}
}
}
......@@ -634,9 +667,10 @@ vtkPKdTree* vtkPVDataDeliveryManager::GetKdTree()
//----------------------------------------------------------------------------
void vtkPVDataDeliveryManager::SetNextStreamedPiece(
vtkPVDataRepresentation* repr, vtkDataObject* data)
vtkPVDataRepresentation* repr, vtkDataObject* data, int port)
{
vtkInternals::vtkItem* item = this->Internals->GetItem(repr, /*low_res=*/false);
vtkInternals::vtkItem* item = this->Internals->GetItem(repr,
/*low_res=*/false, port);
if (item == NULL)
{
vtkErrorMacro("Invalid argument.");
......@@ -649,9 +683,11 @@ void vtkPVDataDeliveryManager::SetNextStreamedPiece(
}
//----------------------------------------------------------------------------
vtkDataObject* vtkPVDataDeliveryManager::GetCurrentStreamedPiece(vtkPVDataRepresentation* repr)
vtkDataObject* vtkPVDataDeliveryManager::GetCurrentStreamedPiece(
vtkPVDataRepresentation* repr, int port)
{
vtkInternals::vtkItem* item = this->Internals->GetItem(repr, /*low_res=*/false);
vtkInternals::vtkItem* item = this->Internals->GetItem(repr,
/*low_res=*/false, port);
if (item == NULL)
{
vtkErrorMacro("Invalid argument.");
......@@ -686,7 +722,7 @@ bool vtkPVDataDeliveryManager::GetRepresentationsReadyToStreamPieces(
if (item.Representation && item.Representation->GetVisibility() && item.Streamable &&
item.GetStreamedPiece())
{
keys.push_back(iter->first);
keys.push_back(iter->first.first);
}
}
return (keys.size() > 0);
......@@ -704,28 +740,37 @@ void vtkPVDataDeliveryManager::DeliverStreamedPieces(unsigned int size, unsigned
for (unsigned int cc = 0; cc < size; cc++)
{
vtkInternals::vtkItem* item = this->Internals->GetItem(values[cc], false);
// FIXME: we need information about the datatype on all processes. For now
// we assume that the data type is same as the full-data (which is not
// really necessary). We can API to allow representations to be able to
// specify the data type.
vtkDataObject* data = item->GetDataObject();
vtkDataObject* piece = item->GetStreamedPiece();
// Get the repr so we can loop over input ports:
// TODO Both this method and GetRepresentationsReadyToStreamPieces should be
// updated to use port information so we don't copy unneccesarily.
vtkPVDataRepresentation* repr = this->GetRepresentation(values[cc]);
int numPorts = repr->GetNumberOfInputPorts();
vtkNew<vtkMPIMoveData> dataMover;
dataMover->InitializeForCommunicationForParaView();
dataMover->SetOutputDataType(data->GetDataObjectType());
dataMover->SetMoveMode(mode);
if (item->CloneDataToAllNodes)
{
dataMover->SetMoveModeToClone();
}
dataMover->SetInputData(piece);
dataMover->Update();
if (dataMover->GetOutputGeneratedOnProcess())
for (int port = 0; port < numPorts; ++port)
{
item->SetNextStreamedPiece(dataMover->GetOutputDataObject(0));
vtkInternals::vtkItem* item = this->Internals->GetItem(values[cc], false, port);
// FIXME: we need information about the datatype on all processes. For now
// we assume that the data type is same as the full-data (which is not
// really necessary). We can API to allow representations to be able to
// specify the data type.
vtkDataObject* data = item->GetDataObject();
vtkDataObject* piece = item->GetStreamedPiece();
vtkNew<vtkMPIMoveData> dataMover;
dataMover->InitializeForCommunicationForParaView();
dataMover->SetOutputDataType(data->GetDataObjectType());
dataMover->SetMoveMode(mode);
if (item->CloneDataToAllNodes)
{
dataMover->SetMoveModeToClone();
}
dataMover->SetInputData(piece);
dataMover->Update();
if (dataMover->GetOutputGeneratedOnProcess())
{
item->SetNextStreamedPiece(dataMover->GetOutputDataObject(0));
}
}
}
}
......@@ -745,7 +790,7 @@ int vtkPVDataDeliveryManager::GetSynchronizationMagicNumber()
vtkInternals::ItemsMapType::iterator iter = this->Internals->ItemsMap.begin();
for (; iter != this->Internals->ItemsMap.end(); iter++)
{
result = prime * result + static_cast<int>(iter->first);
result = prime * result + static_cast<int>(iter->first.first);
}
return result;
......
......@@ -73,9 +73,9 @@ public:
* representation that requires delivering of any geometry must register with
* the vtkPVDataDeliveryManager and never manage the delivery on its own.
*/
void SetPiece(
vtkPVDataRepresentation* repr, vtkDataObject* data, bool low_res, unsigned long trueSize = 0);
void SetPiece(unsigned int repr_id, vtkDataObject* data, bool low_res);
void SetPiece(vtkPVDataRepresentation* repr, vtkDataObject* data, bool low_res,
unsigned long trueSize = 0, int port = 0);
void SetPiece(unsigned int repr_id, vtkDataObject* data, bool low_res, int port = 0);
//@}
//@{
......@@ -85,8 +85,8 @@ public:
* vtkPVRenderView::GetPieceProducer() and GetPieceProducerLOD()) to obtain
* the geometry producer for the geometry to be rendered.
*/
vtkAlgorithmOutput* GetProducer(vtkPVDataRepresentation*, bool low_res);
vtkAlgorithmOutput* GetProducer(unsigned int, bool low_res);
vtkAlgorithmOutput* GetProducer(vtkPVDataRepresentation*, bool low_res, int port = 0);
vtkAlgorithmOutput* GetProducer(unsigned int, bool low_res, int port = 0);
//@}
/**
......@@ -96,7 +96,7 @@ public:
* geometry to be delivered to all nodes always. That can be done by using
* this method (via vtkPVRenderView::SetDeliverToAllProcesses()).
*/
void SetDeliverToAllProcesses(vtkPVDataRepresentation*, bool flag, bool low_res);
void SetDeliverToAllProcesses(vtkPVDataRepresentation*, bool flag, bool low_res, int port = 0);
/**
* By default, this class only delivers geometries to nodes that are doing the
......@@ -108,8 +108,8 @@ public:
* the data on the server nodes, while the latter will optionally gather the data to
* deliver to the client and never scatter.
*/
void SetDeliverToClientAndRenderingProcesses(
vtkPVDataRepresentation*, bool deliver_to_client, bool gather_before_delivery, bool low_res);
void SetDeliverToClientAndRenderingProcesses(vtkPVDataRepresentation*, bool deliver_to_client,
bool gather_before_delivery, bool low_res, int port = 0);
/**
* Under certain cases, e.g. when remote rendering in parallel with
......@@ -119,7 +119,7 @@ public:
* redistribute the geometry as needed. Only vtkPolyData, vtkUnstructuredGrid
* or a multi-block comprising of vtkPolyData is currently supported.
*/
void MarkAsRedistributable(vtkPVDataRepresentation*, bool value = true);
void MarkAsRedistributable(vtkPVDataRepresentation*, bool value = true, int port = 0);
/**
* Returns the size for all visible geometry. If low_res is true, and low-res
......@@ -162,7 +162,7 @@ public:
*/
void SetOrderedCompositingInformation(vtkPVDataRepresentation* repr,
vtkExtentTranslator* translator, const int whole_extents[6], const double origin[3],
const double spacing[3]);
const double spacing[3], int port = 0);
/**
* Internal method used to determine the list of representations that need
......@@ -187,15 +187,15 @@ public:
* it is streamable i.e. the view can call streaming passses on it and it will
* deliver data incrementally.
*/
void SetStreamable(vtkPVDataRepresentation*, bool);
void SetStreamable(vtkPVDataRepresentation*, bool, int port = 0);
//@{
/**
* Passes the current streamed piece. This is the piece that will be delivered
* to the rendering node.
*/
void SetNextStreamedPiece(vtkPVDataRepresentation* repr, vtkDataObject* piece);
vtkDataObject* GetCurrentStreamedPiece(vtkPVDataRepresentation* repr);
void SetNextStreamedPiece(vtkPVDataRepresentation* repr, vtkDataObject* piece, int port = 0);
vtkDataObject* GetCurrentStreamedPiece(vtkPVDataRepresentation* repr, int port = 0);
void ClearStreamedPieces();
//@}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment