Updates will be applied April 15th at 12pm EDT (UTC-0400). GitLab could be a little slow between 12 - 12:45pm EDT.

Commit 9075be36 authored by Andrew J. Burns (Cont's avatar Andrew J. Burns (Cont

added nonthreaded DSM, inter program connection hasn't been tested

parent 5d41d4cd
......@@ -18,6 +18,9 @@ swig -v -c++ -python -o XdmfPython.cpp Xdmf.i
#include <XdmfArrayType.hpp>
#include <XdmfCoreItemFactory.hpp>
#include <XdmfCoreReader.hpp>
#include <XdmfDSMManager.hpp>
#include <XdmfDSMBuffer.hpp>
#include <XdmfDSMCommMPI.hpp>
#include <XdmfError.hpp>
#include <XdmfHDF5Controller.hpp>
#include <XdmfHDF5Writer.hpp>
......
......@@ -80,7 +80,7 @@ check_cxx_source_compiles("
struct Base { virtual ~Base(){} };
struct Derived : public Base {};
int main(int ac, char ** av)
int main(int ac, char * av)
{
boost::shared_ptr<Base> ptr(new Base());
boost::shared_dynamic_cast<Derived>(ptr);
......@@ -114,7 +114,11 @@ if(XDMF_BUILD_DSM)
set(XdmfCoreSources
${XdmfCoreSources}
XdmfHDF5ControllerDSM
XdmfHDF5WriterDSM)
XdmfHDF5WriterDSM
XdmfDSMCommMPI
XdmfDSMBuffer
XdmfDSMManager
XdmfDSMDriver)
find_package(H5FDdsm REQUIRED NO_MODULE)
if(H5FDdsm_FOUND)
......
......@@ -16,6 +16,9 @@ swig -v -c++ -python -o XdmfCorePython.cpp XdmfCore.i
#include <XdmfCore.hpp>
#include <XdmfCoreItemFactory.hpp>
#include <XdmfCoreReader.hpp>
#include <XdmfDSMManager.hpp>
#include <XdmfDSMBuffer.hpp>
#include <XdmfDSMCommMPI.hpp>
#include <XdmfError.hpp>
#include <XdmfHeavyDataController.hpp>
#include <XdmfHeavyDataWriter.hpp>
......@@ -968,6 +971,9 @@ swig -v -c++ -python -o XdmfCorePython.cpp XdmfCore.i
#ifdef XDMF_BUILD_DSM
%include XdmfHDF5ControllerDSM.hpp
%include XdmfHDF5WriterDSM.hpp
%include XdmfDSMManager.hpp
%include XdmfDSMBuffer.hpp
%include XdmfDSMCommMPI.hpp
#endif
%include XdmfArray.hpp
......
/*****************************************************************************/
/* XDMF */
/* eXtensible Data Model and Format */
/* */
/* Id : XdmfDSMBuffer.hpp */
/* */
/* Author: */
/* Andrew Burns */
/* andrew.j.burns2@us.army.mil */
/* US Army Research Laboratory */
/* Aberdeen Proving Ground, MD */
/* */
/* Copyright @ 2013 US Army Research Laboratory */
/* All Rights Reserved */
/* See Copyright.txt for details */
/* */
/* This software is distributed WITHOUT ANY WARRANTY; without */
/* even the implied warranty of MERCHANTABILITY or FITNESS */
/* FOR A PARTICULAR PURPOSE. See the above copyright notice */
/* for more information. */
/* */
/*****************************************************************************/
/*=========================================================================
This code is derived from an earlier work and is distributed
with permission from, and thanks to ...
=========================================================================*/
/*============================================================================
Project : H5FDdsm
Module : H5FDdsmBufferService.cxx H5FDdsmBuffer.cxx
Authors:
John Biddiscombe Jerome Soumagne
biddisco@cscs.ch soumagne@cscs.ch
Copyright (C) CSCS - Swiss National Supercomputing Centre.
You may use modify and and distribute this code freely providing
1) This copyright notice appears on all copies of source code
2) An acknowledgment appears with any substantial usage of the code
3) If this code is contributed to any other open source project, it
must not be reformatted such that the indentation, bracketing or
overall style is modified significantly.
This software is distributed WITHOUT ANY WARRANTY; without even the
implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
This work has received funding from the European Community's Seventh
Framework Programme (FP7/2007-2013) under grant agreement 225967 âxtMuSEâOC
============================================================================*/
#include <XdmfDSMBuffer.hpp>
#include <XdmfDSMCommMPI.hpp>
#include <XdmfError.hpp>
#include <mpi.h>
#include <string.h>
#include <stdlib.h>
#include <algorithm>
XdmfDSMBuffer::XdmfDSMBuffer()
{
this->CommChannel = XDMF_DSM_INTER_COMM;
this->IsServer = true;
this->StartAddress = this->EndAddress = 0;
this->StartServerId = this->EndServerId = -1;
this->Length = 0;
this->TotalLength = 0;
this->BlockLength = 0;
this->Comm = NULL;
this->DataPointer = NULL;
this->IsConnected = false;
}
XdmfDSMBuffer::~XdmfDSMBuffer()
{
if (this->DataPointer)
{
free(this->DataPointer);
}
this->DataPointer = NULL;
}
class XdmfDSMBuffer::CommandMsg
{
public:
int Opcode;
int Source;
int Target;
int Address;
int Length;
};
class XdmfDSMBuffer::InfoMsg
{
public:
int type;
unsigned int length;
unsigned int total_length;
unsigned int block_length;
int start_server_id;
int end_server_id;
};
void
XdmfDSMBuffer::ConfigureUniform(XdmfDSMCommMPI *aComm, long aLength,
int startId, int endId, long aBlockLength,
bool random)
{
if (startId < 0)
{
startId = 0;
}
if (endId < 0)
{
endId = aComm->GetIntraSize() - 1;
}
this->SetDsmType(XDMF_DSM_TYPE_UNIFORM_RANGE);
if ((startId == 0) && (endId == aComm->GetIntraSize() - 1))
{
this->SetDsmType(XDMF_DSM_TYPE_UNIFORM);
}
if (aBlockLength)
{
if (!random)
{
this->SetDsmType(XDMF_DSM_TYPE_BLOCK_CYCLIC);
}
else
{
this->SetDsmType(XDMF_DSM_TYPE_BLOCK_RANDOM);
}
this->SetBlockLength(aBlockLength);
}
this->StartServerId = startId;
this->EndServerId = endId;
this->SetComm(aComm);
if ((aComm->GetId() >= startId) && (aComm->GetId() <= endId) && this->IsServer)
{
try
{
if (aBlockLength)
{
// For optimization we make the DSM length fit to a multiple of block size
this->SetLength(((long)(aLength / aBlockLength)) * aBlockLength);
}
else
{
this->SetLength(aLength);
}
}
catch (XdmfError e)
{
throw e;
}
this->StartAddress = (aComm->GetId() - startId) * aLength;
this->EndAddress = this->StartAddress + aLength - 1;
}
else
{
if (aBlockLength)
{
this->Length = ((long)(aLength / aBlockLength)) * aBlockLength;
}
else
{
this->Length = aLength;
}
}
this->TotalLength = this->GetLength() * (endId - startId + 1);
}
bool
XdmfDSMBuffer::GetIsConnected()
{
return IsConnected;
}
void
XdmfDSMBuffer::SetIsConnected(bool newStatus)
{
IsConnected = newStatus;
}
char *
XdmfDSMBuffer::GetDataPointer()
{
return this->DataPointer;
}
int
XdmfDSMBuffer::GetDsmType()
{
return this->DsmType;
}
void
XdmfDSMBuffer::SetDsmType(int newDsmType)
{
this->DsmType = newDsmType;
}
bool
XdmfDSMBuffer::GetIsServer()
{
return this->IsServer;
}
void
XdmfDSMBuffer::SetIsServer(bool newIsServer)
{
this->IsServer = newIsServer;
}
int
XdmfDSMBuffer::GetEndAddress()
{
return this->EndAddress;
}
int
XdmfDSMBuffer::GetStartAddress()
{
return this->StartAddress;
}
int
XdmfDSMBuffer::GetStartServerId()
{
return this->StartServerId;
}
int
XdmfDSMBuffer::GetEndServerId()
{
return this->EndServerId;
}
long
XdmfDSMBuffer::GetLength()
{
return this->Length;
}
long
XdmfDSMBuffer::GetTotalLength()
{
return this->TotalLength;
}
long
XdmfDSMBuffer::GetBlockLength()
{
return this->BlockLength;
}
void
XdmfDSMBuffer::SetBlockLength(long newBlock)
{
this->BlockLength = newBlock;
}
XdmfDSMCommMPI *
XdmfDSMBuffer::GetComm()
{
return this->Comm;
}
void
XdmfDSMBuffer::SetComm(XdmfDSMCommMPI * newComm)
{
this->Comm = newComm;
}
void
XdmfDSMBuffer::SetLength(long aLength)
{
this->Length = aLength;
if (this->DataPointer)
{
// try to reallocate
// this should not be called in most cases
this->DataPointer = static_cast<char *>(realloc(this->DataPointer, this->Length*sizeof(char)));
}
else
{
#ifdef _WIN32
this->DataPointer = calloc(this->Length, sizeof(char));
#else
posix_memalign((void **)(&this->DataPointer), getpagesize(), this->Length);
memset(this->DataPointer, 0, this->Length);
#endif
}
if (this->DataPointer == NULL)
{
std::stringstream message;
message << "Allocation Failed, unable to allocate " << this->Length;
XdmfError::message(XdmfError::FATAL, message.str());
}
}
void
XdmfDSMBuffer::SendCommandHeader(int opcode, int dest, int address, int aLength, int comm)
{
int status;
CommandMsg cmd;
memset(&cmd, 0, sizeof(CommandMsg));
cmd.Opcode = opcode;
cmd.Source = this->Comm->GetId();
cmd.Target = dest;
cmd.Address = address;
cmd.Length = aLength;
if (comm == XDMF_DSM_INTRA_COMM)
{
status = MPI_Send(&cmd, sizeof(CommandMsg), MPI_UNSIGNED_CHAR, dest, XDMF_DSM_COMMAND_TAG, static_cast<XdmfDSMCommMPI *>(this->Comm)->GetIntraComm());
}
else if (comm == XDMF_DSM_INTER_COMM)
{
int interSource = 0;
MPI_Comm_rank(static_cast<XdmfDSMCommMPI *>(this->Comm)->GetInterComm(), &interSource);
cmd.Source = interSource;
status = MPI_Send(&cmd, sizeof(CommandMsg), MPI_UNSIGNED_CHAR, dest, XDMF_DSM_COMMAND_TAG, static_cast<XdmfDSMCommMPI *>(this->Comm)->GetInterComm());
}
else
{//in this case the comm should be a pointer to an MPI_Comm object
status = MPI_Send(&cmd, sizeof(CommandMsg), MPI_UNSIGNED_CHAR, dest, XDMF_DSM_COMMAND_TAG, comm);
}
if (status != MPI_SUCCESS)
{
try
{
XdmfError::message(XdmfError::FATAL, "Error: Failed to send command header");
}
catch (XdmfError e)
{
throw e;
}
}
}
void
XdmfDSMBuffer::ReceiveCommandHeader(int *opcode, int *source, int *address, int *aLength, int comm, int remoteSource)
{
CommandMsg cmd;
memset(&cmd, 0, sizeof(CommandMsg));
int status = MPI_ERR_OTHER;
MPI_Status signalStatus;
if (remoteSource < 0)
{
remoteSource = MPI_ANY_SOURCE;
}
if (comm == XDMF_DSM_INTRA_COMM)
{
status = MPI_Recv(&cmd, sizeof(CommandMsg), MPI_UNSIGNED_CHAR, remoteSource, XDMF_DSM_COMMAND_TAG, static_cast<XdmfDSMCommMPI *>(this->Comm)->GetIntraComm(), &signalStatus);
}
else if (comm == XDMF_DSM_INTER_COMM)
{
status = MPI_Recv(&cmd, sizeof(CommandMsg), MPI_UNSIGNED_CHAR, remoteSource, XDMF_DSM_COMMAND_TAG, static_cast<XdmfDSMCommMPI *>(this->Comm)->GetInterComm(), &signalStatus);
}
else
{//in this case the integer is probably a pointer to an MPI_Comm object
status = MPI_Recv(&cmd, sizeof(CommandMsg), MPI_UNSIGNED_CHAR, remoteSource, XDMF_DSM_COMMAND_TAG, comm, &signalStatus);
}
if (status != MPI_SUCCESS)
{
try
{
XdmfError::message(XdmfError::FATAL, "Error: Failed to receive command header");
}
catch (XdmfError e)
{
throw e;
}
}
else
{
*opcode = cmd.Opcode;
*source = cmd.Source;
*address = cmd.Address;
*aLength = cmd.Length;
}
}
void
XdmfDSMBuffer::SendData(int dest, char * data, int aLength, int tag, int aAddress, int comm)
{
int status;
if (comm == XDMF_DSM_INTRA_COMM)
{
status = MPI_Send(data, aLength, MPI_UNSIGNED_CHAR, dest, tag, static_cast<XdmfDSMCommMPI *>(this->Comm)->GetIntraComm());
}
else if (comm == XDMF_DSM_INTER_COMM)
{
status = MPI_Send(data, aLength, MPI_UNSIGNED_CHAR, dest, tag, static_cast<XdmfDSMCommMPI *>(this->Comm)->GetInterComm());
}
else
{
status = MPI_Send(data, aLength, MPI_UNSIGNED_CHAR, dest, tag, comm);
}
if (status != MPI_SUCCESS)
{
try
{
XdmfError::message(XdmfError::FATAL, "Error: Failed to send data");
}
catch (XdmfError e)
{
throw e;
}
}
}
void
XdmfDSMBuffer::ReceiveData(int source, char * data, int aLength, int tag, int aAddress, int comm)
{
int status;
MPI_Status signalStatus;
if (comm == XDMF_DSM_INTRA_COMM)
{
status = MPI_Recv(data, aLength, MPI_UNSIGNED_CHAR, source, tag, static_cast<XdmfDSMCommMPI *>(this->Comm)->GetIntraComm(), &signalStatus);
}
else if (comm == XDMF_DSM_INTER_COMM)
{
status = MPI_Recv(data, aLength, MPI_UNSIGNED_CHAR, source, tag, static_cast<XdmfDSMCommMPI *>(this->Comm)->GetInterComm(), &signalStatus);
}
else
{
status = MPI_Recv(data, aLength, MPI_UNSIGNED_CHAR, source, tag, comm, &signalStatus);
}
if (status != MPI_SUCCESS)
{
try
{
XdmfError::message(XdmfError::FATAL, "Error: Failed to receive data");
}
catch (XdmfError e)
{
throw e;
}
}
}
void
XdmfDSMBuffer::SendAcknowledgment(int dest, int data, int tag, int comm)
{
int status;
if (comm == XDMF_DSM_INTRA_COMM)
{
status = MPI_Send(&data, sizeof(int), MPI_UNSIGNED_CHAR, dest, tag, static_cast<XdmfDSMCommMPI *>(this->Comm)->GetIntraComm());
}
else if (comm == XDMF_DSM_INTER_COMM)
{
status = MPI_Send(&data, sizeof(int), MPI_UNSIGNED_CHAR, dest, tag, static_cast<XdmfDSMCommMPI *>(this->Comm)->GetInterComm());
}
else
{
status = MPI_Send(&data, sizeof(int), MPI_UNSIGNED_CHAR, dest, tag, comm);
}
if (status != MPI_SUCCESS)
{
try
{
XdmfError::message(XdmfError::FATAL, "Error: Failed to receive data");
}
catch (XdmfError e)
{
throw e;
}
}
}
void
XdmfDSMBuffer::ReceiveAcknowledgment(int source, int &data, int tag, int comm)
{
int status;
MPI_Status signalStatus;
if (comm == XDMF_DSM_INTRA_COMM)
{
status = MPI_Recv(&data, sizeof(int), MPI_UNSIGNED_CHAR, source, tag, static_cast<XdmfDSMCommMPI *>(this->Comm)->GetIntraComm(), &signalStatus);
}
else if (comm == XDMF_DSM_INTER_COMM)
{
status = MPI_Recv(&data, sizeof(int), MPI_UNSIGNED_CHAR, source, tag, static_cast<XdmfDSMCommMPI *>(this->Comm)->GetInterComm(), &signalStatus);
}
else
{
status = MPI_Recv(&data, sizeof(int), MPI_UNSIGNED_CHAR, source, tag, comm, &signalStatus);
}
if (status != MPI_SUCCESS)
{
try
{
XdmfError::message(XdmfError::FATAL, "Error: Failed to receive data");
}
catch (XdmfError e)
{
throw e;
}
}
}
void
XdmfDSMBuffer::SendInfo()
{
InfoMsg dsmInfo;
int status;
memset(&dsmInfo, 0, sizeof(InfoMsg));
dsmInfo.type = this->GetDsmType();
dsmInfo.length = this->GetLength();
dsmInfo.total_length = this->GetTotalLength();
dsmInfo.block_length = this->GetBlockLength();
dsmInfo.start_server_id = this->GetStartServerId();
dsmInfo.end_server_id = this->GetEndServerId();
if (this->Comm->GetId() == 0)
{
status = MPI_Send(&dsmInfo, sizeof(InfoMsg), MPI_UNSIGNED_CHAR, 0, XDMF_DSM_EXCHANGE_TAG, static_cast<XdmfDSMCommMPI *>(this->Comm)->GetInterComm());
if (status != MPI_SUCCESS)
{
try
{
XdmfError::message(XdmfError::FATAL, "Error: Failed to send info");
}
catch (XdmfError e)
{
throw e;
}
}
}
status = MPI_Barrier(this->Comm->GetIntraComm());
if (status != MPI_SUCCESS)
{
try
{
XdmfError::message(XdmfError::FATAL, "Error: Failed to send info");
}
catch (XdmfError e)
{
throw e;
}
}
}
void
XdmfDSMBuffer::ReceiveInfo()
{
InfoMsg dsmInfo;
int status;
MPI_Status signalStatus;
memset(&dsmInfo, 0, sizeof(InfoMsg));
if (this->Comm->GetId() == 0)
{
status = MPI_Recv(&dsmInfo, sizeof(InfoMsg), MPI_UNSIGNED_CHAR, XDMF_DSM_ANY_SOURCE, XDMF_DSM_EXCHANGE_TAG, static_cast<XdmfDSMCommMPI *>(this->Comm)->GetInterComm(), &signalStatus);
if (status != MPI_SUCCESS)
{
try