Commit eeccd0b6 authored by Burlen Loring's avatar Burlen Loring

posthocIO analysis adapter

I added an adapter to perform I/O for conventional
post-process analysis. It currently supports MPI-I/O
collective buffering.
parent 05bc16a3
<sensei>
<analysis type="PosthocIO" array="data" association="cell" enabled="0" period="2" />
<analysis type="histogram" array="data" association="cell" bins="10" />
<analysis type="autocorrelation" array="data" association="cell" window="10" k-max="3" />
<analysis type="catalyst" pipeline="slice" array="data" association="cell" />
......
......@@ -174,7 +174,7 @@ void analysis_final(size_t k_max, size_t nblocks)
std::cout << ' ' << result[i];
std::cout << std::endl;
}
master->foreach<Autocorrelation>([](Autocorrelation* b, const diy::Master::ProxyWithLink& cp, void*)
master->foreach<Autocorrelation>([](Autocorrelation*, const diy::Master::ProxyWithLink& cp, void*)
{
cp.collectives()->clear();
});
......@@ -183,10 +183,10 @@ void analysis_final(size_t k_max, size_t nblocks)
diy::ContiguousAssigner assigner(master->communicator().size(), nblocks); // NB: this is coupled to main(...) in oscillator.cpp
diy::RegularMergePartners partners(3, nblocks, 2, true);
diy::reduce(*master, assigner, partners,
[k_max](void* b_, const diy::ReduceProxy& rp, const diy::RegularMergePartners& partners)
[k_max](void* b_, const diy::ReduceProxy& rp, const diy::RegularMergePartners&)
{
Autocorrelation* b = static_cast<Autocorrelation*>(b_);
unsigned round = rp.round(); // current round number
//unsigned round = rp.round(); // current round number
using MaxHeapVector = std::vector<std::vector<std::tuple<float,Vertex>>>;
using Compare = std::greater<std::tuple<float,Vertex>>;
......@@ -211,7 +211,7 @@ void analysis_final(size_t k_max, size_t nblocks)
});
} else
{
for (size_t i = 0; i < rp.in_link().size(); ++i)
for (long i = 0; i < rp.in_link().size(); ++i)
{
MaxHeapVector in_heaps;
rp.dequeue(rp.in_link().target(i).gid, in_heaps);
......
......@@ -24,9 +24,11 @@ void initialize(MPI_Comm world,
int* to_x, int* to_y, int* to_z,
const std::string& config_file)
{
(void)window;
GlobalDataAdaptor = vtkSmartPointer<oscillators::DataAdaptor>::New();
GlobalDataAdaptor->Initialize(nblocks);
GlobalDataAdaptor->SetDataTimeStep(-1);
for (size_t cc=0; cc < n_local_blocks; ++cc)
{
GlobalDataAdaptor->SetBlockExtent(gid[cc],
......@@ -35,6 +37,9 @@ void initialize(MPI_Comm world,
from_z[cc], to_z[cc]);
}
int dext[6] = {0, domain_shape_x, 0, domain_shape_y, 0, domain_shape_z};
GlobalDataAdaptor->SetDataExtent(dext);
GlobalAnalysisAdaptor = vtkSmartPointer<sensei::ConfigurableAnalysis>::New();
GlobalAnalysisAdaptor->Initialize(world, config_file);
}
......@@ -57,6 +62,8 @@ void analyze(float time)
//-----------------------------------------------------------------------------
void finalize(size_t k_max, size_t nblocks)
{
// both parameters are currently unused; the casts silence
// unused-parameter warnings while keeping the public signature
(void)k_max;
(void)nblocks;
// assigning NULL to the global vtkSmartPointer wrappers drops their
// references; the analysis adaptor is released before the data adaptor
GlobalAnalysisAdaptor = NULL;
GlobalDataAdaptor = NULL;
}
......
#include "dataadaptor.h"
#include <vtkInformation.h>
#include <vtkFloatArray.h>
#include <vtkImageData.h>
#include <vtkMultiBlockDataSet.h>
......@@ -12,13 +13,13 @@
namespace oscillators
{
class DataAdaptor::DInternals
struct DataAdaptor::DInternals
{
public:
std::vector<diy::DiscreteBounds> CellExtents;
std::vector<float*> Data;
vtkSmartPointer<vtkMultiBlockDataSet> Mesh;
std::vector<vtkSmartPointer<vtkImageData> > BlockMesh;
std::vector<int> DataExtent;
};
inline bool areBoundsValid(const diy::DiscreteBounds& bds)
......@@ -74,6 +75,15 @@ void DataAdaptor::SetBlockExtent(int gid,
internals.CellExtents[gid].max[2] = zmax;
}
//-----------------------------------------------------------------------------
// Record the whole-dataset extent [x0,x1, y0,y1, z0,z1] and publish it on
// the adaptor's information object under vtkDataObject::DATA_EXTENT().
void DataAdaptor::SetDataExtent(int ext[6])
{
// TODO -- this key holds a int**, it should copy the data
// NOTE(review): the key appears to store the raw pointer returned by
// data(); the DataExtent member keeps that storage alive for the
// adaptor's lifetime -- confirm the key does not outlive this object.
this->Internals->DataExtent.assign(ext, ext+6);
this->GetInformation()->Set(vtkDataObject::DATA_EXTENT(),
this->Internals->DataExtent.data(), 6);
}
//-----------------------------------------------------------------------------
void DataAdaptor::SetBlockData(int gid, float* data)
{
......@@ -94,6 +104,8 @@ vtkDataObject* DataAdaptor::GetMesh(bool vtkNotUsed(structure_only))
internals.Mesh->SetBlock(static_cast<unsigned int>(cc), this->GetBlockMesh(cc));
}
}
this->AddArray(internals.Mesh,
vtkDataObject::FIELD_ASSOCIATION_CELLS, "data");
return internals.Mesh;
}
......@@ -117,13 +129,11 @@ vtkDataObject* DataAdaptor::GetBlockMesh(int gid)
//-----------------------------------------------------------------------------
bool DataAdaptor::AddArray(vtkDataObject* mesh, int association, const char* arrayname)
{
#ifndef NDEBUG
if (association != vtkDataObject::FIELD_ASSOCIATION_CELLS ||
arrayname == NULL ||
strcmp(arrayname, "data") != 0)
{
arrayname == NULL || strcmp(arrayname, "data") != 0)
return false;
}
#endif
bool retVal = false;
DInternals& internals = (*this->Internals);
vtkMultiBlockDataSet* md = vtkMultiBlockDataSet::SafeDownCast(mesh);
......
......@@ -21,13 +21,16 @@ public:
void SetBlockExtent(int gid,
int xmin, int xmax, int ymin, int ymax, int zmin, int zmax);
/// @brief Set the extent of the dataset.
void SetDataExtent(int ext[6]);
/// Set data for a specific block.
void SetBlockData(int gid, float* data);
virtual vtkDataObject* GetMesh(bool structure_only=false);
virtual bool AddArray(vtkDataObject* mesh, int association, const char* arrayname);
virtual unsigned int GetNumberOfArrays(int association) { return 1; }
virtual const char* GetArrayName(int association, unsigned int index)
virtual unsigned int GetNumberOfArrays(int) { return 1; }
virtual const char* GetArrayName(int, unsigned int index)
{ return index==0? "data" : NULL; }
virtual void ReleaseData();
......
......@@ -138,7 +138,7 @@ int main(int argc, char** argv)
// decompose the domain
diy::decompose(3, world.rank(), domain, assigner,
[&](int gid, const Bounds& core, const Bounds& bounds, const Bounds& domain, const Link& link)
[&](int gid, const Bounds&, const Bounds& bounds, const Bounds& domain, const Link& link)
{
auto b = new Block(gid, bounds, { domain.max[0] + 1, domain.max[1] + 1, domain.max[2] + 1 }, oscillators);
master.add(gid, b, new Link(link));
......
......@@ -28,6 +28,12 @@ void DataAdaptor::Initialize(
int tot_blocks_x, int tot_blocks_y, int tot_blocks_z,
int block_id_x, int block_id_y, int block_id_z)
{
(void)tot_blocks_x;
(void)tot_blocks_y;
(void)tot_blocks_z;
(void)block_id_x;
(void)block_id_y;
(void)block_id_z;
// we only really need to save the local extents for our current example. So
// we'll just save that.
this->CellExtent[0] = start_extents_x;
......
......@@ -296,7 +296,7 @@ void Autocorrelation::PrintResults(size_t k_max)
}
internals.Master->foreach<AutocorrelationImpl>(
[](AutocorrelationImpl* b, const diy::Master::ProxyWithLink& cp, void*)
[](AutocorrelationImpl*, const diy::Master::ProxyWithLink& cp, void*)
{
cp.collectives()->clear();
});
......@@ -305,10 +305,10 @@ void Autocorrelation::PrintResults(size_t k_max)
diy::ContiguousAssigner assigner(internals.Master->communicator().size(), nblocks); // NB: this is coupled to main(...) in oscillator.cpp
diy::RegularMergePartners partners(3, nblocks, 2, true);
diy::reduce(*internals.Master, assigner, partners,
[k_max](void* b_, const diy::ReduceProxy& rp, const diy::RegularMergePartners& partners)
[k_max](void* b_, const diy::ReduceProxy& rp, const diy::RegularMergePartners&)
{
AutocorrelationImpl* b = static_cast<AutocorrelationImpl*>(b_);
unsigned round = rp.round(); // current round number
AutocorrelationImpl* b = static_cast<AutocorrelationImpl*>(b_);
//unsigned round = rp.round(); // current round number
using MaxHeapVector = std::vector<std::vector<std::tuple<float,Vertex>>>;
using Compare = std::greater<std::tuple<float,Vertex>>;
......@@ -333,7 +333,7 @@ void Autocorrelation::PrintResults(size_t k_max)
});
} else
{
for (size_t i = 0; i < rp.in_link().size(); ++i)
for (long i = 0; i < rp.in_link().size(); ++i)
{
MaxHeapVector in_heaps;
rp.dequeue(rp.in_link().target(i).gid, in_heaps);
......
......@@ -8,7 +8,9 @@ set (sources
ConfigurableAnalysis.h
DataAdaptor.cxx
DataAdaptor.h
)
PosthocIO.cxx
PosthocIO.h
)
if(VTK_HAS_GENERIC_ARRAYS)
list(APPEND sources
......@@ -67,11 +69,12 @@ if(ENABLE_ADIOS)
endif()
target_link_libraries(sensei
PUBLIC ${VTK_LIBRARIES}
PUBLIC ${VTK_LIBRARIES}
mpi
PRIVATE diy
grid
pugixml
ArrayIO
)
#------------------------------------------------------------------------------
......
......@@ -6,6 +6,7 @@
#include <vtkDataObject.h>
#include "Autocorrelation.h"
#include "PosthocIO.h"
#ifdef ENABLE_HISTOGRAM
# include "Histogram.h"
#endif
......@@ -19,6 +20,11 @@
#include <vector>
#include <pugixml.hpp>
#define sensei_error(_arg) \
cerr << "ERROR: " << __FILE__ " : " << __LINE__ << std::endl \
<< "" _arg << std::endl;
namespace sensei
{
......@@ -35,7 +41,7 @@ class ConfigurableAnalysis::vtkInternals
{
return vtkDataObject::FIELD_ASSOCIATION_CELLS;
}
cout << "Invalid association type '" << association.c_str() << "'. Assuming 'point'" << endl;
sensei_error(<< "Invalid association type '" << association.c_str() << "'. Assuming 'point'");
return vtkDataObject::FIELD_ASSOCIATION_POINTS;
}
......@@ -57,7 +63,7 @@ public:
}
else
{
cerr << "'histogram' missing required attribute 'array'. Skipping." << endl;
sensei_error(<< "'histogram' missing required attribute 'array'. Skipping.");
}
}
#endif
......@@ -113,9 +119,65 @@ public:
node.attribute("k-max")? node.attribute("k-max").as_int() : 3);
this->Analyses.push_back(adaptor.GetPointer());
}
// Configure a PosthocIO analysis adaptor from an XML <analysis> node.
// Recognized attributes: enabled, array, association, output_dir,
// file_base, block_ext, mode, period. The node is skipped unless
// enabled is present and non-zero.
void AddPosthocIO(MPI_Comm comm, pugi::xml_node node)
{
  if (!node.attribute("enabled") || !node.attribute("enabled").as_int())
    {
    cerr << "Skipping 'PosthocIO'." << endl;
    return;
    }

  if (!node.attribute("array"))
    {
    sensei_error(<< "need at least one array");
    return;
    }

  // route the named array to the cell- or point-centered list;
  // anything other than "cell" is treated as point centered
  std::string arrayName = node.attribute("array").value();
  std::vector<std::string> cellArrays;
  std::vector<std::string> pointArrays;
  pugi::xml_attribute assoc_att = node.attribute("association");
  if (assoc_att && (std::string(assoc_att.value()) == "cell"))
    cellArrays.push_back(arrayName);
  else
    pointArrays.push_back(arrayName);

  // string-valued options fall back to defaults when absent
  auto stringOpt = [&node](const char *name, const char *def) -> std::string
    { return node.attribute(name) ? node.attribute(name).value() : def; };

  std::string outputDir = stringOpt("output_dir", "./");
  std::string fileBase = stringOpt("file_base", "PosthocIO");
  std::string blockExt = stringOpt("block_ext", "block");

  // integer-valued options
  auto intOpt = [&node](const char *name, int def) -> int
    { return node.attribute(name) ? node.attribute(name).as_int() : def; };

  int mode = intOpt("mode", PosthocIO::MpiIO);
  int period = intOpt("period", 1);

  PosthocIO *adapter = PosthocIO::New();
  adapter->Initialize(comm, outputDir, fileBase,
    blockExt, cellArrays, pointArrays, mode, period);
  this->Analyses.push_back(adapter);
  adapter->Delete();
}
};
//----------------------------------------------------------------------------
vtkStandardNewMacro(ConfigurableAnalysis);
//----------------------------------------------------------------------------
ConfigurableAnalysis::ConfigurableAnalysis()
: Internals(new ConfigurableAnalysis::vtkInternals())
......@@ -170,8 +232,16 @@ bool ConfigurableAnalysis::Initialize(MPI_Comm world, const std::string& filenam
this->Internals->AddAutoCorrelation(world, analysis);
continue;
}
cerr << "Skipping '" << type.c_str() << "'." << endl;
if (type == "PosthocIO")
{
this->Internals->AddPosthocIO(world, analysis);
continue;
}
std::cerr << "Skipping '" << type.c_str() << "'." << std::endl;
}
return true;
}
//----------------------------------------------------------------------------
......
This diff is collapsed.
#ifndef sensei_PosthocIO_h
#define sensei_PosthocIO_h
#include "AnalysisAdaptor.h"
#include <mpi.h>
#include <vector>
#include <string>
class vtkInformation;
class vtkCompositeDataSet;
namespace sensei
{
/// @class PosthocIO
/// @brief sensei::PosthocIO is an AnalysisAdaptor that writes
/// the data to disk for posthoc analysis.
class PosthocIO : public AnalysisAdaptor
{
public:
static PosthocIO* New();
vtkTypeMacro(PosthocIO, AnalysisAdaptor);
// I/O modes: one shared file via MPI-I/O, or one file per MPI rank.
enum {MpiIO=1, FilePerProc=2};
/// Configure the writer. headerFile/blockExt name the output files in
/// outputDir; cellArrays/pointArrays select which arrays are written;
/// mode is one of the enum values above; period is the write cadence
/// in time steps.
void Initialize(MPI_Comm comm, const std::string &outputDir,
const std::string &headerFile, const std::string &blockExt,
const std::vector<std::string> &cellArrays,
const std::vector<std::string> &pointArrays, int mode,
int period);
/// Write the current simulation state supplied by the data adaptor.
bool Execute(sensei::DataAdaptor* data) override;
/// Write the BOV header describing the dataset extents/arrays.
int WriteBOVHeader(vtkInformation *info);
protected:
PosthocIO();
~PosthocIO();
private:
int WriteBOVHeader(const std::string &fileName,
const std::vector<std::string> &arrays, const int *wholeExtent);
int WriteBOV(vtkCompositeDataSet *cd,
vtkInformation *info, int timeStep);
int WriteXMLP(vtkCompositeDataSet *cd,
vtkInformation *info, int timeStep);
private:
MPI_Comm Comm;
int CommRank;
std::string OutputDir;   // directory receiving output files
std::string HeaderFile;  // BOV header file name
std::string BlockExt;    // extension used for per-block files
std::vector<std::string> CellArrays;
std::vector<std::string> PointArrays;
bool HaveHeader;         // true once the header has been written
int Mode;                // MpiIO or FilePerProc
int Period;              // write every Period time steps
private:
// copying is disallowed (declared, never defined)
PosthocIO(const PosthocIO&);
void operator=(const PosthocIO&);
};
}
#endif
#include "ArrayIO.h"
#include <sstream>
using std::ostringstream;
namespace arrayIO
{
// ****************************************************************************
// Open fileName for MPI-I/O writing (creating it if needed). Returns 0 on
// success, -1 on failure; the opened handle is returned through 'file'.
int open(
    MPI_Comm comm,        // MPI communicator handle
    const char *fileName, // file name to write.
    MPI_Info hints,       // MPI file hints
    MPI_File &file)       // file handle
{
    int iErr = 0;
#ifndef NDEBUG
    // guard against use before MPI_Init in debug builds
    int mpiOk = 0;
    MPI_Initialized(&mpiOk);
    if (!mpiOk)
        {
        std::cerr << "This class requires the MPI runtime" << std::endl;
        return -1;
        }
#endif
    // Open the file
    if ((iErr = MPI_File_open(comm, const_cast<char *>(fileName),
        MPI_MODE_WRONLY|MPI_MODE_CREATE, hints, &file)))
        {
        // MPI_Error_string writes the message length through its third
        // argument, so it must be a genuine non-const int. (The previous
        // code const_cast'ed the address of a const local and let MPI
        // write through it -- undefined behavior.)
        char eStr[2048] = {'\0'};
        int eStrLen = 0;
        MPI_Error_string(iErr, eStr, &eStrLen);
        std::cerr << "Error opening file: " << fileName << std::endl
            << eStr << std::endl;
        return -1;
        }
    return 0;
}
// ****************************************************************************
// Build an MPI_Info hint set for collective file I/O from the given knobs.
// Each HINT_* flag selects enable/disable/implementation-default for the
// corresponding ROMIO hint; sizes/counts <= 0 are left to the
// implementation. Returns MPI_INFO_NULL if MPI is not initialized.
MPI_Info createHints(
    int useCollectiveIO,
    int numberOfIONodes,
    int collectBufferSize,
    int useDirectIO,
    int useDeferredOpen,
    int useDataSieving,
    int sieveBufferSize,
    int stripeCount,
    int stripeSize)
{
    // initialize so the early-return path below does not hand back an
    // uninitialized (indeterminate) handle
    MPI_Info hints = MPI_INFO_NULL;
    int mpiOk;
    MPI_Initialized(&mpiOk);
    if (!mpiOk)
        {
        std::cerr << "This class requires the MPI runtime" << std::endl;
        return hints;
        }
    MPI_Info_create(&hints);
    switch (useCollectiveIO)
        {
        case HINT_AUTOMATIC:
            // do nothing, it's up to implementation.
            break;
        case HINT_DISABLED:
            MPI_Info_set(hints,"romio_cb_write","disable");
            break;
        case HINT_ENABLED:
            MPI_Info_set(hints,"romio_cb_write","enable");
            break;
        default:
            std::cerr << "Invalid value for UseCollectiveIO." << std::endl;
            break;
        }
    if (numberOfIONodes > 0)
        {
        std::ostringstream os;
        os << numberOfIONodes;
        MPI_Info_set(hints,"cb_nodes",const_cast<char *>(os.str().c_str()));
        }
    if (collectBufferSize > 0)
        {
        std::ostringstream os;
        os << collectBufferSize;
        MPI_Info_set(hints,"cb_buffer_size",const_cast<char *>(os.str().c_str()));
        //MPI_Info_set(hints,"striping_unit", const_cast<char *>(os.str().c_str()));
        }
    switch (useDirectIO)
        {
        case HINT_DEFAULT:
            // do nothing, it's up to implementation.
            break;
        case HINT_DISABLED:
            MPI_Info_set(hints,"direct_write","false");
            break;
        case HINT_ENABLED:
            MPI_Info_set(hints,"direct_write","true");
            break;
        default:
            std::cerr << "Invalid value for UseDirectIO." << std::endl;
            break;
        }
    switch (useDeferredOpen)
        {
        case HINT_DEFAULT:
            // do nothing, it's up to implementation.
            break;
        case HINT_DISABLED:
            MPI_Info_set(hints,"romio_no_indep_rw","false");
            break;
        case HINT_ENABLED:
            MPI_Info_set(hints,"romio_no_indep_rw","true");
            break;
        default:
            std::cerr << "Invalid value for UseDeferredOpen." << std::endl;
            break;
        }
    switch (useDataSieving)
        {
        case HINT_AUTOMATIC:
            // do nothing, it's up to implementation.
            break;
        case HINT_DISABLED:
            MPI_Info_set(hints,"romio_ds_write","disable");
            break;
        case HINT_ENABLED:
            MPI_Info_set(hints,"romio_ds_write","enable");
            break;
        default:
            std::cerr << "Invalid value for UseDataSieving." << std::endl;
            break;
        }
    if (sieveBufferSize > 0)
        {
        // NOTE(review): this sets the *read* sieve buffer; since the other
        // hints here are write-oriented, confirm "ind_wr_buffer_size"
        // wasn't intended.
        std::ostringstream os;
        os << sieveBufferSize;
        MPI_Info_set(hints,"ind_rd_buffer_size", const_cast<char *>(os.str().c_str()));
        }
    if (stripeCount > 0)
        {
        std::ostringstream os;
        os << stripeCount;
        MPI_Info_set(hints,"striping_count", const_cast<char *>(os.str().c_str()));
        }
    if (stripeSize > 0)
        {
        std::ostringstream os;
        os << stripeSize;
        MPI_Info_set(hints,"striping_unit", const_cast<char *>(os.str().c_str()));
        }
    return hints;
}
}
#ifndef ArrayIO_h
#define ArrayIO_h
#include <mpi.h>
#define arrayIO_error(_arg) \
std::cerr << "ERROR: " << __FILE__ " : " << __LINE__ << std::endl \
<< "" _arg << std::endl;
namespace {
// *****************************************************************************
// number of cells in the extent [i0,i1, j0,j1, k0,k1] (bounds inclusive)
template <typename T>
size_t size(const T &ext)
{
    size_t count = 1;
    for (int d = 0; d < 6; d += 2)
        count *= ext[d+1] - ext[d] + 1;
    return count;
}
// *****************************************************************************
// per-axis lengths of the extent: n[d] = high - low + 1
template <typename T>
void size(const T &ext, int *n)
{
    n[0] = ext[1] - ext[0] + 1;
    n[1] = ext[3] - ext[2] + 1;
    n[2] = ext[5] - ext[4] + 1;
}
// *****************************************************************************
// low corner of the extent: id[d] = low bound along axis d
template <typename T>
void start(const T &ext, int *id)
{
    id[0] = ext[0];
    id[1] = ext[2];
    id[2] = ext[4];
}
// *****************************************************************************
// per-axis difference of the low corners: id[d] = dom_low - subdom_low.
// NOTE(review): for a sub-domain contained inside dom this yields
// non-positive values; confirm callers expect dom-subdom (not subdom-dom).
template <typename T>
void offset(const T &dom, const T &subdom, int *id)
{
    id[0] = dom[0] - subdom[0];
    id[1] = dom[2] - subdom[2];
    id[2] = dom[4] - subdom[4];
}
// *****************************************************************************
// true when two 6-element extents are element-wise identical
template <typename T>
bool equal(const T &l, const T &r)
{
    bool same = true;
    for (int i = 0; i < 6; ++i)
        same = same && (l[i] == r[i]);
    return same;
}
// *****************************************************************************
// compile-time map from a C++ scalar type to the matching MPI datatype
// handle; unsupported types fail to compile (primary template undefined)
template < typename T> struct mpi_tt;
template <> struct mpi_tt <float> { static MPI_Datatype Type(){ return MPI_FLOAT; } };
template <> struct mpi_tt <double> { static MPI_Datatype Type(){ return MPI_DOUBLE; } };
template <> struct mpi_tt <short int> { static MPI_Datatype Type(){ return MPI_SHORT; } };
template <> struct mpi_tt <unsigned short int> { static MPI_Datatype Type(){ return MPI_UNSIGNED_SHORT; } };
template <> struct mpi_tt <int> { static MPI_Datatype Type(){ return MPI_INT; } };
template <> struct mpi_tt <unsigned int> { static MPI_Datatype Type(){ return MPI_UNSIGNED; } };
template <> struct mpi_tt <long> { static MPI_Datatype Type(){ return MPI_LONG; } };
template <> struct mpi_tt <unsigned long> { static MPI_Datatype Type(){ return MPI_UNSIGNED_LONG; } };
template <> struct mpi_tt <long long> { static MPI_Datatype Type(){ return MPI_LONG_LONG; } };
template <> struct mpi_tt <unsigned long long> { static MPI_Datatype Type(){ return MPI_UNSIGNED_LONG_LONG; } };
// NOTE(review): signed char maps to MPI_CHAR here; MPI_SIGNED_CHAR is the
// strictly correct handle, though both have identical size for file I/O.
template <> struct mpi_tt <signed char> { static MPI_Datatype Type(){ return MPI_CHAR; } };
template <> struct mpi_tt <char> { static MPI_Datatype Type(){ return MPI_CHAR; } };
template <> struct mpi_tt <unsigned char> { static MPI_Datatype Type(){ return MPI_UNSIGNED_CHAR; } };
}
namespace arrayIO
{
// ****************************************************************************
int open(
MPI_Comm comm, // MPI communicator handle
const char *fileName, // file name to open
MPI_Info hints, // MPI file hints
MPI_File &file); // file handle
// ****************************************************************************
// Close an MPI file handle. Returns 0 on success, -1 if MPI reports an
// error. (The previous implementation discarded MPI_File_close's status
// and unconditionally returned 0, silently swallowing failures.)
inline
int close(MPI_File file)
{
    if (MPI_File_close(&file))
        return -1;
    return 0;
}
// ****************************************************************************
template < typename T>
int write(
MPI_File file, // File to write to.
MPI_Info hints, // MPI file hints
int domain[6], // entire region, dataset extents
int decomp[6], // local memory region, block extents with ghost zones
int valid[6], // region to write to disk
T *data) // pointer to a buffer to write to disk.
{
int iErr = 0;
// calculate block offsets and lengths
int domainDims[3];
::size(domain, domainDims);
int decompDims[3];
::size(decomp, decompDims);
int validDims[3];
::size(valid, validDims);
int validStart[3];
::start(valid, validStart);
int validOffset[3];
::offset(decomp, valid, validOffset);
// file view
MPI_Datatype nativeType = ::mpi_tt<T>::Type();
MPI_Datatype fileView;
if (MPI_Type_create_subarray(3, domainDims,
validDims, validStart, MPI_ORDER_FORTRAN,
nativeType, &fileView))
{
arrayIO_error(<< "MPI_Type_create_subarray failed.")
return -1;
}
if (MPI_Type_commit(&fileView))
{
arrayIO_error(<< "MPI_Type_commit failed.")
return -1;