Commit 13804aed authored by David Thompson's avatar David Thompson

Get CDF working as a configurable analysis.

This eliminates all but token dependence on VTKm in favor of
VTK's vtkSortDataArray class (which uses vtkSMPTools).
However, it has the advantage of handling different types of
data arrays, whereas the previous VTK-m worklet insisted on
being provided doubles.
parent 571a82ed
<sensei>
<!--
Catalyst particle CDF example
This XML configures an MPI-based computation to obtain
inter-quantile boundaries. Specify the number of quantiles
you want with the `quantiles` attribute.
The output of the analysis is a Cinema database holding
the CDF approximation.
Specify the `working-directory` to indicate the directory
to hold the database.
Note that for now, if you want multiple CDF analyses to
be saved from a single run, you must set the
`working-directory` attribute to a different directory
for each one.
-->
<analysis
enabled="1"
type="cdf"
mesh="mesh"
field="data"
association="cell"
quantiles="50"
working-directory="/tmp/data-cdf"
/>
<analysis
enabled="1"
type="cdf"
mesh="mesh"
field="data"
association="cell"
quantiles="50"
working-directory="/tmp/velocity-cdf"
/>
</sensei>
#include <limits>
#include <vtkCommunicator.h>
#include <vtkMultiProcessController.h>
#include "CDFReducer.h"
// ----------------------------------------------------------------------------
struct Entry
{
Entry()
: GlobalID(0)
, Value(0)
{
}
Entry(vtkIdType id, double value)
: GlobalID(id)
, Value(value)
{
}
bool operator<(Entry const& other) const
{
if (this->Value == other.Value)
{
return this->GlobalID > other.GlobalID;
}
return this->Value > other.Value;
}
vtkIdType GlobalID;
double Value;
};
// ----------------------------------------------------------------------------
// ----------------------------------------------------------------------------
CDFReducer::~CDFReducer()
{
this->LocalValues = nullptr; // We are not the owner of that array
if (this->ReducedCDF != nullptr)
{
delete[] this->ReducedCDF;
this->ReducedCDF = nullptr;
}
if (this->CDFOffsets != nullptr)
{
delete[] this->CDFOffsets;
this->CDFOffsets = nullptr;
}
if (this->LocalCDF != nullptr)
{
delete[] this->LocalCDF;
this->LocalCDF = nullptr;
}
if (this->RemoteCDFs != nullptr)
{
delete[] this->RemoteCDFs;
this->RemoteCDFs = nullptr;
}
}
// ----------------------------------------------------------------------------
double CDFReducer::GetValueAtIndex(vtkIdType targetIdx, vtkIdType pid, vtkIdType mpiSize, int depth)
{
double lastValue = 0;
this->ExecutionCount++;
this->Handler.Reset(targetIdx);
this->CDFStep = this->Handler.Step;
// Fill local CDF
vtkIdType localOffset = this->CDFOffsets[pid];
for (vtkIdType i = 0; i < this->CDFSize; i++)
{
vtkIdType gid = localOffset + this->CDFStep + (i * this->CDFStep);
this->LocalCDF[i] =
(gid < this->ArraySize) ? this->LocalValues[gid] : std::numeric_limits<double>::max();
}
// Gather CDFs to pid(0)
this->Controller->Gather(this->LocalCDF, this->RemoteCDFs, this->CDFSize, 0);
// Only pid(0)
if (pid == 0)
{
size_t size = (size_t)(this->CDFSize * mpiSize);
std::vector<Entry> sortedCDFs(size);
for (size_t i = 0; i < size; i++)
{
sortedCDFs[i].GlobalID = (vtkIdType)i;
sortedCDFs[i].Value = this->RemoteCDFs[i];
}
std::sort(sortedCDFs.begin(), sortedCDFs.end()); // In reverse
// Move forward only if no skip possible
while (this->Handler.Move(sortedCDFs.back().GlobalID / this->CDFSize))
{
// std::cout << sortedCDFs.back().GlobalID << " => " << sortedCDFs.back().GlobalID / this->CDFSize << std::endl;
lastValue = sortedCDFs.back().Value;
sortedCDFs.pop_back();
}
// Synch up offsets
this->Controller->Broadcast(this->CDFOffsets, mpiSize, 0);
}
else
{
// Just update offset from 0
this->Controller->Broadcast(this->CDFOffsets, mpiSize, 0);
this->Handler.UpdateCurrentIndex();
}
// if (pid == 0)
// {
// for (int i = 0; i < depth; i++)
// {
// std::cout << " ";
// }
// std::cout << " - index: " << this->Handler.CurrentIndex << " - target: " << targetIdx << std::endl;
// }
if (this->Handler.CurrentIndex == targetIdx)
{
return lastValue;
}
else
{
// Need to refine
return this->GetValueAtIndex(targetIdx, pid, mpiSize, depth + 1);
}
}
// ----------------------------------------------------------------------------
double* CDFReducer::Compute(double* localSortedValues,
vtkIdType localArraySize,
vtkIdType outputCDFSize)
{
// Initialization
vtkIdType MPI_ID = this->Controller->GetLocalProcessId();
vtkIdType MPI_SIZE = this->Controller->GetNumberOfProcesses();
if (this->LocalCDF == nullptr)
{
this->LocalCDF = new double[this->CDFSize];
}
if (this->RemoteCDFs == nullptr)
{
this->RemoteCDFs = new double[MPI_SIZE * this->CDFSize];
}
if (this->CDFOffsets == nullptr)
{
this->CDFOffsets = new vtkIdType[MPI_SIZE];
}
for (vtkIdType idx = 0; idx < MPI_SIZE; idx++)
{
this->CDFOffsets[idx] = 0;
}
this->ArraySize = localArraySize;
this->LocalValues = localSortedValues;
if (this->ReducedCDF == nullptr || this->ReducedCDFSize != outputCDFSize)
{
delete[] this->ReducedCDF;
this->ReducedCDF = new double[outputCDFSize];
}
this->ReducedCDFSize = outputCDFSize;
// Share basic information (min, max, counts)
double globalMin, globalMax;
this->Controller->AllReduce(&this->LocalValues[0], &globalMin, 1, vtkCommunicator::MIN_OP);
this->Controller->AllReduce(
&this->LocalValues[localArraySize - 1], &globalMax, 1, vtkCommunicator::MAX_OP);
this->Controller->AllReduce(&localArraySize, &this->TotalCount, 1, vtkCommunicator::SUM_OP);
this->ReducedCDF[0] = globalMin;
this->ReducedCDF[outputCDFSize - 1] = globalMax;
// Look for indexes we will search value for
vtkIdType splitSize = outputCDFSize - 1;
// Proper init
this->Handler.Init(this->CDFOffsets, MPI_SIZE, this->CDFSize);
// Fill resulting CDF
double avgCallstack = 0;
for (vtkIdType i = 1; i < splitSize; i++)
{
this->ExecutionCount = 0;
this->ReducedCDF[i] = this->GetValueAtIndex(i * this->TotalCount / splitSize, MPI_ID, MPI_SIZE, 1);
avgCallstack += double(this->ExecutionCount);
}
avgCallstack /= (splitSize - 1);
if (MPI_ID == 0)
{
std::cout << "Avg stack size: " << avgCallstack << std::endl;
}
return this->ReducedCDF;
}
\ No newline at end of file
//============================================================================
// Copyright (c) Kitware, Inc.
// All rights reserved.
// See LICENSE.txt for details.
// This software is distributed WITHOUT ANY WARRANTY; without even
// the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE. See the above copyright notice for more information.
//
// Copyright 2014 Sandia Corporation.
// Copyright 2014 UT-Battelle, LLC.
// Copyright 2014 Los Alamos National Security.
//
// Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
// the U.S. Government retains certain rights in this software.
//
// Under the terms of Contract DE-AC52-06NA25396 with Los Alamos National
// Laboratory (LANL), the U.S. Government retains certain rights in
// this software.
//============================================================================
#ifndef CDFReducer_h
#define CDFReducer_h
class vtkMultiProcessController;
struct StepHandler
{
StepHandler()
: Offsets(nullptr)
, MPISize(0)
, CDFSize(0)
, ProcessStepCount(nullptr)
{}
~StepHandler()
{
delete[] this->ProcessStepCount;
this->ProcessStepCount = nullptr;
}
void Init(vtkIdType* &offsets, vtkIdType mpiSize, vtkIdType cdfSize)
{
this->Offsets = offsets;
this->MPISize = mpiSize;
this->CDFSize = cdfSize;
this->ProcessStepCount = new vtkIdType[mpiSize];
}
void Reset(vtkIdType targetIdx)
{
this->TargetIdx = targetIdx;
this->CurrentIndex = 0;
this->Step = 1;
this->KeepMoving = true;
// Init counts
for (int i = 0; i < this->MPISize; i++)
{
this->ProcessStepCount[i] = 0;
}
// Figure out the current index based on offsets
this->UpdateCurrentIndex();
// Compute best Step size
this->Step = (this->TargetIdx - this->CurrentIndex) / this->CDFSize;
if (this->Step < 2)
{
this->Step = 1;
this->ErrorStep = 0;
}
else
{
this->ErrorStep = this->MPISize * (this->Step - 1);
}
}
bool CanMoveForward()
{
return this->KeepMoving;
}
bool Move(vtkIdType processId)
{
if (!this->KeepMoving)
{
return this->KeepMoving;
}
if (this->Step + this->CurrentIndex > this->TargetIdx - this->ErrorStep) {
return (this->KeepMoving = false);
}
if (++this->ProcessStepCount[processId] == this->CDFSize)
{
return (this->KeepMoving = false);
}
this->Offsets[processId] += this->Step;
this->CurrentIndex += this->Step;
return this->KeepMoving;
}
void UpdateCurrentIndex()
{
this->CurrentIndex = 0;
for (vtkIdType idx = 0; idx < this->MPISize; idx++)
{
this->CurrentIndex += this->Offsets[idx];
}
}
vtkIdType* Offsets;
vtkIdType TargetIdx;
vtkIdType MPISize;
vtkIdType CDFSize;
vtkIdType CurrentIndex;
vtkIdType Step;
bool KeepMoving;
vtkIdType* ProcessStepCount;
vtkIdType ErrorStep;
};
class CDFReducer
{
public:
CDFReducer(vtkMultiProcessController* controller)
: Controller(controller)
, ArraySize(0)
, LocalValues(nullptr)
, ReducedCDF(nullptr)
, ReducedCDFSize(0)
, TotalCount(0)
, CDFOffsets(nullptr)
, CDFStep(1)
, CDFSize(512)
, LocalCDF(nullptr)
, RemoteCDFs(nullptr)
{};
~CDFReducer();
double* Compute(double* localSortedValues, vtkIdType localArraySize, vtkIdType outputCDFSize);
vtkIdType GetBufferSize() { return this->CDFSize; };
void SetBufferSize(vtkIdType exchangeCDFSize) { this->CDFSize = exchangeCDFSize; };
vtkIdType GetTotalCount() { return this->TotalCount; };
protected:
double GetValueAtIndex(vtkIdType targetIdx, vtkIdType pid, vtkIdType mpiSize, int depth);
private:
vtkMultiProcessController* Controller;
vtkIdType ArraySize;
double* LocalValues;
double* ReducedCDF;
vtkIdType ReducedCDFSize;
vtkIdType TotalCount;
//
vtkIdType* CDFOffsets;
vtkIdType CDFStep;
vtkIdType CDFSize;
double* LocalCDF;
double* RemoteCDFs;
//
int ExecutionCount;
//
StepHandler Handler;
};
#endif
\ No newline at end of file
......@@ -22,7 +22,7 @@ if (ENABLE_SENSEI)
if (ENABLE_VTKM)
list(APPEND senseiCore_sources VTKmVolumeReductionAnalysis.cxx
CinemaHelper.cxx)
VTKmCDFAnalysis.cxx CDFReducer.cxx CinemaHelper.cxx)
list(APPEND senseiCore_libs vtkm_cont)
endif()
......
......@@ -18,6 +18,7 @@
#endif
#ifdef ENABLE_VTKM
#include "VTKmVolumeReductionAnalysis.h"
#include "VTKmCDFAnalysis.h"
#endif
#ifdef ENABLE_ADIOS
#include "ADIOSAnalysisAdaptor.h"
......@@ -149,6 +150,7 @@ struct ConfigurableAnalysis::InternalsType
int AddHistogram(pugi::xml_node node);
int AddVTKmContour(pugi::xml_node node);
int AddVTKmVolumeReduction(pugi::xml_node node);
int AddVTKmCDF(pugi::xml_node node);
int AddAdios(pugi::xml_node node);
int AddCatalyst(pugi::xml_node node);
int AddLibsim(pugi::xml_node node);
......@@ -322,6 +324,46 @@ int ConfigurableAnalysis::InternalsType::AddVTKmVolumeReduction(pugi::xml_node n
#endif
}
// --------------------------------------------------------------------------
int ConfigurableAnalysis::InternalsType::AddVTKmCDF(pugi::xml_node node)
{
#ifndef ENABLE_VTKM
(void)node;
SENSEI_ERROR("VTK-m analysis was requested but is disabled in this build")
return -1;
#else
if (
requireAttribute(node, "mesh") ||
requireAttribute(node, "field") ||
requireAttribute(node, "association")
)
{
SENSEI_ERROR("Failed to initialize VTKmCDFAnalysis");
return -1;
}
auto mesh = node.attribute("mesh").as_string();
auto field = node.attribute("field").as_string();
auto assoc = node.attribute("association").as_string();
auto quantiles = node.attribute("quantiles").as_int(10);
auto exchangeSize = node.attribute("exchange-size").as_int(quantiles);
bool haveWorkDir = !!node.attribute("working-directory");
std::string workDir = haveWorkDir ? node.attribute("working-directory").as_string() : ".";
auto analysis = vtkSmartPointer<VTKmCDFAnalysis>::New();
this->TimeInitialization(analysis, [&]() {
analysis->Initialize(mesh, field, assoc, workDir, quantiles, exchangeSize, this->Comm);
return 0;
});
this->Analyses.push_back(analysis.GetPointer());
SENSEI_STATUS("Configured VTKmCDFAnalysis " << mesh << "/" << field)
return 0;
#endif
}
// --------------------------------------------------------------------------
int ConfigurableAnalysis::InternalsType::AddAdios(pugi::xml_node node)
{
......@@ -900,6 +942,7 @@ int ConfigurableAnalysis::Initialize(const std::string& filename)
|| ((type == "VTKAmrWriter") && !this->Internals->AddVTKAmrWriter(node))
|| ((type == "vtkmcontour") && !this->Internals->AddVTKmContour(node))
|| ((type == "vtkmhaar") && !this->Internals->AddVTKmVolumeReduction(node))
|| ((type == "cdf") && !this->Internals->AddVTKmCDF(node))
|| ((type == "python") && !this->Internals->AddPythonAnalysis(node))))
{
if (rank == 0)
......
#include "VTKmCDFAnalysis.h"
#include "CDFReducer.h"
#include "CinemaHelper.h"
#include "DataAdaptor.h"
#include <Timer.h>
#include <Error.h>
#include <vtkDataSet.h>
#include <vtkCellData.h>
#include <vtkIntArray.h>
#include <vtkSortDataArray.h>
#include <vtkDoubleArray.h>
#include <vtkImageData.h>
#ifdef ENABLE_VTK_MPI
# include <vtkMPICommunicator.h>
# include <vtkMPIController.h>
#endif
#include <vtkMultiProcessController.h>
#include <vtkMultiBlockDataSet.h>
#include <vtkNew.h>
#include <vtkObjectFactory.h>
#include <vtkPointData.h>
#include <vtkSmartPointer.h>
#include <algorithm>
#include <vector>
// --- vtkm ---
#include <vtkm/Math.h>
#include <vtkm/cont/ArrayHandle.h>
#include <vtkm/cont/ArrayHandleCounting.h>
#include <vtkm/cont/DataSetBuilderUniform.h>
#include <vtkm/filter/FilterDataSet.h>
#include <vtkm/worklet/DispatcherPointNeighborhood.h>
#include <vtkm/worklet/ScatterPermutation.h>
#include <vtkm/worklet/WorkletPointNeighborhood.h>
#include <vtkm/cont/TryExecute.h>
#include <vtkm/cont/cuda/DeviceAdapterCuda.h>
#include <vtkm/cont/serial/DeviceAdapterSerial.h>
#include <vtkm/cont/tbb/DeviceAdapterTBB.h>
namespace sensei
{
//-----------------------------------------------------------------------------
senseiNewMacro(VTKmCDFAnalysis);
//-----------------------------------------------------------------------------
VTKmCDFAnalysis::VTKmCDFAnalysis()
: Communicator(MPI_COMM_WORLD)
, Helper(nullptr)
, NumberOfQuantiles(10)
, RequestSize(10)
{
}
//-----------------------------------------------------------------------------
VTKmCDFAnalysis::~VTKmCDFAnalysis()
{
delete this->Helper;
}
//-----------------------------------------------------------------------------
void VTKmCDFAnalysis::Initialize(
const std::string& meshName,
const std::string& fieldName,
const std::string& fieldAssoc,
const std::string& workingDirectory,
int numberOfQuantiles,
int requestSize,
MPI_Comm comm
)
{
this->MeshName = meshName;
this->FieldName = fieldName;
this->FieldAssoc = fieldAssoc == "cell" ?
vtkm::cont::Field::Association::CELL_SET :
vtkm::cont::Field::Association::POINTS;
this->Communicator = comm;
this->NumberOfQuantiles = numberOfQuantiles;
this->RequestSize = requestSize;
#ifdef ENABLE_VTK_MPI
vtkNew<vtkMPIController> con;
con->Initialize(0, 0, 1); // initialized externally
vtkMultiProcessController::SetGlobalController(con.GetPointer());
con->Register(NULL); // Keep ref
#endif
this->Helper = new CinemaHelper();
this->Helper->SetWorkingDirectory(workingDirectory);
this->Helper->SetExportType("cdf");
this->Helper->SetSampleSize(this->NumberOfQuantiles);
}
//-----------------------------------------------------------------------------
bool VTKmCDFAnalysis::Execute(DataAdaptor* data)
{
timer::MarkEvent mark("VTKmCDFAnalysis::execute");
this->Helper->AddTimeEntry();
// Get the mesh from the simulation:
vtkDataObject* mesh = nullptr;
if (data->GetMesh(this->MeshName, /*structure_only*/true, mesh) || !mesh)
{
return false;
}
// Tell the simulation to add the array we want:
data->AddArray(
mesh, this->MeshName,
this->FieldAssoc == vtkm::cont::Field::Association::POINTS ?
vtkDataObject::POINT : vtkDataObject::CELL,
this->FieldName);
// Now ask the mesh for the array:
vtkDataArray* array = nullptr;
if (mesh && this->FieldAssoc == vtkm::cont::Field::Association::WHOLE_MESH)
{
array = mesh->GetFieldData()->GetArray(this->FieldName.c_str());
}
auto dataset = vtkDataSet::SafeDownCast(mesh);
if (!array && dataset)
{
if (this->FieldAssoc == vtkm::cont::Field::Association::POINTS)
{
array = dataset->GetPointData()->GetArray(this->FieldName.c_str());
}
else if (this->FieldAssoc == vtkm::cont::Field::Association::CELL_SET)
{
array = dataset->GetCellData()->GetArray(this->FieldName.c_str());
}
}
if (!array)
{
vtkMultiBlockDataSet* blocks = vtkMultiBlockDataSet::SafeDownCast(mesh);
for(unsigned int i = 0; !array && i < blocks->GetNumberOfBlocks(); i++)
{
auto dobj = blocks->GetBlock(i);
if (dobj && this->FieldAssoc == vtkm::cont::Field::Association::WHOLE_MESH)
{
array = dobj->GetFieldData()->GetArray(this->FieldName.c_str());
if (array)
{
break;
}
}
auto dataset = vtkDataSet::SafeDownCast(dobj);
if (dataset && this->FieldAssoc == vtkm::cont::Field::Association::POINTS)
{
array = dataset->GetPointData()->GetArray(this->FieldName.c_str());
}
else if (dataset && this->FieldAssoc == vtkm::cont::Field::Association::CELL_SET)
{
array = dataset->GetCellData()->GetArray(this->FieldName.c_str());
}
}
}
if (!array)
{
SENSEI_ERROR("Could not obtain array \"" << this->FieldName << "\" from data adaptor.");
return false;
}
if (array->GetNumberOfComponents() > 1)
{
SENSEI_ERROR("Cannot compute CDF of multi-component (vector, non-scalar) array.");
return false;
}
if (this->NumberOfQuantiles <= 0)
{
SENSEI_ERROR("Invalid CDF request (bad number of quantiles).");
return false;
}
timer::MarkStartEvent("VTKm CDF");
vtkNew<vtkDoubleArray> sorted;
sorted->DeepCopy(array);
vtkSortDataArray::SortArrayByComponent(sorted, 0);
#ifdef ENABLE_VTK_MPI
vtkNew<vtkMPIController> controller;
#else
vtkNew<vtkMultiProcessController> controller;
#endif
CDFReducer reducer(controller);
reducer.SetBufferSize(this->RequestSize);
double* cdf = reducer.Compute(sorted->GetPointer(0), sorted->GetNumberOfTuples(), this->NumberOfQuantiles);
timer::MarkEndEvent("VTKm CDF");
timer::MarkStartEvent("Cinema CDF export");
this->Helper->WriteCDF(this->NumberOfQuantiles, cdf);
this->Helper->WriteMetadata();
timer::MarkEndEvent("Cinema CDF export");
return true;
}
}
#ifndef sensei_VTKmCDFAnalysis_h
#define sensei_VTKmCDFAnalysis_h
#include "AnalysisAdaptor.h"
#include <vtkm/cont/Field.h>
#include <mpi.h>
namespace sensei
{
class CinemaHelper;
class VTKmCDFAnalysis : public AnalysisAdaptor
{
public:
static VTKmCDFAnalysis* New();
senseiTypeMacro(VTKmCDFAnalysis, AnalysisAdaptor);
void Initialize(
const std::string& meshName,
const std::string& fieldName,
const std::string& fieldAssoc,
const std::string& workingDirectory,
int numberOfQuantiles,
int requestSize,
MPI_Comm comm);
bool Execute(DataAdaptor* data) override;
int Finalize() override { return 0; }
protected: