Skip to content
Snippets Groups Projects
Commit bdd1d24f authored by Allison Vacanti's avatar Allison Vacanti Committed by Kitware Robot
Browse files

Merge topic 'pv17355-vtm-interleave-blocks'


d7a2f804 Add test for piece distribution options in XML MBDS reader.
0af838f8 Allow some control over how vtm files are distributed.

Acked-by: default avatarKitware Robot <kwrobot@kitware.com>
Acked-by: default avatarUtkarsh Ayachit <utkarsh.ayachit@kitware.com>
Merge-request: !3487
parents 2a60661f d7a2f804
No related branches found
No related tags found
No related merge requests found
Showing
with 369 additions and 43 deletions
...@@ -13,6 +13,10 @@ vtk_add_test_cxx(${vtk-module}CxxTests tests ...@@ -13,6 +13,10 @@ vtk_add_test_cxx(${vtk-module}CxxTests tests
) )
# Each of these must be added in a separate vtk_add_test_cxx # Each of these must be added in a separate vtk_add_test_cxx
vtk_add_test_cxx(${vtk-module}CxxTests tests
TestXMLCompositeDataReaderDistribution.cxx,NO_VALID,NO_OUTPUT
"DATA{${VTK_TEST_INPUT_DIR}/distTest.vtm,REGEX:distTest_[0-9]_0.vtp}"
)
vtk_add_test_cxx(${vtk-module}CxxTests tests vtk_add_test_cxx(${vtk-module}CxxTests tests
TestXMLReaderBadImageData,TestXMLReaderBadData.cxx,NO_VALID,NO_OUTPUT "DATA{${VTK_TEST_INPUT_DIR}/badImageData.xml}" TestXMLReaderBadImageData,TestXMLReaderBadData.cxx,NO_VALID,NO_OUTPUT "DATA{${VTK_TEST_INPUT_DIR}/badImageData.xml}"
) )
......
/*=========================================================================
Program: Visualization Toolkit
Module: TestXMLCompositeDataReaderDistribution.cxx
Copyright (c) Ken Martin, Will Schroeder, Bill Lorensen
All rights reserved.
See Copyright.txt or http://www.kitware.com/Copyright.htm for details.
This software is distributed WITHOUT ANY WARRANTY; without even
the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
PURPOSE. See the above copyright notice for more information.
=========================================================================*/
#include "vtkXMLCompositeDataReader.h"

#include "vtkCompositeDataIterator.h"
#include "vtkCompositeDataPipeline.h"
#include "vtkCompositeDataSet.h"
#include "vtkInformation.h"
#include "vtkNew.h"
#include "vtkPolyData.h"
#include "vtkSmartPointer.h"
#include "vtkXMLMultiBlockDataReader.h"

#include <iostream>
#include <set>
#include <sstream>
/* Test helper: if `cond` is false, print the stringized condition and
 * `message` (which may use stream-insertion syntax, e.g. "got " << x) to
 * stderr along with the failing line number, then return EXIT_FAILURE from
 * the enclosing function. Wrapped in do/while so a trailing semicolon at the
 * call site parses as a single statement. */
