Skip to content
Snippets Groups Projects
Commit 3097720b authored by Julien Fausty's avatar Julien Fausty
Browse files

vtkMPICommunicator: add blocking probe methods

parent 1ea2bd23
No related branches found
No related tags found
No related merge requests found
......@@ -487,6 +487,70 @@ int vtkMPICommunicatorIprobe(int source, int tag, int* flag, int* actualSource,
return retVal;
}
//------------------------------------------------------------------------------
int vtkMPICommunicatorProbe(
int source, int tag, int* actualSource, MPI_Datatype datatype, int* size, MPI_Comm* handle)
{
if (source == vtkMultiProcessController::ANY_SOURCE)
{
source = MPI_ANY_SOURCE;
}
MPI_Status status;
int retVal = MPI_Probe(source, tag, *handle, &status);
if (retVal == MPI_SUCCESS)
{
if (actualSource)
{
*actualSource = status.MPI_SOURCE;
}
if (size)
{
return MPI_Get_count(&status, datatype, size);
}
}
return retVal;
}
//------------------------------------------------------------------------------
int vtkMPICommunicatorProbe(int source, int tag, int* actualSource, MPI_Datatype datatype,
vtkTypeInt64* size, MPI_Comm* handle)
{
if (source == vtkMultiProcessController::ANY_SOURCE)
{
source = MPI_ANY_SOURCE;
}
MPI_Status status;
int retVal = MPI_Probe(source, tag, *handle, &status);
if (retVal == MPI_SUCCESS)
{
if (actualSource)
{
*actualSource = status.MPI_SOURCE;
}
if (size)
{
#ifdef VTKMPI_64BIT_LENGTH
MPI_Count countSize = 0;
retVal = MPI_Get_count_c(&status, datatype, &countSize);
if (retVal == MPI_SUCCESS)
{
*size = countSize;
}
return retVal;
#else
int intSize = 0;
retVal = MPI_Get_count(&status, datatype, &intSize);
if (retVal == MPI_SUCCESS)
{
*size = intSize;
}
return retVal;
#endif
}
}
return retVal;
}
//------------------------------------------------------------------------------
// Method for converting an MPI operation to a
// vtkMultiProcessController::Operation.
......@@ -2022,4 +2086,90 @@ int vtkMPICommunicator::Iprobe(
source, tag, flag, actualSource, MPI_DOUBLE, size, this->MPIComm->Handle));
}
//------------------------------------------------------------------------------
int vtkMPICommunicator::Probe(int source, int tag, int* actualSource)
{
return CheckForMPIError(vtkMPICommunicatorProbe(
source, tag, actualSource, MPI_INT, (vtkIdType*)nullptr, this->MPIComm->Handle));
}
//------------------------------------------------------------------------------
int vtkMPICommunicator::Probe(
int source, int tag, int* actualSource, int* vtkNotUsed(type), int* size)
{
return CheckForMPIError(
vtkMPICommunicatorProbe(source, tag, actualSource, MPI_INT, size, this->MPIComm->Handle));
}
//------------------------------------------------------------------------------
int vtkMPICommunicator::Probe(
int source, int tag, int* actualSource, unsigned long* vtkNotUsed(type), int* size)
{
return CheckForMPIError(vtkMPICommunicatorProbe(
source, tag, actualSource, MPI_UNSIGNED_LONG, size, this->MPIComm->Handle));
}
//------------------------------------------------------------------------------
int vtkMPICommunicator::Probe(
int source, int tag, int* actualSource, const char* vtkNotUsed(type), int* size)
{
return CheckForMPIError(
vtkMPICommunicatorProbe(source, tag, actualSource, MPI_CHAR, size, this->MPIComm->Handle));
}
//------------------------------------------------------------------------------
int vtkMPICommunicator::Probe(
int source, int tag, int* actualSource, float* vtkNotUsed(type), int* size)
{
return CheckForMPIError(
vtkMPICommunicatorProbe(source, tag, actualSource, MPI_FLOAT, size, this->MPIComm->Handle));
}
//------------------------------------------------------------------------------
int vtkMPICommunicator::Probe(
int source, int tag, int* actualSource, double* vtkNotUsed(type), int* size)
{
return CheckForMPIError(
vtkMPICommunicatorProbe(source, tag, actualSource, MPI_DOUBLE, size, this->MPIComm->Handle));
}
//------------------------------------------------------------------------------
int vtkMPICommunicator::Probe(
int source, int tag, int* actualSource, int* vtkNotUsed(type), vtkTypeInt64* size)
{
return CheckForMPIError(
vtkMPICommunicatorProbe(source, tag, actualSource, MPI_INT, size, this->MPIComm->Handle));
}
//------------------------------------------------------------------------------
int vtkMPICommunicator::Probe(
int source, int tag, int* actualSource, unsigned long* vtkNotUsed(type), vtkTypeInt64* size)
{
return CheckForMPIError(vtkMPICommunicatorProbe(
source, tag, actualSource, MPI_UNSIGNED_LONG, size, this->MPIComm->Handle));
}
//------------------------------------------------------------------------------
int vtkMPICommunicator::Probe(
int source, int tag, int* actualSource, const char* vtkNotUsed(type), vtkTypeInt64* size)
{
return CheckForMPIError(
vtkMPICommunicatorProbe(source, tag, actualSource, MPI_CHAR, size, this->MPIComm->Handle));
}
//------------------------------------------------------------------------------
int vtkMPICommunicator::Probe(
int source, int tag, int* actualSource, float* vtkNotUsed(type), vtkTypeInt64* size)
{
return CheckForMPIError(
vtkMPICommunicatorProbe(source, tag, actualSource, MPI_FLOAT, size, this->MPIComm->Handle));
}
//------------------------------------------------------------------------------
int vtkMPICommunicator::Probe(
int source, int tag, int* actualSource, double* vtkNotUsed(type), vtkTypeInt64* size)
{
return CheckForMPIError(
vtkMPICommunicatorProbe(source, tag, actualSource, MPI_DOUBLE, size, this->MPIComm->Handle));
}
VTK_ABI_NAMESPACE_END
......@@ -227,6 +227,34 @@ public:
int Iprobe(int source, int tag, int* flag, int* actualSource, double* type, vtkTypeInt64* size);
///@}
/**
* Check if this communicator implements a probe operation (always true for MPI communicator)
*/
bool CanProbe() override { return true; };
///@{
/**
* Blocking test for a message. Inputs are: source -- the source rank
* or ANY_SOURCE; tag -- the tag value. Outputs are:
* actualSource -- the rank sending the message (useful if ANY_SOURCE is used)
* if actualSource isn't nullptr; size -- the length of the message in
* bytes if flag is true (only set if size isn't nullptr). The return
* value is 1 for success and 0 otherwise.
*/
int Probe(int source, int tag, int* actualSource) override;
int Probe(int source, int tag, int* actualSource, int* type, int* size);
int Probe(int source, int tag, int* actualSource, unsigned long* type, int* size);
int Probe(int source, int tag, int* actualSource, const char* type, int* size);
int Probe(int source, int tag, int* actualSource, float* type, int* size);
int Probe(int source, int tag, int* actualSource, double* type, int* size);
int Probe(int source, int tag, int* actualSource, int* type, vtkTypeInt64* size);
int Probe(int source, int tag, int* actualSource, unsigned long* type, vtkTypeInt64* size);
int Probe(int source, int tag, int* actualSource, const char* type, vtkTypeInt64* size);
int Probe(int source, int tag, int* actualSource, float* type, vtkTypeInt64* size);
int Probe(int source, int tag, int* actualSource, double* type, vtkTypeInt64* size);
///@}
/**
* Given the request objects of a set of non-blocking operations
* (send and/or receive) this method blocks until all requests are complete.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment