From 0af838f86b6bccc87195f3b84534f2d8946983a5 Mon Sep 17 00:00:00 2001
From: Allison Vacanti <allison.vacanti@kitware.com>
Date: Tue, 24 Oct 2017 17:11:23 -0400
Subject: [PATCH] Allow some control over how vtm files are distributed.

Add an option to interleave the datasets across the workers instead of
assigning each worker a contiguous block of datasets. This helps load
balance certain types of datasets.
---
 IO/XML/vtkXMLCompositeDataReader.cxx | 158 +++++++++++++++++++--------
 IO/XML/vtkXMLCompositeDataReader.h   |  28 +++++
 2 files changed, 143 insertions(+), 43 deletions(-)

diff --git a/IO/XML/vtkXMLCompositeDataReader.cxx b/IO/XML/vtkXMLCompositeDataReader.cxx
index b2a05c647bd..2f2b7cae1fe 100644
--- a/IO/XML/vtkXMLCompositeDataReader.cxx
+++ b/IO/XML/vtkXMLCompositeDataReader.cxx
@@ -59,12 +59,14 @@ struct vtkXMLCompositeDataReaderInternals
   typedef std::map<std::string, vtkSmartPointer<vtkXMLReader> > ReadersType;
   ReadersType Readers;
   static const vtkXMLCompositeDataReaderEntry ReaderList[];
-  unsigned int MinDataset;
-  unsigned int MaxDataset;
+  unsigned int Piece;
+  unsigned int NumPieces;
+  unsigned int NumDataSets;
   vtkXMLCompositeDataReaderInternals()
   {
-    this->MinDataset = 0;
-    this->MaxDataset = 0;
+    this->Piece = 0;
+    this->NumPieces = 1;
+    this->NumDataSets = 1;
   }
   std::set<int> UpdateIndices;
   bool HasUpdateRestriction;
@@ -72,6 +74,7 @@ struct vtkXMLCompositeDataReaderInternals
 
 //----------------------------------------------------------------------------
 vtkXMLCompositeDataReader::vtkXMLCompositeDataReader()
+  : PieceDistribution(Block) // default strategy: one contiguous range per piece
 {
   this->Internal = new vtkXMLCompositeDataReaderInternals;
 }
@@ -85,6 +88,22 @@ vtkXMLCompositeDataReader::~vtkXMLCompositeDataReader()
 //----------------------------------------------------------------------------
 void vtkXMLCompositeDataReader::PrintSelf(ostream& os, vtkIndent indent)
 {
+  os << indent << "PieceDistribution: "; // followed by the strategy's name
+  switch (this->PieceDistribution)
+  {
+    case Block:
+      os << "Block\n";
+      break;
+
+    case Interleave:
+      os << "Interleave\n";
+      break;
+
+    default: // value is not one of the PieceDistributionStrategy enumerators
+      os << "Invalid (!!)\n";
+      break;
+  }
+
   this->Superclass::PrintSelf(os, indent);
 }
 
