Commit 24a829a6 authored by Chuck Atkins's avatar Chuck Atkins Committed by Code Review
Browse files

Merge topic 'update-adios-to-new-api' into master

56454c5f ADIOS: Remove block-index calculations to use new APIs
parents 6a78788b 56454c5f
......@@ -225,10 +225,6 @@ if(ADIOS_FOUND)
unset(_LIB_DIR)
endforeach()
#add libraries which are already using cmake format
string(REGEX MATCHALL "/([A-Za-z_0-9/\\.-]+)\\.([a|so]+)" _ADIOS_LIBS_SUB "${ADIOS_LINKFLAGS}")
list(APPEND ADIOS_LIBRARIES "${_ADIOS_LIBS_SUB}")
# add the version string
execute_process(COMMAND ${ADIOS_CONFIG} -v
OUTPUT_VARIABLE ADIOS_VERSION
......
......@@ -23,72 +23,62 @@
namespace ADIOS
{
template<typename T>
void LoadScalarsFromStats(void* &ptr, ADIOS_VARINFO *v)
{
T* &ptrT = reinterpret_cast<T*&>(ptr);
ptrT = new T[v->sum_nblocks];
for(size_t i = 0; i < v->sum_nblocks; ++i)
{
ptrT[i] = *reinterpret_cast<const T*>(v->statistics->blocks->mins[i]);
}
}
//----------------------------------------------------------------------------
Scalar::Scalar(ADIOS_FILE *f, ADIOS_VARINFO *v)
: VarInfo(f, v), Values(NULL)
{
// Allocate memory
switch(this->Type)
{
case adios_byte:
this->Values = new int8_t[v->sum_nblocks];
LoadScalarsFromStats<int8_t>(this->Values, v);
break;
case adios_short:
this->Values = new int16_t[v->sum_nblocks];
LoadScalarsFromStats<int16_t>(this->Values, v);
break;
case adios_integer:
this->Values = new int32_t[v->sum_nblocks];
LoadScalarsFromStats<int32_t>(this->Values, v);
break;
case adios_long:
this->Values = new int64_t[v->sum_nblocks];
LoadScalarsFromStats<int64_t>(this->Values, v);
break;
case adios_unsigned_byte:
this->Values = new uint8_t[v->sum_nblocks];
LoadScalarsFromStats<uint8_t>(this->Values, v);
break;
case adios_unsigned_short:
this->Values = new uint16_t[v->sum_nblocks];
LoadScalarsFromStats<uint16_t>(this->Values, v);
break;
case adios_unsigned_integer:
this->Values = new uint32_t[v->sum_nblocks];
LoadScalarsFromStats<uint32_t>(this->Values, v);
break;
case adios_unsigned_long:
this->Values = new uint64_t[v->sum_nblocks];
LoadScalarsFromStats<uint64_t>(this->Values, v);
break;
case adios_real:
this->Values = new float[v->sum_nblocks];
LoadScalarsFromStats<float>(this->Values, v);
break;
case adios_double:
this->Values = new double[v->sum_nblocks];
LoadScalarsFromStats<double>(this->Values, v);
break;
case adios_complex:
this->Values = new std::complex<float>[v->sum_nblocks];
LoadScalarsFromStats<std::complex<float> >(this->Values, v);
break;
case adios_double_complex:
this->Values = new std::complex<double>[v->sum_nblocks];
LoadScalarsFromStats<std::complex<double> >(this->Values, v);
break;
default:
// Unsupported data type
break;
default: break;
}
size_t tSize = Type::SizeOf(this->Type);
// Read all blocks and steps
int err;
char *rawPtr = reinterpret_cast<char *>(this->Values);
for(size_t s = 0; s < v->nsteps; ++s)
{
for(size_t b = 0; b < v->nblocks[s]; ++b)
{
ADIOS_SELECTION *sel = adios_selection_writeblock(b);
ReadError::TestNe<ADIOS_SELECTION*>(NULL, sel);
err = adios_schedule_read_byid(f, sel, v->varid, s, 1, rawPtr);
ReadError::TestEq(0, err);
err = adios_perform_reads(f, 1);
ReadError::TestEq(0, err);
adios_selection_delete(sel);
rawPtr += tSize;
}
}
}
......
......@@ -39,8 +39,10 @@ public:
{
ReadError::TestEq(this->Type, Type::NativeToADIOS<T>(), "Invalid type");
const int blockId = this->GetBlockId(step, block);
return reinterpret_cast<const T*>(this->Values)[blockId];
StepBlock* idx = this->GetNewestBlockIndex(step, block);
ReadError::TestNe<StepBlock*>(NULL, idx, "Variable not available");
return reinterpret_cast<const T*>(this->Values)[idx->BlockId];
}
protected:
......
......@@ -35,20 +35,58 @@ VarInfo::VarInfo(ADIOS_FILE *f, ADIOS_VARINFO *v)
err = adios_inq_var_blockinfo(f, v);
ReadError::TestEq(0, err);
// Calculate block ids
// Polulate dimensions and determine block step ranges
size_t pidMax = 0;
size_t tidMax = 0;
int nd = v->ndim;
this->Dims.resize(v->sum_nblocks);
for(size_t bid = 0; bid < v->sum_nblocks; ++bid)
{
ADIOS_VARBLOCK &bi = v->blockinfo[bid];
if(bi.process_id > pidMax)
{
pidMax = bi.process_id;
}
if(nd > 0)
{
std::vector<size_t> &dimsBid = this->Dims[bid];
dimsBid.reserve(nd);
for(size_t n = 0; n < nd; ++n)
{
dimsBid.push_back(bi.count[n]);
}
}
}
// Construct the block index
this->NumPids = pidMax + 1;
this->NumSteps = f->last_step+1;
this->StepBlockIndex.clear();
this->StepBlockIndex.resize(this->NumSteps*this->NumPids, NULL);
size_t bid = 0;
this->BlockId.resize(v->nsteps);
this->Dims.resize(v->nsteps);
for(size_t s = 0; s < v->nsteps; ++s)
{
this->Dims[s].resize(v->nblocks[s]);
for(size_t b = 0; b < v->nblocks[s]; ++b)
{
this->Dims[s][b].reserve(v->ndim);
std::copy(v->blockinfo[bid].count, v->blockinfo[bid].count+v->ndim,
std::back_inserter(this->Dims[s][b]));
ADIOS_VARBLOCK &bi = v->blockinfo[bid];
this->StepBlockIndex[(bi.time_index-1)*this->NumPids+bi.process_id] =
new StepBlock(s, b, bid++);
}
}
}
this->BlockId[s].push_back(bid++);
//----------------------------------------------------------------------------
VarInfo::~VarInfo()
{
// Cleanup the block step index
for(std::vector<StepBlock*>::iterator i = this->StepBlockIndex.begin();
i != this->StepBlockIndex.end(); ++i)
{
if(*i)
{
delete *i;
*i = NULL;
}
}
}
......@@ -74,34 +112,38 @@ const std::string& VarInfo::GetName(void) const
//----------------------------------------------------------------------------
size_t VarInfo::GetNumSteps(void) const
{
return this->BlockId.size();
return this->NumSteps;
}
//----------------------------------------------------------------------------
size_t VarInfo::GetNumBlocks(size_t step) const
{
return this->BlockId[step].size();
return this->NumPids;
}
//----------------------------------------------------------------------------
size_t VarInfo::GetBlockId(size_t step, size_t block) const
VarInfo::StepBlock* VarInfo::GetNewestBlockIndex(size_t step, size_t pid) const
{
ReadError::TestEq(true, step < this->BlockId.size(), "Invalid step");
ReadError::TestEq(true, block < this->BlockId[step].size(),
"Invalid block");
return static_cast<int>(this->BlockId[step][block]);
ReadError::TestEq(true, step < this->NumSteps, "Invalid step");
ReadError::TestEq(true, pid < this->NumPids, "Invalid block");
StepBlock* idx = NULL;
for(int curStep = step; !idx && curStep >= 0; --curStep)
{
idx = this->StepBlockIndex[curStep*this->NumPids+pid];
}
return idx;
}
//----------------------------------------------------------------------------
void VarInfo::GetDims(std::vector<size_t>& dims, size_t step,
size_t block) const
void VarInfo::GetDims(std::vector<size_t>& dims, size_t step, size_t pid) const
{
ReadError::TestEq(true, step < this->BlockId.size(), "Invalid step");
ReadError::TestEq(true, block < this->BlockId[step].size(),
"Invalid block");
StepBlock* idx = this->GetNewestBlockIndex(step, pid);
ReadError::TestNe<VarInfo::StepBlock*>(NULL, idx, "Variable not available");
dims.clear();
dims = this->Dims[step][block];
dims = this->Dims[idx->BlockId];
}
} // End namespace ADIOS
......@@ -28,9 +28,20 @@ namespace ADIOS
class VarInfo
{
public:
// Data structure used to hold block index mapping info
struct StepBlock
{
StepBlock() : Step(-1), Block(-1), BlockId(-1) {}
StepBlock(int s, int b, int i) : Step(s), Block(b), BlockId(i) { }
int Step;
int Block;
int BlockId;
};
public:
VarInfo(ADIOS_FILE *f, ADIOS_VARINFO *v);
virtual ~VarInfo(void) { }
virtual ~VarInfo(void);
void SetName(const std::string& name) { this->Name = name; }
const int& GetId() const;
......@@ -38,15 +49,20 @@ public:
const std::string& GetName(void) const;
size_t GetNumSteps(void) const;
size_t GetNumBlocks(size_t step) const;
size_t GetBlockId(size_t step, size_t block) const;
void GetDims(std::vector<size_t>& dims, size_t step, size_t block) const;
StepBlock* GetNewestBlockIndex(size_t step, size_t pid) const;
void GetDims(std::vector<size_t>& dims, size_t step, size_t pid) const;
protected:
int Id;
ADIOS_DATATYPES Type;
std::string Name;
std::vector<std::vector<size_t> > BlockId;
std::vector<std::vector<std::vector<size_t> > > Dims;
size_t NumSteps;
size_t NumPids;
std::vector<std::vector<size_t> > Dims;
// This maps the absolute time step and process id to a file-local
// step and block id for reading
std::vector<StepBlock*> StepBlockIndex;
};
} // End namespace ADIOS
......
......@@ -409,7 +409,6 @@ void Writer::WriteArray(const std::string& path, const void* val)
void Writer::Commit(const std::string& fName, bool app)
{
uint64_t groupSize = 0;
std::vector<const WriterImpl::ArrayValue*> nonEmptyArrays;
// Step 1: Preprocessing
......@@ -447,23 +446,15 @@ void Writer::Commit(const std::string& fName, bool app)
di->ValueI : this->Impl->IntegralScalars[di->ValueS];
}
}
//if(numElements == 0)
// {
// delete *avi;
// }
//else
// {
groupSize += numElements * ai->ElementSize;
nonEmptyArrays.push_back(*avi);
// }
groupSize += numElements * ai->ElementSize;
}
this->Impl->ArraysToWrite.clear();
int err;
// Step 2. Set the buffer size in MB with the full knowledge of the dynamic
// group size
err = adios_allocate_buffer(ADIOS_BUFFER_ALLOC_LATER, (groupSize >> 20) + 1);
// group size. Ask for 10% over the group size to account for extra metadata
int bufSize = (groupSize * 1.1)/(1024*1024) + 1;
err = adios_allocate_buffer(ADIOS_BUFFER_ALLOC_LATER, bufSize);
WriteError::TestEq(0, err);
// Step 3. Open the file for writing
......@@ -488,8 +479,8 @@ void Writer::Commit(const std::string& fName, bool app)
// Step 5: Write Arrays
for(std::vector<const WriterImpl::ArrayValue*>::iterator avi =
nonEmptyArrays.begin();
avi != nonEmptyArrays.end();
this->Impl->ArraysToWrite.begin();
avi != this->Impl->ArraysToWrite.end();
++avi)
{
err = adios_write(file, (*avi)->Path.c_str(),
......@@ -499,7 +490,6 @@ void Writer::Commit(const std::string& fName, bool app)
// Step 6. Close the file and commit the writes to ADIOS
adios_close(file);
MPI_Barrier(this->Ctx->Comm);
// Step 7. Cleanup
for(std::vector<const WriterImpl::ScalarValue*>::iterator svi =
......@@ -509,14 +499,16 @@ void Writer::Commit(const std::string& fName, bool app)
{
delete *svi;
}
this->Impl->ScalarsToWrite.clear();
for(std::vector<const WriterImpl::ArrayValue*>::iterator avi =
nonEmptyArrays.begin();
avi != nonEmptyArrays.end();
this->Impl->ArraysToWrite.begin();
avi != this->Impl->ArraysToWrite.end();
++avi)
{
delete *avi;
}
this->Impl->ScalarsToWrite.clear();
this->Impl->ArraysToWrite.clear();
}
} // End namespace
......@@ -205,7 +205,7 @@ int vtkADIOSReader::RequestInformation(vtkInformation *vtkNotUsed(req),
vtkInformation* outInfo = output->GetInformationObject(0);
outInfo->Set(vtkAlgorithm::CAN_HANDLE_PIECE_REQUEST(), 1);
// Rank 0 reads attributes and time steps and sends to all other ranks
// Rank 0 reads attributes and sends to all other ranks
if(this->Controller->GetLocalProcessId() == 0)
{
// 1: Retrieve the necessary attributes
......@@ -226,32 +226,30 @@ int vtkADIOSReader::RequestInformation(vtkInformation *vtkNotUsed(req),
vtkWarningMacro(<< "NumberOfPieces attribute not present. Assuming 1");
this->NumberOfPieces = 1;
}
// 3: Retrieve the time steps
const ADIOS::Scalar *varTimeSteps = this->Tree->GetScalar("TimeStamp");
this->TimeSteps.clear();
this->TimeSteps.resize(varTimeSteps->GetNumSteps());
for(int t = 0; t < varTimeSteps->GetNumSteps(); ++t)
{
this->TimeSteps[t] = varTimeSteps->GetValue<double>(t, 0);
}
}
// 4: Communicate metadata to all other ranks
int msg1[2];
// 3: Broadcast number of pieces to all other ranks
int msg1[1];
if(this->Controller->GetLocalProcessId() == 0)
{
msg1[0] = this->NumberOfPieces;
msg1[1] = this->TimeSteps.size();
}
this->Controller->Broadcast(msg1, 2, 0);
this->Controller->Broadcast(msg1, 1, 0);
if(this->Controller->GetLocalProcessId() != 0)
{
this->NumberOfPieces = msg1[0];
this->TimeSteps.resize(msg1[1]);
}
this->Controller->Broadcast(&(*this->TimeSteps.begin()),
this->TimeSteps.size(), 0);
// 4: Retrieve the time steps
const ADIOS::Scalar *varTimeSteps = this->Tree->GetScalar("TimeStamp");
this->TimeSteps.clear();
this->TimeSteps.resize(varTimeSteps->GetNumSteps());
for(int t = 0; t < varTimeSteps->GetNumSteps(); ++t)
{
// Always read time info from block0
this->TimeSteps[t] = varTimeSteps->GetValue<double>(t, 0);
}
// Populate the inverse lookup, i.e. time step value to time step index
this->TimeStepsIndex.clear();
......@@ -500,10 +498,14 @@ void vtkADIOSReader::ReadObject(const ADIOS::VarInfo* info,
// Only queue the read if there's data to be read
if(nc != 0 && nt != 0)
{
const ADIOS::VarInfo::StepBlock *idx =
info->GetNewestBlockIndex(this->RequestStepIndex, blockId);
// TODO: Use a cached copy if available
data->SetNumberOfComponents(nc);
data->SetNumberOfTuples(nt);
this->Reader->ScheduleReadArray(info->GetId(), data->GetVoidPointer(0),
this->RequestStepIndex, blockId);
idx->Step, idx->Block);
}
}
......
......@@ -58,7 +58,7 @@ vtkADIOSWriter::vtkADIOSWriter()
TransportMethod(static_cast<int>(ADIOS::TransportMethod_POSIX)),
TransportMethodArguments(NULL),
Transform(static_cast<int>(ADIOS::Transform_NONE)),
WriteMode(vtkADIOSWriter::Always), CurrentStep(-1), Controller(NULL),
CurrentStep(-1), Controller(NULL),
Writer(NULL),
NumberOfPieces(-1), RequestPiece(-1), NumberOfGhostLevels(-1),
WriteAllTimeSteps(false), TimeSteps(), CurrentTimeStepIndex(-1)
......@@ -150,26 +150,6 @@ bool vtkADIOSWriter::DefineAndWrite(vtkDataObject *input)
// Before any data can be writen, it's structure must be declared
this->Define("", data);
if(this->WriteMode == vtkADIOSWriter::OnChange)
{
// Set up the index for independently array stepping
this->BlockStepIndex.clear();
this->BlockStepIndex.resize(this->BlockStepIndexIdMap.size());
std::vector<ADIOS::ArrayDim> indexDims;
indexDims.push_back(ADIOS::ArrayDim(this->BlockStepIndexIdMap.size()));
this->Writer->DefineLocalArray<int>("::BlockStepIndex", indexDims);
// Gather all the block step index id maps to Rank 0
std::string BlockStepIndexIdMapAttr = this->GatherBlockStepIdMap();
if(localProc == 0)
{
this->Writer->DefineAttribute<std::string>("::BlockStepIndexIdMap",
BlockStepIndexIdMapAttr);
}
}
if(localProc == 0)
{
// Global time step is only used by Rank 0
......@@ -194,14 +174,7 @@ bool vtkADIOSWriter::DefineAndWrite(vtkDataObject *input)
}
}
std::memset(&*this->BlockStepIndex.begin(), 0xFF,
sizeof(vtkTypeInt64)*this->BlockStepIndex.size());
this->Write("", data);
if(this->WriteMode == vtkADIOSWriter::OnChange)
{
this->Writer->WriteArray("::BlockStepIndex", &this->BlockStepIndex[0]);
}
this->Writer->Commit(this->FileName, this->CurrentStep > 0);
}
catch(const ADIOS::WriteError &err)
......@@ -212,64 +185,6 @@ bool vtkADIOSWriter::DefineAndWrite(vtkDataObject *input)
return true;
}
//----------------------------------------------------------------------------
std::string vtkADIOSWriter::GatherBlockStepIdMap(void)
{
const int numProcs = this->Controller->GetNumberOfProcesses();
const int localProc = this->Controller->GetLocalProcessId();
// Encode into string containing:
// Block0_Id Var0_Id Var0_Name
// Block0_Id Var1_Id Var1_Name
// ...
// BlockN_Id VarM_Id VarM_Name
std::stringstream ss;
for(NameIdMap::const_iterator i = this->BlockStepIndexIdMap.begin();
i != this->BlockStepIndexIdMap.end(); ++i)
{
ss << localProc << ' ' << i->second << ' ' << i->first << '\n';
}
std::string sendBuf = ss.str();
vtkIdType sendBufLen = sendBuf.length();
// Gather the variable length buffer sizes
vtkIdType *recvLengths = localProc == 0 ? new vtkIdType[numProcs] : NULL;
this->Controller->Gather(&sendBufLen, recvLengths, 1, 0);
// Compute the recieving buffer sizes and offsets
vtkIdType fullLength = 0;
vtkIdType *recvOffsets = NULL;
char *recvBuffer = NULL;
if(localProc == 0)
{
recvOffsets = new vtkIdType[numProcs];
for(int p = 0; p < numProcs; ++p)
{
recvOffsets[p] = fullLength;
fullLength += recvLengths[p];
}
recvBuffer = new char[fullLength];
}
// Gather the index id maps from all processes
this->Controller->GatherV(sendBuf.c_str(), recvBuffer, sendBufLen,
recvLengths, recvOffsets, 0);
std::string recv;
if(localProc == 0)
{
// Strip the trailing \n to make null terminated and parse as an std::string
recvBuffer[fullLength-1] = '\0';
recv = recvBuffer;
// Cleanup
delete[] recvBuffer;
delete[] recvOffsets;
delete[] recvLengths;
}
return recv;
}
//----------------------------------------------------------------------------
bool vtkADIOSWriter::WriteInternal(void)
{
......@@ -441,11 +356,6 @@ void vtkADIOSWriter::Define(const std::string& path, const vtkAbstractArray* v)
this->Writer->DefineLocalArray(path,
ADIOS::Type::VTKToADIOS(valueTmp->GetDataType()), dims,
static_cast<ADIOS::Transform>(this->Transform));
if(this->WriteMode == vtkADIOSWriter::OnChange)
{
this->BlockStepIndexIdMap.insert(
std::make_pair(path, this->BlockStepIndexIdMap.size()));
}
}
//----------------------------------------------------------------------------
......@@ -580,7 +490,7 @@ bool vtkADIOSWriter::UpdateMTimeTable(const std::string path,
unsigned long mtimePrev = mtimeCurrent;
mtimeCurrent = mtimeNew;
return this->WriteMode == vtkADIOSWriter::Always || mtimeNew != mtimePrev;
return mtimeNew != mtimePrev;
}
//----------------------------------------------------------------------------
......@@ -602,22 +512,9 @@ void vtkADIOSWriter::Write(const std::string& path, const vtkAbstractArray* v)
size_t nc = valueTmp->GetNumberOfComponents();
size_t nt = valueTmp->GetNumberOfTuples();
// Skip empty arrays
//if(nc == 0 || nt == 0)
// {
// return;
// }
this->Writer->WriteScalar<size_t>(path+"#NC", nc);
this->Writer->WriteScalar<size_t>(path+"#NT", nt);
this->Writer->WriteArray(path, valueTmp->GetVoidPointer(0));
if(this->WriteMode == vtkADIOSWriter::OnChange)
{
this->BlockStepIndex[this->BlockStepIndexIdMap[path]] =
(static_cast<vtkTypeInt64>(this->CurrentStep) << 32) |
this->Controller->GetLocalProcessId();
}
}
//----------------------------------------------------------------------------
......@@ -628,7 +525,7 @@ void vtkADIOSWriter::Write(const std::string& path, const vtkDataArray* v)
if(lut)
{
// Only heck the mtime here if a LUT is present. Otherwise it will be
// Only check the mtime here if a LUT is present. Otherwise it will be
// handled apropriately by the abstract array writer
if(!this->UpdateMTimeTable(path, v))