Commit 56454c5f authored by Chuck Atkins's avatar Chuck Atkins

ADIOS: Remove block-index calculations to use new APIs

ADIOS has added 2 new pieces of metadata in the read API to
retrieve the absolute file timestep and writing process rank for
each block of data.  This removes the need to compute the block
index.

Change-Id: I02b14706225f0a21d3ae6b11060a21748ccba199
parent 86cdc8c3
......@@ -225,10 +225,6 @@ if(ADIOS_FOUND)
unset(_LIB_DIR)
endforeach()
#add libraries which are already using cmake format
string(REGEX MATCHALL "/([A-Za-z_0-9/\\.-]+)\\.([a|so]+)" _ADIOS_LIBS_SUB "${ADIOS_LINKFLAGS}")
list(APPEND ADIOS_LIBRARIES "${_ADIOS_LIBS_SUB}")
# add the version string
execute_process(COMMAND ${ADIOS_CONFIG} -v
OUTPUT_VARIABLE ADIOS_VERSION
......
......@@ -23,72 +23,62 @@
namespace ADIOS
{
template<typename T>
void LoadScalarsFromStats(void* &ptr, ADIOS_VARINFO *v)
{
T* &ptrT = reinterpret_cast<T*&>(ptr);
ptrT = new T[v->sum_nblocks];
for(size_t i = 0; i < v->sum_nblocks; ++i)
{
ptrT[i] = *reinterpret_cast<const T*>(v->statistics->blocks->mins[i]);
}
}
//----------------------------------------------------------------------------
Scalar::Scalar(ADIOS_FILE *f, ADIOS_VARINFO *v)
: VarInfo(f, v), Values(NULL)
{
// Allocate memory
switch(this->Type)
{
case adios_byte:
this->Values = new int8_t[v->sum_nblocks];
LoadScalarsFromStats<int8_t>(this->Values, v);
break;
case adios_short:
this->Values = new int16_t[v->sum_nblocks];
LoadScalarsFromStats<int16_t>(this->Values, v);
break;
case adios_integer:
this->Values = new int32_t[v->sum_nblocks];
LoadScalarsFromStats<int32_t>(this->Values, v);
break;
case adios_long:
this->Values = new int64_t[v->sum_nblocks];
LoadScalarsFromStats<int64_t>(this->Values, v);
break;
case adios_unsigned_byte:
this->Values = new uint8_t[v->sum_nblocks];
LoadScalarsFromStats<uint8_t>(this->Values, v);
break;
case adios_unsigned_short:
this->Values = new uint16_t[v->sum_nblocks];
LoadScalarsFromStats<uint16_t>(this->Values, v);
break;
case adios_unsigned_integer:
this->Values = new uint32_t[v->sum_nblocks];
LoadScalarsFromStats<uint32_t>(this->Values, v);
break;
case adios_unsigned_long:
this->Values = new uint64_t[v->sum_nblocks];
LoadScalarsFromStats<uint64_t>(this->Values, v);
break;
case adios_real:
this->Values = new float[v->sum_nblocks];
LoadScalarsFromStats<float>(this->Values, v);
break;
case adios_double:
this->Values = new double[v->sum_nblocks];
LoadScalarsFromStats<double>(this->Values, v);
break;
case adios_complex:
this->Values = new std::complex<float>[v->sum_nblocks];
LoadScalarsFromStats<std::complex<float> >(this->Values, v);
break;
case adios_double_complex:
this->Values = new std::complex<double>[v->sum_nblocks];
LoadScalarsFromStats<std::complex<double> >(this->Values, v);
break;
default:
// Unsupported data type
break;
default: break;
}
size_t tSize = Type::SizeOf(this->Type);
// Read all blocks and steps
int err;
char *rawPtr = reinterpret_cast<char *>(this->Values);
for(size_t s = 0; s < v->nsteps; ++s)
{
for(size_t b = 0; b < v->nblocks[s]; ++b)
{
ADIOS_SELECTION *sel = adios_selection_writeblock(b);
ReadError::TestNe<ADIOS_SELECTION*>(NULL, sel);
err = adios_schedule_read_byid(f, sel, v->varid, s, 1, rawPtr);
ReadError::TestEq(0, err);
err = adios_perform_reads(f, 1);
ReadError::TestEq(0, err);
adios_selection_delete(sel);
rawPtr += tSize;
}
}
}
......
......@@ -39,8 +39,10 @@ public:
{
ReadError::TestEq(this->Type, Type::NativeToADIOS<T>(), "Invalid type");
const int blockId = this->GetBlockId(step, block);
return reinterpret_cast<const T*>(this->Values)[blockId];
StepBlock* idx = this->GetNewestBlockIndex(step, block);
ReadError::TestNe<StepBlock*>(NULL, idx, "Variable not available");
return reinterpret_cast<const T*>(this->Values)[idx->BlockId];
}
protected:
......
......@@ -35,20 +35,58 @@ VarInfo::VarInfo(ADIOS_FILE *f, ADIOS_VARINFO *v)
err = adios_inq_var_blockinfo(f, v);
ReadError::TestEq(0, err);
// Calculate block ids
// Polulate dimensions and determine block step ranges
size_t pidMax = 0;
size_t tidMax = 0;
int nd = v->ndim;
this->Dims.resize(v->sum_nblocks);
for(size_t bid = 0; bid < v->sum_nblocks; ++bid)
{
ADIOS_VARBLOCK &bi = v->blockinfo[bid];
if(bi.process_id > pidMax)
{
pidMax = bi.process_id;
}
if(nd > 0)
{
std::vector<size_t> &dimsBid = this->Dims[bid];
dimsBid.reserve(nd);
for(size_t n = 0; n < nd; ++n)
{
dimsBid.push_back(bi.count[n]);
}
}
}
// Construct the block index
this->NumPids = pidMax + 1;
this->NumSteps = f->last_step+1;
this->StepBlockIndex.clear();
this->StepBlockIndex.resize(this->NumSteps*this->NumPids, NULL);
size_t bid = 0;
this->BlockId.resize(v->nsteps);
this->Dims.resize(v->nsteps);
for(size_t s = 0; s < v->nsteps; ++s)
{
this->Dims[s].resize(v->nblocks[s]);
for(size_t b = 0; b < v->nblocks[s]; ++b)
{
this->Dims[s][b].reserve(v->ndim);
std::copy(v->blockinfo[bid].count, v->blockinfo[bid].count+v->ndim,
std::back_inserter(this->Dims[s][b]));
ADIOS_VARBLOCK &bi = v->blockinfo[bid];
this->StepBlockIndex[(bi.time_index-1)*this->NumPids+bi.process_id] =
new StepBlock(s, b, bid++);
}
}
}
this->BlockId[s].push_back(bid++);
//----------------------------------------------------------------------------
VarInfo::~VarInfo()
{
// Cleanup the block step index
for(std::vector<StepBlock*>::iterator i = this->StepBlockIndex.begin();
i != this->StepBlockIndex.end(); ++i)
{
if(*i)
{
delete *i;
*i = NULL;
}
}
}
......@@ -74,34 +112,38 @@ const std::string& VarInfo::GetName(void) const
//----------------------------------------------------------------------------
size_t VarInfo::GetNumSteps(void) const
{
return this->BlockId.size();
return this->NumSteps;
}
//----------------------------------------------------------------------------
size_t VarInfo::GetNumBlocks(size_t step) const
{
return this->BlockId[step].size();
return this->NumPids;
}
//----------------------------------------------------------------------------
size_t VarInfo::GetBlockId(size_t step, size_t block) const
VarInfo::StepBlock* VarInfo::GetNewestBlockIndex(size_t step, size_t pid) const
{
ReadError::TestEq(true, step < this->BlockId.size(), "Invalid step");
ReadError::TestEq(true, block < this->BlockId[step].size(),
"Invalid block");
return static_cast<int>(this->BlockId[step][block]);
ReadError::TestEq(true, step < this->NumSteps, "Invalid step");
ReadError::TestEq(true, pid < this->NumPids, "Invalid block");
StepBlock* idx = NULL;
for(int curStep = step; !idx && curStep >= 0; --curStep)
{
idx = this->StepBlockIndex[curStep*this->NumPids+pid];
}
return idx;
}
//----------------------------------------------------------------------------
void VarInfo::GetDims(std::vector<size_t>& dims, size_t step,
size_t block) const
void VarInfo::GetDims(std::vector<size_t>& dims, size_t step, size_t pid) const
{
ReadError::TestEq(true, step < this->BlockId.size(), "Invalid step");
ReadError::TestEq(true, block < this->BlockId[step].size(),
"Invalid block");
StepBlock* idx = this->GetNewestBlockIndex(step, pid);
ReadError::TestNe<VarInfo::StepBlock*>(NULL, idx, "Variable not available");
dims.clear();
dims = this->Dims[step][block];
dims = this->Dims[idx->BlockId];
}
} // End namespace ADIOS
......@@ -28,9 +28,20 @@ namespace ADIOS
class VarInfo
{
public:
// Data structure used to hold block index mapping info
struct StepBlock
{
StepBlock() : Step(-1), Block(-1), BlockId(-1) {}
StepBlock(int s, int b, int i) : Step(s), Block(b), BlockId(i) { }
int Step;
int Block;
int BlockId;
};
public:
VarInfo(ADIOS_FILE *f, ADIOS_VARINFO *v);
virtual ~VarInfo(void) { }
virtual ~VarInfo(void);
void SetName(const std::string& name) { this->Name = name; }
const int& GetId() const;
......@@ -38,15 +49,20 @@ public:
const std::string& GetName(void) const;
size_t GetNumSteps(void) const;
size_t GetNumBlocks(size_t step) const;
size_t GetBlockId(size_t step, size_t block) const;
void GetDims(std::vector<size_t>& dims, size_t step, size_t block) const;
StepBlock* GetNewestBlockIndex(size_t step, size_t pid) const;
void GetDims(std::vector<size_t>& dims, size_t step, size_t pid) const;
protected:
int Id;
ADIOS_DATATYPES Type;
std::string Name;
std::vector<std::vector<size_t> > BlockId;
std::vector<std::vector<std::vector<size_t> > > Dims;
size_t NumSteps;
size_t NumPids;
std::vector<std::vector<size_t> > Dims;
// This maps the absolute time step and process id to a file-local
// step and block id for reading
std::vector<StepBlock*> StepBlockIndex;
};
} // End namespace ADIOS
......
......@@ -409,7 +409,6 @@ void Writer::WriteArray(const std::string& path, const void* val)
void Writer::Commit(const std::string& fName, bool app)
{
uint64_t groupSize = 0;
std::vector<const WriterImpl::ArrayValue*> nonEmptyArrays;
// Step 1: Preprocessing
......@@ -447,23 +446,15 @@ void Writer::Commit(const std::string& fName, bool app)
di->ValueI : this->Impl->IntegralScalars[di->ValueS];
}
}
//if(numElements == 0)
// {
// delete *avi;
// }
//else
// {
groupSize += numElements * ai->ElementSize;
nonEmptyArrays.push_back(*avi);
// }
groupSize += numElements * ai->ElementSize;
}
this->Impl->ArraysToWrite.clear();
int err;
// Step 2. Set the buffer size in MB with the full knowledge of the dynamic
// group size
err = adios_allocate_buffer(ADIOS_BUFFER_ALLOC_LATER, (groupSize >> 20) + 1);
// group size. Ask for 10% over the group size to account for extra metadata
int bufSize = (groupSize * 1.1)/(1024*1024) + 1;
err = adios_allocate_buffer(ADIOS_BUFFER_ALLOC_LATER, bufSize);
WriteError::TestEq(0, err);
// Step 3. Open the file for writing
......@@ -488,8 +479,8 @@ void Writer::Commit(const std::string& fName, bool app)
// Step 5: Write Arrays
for(std::vector<const WriterImpl::ArrayValue*>::iterator avi =
nonEmptyArrays.begin();
avi != nonEmptyArrays.end();
this->Impl->ArraysToWrite.begin();
avi != this->Impl->ArraysToWrite.end();
++avi)
{
err = adios_write(file, (*avi)->Path.c_str(),
......@@ -499,7 +490,6 @@ void Writer::Commit(const std::string& fName, bool app)
// Step 6. Close the file and commit the writes to ADIOS
adios_close(file);
MPI_Barrier(this->Ctx->Comm);
// Step 7. Cleanup
for(std::vector<const WriterImpl::ScalarValue*>::iterator svi =
......@@ -509,14 +499,16 @@ void Writer::Commit(const std::string& fName, bool app)
{
delete *svi;
}
this->Impl->ScalarsToWrite.clear();
for(std::vector<const WriterImpl::ArrayValue*>::iterator avi =
nonEmptyArrays.begin();
avi != nonEmptyArrays.end();
this->Impl->ArraysToWrite.begin();
avi != this->Impl->ArraysToWrite.end();
++avi)
{
delete *avi;
}
this->Impl->ScalarsToWrite.clear();
this->Impl->ArraysToWrite.clear();
}
} // End namespace
......@@ -205,7 +205,7 @@ int vtkADIOSReader::RequestInformation(vtkInformation *vtkNotUsed(req),
vtkInformation* outInfo = output->GetInformationObject(0);
outInfo->Set(vtkAlgorithm::CAN_HANDLE_PIECE_REQUEST(), 1);
// Rank 0 reads attributes and time steps and sends to all other ranks
// Rank 0 reads attributes and sends to all other ranks
if(this->Controller->GetLocalProcessId() == 0)
{
// 1: Retrieve the necessary attributes
......@@ -226,32 +226,30 @@ int vtkADIOSReader::RequestInformation(vtkInformation *vtkNotUsed(req),
vtkWarningMacro(<< "NumberOfPieces attribute not present. Assuming 1");
this->NumberOfPieces = 1;
}
// 3: Retrieve the time steps
const ADIOS::Scalar *varTimeSteps = this->Tree->GetScalar("TimeStamp");
this->TimeSteps.clear();
this->TimeSteps.resize(varTimeSteps->GetNumSteps());
for(int t = 0; t < varTimeSteps->GetNumSteps(); ++t)
{
this->TimeSteps[t] = varTimeSteps->GetValue<double>(t, 0);
}
}
// 4: Communicate metadata to all other ranks
int msg1[2];
// 3: Broadcast number of pieces to all other ranks
int msg1[1];
if(this->Controller->GetLocalProcessId() == 0)
{
msg1[0] = this->NumberOfPieces;
msg1[1] = this->TimeSteps.size();
}
this->Controller->Broadcast(msg1, 2, 0);
this->Controller->Broadcast(msg1, 1, 0);
if(this->Controller->GetLocalProcessId() != 0)
{
this->NumberOfPieces = msg1[0];
this->TimeSteps.resize(msg1[1]);
}
this->Controller->Broadcast(&(*this->TimeSteps.begin()),
this->TimeSteps.size(), 0);
// 4: Retrieve the time steps
const ADIOS::Scalar *varTimeSteps = this->Tree->GetScalar("TimeStamp");
this->TimeSteps.clear();
this->TimeSteps.resize(varTimeSteps->GetNumSteps());
for(int t = 0; t < varTimeSteps->GetNumSteps(); ++t)
{
// Always read time info from block0
this->TimeSteps[t] = varTimeSteps->GetValue<double>(t, 0);
}
// Populate the inverse lookup, i.e. time step value to time step index
this->TimeStepsIndex.clear();
......@@ -500,10 +498,14 @@ void vtkADIOSReader::ReadObject(const ADIOS::VarInfo* info,
// Only queue the read if there's data to be read
if(nc != 0 && nt != 0)
{
const ADIOS::VarInfo::StepBlock *idx =
info->GetNewestBlockIndex(this->RequestStepIndex, blockId);
// TODO: Use a cached copy if available
data->SetNumberOfComponents(nc);
data->SetNumberOfTuples(nt);
this->Reader->ScheduleReadArray(info->GetId(), data->GetVoidPointer(0),
this->RequestStepIndex, blockId);
idx->Step, idx->Block);
}
}
......
......@@ -58,7 +58,7 @@ vtkADIOSWriter::vtkADIOSWriter()
TransportMethod(static_cast<int>(ADIOS::TransportMethod_POSIX)),
TransportMethodArguments(NULL),
Transform(static_cast<int>(ADIOS::Transform_NONE)),
WriteMode(vtkADIOSWriter::Always), CurrentStep(-1), Controller(NULL),
CurrentStep(-1), Controller(NULL),
Writer(NULL),
NumberOfPieces(-1), RequestPiece(-1), NumberOfGhostLevels(-1),
WriteAllTimeSteps(false), TimeSteps(), CurrentTimeStepIndex(-1)
......@@ -150,26 +150,6 @@ bool vtkADIOSWriter::DefineAndWrite(vtkDataObject *input)
// Before any data can be writen, it's structure must be declared
this->Define("", data);
if(this->WriteMode == vtkADIOSWriter::OnChange)
{
// Set up the index for independently array stepping
this->BlockStepIndex.clear();
this->BlockStepIndex.resize(this->BlockStepIndexIdMap.size());
std::vector<ADIOS::ArrayDim> indexDims;
indexDims.push_back(ADIOS::ArrayDim(this->BlockStepIndexIdMap.size()));
this->Writer->DefineLocalArray<int>("::BlockStepIndex", indexDims);
// Gather all the block step index id maps to Rank 0
std::string BlockStepIndexIdMapAttr = this->GatherBlockStepIdMap();
if(localProc == 0)
{
this->Writer->DefineAttribute<std::string>("::BlockStepIndexIdMap",
BlockStepIndexIdMapAttr);
}
}
if(localProc == 0)
{
// Global time step is only used by Rank 0
......@@ -194,14 +174,7 @@ bool vtkADIOSWriter::DefineAndWrite(vtkDataObject *input)
}
}
std::memset(&*this->BlockStepIndex.begin(), 0xFF,
sizeof(vtkTypeInt64)*this->BlockStepIndex.size());
this->Write("", data);
if(this->WriteMode == vtkADIOSWriter::OnChange)
{
this->Writer->WriteArray("::BlockStepIndex", &this->BlockStepIndex[0]);
}
this->Writer->Commit(this->FileName, this->CurrentStep > 0);
}
catch(const ADIOS::WriteError &err)
......@@ -212,64 +185,6 @@ bool vtkADIOSWriter::DefineAndWrite(vtkDataObject *input)
return true;
}
//----------------------------------------------------------------------------
std::string vtkADIOSWriter::GatherBlockStepIdMap(void)
{
const int numProcs = this->Controller->GetNumberOfProcesses();
const int localProc = this->Controller->GetLocalProcessId();
// Encode into string containing:
// Block0_Id Var0_Id Var0_Name
// Block0_Id Var1_Id Var1_Name
// ...
// BlockN_Id VarM_Id VarM_Name
std::stringstream ss;
for(NameIdMap::const_iterator i = this->BlockStepIndexIdMap.begin();
i != this->BlockStepIndexIdMap.end(); ++i)
{
ss << localProc << ' ' << i->second << ' ' << i->first << '\n';
}
std::string sendBuf = ss.str();
vtkIdType sendBufLen = sendBuf.length();
// Gather the variable length buffer sizes
vtkIdType *recvLengths = localProc == 0 ? new vtkIdType[numProcs] : NULL;
this->Controller->Gather(&sendBufLen, recvLengths, 1, 0);
// Compute the recieving buffer sizes and offsets
vtkIdType fullLength = 0;
vtkIdType *recvOffsets = NULL;
char *recvBuffer = NULL;
if(localProc == 0)
{
recvOffsets = new vtkIdType[numProcs];
for(int p = 0; p < numProcs; ++p)
{
recvOffsets[p] = fullLength;
fullLength += recvLengths[p];
}
recvBuffer = new char[fullLength];
}
// Gather the index id maps from all processes
this->Controller->GatherV(sendBuf.c_str(), recvBuffer, sendBufLen,
recvLengths, recvOffsets, 0);
std::string recv;
if(localProc == 0)
{
// Strip the trailing \n to make null terminated and parse as an std::string
recvBuffer[fullLength-1] = '\0';
recv = recvBuffer;
// Cleanup
delete[] recvBuffer;
delete[] recvOffsets;
delete[] recvLengths;
}
return recv;
}
//----------------------------------------------------------------------------
bool vtkADIOSWriter::WriteInternal(void)
{
......@@ -441,11 +356,6 @@ void vtkADIOSWriter::Define(const std::string& path, const vtkAbstractArray* v)
this->Writer->DefineLocalArray(path,
ADIOS::Type::VTKToADIOS(valueTmp->GetDataType()), dims,
static_cast<ADIOS::Transform>(this->Transform));
if(this->WriteMode == vtkADIOSWriter::OnChange)
{
this->BlockStepIndexIdMap.insert(
std::make_pair(path, this->BlockStepIndexIdMap.size()));
}
}
//----------------------------------------------------------------------------
......@@ -580,7 +490,7 @@ bool vtkADIOSWriter::UpdateMTimeTable(const std::string path,
unsigned long mtimePrev = mtimeCurrent;
mtimeCurrent = mtimeNew;
return this->WriteMode == vtkADIOSWriter::Always || mtimeNew != mtimePrev;
return mtimeNew != mtimePrev;
}
//----------------------------------------------------------------------------
......@@ -602,22 +512,9 @@ void vtkADIOSWriter::Write(const std::string& path, const vtkAbstractArray* v)
size_t nc = valueTmp->GetNumberOfComponents();
size_t nt = valueTmp->GetNumberOfTuples();
// Skip empty arrays
//if(nc == 0 || nt == 0)
// {
// return;
// }
this->Writer->WriteScalar<size_t>(path+"#NC", nc);
this->Writer->WriteScalar<size_t>(path+"#NT", nt);
this->Writer->WriteArray(path, valueTmp->GetVoidPointer(0));
if(this->WriteMode == vtkADIOSWriter::OnChange)
{
this->BlockStepIndex[this->BlockStepIndexIdMap[path]] =
(static_cast<vtkTypeInt64>(this->CurrentStep) << 32) |
this->Controller->GetLocalProcessId();
}
}
//----------------------------------------------------------------------------
......@@ -628,7 +525,7 @@ void vtkADIOSWriter::Write(const std::string& path, const vtkDataArray* v)
if(lut)
{
// Only heck the mtime here if a LUT is present. Otherwise it will be
// Only check the mtime here if a LUT is present. Otherwise it will be
// handled apropriately by the abstract array writer
if(!this->UpdateMTimeTable(path, v))
{
......
......@@ -95,19 +95,6 @@ public:
void SetTransformToBZip2() { this->SetTransform(static_cast<int>(ADIOS::Transform_BZLIB2)); }
void SetTransformToSZip() { this->SetTransform(static_cast<int>(ADIOS::Transform_SZIP)); }
enum
{
Always = 0,
OnChange = 1
};
// Description:
// Get/Set the write mode for array data
vtkGetMacro(WriteMode, int);
vtkSetClampMacro(WriteMode, int, Always, OnChange);
void SetWriteModeToAlways() { this->SetWriteMode(Always); }
void SetWriteModeToOnChange() { this->SetWriteMode(OnChange); }
//Description:
//Controls whether writer automatically writes all input time steps, or
//just the timestep that is currently on the input.
......@@ -166,15 +153,10 @@ protected:
int TransportMethod;
char *TransportMethodArguments;
int Transform;
int WriteMode;
int Rank;
int CurrentStep;
typedef std::map<std::string, size_t> NameIdMap;
NameIdMap BlockStepIndexIdMap;
std::vector<vtkTypeInt64> BlockStepIndex;
vtkMultiProcessController *Controller;
ADIOS::Writer *Writer;
int BLOCKDEBUG;
vtkADIOSWriter();
~vtkADIOSWriter();
......@@ -206,9 +188,6 @@ protected:
bool UpdateMTimeTable(const std::string path, const vtkObject* value);
std::map<std::string, unsigned long> LastUpdated;
private:
// Synchronize the block step index map across all processes
std::string GatherBlockStepIdMap();
bool WriteInternal();
template<typename T>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment