Commit 3191ed75 authored by Bill Hoffman's avatar Bill Hoffman
Browse files

ENH: render server N of M to N connections

parent 29957e35
/*=========================================================================
Program: ParaView
Module: vtkMPIMToNSocketConnection.cxx
Copyright (c) Kitware, Inc.
All rights reserved.
See Copyright.txt or http://www.paraview.org/HTML/Copyright.html for details.
This software is distributed WITHOUT ANY WARRANTY; without even
the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
PURPOSE. See the above copyright notice for more information.
*/
#include "vtkMPIMToNSocketConnection.h"
#include "vtkSocketCommunicator.h"
#include "vtkMultiProcessController.h"
#include "vtkPVDuplicatePolyData.h"
#include "vtkObjectFactory.h"
#include "vtkMPIMToNSocketConnectionPortInformation.h"
#include <vtkstd/string>
#include <vtkstd/vector>
vtkCxxRevisionMacro(vtkMPIMToNSocketConnection, "1.1");
vtkStandardNewMacro(vtkMPIMToNSocketConnection);
vtkCxxSetObjectMacro(vtkMPIMToNSocketConnection,Controller, vtkMultiProcessController);
vtkCxxSetObjectMacro(vtkMPIMToNSocketConnection,SocketCommunicator, vtkSocketCommunicator);
class vtkMPIMToNSocketConnectionInternals
{
public:
struct NodeInformation
{
int PortNumber;
vtkstd::string HostName;
};
vtkstd::vector<NodeInformation> ServerInformation;
};
vtkMPIMToNSocketConnection::vtkMPIMToNSocketConnection()
{
this->HostName = 0;
this->PortNumber = -1;
this->Internals = new vtkMPIMToNSocketConnectionInternals;
this->Controller = 0;
this->SetController(vtkMultiProcessController::GetGlobalController());
this->SocketCommunicator = 0;
}
vtkMPIMToNSocketConnection::~vtkMPIMToNSocketConnection()
{
if(this->SocketCommunicator)
{
this->SocketCommunicator->CloseConnection();
this->SocketCommunicator->Delete();
}
delete [] this->HostName;
this->HostName = 0;
delete this->Internals;
this->Internals = 0;
}
void vtkMPIMToNSocketConnection::PrintSelf(ostream& os, vtkIndent indent)
{
this->Superclass::PrintSelf(os,indent);
os << indent << "NumberOfConnections: (" << this->NumberOfConnections << ")\n";
os << indent << "Controller: (" << this->Controller << ")\n";
os << indent << "SocketCommunicator: (" << this->SocketCommunicator << ")\n";
vtkIndent i2 = indent.GetNextIndent();
for(unsigned int i = 0; i < this->Internals->ServerInformation.size(); ++i)
{
os << i2 << "Server Information for Process: " << i << ":\n";
vtkIndent i3 = i2.GetNextIndent();
os << i3 << "PortNumber: " << this->Internals->ServerInformation[i].PortNumber << "\n";
os << i3 << "HostName: " << this->Internals->ServerInformation[i].HostName.c_str() << "\n";
}
}
void vtkMPIMToNSocketConnection::SetupWaitForConnection()
{
int myId = this->Controller->GetLocalProcessId();
// This should determine the host, and select a random port
// for now, just hard code localhost and port 33333 for testing
this->SetHostName("localhost");
this->PortNumber = 33333 + myId;
this->NumberOfConnections = this->Controller->GetNumberOfProcesses();
}
void vtkMPIMToNSocketConnection::WaitForConnection()
{
if(this->SocketCommunicator)
{
vtkErrorMacro("WaitForConnection called more than once");
return;
}
this->SocketCommunicator = vtkSocketCommunicator::New();
int myId = this->Controller->GetLocalProcessId();
cerr << "WaitForConnection: id :" << myId << " host: " << this->HostName << " Port:" << this->PortNumber << "\n";
this->SocketCommunicator->WaitForConnection(this->PortNumber);
int data;
this->SocketCommunicator->Receive(&data, 1, 1, 1238);
cerr << "Received Hello from process " << data << "\n";
}
void vtkMPIMToNSocketConnection::Connect()
{
if(this->SocketCommunicator)
{
vtkErrorMacro("Connect called more than once");
return;
}
this->SocketCommunicator = vtkSocketCommunicator::New();
unsigned int myId = this->Controller->GetLocalProcessId();
if(myId >= this->Internals->ServerInformation.size())
{
return;
}
cerr << "Connect: id :" << myId << " host: "
<< this->Internals->ServerInformation[myId].HostName.c_str()
<< " Port:"
<< this->Internals->ServerInformation[myId].PortNumber
<< "\n";
this->SocketCommunicator->ConnectTo((char*)this->Internals->ServerInformation[myId].HostName.c_str(),
this->Internals->ServerInformation[myId].PortNumber );
int id = static_cast<int>(myId);
this->SocketCommunicator->Send(&id, 1, 1, 1238);
}
void vtkMPIMToNSocketConnection::SetNumberOfConnections(int c)
{
this->NumberOfConnections = c;
this->Internals->ServerInformation.resize(this->NumberOfConnections);
this->Modified();
}
void vtkMPIMToNSocketConnection::SetPortInformation(unsigned int processNumber,
int port, const char* host)
{
if(processNumber >= this->Internals->ServerInformation.size())
{
vtkErrorMacro("Attempt to set port information for process larger than number of processes.\n"
<< "Max process id " << this->Internals->ServerInformation.size()
<< " attempted " << processNumber << "\n");
return;
}
this->Internals->ServerInformation[processNumber].PortNumber = port;
this->Internals->ServerInformation[processNumber].HostName = host;
}
void vtkMPIMToNSocketConnection::GetPortInformation(
vtkMPIMToNSocketConnectionPortInformation* info)
{
info->SetNumberOfConnections(this->Controller->GetNumberOfProcesses());
int myId = this->Controller->GetLocalProcessId();
// for id = 0 set the port information for process 0 in
// in the information object, this is because the gather does
// not call AddInformation for process 0
if(myId == 0)
{
info->SetPortInformation(0, this->PortNumber, this->HostName);
}
info->SetHostName(this->HostName);
info->SetProcessNumber(myId);
info->SetPortNumber(this->PortNumber);
}
/*=========================================================================
Program: ParaView
Module: vtkMPIMToNSocketConnection.h
Copyright (c) Kitware, Inc.
All rights reserved.
See Copyright.txt or http://www.paraview.org/HTML/Copyright.html for details.
This software is distributed WITHOUT ANY WARRANTY; without even
the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
PURPOSE. See the above copyright notice for more information.
=========================================================================*/
// .NAME vtkMPIMToNSocketConnection
// .SECTION Description
#ifndef __vtkMPIMToNSocketConnection_h
#define __vtkMPIMToNSocketConnection_h
#include "vtkObject.h"
class vtkMultiProcessController;
class vtkSocketCommunicator;
class vtkMPIMToNSocketConnectionPortInformation;
class vtkMPIMToNSocketConnectionInternals;
class VTK_EXPORT vtkMPIMToNSocketConnection : public vtkObject
{
public:
static vtkMPIMToNSocketConnection* New();
vtkTypeRevisionMacro(vtkMPIMToNSocketConnection,vtkObject);
void PrintSelf(ostream& os, vtkIndent indent);
// Description:
// Set the number of connections to be made.
void SetNumberOfConnections(int);
vtkGetMacro(NumberOfConnections, int);
// Description:
// Setup the wait for connection, but do not wait yet.
// This should determine the network to be used and the port to be used.
void SetupWaitForConnection();
// Description:
// SetupStartWaitForConnection must be called first. This
// method will start waiting for a connection to be made to it.
void WaitForConnection();
// Description:
// Connect to remote server.
void Connect();
// Description:
// Set up information about the remote connection.
//
void SetPortInformation(unsigned int processNumber, int portNumber, const char* hostName);
// Description:
// Return the socket communicator for this process.
vtkGetObjectMacro(SocketCommunicator, vtkSocketCommunicator);
// Description:
// Fill the port information values into the port information object.
void GetPortInformation(vtkMPIMToNSocketConnectionPortInformation*);
protected:
virtual void SetController(vtkMultiProcessController*);
virtual void SetSocketCommunicator(vtkSocketCommunicator*);
vtkMPIMToNSocketConnection();
~vtkMPIMToNSocketConnection();
private:
int PortNumber;
char* HostName;
vtkSetStringMacro(HostName);
int NumberOfConnections;
vtkMPIMToNSocketConnectionInternals* Internals;
vtkMultiProcessController *Controller;
vtkSocketCommunicator* SocketCommunicator;
vtkMPIMToNSocketConnection(const vtkMPIMToNSocketConnection&); // Not implemented
void operator=(const vtkMPIMToNSocketConnection&); // Not implemented
};
#endif
/*=========================================================================
Program: ParaView
Module: vtkMPIMToNSocketConnectionPortInformation.cxx
Copyright (c) Kitware, Inc.
All rights reserved.
See Copyright.txt or http://www.paraview.org/HTML/Copyright.html for details.
This software is distributed WITHOUT ANY WARRANTY; without even
the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
PURPOSE. See the above copyright notice for more information.
=========================================================================*/
#include "vtkMPIMToNSocketConnectionPortInformation.h"
#include "vtkClientServerStream.h"
#include "vtkObjectFactory.h"
#include "vtkMPIMToNSocketConnection.h"
#include <vtkstd/string>
#include <vtkstd/vector>
class vtkMPIMToNSocketConnectionPortInformationInternals
{
public:
struct NodeInformation
{
int PortNumber;
vtkstd::string HostName;
};
vtkstd::vector<NodeInformation> ServerInformation;
};
vtkStandardNewMacro(vtkMPIMToNSocketConnectionPortInformation);
vtkCxxRevisionMacro(vtkMPIMToNSocketConnectionPortInformation, "1.1");
//----------------------------------------------------------------------------
vtkMPIMToNSocketConnectionPortInformation::vtkMPIMToNSocketConnectionPortInformation()
{
this->Internals = new vtkMPIMToNSocketConnectionPortInformationInternals;
this->HostName = 0;
this->NumberOfConnections = 0;
this->ProcessNumber = 0;
this->PortNumber = 0;
}
//----------------------------------------------------------------------------
vtkMPIMToNSocketConnectionPortInformation::~vtkMPIMToNSocketConnectionPortInformation()
{
delete this->Internals;
this->SetHostName(0);
}
//----------------------------------------------------------------------------
void vtkMPIMToNSocketConnectionPortInformation::PrintSelf(ostream &os, vtkIndent indent)
{
this->Superclass::PrintSelf(os, indent);
os << "\n";
os << indent << "HostName: "
<< (this->HostName?this->HostName:"(none)") << "\n";
os << indent << "NumberOfConnections: " << this->NumberOfConnections << "\n";
os << indent << "ProcessNumber: " << this->ProcessNumber << "\n";
os << indent << "PortNumber: " << this->PortNumber << "\n";
vtkIndent i2 = indent.GetNextIndent();
os << indent << "All Process Information:\n";
for(unsigned int i = 0; i < this->Internals->ServerInformation.size(); ++i)
{
os << i2 << "P" << i << ": PortNumber: " << this->Internals->ServerInformation[i].PortNumber << "\n";
os << i2 << "P" << i << ": HostName: " << this->Internals->ServerInformation[i].HostName.c_str() << "\n";
}
}
//----------------------------------------------------------------------------
void vtkMPIMToNSocketConnectionPortInformation::CopyFromObject(vtkObject* obj)
{
vtkMPIMToNSocketConnection* c = vtkMPIMToNSocketConnection::SafeDownCast(obj);
if(!c)
{
vtkErrorMacro("Cannot get class name from NULL object. Or incorrect object type.");
return;
}
c->GetPortInformation(this);
}
void vtkMPIMToNSocketConnectionPortInformation::SetPortInformation(unsigned int processNumber,
int port,
const char* hostname)
{
if(this->Internals->ServerInformation.size() == 0)
{
this->Internals->ServerInformation.resize(this->NumberOfConnections);
}
this->Internals->ServerInformation[processNumber].PortNumber = port;
this->Internals->ServerInformation[processNumber].HostName = hostname;
}
//----------------------------------------------------------------------------
void vtkMPIMToNSocketConnectionPortInformation::AddInformation(vtkPVInformation* i)
{
vtkMPIMToNSocketConnectionPortInformation* info
= vtkMPIMToNSocketConnectionPortInformation::SafeDownCast(i);
if(!info)
{
vtkErrorMacro("Wrong type for AddInformation" << i);
return;
}
this->SetPortInformation(info->ProcessNumber, info->PortNumber, info->HostName);
}
//----------------------------------------------------------------------------
void
vtkMPIMToNSocketConnectionPortInformation::CopyToStream(vtkClientServerStream* css) const
{
css->Reset();
*css << vtkClientServerStream::Reply
<< this->HostName
<< this->NumberOfConnections
<< this->ProcessNumber
<< this->PortNumber
<< this->Internals->ServerInformation.size();
for(unsigned int i=0; i < this->Internals->ServerInformation.size(); ++i)
{
*css << this->Internals->ServerInformation[i].PortNumber
<< this->Internals->ServerInformation[i].HostName.c_str();
}
*css << vtkClientServerStream::End;
}
//----------------------------------------------------------------------------
void
vtkMPIMToNSocketConnectionPortInformation::CopyFromStream(const vtkClientServerStream* css)
{
const char* hostname = 0;
css->GetArgument(0, 0, &hostname);
this->SetHostName(hostname);
int i = 0;
css->GetArgument(0, 1, &i);
this->SetNumberOfConnections(i);
css->GetArgument(0, 2, &i);
this->SetProcessNumber(i);
css->GetArgument(0, 3, &i);
this->SetPortNumber(i);
int numProcesses;
css->GetArgument(0, 4, &numProcesses);
this->Internals->ServerInformation.resize(numProcesses);
int port;
int pos = 5;
for(int j =0; j < numProcesses; ++j)
{
css->GetArgument(0, pos, &port);
pos++;
css->GetArgument(0, pos, &hostname);
pos++;
this->Internals->ServerInformation[j].PortNumber = port;
this->Internals->ServerInformation[j].HostName = hostname;
}
}
int vtkMPIMToNSocketConnectionPortInformation::GetProcessPort(unsigned int processNumber)
{
if(this->Internals->ServerInformation.size() == 0 && processNumber == 0)
{
return this->PortNumber;
}
if(processNumber >= this->Internals->ServerInformation.size())
{
vtkErrorMacro("Process number greater than number of processes");
return 0;
}
return this->Internals->ServerInformation[processNumber].PortNumber;
}
const char* vtkMPIMToNSocketConnectionPortInformation::GetProcessHostName(unsigned int processNumber)
{
if(this->Internals->ServerInformation.size() == 0 && processNumber == 0)
{
return this->GetHostName();
}
if(processNumber >= this->Internals->ServerInformation.size())
{
vtkErrorMacro("Process number greater than number of processes");
return 0;
}
return this->Internals->ServerInformation[processNumber].HostName.c_str();
}
/*=========================================================================
Program: ParaView
Module: vtkMPIMToNSocketConnectionPortInformation.h
Copyright (c) Kitware, Inc.
All rights reserved.
See Copyright.txt or http://www.paraview.org/HTML/Copyright.html for details.
This software is distributed WITHOUT ANY WARRANTY; without even
the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
PURPOSE. See the above copyright notice for more information.
=========================================================================*/
// .NAME vtkMPIMToNSocketConnectionPortInformation - Holds class name
// .SECTION Description
// This information object gets the class name of the input VTK object. This
// is separate from vtkPVDataInformation because it can be determined before
// Update is called and because it operates on any VTK object.
#ifndef __vtkMPIMToNSocketConnectionPortInformation_h
#define __vtkMPIMToNSocketConnectionPortInformation_h
#include "vtkPVInformation.h"
class vtkMPIMToNSocketConnectionPortInformationInternals;
class VTK_EXPORT vtkMPIMToNSocketConnectionPortInformation : public vtkPVInformation
{
public:
static vtkMPIMToNSocketConnectionPortInformation* New();
vtkTypeRevisionMacro(vtkMPIMToNSocketConnectionPortInformation, vtkPVInformation);
void PrintSelf(ostream &os, vtkIndent indent);
// Description:
// Get class name of VTK object.
vtkGetStringMacro(HostName);
// Description:
// Set/Get the ProcessNumber
vtkSetMacro(ProcessNumber, int);
vtkGetMacro(ProcessNumber, int);
// Description:
// Set/Get the ProcessNumber
vtkSetMacro(PortNumber, int);
vtkGetMacro(PortNumber, int);
// Description:
// Set the port and host information for a specific process number.
void SetPortInformation(unsigned int processNumber, int port, const char* host);
// Description:
// Set/Get the number of connections.
vtkSetMacro(NumberOfConnections, int);
vtkGetMacro(NumberOfConnections, int);
int GetProcessPort(unsigned int processNumber);
const char* GetProcessHostName(unsigned int processNumber);
// Description:
// Transfer information about a single object into this object.
virtual void CopyFromObject(vtkObject*);
// Description:
// Merge another information object.
virtual void AddInformation(vtkPVInformation*);
// Description:
// Manage a serialized version of the information.
virtual void CopyToStream(vtkClientServerStream*) const;
virtual void CopyFromStream(const vtkClientServerStream*);
// Description:
// Set the host name.
vtkSetStringMacro(HostName);
protected:
vtkMPIMToNSocketConnectionPortInformation();
~vtkMPIMToNSocketConnectionPortInformation();
char* HostName;
int NumberOfConnections;
int ProcessNumber;
int PortNumber;
vtkMPIMToNSocketConnectionPortInformationInternals* Internals;
private:
vtkMPIMToNSocketConnectionPortInformation(const vtkMPIMToNSocketConnectionPortInformation&); // Not implemented
void operator=(const vtkMPIMToNSocketConnectionPortInformation&); // Not implemented
};
#endif
......@@ -79,6 +79,7 @@
#include "vtkGraphicsFactory.h"
#include "vtkImagingFactory.h"
#include "vtkSocketController.h"
#include "vtkMPIMToNSocketConnectionPortInformation.h"
#include "vtkPVProgressHandler.h"
......@@ -106,7 +107,7 @@
//----------------------------------------------------------------------------
vtkStandardNewMacro(vtkPVApplication);
vtkCxxRevisionMacro(vtkPVApplication, "1.258");
vtkCxxRevisionMacro(vtkPVApplication, "1.259");
vtkCxxSetObjectMacro(vtkPVApplication, RenderModule, vtkPVRenderModule);
......@@ -208,6 +209,14 @@ public:
void DisplayText(const char* t)
{
#ifdef _WIN32
// if this is a windows application, then always
// send the output to stderr. Most running paraview
// won't see the output, but if you use a terminal that supports
// the output from a windows program like rxvt then you
// can see the output in the console.
cerr << t << "\n";
#endif
if ( this->Windows && this->Windows->GetNumberOfItems() &&
this->Windows->GetLastKWWindow() )
{
......@@ -1212,6 +1221,7 @@ int vtkPVApplication::ParseCommandLineArguments(int argc, char*argv[])
//----------------------------------------------------------------------------
void vtkPVApplication::Start(int argc, char*argv[])
{
if ( ! this->ProcessModule )
{
vtkErrorMacro("No process module");
......@@ -1574,19 +1584,6 @@ void vtkPVApplication::Start(int argc, char*argv[])
}
else
{
if(this->ClientMode && this->RenderServerMode)
{
cout << "test stream object on render server\n";
vtkPVProcessModule* pm = this->GetProcessModule();
vtkClientServerID id = pm->NewStreamObject("vtkObject");
pm->GetStream() << vtkClientServerStream::Invoke
<< id
<< "DebugOn"
<< vtkClientServerStream::End;
pm->DeleteStreamObject(id);
pm->SendStreamToRenderServerRoot();
}
this->vtkKWApplication::Start(argc,argv);
}