Commit a7036ba9 authored by Burlen Loring's avatar Burlen Loring

ADIOS support for unstructured, polydata, image, and multiblock, multipiece

* add support for read(ADIOSDataAdaptor)/write(ADIOSAnalysisAdaptor)
  of unstructured, polydata, image and multiblock, multipiece via ADIOS.
  The previuous work for image data was refactored into the new
  hierarchical approach.

* The code that serializes and deserializes VTK data objec ts to and

* from ADIOS streams is encapsulated in classes in sensei::ADIOSSchema
  Schema -- base class defines common API
  DataObjectSchema -- serializes/deserializes at the highest level. This
                      is the user facing class that manages the low
                      level classes,
  DatasetSchema -- seriealizes/deserializes dataset level attributes and
                   manages the lower level classes.
  Extent3DSchema -- serializes/deserializes properties unique to VTK's
                    extent based datasets
  DatasetAttributesSchema -- serializes/deserializes attribute data arrays
                             templated on attribute type(POINT/CELL).
  CellsSchema -- serializes/deserializes cells
  PointsSchema -- serializes/deserializes points

* adds supprt for all array data type supported by VTK including vtkIdType
  and other integer types.

* Update the end point to better handle temporal information.

* Update the end point to detect and handle error conditions.
parent 565947b8
#include "ADIOSDataAdaptor.h"
#include "ConfigurableAnalysis.h"
#include "Timer.h"
#include "Error.h"
#include <opts/opts.h>
#include <mpi.h>
#include <iostream>
#include <vtkNew.h>
#include <vtkSmartPointer.h>
#include <vtkDataSet.h>
using DataAdaptorPtr = vtkSmartPointer<sensei::ADIOSDataAdaptor>;
using AnalysisAdaptorPtr = vtkSmartPointer<sensei::ConfigurableAnalysis>;
/*!
* This program is designed to be an endpoint component in a scientific
* workflow. It can read a data-stream using ADIOS-FLEXPATH. When enabled, this end point
......@@ -6,20 +23,12 @@
* Usage:
* <exec> input-stream-name
*/
#include <opts/opts.h>
#include <mpi.h>
#include <iostream>
#include <ADIOSDataAdaptor.h>
#include <ConfigurableAnalysis.h>
#include <Timer.h>
#include <vtkNew.h>
#include <vtkDataSet.h>
using std::cout;
using std::cerr;
using std::endl;
int main(int argc, char** argv)
int main(int argc, char **argv)
{
int rank, size;
MPI_Comm comm = MPI_COMM_WORLD;
......@@ -37,16 +46,23 @@ int main(int argc, char** argv)
bool log = ops >> opts::Present("log", "generate time and memory usage log");
bool shortlog = ops >> opts::Present("shortlog", "generate a summary time and memory usage log");
if (ops >> opts::Present('h', "help", "show help") ||
!(ops >> opts::PosOption(input)) ||
config_file.empty())
bool showHelp = ops >> opts::Present('h', "help", "show help");
bool haveInput = ops >> opts::PosOption(input);
if (!showHelp && !haveInput && (rank == 0))
SENSEI_ERROR("Missing ADIOS input stream")
if (!showHelp && config_file.empty() && (rank == 0))
SENSEI_ERROR("Missing XML analysis configuration")
if (showHelp || !haveInput || config_file.empty())
{
if (rank == 0)
{
cout << "Usage: " << argv[0] << "[OPTIONS] input-stream-name\n\n" << ops;
cerr << "Usage: " << argv[0] << "[OPTIONS] input-stream-name\n\n" << ops << endl;
}
MPI_Barrier(comm);
return 1;
MPI_Finalize();
return showHelp ? 0 : 1;
}
timer::SetLogging(log || shortlog);
......@@ -59,40 +75,68 @@ int main(int argc, char** argv)
readmethods["dimes"] = ADIOS_READ_METHOD_DIMES;
readmethods["flexpath"] = ADIOS_READ_METHOD_FLEXPATH;
vtkSmartPointer<sensei::ConfigurableAnalysis> analysis =
vtkSmartPointer<sensei::ConfigurableAnalysis>::New();
analysis->Initialize(comm, config_file);
SENSEI_STATUS("Opening: \"" << input.c_str() << "\" using method \""
<< readmethod.c_str() << "\"")
// open the ADIOS stream using the ADIOS adaptor
DataAdaptorPtr dataAdaptor = DataAdaptorPtr::New();
if (dataAdaptor->Open(comm, readmethods[readmethod], input))
{
SENSEI_ERROR("Failed to open \"" << input << "\"")
MPI_Abort(comm, 1);
}
// initlaize the analysis using the XML configurable adaptor
SENSEI_STATUS("Loading configurable analysis \"" << config_file << "\"")
cout << "Opening: '" << input.c_str() << "' using '" << readmethod.c_str() << "'" << endl;
vtkNew<sensei::ADIOSDataAdaptor> dataAdaptor;
dataAdaptor->Open(comm, readmethods[readmethod], input);
cout << "Done opening '" << input.c_str() << "'" << endl;
AnalysisAdaptorPtr analysisAdaptor = AnalysisAdaptorPtr::New();
analysisAdaptor->Initialize(comm, config_file);
int t_count = 0;
double t = 0.0;
// read from the ADIOS stream until all steps have been
// processed
unsigned int nSteps = 0;
do
{
timer::MarkStartTimeStep(t_count, t);
// gte the current simulation time and time step
long timeStep = dataAdaptor->GetDataTimeStep();
double time = dataAdaptor->GetDataTime();
nSteps += 1;
timer::MarkStartEvent("adios::advance");
// request reading of meta-data for this step.
dataAdaptor->ReadStep();
timer::MarkEndEvent("adios::advance");
timer::MarkStartTimeStep(timeStep, time);
timer::MarkStartEvent("adios::analysis");
analysis->Execute(dataAdaptor.GetPointer());
timer::MarkEndEvent("adios::analysis");
SENSEI_STATUS("Processing time step " << timeStep << " time " << time)
// execute the analysis
timer::MarkStartEvent("AnalysisAdaptor::Execute");
if (!analysisAdaptor->Execute(dataAdaptor.Get()))
{
SENSEI_ERROR("Execute failed")
MPI_Abort(comm, 1);
}
timer::MarkEndEvent("AnalysisAdaptor::Execute");
// let the data adaptor release the mesh and data from this
// time step
dataAdaptor->ReleaseData();
timer::MarkEndTimeStep();
}
while (dataAdaptor->Advance());
while (!dataAdaptor->Advance());
SENSEI_STATUS("Finished processing " << nSteps << " time steps")
timer::MarkStartEvent("adios::finalize");
analysis = NULL;
timer::MarkEndEvent("adios::finalize");
// close the ADIOS stream
dataAdaptor->Close();
// we must force these to be destroyed before mpi finalize
// some of the adaptors make MPI calls in the destructor
// noteabley Catalyst
dataAdaptor = nullptr;
analysisAdaptor = nullptr;
timer::PrintLog(std::cout, comm);
MPI_Finalize();
return 0;
}
......@@ -237,7 +237,7 @@ def points_to_polydata(ids,x,y,z,m,vx,vy,vz,fx,fy,fz):
cells.SetCells(nx, vtknp.numpy_to_vtk(cids, \
deep=1, array_type=vtk.VTK_ID_TYPE))
# scalars, id
vtkids = vtknp.numpy_to_vtk(ids, deep=1)
vtkids = vtknp.numpy_to_vtk(ids, 1, vtk.VTK_LONG)
vtkids.SetName('ids')
# mass
vtkm = vtknp.numpy_to_vtk(m, deep=1)
......
#include "ADIOSAnalysisAdaptor.h"
#include <DataAdaptor.h>
#include <Timer.h>
#include "ADIOSSchema.h"
#include "DataAdaptor.h"
#include "Timer.h"
#include "Error.h"
#include <vtkCellTypes.h>
#include <vtkCellData.h>
#include <vtkCompositeDataIterator.h>
#include <vtkCompositeDataSet.h>
#include <vtkDataSetAttributes.h>
#include <vtkDoubleArray.h>
#include <vtkFloatArray.h>
#include <vtkIntArray.h>
#include <vtkUnsignedIntArray.h>
#include <vtkLongArray.h>
#include <vtkUnsignedLongArray.h>
#include <vtkCharArray.h>
#include <vtkUnsignedCharArray.h>
#include <vtkIdTypeArray.h>
#include <vtkCellArray.h>
#include <vtkUnstructuredGrid.h>
#include <vtkPolyData.h>
#include <vtkImageData.h>
#include <vtkInformation.h>
#include <vtkObjectFactory.h>
......@@ -21,377 +34,100 @@
namespace sensei
{
namespace internals
{
// --------------------------------------------------------------------------
ADIOS_DATATYPES adiosType(vtkDataArray* da)
{
if (vtkFloatArray::SafeDownCast(da))
{
return adios_real;
}
if (vtkDoubleArray::SafeDownCast(da))
{
return adios_double;
}
// TODO:
abort();
}
// --------------------------------------------------------------------------
int64_t CountBlocks(vtkCompositeDataSet* cd, bool skip_null)
{
vtkSmartPointer<vtkCompositeDataIterator> iter;
iter.TakeReference(cd->NewIterator());
iter->SetSkipEmptyNodes(skip_null? 1 : 0);
int64_t count = 0;
for (iter->InitTraversal(); !iter->IsDoneWithTraversal(); iter->GoToNextItem())
{
count++;
}
return count;
}
// --------------------------------------------------------------------------
int64_t CountTotalBlocks(vtkDataObject* dobj)
{
vtkCompositeDataSet* cd = vtkCompositeDataSet::SafeDownCast(dobj);
return cd? CountBlocks(cd, false) : 1;
}
// --------------------------------------------------------------------------
int64_t CountLocalBlocks(vtkDataObject* dobj)
{
vtkCompositeDataSet* cd = vtkCompositeDataSet::SafeDownCast(dobj);
return cd? CountBlocks(cd, true) : 1;
}
// --------------------------------------------------------------------------
void define_attribute_data_vars(vtkDataSet* ds, int blockno, int association,
int64_t g_handle, const char* c_prefix, const char* ldims, const char* gdims,
const char* offsets, int64_t& databytes)
{
(void)blockno;
(void)databytes;
vtkDataSetAttributes* dsa = ds->GetAttributes(association);
std::string prefix(c_prefix);
for (unsigned int cc=0, max=dsa->GetNumberOfArrays(); cc<max;++cc)
{
if (vtkDataArray* da = dsa->GetArray(cc))
{
const char* aname = da->GetName();
adios_define_var(g_handle, (prefix + aname).c_str(), "",
adiosType(da), ldims, gdims, offsets);
}
}
}
// --------------------------------------------------------------------------
void write_attribute_data_vars(vtkDataSet* ds, int association,
int64_t io_handle, const char* c_prefix)
{
std::string prefix(c_prefix);
vtkDataSetAttributes* dsa = ds->GetAttributes(association);
for (unsigned int cc=0, max=dsa->GetNumberOfArrays(); cc<max; ++cc)
{
if (vtkDataArray* da = dsa->GetArray(cc))
{
const char* aname = da->GetName();
adios_write(io_handle, (prefix + aname).c_str(), da->GetVoidPointer(0));
}
}
}
// --------------------------------------------------------------------------
int64_t ComputeVariableLengthVarSize(vtkDataSet* data)
{
if (data)
{
return data->GetNumberOfCells()*data->GetCellData()->GetNumberOfArrays()*sizeof(double) +
data->GetNumberOfPoints()*data->GetPointData()->GetNumberOfArrays()*sizeof(double);
}
return 0;
}
// --------------------------------------------------------------------------
int64_t ComputeVariableLengthVarSize(vtkDataObject* data)
{
if (vtkCompositeDataSet* cd = vtkCompositeDataSet::SafeDownCast(data))
{
vtkSmartPointer<vtkCompositeDataIterator> iter;
iter.TakeReference(cd->NewIterator());
int64_t dsize = 0;
for (iter->InitTraversal(); !iter->IsDoneWithTraversal(); iter->GoToNextItem())
{
dsize += ComputeVariableLengthVarSize(vtkDataSet::SafeDownCast(iter->GetCurrentDataObject()));
}
return dsize;
}
else
{
return ComputeVariableLengthVarSize(vtkDataSet::SafeDownCast(data));
}
}
// --------------------------------------------------------------------------
void CountLocalElements(vtkDataSet* ds, int64_t* num_points, int64_t* num_cells)
{
*num_points += ds? ds->GetNumberOfPoints() : 0;
*num_cells += ds? ds->GetNumberOfCells() : 0;
}
// --------------------------------------------------------------------------
void CountLocalElements(vtkDataObject* dobj, int64_t* num_points, int64_t* num_cells)
{
if (vtkCompositeDataSet* cd = vtkCompositeDataSet::SafeDownCast(dobj))
{
vtkSmartPointer<vtkCompositeDataIterator> iter;
iter.TakeReference(cd->NewIterator());
for (iter->InitTraversal(); !iter->IsDoneWithTraversal(); iter->GoToNextItem())
{
CountLocalElements(vtkDataSet::SafeDownCast(iter->GetCurrentDataObject()), num_points, num_cells);
}
}
else
{
CountLocalElements(vtkDataSet::SafeDownCast(dobj), num_points, num_cells);
}
}
// --------------------------------------------------------------------------
vtkDataSet* GetRepresentativeBlock(vtkDataObject* dobj)
{
if (vtkCompositeDataSet* cd = vtkCompositeDataSet::SafeDownCast(dobj))
{
vtkSmartPointer<vtkCompositeDataIterator> iter;
iter.TakeReference(cd->NewIterator());
for (iter->InitTraversal(); !iter->IsDoneWithTraversal(); iter->GoToNextItem())
{
return vtkDataSet::SafeDownCast(iter->GetCurrentDataObject());
}
}
return vtkDataSet::SafeDownCast(dobj);
}
// --------------------------------------------------------------------------
void GetBlocks(vtkDataObject* dobj, std::vector<vtkDataSet*> &blocks)
{
if (vtkCompositeDataSet* cd = vtkCompositeDataSet::SafeDownCast(dobj))
{
vtkSmartPointer<vtkCompositeDataIterator> iter;
iter.TakeReference(cd->NewIterator());
for (iter->InitTraversal(); !iter->IsDoneWithTraversal(); iter->GoToNextItem())
{
if (vtkDataSet* ds = vtkDataSet::SafeDownCast(iter->GetCurrentDataObject()))
{
blocks.push_back(ds);
}
}
}
else if (vtkDataSet* ds = vtkDataSet::SafeDownCast(dobj))
{
blocks.push_back(ds);
}
}
}
//----------------------------------------------------------------------------
senseiNewMacro(ADIOSAnalysisAdaptor);
//----------------------------------------------------------------------------
ADIOSAnalysisAdaptor::ADIOSAnalysisAdaptor() :
Initialized(false),
FixedLengthVarSize(0)
ADIOSAnalysisAdaptor::ADIOSAnalysisAdaptor() : Comm(MPI_COMM_WORLD),
Schema(nullptr), Method("MPI"), FileName("sensei.bp")
{
this->SetMethod("MPI");
this->SetFileName("sensei.bp");
}
//----------------------------------------------------------------------------
ADIOSAnalysisAdaptor::~ADIOSAnalysisAdaptor()
{
if (this->Schema)
this->FinalizeADIOS();
delete Schema;
}
//----------------------------------------------------------------------------
bool ADIOSAnalysisAdaptor::Execute(DataAdaptor* data)
{
this->InitializeADIOS(data);
this->WriteTimestep(data);
timer::MarkEvent mark("ADIOSAnalysisAdaptor::Execute");
vtkDataObject* dobj = data->GetCompleteMesh();
unsigned long timeStep = data->GetDataTimeStep();
double time = data->GetDataTime();
this->InitializeADIOS(dobj);
this->WriteTimestep(timeStep, time, dobj);
return true;
}
//----------------------------------------------------------------------------
void ADIOSAnalysisAdaptor::InitializeADIOS(DataAdaptor* data)
void ADIOSAnalysisAdaptor::InitializeADIOS(vtkDataObject *dobj)
{
// Ideally, we'd develop code that can write any VTK data object to ADIOS
// using a schema that we can call VTK_ADIOS_SCHEMA. VTK and consequently
// ParaView, Catalyst, VisIt, Libsim all can then develop readers/writers that
// read/write this schema. Should this simply be the ADIOS Vis Schema? Maybe.
if (this->Initialized)
{
if (this->Schema)
return;
}
timer::MarkEvent mark("adios::intialize");
timer::MarkEvent mark("ADIOSAnalysisAdaptor::IntializeADIOS");
adios_init_noxml(MPI_COMM_WORLD);
// initialize adios
adios_init_noxml(this->Comm);
int64_t g_handle = 0;
#if ADIOS_VERSION_GE(1,11,0)
adios_set_max_buffer_size(500);
#else
adios_allocate_buffer(ADIOS_BUFFER_ALLOC_NOW, 500);
#endif
vtkDataObject* structure = data->GetMesh(/*structure_only*/ true);
int64_t local_blocks = internals::CountLocalBlocks(structure);
vtkDataSet* ds = internals::GetRepresentativeBlock(structure);
int64_t databytes = 0;
int64_t g_handle;
adios_declare_group(&g_handle, "sensei", "",
#if ADIOS_VERSION_GE(1,11,0)
static_cast<ADIOS_STATISTICS_FLAG>(adios_flag_yes)
static_cast<ADIOS_STATISTICS_FLAG>(adios_flag_yes));
#else
adios_flag_yes
adios_allocate_buffer(ADIOS_BUFFER_ALLOC_NOW, 500);
adios_declare_group(&g_handle, "sensei", "", adios_flag_yes);
#endif
);
adios_select_method(g_handle, this->Method.c_str(), "", "");
// API Help:
// adios_define_var(group_id, name, path, type, dimensions, global_dimensions, local_offsets)
// global data.
adios_define_var(g_handle, "rank", "", adios_integer, "", "", ""); databytes+= sizeof(int);
adios_define_var(g_handle, "mpi_size", "", adios_integer, "", "", ""); databytes += sizeof(int);
adios_define_var(g_handle, "data_type", "", adios_integer, "", "", ""); databytes += sizeof(int);
adios_define_var(g_handle, "version", "", adios_integer, "", "", ""); databytes += sizeof(int);
adios_define_var(g_handle, "extents", "", adios_integer, "6", "", ""); databytes += 6*sizeof(int);
adios_define_var(g_handle, "num_blocks", "", adios_unsigned_long, "", "", ""); databytes += sizeof(int64_t);
adios_define_var(g_handle, "num_points", "", adios_unsigned_long, "", "", ""); databytes += sizeof(int64_t);
adios_define_var(g_handle, "num_cells", "", adios_unsigned_long, "", "", ""); databytes += sizeof(int64_t);
adios_define_var (g_handle, "ntimestep", "", adios_unsigned_long, "", "", ""); databytes += sizeof(int64_t);
adios_define_var (g_handle, "time", "", adios_double, "", "", ""); databytes += sizeof(double);
// per block data.
for (int64_t block=0; block < local_blocks; ++block)
{
adios_define_var(g_handle, "block/origin", "", adios_double, "3", "", ""); databytes += 3*sizeof(double);
adios_define_var(g_handle, "block/spacing", "", adios_double, "3", "", ""); databytes += 3*sizeof(double);
adios_define_var(g_handle, "block/extents", "", adios_integer, "6", "", ""); databytes += 6*sizeof(double);
adios_define_var(g_handle, "block/num_points", "", adios_unsigned_long, "", "", ""); databytes += 6*sizeof(int64_t);
adios_define_var(g_handle, "block/num_cells", "", adios_unsigned_long, "", "", ""); databytes += 6*sizeof(int64_t);
adios_define_var(g_handle, "block/offset_points", "", adios_unsigned_long, "", "", ""); databytes += 6*sizeof(int64_t);
adios_define_var(g_handle, "block/offset_cells", "", adios_unsigned_long, "", "", ""); databytes += 6*sizeof(int64_t);
internals::define_attribute_data_vars(ds, block, vtkDataObject::POINT, g_handle,
"block/pointdata/", "block/num_points", "num_points", "block/offset_points", databytes);
internals::define_attribute_data_vars(ds, block, vtkDataObject::CELL, g_handle,
"block/celldata/", "block/num_cells", "num_cells", "block/offset_cells", databytes);
}
this->FixedLengthVarSize = databytes;
this->Initialized=true;
// define ADIOS variables
this->Schema = new senseiADIOS::DataObjectSchema;
this->Schema->DefineVariables(this->Comm, g_handle, dobj);
}
//----------------------------------------------------------------------------
void ADIOSAnalysisAdaptor::WriteTimestep(sensei::DataAdaptor* data)
void ADIOSAnalysisAdaptor::FinalizeADIOS()
{
timer::MarkEvent mark("adios::execute");
vtkDataObject* mesh = data->GetCompleteMesh();
int nprocs = 1, rank = 0;
MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
timer::MarkEvent mark("ADIOSAnalysisAdaptor::FinalizeADIOS");
int rank = 0;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
adios_finalize(rank);
}
int64_t io_handle;
adios_open(&io_handle, "sensei", this->FileName.c_str(),
data->GetDataTimeStep() == 0? "w" : "a", MPI_COMM_WORLD);
uint64_t total_size;
adios_group_size(io_handle,
internals::ComputeVariableLengthVarSize(mesh) + this->FixedLengthVarSize,
&total_size);
int64_t local_blocks = internals::CountLocalBlocks(mesh);
int64_t total_blocks = internals::CountTotalBlocks(mesh);
vtkDataSet* ds = internals::GetRepresentativeBlock(mesh);
//----------------------------------------------------------------------------
void ADIOSAnalysisAdaptor::WriteTimestep(unsigned long timeStep,
double time, vtkDataObject *dobj)
{
timer::MarkEvent mark("ADIOSAnalysisAdaptor::WriteTimestep");
// write global data.
adios_write(io_handle, "rank", &rank);
adios_write(io_handle, "mpi_size", &nprocs);
adios_write(io_handle, "num_blocks", &total_blocks);
int data_type = ds->GetDataObjectType();
adios_write(io_handle, "data_type", &data_type);
int version = 1;
adios_write(io_handle, "version", &version);
int64_t handle = 0;
int extents[6] = {0, -1, 0, -1, 0, -1};
data->GetInformation()->Get(vtkDataObject::DATA_EXTENT(), extents);
adios_write(io_handle, "extents", extents);
adios_open(&handle, "sensei", this->FileName.c_str(),
timeStep == 0 ? "w" : "a", this->Comm);
int64_t local_num_points=0, local_num_cells=0;
internals::CountLocalElements(mesh, &local_num_points, &local_num_cells);
uint64_t group_size = this->Schema->GetSize(dobj);
adios_group_size(handle, group_size, &group_size);
// compute inclusive scan to determine process offsets.
int64_t global_offset_points=0, global_offset_cells=0, num_points=0, num_cells=0;
if (this->Schema->Write(this->Comm, handle, dobj) ||
this->Schema->WriteTimeStep(this->Comm, handle, timeStep, time))
{
int64_t local_counts[] = { local_num_points, local_num_cells };
int64_t global_offsets[2];
MPI_Scan(&local_counts, &global_offsets, 2, MPI_UNSIGNED_LONG_LONG, MPI_SUM, MPI_COMM_WORLD);
int64_t totals[2] = {global_offsets[0], global_offsets[1]};
MPI_Bcast(totals, 2, MPI_UNSIGNED_LONG_LONG, (nprocs-1), MPI_COMM_WORLD);
global_offset_points = global_offsets[0] - local_counts[0];
global_offset_cells = global_offsets[1] - local_counts[1];
num_points = totals[0];
num_cells = totals[1];
SENSEI_ERROR("Failed to write step " << timeStep
<< " to \"" << this->FileName << "\"")
return;
}
adios_write(io_handle, "num_points", &num_points);
adios_write(io_handle, "num_cells", &num_cells);
int timestep = data->GetDataTimeStep();
adios_write(io_handle, "ntimestep", &timestep);
double time = data->GetDataTime();
adios_write(io_handle, "time", &time);
// write blocks.
std::vector<vtkDataSet*> blocks;
internals::GetBlocks(mesh, blocks);
assert(blocks.size() == static_cast<size_t>(local_blocks));
for (int64_t cc=0; cc < local_blocks; ++cc)
{
vtkDataSet* block = blocks[cc];
double origin[3] = {0, 0, 0};
double spacing[3] = {1, 1, 1};
int lextents[6] = {0, -1, 0, -1, 0, -1};