Commit 9a1ea4b9 authored by hrchilds's avatar hrchilds
Browse files

Merge Phi branch to trunk, picking up changes for threading/Phi

git-svn-id: http://visit.ilight.com/svn/visit/trunk/src@27978 18c085ea-50e0-402c-830e-de6fd14e8384
parent d8f96bc2
...@@ -10,6 +10,7 @@ ...@@ -10,6 +10,7 @@
#include <unistd.h> #include <unistd.h>
#include "ThreadPool.h" #include "ThreadPool.h"
#include <DebugStream.h> #include <DebugStream.h>
#include <VisItInit.h>
void *ThreadPoolThread( void * ); void *ThreadPoolThread( void * );
...@@ -19,6 +20,27 @@ struct ThreadInfo ...@@ -19,6 +20,27 @@ struct ThreadInfo
int id; int id;
}; };
int
GetPthreadIDCallback(void *a)
{
ThreadPool *tp = (ThreadPool *) a;
return tp->GetPthreadID();
}
int
ThreadPool::GetPthreadID()
{
pthread_t my_id = pthread_self();
if (my_id == mainThread)
return 0;
for (int i = 0 ; i < numberThreads ; i++)
{
if (threads[i] == my_id)
return i;
}
return -1; // ?? ... and don't issue debug statement, since we would get infinite recursion
}
/*********************************************************************** /***********************************************************************
return: return:
pointer to a new ThreadPool. pointer to a new ThreadPool.
...@@ -36,6 +58,7 @@ ThreadPool * ThreadPool::Create( int numWorkerThreads, int maxQueueSize, int doN ...@@ -36,6 +58,7 @@ ThreadPool * ThreadPool::Create( int numWorkerThreads, int maxQueueSize, int doN
} }
// Initialize the fields. // Initialize the fields.
tpool->mainThread = pthread_self();
tpool->numberThreads = numWorkerThreads; tpool->numberThreads = numWorkerThreads;
tpool->maxQueueSize = maxQueueSize; tpool->maxQueueSize = maxQueueSize;
tpool->doNotBlockWhenFull = doNotBlockWhenFull; tpool->doNotBlockWhenFull = doNotBlockWhenFull;
...@@ -89,6 +112,8 @@ ThreadPool * ThreadPool::Create( int numWorkerThreads, int maxQueueSize, int doN ...@@ -89,6 +112,8 @@ ThreadPool * ThreadPool::Create( int numWorkerThreads, int maxQueueSize, int doN
} }
} }
VisItInit::RegisterThreadIDFunction(GetPthreadIDCallback, tpool);
return( tpool ); return( tpool );
} }
...@@ -319,7 +344,7 @@ int ThreadPool::JoinNoExit() ...@@ -319,7 +344,7 @@ int ThreadPool::JoinNoExit()
return( 1 ); return( 1 );
} }
while( currentQueueSize || threadWorking.to_ulong() ) while( currentQueueSize || threadWorking.to_ulong() || threadWorking2.to_ulong() || threadWorking3.to_ulong() || threadWorking4.to_ulong())
{ {
if( (rtn = pthread_cond_wait(&queueEmpty, &queueLock)) != 0 ) if( (rtn = pthread_cond_wait(&queueEmpty, &queueLock)) != 0 )
{ {
...@@ -356,7 +381,13 @@ void *ThreadPoolThread( void *arg ) ...@@ -356,7 +381,13 @@ void *ThreadPoolThread( void *arg )
#ifndef TURN_OFF_THREAD_SET_AFFINITY #ifndef TURN_OFF_THREAD_SET_AFFINITY
cpu_set_t cpuset; cpu_set_t cpuset;
CPU_ZERO( &cpuset ); CPU_ZERO( &cpuset );
#ifdef __KNC__
int numCores = sysconf( _SC_NPROCESSORS_ONLN );
CPU_SET(((tI->id-1)*4)%numCores + ((tI->id-1)*4)/numCores + 1, &cpuset);
#else
CPU_SET( (tI->id - 1), &cpuset ); CPU_SET( (tI->id - 1), &cpuset );
#endif
if( (rtn = pthread_setaffinity_np( pthread_self(), sizeof(cpu_set_t), &cpuset )) != 0 ) if( (rtn = pthread_setaffinity_np( pthread_self(), sizeof(cpu_set_t), &cpuset )) != 0 )
{ {
debug1 << "ThreadPoolThread pthread_setaffinity_np: " << strerror(rtn) << endl; debug1 << "ThreadPoolThread pthread_setaffinity_np: " << strerror(rtn) << endl;
...@@ -376,8 +407,15 @@ void *ThreadPoolThread( void *arg ) ...@@ -376,8 +407,15 @@ void *ThreadPoolThread( void *arg )
while( (tpool->currentQueueSize == 0) && (! tpool->shutdown) ) while( (tpool->currentQueueSize == 0) && (! tpool->shutdown) )
{ {
// Check if we should send the empty queue message. // Check if we should send the empty queue message.
if( tI->id < 64)
tpool->threadWorking[tI->id] = 0; tpool->threadWorking[tI->id] = 0;
if( tpool->threadWorking.to_ulong() == 0 ) else if( tI->id <128)
tpool->threadWorking2[tI->id-64] = 0;
else if( tI->id < 192)
tpool->threadWorking3[tI->id-128] = 0;
else if( tI->id < 256)
tpool->threadWorking4[tI->id-192] = 0;
if( tpool->threadWorking.to_ulong() == 0 && tpool->threadWorking2.to_ulong() == 0 && tpool->threadWorking3.to_ulong() == 0 && tpool->threadWorking3.to_ulong() == 0 )
{ {
if( (rtn = pthread_cond_signal(&(tpool->queueEmpty))) != 0 ) if( (rtn = pthread_cond_signal(&(tpool->queueEmpty))) != 0 )
{ {
...@@ -415,7 +453,14 @@ void *ThreadPoolThread( void *arg ) ...@@ -415,7 +453,14 @@ void *ThreadPoolThread( void *arg )
tpool->queueHead = my_workp->next; tpool->queueHead = my_workp->next;
// Set that this thread is working. // Set that this thread is working.
if( tI->id < 64)
tpool->threadWorking[tI->id] = 1; tpool->threadWorking[tI->id] = 1;
else if( tI->id <128)
tpool->threadWorking2[tI->id-64] = 1;
else if( tI->id < 192)
tpool->threadWorking3[tI->id-128] = 1;
else if( tI->id < 256)
tpool->threadWorking4[tI->id-192] = 1;
// Handle waiting add_work threads. // Handle waiting add_work threads.
if( (!tpool->doNotBlockWhenFull) && (tpool->currentQueueSize == (tpool->maxQueueSize - 1)) ) if( (!tpool->doNotBlockWhenFull) && (tpool->currentQueueSize == (tpool->maxQueueSize - 1)) )
......
...@@ -53,6 +53,8 @@ class ThreadPool ...@@ -53,6 +53,8 @@ class ThreadPool
return( numberThreads ); return( numberThreads );
} }
int GetPthreadID();
private: private:
ThreadPool() {} ThreadPool() {}
ThreadPool( const ThreadPool &obj ) {} ThreadPool( const ThreadPool &obj ) {}
...@@ -74,6 +76,7 @@ class ThreadPool ...@@ -74,6 +76,7 @@ class ThreadPool
// state. // state.
pthread_t *threads; pthread_t *threads;
pthread_t mainThread;
int currentQueueSize; int currentQueueSize;
ThreadPoolWork *queueHead; ThreadPoolWork *queueHead;
ThreadPoolWork *queueTail; ThreadPoolWork *queueTail;
...@@ -83,6 +86,9 @@ class ThreadPool ...@@ -83,6 +86,9 @@ class ThreadPool
// Used with the queueEmpty condition to know if all work is done. // Used with the queueEmpty condition to know if all work is done.
// TODO: make a bitset class that can handle any number of threads // TODO: make a bitset class that can handle any number of threads
std::bitset<64> threadWorking; std::bitset<64> threadWorking;
std::bitset<64> threadWorking2;
std::bitset<64> threadWorking3;
std::bitset<64> threadWorking4;
// synchronization. // synchronization.
pthread_mutex_t queueLock; pthread_mutex_t queueLock;
......
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
#include <stdlib.h> #include <stdlib.h>
#include <DebugStream.h> #include <DebugStream.h>
#include <VisItInit.h>
#include "avtExecutionManager.h" #include "avtExecutionManager.h"
...@@ -64,20 +65,37 @@ void avtExecutionManager::SetNumberOfThreads( int nt ) ...@@ -64,20 +65,37 @@ void avtExecutionManager::SetNumberOfThreads( int nt )
// Failed to create threads, we need to exit. // Failed to create threads, we need to exit.
// TODO: I think we need to throw an error. // TODO: I think we need to throw an error.
} }
else
{
VisItInit::SetNumberOfThreads(numThreads);
}
} }
void avtExecutionManager::ScheduleWork( void (*workerThreadFunction)(void *), void *cbdata ) void avtExecutionManager::ScheduleWork( void (*workerThreadFunction)(void *), void *cbdata )
{ {
if (tPool == NULL)
{
workerThreadFunction(cbdata);
}
else
{
tPool->AddWork( workerThreadFunction, cbdata ); tPool->AddWork( workerThreadFunction, cbdata );
}
} }
void avtExecutionManager::FinishWork() void avtExecutionManager::FinishWork()
{ {
if (tPool == NULL)
return;
tPool->JoinNoExit(); tPool->JoinNoExit();
} }
MUTEX * avtExecutionManager::FindMutex( const MUTEX_ID id ) MUTEX * avtExecutionManager::FindMutex( const MUTEX_ID id )
{ {
if (tPool == NULL)
return NULL;
std::map<MUTEX_ID, MUTEX *>::iterator it; std::map<MUTEX_ID, MUTEX *>::iterator it;
MUTEX *lock; MUTEX *lock;
...@@ -104,6 +122,9 @@ MUTEX * avtExecutionManager::FindMutex( const MUTEX_ID id ) ...@@ -104,6 +122,9 @@ MUTEX * avtExecutionManager::FindMutex( const MUTEX_ID id )
MUTEX * avtExecutionManager::RemoveMutex( const MUTEX_ID id ) MUTEX * avtExecutionManager::RemoveMutex( const MUTEX_ID id )
{ {
if (tPool == NULL)
return NULL;
std::map<MUTEX_ID, MUTEX *>::iterator it; std::map<MUTEX_ID, MUTEX *>::iterator it;
MUTEX *lock; MUTEX *lock;
...@@ -124,21 +145,33 @@ MUTEX * avtExecutionManager::RemoveMutex( const MUTEX_ID id ) ...@@ -124,21 +145,33 @@ MUTEX * avtExecutionManager::RemoveMutex( const MUTEX_ID id )
void avtExecutionManager::MutexLock( const MUTEX_ID stringID ) void avtExecutionManager::MutexLock( const MUTEX_ID stringID )
{ {
if (tPool == NULL)
return;
MutexLock( FindMutex(stringID) ); MutexLock( FindMutex(stringID) );
} }
void avtExecutionManager::MutexUnlock( const MUTEX_ID stringID ) void avtExecutionManager::MutexUnlock( const MUTEX_ID stringID )
{ {
if (tPool == NULL)
return;
MutexUnlock( FindMutex(stringID) ); MutexUnlock( FindMutex(stringID) );
} }
void avtExecutionManager::MutexDestroy( const MUTEX_ID stringID ) void avtExecutionManager::MutexDestroy( const MUTEX_ID stringID )
{ {
if (tPool == NULL)
return;
MutexDestroy( RemoveMutex(stringID) ); MutexDestroy( RemoveMutex(stringID) );
} }
void avtExecutionManager::MutexInit( MUTEX *lock ) void avtExecutionManager::MutexInit( MUTEX *lock )
{ {
if (tPool == NULL)
return;
int rtn; int rtn;
if( (rtn = pthread_mutex_init(lock, NULL)) ) if( (rtn = pthread_mutex_init(lock, NULL)) )
{ {
...@@ -148,6 +181,9 @@ void avtExecutionManager::MutexInit( MUTEX *lock ) ...@@ -148,6 +181,9 @@ void avtExecutionManager::MutexInit( MUTEX *lock )
void avtExecutionManager::MutexDestroy( MUTEX *lock, bool delMutex ) void avtExecutionManager::MutexDestroy( MUTEX *lock, bool delMutex )
{ {
if (tPool == NULL)
return;
int rtn; int rtn;
if( (rtn = pthread_mutex_destroy(lock)) ) if( (rtn = pthread_mutex_destroy(lock)) )
{ {
...@@ -159,6 +195,9 @@ void avtExecutionManager::MutexDestroy( MUTEX *lock, bool delMutex ) ...@@ -159,6 +195,9 @@ void avtExecutionManager::MutexDestroy( MUTEX *lock, bool delMutex )
void avtExecutionManager::MutexLock( MUTEX *lock ) void avtExecutionManager::MutexLock( MUTEX *lock )
{ {
if (tPool == NULL)
return;
int rtn; int rtn;
if( (rtn = pthread_mutex_lock(lock)) ) if( (rtn = pthread_mutex_lock(lock)) )
{ {
...@@ -168,6 +207,9 @@ void avtExecutionManager::MutexLock( MUTEX *lock ) ...@@ -168,6 +207,9 @@ void avtExecutionManager::MutexLock( MUTEX *lock )
void avtExecutionManager::MutexUnlock( MUTEX *lock ) void avtExecutionManager::MutexUnlock( MUTEX *lock )
{ {
if (tPool == NULL)
return;
int rtn; int rtn;
if( (rtn = pthread_mutex_unlock(lock)) ) if( (rtn = pthread_mutex_unlock(lock)) )
{ {
...@@ -177,6 +219,9 @@ void avtExecutionManager::MutexUnlock( MUTEX *lock ) ...@@ -177,6 +219,9 @@ void avtExecutionManager::MutexUnlock( MUTEX *lock )
void avtExecutionManager::CondInit( COND *cond ) void avtExecutionManager::CondInit( COND *cond )
{ {
if (tPool == NULL)
return;
int rtn; int rtn;
if( (rtn = pthread_cond_init(cond, NULL)) ) if( (rtn = pthread_cond_init(cond, NULL)) )
{ {
...@@ -186,6 +231,9 @@ void avtExecutionManager::CondInit( COND *cond ) ...@@ -186,6 +231,9 @@ void avtExecutionManager::CondInit( COND *cond )
void avtExecutionManager::CondDestroy( COND *cond ) void avtExecutionManager::CondDestroy( COND *cond )
{ {
if (tPool == NULL)
return;
int rtn; int rtn;
if( (rtn = pthread_cond_destroy(cond)) ) if( (rtn = pthread_cond_destroy(cond)) )
{ {
...@@ -195,6 +243,9 @@ void avtExecutionManager::CondDestroy( COND *cond ) ...@@ -195,6 +243,9 @@ void avtExecutionManager::CondDestroy( COND *cond )
void avtExecutionManager::CondSignal( COND *cond ) void avtExecutionManager::CondSignal( COND *cond )
{ {
if (tPool == NULL)
return;
int rtn; int rtn;
if( (rtn = pthread_cond_signal(cond)) ) if( (rtn = pthread_cond_signal(cond)) )
{ {
...@@ -204,6 +255,9 @@ void avtExecutionManager::CondSignal( COND *cond ) ...@@ -204,6 +255,9 @@ void avtExecutionManager::CondSignal( COND *cond )
void avtExecutionManager::CondWait( COND *cond, MUTEX *lock ) void avtExecutionManager::CondWait( COND *cond, MUTEX *lock )
{ {
if (tPool == NULL)
return;
int rtn; int rtn;
if( (rtn = pthread_cond_wait(cond, lock)) ) if( (rtn = pthread_cond_wait(cond, lock)) )
{ {
......
...@@ -39,6 +39,7 @@ ...@@ -39,6 +39,7 @@
#include <DebugStreamFull.h> #include <DebugStreamFull.h>
#include <DebugStream.h> #include <DebugStream.h>
#include <visitstream.h> #include <visitstream.h>
#include <VisItInit.h>
#if defined(_WIN32) #if defined(_WIN32)
#include <windows.h> #include <windows.h>
...@@ -57,6 +58,8 @@ using std::vector; ...@@ -57,6 +58,8 @@ using std::vector;
vector<DebugStreamFull::DebugStreamBuf*> DebugStreamFull::DebugStreamBuf::allBuffers; vector<DebugStreamFull::DebugStreamBuf*> DebugStreamFull::DebugStreamBuf::allBuffers;
int DebugStreamFull::DebugStreamBuf::curLevel; int DebugStreamFull::DebugStreamBuf::curLevel;
int DebugStreamFull::numThreadLogs = 1;
// global DebugStreams // global DebugStreams
// We make these static so they are NOT visible outside this file // We make these static so they are NOT visible outside this file
static DebugStreamFull debug1_realobj(1); static DebugStreamFull debug1_realobj(1);
...@@ -65,6 +68,14 @@ static DebugStreamFull debug3_realobj(3); ...@@ -65,6 +68,14 @@ static DebugStreamFull debug3_realobj(3);
static DebugStreamFull debug4_realobj(4); static DebugStreamFull debug4_realobj(4);
static DebugStreamFull debug5_realobj(5); static DebugStreamFull debug5_realobj(5);
bool IsThread0(void)
{
if (VisItInit::GetNumberOfThreads() <= 1)
return true;
if (VisItInit::GetMyThreadID() == 0)
return true;
return false;
}
bool DebugStream::Level1() { return debug1_realobj.isenabled(); }; bool DebugStream::Level1() { return debug1_realobj.isenabled(); };
bool DebugStream::Level2() { return debug2_realobj.isenabled(); }; bool DebugStream::Level2() { return debug2_realobj.isenabled(); };
bool DebugStream::Level3() { return debug3_realobj.isenabled(); }; bool DebugStream::Level3() { return debug3_realobj.isenabled(); };
...@@ -326,10 +337,15 @@ DebugStreamFull::DebugStreamBuf::SetLevel(int level_) ...@@ -326,10 +337,15 @@ DebugStreamFull::DebugStreamBuf::SetLevel(int level_)
void void
DebugStreamFull::DebugStreamBuf::close() DebugStreamFull::DebugStreamBuf::close()
{ {
if (out) if(out == NULL) return;
for(int i=0; i<numThreadLogs; ++i)
if (out[i])
out[i].close();
if(out)
{ {
out->close(); delete [] out;
delete out;
out = NULL; out = NULL;
} }
} }
...@@ -367,25 +383,33 @@ DebugStreamFull::DebugStreamBuf::open(const char *filename_, bool buffer_debug) ...@@ -367,25 +383,33 @@ DebugStreamFull::DebugStreamBuf::open(const char *filename_, bool buffer_debug)
{ {
close(); close();
strcpy(filename, filename_); strcpy(filename, filename_);
out = new ofstream; out = new ofstream[numThreadLogs];
out->open(filename, ios::out);
if (! *out) for(int i=0; i<numThreadLogs; ++i) {
std::stringstream fname;
fname << filename << i << ".vlog";
out[i].open(fname.str().c_str(), ios::out);
if (! out[i])
{ {
delete out; //out[i] is a value, not a pointer....
out = NULL; //delete [] out;
//out = NULL;
} }
else else
{ {
// flush the buffer after every operation // flush the buffer after every operation
if (!buffer_debug) if (!buffer_debug)
out->setf(ios::unitbuf); out[i].setf(ios::unitbuf);
#ifndef NO_SETBUF #ifndef NO_SETBUF
// the previous flag does nothing on SunOS; // the previous flag does nothing on SunOS;
// I hate to do this, but I'm doing it to force automatic flushing: // I hate to do this, but I'm doing it to force automatic flushing:
if (!buffer_debug) if (!buffer_debug)
out->rdbuf()->setbuf((char*)0,0); out[i].rdbuf()->setbuf((char*)0,0);
#endif #endif
} }
}
} }
...@@ -410,12 +434,13 @@ DebugStreamFull::DebugStreamBuf::open(const char *filename_, bool buffer_debug) ...@@ -410,12 +434,13 @@ DebugStreamFull::DebugStreamBuf::open(const char *filename_, bool buffer_debug)
int int
DebugStreamFull::DebugStreamBuf::put(int c) DebugStreamFull::DebugStreamBuf::put(int c)
{ {
if (out && int t_id = VisItInit::GetMyThreadID();
if ( t_id < numThreadLogs && out != NULL && out[t_id] &&
curLevel <= level) curLevel <= level)
{ {
if (c!=EOF) if (c!=EOF)
{ {
out->put((char)c); out[t_id].put((char)c);
} }
} }
return c; return c;
...@@ -568,10 +593,10 @@ DebugStreamFull::open(const char *progname, bool clobber, bool buffer_debug) ...@@ -568,10 +593,10 @@ DebugStreamFull::open(const char *progname, bool clobber, bool buffer_debug)
#ifdef WIN32 #ifdef WIN32
// On windows, we always use pids, so won't need to rename, and thus // On windows, we always use pids, so won't need to rename, and thus
// don't need to prepend a letter. // don't need to prepend a letter.
sprintf(filename, "%s.%d.vlog", progname, level); sprintf(filename, "%s.%d.thr", progname, level);
#else #else
sprintf(filename, "A.%s.%d.vlog", progname, level); sprintf(filename, "A.%s.%d.thr", progname, level);
// only rename old vlogs if we don't have pids // only rename old vlogs if we don't have pids
bool renameOld = !clobber && (strspn(progname, ".0123456789") == 0); bool renameOld = !clobber && (strspn(progname, ".0123456789") == 0);
...@@ -581,16 +606,57 @@ DebugStreamFull::open(const char *progname, bool clobber, bool buffer_debug) ...@@ -581,16 +606,57 @@ DebugStreamFull::open(const char *progname, bool clobber, bool buffer_debug)
{ {
char filenametmp1[256]; char filenametmp1[256];
char filenametmp2[256]; char filenametmp2[256];
sprintf(filenametmp1, "E.%s.%d.vlog", progname, level); int fileThr = 0;
sprintf(filenametmp1, "E.%s.%d.thr%d.vlog", progname, level, fileThr);
while (access( filenametmp1, F_OK) != -1 )
{
unlink(filenametmp1); // E->deleted unlink(filenametmp1); // E->deleted
sprintf(filenametmp2, "D.%s.%d.vlog", progname, level); sprintf(filenametmp1, "E.%s.%d.thr%d.vlog", progname, level, ++fileThr);
}
fileThr = 0;
sprintf(filenametmp1, "E.%s.%d.thr%d.vlog", progname, level, fileThr);
sprintf(filenametmp2, "D.%s.%d.thr%d.vlog", progname, level, fileThr);
while (access( filenametmp2, F_OK) != -1 )
{
rename(filenametmp2, filenametmp1); // D->E rename(filenametmp2, filenametmp1); // D->E
sprintf(filenametmp1, "C.%s.%d.vlog", progname, level);
rename(filenametmp1, filenametmp2); // C->D ++fileThr;
sprintf(filenametmp2, "B.%s.%d.vlog", progname, level); sprintf(filenametmp1, "E.%s.%d.thr%d.vlog", progname, level, fileThr);
sprintf(filenametmp2, "D.%s.%d.thr%d.vlog", progname, level, fileThr);
}
fileThr = 0;