Commit 289b3364 authored by Burlen Loring's avatar Burlen Loring
Browse files

PosthocIO file-per-process

I added file-per-process output via VTK's parallel XML
writers. The PosthocIO adapter can be confiugured for
either mpiIO or vtkXmlP mode in the run time xml config.
parent f273cc9c
......@@ -40,6 +40,9 @@ cmake_dependent_option(
cmake_dependent_option(
ENABLE_ADIOS "Enable analysis methods that use ADIOS" OFF "ENABLE_SENSEI" OFF)
#------------------------------------------------------------------------------
option(ENABLE_VTK_XMLP "Enable use of vtk parallel I/O" OFF)
#------------------------------------------------------------------------------
include(mpi)
if(ENABLE_ADIOS)
......
<sensei>
<analysis type="PosthocIO" array="data" association="cell" enabled="0" period="2" />
<analysis type="PosthocIO" array="data" association="cell" enabled="0" period="1" mode="vtkXmlP" />
<analysis type="histogram" array="data" association="cell" bins="10" />
<analysis type="autocorrelation" array="data" association="cell" window="10" k-max="3" />
<analysis type="catalyst" pipeline="slice" array="data" association="cell" />
......
......@@ -49,11 +49,15 @@ if(ENABLE_CATALYST)
target_link_libraries(sensei PUBLIC ${VTK_LIBRARIES})
target_compile_definitions(sensei PUBLIC ENABLE_CATALYST)
else()
find_package(VTK QUIET COMPONENTS vtkCommonDataModel)
set(SENSEI_VTK_DEPS vtkCommonDataModel)
if (ENABLE_VTK_XMLP)
list(APPEND SENSEI_VTK_DEPS vtkParallelMPI vtkIOParallelXML)
endif()
find_package(VTK QUIET COMPONENTS ${SENSEI_VTK_DEPS})
if(NOT VTK_FOUND)
message(FATAL_ERROR "VTK (vtkCommonDataModel) is required for Sensei core "
"even when not using any infrastructures. Please set VTK_DIR to point to a "
"directory containing `VTKConfig.cmake`.")
message(FATAL_ERROR "VTK (${SENSEI_VTK_DEPS}) modules are required for "
"Sensei core even when not using any infrastructures. Please set "
"VTK_DIR to point to a directory containing `VTKConfig.cmake`.")
endif()
target_include_directories(sensei SYSTEM PUBLIC ${VTK_INCLUDE_DIRS})
target_compile_definitions(sensei PUBLIC ${VTK_DEFINITIONS})
......@@ -68,6 +72,10 @@ if(ENABLE_ADIOS)
target_compile_definitions(sensei PUBLIC ENABLE_ADIOS)
endif()
if (ENABLE_VTK_XMLP)
target_compile_definitions(sensei PUBLIC ENABLE_VTK_XMLP)
endif()
target_link_libraries(sensei
PUBLIC ${VTK_LIBRARIES}
mpi
......
......@@ -21,7 +21,7 @@
#include <vector>
#include <pugixml.hpp>
#define sensei_error(_arg) \
#define ConfigurableAnalysisError(_arg) \
cerr << "ERROR: " << __FILE__ " : " << __LINE__ << std::endl \
<< "" _arg << std::endl;
......@@ -41,7 +41,7 @@ class ConfigurableAnalysis::vtkInternals
{
return vtkDataObject::FIELD_ASSOCIATION_CELLS;
}
sensei_error(<< "Invalid association type '" << association.c_str() << "'. Assuming 'point'");
ConfigurableAnalysisError(<< "Invalid association type '" << association.c_str() << "'. Assuming 'point'");
return vtkDataObject::FIELD_ASSOCIATION_POINTS;
}
......@@ -63,7 +63,7 @@ public:
}
else
{
sensei_error(<< "'histogram' missing required attribute 'array'. Skipping.");
ConfigurableAnalysisError(<< "'histogram' missing required attribute 'array'. Skipping.");
}
}
#endif
......@@ -120,18 +120,15 @@ public:
this->Analyses.push_back(adaptor.GetPointer());
}
void AddPosthocIO(MPI_Comm comm, pugi::xml_node node)
int AddPosthocIO(MPI_Comm comm, pugi::xml_node node)
{
if (!node.attribute("enabled") || !node.attribute("enabled").as_int())
{
cerr << "Skipping 'PosthocIO'." << endl;
return;
}
return -1;
if (!node.attribute("array"))
{
sensei_error(<< "need at least one array");
return;
ConfigurableAnalysisError(<< "need at least one array");
return -1;
}
std::string arrayName = node.attribute("array").value();
......@@ -153,11 +150,20 @@ public:
std::string blockExt = "block";
if (node.attribute("block_ext"))
blockExt = node.attribute("block_ext").value();
blockExt = node.attribute("block_ext").value();
int mode = PosthocIO::MpiIO;
int mode = PosthocIO::mpiIO;
if (node.attribute("mode"))
mode = node.attribute("mode").as_int();
{
std::string val = node.attribute("mode").value();
if (val == "vtkXmlP")
mode = PosthocIO::vtkXmlP;
else
if (val == "mpiIO")
mode = PosthocIO::mpiIO;
else
ConfigurableAnalysisError(<< "invalid mode \"" << val << "\"");
}
int period = 1;
if (node.attribute("period"))
......@@ -172,6 +178,7 @@ public:
this->Analyses.push_back(adapter);
adapter->Delete();
return 0;
}
};
......@@ -193,17 +200,23 @@ ConfigurableAnalysis::~ConfigurableAnalysis()
//----------------------------------------------------------------------------
bool ConfigurableAnalysis::Initialize(MPI_Comm world, const std::string& filename)
{
int rank;
MPI_Comm_rank(world, &rank);
pugi::xml_document doc;
pugi::xml_parse_result result = doc.load_file(filename.c_str());
if (!result)
{
cout << "XML [" << filename << "] parsed with errors, attr value: [" << doc.child("node").attribute("attr").value() << "]\n";
cout << "Error description: " << result.description() << "\n";
cout << "Error offset: " << result.offset << endl;
ConfigurableAnalysisError(
"XML [" << filename << "] parsed with errors, attr value: ["
<< doc.child("node").attribute("attr").value() << "]" << endl
<< "Error description: " << result.description() << endl
<< "Error offset: " << result.offset << endl)
return false;
}
pugi::xml_node sensei = doc.child("sensei");
for (pugi::xml_node analysis = sensei.child("analysis"); analysis; analysis = analysis.next_sibling("analysis"))
for (pugi::xml_node analysis = sensei.child("analysis");
analysis; analysis = analysis.next_sibling("analysis"))
{
std::string type = analysis.attribute("type").value();
#ifdef ENABLE_HISTOGRAM
......@@ -233,13 +246,12 @@ bool ConfigurableAnalysis::Initialize(MPI_Comm world, const std::string& filenam
continue;
}
if (type == "PosthocIO")
{
this->Internals->AddPosthocIO(world, analysis);
if ((type == "PosthocIO") &&
!this->Internals->AddPosthocIO(world, analysis))
continue;
}
std::cerr << "Skipping '" << type.c_str() << "'." << std::endl;
if (rank == 0)
std::cerr << "Skipping '" << type.c_str() << "'." << std::endl;
}
return true;
}
......
......@@ -20,18 +20,24 @@
#include <ArrayIO.h>
#if defined(ENABLE_VTK_XMLP)
#include <vtkAlgorithm.h>
#include <vtkCompositeDataPipeline.h>
#include <vtkMultiProcessController.h>
#include <vtkMPI.h>
#include <vtkMPIController.h>
#include <vtkXMLPMultiBlockDataWriter.h>
#endif
//#define PosthocIO_DEBUG
#define posthocIO_error(_arg) \
#define PosthocIOError(_arg) \
cerr << "ERROR: " << __FILE__ " : " << __LINE__ << std::endl \
<< "" _arg << std::endl;
#define posthocIO_status(_cond, _arg) \
if (_cond) \
{ \
cerr << "STATUS: " << __FILE__ " : " << __LINE__ << std::endl \
<< "" _arg << std::endl; \
}
cerr << "" _arg << std::endl;
namespace {
// **************************************************************************
......@@ -100,12 +106,12 @@ int write(MPI_File file, MPI_Info hints,
arrayIO::write(file, hints, domain, decomp,
valid, ta->GetPointer(0)))
{
posthocIO_error("write failed");
PosthocIOError("write failed");
return -1;
}
);
default:
posthocIO_error("Unhandled data type");
PosthocIOError("Unhandled data type");
return -1;
}
return 0;
......@@ -118,13 +124,23 @@ namespace sensei
vtkStandardNewMacro(PosthocIO);
//-----------------------------------------------------------------------------
PosthocIO::PosthocIO() : Comm(MPI_COMM_WORLD), CommRank(0),
PosthocIO::PosthocIO() : Comm(MPI_COMM_WORLD), CommRank(0), CommSize(1),
OutputDir("./"), HeaderFile("ImageHeader"), BlockExt(".sensei"),
HaveHeader(true), Mode(MpiIO), Period(1) {}
HaveHeader(true), Mode(mpiIO), Period(1) {}
//-----------------------------------------------------------------------------
PosthocIO::~PosthocIO()
{}
{
#if defined(ENABLE_VTK_XMLP)
// teardown for parallel vtk I/O
vtkMultiProcessController *mpc =
vtkMultiProcessController::GetGlobalController();
mpc->Finalize(1);
mpc->Delete();
vtkMultiProcessController::SetGlobalController(nullptr);
vtkAlgorithm::SetDefaultExecutivePrototype(nullptr);
#endif
}
//-----------------------------------------------------------------------------
void PosthocIO::Initialize(MPI_Comm comm,
......@@ -136,8 +152,8 @@ void PosthocIO::Initialize(MPI_Comm comm,
posthocIO_status((this->CommRank==0), "PosthocIO::Initialize");
#endif
this->Comm = comm;
this->CommRank = 0;
MPI_Comm_rank(this->Comm, &this->CommRank);
MPI_Comm_size(this->Comm, &this->CommSize);
this->OutputDir = outputDir;
this->HeaderFile = headerFile;
this->BlockExt = blockExt;
......@@ -146,39 +162,106 @@ void PosthocIO::Initialize(MPI_Comm comm,
this->HaveHeader = (this->CommRank==0 ? false : true);
this->Mode = mode;
this->Period = period;
#if defined(ENABLE_VTK_XMLP)
// setup for parallel vtk i/o
vtkMPICommunicator* vtkComm = vtkMPICommunicator::New();
vtkMPICommunicatorOpaqueComm h(&comm);
vtkComm->InitializeExternal(&h);
vtkMPIController *con = vtkMPIController::New();
con->SetCommunicator(vtkComm);
vtkComm->Delete();
vtkMultiProcessController::SetGlobalController(con);
vtkCompositeDataPipeline* cexec = vtkCompositeDataPipeline::New();
vtkAlgorithm::SetDefaultExecutivePrototype(cexec);
cexec->Delete();
#endif
}
//-----------------------------------------------------------------------------
int PosthocIO::WriteBOVHeader(const std::string &fileName,
const std::vector<std::string> &arrays, const int *wholeExtent)
bool PosthocIO::Execute(sensei::DataAdaptor* data)
{
std::ofstream ff(fileName, std::ofstream::out);
if (!ff.good())
#ifdef PosthocIO_DEBUG
posthocIO_status((this->CommRank==0), "PosthocIO::Execute");
#endif
// validate the input dataset.
// TODO:for now we need composite data, to support non-composite
// data we will wrap it in a composite dataset.
vtkCompositeDataSet* cd =
dynamic_cast<vtkCompositeDataSet*>(data->GetMesh(false));
if (!cd)
{
posthocIO_error("Failed to write the header file \"" << fileName << "\"")
return -1;
PosthocIOError("unsupported dataset type")
return false;
}
int dims[3] = {wholeExtent[1] - wholeExtent[0] + 1,
wholeExtent[3] - wholeExtent[2] + 1,
wholeExtent[5] - wholeExtent[4] + 1};
// we need whole extents
vtkInformation *info = data->GetInformation();
if (!info->Has(vtkDataObject::DATA_EXTENT()))
{
PosthocIOError("missing vtkDataObject::DATA_EXTENT");
return false;
}
ff << "# SciberQuest MPI-IO BOV Reader" << std::endl
<< "nx=" << dims[0] << ", ny=" << dims[1] << ", nz=" << dims[2] << std::endl
<< "ext=" << this->BlockExt << std::endl
<< "dtype=f32" << std::endl
<< std::endl;
// grab the current time step
int timeStep = data->GetDataTimeStep();
size_t n = arrays.size();
for (size_t i = 0; i < n; ++i)
ff << "scalar:" << arrays[i] << std::endl;
// option to reduce the amount of data written
if (timeStep%this->Period)
return true;
ff << std::endl;
ff.close();
// dispatch the write
switch (this->Mode)
{
case mpiIO:
this->WriteBOVHeader(info);
this->WriteBOV(cd, info, timeStep);
break;
case vtkXmlP:
this->WriteXMLP(cd, info, timeStep);
break;
default:
PosthocIOError("invalid mode \"" << this->Mode << "\"")
return false;
}
#ifdef posthocIO_DEBUG
return true;
}
//-----------------------------------------------------------------------------
int PosthocIO::WriteXMLP(vtkCompositeDataSet *cd,
vtkInformation *info, int timeStep)
{
#if defined(ENABLE_VTK_XMLP)
(void)info;
std::ostringstream oss;
oss << this->OutputDir << "/" << this->HeaderFile
<< "_" << timeStep << ".vtmb";
vtkXMLPMultiBlockDataWriter *writer = vtkXMLPMultiBlockDataWriter::New();
writer->SetInputData(cd);
writer->SetDataModeToAppended();
writer->EncodeAppendedDataOff();
writer->SetCompressorTypeToNone();
writer->SetFileName(oss.str().c_str());
writer->Write();
writer->Delete();
#ifdef PosthocIO_DEBUG
posthocIO_status((this->CommRank==0),
"wrote BOV header \"" << fileName << "\"");
"PosthocIO::WriteXMLP \"" << oss.str() << "\"");
#endif
#else
(void)cd;
(void)info;
(void)timeStep;
PosthocIOError("built without vtk xmlp writer")
return -1;
#endif
return 0;
}
......@@ -220,11 +303,36 @@ int PosthocIO::WriteBOVHeader(vtkInformation *info)
}
//-----------------------------------------------------------------------------
int PosthocIO::WriteXMLP(vtkCompositeDataSet *cd,
vtkInformation *info, int timeStep)
int PosthocIO::WriteBOVHeader(const std::string &fileName,
const std::vector<std::string> &arrays, const int *wholeExtent)
{
#ifdef PosthocIO_DEBUG
posthocIO_status((this->CommRank==0), "PosthocIO::WriteXMLP");
std::ofstream ff(fileName, std::ofstream::out);
if (!ff.good())
{
PosthocIOError("Failed to write the header file \"" << fileName << "\"")
return -1;
}
int dims[3] = {wholeExtent[1] - wholeExtent[0] + 1,
wholeExtent[3] - wholeExtent[2] + 1,
wholeExtent[5] - wholeExtent[4] + 1};
ff << "# SciberQuest MPI-IO BOV Reader" << std::endl
<< "nx=" << dims[0] << ", ny=" << dims[1] << ", nz=" << dims[2] << std::endl
<< "ext=" << this->BlockExt << std::endl
<< "dtype=f32" << std::endl
<< std::endl;
size_t n = arrays.size();
for (size_t i = 0; i < n; ++i)
ff << "scalar:" << arrays[i] << std::endl;
ff << std::endl;
ff.close();
#ifdef posthocIO_DEBUG
posthocIO_status((this->CommRank==0),
"wrote BOV header \"" << fileName << "\"");
#endif
return 0;
}
......@@ -258,7 +366,7 @@ int PosthocIO::WriteBOV(vtkCompositeDataSet *cd,
MPI_File fh;
if (arrayIO::open(this->Comm, fileName.c_str(), MPI_INFO_NULL, fh))
{
posthocIO_error("Open failed \"" << fileName);
PosthocIOError("Open failed \"" << fileName);
return -1;
}
......@@ -320,7 +428,7 @@ int PosthocIO::WriteBOV(vtkCompositeDataSet *cd,
if (!id)
{
posthocIO_error("input not an image.");
PosthocIOError("input not an image.");
continue;
}
......@@ -346,7 +454,7 @@ int PosthocIO::WriteBOV(vtkCompositeDataSet *cd,
vtkDataArray *da = atts->GetArray(arrayName.c_str());
if (!da)
{
posthocIO_error("no array named \"" << arrayName << "\"");
PosthocIOError("no array named \"" << arrayName << "\"");
atts->Print(cerr);
continue;
}
......@@ -355,7 +463,7 @@ int PosthocIO::WriteBOV(vtkCompositeDataSet *cd,
if (::write(fh, MPI_INFO_NULL, wholeExt, localExt,
validExt, da, useCollectives))
{
posthocIO_error("write failed \"" << fileName)
PosthocIOError("write failed \"" << fileName)
return -1;
}
}
......@@ -366,55 +474,5 @@ int PosthocIO::WriteBOV(vtkCompositeDataSet *cd,
return 0;
}
//-----------------------------------------------------------------------------
bool PosthocIO::Execute(sensei::DataAdaptor* data)
{
#ifdef PosthocIO_DEBUG
posthocIO_status((this->CommRank==0), "PosthocIO::Execute");
#endif
// validate the input dataset.
// TODO:for now we need composite data, to support non-composite
// data we will wrap it in a composite dataset.
vtkCompositeDataSet* cd =
dynamic_cast<vtkCompositeDataSet*>(data->GetMesh(false));
if (!cd)
{
posthocIO_error("unsupported dataset type")
return false;
}
// we need whole extents
vtkInformation *info = data->GetInformation();
if (!info->Has(vtkDataObject::DATA_EXTENT()))
{
posthocIO_error("missing vtkDataObject::DATA_EXTENT");
return false;
}
// grab the current time step
int timeStep = data->GetDataTimeStep();
// option to reduce the amount of data written
if (timeStep%this->Period)
return true;
// dispatch the write
switch (this->Mode)
{
case MpiIO:
this->WriteBOVHeader(info);
this->WriteBOV(cd, info, timeStep);
break;
case FilePerProc:
this->WriteXMLP(cd, info, timeStep);
break;
default:
posthocIO_error("invalid mode \"" << this->Mode << "\"")
return false;
}
return true;
}
}
......@@ -23,7 +23,7 @@ public:
vtkTypeMacro(PosthocIO, AnalysisAdaptor);
// modes.
enum {MpiIO=1, FilePerProc=2};
enum {mpiIO=1, vtkXmlP=2};
void Initialize(MPI_Comm comm, const std::string &outputDir,
const std::string &headerFile, const std::string &blockExt,
......@@ -52,6 +52,7 @@ private:
private:
MPI_Comm Comm;
int CommRank;
int CommSize;
std::string OutputDir;
std::string HeaderFile;
std::string BlockExt;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment