XdmfHDF5WriterDSM.cpp 15.5 KB
Newer Older
Kenneth Leiter's avatar
Kenneth Leiter committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
/*****************************************************************************/
/*                                    XDMF                                   */
/*                       eXtensible Data Model and Format                    */
/*                                                                           */
/*  Id : XdmfHDF5WriterDSM.cpp                                               */
/*                                                                           */
/*  Author:                                                                  */
/*     Kenneth Leiter                                                        */
/*     kenneth.leiter@arl.army.mil                                           */
/*     US Army Research Laboratory                                           */
/*     Aberdeen Proving Ground, MD                                           */
/*                                                                           */
/*     Copyright @ 2011 US Army Research Laboratory                          */
/*     All Rights Reserved                                                   */
/*     See Copyright.txt for details                                         */
/*                                                                           */
/*     This software is distributed WITHOUT ANY WARRANTY; without            */
/*     even the implied warranty of MERCHANTABILITY or FITNESS               */
/*     FOR A PARTICULAR PURPOSE.  See the above copyright notice             */
/*     for more information.                                                 */
/*                                                                           */
/*****************************************************************************/
23

24 25 26 27 28 29 30
#ifdef XDMF_BUILD_DSM_THREADS
  #include <H5FDdsm.h>
  #include <H5FDdsmManager.h>
  #include <H5FDdsmBuffer.h>
  #include <H5FDdsmBufferService.h>
  #include <H5FDdsmComm.h>
#endif

#include <hdf5.h>
#include <vector>

#include <XdmfDSMCommMPI.hpp>
#include <XdmfDSMBuffer.hpp>
#include <XdmfDSMManager.hpp>
#include <XdmfDSMDriver.hpp>

#include "XdmfHDF5ControllerDSM.hpp"
#include "XdmfHDF5WriterDSM.hpp"

#include "XdmfError.hpp"
39

40
#ifdef XDMF_BUILD_DSM_THREADS
// Factory: build a writer around an existing threaded H5FDdsm buffer.
// The writer only records the pointer; nothing in this file deletes it.
shared_ptr<XdmfHDF5WriterDSM>
XdmfHDF5WriterDSM::New(const std::string & filePath,
                       H5FDdsmBuffer * const dsmBuffer)
{
  XdmfHDF5WriterDSM * const writer =
    new XdmfHDF5WriterDSM(filePath, dsmBuffer);
  return shared_ptr<XdmfHDF5WriterDSM>(writer);
}
#endif
50

51 52
// Factory: build a (non-threaded) writer around an existing XdmfDSMBuffer.
// The matching constructor registers or updates the global DSM manager.
shared_ptr<XdmfHDF5WriterDSM>
XdmfHDF5WriterDSM::New(const std::string & filePath,
                       XdmfDSMBuffer * const dsmBuffer)
{
  XdmfHDF5WriterDSM * const writer =
    new XdmfHDF5WriterDSM(filePath, dsmBuffer);
  return shared_ptr<XdmfHDF5WriterDSM>(writer);
}

60
#ifdef XDMF_BUILD_DSM_THREADS
// Factory: build a writer that creates its own stand-alone threaded
// H5FDdsm manager on `comm` with a local buffer of `bufferSize` MBytes.
shared_ptr<XdmfHDF5WriterDSM>
XdmfHDF5WriterDSM::New(const std::string & filePath,
                       MPI_Comm comm,
                       unsigned int bufferSize)
{
  XdmfHDF5WriterDSM * const writer =
    new XdmfHDF5WriterDSM(filePath, comm, bufferSize);
  return shared_ptr<XdmfHDF5WriterDSM>(writer);
}
#endif
72

73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
// Factory: split `comm` into DSM server ranks [startCoreIndex, endCoreIndex]
// and worker ranks (negative indices select the full range).
// NOTE: on server ranks the constructor enters the buffer service loop, so
// this call does not return there until that loop is told to finish.
shared_ptr<XdmfHDF5WriterDSM>
XdmfHDF5WriterDSM::New(const std::string & filePath,
                       MPI_Comm comm,
                       unsigned int bufferSize,
                       int startCoreIndex,
                       int endCoreIndex)
{
  XdmfHDF5WriterDSM * const writer =
    new XdmfHDF5WriterDSM(filePath,
                          comm,
                          bufferSize,
                          startCoreIndex,
                          endCoreIndex);
  return shared_ptr<XdmfHDF5WriterDSM>(writer);
}

88
#ifdef XDMF_BUILD_DSM_THREADS
// Threaded mode, existing buffer: record the H5FDdsm buffer. No manager is
// created, and all XdmfDSM (server-mode) state stays empty.
XdmfHDF5WriterDSM::XdmfHDF5WriterDSM(const std::string & filePath,
                                     H5FDdsmBuffer * const dsmBuffer) :
  XdmfHDF5Writer(filePath),
  mDSMManager(NULL),
  mDSMBuffer(dsmBuffer),
  mFAPL(-1),
  mDSMServerManager(NULL),
  mDSMServerBuffer(NULL),
  mWorkerComm(MPI_COMM_NULL),
  mServerMode(false)
{
}

// Threaded mode, new manager: create a stand-alone H5FDdsm manager on `comm`
// with a local buffer of `bufferSize` MBytes and register it with the driver.
XdmfHDF5WriterDSM::XdmfHDF5WriterDSM(const std::string & filePath,
                                     MPI_Comm comm,
                                     unsigned int bufferSize) :
  XdmfHDF5Writer(filePath),
  mFAPL(-1),
  mDSMServerManager(NULL),
  mDSMServerBuffer(NULL),
  mWorkerComm(MPI_COMM_NULL),
  mServerMode(false)
{
  H5FDdsmManager * const manager = new H5FDdsmManager();
  manager->SetMpiComm(comm);
  manager->SetLocalBufferSizeMBytes(bufferSize);
  manager->SetIsStandAlone(H5FD_DSM_TRUE);
  manager->Create();

  // Make the manager the driver-wide one and select asynchronous locking.
  H5FD_dsm_set_manager(manager);
  H5FD_dsm_set_options(H5FD_DSM_LOCK_ASYNCHRONOUS);

  // Keep the manager together with the buffer it exposes.
  mDSMManager = manager;
  mDSMBuffer = manager->GetDsmBuffer();
}
#endif
127

128
// The database/nonthreaded version
129
XdmfHDF5WriterDSM::XdmfHDF5WriterDSM(const std::string & filePath,
130
                                     XdmfDSMBuffer * const dsmBuffer) :
131
  XdmfHDF5Writer(filePath),
132
#ifdef XDMF_BUILD_DSM_THREADS
133 134
  mDSMManager(NULL),
  mDSMBuffer(NULL),
135
#endif
136 137 138
  mFAPL(-1),
  mDSMServerManager(NULL),
  mDSMServerBuffer(dsmBuffer),
139
  mServerMode(true)
140
{
141 142 143 144
  mWorkerComm = mDSMServerBuffer->GetComm()->GetIntraComm();
  if (xdmf_dsm_get_manager() == NULL) {
    mDSMServerManager = new XdmfDSMManager();
    mDSMServerManager->SetLocalBufferSizeMBytes(mDSMServerBuffer->GetLength());
145
    mDSMServerManager->SetInterCommType(XDMF_DSM_COMM_MPI);
146 147 148 149
    mDSMServerManager->SetIsServer(false);
    mDSMServerManager->SetMpiComm(mDSMServerBuffer->GetComm()->GetIntraComm());
    mDSMServerManager->SetDsmBuffer(mDSMServerBuffer);
    XDMF_dsm_set_manager(mDSMServerManager);
150 151
  }
  else {
152
    static_cast<XdmfDSMManager *>(xdmf_dsm_get_manager())->SetDsmBuffer(mDSMServerBuffer);
153 154 155 156 157 158 159 160 161 162
  }
}

// Server mode on a split communicator.
// Ranks [startCoreIndex, endCoreIndex] of `comm` become DSM servers, each
// holding `bufferSize` MBytes. The remaining ranks become workers and share
// mWorkerComm.
// - A negative index selects the corresponding end of the full range.
// - Swapped indices are reordered.
// - Server ranks enter BufferServiceLoop at the end, so the constructor does
//   not return on them until the loop is ended.
XdmfHDF5WriterDSM::XdmfHDF5WriterDSM(const std::string & filePath,
                                     MPI_Comm comm,
                                     unsigned int bufferSize,
                                     int startCoreIndex,
                                     int endCoreIndex) :
  XdmfHDF5Writer(filePath),
#ifdef XDMF_BUILD_DSM_THREADS
  mDSMManager(NULL),
  mDSMBuffer(NULL),
#endif
  mFAPL(-1),
  mDSMServerManager(NULL),
  mDSMServerBuffer(NULL),
  mWorkerComm(MPI_COMM_NULL),
  mServerMode(true)
{
  int rank = 0;
  int size = 0;

  MPI_Comm_size(comm, &size);
  MPI_Comm_rank(comm, &rank);

  // Negative values will be changed to maximum range
  if (startCoreIndex < 0) {
    startCoreIndex = 0;
  }
  if (endCoreIndex < 0) {
    endCoreIndex = size - 1;
  }

  // Ensure start index is not greater than end index
  if (startCoreIndex > endCoreIndex) {
    const int tempholder = startCoreIndex;
    startCoreIndex = endCoreIndex;
    endCoreIndex = tempholder;
  }

  const bool isServerRank = (rank >= startCoreIndex && rank <= endCoreIndex);

  // Ranks of `comm` that will host DSM memory (never empty: start <= end).
  std::vector<int> serverIds;
  for (int i = startCoreIndex; i <= endCoreIndex; ++i) {
    serverIds.push_back(i);
  }
  const int serverCount = static_cast<int>(serverIds.size());

  // Build the server communicator and the worker communicator (its
  // complement). Ranks outside a group receive MPI_COMM_NULL for it.
  MPI_Group commGroup;
  MPI_Group serverGroup;
  MPI_Group workerGroup;
  MPI_Comm serverComm = MPI_COMM_NULL;

  MPI_Comm_group(comm, &commGroup);
  MPI_Group_incl(commGroup, serverCount, &serverIds[0], &serverGroup);
  MPI_Comm_create(comm, serverGroup, &serverComm);
  MPI_Group_excl(commGroup, serverCount, &serverIds[0], &workerGroup);
  MPI_Comm_create(comm, workerGroup, &mWorkerComm);

  // The groups are only needed to build the communicators. The worker group
  // is empty when every rank is a server; skip freeing the predefined empty
  // group for portability across MPI implementations.
  if (workerGroup != MPI_GROUP_EMPTY) {
    MPI_Group_free(&workerGroup);
  }
  MPI_Group_free(&serverGroup);
  MPI_Group_free(&commGroup);

  // Create the manager
  mDSMServerManager = new XdmfDSMManager();
  mDSMServerManager->SetLocalBufferSizeMBytes(bufferSize);
  mDSMServerManager->SetInterCommType(XDMF_DSM_COMM_MPI);

  MPI_Barrier(comm);

  if (isServerRank) {
    mDSMServerManager->SetMpiComm(serverComm);
    mDSMServerManager->Create();
  }
  else {
    mDSMServerManager->SetMpiComm(mWorkerComm);
    mDSMServerManager->SetIsServer(false);
    mDSMServerManager->Create(startCoreIndex, endCoreIndex);
  }

  XDMF_dsm_set_manager(mDSMServerManager);

  mDSMServerBuffer = mDSMServerManager->GetDsmBuffer();

  mDSMServerBuffer->GetComm()->DupInterComm(comm);
  mDSMServerBuffer->SetIsConnected(true);

  // Exchange buffer information between servers and workers.
  if (startCoreIndex < size) {
    if (isServerRank) {
      mDSMServerManager->GetDsmBuffer()->ReceiveInfo();
    }
    else {
      mDSMServerManager->GetDsmBuffer()->SendInfo();
    }
  }

  MPI_Barrier(comm);

  // Loop needs to be started before anything can be done to the file
  // since the service is what sets up the file
  if (!isServerRank) {
    // Turn off the server designation
    mDSMServerBuffer->SetIsServer(false);
    // If this is set to false then the buffer will attempt to connect
    // to the intercomm for DSM communications
    mDSMServerManager->SetIsServer(false);
  }
  else {
    // On cores where memory is set up, start the service loop
    // This should iterate infinitely until a value to end the loop is passed
    int returnOpCode;
    mDSMServerBuffer->BufferServiceLoop(&returnOpCode);
  }
}

267 268
// Destructor: releases nothing. The managers this class allocates are freed
// only by an explicit call to deleteManager().
XdmfHDF5WriterDSM::~XdmfHDF5WriterDSM()
{
}

272 273 274 275 276 277 278 279
// Build a DSM heavy-data controller for a dataset just written.
// - An attached XdmfDSMBuffer (server mode) takes precedence.
// - Otherwise, in threaded builds, the H5FDdsm buffer is used.
// - If no buffer is attached, an empty pointer is returned.
shared_ptr<XdmfHeavyDataController>
XdmfHDF5WriterDSM::createController(const std::string & hdf5FilePath,
                                    const std::string & dataSetPath,
                                    const shared_ptr<const XdmfArrayType> type,
                                    const std::vector<unsigned int> & start,
                                    const std::vector<unsigned int> & stride,
                                    const std::vector<unsigned int> & dimensions,
                                    const std::vector<unsigned int> & dataspaceDimensions)
{
  if (mDSMServerBuffer != NULL) {
    return XdmfHDF5ControllerDSM::New(hdf5FilePath,
                                      dataSetPath,
                                      type,
                                      start,
                                      stride,
                                      dimensions,
                                      dataspaceDimensions,
                                      mDSMServerBuffer);
  }
#ifdef XDMF_BUILD_DSM_THREADS
  if (mDSMBuffer != NULL) {
    return XdmfHDF5ControllerDSM::New(hdf5FilePath,
                                      dataSetPath,
                                      type,
                                      start,
                                      stride,
                                      dimensions,
                                      dataspaceDimensions,
                                      mDSMBuffer);
  }
#endif
  return shared_ptr<XdmfHeavyDataController>();
}

308
void XdmfHDF5WriterDSM::deleteManager()
309
{
310
#ifdef XDMF_BUILD_DSM_THREADS
311 312 313 314
  if (mDSMManager != NULL)
  {
    delete mDSMManager;
  }
315
#endif
316 317 318 319 320 321 322 323 324 325 326 327 328 329 330
  if (mDSMServerManager != NULL)
  {
    closeFile();
    delete mDSMServerManager;
  }
}

void
XdmfHDF5WriterDSM::closeFile()
{
  if(mFAPL >= 0) {
    herr_t status = H5Pclose(mFAPL);
    mFAPL = -1;
  }
  XdmfHDF5Writer::closeFile();
331 332
}

333
#ifdef XDMF_BUILD_DSM_THREADS
// Threaded H5FDdsm buffer in use. This is NULL, for example, for writers
// built around an XdmfDSMBuffer.
H5FDdsmBuffer * XdmfHDF5WriterDSM::getBuffer()
{
  H5FDdsmBuffer * const buffer = mDSMBuffer;
  return buffer;
}

// Threaded H5FDdsm manager. This is NULL unless the writer created one or one
// was set through setManager().
H5FDdsmManager * XdmfHDF5WriterDSM::getManager()
{
  H5FDdsmManager * const manager = mDSMManager;
  return manager;
}
#endif
344 345 346 347 348 349

// XdmfDSMBuffer used in server mode (may be NULL in threaded mode).
XdmfDSMBuffer * XdmfHDF5WriterDSM::getServerBuffer()
{
  XdmfDSMBuffer * const buffer = mDSMServerBuffer;
  return buffer;
}

350 351 352 353 354
// XdmfDSMManager owned or adopted by this writer (may be NULL).
XdmfDSMManager * XdmfHDF5WriterDSM::getServerManager()
{
  XdmfDSMManager * const manager = mDSMServerManager;
  return manager;
}

355 356 357 358 359 360 361
bool XdmfHDF5WriterDSM::getServerMode()
{
  return mServerMode;
}

// Return a duplicate of the worker communicator; the caller owns it and
// should MPI_Comm_free it.
// Returns MPI_COMM_NULL when this rank has no worker communicator. That is the
// case for the threaded constructors and for server ranks of the split
// constructor. Duplicating MPI_COMM_NULL is erroneous in MPI.
// Raises an XdmfError (FATAL) if the duplication fails.
MPI_Comm XdmfHDF5WriterDSM::getWorkerComm()
{
  if (mWorkerComm == MPI_COMM_NULL) {
    return MPI_COMM_NULL;
  }
  MPI_Comm returnComm = MPI_COMM_NULL;
  if (MPI_Comm_dup(mWorkerComm, &returnComm) != MPI_SUCCESS) {
    XdmfError::message(XdmfError::FATAL, "Failed to duplicate Comm");
    return MPI_COMM_NULL;
  }
  return returnComm;
}

367 368 369 370 371 372
// Override that disables set splitting for DSM output: the requested value
// is ignored and the parent setting is always forced to false.
void XdmfHDF5WriterDSM::setAllowSetSplitting(bool newAllow)
{
  (void)newAllow;
  XdmfHDF5Writer::setAllowSetSplitting(false);
}

373
#ifdef XDMF_BUILD_DSM_THREADS
// Replace the threaded H5FDdsm buffer pointer; the manager is left untouched.
void XdmfHDF5WriterDSM::setBuffer(H5FDdsmBuffer * newBuffer)
{
  mDSMBuffer = newBuffer;
}
#endif
379

380
// Replace the server-mode XdmfDSMBuffer pointer; the manager is left untouched.
void XdmfHDF5WriterDSM::setBuffer(XdmfDSMBuffer * newBuffer)
{
  mDSMServerBuffer = newBuffer;
}

385
#ifdef XDMF_BUILD_DSM_THREADS
// Adopt a threaded H5FDdsm manager together with the buffer it exposes.
// Passing NULL clears both, instead of dereferencing a null pointer.
void XdmfHDF5WriterDSM::setManager(H5FDdsmManager * newManager)
{
  mDSMManager = newManager;
  if (newManager != NULL) {
    mDSMBuffer = newManager->GetDsmBuffer();
  }
  else {
    mDSMBuffer = NULL;
  }
}
#endif
393

394 395 396 397 398 399 400 401 402 403 404 405 406 407
// Adopt an XdmfDSMManager together with the buffer it exposes.
// Passing NULL clears both, instead of dereferencing a null pointer.
void XdmfHDF5WriterDSM::setManager(XdmfDSMManager * newManager)
{
  mDSMServerManager = newManager;
  if (newManager != NULL) {
    mDSMServerBuffer = newManager->GetDsmBuffer();
  }
  else {
    mDSMServerBuffer = NULL;
  }
}

// Select the XdmfDSM (true) or threaded H5FDdsm (false) driver for later
// openFile()/visit() calls.
void XdmfHDF5WriterDSM::setServerMode(bool newMode)
{
  mServerMode = (newMode ? true : false);
}

// Replace the worker communicator with a duplicate of `comm`.
// - The previous communicator is freed, except in OPEN_MPI builds (the
//   reason for that exclusion is not recorded in this file).
// - Passing MPI_COMM_NULL leaves the writer with no worker communicator.
// - When a server-mode buffer is attached, its communicator is updated too.
// Raises an XdmfError (FATAL) if freeing or duplicating fails.
void XdmfHDF5WriterDSM::setWorkerComm(MPI_Comm comm)
{
  int status;
#ifndef OPEN_MPI
  if (mWorkerComm != MPI_COMM_NULL) {
    status = MPI_Comm_free(&mWorkerComm);
    if (status != MPI_SUCCESS) {
      XdmfError::message(XdmfError::FATAL, "Failed to disconnect Comm");
    }
  }
#endif
  // Never keep a stale handle (the free above is skipped under OPEN_MPI).
  mWorkerComm = MPI_COMM_NULL;
  if (comm != MPI_COMM_NULL) {
    status = MPI_Comm_dup(comm, &mWorkerComm);
    if (status != MPI_SUCCESS) {
      XdmfError::message(XdmfError::FATAL, "Failed to duplicate Comm");
    }
  }
  // Threaded-mode writers have no XdmfDSMBuffer to update.
  if (mDSMServerBuffer != NULL) {
    mDSMServerBuffer->GetComm()->DupComm(comm);
  }
}

void XdmfHDF5WriterDSM::stopDSM()
{
438 439 440 441 442 443 444 445 446 447 448 449 450 451
  if (mServerMode) {
    // Send manually
    for (int i = mDSMServerBuffer->GetStartServerId();
         i <= mDSMServerBuffer->GetEndServerId();
         ++i) {
      try {
        mDSMServerBuffer->SendCommandHeader(XDMF_DSM_OPCODE_DONE, i, 0, 0, XDMF_DSM_INTER_COMM);
      }
      catch (XdmfError e) {
        throw e;
      }
    }
  }
  else {
452
    try {
453
      XdmfError::message(XdmfError::FATAL, "Error: Stopping DSM manually only available in server mode.");
454 455 456 457 458
    }
    catch (XdmfError e) {
      throw e;
    }
  }
459 460 461 462
}

void XdmfHDF5WriterDSM::restartDSM()
{
463 464 465 466 467 468 469 470 471 472 473 474 475 476 477
  if (mServerMode) {
    if (mDSMServerBuffer->GetComm()->GetInterId() >=
          mDSMServerBuffer->GetStartServerId() &&
        mDSMServerBuffer->GetComm()->GetInterId() <=
          mDSMServerBuffer->GetEndServerId()) {
      int returnOpCode;
      try {
        mDSMServerBuffer->BufferServiceLoop(&returnOpCode);
      }
      catch (XdmfError e) {
        throw e;
      }
    }
  }
  else {
478
    try {
479
      XdmfError::message(XdmfError::FATAL, "Error: Restarting DSM only available in server mode.");
480 481 482 483
    }
    catch (XdmfError e) {
      throw e;
    }
484 485 486 487 488
  }
}

void 
XdmfHDF5WriterDSM::openFile()
489
{
490 491 492 493
  if(mFAPL >= 0) {
    this->closeFile();
  }

494
  // Set file access property list for DSM
495
  mFAPL = H5Pcreate(H5P_FILE_ACCESS);
496

497 498 499 500 501 502
  if (mServerMode) {
    if (mWorkerComm != MPI_COMM_NULL) {
      XDMFH5Pset_fapl_dsm(mFAPL, mWorkerComm, mDSMServerBuffer, 0);
    }
  }
  else {
503
#ifdef XDMF_BUILD_DSM_THREADS
504
    H5Pset_fapl_dsm(mFAPL, MPI_COMM_WORLD, mDSMBuffer, 0);
505 506 507 508 509 510 511 512
#else
    try {
      XdmfError::message(XdmfError::FATAL, "Error: Threaded DSM not enabled.");
    }
    catch (XdmfError e) {
      throw e;
    }
#endif
513
  }
514 515 516 517 518 519 520
  XdmfHDF5Writer::openFile(mFAPL);
}

// Write `array` into the DSM.
// - If openFile() already set up a DSM property list (mFAPL >= 0), it is
//   reused.
// - Otherwise a temporary one is created with the same driver selection as
//   openFile() and closed after the write, including when the write throws.
//   A leftover mFAPL would make later calls treat it as an open file.
// - Builds without threaded DSM raise an XdmfError (FATAL) outside server
//   mode, before any property list is created.
void XdmfHDF5WriterDSM::visit(XdmfArray & array,
                              const shared_ptr<XdmfBaseVisitor>)
{
  const bool ownsFAPL = (mFAPL < 0);

  if (ownsFAPL) {
#ifndef XDMF_BUILD_DSM_THREADS
    if (!mServerMode) {
      XdmfError::message(XdmfError::FATAL, "Error: Threaded DSM not enabled.");
      return;
    }
#endif
    // Set file access property list for DSM
    mFAPL = H5Pcreate(H5P_FILE_ACCESS);
    // Use DSM driver
    if (mServerMode) {
      if (mWorkerComm != MPI_COMM_NULL) {
        XDMFH5Pset_fapl_dsm(mFAPL, mWorkerComm, mDSMServerBuffer, 0);
      }
    }
#ifdef XDMF_BUILD_DSM_THREADS
    else {
      H5Pset_fapl_dsm(mFAPL, MPI_COMM_WORLD, mDSMBuffer, 0);
    }
#endif
  }

  // Write to DSM Buffer
  try {
    this->write(array, mFAPL);
  }
  catch (...) {
    if (ownsFAPL) {
      H5Pclose(mFAPL);
      mFAPL = -1;
    }
    throw;
  }

  if (ownsFAPL) {
    // Close the temporary file access property list
    H5Pclose(mFAPL);
    mFAPL = -1;
  }
}