Commit 612f2188 authored by David Thompson's avatar David Thompson

Merge branch 'vtk-cdf' into 'master'

Get CDF working as a configurable analysis.

See merge request !104
parents 571a82ed 13804aed
<sensei>
<!--
Catalyst particle CDF example
This XML configures an MPI-based computation to obtain
inter-quantile boundaries. Specify the number of quantiles
you want with the `quantiles` attribute.
The output of the analysis is a Cinema database holding
the CDF approximation.
Specify the `working-directory` to indicate the directory
to hold the database.
Note that for now, if you want multiple CDF analyses to
be saved from a single run, you must set the
`working-directory` attribute to a different directory
for each one.
-->
<analysis
enabled="1"
type="cdf"
mesh="mesh"
field="data"
association="cell"
quantiles="50"
working-directory="/tmp/data-cdf"
/>
<analysis
enabled="1"
type="cdf"
mesh="mesh"
field="data"
association="cell"
quantiles="50"
working-directory="/tmp/velocity-cdf"
/>
</sensei>
#include <limits>
#include <vtkCommunicator.h>
#include <vtkMultiProcessController.h>
#include "CDFReducer.h"
// ----------------------------------------------------------------------------
struct Entry
{
Entry()
: GlobalID(0)
, Value(0)
{
}
Entry(vtkIdType id, double value)
: GlobalID(id)
, Value(value)
{
}
bool operator<(Entry const& other) const
{
if (this->Value == other.Value)
{
return this->GlobalID > other.GlobalID;
}
return this->Value > other.Value;
}
vtkIdType GlobalID;
double Value;
};
// ----------------------------------------------------------------------------
// ----------------------------------------------------------------------------
CDFReducer::~CDFReducer()
{
this->LocalValues = nullptr; // We are not the owner of that array
if (this->ReducedCDF != nullptr)
{
delete[] this->ReducedCDF;
this->ReducedCDF = nullptr;
}
if (this->CDFOffsets != nullptr)
{
delete[] this->CDFOffsets;
this->CDFOffsets = nullptr;
}
if (this->LocalCDF != nullptr)
{
delete[] this->LocalCDF;
this->LocalCDF = nullptr;
}
if (this->RemoteCDFs != nullptr)
{
delete[] this->RemoteCDFs;
this->RemoteCDFs = nullptr;
}
}
// ----------------------------------------------------------------------------
double CDFReducer::GetValueAtIndex(vtkIdType targetIdx, vtkIdType pid, vtkIdType mpiSize, int depth)
{
double lastValue = 0;
this->ExecutionCount++;
this->Handler.Reset(targetIdx);
this->CDFStep = this->Handler.Step;
// Fill local CDF
vtkIdType localOffset = this->CDFOffsets[pid];
for (vtkIdType i = 0; i < this->CDFSize; i++)
{
vtkIdType gid = localOffset + this->CDFStep + (i * this->CDFStep);
this->LocalCDF[i] =
(gid < this->ArraySize) ? this->LocalValues[gid] : std::numeric_limits<double>::max();
}
// Gather CDFs to pid(0)
this->Controller->Gather(this->LocalCDF, this->RemoteCDFs, this->CDFSize, 0);
// Only pid(0)
if (pid == 0)
{
size_t size = (size_t)(this->CDFSize * mpiSize);
std::vector<Entry> sortedCDFs(size);
for (size_t i = 0; i < size; i++)
{
sortedCDFs[i].GlobalID = (vtkIdType)i;
sortedCDFs[i].Value = this->RemoteCDFs[i];
}
std::sort(sortedCDFs.begin(), sortedCDFs.end()); // In reverse
// Move forward only if no skip possible
while (this->Handler.Move(sortedCDFs.back().GlobalID / this->CDFSize))
{
// std::cout << sortedCDFs.back().GlobalID << " => " << sortedCDFs.back().GlobalID / this->CDFSize << std::endl;
lastValue = sortedCDFs.back().Value;
sortedCDFs.pop_back();
}
// Synch up offsets
this->Controller->Broadcast(this->CDFOffsets, mpiSize, 0);
}
else
{
// Just update offset from 0
this->Controller->Broadcast(this->CDFOffsets, mpiSize, 0);
this->Handler.UpdateCurrentIndex();
}
// if (pid == 0)
// {
// for (int i = 0; i < depth; i++)
// {
// std::cout << " ";
// }
// std::cout << " - index: " << this->Handler.CurrentIndex << " - target: " << targetIdx << std::endl;
// }
if (this->Handler.CurrentIndex == targetIdx)
{
return lastValue;
}
else
{
// Need to refine
return this->GetValueAtIndex(targetIdx, pid, mpiSize, depth + 1);
}
}
// ----------------------------------------------------------------------------
double* CDFReducer::Compute(double* localSortedValues,
vtkIdType localArraySize,
vtkIdType outputCDFSize)
{
// Initialization
vtkIdType MPI_ID = this->Controller->GetLocalProcessId();
vtkIdType MPI_SIZE = this->Controller->GetNumberOfProcesses();
if (this->LocalCDF == nullptr)
{
this->LocalCDF = new double[this->CDFSize];
}
if (this->RemoteCDFs == nullptr)
{
this->RemoteCDFs = new double[MPI_SIZE * this->CDFSize];
}
if (this->CDFOffsets == nullptr)
{
this->CDFOffsets = new vtkIdType[MPI_SIZE];
}
for (vtkIdType idx = 0; idx < MPI_SIZE; idx++)
{
this->CDFOffsets[idx] = 0;
}
this->ArraySize = localArraySize;
this->LocalValues = localSortedValues;
if (this->ReducedCDF == nullptr || this->ReducedCDFSize != outputCDFSize)
{
delete[] this->ReducedCDF;
this->ReducedCDF = new double[outputCDFSize];
}
this->ReducedCDFSize = outputCDFSize;
// Share basic information (min, max, counts)
double globalMin, globalMax;
this->Controller->AllReduce(&this->LocalValues[0], &globalMin, 1, vtkCommunicator::MIN_OP);
this->Controller->AllReduce(
&this->LocalValues[localArraySize - 1], &globalMax, 1, vtkCommunicator::MAX_OP);
this->Controller->AllReduce(&localArraySize, &this->TotalCount, 1, vtkCommunicator::SUM_OP);
this->ReducedCDF[0] = globalMin;
this->ReducedCDF[outputCDFSize - 1] = globalMax;
// Look for indexes we will search value for
vtkIdType splitSize = outputCDFSize - 1;
// Proper init
this->Handler.Init(this->CDFOffsets, MPI_SIZE, this->CDFSize);
// Fill resulting CDF
double avgCallstack = 0;
for (vtkIdType i = 1; i < splitSize; i++)
{
this->ExecutionCount = 0;
this->ReducedCDF[i] = this->GetValueAtIndex(i * this->TotalCount / splitSize, MPI_ID, MPI_SIZE, 1);
avgCallstack += double(this->ExecutionCount);
}
avgCallstack /= (splitSize - 1);
if (MPI_ID == 0)
{
std::cout << "Avg stack size: " << avgCallstack << std::endl;
}
return this->ReducedCDF;
}
\ No newline at end of file
//============================================================================
// Copyright (c) Kitware, Inc.
// All rights reserved.
// See LICENSE.txt for details.
// This software is distributed WITHOUT ANY WARRANTY; without even
// the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE. See the above copyright notice for more information.
//
// Copyright 2014 Sandia Corporation.
// Copyright 2014 UT-Battelle, LLC.
// Copyright 2014 Los Alamos National Security.
//
// Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
// the U.S. Government retains certain rights in this software.
//
// Under the terms of Contract DE-AC52-06NA25396 with Los Alamos National
// Laboratory (LANL), the U.S. Government retains certain rights in
// this software.
//============================================================================
#ifndef CDFReducer_h
#define CDFReducer_h
class vtkMultiProcessController;
struct StepHandler
{
StepHandler()
: Offsets(nullptr)
, MPISize(0)
, CDFSize(0)
, ProcessStepCount(nullptr)
{}
~StepHandler()
{
delete[] this->ProcessStepCount;
this->ProcessStepCount = nullptr;
}
void Init(vtkIdType* &offsets, vtkIdType mpiSize, vtkIdType cdfSize)
{
this->Offsets = offsets;
this->MPISize = mpiSize;
this->CDFSize = cdfSize;
this->ProcessStepCount = new vtkIdType[mpiSize];
}
void Reset(vtkIdType targetIdx)
{
this->TargetIdx = targetIdx;
this->CurrentIndex = 0;
this->Step = 1;
this->KeepMoving = true;
// Init counts
for (int i = 0; i < this->MPISize; i++)
{
this->ProcessStepCount[i] = 0;
}
// Figure out the current index based on offsets
this->UpdateCurrentIndex();
// Compute best Step size
this->Step = (this->TargetIdx - this->CurrentIndex) / this->CDFSize;
if (this->Step < 2)
{
this->Step = 1;
this->ErrorStep = 0;
}
else
{
this->ErrorStep = this->MPISize * (this->Step - 1);
}
}
bool CanMoveForward()
{
return this->KeepMoving;
}
bool Move(vtkIdType processId)
{
if (!this->KeepMoving)
{
return this->KeepMoving;
}
if (this->Step + this->CurrentIndex > this->TargetIdx - this->ErrorStep) {
return (this->KeepMoving = false);
}
if (++this->ProcessStepCount[processId] == this->CDFSize)
{
return (this->KeepMoving = false);
}
this->Offsets[processId] += this->Step;
this->CurrentIndex += this->Step;
return this->KeepMoving;
}
void UpdateCurrentIndex()
{
this->CurrentIndex = 0;
for (vtkIdType idx = 0; idx < this->MPISize; idx++)
{
this->CurrentIndex += this->Offsets[idx];
}
}
vtkIdType* Offsets;
vtkIdType TargetIdx;
vtkIdType MPISize;
vtkIdType CDFSize;
vtkIdType CurrentIndex;
vtkIdType Step;
bool KeepMoving;
vtkIdType* ProcessStepCount;
vtkIdType ErrorStep;
};
class CDFReducer
{
public:
CDFReducer(vtkMultiProcessController* controller)
: Controller(controller)
, ArraySize(0)
, LocalValues(nullptr)
, ReducedCDF(nullptr)
, ReducedCDFSize(0)
, TotalCount(0)
, CDFOffsets(nullptr)
, CDFStep(1)
, CDFSize(512)
, LocalCDF(nullptr)
, RemoteCDFs(nullptr)
{};
~CDFReducer();
double* Compute(double* localSortedValues, vtkIdType localArraySize, vtkIdType outputCDFSize);
vtkIdType GetBufferSize() { return this->CDFSize; };
void SetBufferSize(vtkIdType exchangeCDFSize) { this->CDFSize = exchangeCDFSize; };
vtkIdType GetTotalCount() { return this->TotalCount; };
protected:
double GetValueAtIndex(vtkIdType targetIdx, vtkIdType pid, vtkIdType mpiSize, int depth);
private:
vtkMultiProcessController* Controller;
vtkIdType ArraySize;
double* LocalValues;
double* ReducedCDF;
vtkIdType ReducedCDFSize;
vtkIdType TotalCount;
//
vtkIdType* CDFOffsets;
vtkIdType CDFStep;
vtkIdType CDFSize;
double* LocalCDF;
double* RemoteCDFs;
//
int ExecutionCount;
//
StepHandler Handler;
};
#endif
\ No newline at end of file
......@@ -22,7 +22,7 @@ if (ENABLE_SENSEI)
if (ENABLE_VTKM)
list(APPEND senseiCore_sources VTKmVolumeReductionAnalysis.cxx
CinemaHelper.cxx)
VTKmCDFAnalysis.cxx CDFReducer.cxx CinemaHelper.cxx)
list(APPEND senseiCore_libs vtkm_cont)
endif()
......
......@@ -18,6 +18,7 @@
#endif
#ifdef ENABLE_VTKM
#include "VTKmVolumeReductionAnalysis.h"
#include "VTKmCDFAnalysis.h"
#endif
#ifdef ENABLE_ADIOS
#include "ADIOSAnalysisAdaptor.h"
......@@ -149,6 +150,7 @@ struct ConfigurableAnalysis::InternalsType
int AddHistogram(pugi::xml_node node);
int AddVTKmContour(pugi::xml_node node);
int AddVTKmVolumeReduction(pugi::xml_node node);
int AddVTKmCDF(pugi::xml_node node);
int AddAdios(pugi::xml_node node);
int AddCatalyst(pugi::xml_node node);
int AddLibsim(pugi::xml_node node);
......@@ -322,6 +324,46 @@ int ConfigurableAnalysis::InternalsType::AddVTKmVolumeReduction(pugi::xml_node n
#endif
}
// --------------------------------------------------------------------------
int ConfigurableAnalysis::InternalsType::AddVTKmCDF(pugi::xml_node node)
{
#ifndef ENABLE_VTKM
(void)node;
SENSEI_ERROR("VTK-m analysis was requested but is disabled in this build")
return -1;
#else
if (
requireAttribute(node, "mesh") ||
requireAttribute(node, "field") ||
requireAttribute(node, "association")
)
{
SENSEI_ERROR("Failed to initialize VTKmCDFAnalysis");
return -1;
}
auto mesh = node.attribute("mesh").as_string();
auto field = node.attribute("field").as_string();
auto assoc = node.attribute("association").as_string();
auto quantiles = node.attribute("quantiles").as_int(10);
auto exchangeSize = node.attribute("exchange-size").as_int(quantiles);
bool haveWorkDir = !!node.attribute("working-directory");
std::string workDir = haveWorkDir ? node.attribute("working-directory").as_string() : ".";
auto analysis = vtkSmartPointer<VTKmCDFAnalysis>::New();
this->TimeInitialization(analysis, [&]() {
analysis->Initialize(mesh, field, assoc, workDir, quantiles, exchangeSize, this->Comm);
return 0;
});
this->Analyses.push_back(analysis.GetPointer());
SENSEI_STATUS("Configured VTKmCDFAnalysis " << mesh << "/" << field)
return 0;
#endif
}
// --------------------------------------------------------------------------
int ConfigurableAnalysis::InternalsType::AddAdios(pugi::xml_node node)
{
......@@ -900,6 +942,7 @@ int ConfigurableAnalysis::Initialize(const std::string& filename)
|| ((type == "VTKAmrWriter") && !this->Internals->AddVTKAmrWriter(node))
|| ((type == "vtkmcontour") && !this->Internals->AddVTKmContour(node))
|| ((type == "vtkmhaar") && !this->Internals->AddVTKmVolumeReduction(node))
|| ((type == "cdf") && !this->Internals->AddVTKmCDF(node))
|| ((type == "python") && !this->Internals->AddPythonAnalysis(node))))
{
if (rank == 0)
......
#include "VTKmCDFAnalysis.h"
#include "CDFReducer.h"
#include "CinemaHelper.h"
#include "DataAdaptor.h"
#include <Timer.h>
#include <Error.h>
#include <vtkDataSet.h>
#include <vtkCellData.h>
#include <vtkIntArray.h>
#include <vtkSortDataArray.h>
#include <vtkDoubleArray.h>
#include <vtkImageData.h>
#ifdef ENABLE_VTK_MPI
# include <vtkMPICommunicator.h>
# include <vtkMPIController.h>
#endif
#include <vtkMultiProcessController.h>
#include <vtkMultiBlockDataSet.h>
#include <vtkNew.h>
#include <vtkObjectFactory.h>
#include <vtkPointData.h>
#include <vtkSmartPointer.h>
#include <algorithm>
#include <vector>
// --- vtkm ---
#include <vtkm/Math.h>
#include <vtkm/cont/ArrayHandle.h>
#include <vtkm/cont/ArrayHandleCounting.h>
#include <vtkm/cont/DataSetBuilderUniform.h>
#include <vtkm/filter/FilterDataSet.h>
#include <vtkm/worklet/DispatcherPointNeighborhood.h>
#include <vtkm/worklet/ScatterPermutation.h>
#include <vtkm/worklet/WorkletPointNeighborhood.h>
#include <vtkm/cont/TryExecute.h>
#include <vtkm/cont/cuda/DeviceAdapterCuda.h>
#include <vtkm/cont/serial/DeviceAdapterSerial.h>
#include <vtkm/cont/tbb/DeviceAdapterTBB.h>
namespace sensei
{
//-----------------------------------------------------------------------------
senseiNewMacro(VTKmCDFAnalysis);
//-----------------------------------------------------------------------------
VTKmCDFAnalysis::VTKmCDFAnalysis()
: Communicator(MPI_COMM_WORLD)
, Helper(nullptr)
, NumberOfQuantiles(10)
, RequestSize(10)
{
}
//-----------------------------------------------------------------------------
VTKmCDFAnalysis::~VTKmCDFAnalysis()
{
delete this->Helper;
}
//-----------------------------------------------------------------------------
void VTKmCDFAnalysis::Initialize(
const std::string& meshName,
const std::string& fieldName,
const std::string& fieldAssoc,
const std::string& workingDirectory,
int numberOfQuantiles,
int requestSize,
MPI_Comm comm
)
{
this->MeshName = meshName;
this->FieldName = fieldName;
this->FieldAssoc = fieldAssoc == "cell" ?
vtkm::cont::Field::Association::CELL_SET :
vtkm::cont::Field::Association::POINTS;
this->Communicator = comm;
this->NumberOfQuantiles = numberOfQuantiles;
this->RequestSize = requestSize;
#ifdef ENABLE_VTK_MPI
vtkNew<vtkMPIController> con;
con->Initialize(0, 0, 1); // initialized externally
vtkMultiProcessController::SetGlobalController(con.GetPointer());
con->Register(NULL); // Keep ref
#endif
this->Helper = new CinemaHelper();
this->Helper->SetWorkingDirectory(workingDirectory);
this->Helper->SetExportType("cdf");
this->Helper->SetSampleSize(this->NumberOfQuantiles);
}
//-----------------------------------------------------------------------------
bool VTKmCDFAnalysis::Execute(DataAdaptor* data)
{
timer::MarkEvent mark("VTKmCDFAnalysis::execute");
this->Helper->AddTimeEntry();
// Get the mesh from the simulation:
vtkDataObject* mesh = nullptr;
if (data->GetMesh(this->MeshName, /*structure_only*/true, mesh) || !mesh)
{
return false;
}
// Tell the simulation to add the array we want:
data->AddArray(
mesh, this->MeshName,
this->FieldAssoc == vtkm::cont::Field::Association::POINTS ?
vtkDataObject::POINT : vtkDataObject::CELL,
this->FieldName);
// Now ask the mesh for the array:
vtkDataArray* array = nullptr;
if (mesh && this->FieldAssoc == vtkm::cont::Field::Association::WHOLE_MESH)
{
array = mesh->GetFieldData()->GetArray(this->FieldName.c_str());
}
auto dataset = vtkDataSet::SafeDownCast(mesh);
if (!array && dataset)
{
if (this->FieldAssoc == vtkm::cont::Field::Association::POINTS)
{
array = dataset->GetPointData()->GetArray(this->FieldName.c_str());
}
else if (this->FieldAssoc == vtkm::cont::Field::Association::CELL_SET)
{
array = dataset->GetCellData()->GetArray(this->FieldName.c_str());
}
}
if (!array)
{
vtkMultiBlockDataSet* blocks = vtkMultiBlockDataSet::SafeDownCast(mesh);
for(unsigned int i = 0; !array && i < blocks->GetNumberOfBlocks(); i++)
{
auto dobj = blocks->GetBlock(i);
if (dobj && this->FieldAssoc == vtkm::cont::Field::Association::WHOLE_MESH)
{
array = dobj->GetFieldData()->GetArray(this->FieldName.c_str());
if (array)
{
break;
}
}
auto dataset = vtkDataSet::SafeDownCast(dobj);
if (dataset && this->FieldAssoc == vtkm::cont::Field::Association::POINTS)
{
array = dataset->GetPointData()->GetArray(this->FieldName.c_str());
}
else if (dataset && this->FieldAssoc == vtkm::cont::Field::Association::CELL_SET)
{
array = dataset->GetCellData()->GetArray(this->FieldName.c_str());
}
}
}
if (!array)
{
SENSEI_ERROR("Could not obtain array \"" << this->FieldName << "\" from data adaptor.");
return false;
}
if (array->GetNumberOfComponents() > 1)
{
SENSEI_ERROR("Cannot compute CDF of multi-component (vector, non-scalar) array.");
return false;
}
if (this->NumberOfQuantiles <= 0)
{
SENSEI_ERROR("Invalid CDF request (bad number of quantiles).");
return false;
}
timer::MarkStartEvent("VTKm CDF");
vtkNew<vtkDoubleArray> sorted;
sorted->DeepCopy(array);
vtkSortDataArray::SortArrayByComponent(sorted, 0);
#ifdef ENABLE_VTK_MPI
vtkNew<vtkMPIController> controller;
#else
vtkNew<vtkMultiProcessController> controller;
#endif
CDFReducer reducer(controller);
reducer.SetBufferSize(this->RequestSize);
double* cdf = reducer.Compute(sorted->GetPointer(0), sorted->GetNumberOfTuples(), this->NumberOfQuantiles);
timer::MarkEndEvent("VTKm CDF");
timer::MarkStartEvent("Cinema CDF export");
this->Helper->WriteCDF(this->NumberOfQuantiles, cdf);
this->Helper->WriteMetadata();
timer::MarkEndEvent("Cinema CDF export");
return true;
}
}
#ifndef sensei_VTKmCDFAnalysis_h
#define sensei_VTKmCDFAnalysis_h
#include "AnalysisAdaptor.h"
#include <vtkm/cont/Field.h>
#include <mpi.h>
namespace sensei
{
class CinemaHelper;
class VTKmCDFAnalysis : public AnalysisAdaptor
{
public:
static VTKmCDFAnalysis* New();
senseiTypeMacro(VTKmCDFAnalysis, AnalysisAdaptor);
void Initialize(
const std::string& meshName,
const std::string& fieldName,
const std::string& fieldAssoc,
const std::string& workingDirectory,
int numberOfQuantiles,
int requestSize,
MPI_Comm comm);
bool Execute(DataAdaptor* data) override;
int Finalize() override { return 0; }
protected: