Commit 88d43af4 authored by Burlen Loring's avatar Burlen Loring Committed by Mario Melara

autocorrelation: use 1 thread by default

fixes a bug where each rank used the maximum number of threads
available on the system, causing us to hit resource limits when
running fully packed.
parent 31a1063f
......@@ -188,7 +188,7 @@ int main(int argc, char** argv)
size_t k_max = 3;
int threads = 1;
int ghostLevels = 0;
int numberOfParticles = 1024;
int numberOfParticles = 64;
int seed = -1;
std::string config_file;
std::string out_prefix = "";
......@@ -242,10 +242,8 @@ int main(int argc, char** argv)
if (seed == -1)
{
if (world.rank() == 0)
{
seed = static_cast<int>(std::time(nullptr));
fmt::print("using seed {}\n", seed);
}
diy::mpi::broadcast(world, seed, 0);
}
......@@ -419,12 +417,13 @@ int main(int argc, char** argv)
if (sync)
world.barrier();
timer::MarkEndTimeStep();
t += dt;
t_count++;
}
world.barrier();
timer::MarkStartEvent("oscillators::finalize");
#ifdef ENABLE_SENSEI
......@@ -436,9 +435,10 @@ int main(int argc, char** argv)
timer::Finalize();
auto duration = std::chrono::duration_cast<ms>(Time::now() - start);
world.barrier();
if (world.rank() == 0)
{
auto duration = std::chrono::duration_cast<ms>(Time::now() - start);
fmt::print("Total run time: {}.{} s\n", duration.count() / 1000, duration.count() % 1000);
}
}
......@@ -153,13 +153,9 @@ public:
size_t Window;
bool BlocksInitialized;
size_t NumberOfBlocks;
AInternals() :
KMax(3),
Association(vtkDataObject::POINT),
Window(10),
BlocksInitialized(false),
NumberOfBlocks(0)
{}
AInternals() : KMax(3), Association(vtkDataObject::POINT),
Window(10), BlocksInitialized(false), NumberOfBlocks(0) {}
void InitializeBlocks(vtkDataObject* dobj)
{
......@@ -238,14 +234,15 @@ Autocorrelation::~Autocorrelation()
//-----------------------------------------------------------------------------
void Autocorrelation::Initialize(size_t window, const std::string &meshName,
int association, const std::string &arrayname, size_t kmax)
int association, const std::string &arrayname, size_t kmax, int numThreads)
{
timer::MarkEvent mark("Autocorrelation::Initialize");
AInternals& internals = (*this->Internals);
internals.Master = make_unique<diy::Master>(this->GetCommunicator(), -1, -1,
&AutocorrelationImpl::create,
&AutocorrelationImpl::destroy);
internals.Master = make_unique<diy::Master>(this->GetCommunicator(),
numThreads, -1, &AutocorrelationImpl::create, &AutocorrelationImpl::destroy);
internals.MeshName = meshName;
internals.Association = association;
internals.ArrayName = arrayname;
......@@ -257,15 +254,16 @@ void Autocorrelation::Initialize(size_t window, const std::string &meshName,
bool Autocorrelation::Execute(DataAdaptor* data)
{
timer::MarkEvent mark("Autocorrelation::Execute");
AInternals& internals = (*this->Internals);
const int association = internals.Association;
vtkDataObject* mesh = nullptr;
if (data->GetMesh(internals.MeshName, false, mesh))
{
SENSEI_ERROR("Failed to get mesh \"" << internals.MeshName << "\"")
return false;
}
{
SENSEI_ERROR("Failed to get mesh \"" << internals.MeshName << "\"")
return false;
}
if (data->AddArray(mesh, internals.MeshName,
internals.Association, internals.ArrayName))
......@@ -277,18 +275,18 @@ bool Autocorrelation::Execute(DataAdaptor* data)
int nLayers = 0;
if (data->GetMeshHasGhostCells(internals.MeshName, nLayers) == 0)
{
if (nLayers > 0 && data->AddGhostCellsArray(mesh, internals.MeshName))
{
SENSEI_ERROR(<< data->GetClassName() << " failed to add ghost cells.")
return false;
}
}
{
if (nLayers > 0 && data->AddGhostCellsArray(mesh, internals.MeshName))
{
SENSEI_ERROR(<< data->GetClassName() << " failed to add ghost cells.")
return false;
}
}
else
{
SENSEI_ERROR(<< data->GetClassName() << " failed to query for ghost cells.")
return false;
}
{
SENSEI_ERROR(<< data->GetClassName() << " failed to query for ghost cells.")
return false;
}
internals.InitializeBlocks(mesh);
......@@ -329,7 +327,7 @@ bool Autocorrelation::Execute(DataAdaptor* data)
vtkFloatArray* fa = vtkFloatArray::SafeDownCast(
ds->GetAttributesAsFieldData(association)->GetArray(internals.ArrayName.c_str()));
vtkUnsignedCharArray *gc = vtkUnsignedCharArray::SafeDownCast(
ds->GetCellData()->GetArray("vtkGhostType"));
ds->GetCellData()->GetArray("vtkGhostType"));
if (fa)
{
corr->process(fa->GetPointer(0), gc ? gc->GetPointer(0) : nullptr);
......@@ -340,13 +338,15 @@ bool Autocorrelation::Execute(DataAdaptor* data)
abort();
}
}
return true;
}
//-----------------------------------------------------------------------------
void Autocorrelation::PrintResults(size_t k_max)
{
timer::MarkEvent mark("autocorrelation::collect results");
timer::MarkEvent mark("Autocorrelation::PrintResults");
AInternals& internals = (*this->Internals);
size_t nblocks = internals.NumberOfBlocks;
......@@ -367,10 +367,10 @@ void Autocorrelation::PrintResults(size_t k_max)
{
// print out the autocorrelations
auto result = internals.Master->proxy(0).get<std::vector<float>>();
std::cout << "Autocorrelations:";
std::cerr << "Autocorrelations:";
for (size_t i = 0; i < result.size(); ++i)
std::cout << ' ' << result[i];
std::cout << std::endl;
std::cerr << ' ' << result[i];
std::cerr << std::endl;
}
internals.Master->foreach(
......@@ -379,85 +379,90 @@ void Autocorrelation::PrintResults(size_t k_max)
cp.collectives()->clear();
});
// select k strongest autocorrelations for each shift
diy::ContiguousAssigner assigner(internals.Master->communicator().size(), nblocks); // NB: this is coupled to main(...) in oscillator.cpp
diy::RegularDecomposer<diy::DiscreteBounds> decomposer(1, diy::interval(0, nblocks-1), nblocks);
diy::RegularMergePartners partners(decomposer, 2);
//diy::RegularMergePartners partners(3, nblocks, 2, true);
diy::reduce(*internals.Master, assigner, partners,
[k_max](void* b_, const diy::ReduceProxy& rp, const diy::RegularMergePartners&)
{
AutocorrelationImpl* b = static_cast<AutocorrelationImpl*>(b_);
//unsigned round = rp.round(); // current round number
using MaxHeapVector = std::vector<std::vector<std::tuple<float,Vertex>>>;
using Compare = std::greater<std::tuple<float,Vertex>>;
MaxHeapVector maxs(b->window);
if (rp.in_link().size() == 0)
{
grid::for_each(b->corr.shape(), [&](const Vertex4D& v)
{
size_t offset = v[3];
float val = b->corr(v);
auto& max = maxs[offset];
if (max.size() < k_max)
{
max.emplace_back(val, v.drop(3) + b->from);
std::push_heap(max.begin(), max.end(), Compare());
} else if (val > std::get<0>(max[0]))
{
std::pop_heap(max.begin(), max.end(), Compare());
maxs[offset].back() = std::make_tuple(val, v.drop(3) + b->from);
std::push_heap(max.begin(), max.end(), Compare());
}
});
} else
{
for (long i = 0; i < rp.in_link().size(); ++i)
{
MaxHeapVector in_heaps;
rp.dequeue(rp.in_link().target(i).gid, in_heaps);
for (size_t j = 0; j < in_heaps.size(); ++j)
{
for (auto& x : in_heaps[j])
maxs[j].push_back(x);
}
}
for (size_t j = 0; j < maxs.size(); ++j)
{
auto& max = maxs[j];
std::make_heap(max.begin(), max.end(), Compare());
while(max.size() > k_max)
{
std::pop_heap(max.begin(), max.end(), Compare());
max.pop_back();
}
}
}
if (rp.out_link().size() > 0)
rp.enqueue(rp.out_link().target(0), maxs);
else
{
// print out the answer
for (size_t i = 0; i < maxs.size(); ++i)
{
std::sort(maxs[i].begin(), maxs[i].end(), Compare());
std::cout << "Max autocorrelations for " << i << ":";
for (auto& x : maxs[i])
std::cout << " (" << std::get<0>(x) << " at " << std::get<1>(x) << ")";
std::cout << std::endl;
}
}
});
// select k strongest autocorrelations for each shift
diy::ContiguousAssigner assigner(internals.Master->communicator().size(), nblocks); // NB: this is coupled to main(...) in oscillator.cpp
diy::RegularDecomposer<diy::DiscreteBounds> decomposer(1, diy::interval(0, nblocks-1), nblocks);
diy::RegularMergePartners partners(decomposer, 2);
//diy::RegularMergePartners partners(3, nblocks, 2, true);
diy::reduce(*internals.Master, assigner, partners,
[k_max](void* b_, const diy::ReduceProxy& rp, const diy::RegularMergePartners&)
{
AutocorrelationImpl* b = static_cast<AutocorrelationImpl*>(b_);
//unsigned round = rp.round(); // current round number
using MaxHeapVector = std::vector<std::vector<std::tuple<float,Vertex>>>;
using Compare = std::greater<std::tuple<float,Vertex>>;
MaxHeapVector maxs(b->window);
if (rp.in_link().size() == 0)
{
grid::for_each(b->corr.shape(), [&](const Vertex4D& v)
{
size_t offset = v[3];
float val = b->corr(v);
auto& max = maxs[offset];
if (max.size() < k_max)
{
max.emplace_back(val, v.drop(3) + b->from);
std::push_heap(max.begin(), max.end(), Compare());
} else if (val > std::get<0>(max[0]))
{
std::pop_heap(max.begin(), max.end(), Compare());
maxs[offset].back() = std::make_tuple(val, v.drop(3) + b->from);
std::push_heap(max.begin(), max.end(), Compare());
}
});
} else
{
for (long i = 0; i < rp.in_link().size(); ++i)
{
MaxHeapVector in_heaps;
rp.dequeue(rp.in_link().target(i).gid, in_heaps);
for (size_t j = 0; j < in_heaps.size(); ++j)
{
for (auto& x : in_heaps[j])
maxs[j].push_back(x);
}
}
for (size_t j = 0; j < maxs.size(); ++j)
{
auto& max = maxs[j];
std::make_heap(max.begin(), max.end(), Compare());
while(max.size() > k_max)
{
std::pop_heap(max.begin(), max.end(), Compare());
max.pop_back();
}
}
}
if (rp.out_link().size() > 0)
rp.enqueue(rp.out_link().target(0), maxs);
else
{
// print out the answer
for (size_t i = 0; i < maxs.size(); ++i)
{
std::sort(maxs[i].begin(), maxs[i].end(), Compare());
std::cerr << "Max autocorrelations for " << i << ":";
for (auto& x : maxs[i])
std::cerr << " (" << std::get<0>(x) << " at " << std::get<1>(x) << ")";
std::cerr << std::endl;
}
}
});
}
//-----------------------------------------------------------------------------
/// Reports the accumulated results and releases the internal state.
///
/// Calls PrintResults with the KMax value stored in the internals, then
/// deletes the internals and nulls the pointer. A repeated call finds the
/// pointer already null and returns without touching it.
///
/// @returns 0 in all cases.
int Autocorrelation::Finalize()
{
  timer::MarkEvent mark("Autocorrelation::Finalize");

  // Internals is nulled at the end of this function, so a second call must
  // not read KMax through it.
  if (!this->Internals)
    return 0;

  this->PrintResults(this->Internals->KMax);

  delete this->Internals;
  this->Internals = nullptr;
  return 0;
}
......
......@@ -29,9 +29,11 @@ public:
/// compute autocorrelation for.
/// @param arrayname together with \c association, identifies the array to
/// compute autocorrelation for.
/// @param k_max number of strongest autocorrelations to report
/// @param kMax number of strongest autocorrelations to report
/// @param numThreads number of threads in diy's thread pool
void Initialize(size_t window, const std::string &meshName,
int association, const std::string &arrayname, size_t k_max);
int association, const std::string &arrayname, size_t kMax,
int numThreads = 1);
bool Execute(DataAdaptor* data) override;
......
......@@ -710,6 +710,7 @@ int ConfigurableAnalysis::InternalsType::AddAutoCorrelation(pugi::xml_node node)
int window = node.attribute("window").as_int(10);
int kMax = node.attribute("k-max").as_int(3);
int numThreads = node.attribute("n-threads").as_int(1);
auto adaptor = vtkSmartPointer<Autocorrelation>::New();
......@@ -720,11 +721,13 @@ int ConfigurableAnalysis::InternalsType::AddAutoCorrelation(pugi::xml_node node)
adaptor->Initialize(window, meshName, assoc, arrayName, kMax);
return 0;
});
this->Analyses.push_back(adaptor.GetPointer());
SENSEI_STATUS("Configured Autocorrelation " << assocStr
<< " data array " << arrayName << " on mesh " << meshName
<< " window " << window << " k-max " << kMax)
<< " data array \"" << arrayName << "\" on mesh \"" << meshName
<< "\" window " << window << " k-max " << kMax
<< " n-threads " << numThreads)
return 0;
}
......
......@@ -9,6 +9,7 @@
#include <functional>
#include <numeric>
#include <memory>
#include <climits>
#include "link.hpp"
#include "collection.hpp"
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment