Commit d150caa8 authored by Junmin Gu's avatar Junmin Gu

used insitu-dataadptor on hdf5 data adaptor. + clang-format

parent 70a0dc7d
#include "HDF5DataAdaptor.h"
#include "ConfigurableAnalysis.h"
#include "Timer.h"
#include "Error.h"
#include "HDF5DataAdaptor.h"
#include "Timer.h"
#include <opts/opts.h>
#include <mpi.h>
#include <iostream>
#include <mpi.h>
#include <vtkDataSet.h>
#include <vtkNew.h>
#include <vtkSmartPointer.h>
#include <vtkDataSet.h>
using DataAdaptorPtr = vtkSmartPointer<sensei::HDF5DataAdaptor>;
using AnalysisAdaptorPtr = vtkSmartPointer<sensei::ConfigurableAnalysis>;
/*!
* This program is designed to be an endpoint component in a scientific
* workflow. It can read a data-stream using HDF5. When enabled, this end point
......@@ -28,11 +27,10 @@ using std::cout;
using std::cerr;
using std::endl;
int main(int argc, char **argv)
{
int main(int argc, char **argv) {
int rank, size;
MPI_Comm comm = MPI_COMM_WORLD;
MPI_Init (&argc, &argv);
MPI_Init(&argc, &argv);
MPI_Comm_rank(comm, &rank);
MPI_Comm_size(comm, &size);
......@@ -41,11 +39,15 @@ int main(int argc, char **argv)
std::string config_file;
opts::Options ops(argc, argv);
ops >> opts::Option('r', "readmethod", readmethod, "specify read method: s(=streaming) n(=nostreaming)")
>> opts::Option('f', "config", config_file, "Sensei analysis configuration xml (required)");
ops >> opts::Option('r', "readmethod", readmethod,
"specify read method: s(=streaming) n(=nostreaming)") >>
opts::Option('f', "config", config_file,
"Sensei analysis configuration xml (required)");
bool log = ops >> opts::Present("log", "generate time and memory usage log");
bool shortlog = ops >> opts::Present("shortlog", "generate a summary time and memory usage log");
bool shortlog =
ops >>
opts::Present("shortlog", "generate a summary time and memory usage log");
bool showHelp = ops >> opts::Present('h', "help", "show help");
bool haveInput = ops >> opts::PosOption(input);
......@@ -58,9 +60,10 @@ int main(int argc, char **argv)
if (!showHelp && config_file.empty() && (rank == 0))
SENSEI_ERROR("Missing XML analysis configuration");
if (showHelp || !haveInput || config_file.empty()) {
if (rank == 0) {
cerr << "Usage: " << argv[0] << "[OPTIONS] input-stream-name\n\n" << ops << endl;
if (showHelp || !haveInput || config_file.empty()) {
if (rank == 0) {
cerr << "Usage: " << argv[0] << "[OPTIONS] input-stream-name\n\n"
<< ops << endl;
}
MPI_Finalize();
return showHelp ? 0 : 1;
......@@ -68,15 +71,16 @@ int main(int argc, char **argv)
if (log | shortlog)
timer::Enable(shortlog);
timer::Initialize();
DataAdaptorPtr dataAdaptor = DataAdaptorPtr::New();
dataAdaptor->SetCommunicator(comm);
dataAdaptor->SetStreaming(doStreaming);
dataAdaptor->SetCollective(doCollectiveTxf);
dataAdaptor->SetStreamName(input);
if (dataAdaptor->Open(input)) {
if (dataAdaptor->OpenStream()) {
SENSEI_ERROR("Failed to open \"" << input << "\"");
MPI_Abort(comm, 1);
}
......@@ -85,7 +89,7 @@ int main(int argc, char **argv)
SENSEI_STATUS("Loading configurable analysis \"" << config_file << "\"");
if (rank == 0) {
SENSEI_STATUS("... streaming? "<<doStreaming);
SENSEI_STATUS("... streaming? " << doStreaming);
}
AnalysisAdaptorPtr analysisAdaptor = AnalysisAdaptorPtr::New();
......@@ -98,37 +102,35 @@ int main(int argc, char **argv)
// read from the HDF5 stream until all steps have been
// processed
unsigned int nSteps = 0;
do
{
do {
// gte the current simulation time and time step
long timeStep = dataAdaptor->GetDataTimeStep();
double time = dataAdaptor->GetDataTime();
nSteps += 1;
timer::MarkStartTimeStep(timeStep, time);
SENSEI_STATUS("Processing time step " << timeStep << " time " << time);
// execute the analysis
timer::MarkStartEvent("AnalysisAdaptor::Execute");
if (!analysisAdaptor->Execute(dataAdaptor.Get())) {
SENSEI_ERROR("Execute failed");
MPI_Abort(comm, 1);
}
timer::MarkEndEvent("AnalysisAdaptor::Execute");
// let the data adaptor release the mesh and data from this
// time step
dataAdaptor->ReleaseData();
timer::MarkEndTimeStep();
long timeStep = dataAdaptor->GetDataTimeStep();
double time = dataAdaptor->GetDataTime();
nSteps += 1;
timer::MarkStartTimeStep(timeStep, time);
SENSEI_STATUS("Processing time step " << timeStep << " time " << time);
// execute the analysis
timer::MarkStartEvent("AnalysisAdaptor::Execute");
if (!analysisAdaptor->Execute(dataAdaptor.Get())) {
SENSEI_ERROR("Execute failed");
MPI_Abort(comm, 1);
}
while (!dataAdaptor->Advance());
timer::MarkEndEvent("AnalysisAdaptor::Execute");
// let the data adaptor release the mesh and data from this
// time step
dataAdaptor->ReleaseData();
timer::MarkEndTimeStep();
} while (!dataAdaptor->AdvanceStream());
SENSEI_STATUS("Finished processing " << nSteps << " time steps")
// close the HDF5 stream
dataAdaptor->Close();
dataAdaptor->CloseStream();
analysisAdaptor->Finalize();
// we must force these to be destroyed before mpi finalize
......
This diff is collapsed.
......@@ -5,56 +5,50 @@
#include "DataRequirements.h"
#include "MeshMetadata.h"
#include <vector>
#include <string>
#include <mpi.h>
#include "hdf5.h"
#include <mpi.h>
#include <string>
#include <vector>
#include "HDF5Schema.h"
//namespace senseiHDF5 { class DataObjectCollectionSchema; }
// namespace senseiHDF5 { class DataObjectCollectionSchema; }
class vtkDataObject;
class vtkCompositeDataSet;
namespace sensei
{
namespace sensei {
/// The write side of the ADIOS 1 transport
class HDF5AnalysisAdaptor : public AnalysisAdaptor
{
class HDF5AnalysisAdaptor : public AnalysisAdaptor {
public:
static HDF5AnalysisAdaptor* New();
static HDF5AnalysisAdaptor *New();
senseiTypeMacro(HDF5AnalysisAdaptor, AnalysisAdaptor);
void PrintSelf(ostream& os, vtkIndent indent) override;
void PrintSelf(ostream &os, vtkIndent indent) override;
/// Sets the maximum buffer allocated by HDF5 in MB
/// takes affect on first Execute
void SetMaxBufferSize(unsigned int size)
{ this->MaxBufferSize = size; }
void SetMaxBufferSize(unsigned int size) { this->MaxBufferSize = size; }
/// @brief Set the filename.
///
/// Default value is "sensei.bp"
void SetFileName(const std::string &filename)
{ this->m_FileName = filename; }
void SetFileName(const std::string &filename) { this->m_FileName = filename; }
void SetStreaming(bool streamOption)
{ this ->m_DoStreaming = streamOption;}
void SetStreaming(bool streamOption) { this->m_DoStreaming = streamOption; }
void SetCollective(bool s) {m_Collective = s;}
void SetCollective(bool s) { m_Collective = s; }
std::string GetFileName() const
{ return this->m_FileName; }
std::string GetFileName() const { return this->m_FileName; }
/// data requirements tell the adaptor what to push
/// if none are given then all data is pushed.
int SetDataRequirements(const DataRequirements &reqs);
int AddDataRequirement(const std::string &meshName,
int association, const std::vector<std::string> &arrays);
int AddDataRequirement(const std::string &meshName, int association,
const std::vector<std::string> &arrays);
// SENSEI AnalysisAdaptor API
bool Execute(DataAdaptor* data) override;
bool Execute(DataAdaptor *data) override;
int Finalize() override;
protected:
......@@ -62,14 +56,14 @@ protected:
~HDF5AnalysisAdaptor();
// intializes HDF5 in no-xml mode, allocate buffers, and declares a group
//bool InitializeHDF5(const std::vector<MeshMetadataPtr> &metadata);
// bool InitializeHDF5(const std::vector<MeshMetadataPtr> &metadata);
bool InitializeHDF5();
// writes the data collection
/*
bool WriteTimestep(unsigned long timeStep, double time,
const std::vector<MeshMetadataPtr> &metadata,
const std::vector<vtkCompositeDataSet*> &dobjects);
const std::vector<MeshMetadataPtr> &metadata,
const std::vector<vtkCompositeDataSet*> &dobjects);
*/
unsigned int MaxBufferSize;
sensei::DataRequirements Requirements;
......@@ -78,12 +72,12 @@ protected:
bool m_Collective = false;
private:
senseiHDF5::WriteStream* m_HDF5Writer;
senseiHDF5::WriteStream *m_HDF5Writer;
HDF5AnalysisAdaptor(const HDF5AnalysisAdaptor&) = delete;
void operator=(const HDF5AnalysisAdaptor&) = delete;
HDF5AnalysisAdaptor(const HDF5AnalysisAdaptor &) = delete;
void operator=(const HDF5AnalysisAdaptor &) = delete;
};
}
} // namespace sensei
#endif
......@@ -3,62 +3,79 @@
#include "Error.h"
#include "Timer.h"
#include "VTKUtils.h"
#include "BlockPartitioner.h"
#include "MeshMetadata.h"
#include "Partitioner.h"
#include "VTKUtils.h"
#include <vtkCompositeDataIterator.h>
#include <vtkDataSet.h>
#include <vtkDataSetAttributes.h>
#include <vtkInformation.h>
#include <vtkMultiBlockDataSet.h>
#include <vtkObjectFactory.h>
#include <vtkSmartPointer.h>
#include <vtkDataSet.h>
#include <sstream>
namespace sensei
{
namespace sensei {
//----------------------------------------------------------------------------
senseiNewMacro(HDF5DataAdaptor);
//----------------------------------------------------------------------------
HDF5DataAdaptor::HDF5DataAdaptor():m_HDF5Reader(nullptr)
{
}
HDF5DataAdaptor::HDF5DataAdaptor() : m_HDF5Reader(nullptr) {}
//----------------------------------------------------------------------------
HDF5DataAdaptor::~HDF5DataAdaptor()
{
delete m_HDF5Reader;
HDF5DataAdaptor::~HDF5DataAdaptor() { delete m_HDF5Reader; }
//----------------------------------------------------------------------------
void HDF5DataAdaptor::SetStreamName(const std::string &name) {
m_StreamName = name;
}
//----------------------------------------------------------------------------
int HDF5DataAdaptor::Open(const std::string& fileName)
{
timer::MarkEvent mark("HDF5DataAdaptor::Open");
// int HDF5DataAdaptor::Open(const std::string& fileName)
int HDF5DataAdaptor::OpenStream() {
timer::MarkEvent mark("HDF5DataAdaptor::OpenStream");
if (m_StreamName.size() == 0) {
SENSEI_ERROR("Failed to specify stream name:\"");
return -1;
}
if (this->m_HDF5Reader == NULL) {
this->m_HDF5Reader = new senseiHDF5::ReadStream(this->GetCommunicator(), m_Streaming);
this->m_HDF5Reader =
new senseiHDF5::ReadStream(this->GetCommunicator(), m_Streaming);
}
if (!this->m_HDF5Reader->Init(fileName)) {
SENSEI_ERROR("Failed to open \"" << fileName << "\"");
if (!this->m_HDF5Reader->Init(m_StreamName)) {
SENSEI_ERROR("Failed to open \"" << m_StreamName << "\"");
return -1;
}
// initialize the time step
// initialize the time step
if (this->UpdateTimeStep())
return -1;
return 0;
}
int HDF5DataAdaptor::StreamGood() {
if (this->m_HDF5Reader == nullptr)
return -1;
if (this->m_HDF5Reader->m_Streamer == nullptr)
return -1;
if (this->m_HDF5Reader->m_Streamer->IsValid())
return 0; //
return -1;
}
//----------------------------------------------------------------------------
int HDF5DataAdaptor::Close()
{
// int HDF5DataAdaptor::Close()
int HDF5DataAdaptor::CloseStream() {
timer::MarkEvent mark("HDF5DataAdaptor::Close");
int m_Rank;
MPI_Comm_rank(GetCommunicator(), &m_Rank);
......@@ -72,27 +89,31 @@ int HDF5DataAdaptor::Close()
}
//----------------------------------------------------------------------------
int HDF5DataAdaptor::Advance()
{
int HDF5DataAdaptor::Finalize() {
timer::MarkEvent mark("HDF5DataAdaptor::Finalize");
return 0;
}
//----------------------------------------------------------------------------
// int HDF5DataAdaptor::Advance()
int HDF5DataAdaptor::AdvanceStream() {
timer::MarkEvent mark("HDF5DataAdaptor::Advance");
return this->UpdateTimeStep();
}
//----------------------------------------------------------------------------
int HDF5DataAdaptor::UpdateTimeStep()
{
int HDF5DataAdaptor::UpdateTimeStep() {
timer::MarkEvent mark("HDF5DataAdaptor::UpdateTimeStep");
// update data object time and time step
unsigned long timeStep = 0;
double time = 0.0;
if (!this->m_HDF5Reader->AdvanceTimeStep(timeStep, time))
{
//SENSEI_ERROR("Failed to update time step");
return -1;
}
if (!this->m_HDF5Reader->AdvanceTimeStep(timeStep, time)) {
// SENSEI_ERROR("Failed to update time step");
return -1;
}
this->SetDataTimeStep(timeStep);
this->SetDataTime(time);
......@@ -101,51 +122,79 @@ int HDF5DataAdaptor::UpdateTimeStep()
unsigned int nMeshes = 0;
if (!this->m_HDF5Reader->ReadMetadata(nMeshes)) {
SENSEI_ERROR("Failed to read metadata at timestep: "<<timeStep);
return -1;
SENSEI_ERROR("Failed to read metadata at timestep: " << timeStep);
return -1;
}
if (nMeshes == 0) {
SENSEI_ERROR("No Mesh at this timestep found");
return -1;
if (nMeshes == 0) {
SENSEI_ERROR("No Mesh at this timestep found");
return -1;
}
return 0;
}
//----------------------------------------------------------------------------
int HDF5DataAdaptor::GetNumberOfMeshes(unsigned int &numMeshes)
{
if (this->m_HDF5Reader) {
numMeshes = this->m_HDF5Reader->GetNumberOfMeshes();
int HDF5DataAdaptor::GetSenderMeshMetadata(unsigned int id,
MeshMetadataPtr &metadata) {
if (this->m_HDF5Reader->ReadMeshMetaData(id, metadata))
return 0;
else {
SENSEI_ERROR("Failed to get metadata for object " << id)
return -1;
}
return -1;
}
//----------------------------------------------------------------------------
int HDF5DataAdaptor::GetMeshMetadata(unsigned int id, MeshMetadataPtr &metadata)
{
if (this->m_HDF5Reader->ReadMeshMetaData(id, metadata))
int HDF5DataAdaptor::GetNumberOfMeshes(unsigned int &numMeshes) {
if (this->m_HDF5Reader) {
numMeshes = this->m_HDF5Reader->GetNumberOfMeshes();
return 0;
else
{
SENSEI_ERROR("Failed to get metadata for object " << id);
}
return -1;
}
//----------------------------------------------------------------------------
int HDF5DataAdaptor::GetMeshMetadata(unsigned int id,
MeshMetadataPtr &metadata) {
// check if some uber analysis told us how the data should land by
// passing in reciever metadata
if (this->GetReceiverMeshMetadata(id, metadata)) {
// none set, we'll use the partitioner to figure it out
// first take a look at what's available
MeshMetadataPtr senderMd;
if (this->GetSenderMeshMetadata(id, senderMd)) {
SENSEI_ERROR("Failed to get sender metadata")
return -1;
}
// get the partitioner, default to the block based layout
Partitioner *part = this->GetPartitioner();
if (!part) {
part = new BlockPartitioner();
}
MeshMetadataPtr recverMd;
if (part->GetPartition(this->GetCommunicator(), senderMd, recverMd)) {
SENSEI_ERROR("Failed to determine a suitable layout to receive the data");
this->CloseStream();
}
metadata = recverMd;
}
return 0;
}
//----------------------------------------------------------------------------
int HDF5DataAdaptor::GetMesh(const std::string &meshName,
bool structureOnly, vtkDataObject *&mesh)
{
int HDF5DataAdaptor::GetMesh(const std::string &meshName, bool structureOnly,
vtkDataObject *&mesh) {
timer::MarkEvent mark("HDF5DataAdaptor::GetMesh");
mesh = nullptr;
// other wise we need to read the mesh at the current time step
if (!this->m_HDF5Reader->ReadMesh(meshName, mesh, structureOnly))
{
if (!this->m_HDF5Reader->ReadMesh(meshName, mesh, structureOnly)) {
SENSEI_ERROR("Failed to read mesh \"" << meshName << "\"");
return -1;
}
......@@ -155,24 +204,19 @@ int HDF5DataAdaptor::GetMesh(const std::string &meshName,
//----------------------------------------------------------------------------
int HDF5DataAdaptor::AddGhostNodesArray(vtkDataObject *mesh,
const std::string &meshName)
{
const std::string &meshName) {
return AddArray(mesh, meshName, vtkDataObject::POINT, "vtkGhostType");
}
//----------------------------------------------------------------------------
int HDF5DataAdaptor::AddGhostCellsArray(vtkDataObject *mesh,
const std::string &meshName)
{
const std::string &meshName) {
return AddArray(mesh, meshName, vtkDataObject::CELL, "vtkGhostType");
}
//----------------------------------------------------------------------------
int HDF5DataAdaptor::AddArray(vtkDataObject* mesh,
const std::string &meshName,
int association,
const std::string& arrayName)
{
int HDF5DataAdaptor::AddArray(vtkDataObject *mesh, const std::string &meshName,
int association, const std::string &arrayName) {
timer::MarkEvent mark("HDF5DataAdaptor::AddArray");
// the mesh should never be null. there must have been an error
......@@ -182,27 +226,26 @@ int HDF5DataAdaptor::AddArray(vtkDataObject* mesh,
return -1;
}
if (!this->m_HDF5Reader->ReadInArray(meshName, association, arrayName, mesh))
{
SENSEI_ERROR("Failed to read " << VTKUtils::GetAttributesName(association)
<< " data array \"" << arrayName << "\" from mesh \"" << meshName << "\"");
return -1;
}
if (!this->m_HDF5Reader->ReadInArray(meshName, association, arrayName,
mesh)) {
SENSEI_ERROR("Failed to read " << VTKUtils::GetAttributesName(association)
<< " data array \"" << arrayName
<< "\" from mesh \"" << meshName << "\"");
return -1;
}
return 0;
}
//----------------------------------------------------------------------------
int HDF5DataAdaptor::ReleaseData()
{
int HDF5DataAdaptor::ReleaseData() {
timer::MarkEvent mark("HDF5DataAdaptor::ReleaseData");
return 0;
}
//----------------------------------------------------------------------------
void HDF5DataAdaptor::PrintSelf(ostream& os, vtkIndent indent)
{
void HDF5DataAdaptor::PrintSelf(ostream &os, vtkIndent indent) {
this->sensei::DataAdaptor::PrintSelf(os, indent);
}
} // namespace
} // namespace sensei
#ifndef HDF5_DataAdaptor_h
#define HDF5_DataAdaptor_h
#include "DataAdaptor.h"
#include "InTransitDataAdaptor.h"
#include <mpi.h>
#include <hdf5.h>
#include <map>
#include <mpi.h>
#include <string>
#include <vtkSmartPointer.h>
#include "HDF5Schema.h"
namespace sensei
{
namespace sensei {
//
// read from HDF and construct VTK object
//
class HDF5DataAdaptor : public DataAdaptor
{
class HDF5DataAdaptor : public InTransitDataAdaptor {
public:
static HDF5DataAdaptor* New();
static HDF5DataAdaptor *New();
senseiTypeMacro(HDF5DataAdaptor, DataAdaptor);
void PrintSelf(ostream& os, vtkIndent indent) override;
void PrintSelf(ostream &os, vtkIndent indent) override;
// int Open( const std::string& filename); // replaced to OpenStream()
int Open( const std::string& filename);
void SetStreaming(bool s) {m_Streaming = s;}
void SetCollective(bool s) {m_Collective = s;}
void SetStreamName(const std::string &name);
void SetStreaming(bool s) { m_Streaming = s; }
void SetCollective(bool s) { m_Collective = s; }
int Advance();
// int Advance(); now is AdvanceStream()
int Close();
// int Close(); now is CloseStream()
/// SENSEI DataAdaptor API
int GetNumberOfMeshes(unsigned int &numMeshes) override;
......@@ -39,16 +38,27 @@ public:
int GetMeshMetadata(unsigned int id, MeshMetadataPtr &metadata) override;
int GetMesh(const std::string &meshName, bool structure_only,
vtkDataObject *&mesh) override;
vtkDataObject *&mesh) override;
int AddGhostNodesArray(vtkDataObject* mesh, const std::string &meshName) override;
int AddGhostCellsArray(vtkDataObject* mesh, const std::string &meshName) override;
int AddGhostNodesArray(vtkDataObject *mesh,
const std::string &meshName) override;
int AddGhostCellsArray(vtkDataObject *mesh,
const std::string &meshName) override;
int AddArray(vtkDataObject* mesh, const std::string &meshName,
int association, const std::string &arrayName) override;
int AddArray(vtkDataObject *mesh, const std::string &meshName,
int association, const std::string &arrayName) override;
int ReleaseData() override;
// intransit:
int OpenStream() override;
int CloseStream() override;
int AdvanceStream() override;
int StreamGood() override;
int Finalize() override;
int GetSenderMeshMetadata(unsigned int id,
MeshMetadataPtr &metadata) override;
protected:
HDF5DataAdaptor();
~HDF5DataAdaptor();
......@@ -58,17 +68,19 @@ protected:
int UpdateTimeStep();
private:
//struct InternalsType;
//InternalsType *Internals;
senseiHDF5::ReadStream* m_HDF5Reader;
// struct InternalsType;
// InternalsType *Internals;
senseiHDF5::ReadStream *m_HDF5Reader;
bool m_Streaming = false;
bool m_Collective = false;
HDF5DataAdaptor(const HDF5DataAdaptor&) = delete;
void operator=(const HDF5DataAdaptor&) = delete;
std::string m_StreamName;
HDF5DataAdaptor(const HDF5DataAdaptor &) = delete;
void operator=(const HDF5DataAdaptor &) = delete;
};
}
} // namespace sensei
#endif
This source diff could not be displayed because it is too large. You can view the blob instead.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment