Skip to content
Snippets Groups Projects
Commit 67ee0782 authored by Sebastien Jourdain's avatar Sebastien Jourdain Committed by Code Review
Browse files

Merge topic 'vtkweb-launcher-ready-fix' into master

1d12532f vtkWeb: Add appropriate error codes for launch requests
9d81e15c vtkWeb: Teach launcher.py to wait for ready_line
parents 12e77524 1d12532f
No related branches found
No related tags found
No related merge requests found
......@@ -13,6 +13,7 @@ from random import choice
from twisted.internet import reactor, defer
from twisted.internet.task import deferLater
from twisted.internet.defer import CancelledError
from twisted.python import log
from twisted.web import server, resource, http
from twisted.web.resource import Resource
......@@ -282,11 +283,14 @@ class ProcessManager(object):
for id in self.processes:
self.processes[id].terminate()
def startProcess(self, session, ready_callback=None):
def _getLogFilePath(self, id):
return "%s%s%s.txt" % (self.log_dir, os.sep, id)
def startProcess(self, session):
proc = None
# Create output log file
logFilePath = self.log_dir + os.sep + str(session['id']) + ".txt"
logFilePath = self._getLogFilePath(session['id'])
with open(logFilePath, "a+") as log_file:
try:
proc = subprocess.Popen(session['cmd'], stdout=log_file, stderr=log_file)
......@@ -316,6 +320,38 @@ class ProcessManager(object):
def isRunning(self, id):
return self.processes[id].poll() is None
# ========================================================================
# Look for ready line in process output. Return True if found, False
# otherwise. If no ready_line is configured and process is running return
# True.
# ========================================================================
def isReady(self, session):
id = session['id']
# The process has to be running to be ready!
if not self.isRunning(id):
return False
application = self.config['apps'][session['application']]
ready_line = application.get('ready_line', None)
# If no ready_line is configured and the process is running then thats
# enough.
if not ready_line:
return True
ready = False
# Check the output for ready_line
logFilePath = self._getLogFilePath(session['id'])
with open(logFilePath, "r") as log_file:
for line in log_file.readlines():
if ready_line in line:
ready = True
break
return ready
# ===========================================================================
# Class to implement requests to POST, GET and DELETE methods
......@@ -346,6 +382,7 @@ class LauncherResource(resource.Resource, object):
# Make sure the request has all the expected keys
if not validateKeySet(payload, ["application"], "Launch request"):
request.setResponseCode(http.BAD_REQUEST)
return json.dumps({"error": "The request is not complete"})
# Try to free any available resource
......@@ -359,31 +396,84 @@ class LauncherResource(resource.Resource, object):
# No resource available
if not session:
request.setResponseCode(http.SERVICE_UNAVAILABLE)
return json.dumps({"error": "All the resources are currently taken"})
# Create response callback
d = deferLater(reactor, self.time_to_wait, lambda: request)
d.addCallback(self._delayedRender, session_id=session['id'])
# Start process
proc = self.process_manager.startProcess(session, d)
proc = self.process_manager.startProcess(session)
if not proc:
request.setResponseCode(http.OK)
request.setResponseCode(http.SERVICE_UNAVAILABLE)
return json.dumps({"error": "The process did not properly start. %s" % str(session['cmd'])})
# local function to act as errback for Deferred objects.
def errback(error):
# Filter out CancelledError and propagate rest
if error.type != CancelledError:
return error
# Deferred object set to timeout request if process doesn't start in time
timeout_deferred = deferLater(reactor, self.time_to_wait, lambda: request)
timeout_deferred.addCallback(self._delayedRenderTimeout, session)
timeout_deferred.addErrback(errback)
# Make sure other deferred is canceled once one has been fired
request.notifyFinish().addCallback(lambda x: timeout_deferred.cancel())
# If a ready_line is configured create a Deferred object to wait for
# ready line to be produced
if 'ready_line' in self._config['apps'][session['application']]:
ready_deferred = self._waitForReady(session, request)
ready_deferred.addCallback(self._delayedRenderReady, session)
ready_deferred.addErrback(errback)
# Make sure other deferred is canceled once one has been fired
request.notifyFinish().addCallback(lambda x: ready_deferred.cancel())
return NOT_DONE_YET
def _delayedRender(self, request, session_id=None):
session = self.session_manager.getSession(session_id)
running = self.process_manager.isRunning(session_id)
if session and running:
# ========================================================================
# Wait for session to be ready. Rather than blocking keep using callLater(...)
# to schedule self in reactor. Return a Deferred object whose callback will
# be triggered when the session is ready
# ========================================================================
def _waitForReady(self, session, request, d=None):
if not d:
d = defer.Deferred()
if not self.process_manager.isReady(session):
reactor.callLater(1, self._waitForReady, session, request, d)
else:
d.callback(request)
return d
# ========================================================================
# Called when the timeout out expires. Check if process is now ready
# and send response to client.
# ========================================================================
def _delayedRenderTimeout(self, request, session):
ready = self.process_manager.isReady(session)
if ready:
request.write(json.dumps(filterResponse(session, self.field_filter)))
request.setResponseCode(http.OK)
else:
request.write(json.dumps({"error":"Session did not properly started"}))
request.write(json.dumps({"error": "Session did not start before timeout expired. Check session logs."}))
request.setResponseCode(http.SERVICE_UNAVAILABLE)
request.finish()
# ========================================================================
# Called when the process is ready ( the ready line has been read from the
# process output).
# ========================================================================
def _delayedRenderReady(self, request, session):
request.write(json.dumps(filterResponse(session, self.field_filter)))
request.setResponseCode(http.OK)
request.finish()
# =========================================================================
......@@ -396,6 +486,7 @@ class LauncherResource(resource.Resource, object):
if not id:
message = "id not provided in GET request"
logging.error(message)
request.setResponseCode(http.BAD_REQUEST)
return json.dumps({"error":message})
logging.info("GET request received for id: %s" % id)
......@@ -404,6 +495,7 @@ class LauncherResource(resource.Resource, object):
if not session:
message = "No session with id: %s" % id
logging.error(message)
request.setResponseCode(http.NOT_FOUND)
return json.dumps({"error":message})
# Return session meta-data
......@@ -420,6 +512,7 @@ class LauncherResource(resource.Resource, object):
if not id:
message = "id not provided in DELETE request"
logging.error(message)
request.setResponseCode(http.BAD_REQUEST)
return json.dumps({"error":message})
logging.info("DELETE request received for id: %s" % id)
......@@ -428,6 +521,7 @@ class LauncherResource(resource.Resource, object):
if not session:
message = "No session with id: %s" % id
logging.error(message)
request.setResponseCode(http.NOT_FOUND)
return json.dumps({"error":message})
# Remove session
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment