Skip to content
Snippets Groups Projects
Commit 0af838f8 authored by Allison Vacanti's avatar Allison Vacanti
Browse files

Allow some control over how vtm files are distributed.

Add an option to interleave the datasets across the workers instead
of assigning blocks. This helps load balance certain types of datasets.
parent bfb2b808
No related branches found
No related tags found
No related merge requests found
...@@ -59,12 +59,14 @@ struct vtkXMLCompositeDataReaderInternals ...@@ -59,12 +59,14 @@ struct vtkXMLCompositeDataReaderInternals
typedef std::map<std::string, vtkSmartPointer<vtkXMLReader> > ReadersType; typedef std::map<std::string, vtkSmartPointer<vtkXMLReader> > ReadersType;
ReadersType Readers; ReadersType Readers;
static const vtkXMLCompositeDataReaderEntry ReaderList[]; static const vtkXMLCompositeDataReaderEntry ReaderList[];
unsigned int MinDataset; unsigned int Piece;
unsigned int MaxDataset; unsigned int NumPieces;
unsigned int NumDataSets;
vtkXMLCompositeDataReaderInternals() vtkXMLCompositeDataReaderInternals()
{ {
this->MinDataset = 0; this->Piece = 0;
this->MaxDataset = 0; this->NumPieces = 1;
this->NumDataSets = 1;
} }
std::set<int> UpdateIndices; std::set<int> UpdateIndices;
bool HasUpdateRestriction; bool HasUpdateRestriction;
...@@ -72,6 +74,7 @@ struct vtkXMLCompositeDataReaderInternals ...@@ -72,6 +74,7 @@ struct vtkXMLCompositeDataReaderInternals
//---------------------------------------------------------------------------- //----------------------------------------------------------------------------
vtkXMLCompositeDataReader::vtkXMLCompositeDataReader() vtkXMLCompositeDataReader::vtkXMLCompositeDataReader()
: PieceDistribution(Block)
{ {
this->Internal = new vtkXMLCompositeDataReaderInternals; this->Internal = new vtkXMLCompositeDataReaderInternals;
} }
...@@ -85,6 +88,22 @@ vtkXMLCompositeDataReader::~vtkXMLCompositeDataReader() ...@@ -85,6 +88,22 @@ vtkXMLCompositeDataReader::~vtkXMLCompositeDataReader()
//---------------------------------------------------------------------------- //----------------------------------------------------------------------------
void vtkXMLCompositeDataReader::PrintSelf(ostream& os, vtkIndent indent) void vtkXMLCompositeDataReader::PrintSelf(ostream& os, vtkIndent indent)
{ {
os << indent << "PieceDistribution: ";
switch (this->PieceDistribution)
{
case Block:
os << "Block\n";
break;
case Interleave:
os << "Interleave\n";
break;
default:
os << "Invalid (!!)\n";
break;
}
this->Superclass::PrintSelf(os, indent); this->Superclass::PrintSelf(os, indent);
} }
...@@ -255,10 +274,11 @@ void vtkXMLCompositeDataReader::ReadXMLData() ...@@ -255,10 +274,11 @@ void vtkXMLCompositeDataReader::ReadXMLData()
{ {
vtkInformation* info = this->GetCurrentOutputInformation(); vtkInformation* info = this->GetCurrentOutputInformation();
unsigned int updatePiece = static_cast<unsigned int>( this->Internal->Piece = static_cast<unsigned int>(
info->Get(vtkStreamingDemandDrivenPipeline::UPDATE_PIECE_NUMBER())); info->Get(vtkStreamingDemandDrivenPipeline::UPDATE_PIECE_NUMBER()));
unsigned int updateNumPieces = static_cast<unsigned int>( this->Internal->NumPieces = static_cast<unsigned int>(
info->Get(vtkStreamingDemandDrivenPipeline::UPDATE_NUMBER_OF_PIECES())); info->Get(vtkStreamingDemandDrivenPipeline::UPDATE_NUMBER_OF_PIECES()));
this->Internal->NumDataSets = this->CountLeaves(this->GetPrimaryElement());
vtkDataObject* doOutput = vtkDataObject* doOutput =
info->Get(vtkDataObject::DATA_OBJECT()); info->Get(vtkDataObject::DATA_OBJECT());
...@@ -284,33 +304,6 @@ void vtkXMLCompositeDataReader::ReadXMLData() ...@@ -284,33 +304,6 @@ void vtkXMLCompositeDataReader::ReadXMLData()
filePath = ""; filePath = "";
} }
// In earlier implementation only dataset with a group were distributed among
// the processes. In this implementation, we distribute all leaf datasets
// among the processes.
// Determine the leaves that this process is going to read.
unsigned int numDatasets = this->CountLeaves(this->GetPrimaryElement());
unsigned int numDatasetsPerPiece = 1;
unsigned int remaining_blocks = 0;
if (updateNumPieces < numDatasets)
{
numDatasetsPerPiece = numDatasets / updateNumPieces;
remaining_blocks = numDatasets % updateNumPieces;
}
if (updatePiece < remaining_blocks)
{
this->Internal->MinDataset = (numDatasetsPerPiece+1)*updatePiece;
this->Internal->MaxDataset = this->Internal->MinDataset + numDatasetsPerPiece + 1;
}
else
{
this->Internal->MinDataset = (numDatasetsPerPiece +1)* remaining_blocks +
numDatasetsPerPiece * (updatePiece-remaining_blocks);
this->Internal->MaxDataset = this->Internal->MinDataset + numDatasetsPerPiece;
}
vtkInformation* outInfo = this->GetCurrentOutputInformation(); vtkInformation* outInfo = this->GetCurrentOutputInformation();
if (outInfo->Has(vtkCompositeDataPipeline::UPDATE_COMPOSITE_INDICES())) if (outInfo->Has(vtkCompositeDataPipeline::UPDATE_COMPOSITE_INDICES()))
{ {
...@@ -321,6 +314,10 @@ void vtkXMLCompositeDataReader::ReadXMLData() ...@@ -321,6 +314,10 @@ void vtkXMLCompositeDataReader::ReadXMLData()
{ {
int* idx = outInfo->Get(vtkCompositeDataPipeline::UPDATE_COMPOSITE_INDICES()); int* idx = outInfo->Get(vtkCompositeDataPipeline::UPDATE_COMPOSITE_INDICES());
this->Internal->UpdateIndices = std::set<int>(idx, idx+length); this->Internal->UpdateIndices = std::set<int>(idx, idx+length);
// Change the total number of datasets so that we'll properly load
// balance across the valid datasets.
this->Internal->NumDataSets = length;
} }
} }
else else
...@@ -335,22 +332,97 @@ void vtkXMLCompositeDataReader::ReadXMLData() ...@@ -335,22 +332,97 @@ void vtkXMLCompositeDataReader::ReadXMLData()
} }
//---------------------------------------------------------------------------- //----------------------------------------------------------------------------
int vtkXMLCompositeDataReader::ShouldReadDataSet(unsigned int dataSetIndex) int vtkXMLCompositeDataReader::ShouldReadDataSet(unsigned int idx)
{ {
bool shouldRead = // Apply the update restriction:
(dataSetIndex >= this->Internal->MinDataset && if (this->Internal->HasUpdateRestriction)
dataSetIndex < this->Internal->MaxDataset);
if (shouldRead && this->Internal->HasUpdateRestriction)
{ {
if (this->Internal->UpdateIndices.find(dataSetIndex) == auto iter = this->Internal->UpdateIndices.find(idx);
this->Internal->UpdateIndices.end()) if (iter == this->Internal->UpdateIndices.end())
{ {
shouldRead = false; return 0;
} }
// Update the dataset index to its position in the update indices:
idx = std::distance(this->Internal->UpdateIndices.begin(), iter);
}
int result = 0;
switch (this->PieceDistribution)
{
case vtkXMLCompositeDataReader::Block:
result = this->DataSetIsValidForBlockStrategy(idx) ? 1 : 0;
break;
case vtkXMLCompositeDataReader::Interleave:
result = this->DataSetIsValidForInterleaveStrategy(idx) ? 1 : 0;
break;
default:
vtkErrorMacro("Invalid PieceDistribution setting: "
<< this->PieceDistribution);
break;
}
return result;
}
//------------------------------------------------------------------------------
bool vtkXMLCompositeDataReader::DataSetIsValidForBlockStrategy(unsigned int idx)
{
// Minimum number of datasets per block:
unsigned int blockSize = 1;
// Number of blocks with an extra dataset due to overflow:
unsigned int overflowBlocks = 0;
// Adjust values if overflow is detected:
if (this->Internal->NumPieces < this->Internal->NumDataSets)
{
blockSize = this->Internal->NumDataSets / this->Internal->NumPieces;
overflowBlocks = this->Internal->NumDataSets % this->Internal->NumPieces;
}
// Size of an overflow block:
const unsigned int blockSizeOverflow = blockSize + 1;
unsigned int minDS; // Minimum valid dataset index
unsigned int maxDS; // Maximum valid dataset index
if (this->Internal->Piece < overflowBlocks)
{
minDS = blockSizeOverflow * this->Internal->Piece;
maxDS = minDS + blockSizeOverflow;
}
else
{
// Account for earlier blocks that have an overflowed dataset:
const unsigned int overflowOffset = blockSizeOverflow * overflowBlocks;
// Number of preceeding blocks that don't overflow:
const unsigned int regularBlocks = this->Internal->Piece - overflowBlocks;
// Offset due to regular blocks:
const unsigned int regularOffset = blockSize * regularBlocks;
minDS = overflowOffset + regularOffset;
maxDS = minDS + blockSize;
} }
return shouldRead;
return idx >= minDS && idx < maxDS;
} }
//------------------------------------------------------------------------------
bool vtkXMLCompositeDataReader::
DataSetIsValidForInterleaveStrategy(unsigned int idx)
{
// Use signed integers for the modulus -- otherwise weird things like
// (-1 % 3) == 0 will happen!
int i = static_cast<int>(idx);
int p = static_cast<int>(this->Internal->Piece);
int n = static_cast<int>(this->Internal->NumPieces);
return ((i - p) % n) == 0;
}
//---------------------------------------------------------------------------- //----------------------------------------------------------------------------
vtkDataObject* vtkXMLCompositeDataReader::ReadDataObject(vtkXMLDataElement* xmlElem, vtkDataObject* vtkXMLCompositeDataReader::ReadDataObject(vtkXMLDataElement* xmlElem,
const char* filePath) const char* filePath)
......
...@@ -44,6 +44,29 @@ public: ...@@ -44,6 +44,29 @@ public:
vtkTypeMacro(vtkXMLCompositeDataReader,vtkXMLReader); vtkTypeMacro(vtkXMLCompositeDataReader,vtkXMLReader);
void PrintSelf(ostream& os, vtkIndent indent) override; void PrintSelf(ostream& os, vtkIndent indent) override;
enum PieceDistributionStrategy
{
Block,
Interleave
};
/**
* Set the strategy for assigning files to parallel readers. The default is
* @a Block.
*
* Let @a X be the rank of a specific reader, and @a N be the number of
* reader, then:
* @arg @c Block Each processor is assigned a contiguous block of files,
* [@a X * @a N, ( @a X + 1) * @a N ).
* @arg @c Interleave The files are interleaved across readers,
* @a i * @a N + @a X.
* @{
*/
vtkSetClampMacro(PieceDistribution, PieceDistributionStrategy,
Block, Interleave)
vtkGetMacro(PieceDistribution, PieceDistributionStrategy)
/**@}*/
//@{ //@{
/** /**
* Get the output data object for a port on this algorithm. * Get the output data object for a port on this algorithm.
...@@ -107,10 +130,15 @@ protected: ...@@ -107,10 +130,15 @@ protected:
*/ */
int ShouldReadDataSet(unsigned int datasetIndex); int ShouldReadDataSet(unsigned int datasetIndex);
bool DataSetIsValidForBlockStrategy(unsigned int datasetIndex);
bool DataSetIsValidForInterleaveStrategy(unsigned int datasetIndex);
private: private:
vtkXMLCompositeDataReader(const vtkXMLCompositeDataReader&) = delete; vtkXMLCompositeDataReader(const vtkXMLCompositeDataReader&) = delete;
void operator=(const vtkXMLCompositeDataReader&) = delete; void operator=(const vtkXMLCompositeDataReader&) = delete;
PieceDistributionStrategy PieceDistribution;
vtkXMLCompositeDataReaderInternals* Internal; vtkXMLCompositeDataReaderInternals* Internal;
}; };
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment