From 9a1ea4b93db51c4d41ff999237bfcd0451a9ff0c Mon Sep 17 00:00:00 2001 From: hrchilds Date: Sun, 31 Jan 2016 23:10:39 +0000 Subject: [PATCH] Merge Phi branch to trunk, picking up changes for threading/Phi git-svn-id: http://visit.ilight.com/svn/visit/trunk/src@27978 18c085ea-50e0-402c-830e-de6fd14e8384 --- avt/Pipeline/Pipeline/ThreadPool.C | 53 ++++++- avt/Pipeline/Pipeline/ThreadPool.h | 6 + avt/Pipeline/Pipeline/avtExecutionManager.C | 56 ++++++- common/misc/DebugStreamFull.C | 142 +++++++++++++----- common/misc/DebugStreamFull.h | 4 +- common/misc/VisItInit.C | 100 +++++++++++- common/misc/VisItInit.h | 10 ++ config-site/beacon-login1.cmake | 28 ++++ config-site/beacon-login2.cmake | 1 + .../blackrock_client.engr.utk.edu.cmake | 89 +++++++++++ .../blackrock_server.engr.utk.edu.cmake | 32 ++++ engine/main/Engine.C | 21 ++- resources/hosts/nics/host_beacon004_mic0.xml | 23 +++ 13 files changed, 521 insertions(+), 44 deletions(-) create mode 100644 config-site/beacon-login1.cmake create mode 120000 config-site/beacon-login2.cmake create mode 100644 config-site/blackrock_client.engr.utk.edu.cmake create mode 100644 config-site/blackrock_server.engr.utk.edu.cmake create mode 100644 resources/hosts/nics/host_beacon004_mic0.xml diff --git a/avt/Pipeline/Pipeline/ThreadPool.C b/avt/Pipeline/Pipeline/ThreadPool.C index 4b7455fd8fc..dcd530e6846 100644 --- a/avt/Pipeline/Pipeline/ThreadPool.C +++ b/avt/Pipeline/Pipeline/ThreadPool.C @@ -10,6 +10,7 @@ #include #include "ThreadPool.h" #include +#include void *ThreadPoolThread( void * ); @@ -19,6 +20,27 @@ struct ThreadInfo int id; }; +int +GetPthreadIDCallback(void *a) +{ + ThreadPool *tp = (ThreadPool *) a; + return tp->GetPthreadID(); +} + +int +ThreadPool::GetPthreadID() +{ + pthread_t my_id = pthread_self(); + if (my_id == mainThread) + return 0; + for (int i = 0 ; i < numberThreads ; i++) + { + if (threads[i] == my_id) + return i; + } + return -1; // ?? ... and don't issue debug statement, since we would get infinite recursion +} + /*********************************************************************** return: pointer to a new ThreadPool. @@ -36,6 +58,7 @@ ThreadPool * ThreadPool::Create( int numWorkerThreads, int maxQueueSize, int doN } // Initialize the fields. + tpool->mainThread = pthread_self(); tpool->numberThreads = numWorkerThreads; tpool->maxQueueSize = maxQueueSize; tpool->doNotBlockWhenFull = doNotBlockWhenFull; @@ -89,6 +112,8 @@ ThreadPool * ThreadPool::Create( int numWorkerThreads, int maxQueueSize, int doN } } + VisItInit::RegisterThreadIDFunction(GetPthreadIDCallback, tpool); + return( tpool ); } @@ -319,7 +344,7 @@ int ThreadPool::JoinNoExit() return( 1 ); } - while( currentQueueSize || threadWorking.to_ulong() ) + while( currentQueueSize || threadWorking.to_ulong() || threadWorking2.to_ulong() || threadWorking3.to_ulong() || threadWorking4.to_ulong()) { if( (rtn = pthread_cond_wait(&queueEmpty, &queueLock)) != 0 ) { @@ -356,7 +381,13 @@ void *ThreadPoolThread( void *arg ) #ifndef TURN_OFF_THREAD_SET_AFFINITY cpu_set_t cpuset; CPU_ZERO( &cpuset ); +#ifdef __KNC__ + int numCores = sysconf( _SC_NPROCESSORS_ONLN ); + CPU_SET(((tI->id-1)*4)%numCores + ((tI->id-1)*4)/numCores + 1, &cpuset); +#else CPU_SET( (tI->id - 1), &cpuset ); +#endif + if( (rtn = pthread_setaffinity_np( pthread_self(), sizeof(cpu_set_t), &cpuset )) != 0 ) { debug1 << "ThreadPoolThread pthread_setaffinity_np: " << strerror(rtn) << endl; @@ -376,8 +407,15 @@ void *ThreadPoolThread( void *arg ) while( (tpool->currentQueueSize == 0) && (! tpool->shutdown) ) { // Check if we should send the empty queue message. - tpool->threadWorking[tI->id] = 0; - if( tpool->threadWorking.to_ulong() == 0 ) + if( tI->id < 64) + tpool->threadWorking[tI->id] = 0; + else if( tI->id <128) + tpool->threadWorking2[tI->id-64] = 0; + else if( tI->id < 192) + tpool->threadWorking3[tI->id-128] = 0; + else if( tI->id < 256) + tpool->threadWorking4[tI->id-192] = 0; + if( tpool->threadWorking.to_ulong() == 0 && tpool->threadWorking2.to_ulong() == 0 && tpool->threadWorking3.to_ulong() == 0 && tpool->threadWorking3.to_ulong() == 0 ) { if( (rtn = pthread_cond_signal(&(tpool->queueEmpty))) != 0 ) { @@ -415,7 +453,14 @@ void *ThreadPoolThread( void *arg ) tpool->queueHead = my_workp->next; // Set that this thread is working. - tpool->threadWorking[tI->id] = 1; + if( tI->id < 64) + tpool->threadWorking[tI->id] = 1; + else if( tI->id <128) + tpool->threadWorking2[tI->id-64] = 1; + else if( tI->id < 192) + tpool->threadWorking3[tI->id-128] = 1; + else if( tI->id < 256) + tpool->threadWorking4[tI->id-192] = 1; // Handle waiting add_work threads. if( (!tpool->doNotBlockWhenFull) && (tpool->currentQueueSize == (tpool->maxQueueSize - 1)) ) diff --git a/avt/Pipeline/Pipeline/ThreadPool.h b/avt/Pipeline/Pipeline/ThreadPool.h index b1963b787ad..5674a19e607 100644 --- a/avt/Pipeline/Pipeline/ThreadPool.h +++ b/avt/Pipeline/Pipeline/ThreadPool.h @@ -53,6 +53,8 @@ class ThreadPool return( numberThreads ); } + int GetPthreadID(); + private: ThreadPool() {} ThreadPool( const ThreadPool &obj ) {} @@ -74,6 +76,7 @@ class ThreadPool // state. pthread_t *threads; + pthread_t mainThread; int currentQueueSize; ThreadPoolWork *queueHead; ThreadPoolWork *queueTail; @@ -83,6 +86,9 @@ class ThreadPool // Used with the queueEmpty condition to know if all work is done. // TODO: make a bitset class that can handle any number of threads std::bitset<64> threadWorking; + std::bitset<64> threadWorking2; + std::bitset<64> threadWorking3; + std::bitset<64> threadWorking4; // synchronization. pthread_mutex_t queueLock; diff --git a/avt/Pipeline/Pipeline/avtExecutionManager.C b/avt/Pipeline/Pipeline/avtExecutionManager.C index 4ee1c04f7e5..d7c36a1c0f3 100644 --- a/avt/Pipeline/Pipeline/avtExecutionManager.C +++ b/avt/Pipeline/Pipeline/avtExecutionManager.C @@ -2,6 +2,7 @@ #include #include +#include #include "avtExecutionManager.h" @@ -64,20 +65,37 @@ void avtExecutionManager::SetNumberOfThreads( int nt ) // Failed to create threads, we need to exit. // TODO: I think we need to throw an error. } + else + { + VisItInit::SetNumberOfThreads(numThreads); + } } void avtExecutionManager::ScheduleWork( void (*workerThreadFunction)(void *), void *cbdata ) { - tPool->AddWork( workerThreadFunction, cbdata ); + if (tPool == NULL) + { + workerThreadFunction(cbdata); + } + else + { + tPool->AddWork( workerThreadFunction, cbdata ); + } } void avtExecutionManager::FinishWork() { + if (tPool == NULL) + return; + tPool->JoinNoExit(); } MUTEX * avtExecutionManager::FindMutex( const MUTEX_ID id ) { + if (tPool == NULL) + return NULL; + std::map::iterator it; MUTEX *lock; @@ -104,6 +122,9 @@ MUTEX * avtExecutionManager::FindMutex( const MUTEX_ID id ) MUTEX * avtExecutionManager::RemoveMutex( const MUTEX_ID id ) { + if (tPool == NULL) + return NULL; + std::map::iterator it; MUTEX *lock; @@ -124,21 +145,33 @@ MUTEX * avtExecutionManager::RemoveMutex( const MUTEX_ID id ) void avtExecutionManager::MutexLock( const MUTEX_ID stringID ) { + if (tPool == NULL) + return; + MutexLock( FindMutex(stringID) ); } void avtExecutionManager::MutexUnlock( const MUTEX_ID stringID ) { + if (tPool == NULL) + return; + MutexUnlock( FindMutex(stringID) ); } void avtExecutionManager::MutexDestroy( const MUTEX_ID stringID ) { + if (tPool == NULL) + return; + MutexDestroy( RemoveMutex(stringID) ); } void avtExecutionManager::MutexInit( MUTEX *lock ) { + if (tPool == NULL) + return; + int rtn; if( (rtn = pthread_mutex_init(lock, NULL)) ) { @@ -148,6 +181,9 @@ void avtExecutionManager::MutexInit( MUTEX *lock ) void avtExecutionManager::MutexDestroy( MUTEX *lock, bool delMutex ) { + if (tPool == NULL) + return; + int rtn; if( (rtn = pthread_mutex_destroy(lock)) ) { @@ -159,6 +195,9 @@ void avtExecutionManager::MutexDestroy( MUTEX *lock, bool delMutex ) void avtExecutionManager::MutexLock( MUTEX *lock ) { + if (tPool == NULL) + return; + int rtn; if( (rtn = pthread_mutex_lock(lock)) ) { @@ -168,6 +207,9 @@ void avtExecutionManager::MutexLock( MUTEX *lock ) void avtExecutionManager::MutexUnlock( MUTEX *lock ) { + if (tPool == NULL) + return; + int rtn; if( (rtn = pthread_mutex_unlock(lock)) ) { @@ -177,6 +219,9 @@ void avtExecutionManager::MutexUnlock( MUTEX *lock ) void avtExecutionManager::CondInit( COND *cond ) { + if (tPool == NULL) + return; + int rtn; if( (rtn = pthread_cond_init(cond, NULL)) ) { @@ -186,6 +231,9 @@ void avtExecutionManager::CondInit( COND *cond ) void avtExecutionManager::CondDestroy( COND *cond ) { + if (tPool == NULL) + return; + int rtn; if( (rtn = pthread_cond_destroy(cond)) ) { @@ -195,6 +243,9 @@ void avtExecutionManager::CondDestroy( COND *cond ) void avtExecutionManager::CondSignal( COND *cond ) { + if (tPool == NULL) + return; + int rtn; if( (rtn = pthread_cond_signal(cond)) ) { @@ -204,6 +255,9 @@ void avtExecutionManager::CondSignal( COND *cond ) void avtExecutionManager::CondWait( COND *cond, MUTEX *lock ) { + if (tPool == NULL) + return; + int rtn; if( (rtn = pthread_cond_wait(cond, lock)) ) { diff --git a/common/misc/DebugStreamFull.C b/common/misc/DebugStreamFull.C index 108c1281d33..809d78fccbd 100644 --- a/common/misc/DebugStreamFull.C +++ b/common/misc/DebugStreamFull.C @@ -39,6 +39,7 @@ #include #include #include +#include #if defined(_WIN32) #include @@ -57,6 +58,8 @@ using std::vector; vector DebugStreamFull::DebugStreamBuf::allBuffers; int DebugStreamFull::DebugStreamBuf::curLevel; +int DebugStreamFull::numThreadLogs = 1; + // global DebugStreams // We make these static so they are NOT visible outside this file static DebugStreamFull debug1_realobj(1); @@ -65,6 +68,14 @@ static DebugStreamFull debug3_realobj(3); static DebugStreamFull debug4_realobj(4); static DebugStreamFull debug5_realobj(5); +bool IsThread0(void) +{ + if (VisItInit::GetNumberOfThreads() <= 1) + return true; + if (VisItInit::GetMyThreadID() == 0) + return true; + return false; +} bool DebugStream::Level1() { return debug1_realobj.isenabled(); }; bool DebugStream::Level2() { return debug2_realobj.isenabled(); }; bool DebugStream::Level3() { return debug3_realobj.isenabled(); }; @@ -326,10 +337,15 @@ DebugStreamFull::DebugStreamBuf::SetLevel(int level_) void DebugStreamFull::DebugStreamBuf::close() { - if (out) + if(out == NULL) return; + + for(int i=0; iclose(); - delete out; + delete [] out; out = NULL; } } @@ -367,25 +383,33 @@ DebugStreamFull::DebugStreamBuf::open(const char *filename_, bool buffer_debug) { close(); strcpy(filename, filename_); - out = new ofstream; - out->open(filename, ios::out); - if (! *out) - { - delete out; - out = NULL; - } - else - { - // flush the buffer after every operation - if (!buffer_debug) - out->setf(ios::unitbuf); + out = new ofstream[numThreadLogs]; + + for(int i=0; irdbuf()->setbuf((char*)0,0); + // the previous flag does nothing on SunOS; + // I hate to do this, but I'm doing it to force automatic flushing: + if (!buffer_debug) + out[i].rdbuf()->setbuf((char*)0,0); #endif + } } + } @@ -410,12 +434,13 @@ DebugStreamFull::DebugStreamBuf::open(const char *filename_, bool buffer_debug) int DebugStreamFull::DebugStreamBuf::put(int c) { - if (out && + int t_id = VisItInit::GetMyThreadID(); + if ( t_id < numThreadLogs && out != NULL && out[t_id] && curLevel <= level) { if (c!=EOF) { - out->put((char)c); + out[t_id].put((char)c); } } return c; @@ -568,10 +593,10 @@ DebugStreamFull::open(const char *progname, bool clobber, bool buffer_debug) #ifdef WIN32 // On windows, we always use pids, so won't need to rename, and thus // don't need to prepend a letter. - sprintf(filename, "%s.%d.vlog", progname, level); + sprintf(filename, "%s.%d.thr", progname, level); #else - sprintf(filename, "A.%s.%d.vlog", progname, level); + sprintf(filename, "A.%s.%d.thr", progname, level); // only rename old vlogs if we don't have pids bool renameOld = !clobber && (strspn(progname, ".0123456789") == 0); @@ -581,16 +606,57 @@ DebugStreamFull::open(const char *progname, bool clobber, bool buffer_debug) { char filenametmp1[256]; char filenametmp2[256]; - sprintf(filenametmp1, "E.%s.%d.vlog", progname, level); - unlink(filenametmp1); // E->deleted - sprintf(filenametmp2, "D.%s.%d.vlog", progname, level); - rename(filenametmp2, filenametmp1); // D->E - sprintf(filenametmp1, "C.%s.%d.vlog", progname, level); - rename(filenametmp1, filenametmp2); // C->D - sprintf(filenametmp2, "B.%s.%d.vlog", progname, level); - rename(filenametmp2, filenametmp1); // B->C - sprintf(filenametmp1, "A.%s.%d.vlog", progname, level); - rename(filenametmp1, filenametmp2); // A->B + int fileThr = 0; + sprintf(filenametmp1, "E.%s.%d.thr%d.vlog", progname, level, fileThr); + while (access( filenametmp1, F_OK) != -1 ) + { + unlink(filenametmp1); // E->deleted + sprintf(filenametmp1, "E.%s.%d.thr%d.vlog", progname, level, ++fileThr); + } + fileThr = 0; + sprintf(filenametmp1, "E.%s.%d.thr%d.vlog", progname, level, fileThr); + sprintf(filenametmp2, "D.%s.%d.thr%d.vlog", progname, level, fileThr); + while (access( filenametmp2, F_OK) != -1 ) + { + rename(filenametmp2, filenametmp1); // D->E + + ++fileThr; + sprintf(filenametmp1, "E.%s.%d.thr%d.vlog", progname, level, fileThr); + sprintf(filenametmp2, "D.%s.%d.thr%d.vlog", progname, level, fileThr); + } + fileThr = 0; + sprintf(filenametmp1, "D.%s.%d.thr%d.vlog", progname, level, fileThr); + sprintf(filenametmp2, "C.%s.%d.thr%d.vlog", progname, level, fileThr); + while (access( filenametmp2, F_OK) != -1 ) + { + rename(filenametmp2, filenametmp1); // C->D + + ++fileThr; + sprintf(filenametmp1, "D.%s.%d.thr%d.vlog", progname, level, fileThr); + sprintf(filenametmp2, "C.%s.%d.thr%d.vlog", progname, level, fileThr); + } + fileThr = 0; + sprintf(filenametmp1, "C.%s.%d.thr%d.vlog", progname, level, fileThr); + sprintf(filenametmp2, "B.%s.%d.thr%d.vlog", progname, level, fileThr); + while (access( filenametmp2, F_OK) != -1 ) + { + rename(filenametmp2, filenametmp1); // B->C + + ++fileThr; + sprintf(filenametmp1, "C.%s.%d.thr%d.vlog", progname, level, fileThr); + sprintf(filenametmp2, "B.%s.%d.thr%d.vlog", progname, level, fileThr); + } + fileThr = 0; + sprintf(filenametmp1, "B.%s.%d.thr%d.vlog", progname, level, fileThr); + sprintf(filenametmp2, "A.%s.%d.thr%d.vlog", progname, level, fileThr); + while (access( filenametmp2, F_OK) != -1 ) + { + rename(filenametmp2, filenametmp1); // A->B + + ++fileThr; + sprintf(filenametmp1, "B.%s.%d.thr%d.vlog", progname, level, fileThr); + sprintf(filenametmp2, "A.%s.%d.thr%d.vlog", progname, level, fileThr); + } } #endif @@ -669,12 +735,20 @@ DebugStreamFull::close() // // Mark C. Miller, Tue Apr 14 16:01:49 PDT 2009 // Added option to buffer the debug logs. +// +// Elliott Ewing, Fri Oct 3 16:10:05 PDT 2015 +// Added argument to specify number of threaded logs from command line args. // **************************************************************************** void -DebugStreamFull::Initialize(const char *progname, int debuglevel, bool sigs, +DebugStreamFull::Initialize(const char *progname, int debuglevel, int numThreadLogs_, bool sigs, bool clobber, bool buffer_debug, bool _decorate) { + if(numThreadLogs_ >= VisItInit::GetNumberOfThreads()) + numThreadLogs = VisItInit::GetNumberOfThreads(); + else + numThreadLogs = numThreadLogs_; + switch (debuglevel) { case 5: debug5_realobj.open(progname, clobber, buffer_debug); diff --git a/common/misc/DebugStreamFull.h b/common/misc/DebugStreamFull.h index a6443394607..f6a53a3e757 100644 --- a/common/misc/DebugStreamFull.h +++ b/common/misc/DebugStreamFull.h @@ -99,7 +99,7 @@ class MISC_API DebugStreamFull : public ostream bool isdecorated() const { return decorate; } void open(const char *progname, bool, bool); void close(); - static void Initialize(const char *, int, bool=true, bool=false, bool=false, bool=false); + static void Initialize(const char *, int, int, bool=true, bool=false, bool=false, bool=false); private: class DebugStreamBuf : public streambuf { @@ -137,6 +137,8 @@ class MISC_API DebugStreamFull : public ostream bool enabled; // true if output should be decorated with __FILE__ and __LINE__ bool decorate; + // number of threads (and files) + static int numThreadLogs; }; #endif diff --git a/common/misc/VisItInit.C b/common/misc/VisItInit.C index e328a7e2f80..18809409d31 100644 --- a/common/misc/VisItInit.C +++ b/common/misc/VisItInit.C @@ -87,6 +87,9 @@ static ErrorFunction errorFunction = NULL; static void *errorFunctionArgs = NULL; static bool initializeCalled = false; static bool finalizeCalled = false; +static ThreadIDFunction threadIDFunction = NULL; +static void *threadIDFunctionArgs = NULL; +static int numThreads = 1; // **************************************************************************** // Function: striparg @@ -262,6 +265,7 @@ VisItInit::Initialize(int &argc, char *argv[], int r, int n, bool strip, bool si bool clobberVlogs = false; bool vtk_debug = false; bool enableTimings = false; + int threadDebugLogs=1; for (i=1; iStartTimer(); + int nthreads = 0; + for (int i = 0 ; i < *argc-1 ; i++) + { + if ((strcmp((*argv)[i], "-thread") == 0) || + (strcmp((*argv)[i], "-threads") == 0)) + { + nthreads = atoi((*argv)[i+1]); + if (nthreads > 0) + { + VisitSetNumberOfThreads(nthreads); + } + else + { + debug1 << "Invalid number of threads! Ignoring argument" << endl; + } + + } + } + #ifdef PARALLEL // We fork/exec X servers in some cases. Open MPI will yell at us about // it, but the warning is not relevant for us because our children are @@ -601,8 +620,6 @@ Engine::Initialize(int *argc, char **argv[], bool sigs) RuntimeSetting::parse_command_line(*argc, const_cast(*argv)); this->X_Args = RuntimeSetting::lookups("x-args"); - VisitSetNumberOfThreads( RuntimeSetting::lookupi("threads") ); - // // Set a different new handler for the engine // diff --git a/resources/hosts/nics/host_beacon004_mic0.xml b/resources/hosts/nics/host_beacon004_mic0.xml new file mode 100644 index 00000000000..cec1e411075 --- /dev/null +++ b/resources/hosts/nics/host_beacon004_mic0.xml @@ -0,0 +1,23 @@ + + + beacon004-mic0 + beacon004-mic0 + notset + + /tmp/pbstmp.38181.beacon-mgmt1.nics.utk.edu/visit2_9_0b.linux-x86_64 + false + false + 22 + true + "micssh" "-i" "/tmp/pbstmp.38181.beacon-mgmt1.nics.utk.edu/.micssh/micssh-id_rsa" + false + + MachineName + + false + false + 1 + false + 1 + -1 + -- GitLab