@@ -255,10 +274,11 @@ void vtkXMLCompositeDataReader::ReadXMLData()
 {
   vtkInformation* info = this->GetCurrentOutputInformation();
 
-  unsigned int updatePiece = static_cast<unsigned int>(
+  this->Internal->Piece = static_cast<unsigned int>(
     info->Get(vtkStreamingDemandDrivenPipeline::UPDATE_PIECE_NUMBER()));
-  unsigned int updateNumPieces =  static_cast<unsigned int>(
+  this->Internal->NumPieces =  static_cast<unsigned int>(
     info->Get(vtkStreamingDemandDrivenPipeline::UPDATE_NUMBER_OF_PIECES()));
+  this->Internal->NumDataSets = this->CountLeaves(this->GetPrimaryElement());
 
   vtkDataObject* doOutput =
     info->Get(vtkDataObject::DATA_OBJECT());
@@ -284,33 +304,6 @@ void vtkXMLCompositeDataReader::ReadXMLData()
     filePath = "";
   }
 
-  // In earlier implementation only dataset with a group were distributed among
-  // the processes. In this implementation, we distribute all leaf datasets
-  // among the processes.
-
-  // Determine the leaves that this process is going to read.
-  unsigned int numDatasets = this->CountLeaves(this->GetPrimaryElement());
-  unsigned int numDatasetsPerPiece = 1;
-
-  unsigned int remaining_blocks = 0;
-  if (updateNumPieces < numDatasets)
-  {
-    numDatasetsPerPiece = numDatasets / updateNumPieces;
-    remaining_blocks = numDatasets % updateNumPieces;
-  }
-
-  if (updatePiece < remaining_blocks)
-  {
-    this->Internal->MinDataset = (numDatasetsPerPiece+1)*updatePiece;
-    this->Internal->MaxDataset = this->Internal->MinDataset + numDatasetsPerPiece + 1;
-  }
-  else
-  {
-    this->Internal->MinDataset = (numDatasetsPerPiece +1)* remaining_blocks +
-      numDatasetsPerPiece * (updatePiece-remaining_blocks);
-    this->Internal->MaxDataset = this->Internal->MinDataset + numDatasetsPerPiece;
-  }
-
   vtkInformation* outInfo = this->GetCurrentOutputInformation();
   if (outInfo->Has(vtkCompositeDataPipeline::UPDATE_COMPOSITE_INDICES()))
   {
@@ -321,6 +314,10 @@ void vtkXMLCompositeDataReader::ReadXMLData()
     {
       int* idx = outInfo->Get(vtkCompositeDataPipeline::UPDATE_COMPOSITE_INDICES());
       this->Internal->UpdateIndices = std::set<int>(idx, idx+length);
+
+      // Change the total number of datasets so that we'll properly load
+      // balance across the valid datasets.
+      this->Internal->NumDataSets = length;
     }
   }
   else
@@ -335,22 +332,97 @@ void vtkXMLCompositeDataReader::ReadXMLData()
 }
 
 //----------------------------------------------------------------------------
-int vtkXMLCompositeDataReader::ShouldReadDataSet(unsigned int dataSetIndex)
+int vtkXMLCompositeDataReader::ShouldReadDataSet(unsigned int idx)
 {
-  bool shouldRead =
-    (dataSetIndex >= this->Internal->MinDataset &&
-     dataSetIndex < this->Internal->MaxDataset);
-
-  if (shouldRead && this->Internal->HasUpdateRestriction)
+  // Apply the update restriction; datasets not in UpdateIndices are skipped:
+  if (this->Internal->HasUpdateRestriction)
   {
-    if (this->Internal->UpdateIndices.find(dataSetIndex) ==
-        this->Internal->UpdateIndices.end())
+    auto iter = this->Internal->UpdateIndices.find(idx);
+    if (iter == this->Internal->UpdateIndices.end())
     {
-      shouldRead = false;
+      return 0; // idx is not one of the requested indices
     }
+
+    // Remap idx to its 0-based rank in the sorted set (a linear iterator walk):
+    idx = std::distance(this->Internal->UpdateIndices.begin(), iter);
+  }
+
+  int result = 0; // stays 0 (do not read) if the strategy is invalid
+
+  switch (this->PieceDistribution)
+  {
+    case vtkXMLCompositeDataReader::Block:
+      result = this->DataSetIsValidForBlockStrategy(idx) ? 1 : 0;
+      break;
+
+    case vtkXMLCompositeDataReader::Interleave:
+      result = this->DataSetIsValidForInterleaveStrategy(idx) ? 1 : 0;
+      break;
+
+    default: // unknown strategy: report it and read nothing
+      vtkErrorMacro("Invalid PieceDistribution setting: "
+                    << this->PieceDistribution);
+      break;
+  }
+
+  return result;
+}
+
+//------------------------------------------------------------------------------
+bool vtkXMLCompositeDataReader::DataSetIsValidForBlockStrategy(unsigned int idx)
+{
+  // Datasets per regular block (1 unless there are more datasets than pieces):
+  unsigned int blockSize = 1;
+
+  // Number of leading blocks that hold one extra dataset (the remainder):
+  unsigned int overflowBlocks = 0;
+
+  // More datasets than pieces: divide them evenly and track the remainder:
+  if (this->Internal->NumPieces < this->Internal->NumDataSets)
+  {
+    blockSize = this->Internal->NumDataSets / this->Internal->NumPieces;
+    overflowBlocks = this->Internal->NumDataSets % this->Internal->NumPieces;
+  }
+
+  // Size of a block that holds an extra dataset:
+  const unsigned int blockSizeOverflow = blockSize + 1;
+
+  unsigned int minDS; // First dataset index of this piece's block (inclusive)
+  unsigned int maxDS; // One past the last index of this piece's block (exclusive)
+  if (this->Internal->Piece < overflowBlocks)
+  {
+    minDS = blockSizeOverflow * this->Internal->Piece;
+    maxDS = minDS + blockSizeOverflow;
+  }
+  else
+  {
+    // Offset covered by the earlier blocks that hold an extra dataset:
+    const unsigned int overflowOffset = blockSizeOverflow * overflowBlocks;
+    // Number of preceding blocks that don't overflow:
+    const unsigned int regularBlocks = this->Internal->Piece - overflowBlocks;
+    // Offset covered by those regular blocks:
+    const unsigned int regularOffset = blockSize * regularBlocks;
+
+    minDS = overflowOffset + regularOffset;
+    maxDS = minDS + blockSize;
   }
-  return shouldRead;
+
+  return idx >= minDS && idx < maxDS; // is idx inside this piece's block?
 }
+
+//------------------------------------------------------------------------------
+bool vtkXMLCompositeDataReader::
+DataSetIsValidForInterleaveStrategy(unsigned int idx)
+{
+  // Use signed integers for the modulus: with unsigned math (idx - Piece) wraps
+  // around when idx < Piece, and e.g. (0u - 1u) % 3u == 0 is a false match.
+  int i = static_cast<int>(idx);
+  int p = static_cast<int>(this->Internal->Piece);
+  int n = static_cast<int>(this->Internal->NumPieces);
+
+  return ((i - p) % n) == 0; // true when idx == Piece + k * NumPieces
+}
+
 //----------------------------------------------------------------------------
 vtkDataObject* vtkXMLCompositeDataReader::ReadDataObject(vtkXMLDataElement* xmlElem,
   const char* filePath)
diff --git a/IO/XML/vtkXMLCompositeDataReader.h b/IO/XML/vtkXMLCompositeDataReader.h
index 02c6ed26a1a..270d6451dcb 100644
--- a/IO/XML/vtkXMLCompositeDataReader.h
+++ b/IO/XML/vtkXMLCompositeDataReader.h
@@ -44,6 +44,29 @@ public:
   vtkTypeMacro(vtkXMLCompositeDataReader,vtkXMLReader);
   void PrintSelf(ostream& os, vtkIndent indent) override;
 
+  enum PieceDistributionStrategy
+  {
+    Block, // each piece reads one contiguous range of datasets
+    Interleave // dataset idx goes to the piece where idx % NumPieces == piece
+  };
+
+  /**
+   * Set the strategy for assigning files to parallel readers. The default is
+   * @a Block.
+   *
+   * Let @a X be the rank of a specific reader, @a N the number of readers, and
+   * @a M the number of files, then:
+   * @arg @c Block Each reader is assigned a contiguous block of @a M / @a N
+   *      files (integer division); the first @a M % @a N readers get one extra.
+   * @arg @c Interleave The files are interleaved across readers: reader @a X
+   *      reads files @a i * @a N + @a X for @a i = 0, 1, 2, ...
+   * @{
+   */
+  vtkSetClampMacro(PieceDistribution, PieceDistributionStrategy,
+                   Block, Interleave)
+  vtkGetMacro(PieceDistribution, PieceDistributionStrategy)
+  /**@}*/
+
   //@{
   /**
    * Get the output data object for a port on this algorithm.
@@ -107,10 +130,15 @@ protected:
    */
   int ShouldReadDataSet(unsigned int datasetIndex);
 
+  bool DataSetIsValidForBlockStrategy(unsigned int datasetIndex);
+  bool DataSetIsValidForInterleaveStrategy(unsigned int datasetIndex);
+
 private:
   vtkXMLCompositeDataReader(const vtkXMLCompositeDataReader&) = delete;
   void operator=(const vtkXMLCompositeDataReader&) = delete;
 
+  PieceDistributionStrategy PieceDistribution;
+
   vtkXMLCompositeDataReaderInternals* Internal;
 };
 
-- 
GitLab