#define TEST_ASSERT(cond, message) \
do { \
  if (!(cond)) \
  { \
    std::cerr << "Failure at line " << __LINE__ << ":\n" \
              << "\tCondition: " << #cond << "\n" \
              << "\tError: " << message << "\n"; \
    return EXIT_FAILURE; \
  } \
} while (0) /* do-while swallows semicolons */
namespace
{
// Returns a multiset containing the number of points in each leaf dataset.
// Collect the point count of every non-empty vtkPolyData leaf in @a cds.
// The result is a multiset so leaf ordering is irrelevant and duplicate
// counts are preserved when comparing against expected distributions.
std::multiset<vtkIdType> PointCounts(vtkCompositeDataSet *cds)
{
  std::multiset<vtkIdType> counts;

  // Take() adopts the iterator returned by NewIterator() so it is released
  // automatically when this function returns.
  auto iter =
      vtkSmartPointer<vtkCompositeDataIterator>::Take(cds->NewIterator());
  iter->SkipEmptyNodesOn();
  iter->InitTraversal();
  while (!iter->IsDoneWithTraversal())
  {
    if (vtkPolyData *poly =
            vtkPolyData::SafeDownCast(iter->GetCurrentDataObject()))
    {
      counts.insert(poly->GetNumberOfPoints());
    }
    iter->GoToNextItem();
  }

  return counts;
}
// Return true when the leaf point counts of @a cds exactly match @a expected.
bool VerifyCounts(vtkCompositeDataSet *cds,
                  const std::multiset<vtkIdType> &expected)
{
  const std::multiset<vtkIdType> actual = PointCounts(cds);
  return actual == expected;
}
// Format the leaf point counts of @a cds as "{ a b c ... }" for diagnostics.
std::string DumpCounts(vtkCompositeDataSet *cds)
{
  std::ostringstream stream;
  stream << "{ ";
  for (const vtkIdType count : PointCounts(cds))
  {
    stream << count << " ";
  }
  stream << "}";
  return stream.str();
}
}
// Test entry point (invoked by the VTK CTest harness). argv[1] is the path to
// the distTest.vtm multiblock dataset, whose 10 polydata leaves have the
// unique point counts 1..10 — so each leaf can be identified by its point
// count alone. Verifies the reader's PieceDistribution strategies (Block and
// Interleave), with and without an UPDATE_COMPOSITE_INDICES restriction.
int TestXMLCompositeDataReaderDistribution(int argc, char* argv[])
{
  if (argc < 2)
  {
    std::cerr << "Missing argument.\n";
    return EXIT_FAILURE;
  }

  vtkNew<vtkXMLMultiBlockDataReader> in;
  in->SetFileName(argv[1]);

  // Verify that the dataset is what we expect: 10 leaves, each with a unique
  // number of points spanning 1-10 inclusive.
  {
    in->Update();
    TEST_ASSERT(VerifyCounts(in->GetOutput(), {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}),
                "Incorrect partitioning: " << DumpCounts(in->GetOutput()));
  }

  // Simulate a request for a partitioning across 3 processors:
  // (req is reused below; only UPDATE_PIECE_NUMBER changes per Update call.)
  vtkNew<vtkInformation> req;
  req->Set(vtkCompositeDataPipeline::UPDATE_NUMBER_OF_PIECES(), 3);

  // Verify that block loading works as expected.
  // Block strategy assigns contiguous runs of leaves; 10 leaves over 3 pieces
  // gives sizes 4/3/3, the remainder going to the earliest piece.
  {
    in->SetPieceDistribution(vtkXMLMultiBlockDataReader::Block);
    // First processor:
    req->Set(vtkCompositeDataPipeline::UPDATE_PIECE_NUMBER(), 0);
    in->Update(req);
    TEST_ASSERT(VerifyCounts(in->GetOutput(), {1, 2, 3, 4}),
                "Incorrect partitioning: " << DumpCounts(in->GetOutput()));
    // Second processor:
    req->Set(vtkCompositeDataPipeline::UPDATE_PIECE_NUMBER(), 1);
    in->Update(req);
    TEST_ASSERT(VerifyCounts(in->GetOutput(), {5, 6, 7}),
                "Incorrect partitioning: " << DumpCounts(in->GetOutput()));
    // Third processor:
    req->Set(vtkCompositeDataPipeline::UPDATE_PIECE_NUMBER(), 2);
    in->Update(req);
    TEST_ASSERT(VerifyCounts(in->GetOutput(), {8, 9, 10}),
                "Incorrect partitioning: " << DumpCounts(in->GetOutput()));
  }

  // Verify that interleaved loading works as expected.
  // Interleave strategy assigns leaf index i to piece (i % numPieces), as the
  // expected sets below demonstrate (piece 0 gets indices 0, 3, 6, 9).
  {
    in->SetPieceDistribution(vtkXMLMultiBlockDataReader::Interleave);
    // First processor:
    req->Set(vtkCompositeDataPipeline::UPDATE_PIECE_NUMBER(), 0);
    in->Update(req);
    TEST_ASSERT(VerifyCounts(in->GetOutput(), {1, 4, 7, 10}),
                "Incorrect partitioning: " << DumpCounts(in->GetOutput()));
    // Second processor:
    req->Set(vtkCompositeDataPipeline::UPDATE_PIECE_NUMBER(), 1);
    in->Update(req);
    TEST_ASSERT(VerifyCounts(in->GetOutput(), {2, 5, 8}),
                "Incorrect partitioning: " << DumpCounts(in->GetOutput()));
    // Third processor:
    req->Set(vtkCompositeDataPipeline::UPDATE_PIECE_NUMBER(), 2);
    in->Update(req);
    TEST_ASSERT(VerifyCounts(in->GetOutput(), {3, 6, 9}),
                "Incorrect partitioning: " << DumpCounts(in->GetOutput()));
  }

  // Add an update restriction to test that loading is balanced when datasets 2
  // and 7 (point counts, not ids) are ignored.
  // Composite indices 1 and 6 (the leaves with 2 and 7 points) are omitted
  // from the list, leaving 8 datasets to distribute across the 3 pieces.
  req->Append(vtkCompositeDataPipeline::UPDATE_COMPOSITE_INDICES(), 0);
  req->Append(vtkCompositeDataPipeline::UPDATE_COMPOSITE_INDICES(), 2);
  req->Append(vtkCompositeDataPipeline::UPDATE_COMPOSITE_INDICES(), 3);
  req->Append(vtkCompositeDataPipeline::UPDATE_COMPOSITE_INDICES(), 4);
  req->Append(vtkCompositeDataPipeline::UPDATE_COMPOSITE_INDICES(), 5);
  req->Append(vtkCompositeDataPipeline::UPDATE_COMPOSITE_INDICES(), 7);
  req->Append(vtkCompositeDataPipeline::UPDATE_COMPOSITE_INDICES(), 8);
  req->Append(vtkCompositeDataPipeline::UPDATE_COMPOSITE_INDICES(), 9);

  // Verify that block loading works as expected.
  // With 8 remaining datasets over 3 pieces the block sizes are 3/3/2.
  {
    in->SetPieceDistribution(vtkXMLMultiBlockDataReader::Block);
    // First processor:
    req->Set(vtkCompositeDataPipeline::UPDATE_PIECE_NUMBER(), 0);
    in->Update(req);
    TEST_ASSERT(VerifyCounts(in->GetOutput(), {1, 3, 4}),
                "Incorrect partitioning: " << DumpCounts(in->GetOutput()));
    // Second processor:
    req->Set(vtkCompositeDataPipeline::UPDATE_PIECE_NUMBER(), 1);
    in->Update(req);
    TEST_ASSERT(VerifyCounts(in->GetOutput(), {5, 6, 8}),
                "Incorrect partitioning: " << DumpCounts(in->GetOutput()));
    // Third processor:
    req->Set(vtkCompositeDataPipeline::UPDATE_PIECE_NUMBER(), 2);
    in->Update(req);
    TEST_ASSERT(VerifyCounts(in->GetOutput(), {9, 10}),
                "Incorrect partitioning: " << DumpCounts(in->GetOutput()));
  }

  // Verify that interleaved loading works as expected.
  // Restricted indices are remapped to their position within the restricted
  // set before interleaving, so piece 0 gets restricted positions 0, 3, 6.
  {
    in->SetPieceDistribution(vtkXMLMultiBlockDataReader::Interleave);
    // First processor:
    req->Set(vtkCompositeDataPipeline::UPDATE_PIECE_NUMBER(), 0);
    in->Update(req);
    TEST_ASSERT(VerifyCounts(in->GetOutput(), {1, 5, 9}),
                "Incorrect partitioning: " << DumpCounts(in->GetOutput()));
    // Second processor:
    req->Set(vtkCompositeDataPipeline::UPDATE_PIECE_NUMBER(), 1);
    in->Update(req);
    TEST_ASSERT(VerifyCounts(in->GetOutput(), {3, 6, 10}),
                "Incorrect partitioning: " << DumpCounts(in->GetOutput()));
    // Third processor:
    req->Set(vtkCompositeDataPipeline::UPDATE_PIECE_NUMBER(), 2);
    in->Update(req);
    TEST_ASSERT(VerifyCounts(in->GetOutput(), {4, 8}),
                "Incorrect partitioning: " << DumpCounts(in->GetOutput()));
  }

  return EXIT_SUCCESS;
}
...@@ -59,12 +59,14 @@ struct vtkXMLCompositeDataReaderInternals ...@@ -59,12 +59,14 @@ struct vtkXMLCompositeDataReaderInternals
typedef std::map<std::string, vtkSmartPointer<vtkXMLReader> > ReadersType; typedef std::map<std::string, vtkSmartPointer<vtkXMLReader> > ReadersType;
ReadersType Readers; ReadersType Readers;
static const vtkXMLCompositeDataReaderEntry ReaderList[]; static const vtkXMLCompositeDataReaderEntry ReaderList[];
unsigned int MinDataset; unsigned int Piece;
unsigned int MaxDataset; unsigned int NumPieces;
unsigned int NumDataSets;
vtkXMLCompositeDataReaderInternals() vtkXMLCompositeDataReaderInternals()
{ {
this->MinDataset = 0; this->Piece = 0;
this->MaxDataset = 0; this->NumPieces = 1;
this->NumDataSets = 1;
} }
std::set<int> UpdateIndices; std::set<int> UpdateIndices;
bool HasUpdateRestriction; bool HasUpdateRestriction;
...@@ -72,6 +74,7 @@ struct vtkXMLCompositeDataReaderInternals ...@@ -72,6 +74,7 @@ struct vtkXMLCompositeDataReaderInternals
//---------------------------------------------------------------------------- //----------------------------------------------------------------------------
vtkXMLCompositeDataReader::vtkXMLCompositeDataReader() vtkXMLCompositeDataReader::vtkXMLCompositeDataReader()
: PieceDistribution(Block)
{ {
this->Internal = new vtkXMLCompositeDataReaderInternals; this->Internal = new vtkXMLCompositeDataReaderInternals;
} }
...@@ -85,6 +88,22 @@ vtkXMLCompositeDataReader::~vtkXMLCompositeDataReader() ...@@ -85,6 +88,22 @@ vtkXMLCompositeDataReader::~vtkXMLCompositeDataReader()
//---------------------------------------------------------------------------- //----------------------------------------------------------------------------
void vtkXMLCompositeDataReader::PrintSelf(ostream& os, vtkIndent indent) void vtkXMLCompositeDataReader::PrintSelf(ostream& os, vtkIndent indent)
{ {
os << indent << "PieceDistribution: ";
switch (this->PieceDistribution)
{
case Block:
os << "Block\n";
break;
case Interleave:
os << "Interleave\n";
break;
default:
os << "Invalid (!!)\n";
break;
}
this->Superclass::PrintSelf(os, indent); this->Superclass::PrintSelf(os, indent);
} }
...@@ -255,10 +274,11 @@ void vtkXMLCompositeDataReader::ReadXMLData() ...@@ -255,10 +274,11 @@ void vtkXMLCompositeDataReader::ReadXMLData()
{ {
vtkInformation* info = this->GetCurrentOutputInformation(); vtkInformation* info = this->GetCurrentOutputInformation();
unsigned int updatePiece = static_cast<unsigned int>( this->Internal->Piece = static_cast<unsigned int>(
info->Get(vtkStreamingDemandDrivenPipeline::UPDATE_PIECE_NUMBER())); info->Get(vtkStreamingDemandDrivenPipeline::UPDATE_PIECE_NUMBER()));
unsigned int updateNumPieces = static_cast<unsigned int>( this->Internal->NumPieces = static_cast<unsigned int>(
info->Get(vtkStreamingDemandDrivenPipeline::UPDATE_NUMBER_OF_PIECES())); info->Get(vtkStreamingDemandDrivenPipeline::UPDATE_NUMBER_OF_PIECES()));
this->Internal->NumDataSets = this->CountLeaves(this->GetPrimaryElement());
vtkDataObject* doOutput = vtkDataObject* doOutput =
info->Get(vtkDataObject::DATA_OBJECT()); info->Get(vtkDataObject::DATA_OBJECT());
...@@ -284,33 +304,6 @@ void vtkXMLCompositeDataReader::ReadXMLData() ...@@ -284,33 +304,6 @@ void vtkXMLCompositeDataReader::ReadXMLData()
filePath = ""; filePath = "";
} }
// In earlier implementation only dataset with a group were distributed among
// the processes. In this implementation, we distribute all leaf datasets
// among the processes.
// Determine the leaves that this process is going to read.
unsigned int numDatasets = this->CountLeaves(this->GetPrimaryElement());
unsigned int numDatasetsPerPiece = 1;
unsigned int remaining_blocks = 0;
if (updateNumPieces < numDatasets)
{
numDatasetsPerPiece = numDatasets / updateNumPieces;
remaining_blocks = numDatasets % updateNumPieces;
}
if (updatePiece < remaining_blocks)
{
this->Internal->MinDataset = (numDatasetsPerPiece+1)*updatePiece;
this->Internal->MaxDataset = this->Internal->MinDataset + numDatasetsPerPiece + 1;
}
else
{
this->Internal->MinDataset = (numDatasetsPerPiece +1)* remaining_blocks +
numDatasetsPerPiece * (updatePiece-remaining_blocks);
this->Internal->MaxDataset = this->Internal->MinDataset + numDatasetsPerPiece;
}
vtkInformation* outInfo = this->GetCurrentOutputInformation(); vtkInformation* outInfo = this->GetCurrentOutputInformation();
if (outInfo->Has(vtkCompositeDataPipeline::UPDATE_COMPOSITE_INDICES())) if (outInfo->Has(vtkCompositeDataPipeline::UPDATE_COMPOSITE_INDICES()))
{ {
...@@ -321,6 +314,10 @@ void vtkXMLCompositeDataReader::ReadXMLData() ...@@ -321,6 +314,10 @@ void vtkXMLCompositeDataReader::ReadXMLData()
{ {
int* idx = outInfo->Get(vtkCompositeDataPipeline::UPDATE_COMPOSITE_INDICES()); int* idx = outInfo->Get(vtkCompositeDataPipeline::UPDATE_COMPOSITE_INDICES());
this->Internal->UpdateIndices = std::set<int>(idx, idx+length); this->Internal->UpdateIndices = std::set<int>(idx, idx+length);
// Change the total number of datasets so that we'll properly load
// balance across the valid datasets.
this->Internal->NumDataSets = length;
} }
} }
else else
...@@ -335,22 +332,97 @@ void vtkXMLCompositeDataReader::ReadXMLData() ...@@ -335,22 +332,97 @@ void vtkXMLCompositeDataReader::ReadXMLData()
} }
//---------------------------------------------------------------------------- //----------------------------------------------------------------------------
int vtkXMLCompositeDataReader::ShouldReadDataSet(unsigned int dataSetIndex) int vtkXMLCompositeDataReader::ShouldReadDataSet(unsigned int idx)
{ {
bool shouldRead = // Apply the update restriction:
(dataSetIndex >= this->Internal->MinDataset && if (this->Internal->HasUpdateRestriction)
dataSetIndex < this->Internal->MaxDataset);
if (shouldRead && this->Internal->HasUpdateRestriction)
{ {
if (this->Internal->UpdateIndices.find(dataSetIndex) == auto iter = this->Internal->UpdateIndices.find(idx);
this->Internal->UpdateIndices.end()) if (iter == this->Internal->UpdateIndices.end())
{ {
shouldRead = false; return 0;
} }
// Update the dataset index to its position in the update indices:
idx = std::distance(this->Internal->UpdateIndices.begin(), iter);
}
int result = 0;
switch (this->PieceDistribution)
{
case vtkXMLCompositeDataReader::Block:
result = this->DataSetIsValidForBlockStrategy(idx) ? 1 : 0;
break;
case vtkXMLCompositeDataReader::Interleave:
result = this->DataSetIsValidForInterleaveStrategy(idx) ? 1 : 0;
break;
default:
vtkErrorMacro("Invalid PieceDistribution setting: "
<< this->PieceDistribution);
break;
}
return result;
}
//------------------------------------------------------------------------------
bool vtkXMLCompositeDataReader::DataSetIsValidForBlockStrategy(unsigned int idx)
{
// Minimum number of datasets per block:
unsigned int blockSize = 1;
// Number of blocks with an extra dataset due to overflow:
unsigned int overflowBlocks = 0;
// Adjust values if overflow is detected:
if (this->Internal->NumPieces < this->Internal->NumDataSets)
{
blockSize = this->Internal->NumDataSets / this->Internal->NumPieces;
overflowBlocks = this->Internal->NumDataSets % this->Internal->NumPieces;
}
// Size of an overflow block:
const unsigned int blockSizeOverflow = blockSize + 1;
unsigned int minDS; // Minimum valid dataset index
unsigned int maxDS; // Maximum valid dataset index
if (this->Internal->Piece < overflowBlocks)
{
minDS = blockSizeOverflow * this->Internal->Piece;
maxDS = minDS + blockSizeOverflow;
}
else
{
// Account for earlier blocks that have an overflowed dataset:
const unsigned int overflowOffset = blockSizeOverflow * overflowBlocks;
// Number of preceding blocks that don't overflow:
const unsigned int regularBlocks = this->Internal->Piece - overflowBlocks;
// Offset due to regular blocks:
const unsigned int regularOffset = blockSize * regularBlocks;
minDS = overflowOffset + regularOffset;
maxDS = minDS + blockSize;
} }
return shouldRead;
return idx >= minDS && idx < maxDS;
} }
//------------------------------------------------------------------------------
// Returns true when the dataset at @a idx belongs to this piece under the
// Interleave distribution: piece P reads datasets P, P + N, P + 2N, ...,
// where N is the number of pieces.
bool vtkXMLCompositeDataReader::
DataSetIsValidForInterleaveStrategy(unsigned int idx)
{
  // Use signed integers for the modulus -- otherwise weird things like
  // (-1 % 3) == 0 will happen!
  // (With unsigned arithmetic, 0u - 1u wraps to 2^32 - 1, which happens to be
  // divisible by 3, making dataset 0 appear valid for the wrong pieces.)
  int i = static_cast<int>(idx);
  int p = static_cast<int>(this->Internal->Piece);
  int n = static_cast<int>(this->Internal->NumPieces);
  return ((i - p) % n) == 0;
}
//---------------------------------------------------------------------------- //----------------------------------------------------------------------------
vtkDataObject* vtkXMLCompositeDataReader::ReadDataObject(vtkXMLDataElement* xmlElem, vtkDataObject* vtkXMLCompositeDataReader::ReadDataObject(vtkXMLDataElement* xmlElem,
const char* filePath) const char* filePath)
......
...@@ -44,6 +44,29 @@ public: ...@@ -44,6 +44,29 @@ public:
vtkTypeMacro(vtkXMLCompositeDataReader,vtkXMLReader); vtkTypeMacro(vtkXMLCompositeDataReader,vtkXMLReader);
void PrintSelf(ostream& os, vtkIndent indent) override; void PrintSelf(ostream& os, vtkIndent indent) override;
enum PieceDistributionStrategy
{
Block,
Interleave
};
/**
* Set the strategy for assigning files to parallel readers. The default is
* @a Block.
*
* Let @a X be the rank of a specific reader, and @a N be the number of
* readers, then:
* @arg @c Block Each processor is assigned a contiguous block of files,
* [@a X * @a N, ( @a X + 1) * @a N ).
* @arg @c Interleave The files are interleaved across readers,
* @a i * @a N + @a X.
* @{
*/
vtkSetClampMacro(PieceDistribution, PieceDistributionStrategy,
Block, Interleave)
vtkGetMacro(PieceDistribution, PieceDistributionStrategy)
/**@}*/
//@{ //@{
/** /**
* Get the output data object for a port on this algorithm. * Get the output data object for a port on this algorithm.
...@@ -107,10 +130,15 @@ protected: ...@@ -107,10 +130,15 @@ protected:
*/ */
int ShouldReadDataSet(unsigned int datasetIndex); int ShouldReadDataSet(unsigned int datasetIndex);
bool DataSetIsValidForBlockStrategy(unsigned int datasetIndex);
bool DataSetIsValidForInterleaveStrategy(unsigned int datasetIndex);
private: private:
vtkXMLCompositeDataReader(const vtkXMLCompositeDataReader&) = delete; vtkXMLCompositeDataReader(const vtkXMLCompositeDataReader&) = delete;
void operator=(const vtkXMLCompositeDataReader&) = delete; void operator=(const vtkXMLCompositeDataReader&) = delete;
PieceDistributionStrategy PieceDistribution;
vtkXMLCompositeDataReaderInternals* Internal; vtkXMLCompositeDataReaderInternals* Internal;
}; };
......
c4c7116c6c7b7f0e92c757a2f5ea3070
93aa53a851147f3483ec6b966207a74e
096d49773e571eb104210d5b423ea17c
6a0ed8e47cfdbb27a2f02996e7b1e25a
989a57c6faffda69c678bb73bcab4623
a9e12205f4b109593a3c13323177a74b
0d2133ec6756ced6dc086f9383573811
9de3f9de7b83424db047035f4f5492ff
b6dc5dde0b97d4c531b7b0446bb3e068
1d7c4098d6d08806d4d04f8a9aaf6d1f
4cb9650348bfb1577c0e2925417dd3f2
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment