Commit 9b33cadf authored by Nicolas Vuaille's avatar Nicolas Vuaille

Add option --disable-further-connections

In multi-clients mode, the master client can enable or disable the further
connections from the collaboration panel. Default is "enable".
With option --disable-further-connections, initial state is "disable".

The master client can change the connect-id of the server.

Connected clients are not disconnected when mode or connect-id is changed
in the server.
parent a509cb45
......@@ -64,6 +64,8 @@ static bool RealMain(int argc, char* argv[], vtkProcessModule::ProcessTypes type
vtkPVSessionServer* session = vtkPVSessionServer::New();
session->SetMultipleConnection(options->GetMultiClientMode() != 0);
session->SetDisableFurtherConnections(options->GetDisableFurtherConnections() != 0);
int process_id = controller->GetLocalProcessId();
if (process_id == 0)
{
......
......@@ -86,6 +86,11 @@ public:
*/
virtual bool GetPendingConnectionsPresent() = 0;
/**
* Enable/disable further connections for given port.
*/
virtual void DisableFurtherConnections(int port, bool disable) = 0;
protected:
vtkNetworkAccessManager();
~vtkNetworkAccessManager() override;
......
......@@ -54,6 +54,7 @@ vtkPVOptions::vtkPVOptions()
this->ClientMode = 0;
this->ServerMode = 0;
this->MultiClientMode = 0;
this->DisableFurtherConnections = 0;
this->MultiClientModeWithErrorMacro = 0;
this->MultiServerMode = 0;
this->RenderServerMode = 0;
......@@ -154,6 +155,11 @@ void vtkPVOptions::Initialize()
"While keeping the error macro on the server session for debug.",
vtkPVOptions::PVDATA_SERVER | vtkPVOptions::PVSERVER);
this->AddBooleanArgument("--disable-further-connections", 0, &this->DisableFurtherConnections,
"Disable further connections after the first client connects."
"Does nothing without --multi-clients enabled.",
vtkPVOptions::PVDATA_SERVER | vtkPVOptions::PVSERVER);
this->AddBooleanArgument("--multi-servers", 0, &this->MultiServerMode,
"Allow client to connect to several pvserver", vtkPVOptions::PVCLIENT);
......@@ -463,6 +469,12 @@ void vtkPVOptions::PrintSelf(ostream& os, vtkIndent indent)
{
os << indent << "Allow several clients to connect to a server.\n";
}
if (this->DisableFurtherConnections)
{
os << indent << "Disable further connections after the first client connects to a server..\n";
}
if (this->MultiServerMode)
{
os << indent << "Allow a client to connect to multiple servers at the same time.\n";
......
......@@ -135,6 +135,13 @@ public:
}
virtual int IsMultiClientModeDebug() { return this->MultiClientModeWithErrorMacro; }
//@{
/**
* Returns if this server does not allow connection after the first client.
*/
vtkGetMacro(DisableFurtherConnections, int);
//@}
//@{
/**
* Is this client allow multiple server connection in parallel
......@@ -295,6 +302,7 @@ protected:
int ClientMode;
int RenderServerMode;
int MultiClientMode;
int DisableFurtherConnections;
int MultiClientModeWithErrorMacro;
int MultiServerMode;
int SymmetricMPIMode;
......
......@@ -146,6 +146,31 @@ bool vtkTCPNetworkAccessManager::GetPendingConnectionsPresent()
return false;
}
//----------------------------------------------------------------------------
void vtkTCPNetworkAccessManager::DisableFurtherConnections(int port, bool disable)
{
if (disable)
{
if (this->Internals->ServerSockets.find(port) != this->Internals->ServerSockets.end())
{
this->Internals->ServerSockets.at(port)->CloseSocket();
this->Internals->ServerSockets.erase(port);
}
}
else
{
vtkServerSocket* server_socket = vtkServerSocket::New();
if (server_socket->CreateServer(port) != 0)
{
vtkErrorMacro("Failed to set up server socket.");
server_socket->Delete();
return;
}
this->Internals->ServerSockets[port] = server_socket;
server_socket->FastDelete();
}
}
//----------------------------------------------------------------------------
bool vtkTCPNetworkAccessManager::GetNetworkEventsAvailable()
{
......@@ -304,11 +329,12 @@ void vtkTCPNetworkAccessManager::PrintHandshakeError(int errorcode)
"**********************************************************************\n");
break;
case HANDSHAKE_DIFFERENT_CONNECTION_IDS:
vtkErrorMacro("\n"
"**********************************************************************\n"
" Connection failed during handshake. The server has a different connection id"
" than the client.\n"
"**********************************************************************\n");
vtkWarningMacro("\n"
" Wrong connect-id: \n"
" The client cannot connect to the server because their connection ids\n"
" are not the same. Change your connect-id and try again.\n");
this->InvokeEvent(vtkCommand::UserEvent);
break;
case HANDSHAKE_DIFFERENT_RENDERING_BACKENDS:
vtkErrorMacro("\n"
......
......@@ -96,6 +96,11 @@ public:
*/
bool GetPendingConnectionsPresent() VTK_OVERRIDE;
/**
* Enable/disable further connections for given port.
*/
virtual void DisableFurtherConnections(int port, bool disable) VTK_OVERRIDE;
protected:
vtkTCPNetworkAccessManager();
~vtkTCPNetworkAccessManager() override;
......
......@@ -235,6 +235,8 @@ message ClientsInformation {
optional string name = 2;
optional bool is_master = 3 [default = false];
optional bool follow_cam = 4 [default = false];
optional bool disable_further_connections = 5 [default = false];
optional uint32 connect_id = 6 [default = 0];
}
extend Message {
......
......@@ -119,6 +119,124 @@ public:
//-----------------------------------------------------------------
void SetClientURL(const char* client_url) { this->ClientURL = client_url; }
//-----------------------------------------------------------------
void SetBaseURL(const char* url) { this->BaseURL = url; }
//-----------------------------------------------------------------
std::string ComputeURL(const char* url)
{
vtkProcessModule* pm = vtkProcessModule::GetProcessModule();
vtkPVOptions* options = pm->GetOptions();
vtksys::RegularExpression pvserver("^cs://([^:]+)?(:([0-9]+))?");
vtksys::RegularExpression pvserver_reverse("^csrc://([^:]+)(:([0-9]+))?");
vtksys::RegularExpression pvrenderserver("^cdsrs://([^:]+)(:([0-9]+))?/([^:]+)(:([0-9]+))?");
vtksys::RegularExpression pvrenderserver_reverse(
"^cdsrsrc://([^:]+)?(:([0-9]+))?/([^:]+)?(:([0-9]+))?");
std::ostringstream handshake;
handshake << "handshake=paraview-" << PARAVIEW_VERSION;
// Add connect-id if needed. The connect-id is added to the handshake that
// must match on client and server processes.
if (options->GetConnectID() != 0)
{
handshake << ".connect_id." << options->GetConnectID();
}
// Add rendering backend information.
handshake << ".renderingbackend.opengl2";
// for forward connections, port number 0 is acceptable, while for
// reverse-connections it's not.
std::string client_url;
if (pvserver.find(url))
{
int port = atoi(pvserver.match(3).c_str());
port = (port < 0) ? 11111 : port;
std::ostringstream stream;
stream << "tcp://localhost:" << port << "?listen=true&" << handshake.str();
stream << ((this->Owner->MultipleConnection && !this->Owner->DisableFurtherConnections)
? "&multiple=true"
: "");
client_url = stream.str();
}
else if (pvserver_reverse.find(url))
{
std::string hostname = pvserver_reverse.match(1);
int port = atoi(pvserver_reverse.match(3).c_str());
port = (port <= 0) ? 11111 : port;
std::ostringstream stream;
stream << "tcp://" << hostname.c_str() << ":" << port << "?" << handshake.str();
client_url = stream.str();
}
else if (pvrenderserver.find(url))
{
int dsport = atoi(pvrenderserver.match(3).c_str());
dsport = (dsport < 0) ? 11111 : dsport;
int rsport = atoi(pvrenderserver.match(6).c_str());
rsport = (rsport < 0) ? 22221 : rsport;
if (vtkProcessModule::GetProcessType() == vtkProcessModule::PROCESS_RENDER_SERVER)
{
std::ostringstream stream;
stream << "tcp://localhost:" << rsport << "?listen=true&" << handshake.str();
client_url = stream.str();
}
else if (vtkProcessModule::GetProcessType() == vtkProcessModule::PROCESS_DATA_SERVER)
{
std::ostringstream stream;
stream << "tcp://localhost:" << dsport << "?listen=true&" << handshake.str();
stream << ((this->Owner->MultipleConnection && !this->Owner->DisableFurtherConnections)
? "&multiple=true"
: "");
client_url = stream.str();
}
}
else if (pvrenderserver_reverse.find(url))
{
std::string dataserverhost = pvrenderserver_reverse.match(1);
int dsport = atoi(pvrenderserver_reverse.match(3).c_str());
dsport = (dsport <= 0) ? 11111 : dsport;
std::string renderserverhost = pvrenderserver_reverse.match(4);
int rsport = atoi(pvrenderserver_reverse.match(6).c_str());
rsport = (rsport <= 0) ? 22221 : rsport;
if (vtkProcessModule::GetProcessType() == vtkProcessModule::PROCESS_RENDER_SERVER)
{
std::ostringstream stream;
stream << "tcp://" << dataserverhost.c_str() << ":" << rsport << "?" << handshake.str();
client_url = stream.str();
}
else if (vtkProcessModule::GetProcessType() == vtkProcessModule::PROCESS_DATA_SERVER)
{
std::ostringstream stream;
stream << "tcp://" << renderserverhost.c_str() << ":" << dsport << "?" << handshake.str();
client_url = stream.str();
}
}
return client_url;
}
//-----------------------------------------------------------------
void UpdateClientURL()
{
std::string url = this->ComputeURL(this->BaseURL.c_str());
this->SetClientURL(url.c_str());
}
//-----------------------------------------------------------------
void SetConnectID(int newConnectID)
{
vtkProcessModule* pm = vtkProcessModule::GetProcessModule();
vtkPVOptions* options = pm->GetOptions();
if (options->GetConnectID() != newConnectID)
{
options->SetConnectID(newConnectID);
this->UpdateClientURL();
}
}
//-----------------------------------------------------------------
void NotifyOtherClients(const vtkSMMessage* msgToBroadcast)
{
std::string data = msgToBroadcast->SerializeAsString();
......@@ -201,6 +319,7 @@ private:
vtkNew<vtkCompositeMultiProcessController> CompositeMultiProcessController;
vtkWeakPointer<vtkPVSessionServer> Owner;
std::string ClientURL;
std::string BaseURL;
std::map<vtkTypeUInt32, vtkSMMessage> ShareOnlyCache;
bool SatelliteServerSession;
};
......@@ -214,6 +333,7 @@ vtkPVSessionServer::vtkPVSessionServer()
// By default we act as a server for a single client
this->MultipleConnection = false;
this->DisableFurtherConnections = false;
// On server side only one session is available so we just set it Active()
// forever
......@@ -326,94 +446,9 @@ bool vtkPVSessionServer::Connect(const char* url)
return true;
}
this->Internal->SetBaseURL(url);
vtkNetworkAccessManager* nam = pm->GetNetworkAccessManager();
vtkPVOptions* options = pm->GetOptions();
vtksys::RegularExpression pvserver("^cs://([^:]+)?(:([0-9]+))?");
vtksys::RegularExpression pvserver_reverse("^csrc://([^:]+)(:([0-9]+))?");
vtksys::RegularExpression pvrenderserver("^cdsrs://([^:]+)(:([0-9]+))?/([^:]+)(:([0-9]+))?");
vtksys::RegularExpression pvrenderserver_reverse(
"^cdsrsrc://([^:]+)?(:([0-9]+))?/([^:]+)?(:([0-9]+))?");
std::ostringstream handshake;
handshake << "handshake=paraview-" << PARAVIEW_VERSION;
// Add connect-id if needed. The connect-id is added to the handshake that
// must match on client and server processes.
if (options->GetConnectID() != 0)
{
handshake << ".connect_id." << options->GetConnectID();
}
// Add rendering backend information.
handshake << ".renderingbackend.opengl2";
// for forward connections, port number 0 is acceptable, while for
// reverse-connections it's not.
std::string client_url;
if (pvserver.find(url))
{
int port = atoi(pvserver.match(3).c_str());
port = (port < 0) ? 11111 : port;
std::ostringstream stream;
stream << "tcp://localhost:" << port << "?listen=true&" << handshake.str();
stream << (this->MultipleConnection ? "&multiple=true" : "");
client_url = stream.str();
}
else if (pvserver_reverse.find(url))
{
std::string hostname = pvserver_reverse.match(1);
int port = atoi(pvserver_reverse.match(3).c_str());
port = (port <= 0) ? 11111 : port;
std::ostringstream stream;
stream << "tcp://" << hostname.c_str() << ":" << port << "?" << handshake.str();
client_url = stream.str();
}
else if (pvrenderserver.find(url))
{
int dsport = atoi(pvrenderserver.match(3).c_str());
dsport = (dsport < 0) ? 11111 : dsport;
int rsport = atoi(pvrenderserver.match(6).c_str());
rsport = (rsport < 0) ? 22221 : rsport;
if (vtkProcessModule::GetProcessType() == vtkProcessModule::PROCESS_RENDER_SERVER)
{
std::ostringstream stream;
stream << "tcp://localhost:" << rsport << "?listen=true&" << handshake.str();
client_url = stream.str();
}
else if (vtkProcessModule::GetProcessType() == vtkProcessModule::PROCESS_DATA_SERVER)
{
std::ostringstream stream;
stream << "tcp://localhost:" << dsport << "?listen=true&" << handshake.str();
stream << (this->MultipleConnection ? "&multiple=true" : "");
client_url = stream.str();
}
}
else if (pvrenderserver_reverse.find(url))
{
std::string dataserverhost = pvrenderserver_reverse.match(1);
int dsport = atoi(pvrenderserver_reverse.match(3).c_str());
dsport = (dsport <= 0) ? 11111 : dsport;
std::string renderserverhost = pvrenderserver_reverse.match(4);
int rsport = atoi(pvrenderserver_reverse.match(6).c_str());
rsport = (rsport <= 0) ? 22221 : rsport;
if (vtkProcessModule::GetProcessType() == vtkProcessModule::PROCESS_RENDER_SERVER)
{
std::ostringstream stream;
stream << "tcp://" << dataserverhost.c_str() << ":" << rsport << "?" << handshake.str();
client_url = stream.str();
}
else if (vtkProcessModule::GetProcessType() == vtkProcessModule::PROCESS_DATA_SERVER)
{
std::ostringstream stream;
stream << "tcp://" << renderserverhost.c_str() << ":" << dsport << "?" << handshake.str();
client_url = stream.str();
}
}
std::string client_url = this->Internal->ComputeURL(url);
vtkMultiProcessController* ccontroller = nam->NewConnection(client_url.c_str());
this->Internal->SetClientURL(client_url.c_str());
if (ccontroller)
......@@ -423,7 +458,8 @@ bool vtkPVSessionServer::Connect(const char* url)
cout << "Client connected." << endl;
}
if (this->MultipleConnection && this->Internal->GetActiveController())
if (this->MultipleConnection && !this->DisableFurtherConnections &&
this->Internal->GetActiveController())
{
// Listen for new client controller creation
nam->AddObserver(
......@@ -446,6 +482,48 @@ bool vtkPVSessionServer::GetIsAlive()
return (this->Internal->GetActiveController() != NULL);
}
//----------------------------------------------------------------------------
void vtkPVSessionServer::SetDisableFurtherConnections(bool disable)
{
if (this->DisableFurtherConnections != disable)
{
this->DisableFurtherConnections = disable;
this->Internal->UpdateClientURL();
vtkProcessModule* pm = vtkProcessModule::GetProcessModule();
vtkNetworkAccessManager* nam = pm->GetNetworkAccessManager();
vtkPVServerOptions* options = vtkPVServerOptions::SafeDownCast(pm->GetOptions());
int port = options->GetServerPort();
nam->DisableFurtherConnections(port, disable);
if (!disable)
{
nam->AddObserver(
vtkCommand::ConnectionCreatedEvent, this->Internal, &vtkInternals::CreateController);
}
else
{
nam->RemoveObservers(vtkCommand::ConnectionCreatedEvent);
}
this->Modified();
}
}
//----------------------------------------------------------------------------
void vtkPVSessionServer::SetConnectID(int newConnectID)
{
this->Internal->SetConnectID(newConnectID);
}
//----------------------------------------------------------------------------
int vtkPVSessionServer::GetConnectID()
{
vtkProcessModule* pm = vtkProcessModule::GetProcessModule();
vtkPVOptions* options = pm->GetOptions();
return options->GetConnectID();
}
//----------------------------------------------------------------------------
void vtkPVSessionServer::OnClientServerMessageRMI(void* message, int message_length)
{
......
......@@ -97,13 +97,32 @@ public:
* Enable or Disable multi-connection support.
* The MultipleConnection is only used inside the DATA_SERVER to support
* several clients to connect to it.
* By default we allow collaboration (this->MultipleConnection = true)
* By default, collaboration is not allowed (this->MultipleConnection = false)
*/
vtkBooleanMacro(MultipleConnection, bool);
vtkSetMacro(MultipleConnection, bool);
vtkGetMacro(MultipleConnection, bool);
//@}
//@{
/**
* Enable or Disable further connections in muliple connection mode.
* By default, further connections are enabled. (this->DisableFurtherConnections = false)
*/
vtkBooleanMacro(DisableFurtherConnections, bool);
vtkGetMacro(DisableFurtherConnections, bool);
void SetDisableFurtherConnections(bool disable);
//@}
//@{
/**
* Set/Get the server connect-id.
* Default is 0.
*/
void SetConnectID(int newConnectID);
int GetConnectID();
//@}
void OnClientServerMessageRMI(void* message, int message_length);
void OnCloseSessionRMI();
......@@ -135,6 +154,7 @@ protected:
vtkMPIMToNSocketConnection* MPIMToNSocketConnection;
bool MultipleConnection;
bool DisableFurtherConnections;
class vtkInternals;
vtkInternals* Internal;
......
......@@ -66,7 +66,7 @@ public:
}
}
bool UpdateUserNamesAndMaster(vtkSMMessage* msg)
bool UpdateFromUsers(vtkSMMessage* msg)
{
bool findChanges = false;
this->DisableBroadcast = true;
......@@ -81,6 +81,16 @@ public:
{
findChanges = findChanges || (this->MultiProcessController->GetMasterController() != id);
this->MultiProcessController->SetMasterController(id);
if (this->ServerSession)
{
findChanges = findChanges || (this->ServerSession->GetDisableFurtherConnections() !=
user->disable_further_connections());
this->ServerSession->SetDisableFurtherConnections(user->disable_further_connections());
unsigned int serverConnectId = this->ServerSession->GetConnectID();
findChanges = findChanges || serverConnectId != user->connect_id();
this->ServerSession->SetConnectID(user->connect_id());
}
}
}
this->DisableBroadcast = false;
......@@ -111,6 +121,11 @@ public:
{
user->set_is_master(true);
}
if (this->ServerSession)
{
user->set_disable_further_connections(this->ServerSession->GetDisableFurtherConnections());
user->set_connect_id(this->ServerSession->GetConnectID());
}
}
return &this->ServerState;
......@@ -161,7 +176,7 @@ void vtkSICollaborationManager::PrintSelf(ostream& os, vtkIndent indent)
//----------------------------------------------------------------------------
void vtkSICollaborationManager::Push(vtkSMMessage* msg)
{
if (this->Internal->UpdateUserNamesAndMaster(msg) && this->Internal->CanBroadcast())
if (this->Internal->UpdateFromUsers(msg) && this->Internal->CanBroadcast())
{
this->BroadcastToClients(this->Internal->BuildServerStateMessage());
}
......
......@@ -42,6 +42,8 @@ public:
this->ObserverTag = 0;
this->Me = 0;
this->UserToFollow = 0;
this->DisableFurtherConnections = false;
this->ConnectID = 0;
this->Clear();
}
......@@ -102,11 +104,35 @@ public:
return false;
}
bool SetDisableFurtherConnections(bool disable)
{
if (disable != this->DisableFurtherConnections)
{
this->DisableFurtherConnections = disable;
this->UpdateState(this->UserToFollow == 0 ? this->Master : this->UserToFollow);
return true;
}
return false;
}
bool UpdateConnectID(int connectID)
{
if (this->ConnectID != connectID)
{
this->ConnectID = connectID;
this->UpdateState(this->UserToFollow == 0 ? this->Master : this->UserToFollow);
return true;
}
return false;
}
void Clear()
{
this->UserNames.clear();
this->Users.clear();
this->Master = 0;
this->ConnectID = 0;
this->DisableFurtherConnections = false;
this->State.Clear();
this->PendingCameraUpdate.Clear();
this->LocalCameraStateCache.clear();
......@@ -130,6 +156,8 @@ public:
{
user->set_follow_cam(true);