Commit 409099d1 authored by Utkarsh Ayachit's avatar Utkarsh Ayachit
Browse files

refactor vtkCSVWriter to fix #19224.

Updating vtkCSVWriter to not pass-the-baton when writing, but instead
pass-the-data one rank at a time to the root node for writing. This
allows the writer to better handle misaligned columns, missing columns
etc.

Fixes #19224.
parent 218b98b3
......@@ -32,10 +32,12 @@
#include "vtkStreamingDemandDrivenPipeline.h"
#include "vtkTable.h"
#include <numeric>
#include <sstream>
#include <vector>
vtkStandardNewMacro(vtkCSVWriter);
vtkCxxSetObjectMacro(vtkCSVWriter, Controller, vtkMultiProcessController);
//-----------------------------------------------------------------------------
vtkCSVWriter::vtkCSVWriter()
{
......@@ -44,21 +46,22 @@ vtkCSVWriter::vtkCSVWriter()
this->UseStringDelimiter = true;
this->SetStringDelimiter("\"");
this->SetFieldDelimiter(",");
this->Stream = 0;
this->FileName = 0;
this->Precision = 5;
this->UseScientificNotation = true;
this->FieldAssociation = 0;
this->AddMetaData = false;
this->Controller = nullptr;
this->SetController(vtkMultiProcessController::GetGlobalController());
}
//-----------------------------------------------------------------------------
vtkCSVWriter::~vtkCSVWriter()
{
this->SetController(nullptr);
this->SetStringDelimiter(0);
this->SetFieldDelimiter(0);
this->SetFileName(0);
delete this->Stream;
}
//-----------------------------------------------------------------------------
......@@ -76,74 +79,37 @@ int vtkCSVWriter::ProcessRequest(
{
if (request->Has(vtkStreamingDemandDrivenPipeline::REQUEST_UPDATE_EXTENT()))
{
vtkMultiProcessController* controller = vtkMultiProcessController::GetGlobalController();
vtkInformation* inInfo = inputVector[0]->GetInformationObject(0);
inInfo->Set(vtkStreamingDemandDrivenPipeline::UPDATE_NUMBER_OF_PIECES(),
controller->GetNumberOfProcesses());
inInfo->Set(
vtkStreamingDemandDrivenPipeline::UPDATE_PIECE_NUMBER(), controller->GetLocalProcessId());
(this->Controller ? this->Controller->GetNumberOfProcesses() : 1));
inInfo->Set(vtkStreamingDemandDrivenPipeline::UPDATE_PIECE_NUMBER(),
(this->Controller ? this->Controller->GetLocalProcessId() : 0));
inInfo->Set(vtkStreamingDemandDrivenPipeline::UPDATE_NUMBER_OF_GHOST_LEVELS(), 0);
return 1;
}
return this->Superclass::ProcessRequest(request, inputVector, outputVector);
}
//-----------------------------------------------------------------------------
bool vtkCSVWriter::OpenFile(bool append)
{
if (!this->FileName)
{
vtkErrorMacro(<< "No FileName specified! Can't write!");
this->SetErrorCode(vtkErrorCode::NoFileNameError);
return false;
}
vtkDebugMacro(<< "Opening file for writing...");
ofstream* fptr = append == false ? new ofstream(this->FileName, ios::out)
: new ofstream(this->FileName, ios::out | ios::app);
if (fptr->fail())
{
vtkErrorMacro(<< "Unable to open file: " << this->FileName);
this->SetErrorCode(vtkErrorCode::CannotOpenFileError);
delete fptr;
return false;
}
this->Stream = fptr;
return true;
}
namespace
{
//-----------------------------------------------------------------------------
template <class iterT>
void vtkCSVWriterGetDataString(
iterT* iter, vtkIdType tupleIndex, ofstream* stream, vtkCSVWriter* writer, bool* first)
iterT* iter, vtkIdType tupleIndex, ofstream& stream, vtkCSVWriter* writer, bool* first)
{
int numComps = iter->GetNumberOfComponents();
vtkIdType index = tupleIndex * numComps;
for (int cc = 0; cc < numComps; cc++)
{
if ((index + cc) < iter->GetNumberOfValues())
if (!(*first))
{
if (*first == false)
{
(*stream) << writer->GetFieldDelimiter();
}
*first = false;
(*stream) << iter->GetValue(index + cc);
stream << writer->GetFieldDelimiter();
}
else
*first = false;
if ((index + cc) < iter->GetNumberOfValues())
{
if (*first == false)
{
(*stream) << writer->GetFieldDelimiter();
}
*first = false;
stream << iter->GetValue(index + cc);
}
}
}
......@@ -151,28 +117,20 @@ void vtkCSVWriterGetDataString(
//-----------------------------------------------------------------------------
template <>
void vtkCSVWriterGetDataString(vtkArrayIteratorTemplate<vtkStdString>* iter, vtkIdType tupleIndex,
ofstream* stream, vtkCSVWriter* writer, bool* first)
ofstream& stream, vtkCSVWriter* writer, bool* first)
{
int numComps = iter->GetNumberOfComponents();
vtkIdType index = tupleIndex * numComps;
for (int cc = 0; cc < numComps; cc++)
{
if ((index + cc) < iter->GetNumberOfValues())
if (!(*first))
{
if (*first == false)
{
(*stream) << writer->GetFieldDelimiter();
}
*first = false;
(*stream) << writer->GetString(iter->GetValue(index + cc));
stream << writer->GetFieldDelimiter();
}
else
(*first) = false;
if ((index + cc) < iter->GetNumberOfValues())
{
if (*first == false)
{
(*stream) << writer->GetFieldDelimiter();
}
*first = false;
stream << writer->GetString(iter->GetValue(index + cc));
}
}
}
......@@ -180,28 +138,20 @@ void vtkCSVWriterGetDataString(vtkArrayIteratorTemplate<vtkStdString>* iter, vtk
//-----------------------------------------------------------------------------
template <>
void vtkCSVWriterGetDataString(vtkArrayIteratorTemplate<char>* iter, vtkIdType tupleIndex,
ofstream* stream, vtkCSVWriter* writer, bool* first)
ofstream& stream, vtkCSVWriter* writer, bool* first)
{
int numComps = iter->GetNumberOfComponents();
vtkIdType index = tupleIndex * numComps;
for (int cc = 0; cc < numComps; cc++)
{
if ((index + cc) < iter->GetNumberOfValues())
if (!(*first))
{
if (*first == false)
{
(*stream) << writer->GetFieldDelimiter();
}
*first = false;
(*stream) << static_cast<int>(iter->GetValue(index + cc));
stream << writer->GetFieldDelimiter();
}
else
(*first) = false;
if ((index + cc) < iter->GetNumberOfValues())
{
if (*first == false)
{
(*stream) << writer->GetFieldDelimiter();
}
*first = false;
stream << static_cast<int>(iter->GetValue(index + cc));
}
}
}
......@@ -209,7 +159,7 @@ void vtkCSVWriterGetDataString(vtkArrayIteratorTemplate<char>* iter, vtkIdType t
//-----------------------------------------------------------------------------
template <>
void vtkCSVWriterGetDataString(vtkArrayIteratorTemplate<unsigned char>* iter, vtkIdType tupleIndex,
ofstream* stream, vtkCSVWriter* writer, bool* first)
ofstream& stream, vtkCSVWriter* writer, bool* first)
{
int numComps = iter->GetNumberOfComponents();
vtkIdType index = tupleIndex * numComps;
......@@ -219,92 +169,124 @@ void vtkCSVWriterGetDataString(vtkArrayIteratorTemplate<unsigned char>* iter, vt
{
if (*first == false)
{
(*stream) << writer->GetFieldDelimiter();
stream << writer->GetFieldDelimiter();
}
*first = false;
(*stream) << static_cast<int>(iter->GetValue(index + cc));
stream << static_cast<int>(iter->GetValue(index + cc));
}
else
{
if (*first == false)
{
(*stream) << writer->GetFieldDelimiter();
stream << writer->GetFieldDelimiter();
}
*first = false;
}
}
}
//-----------------------------------------------------------------------------
bool SomethingForMeToDo(int myRank, const std::vector<vtkIdType>& numRowsGlobal)
} // end anonymous namespace
class vtkCSVWriter::CSVFile
{
// there's something for me to do if either I have rows or I'm process 0
// and no process rows in which case I have to write the header
if (numRowsGlobal[myRank] > 0)
{
return true;
}
else if (myRank == 0)
ofstream Stream;
std::vector<std::pair<std::string, int> > ColumnInfo;
public:
int Open(const char* filename)
{
for (size_t i = 1; i < numRowsGlobal.size(); i++)
if (!filename)
{
if (numRowsGlobal[i] > 0)
{
return false; // someone else will write the header info
}
return vtkErrorCode::NoFileNameError;
}
return true; // I have to write the header info even though I have no rows
}
return false;
}
//-----------------------------------------------------------------------------
bool DoIWriteTheHeader(int myRank, const std::vector<vtkIdType>& numRowsGlobal)
{
for (int i = 0; i < myRank; i++)
{
if (numRowsGlobal[i])
this->Stream = ofstream(filename, ios::out);
if (this->Stream.fail())
{
return false; // someone before me has data and will write the header
return vtkErrorCode::CannotOpenFileError;
}
return vtkErrorCode::NoError;
}
return true; // note if process 0 is here it will write the header
}
//-----------------------------------------------------------------------------
void StartProcessWrite(
int myRank, const std::vector<vtkIdType>& numRowsGlobal, vtkMultiProcessController* controller)
{
if (DoIWriteTheHeader(myRank, numRowsGlobal) == false)
void WriteHeader(vtkTable* table, vtkCSVWriter* self)
{
this->WriteHeader(table->GetRowData(), self);
}
void WriteHeader(vtkDataSetAttributes* dsa, vtkCSVWriter* self)
{
int prevProc = myRank - 1;
while (static_cast<size_t>(prevProc) > 0 && numRowsGlobal[prevProc] == 0)
for (int cc = 0, numArrays = dsa->GetNumberOfArrays(); cc < numArrays; ++cc)
{
auto array = dsa->GetAbstractArray(cc);
const int num_comps = array->GetNumberOfComponents();
// save order of arrays written out in header
this->ColumnInfo.push_back(std::make_pair(std::string(array->GetName()), num_comps));
for (int comp = 0; comp < num_comps; ++comp)
{
if (cc > 0 || comp > 0)
{
// add separator for all but the very first column
this->Stream << self->GetFieldDelimiter();
}
std::ostringstream array_name;
array_name << array->GetName();
if (array->GetNumberOfComponents() > 1)
{
array_name << ":" << comp;
}
this->Stream << self->GetString(array_name.str());
}
}
this->Stream << "\n";
// push the floating point precision/notation type.
if (self->GetUseScientificNotation())
{
prevProc--;
this->Stream << std::scientific;
}
int tmp = 0; // just used for the blocking send/receive
controller->Receive(&tmp, 1, prevProc, 11419);
this->Stream << std::setprecision(self->GetPrecision());
}
}
//-----------------------------------------------------------------------------
void EndProcessWrite(
int myRank, const std::vector<vtkIdType>& numRowsGlobal, vtkMultiProcessController* controller)
{
int nextProc = myRank + 1;
while (static_cast<size_t>(nextProc) < numRowsGlobal.size() && numRowsGlobal[nextProc] == 0)
void WriteData(vtkTable* table, vtkCSVWriter* self)
{
nextProc++;
this->WriteData(table->GetRowData(), self);
}
if (static_cast<size_t>(nextProc) < numRowsGlobal.size())
void WriteData(vtkDataSetAttributes* dsa, vtkCSVWriter* self)
{
int tmp = 0; // just used for the blocking send/receive
controller->Send(&tmp, 1, nextProc, 11419);
}
}
std::vector<vtkSmartPointer<vtkArrayIterator> > columnsIters;
for (const auto& cinfo : this->ColumnInfo)
{
auto array = dsa->GetAbstractArray(cinfo.first.c_str());
if (array->GetNumberOfComponents() != cinfo.second)
{
vtkErrorWithObjectMacro(self, "Mismatched components for '" << array->GetName() << "'!");
}
vtkArrayIterator* iter = array->NewIterator();
columnsIters.push_back(iter);
iter->FastDelete();
}
} // end anonymous namespace
const auto num_tuples = dsa->GetNumberOfTuples();
for (vtkIdType cc = 0; cc < num_tuples; ++cc)
{
bool first_column = true;
for (auto& iter : columnsIters)
{
switch (iter->GetDataType())
{
vtkArrayIteratorTemplateMacro(vtkCSVWriterGetDataString(
static_cast<VTK_TT*>(iter.GetPointer()), cc, this->Stream, self, &first_column));
}
}
this->Stream << "\n";
}
}
};
//-----------------------------------------------------------------------------
vtkStdString vtkCSVWriter::GetString(vtkStdString string)
......@@ -338,99 +320,127 @@ void vtkCSVWriter::WriteData()
table = mergeTables->GetOutput();
}
}
this->WriteTable(table);
}
//-----------------------------------------------------------------------------
void vtkCSVWriter::WriteTable(vtkTable* table)
{
vtkIdType numRows = table->GetNumberOfRows();
vtkDataSetAttributes* dsa = table->GetRowData();
vtkMultiProcessController* controller = vtkMultiProcessController::GetGlobalController();
int myRank = controller->GetLocalProcessId();
int numProcs = controller->GetNumberOfProcesses();
std::vector<vtkIdType> numRowsGlobal(numProcs, 0);
numRowsGlobal[myRank] = numRows;
assert(table != nullptr);
controller->AllGather(&numRows, numRowsGlobal.data(), 1);
if (SomethingForMeToDo(myRank, numRowsGlobal) == false)
auto controller = this->Controller;
if (controller == nullptr ||
(controller->GetNumberOfProcesses() == 1 && controller->GetLocalProcessId() == 0))
{
vtkCSVWriter::CSVFile file;
int error_code = file.Open(this->FileName);
if (error_code == vtkErrorCode::NoError)
{
file.WriteHeader(table, this);
file.WriteData(table, this);
}
this->SetErrorCode(error_code);
return;
}
StartProcessWrite(myRank, numRowsGlobal, controller);
bool writeHeader = DoIWriteTheHeader(myRank, numRowsGlobal);
if (!this->OpenFile(!writeHeader))
const int myRank = controller->GetLocalProcessId();
const int numRanks = controller->GetNumberOfProcesses();
if (myRank > 0)
{
EndProcessWrite(myRank, numRowsGlobal, controller); // to make sure we don't have a deadlock
return;
}
int error_code{ vtkErrorCode::NoError };
controller->Broadcast(&error_code, 1, 0);
if (error_code != vtkErrorCode::NoError)
{
this->SetErrorCode(error_code);
return;
}
std::vector<vtkSmartPointer<vtkArrayIterator> > columnsIters;
vtkIdType row_count = table->GetNumberOfRows();
controller->Gather(&row_count, nullptr, 1, 0);
if (row_count > 0)
{
vtkNew<vtkTable> clone;
auto cloneRD = clone->GetRowData();
cloneRD->CopyAllOn();
cloneRD->CopyAllocate(table->GetRowData(), /*sze=*/1);
cloneRD->CopyData(table->GetRowData(), 0, 1, 0);
// send clone first so the root can determine which arrays to save to the
// output file consistently.
controller->Send(clone, 0, 88020);
}
// BARRIER
controller->Barrier();
int cc;
int numArrays = dsa->GetNumberOfArrays();
bool first = true;
// Keep track of which arrays to output in columnsIters and write header:
for (cc = 0; cc < numArrays; cc++)
if (row_count > 0)
{
controller->Send(table, 0, 88021);
}
controller->Broadcast(&error_code, 1, 0);
this->SetErrorCode(error_code);
}
else
{
vtkAbstractArray* array = dsa->GetAbstractArray(cc);
for (int comp = 0; comp < array->GetNumberOfComponents(); comp++)
vtkCSVWriter::CSVFile file;
int error_code = file.Open(this->FileName);
controller->Broadcast(&error_code, 1, 0);
if (error_code != vtkErrorCode::NoError)
{
this->SetErrorCode(error_code);
return;
}
const vtkIdType row_count = table->GetNumberOfRows();
std::vector<vtkIdType> global_row_counts(numRanks, 0);
controller->Gather(&row_count, &global_row_counts[0], 1, 0);
// build field list to determine which columns to write.
vtkDataSetAttributes::FieldList columns;
for (int rank = 0; rank < numRanks; ++rank)
{
if (writeHeader)
if (global_row_counts[rank] > 0)
{
if (!first)
if (rank == 0)
{
(*this->Stream) << this->FieldDelimiter;
columns.IntersectFieldList(table->GetRowData());
}
first = false;
std::ostringstream array_name;
array_name << array->GetName();
if (array->GetNumberOfComponents() > 1)
else
{
array_name << ":" << comp;
vtkNew<vtkTable> emptytable;
controller->Receive(emptytable, vtkMultiProcessController::ANY_SOURCE, 88020);
columns.IntersectFieldList(emptytable->GetRowData());
}
(*this->Stream) << this->GetString(array_name.str());
}
}
vtkArrayIterator* iter = array->NewIterator();
columnsIters.push_back(iter);
iter->Delete();
}
if (writeHeader)
{
(*this->Stream) << "\n";
}
// push the floating point precision/notation type.
if (this->UseScientificNotation)
{
(*this->Stream) << std::scientific;
}
// BARRIER
controller->Barrier();
(*this->Stream) << std::setprecision(this->Precision);
// now write the real data.
vtkNew<vtkDataSetAttributes> tmp;
tmp->CopyAllOn();
columns.CopyAllocate(tmp, vtkDataSetAttributes::PASSDATA, /*sz=*/1, 0);
for (vtkIdType index = 0; index < numRows; index++)
{
first = true;
std::vector<vtkSmartPointer<vtkArrayIterator> >::iterator iter;
for (iter = columnsIters.begin(); iter != columnsIters.end(); ++iter)
// first write headers.
file.WriteHeader(tmp, this);
for (int rank = 0; rank < numRanks; ++rank)
{
switch ((*iter)->GetDataType())
if (global_row_counts[rank] > 0)
{
vtkArrayIteratorTemplateMacro(vtkCSVWriterGetDataString(
static_cast<VTK_TT*>(iter->GetPointer()), index, this->Stream, this, &first));
if (rank == 0)
{
file.WriteData(table, this);
}
else
{
vtkNew<vtkTable> remote_table;
controller->Receive(remote_table.Get(), rank, 88021);
assert(remote_table->GetNumberOfRows() > 0);
file.WriteData(remote_table, this);
}
}
}
(*this->Stream) << "\n";
}
this->Stream->close();
EndProcessWrite(myRank, numRowsGlobal, controller);
error_code = vtkErrorCode::NoError;
controller->Broadcast(&error_code, 1, 0);
this->SetErrorCode(error_code);
}
}
//-----------------------------------------------------------------------------
......@@ -447,4 +457,12 @@ void vtkCSVWriter::PrintSelf(ostream& os, vtkIndent indent)
os << indent << "Precision: " << this->Precision << endl;
os << indent << "FieldAssociation: " << this->FieldAssociation << endl;
os << indent << "AddMetaData: " << this->AddMetaData << endl;
if (this->Controller)
{
os << indent << "Controller: " << this->Controller << endl;
}
else
{
os << indent << "Controller: (none)" << endl;
}
}
......@@ -24,6 +24,7 @@
#include "vtkPVVTKExtensionsDefaultModule.h" //needed for exports
#include "vtkWriter.h"