Commit d9c4d3d8 authored by Kenneth Moreland's avatar Kenneth Moreland
Browse files

ENH: Added an image reader that uses MPI I/O to efficiently read from parallel file systems.

parent 3c906302
......@@ -24,7 +24,7 @@
#include "vtkStreamingDemandDrivenPipeline.h"
#include "vtkTransform.h"
vtkCxxRevisionMacro(vtkImageReader, "1.122");
vtkCxxRevisionMacro(vtkImageReader, "1.123");
vtkStandardNewMacro(vtkImageReader);
vtkCxxSetObjectMacro(vtkImageReader,Transform,vtkTransform);
......@@ -46,9 +46,8 @@ vtkImageReader::vtkImageReader()
{
this->DataVOI[idx*2] = this->DataVOI[idx*2 + 1] = 0;
}
// Left over from short reader
this->DataMask = 0xffff;
this->DataMask = static_cast<vtkTypeUInt64>(~0UL);
this->Transform = NULL;
this->ScalarArrayName = NULL;
......@@ -211,7 +210,7 @@ void vtkImageReaderUpdate2(vtkImageReader *self, vtkImageData *data,
int comp, pixelSkip;
long filePos, correction = 0;
unsigned long count = 0;
unsigned short DataMask;
vtkTypeUInt64 DataMask;
unsigned long target;
// Get the requested extents.
......@@ -327,7 +326,7 @@ void vtkImageReaderUpdate2(vtkImageReader *self, vtkImageData *data,
for (idx0 = dataExtent[0]; idx0 <= dataExtent[1]; ++idx0)
{
// Copy pixel into the output.
if (DataMask == 0xffff)
if (DataMask == static_cast<vtkTypeUInt64>(~0UL))
{
for (comp = 0; comp < pixelSkip; comp++)
{
......@@ -336,10 +335,9 @@ void vtkImageReaderUpdate2(vtkImageReader *self, vtkImageData *data,
}
else
{
// left over from short reader (what about other types.
for (comp = 0; comp < pixelSkip; comp++)
{
outPtr0[comp] = (OT)((short)(inPtr[comp]) & DataMask);
outPtr0[comp] = (OT)((vtkTypeUInt64)(inPtr[comp]) & DataMask);
}
}
// move to next pixel
......
......@@ -45,17 +45,13 @@ public:
vtkGetVector6Macro(DataVOI,int);
// Description:
// Set/Get the Data mask.
vtkGetMacro(DataMask,unsigned short);
void SetDataMask(int val)
{
if (val == this->DataMask)
{
return;
}
this->DataMask = static_cast<unsigned short>(val);
this->Modified();
}
// Set/Get the Data mask. The data mask is a simply integer whose bits are
// treated as a mask to the bits read from disk. That is, the data mask is
// bitwise-and'ed to the numbers read from disk. This ivar is stored as 64
// bits, the largest mask you will need. The mask will be truncated to the
// data size required to be read (using the least significant bits).
vtkGetMacro(DataMask, vtkTypeUInt64);
vtkSetMacro(DataMask, vtkTypeUInt64);
// Description:
// Set/Get transformation matrix to transform the data from slice space
......@@ -82,7 +78,7 @@ protected:
vtkImageReader();
~vtkImageReader();
unsigned short DataMask; // Mask each pixel with ...
vtkTypeUInt64 DataMask;
vtkTransform *Transform;
......
......@@ -45,6 +45,7 @@ vtkExtractPiece.cxx
vtkExtractPolyDataPiece.cxx
vtkExtractUnstructuredGridPiece.cxx
vtkExtractUserDefinedPiece.cxx
vtkMPIImageReader.cxx
vtkMultiProcessController.cxx
vtkMultiProcessStream.cxx
vtkParallelFactory.cxx
......@@ -58,6 +59,7 @@ vtkPieceRequestFilter.cxx
vtkPieceScalars.cxx
vtkPKdTree.cxx
vtkPLinearExtrusionFilter.cxx
vtkPNrrdReader.cxx
vtkPOPReader.cxx
${_VTK_OPENFOAM_SOURCES}
vtkPOutlineCornerFilter.cxx
......
......@@ -349,6 +349,16 @@ IF(VTK_USE_DISPLAY AND VTK_USE_RENDERING)
${VTK_MPI_POSTFLAGS})
ENDIF(VTK_MPI_MAX_NUMPROCS GREATER 1)
IF(VTK_MPI_MAX_NUMPROCS GREATER 6)
ADD_TEST(ParallelIso-7proc-image
${VTK_MPIRUN_EXE} ${VTK_MPI_PRENUMPROC_FLAGS} ${VTK_MPI_NUMPROC_FLAG} 7 ${VTK_MPI_PREFLAGS}
${CXX_TEST_PATH}/ParallelIsoTest
-D ${VTK_DATA_ROOT}
-T ${VTK_BINARY_DIR}/Testing/Temporary
-V Baseline/Parallel/ParallelIso.7proc.png
${VTK_MPI_POSTFLAGS})
ENDIF(VTK_MPI_MAX_NUMPROCS GREATER 6)
ENDIF (VTK_MPIRUN_EXE)
#
# If we do not have the data, still run the tests that we can
......
......@@ -1052,6 +1052,22 @@ int ExerciseMultiProcessController(vtkMultiProcessController *controller)
args.retval = 1;
}
int color = (group1->GetLocalProcessId() >= 0) ? 1 : 2;
vtkMultiProcessController *subcontroller
= controller->PartitionController(color, 0);
subcontroller->SetSingleMethod(Run, &args);
subcontroller->SingleMethodExecute();
subcontroller->Delete();
try
{
CheckSuccess(controller, !args.retval);
}
catch (ExerciseMultiProcessControllerError)
{
args.retval = 1;
}
return args.retval;
}
......@@ -13,7 +13,7 @@
=========================================================================*/
// This example demonstrates the use of data parallelism in VTK. The
// pipeline ( vtkImageReader -> vtkContourFilter -> vtkElevationFilter )
// pipeline ( vtkMPIImageReader -> vtkContourFilter -> vtkElevationFilter )
// is created in parallel and each process is assigned 1 piece to process.
// All satellite processes send the result to the first process which
// collects and renders them.
......@@ -27,10 +27,10 @@
#include "vtkContourFilter.h"
#include "vtkDataSet.h"
#include "vtkElevationFilter.h"
#include "vtkImageReader.h"
#include "vtkMath.h"
#include "vtkMPIController.h"
#include "vtkParallelFactory.h"
#include "vtkPNrrdReader.h"
#include "vtkPolyData.h"
#include "vtkPolyDataMapper.h"
#include "vtkTestUtilities.h"
......@@ -87,7 +87,7 @@ void SetIsoValueRMI(void *localArg, void* vtkNotUsed(remoteArg),
// This will be called by all processes
void MyMain( vtkMultiProcessController *controller, void *arg )
{
vtkImageReader *reader;
vtkPNrrdReader *reader;
vtkContourFilter *iso;
vtkElevationFilter *elev;
int myid, numProcs;
......@@ -102,12 +102,13 @@ void MyMain( vtkMultiProcessController *controller, void *arg )
// Create the reader, the data file name might have
// to be changed depending on where the data files are.
char* fname = vtkTestUtilities::ExpandDataFileName(args->argc, args->argv,
"Data/headsq/quarter");
reader = vtkImageReader::New();
reader->SetDataByteOrderToLittleEndian();
reader->SetDataExtent(0, 63, 0, 63, 1, 93);
reader->SetFilePrefix(fname);
reader->SetDataSpacing(3.2, 3.2, 1.5);
"Data/headsq/quarter.nhdr");
reader = vtkPNrrdReader::New();
reader->SetFileName(fname);
// reader->SetDataByteOrderToLittleEndian();
// reader->SetDataExtent(0, 63, 0, 63, 1, 93);
// reader->SetFilePrefix(fname);
// reader->SetDataSpacing(3.2, 3.2, 1.5);
delete[] fname;
// Iso-surface.
......@@ -129,6 +130,9 @@ void MyMain( vtkMultiProcessController *controller, void *arg )
exec->SetUpdateNumberOfPieces(exec->GetOutputInformation(0), numProcs);
exec->SetUpdatePiece(exec->GetOutputInformation(0), myid);
// Make sure all processes update at the same time.
elev->Update();
if (myid != 0)
{
// If I am not the root process
......
......@@ -52,6 +52,13 @@ public:
MPI_Comm* Handle;
};
class VTK_PARALLEL_EXPORT vtkMPIOpaqueFileHandle
{
public:
vtkMPIOpaqueFileHandle() : Handle(MPI_FILE_NULL) { }
MPI_File Handle;
};
//-----------------------------------------------------------------------------
class vtkMPICommunicatorOpaqueRequest
{
......
......@@ -30,7 +30,7 @@
#include <vtkstd/vector>
vtkCxxRevisionMacro(vtkMPICommunicator, "1.55");
vtkCxxRevisionMacro(vtkMPICommunicator, "1.56");
vtkStandardNewMacro(vtkMPICommunicator);
vtkMPICommunicator* vtkMPICommunicator::WorldCommunicator = 0;
......@@ -552,6 +552,53 @@ int vtkMPICommunicator::Initialize(vtkProcessGroup *group)
return 1;
}
//-----------------------------------------------------------------------------
int vtkMPICommunicator::SplitInitialize(vtkCommunicator *oldcomm,
int color, int key)
{
if (this->Initialized) return 0;
vtkMPICommunicator *mpiComm = vtkMPICommunicator::SafeDownCast(oldcomm);
if (!mpiComm)
{
vtkErrorMacro("Split communicator must be an MPI communicator.");
return 0;
}
// If mpiComm has been initialized, it is guaranteed (unless the MPI calls
// return an error somewhere) to have valid Communicator.
if (!mpiComm->Initialized)
{
vtkWarningMacro("The communicator passed has not been initialized!");
return 0;
}
this->KeepHandleOff();
this->MPIComm->Handle = new MPI_Comm;
int err;
if ( (err = MPI_Comm_split(*(mpiComm->MPIComm->Handle), color, key,
this->MPIComm->Handle))
!= MPI_SUCCESS )
{
delete this->MPIComm->Handle;
this->MPIComm->Handle = 0;
char *msg = vtkMPIController::ErrorString(err);
vtkErrorMacro("MPI error occured: " << msg);
delete[] msg;
return 0;
}
this->InitializeNumberOfProcesses();
this->Initialized = 1;
this->Modified();
return 1;
}
//----------------------------------------------------------------------------
// Start the copying process
void vtkMPICommunicator::InitializeCopy(vtkMPICommunicator* source)
......
......@@ -85,6 +85,11 @@ public:
// DO NOT CALL. Deprecated in VTK 5.2.
VTK_LEGACY(int Initialize(vtkMPICommunicator* mpiComm, vtkMPIGroup* group));
// Description:
// Used to initialize the communicator (i.e. create the underlying MPI_Comm)
// using MPI_Comm_split on the given communicator.
int SplitInitialize(vtkCommunicator *oldcomm, int color, int key);
// Description:
// Performs the actual communication. You will usually use the convenience
// Send functions defined in the superclass.
......
......@@ -70,9 +70,9 @@ void vtkMPIController::CreateOutputWindow()
vtkOutputWindow::SetInstance(this->OutputWindow);
}
vtkCxxRevisionMacro(vtkMPIOutputWindow, "1.28");
vtkCxxRevisionMacro(vtkMPIOutputWindow, "1.29");
vtkCxxRevisionMacro(vtkMPIController, "1.28");
vtkCxxRevisionMacro(vtkMPIController, "1.29");
vtkStandardNewMacro(vtkMPIController);
//----------------------------------------------------------------------------
......@@ -324,3 +324,19 @@ vtkMPIController *vtkMPIController::CreateSubController(vtkProcessGroup *group)
controller->SetCommunicator(subcomm);
return controller;
}
//-----------------------------------------------------------------------------
vtkMPIController *vtkMPIController::PartitionController(int localColor,
int localKey)
{
VTK_CREATE(vtkMPICommunicator, subcomm);
if (!subcomm->SplitInitialize(this->Communicator, localColor, localKey))
{
return NULL;
}
vtkMPIController *controller = vtkMPIController::New();
controller->SetCommunicator(subcomm);
return controller;
}
......@@ -114,6 +114,8 @@ public:
virtual vtkMPIController *CreateSubController(vtkProcessGroup *group);
virtual vtkMPIController *PartitionController(int localColor, int localKey);
//BTX
// Description:
......
// -*- c++ -*-
/*=========================================================================
Program: Visualization Toolkit
Module: vtkMPIImageReader.cxx
Copyright (c) Ken Martin, Will Schroeder, Bill Lorensen
All rights reserved.
See Copyright.txt or http://www.kitware.com/Copyright.htm for details.
This software is distributed WITHOUT ANY WARRANTY; without even
the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
PURPOSE. See the above copyright notice for more information.
=========================================================================*/
/*----------------------------------------------------------------------------
Copyright (c) Sandia Corporation
See Copyright.txt or http://www.paraview.org/HTML/Copyright.html for details.
----------------------------------------------------------------------------*/
#include "vtkMPIImageReader.h"
#include "vtkMultiProcessController.h"
#include "vtkObjectFactory.h"
#include "vtkToolkits.h"
#ifdef VTK_USE_MPI
// Include the MPI headers and then determine if MPIIO is available.
#include "vtkMPI.h"
#ifdef MPI_VERSION
# if (MPI_VERSION >= 2)
# define VTK_USE_MPI_IO 1
# endif
#endif
#if !defined(VTK_USE_MPI_IO) && defined(ROMIO_VERSION)
# define VTK_USE_MPI_IO 1
#endif
#if !defined(VTK_USE_MPI_IO) && defined(MPI_SEEK_SET)
# define VTK_USE_MPI_IO 1
#endif
#endif // VTK_USE_MPI
// If VTK_USE_MPI_IO is set, that means we will read the data ourself using
// MPIIO. Otherwise, just delegate everything to the superclass.
#ifdef VTK_USE_MPI_IO
// We only need these includes if we are actually loading the data.
#include "vtkByteSwap.h"
#include "vtkDataArray.h"
#include "vtkImageData.h"
#include "vtkMPIController.h"
#include "vtkPointData.h"
#include "vtkTransform.h"
#include "vtkType.h"
// This macro can be wrapped around MPI function calls to easily report errors.
// Reporting errors is more important with file I/O because, unlike network I/O,
// they usually don't terminate the program.
#define MPICall(funcall) \
{ \
int __my_result = funcall; \
if (__my_result != MPI_SUCCESS) \
{ \
char errormsg[MPI_MAX_ERROR_STRING]; \
int dummy; \
MPI_Error_string(__my_result, errormsg, &dummy); \
vtkErrorMacro(<< "Received error when calling" << endl \
<< #funcall << endl << endl \
<< errormsg); \
} \
}
#endif // VTK_USE_MPI_IO
//=============================================================================
vtkCxxRevisionMacro(vtkMPIImageReader, "1.1");
vtkStandardNewMacro(vtkMPIImageReader);
vtkCxxSetObjectMacro(vtkMPIImageReader, Controller, vtkMultiProcessController);
vtkCxxSetObjectMacro(vtkMPIImageReader, GroupedController,
vtkMultiProcessController);
//-----------------------------------------------------------------------------
#ifdef VTK_USE_MPI_IO
template<class T>
inline void vtkMPIImageReaderMaskBits(T *data, vtkIdType length,
vtkTypeUInt64 _mask)
{
T mask = (T)_mask;
// If the mask is the identity, just return.
if ((_mask == (vtkTypeUInt64)~0UL) || (mask == (T)~0) || (_mask == 0)) return;
for (vtkIdType i = 0; i < length; i++)
{
data[i] &= mask;
}
}
// Override float and double because masking bits for them makes no sense.
VTK_TEMPLATE_SPECIALIZE
void vtkMPIImageReaderMaskBits(float *, vtkIdType, vtkTypeUInt64)
{
return;
}
VTK_TEMPLATE_SPECIALIZE
void vtkMPIImageReaderMaskBits(double *, vtkIdType, vtkTypeUInt64)
{
return;
}
#endif //VTK_USE_MPI_IO
//-----------------------------------------------------------------------------
#ifdef VTK_USE_MPI_IO
namespace {
template<class T>
inline T MY_ABS(T x) { return (x < 0) ? -x : x; }
template<class T>
inline T MY_MIN(T x, T y) { return (x < y) ? x : y; }
};
#endif //VTK_USE_MPI_IO
//=============================================================================
vtkMPIImageReader::vtkMPIImageReader()
{
this->Controller = NULL;
this->SetController(vtkMultiProcessController::GetGlobalController());
this->GroupedController = NULL;
}
vtkMPIImageReader::~vtkMPIImageReader()
{
this->SetController(NULL);
this->SetGroupedController(NULL);
}
void vtkMPIImageReader::PrintSelf(ostream &os, vtkIndent indent)
{
this->Superclass::PrintSelf(os, indent);
os << indent << "Controller: " << this->Controller << endl;
}
//-----------------------------------------------------------------------------
int vtkMPIImageReader::GetDataScalarTypeSize()
{
switch (this->GetDataScalarType())
{
vtkTemplateMacro(return sizeof(VTK_TT));
default:
vtkErrorMacro("Unknown data type.");
return 0;
}
}
//-----------------------------------------------------------------------------
#ifdef VTK_USE_MPI_IO
void vtkMPIImageReader::PartitionController(const int extent[6])
{
// Number of points in the z direction of the whole data.
int numZ = this->DataExtent[5] - this->DataExtent[4] + 1;
if ((this->GetFileDimensionality() == 3) || (numZ == 1))
{
// Everyone reads from the same single file. No need to partion controller.
this->SetGroupedController(this->Controller);
return;
}
// The following algorithm will have overflow problems if there are more
// than 2^15 files. I doubt anyone will ever be crazy enough to set up a
// large 3D image with that many slice files, but just in case...
if (numZ >= 32768)
{
vtkErrorMacro("I do not support more than 32768 files.");
return;
}
// Hash the Z extent. This is guaranteed to be unique for any pair of
// extents (within the constraint given above).
int extentHash = ( extent[4]+this->DataExtent[4]
+ (extent[5]+this->DataExtent[4])*numZ );
vtkMultiProcessController *subController
= this->Controller->PartitionController(extentHash, 0);
this->SetGroupedController(subController);
subController->Delete();
}
#else // VTK_USE_MPI_IO
void vtkMPIImageReader::PartitionController(const int *)
{
vtkErrorMacro(<< "vtkMPIImageReader::PartitionController() called when MPIIO "
<< "not available.");
}
#endif // VTK_USE_MPI_IO
//-----------------------------------------------------------------------------
// Technically we should be returning a 64 bit number, but I doubt any header
// will be bigger than the value stored in an unsigned int. Thus, we just
// follow the convention of the superclass.
#ifdef VTK_USE_MPI_IO
unsigned long vtkMPIImageReader::GetHeaderSize(vtkMPIOpaqueFileHandle &file)
{
if (this->ManualHeaderSize)
{
return this->HeaderSize;
}
else
{
this->ComputeDataIncrements();
MPI_Offset size;
MPICall(MPI_File_get_size(file.Handle, &size));
return static_cast<unsigned long>
(size - this->DataIncrements[this->GetFileDimensionality()]);
}
}
#else // VTK_USE_MPI_IO
unsigned long vtkMPIImageReader::GetHeaderSize(vtkMPIOpaqueFileHandle &)
{
vtkErrorMacro(<< "vtkMPIImageReader::GetHeaderSize() called when MPIIO "
<< "not available.");
return 0;
}
#endif // VTK_USE_MPI_IO
//-----------------------------------------------------------------------------
#ifdef VTK_USE_MPI_IO
void vtkMPIImageReader::SetupFileView(vtkMPIOpaqueFileHandle &file,
const int extent[6])
{
int arrayOfSizes[3];
int arrayOfSubSizes[3];
int arrayOfStarts[3];
for (int i = 0; i < this->GetFileDimensionality(); i++)
{
arrayOfSizes[i] = this->DataExtent[i*2+1] - this->DataExtent[i*2] + 1;
arrayOfSubSizes[i] = extent[i*2+1] - extent[i*2] + 1;
arrayOfStarts[i] = extent[i*2];
}
// Adjust for base size of data type and tuple size.
int baseSize = this->GetDataScalarTypeSize() * this->NumberOfScalarComponents;
arrayOfSizes[0] *= baseSize;
arrayOfSubSizes[0] *= baseSize;
arrayOfStarts[0] *= baseSize;
// Create a view in MPIIO.
MPI_Datatype view;
MPICall(MPI_Type_create_subarray(this->GetFileDimensionality(),
arrayOfSizes, arrayOfSubSizes, arrayOfStarts,
MPI_ORDER_FORTRAN, MPI_BYTE, &view));
MPICall(MPI_Type_commit(&view));
MPICall(MPI_File_set_view(file.Handle, this->GetHeaderSize(file), MPI_BYTE,
view, const_cast<char *>("native"), MPI_INFO_NULL));
MPICall(MPI_Type_free(&view));
}
#else // VTK_USE_MPI_IO
void vtkMPIImageReader::SetupFileView(vtkMPIOpaqueFileHandle &, const int[6])
{
vtkErrorMacro(<< "vtkMPIImageReader::SetupFileView() called when MPIIO "
<< "not available.");
}
#endif // VTK_USE_MPI_IO
//-----------------------------------------------------------------------------
#ifdef VTK_USE_MPI_IO
void vtkMPIImageReader::ReadSlice(int slice, const int extent[6], void *buffer)
{
this->ComputeInternalFileName(slice);
vtkMPICommunicator *mpiComm = vtkMPICommunicator::SafeDownCast(
this->GroupedController->GetCommunicator());
// Open the file for this slice.
vtkMPIOpaqueFileHandle file;
int result;