From 9d81e15cc30e8f815072e8577014ceb9aff0695f Mon Sep 17 00:00:00 2001
From: Chris Harris <chris.harris@kitware.com>
Date: Thu, 13 Mar 2014 17:12:11 -0400
Subject: [PATCH] vtkWeb: Teach launcher.py to wait for ready_line

Rather than just waiting the full timeout check the output of the
process for the ready_line ( which is configured in launcher.json ).
This should resolve timing issues related to server websockets not
being ready to receive client connections. Also in error paths rather
than return status code 200 (ok) return a reasonable error code.

Change-Id: I3cd99ab5758184002500e3b9adc6208d0fd736e0
---
 Web/Python/launcher.py | 114 ++++++++++++++++++++++++++++++++++++-----
 1 file changed, 101 insertions(+), 13 deletions(-)

diff --git a/Web/Python/launcher.py b/Web/Python/launcher.py
index 9d3916d98ba..738b79da470 100755
--- a/Web/Python/launcher.py
+++ b/Web/Python/launcher.py
@@ -13,6 +13,7 @@ from random import choice
 
 from twisted.internet import reactor, defer
 from twisted.internet.task import deferLater
+from twisted.internet.defer import CancelledError
 from twisted.python import log
 from twisted.web import server, resource, http
 from twisted.web.resource import Resource
@@ -282,11 +283,14 @@ class ProcessManager(object):
         for id in self.processes:
             self.processes[id].terminate()
 
-    def startProcess(self, session, ready_callback=None):
+    def _getLogFilePath(self, id):
+        return "%s%s%s.txt" % (self.log_dir, os.sep, id)
+
+    def startProcess(self, session):
         proc = None
 
         # Create output log file
-        logFilePath = self.log_dir + os.sep + str(session['id']) + ".txt"
+        logFilePath = self._getLogFilePath(session['id'])
         with open(logFilePath, "a+") as log_file:
             try:
                 proc = subprocess.Popen(session['cmd'], stdout=log_file, stderr=log_file)
@@ -316,6 +320,38 @@ class ProcessManager(object):
     def isRunning(self, id):
         return self.processes[id].poll() is None
 
+    # ========================================================================
+    # Look for ready line in process output. Return True if found, False
+    # otherwise. If no ready_line is configured and process is running return
+    # True.
+    # ========================================================================
+
+    def isReady(self, session):
+      id = session['id']
+
+      # The process has to be running to be ready!
+      if not self.isRunning(id):
+          return False
+
+      application = self.config['apps'][session['application']]
+      ready_line  = application.get('ready_line', None)
+
+      # If no ready_line is configured and the process is running then thats
+      # enough.
+      if not ready_line:
+          return True
+
+      ready = False
+
+      # Check the output for ready_line
+      logFilePath = self._getLogFilePath(session['id'])
+      with open(logFilePath, "r") as log_file:
+          for line in log_file.readlines():
+              if ready_line in line:
+                  ready = True
+                  break
+
+      return ready
 
 # ===========================================================================
 # Class to implement requests to POST, GET and DELETE methods
@@ -361,29 +397,81 @@ class LauncherResource(resource.Resource, object):
         if not session:
             return json.dumps({"error": "All the resources are currently taken"})
 
-        # Create response callback
-        d = deferLater(reactor, self.time_to_wait, lambda: request)
-        d.addCallback(self._delayedRender, session_id=session['id'])
-
         # Start process
-        proc = self.process_manager.startProcess(session, d)
+        proc = self.process_manager.startProcess(session)
 
         if not proc:
-            request.setResponseCode(http.OK)
+            request.setResponseCode(http.SERVICE_UNAVAILABLE)
             return json.dumps({"error": "The process did not properly start. %s" % str(session['cmd'])})
 
+        # local function to act as errback for Deferred objects.
+        def errback(error):
+            # Filter out CancelledError and propagate rest
+            if error.type != CancelledError:
+                return error
+
+        # Deferred object set to timeout request if process doesn't start in time
+        timeout_deferred = deferLater(reactor, self.time_to_wait, lambda: request)
+        timeout_deferred.addCallback(self._delayedRenderTimeout, session)
+        timeout_deferred.addErrback(errback)
+        # Make sure other deferred is canceled once one has been fired
+        request.notifyFinish().addCallback(lambda x: timeout_deferred.cancel())
+
+        # If a ready_line is configured create a Deferred object to wait for
+        # ready line to be produced
+        if 'ready_line' in self._config['apps'][session['application']]:
+          ready_deferred = self._waitForReady(session, request)
+          ready_deferred.addCallback(self._delayedRenderReady, session)
+          ready_deferred.addErrback(errback)
+          # Make sure other deferred is canceled once one has been fired
+          request.notifyFinish().addCallback(lambda x: ready_deferred.cancel())
+
         return NOT_DONE_YET
 
-    def _delayedRender(self, request, session_id=None):
-        session = self.session_manager.getSession(session_id)
-        running = self.process_manager.isRunning(session_id)
 
-        if session and running:
+    # ========================================================================
+    # Wait for session to be ready. Rather than blocking keep using callLater(...)
+    # to schedule self in reactor. Return a Deferred object whose callback will
+    # be triggered when the session is ready
+    # ========================================================================
+
+    def _waitForReady(self, session, request, d=None):
+        if not d:
+            d = defer.Deferred()
+
+        if not self.process_manager.isReady(session):
+            reactor.callLater(1, self._waitForReady, session, request, d)
+        else:
+            d.callback(request)
+
+        return d
+
+    # ========================================================================
+    # Called when the timeout out expires. Check if process is now ready
+    # and send response to client.
+    # ========================================================================
+
+    def _delayedRenderTimeout(self, request, session):
+        ready = self.process_manager.isReady(session)
+
+        if ready:
             request.write(json.dumps(filterResponse(session, self.field_filter)))
+            request.setResponseCode(http.OK)
         else:
-            request.write(json.dumps({"error":"Session did not properly started"}))
+            request.write(json.dumps({"error": "Session did not start before timeout expired. Check session logs."}))
+            request.setResponseCode(http.SERVICE_UNAVAILABLE)
+
+        request.finish()
 
+    # ========================================================================
+    # Called when the process is ready ( the ready line has been read from the
+    # process output).
+    # ========================================================================
+
+    def _delayedRenderReady(self, request, session):
+        request.write(json.dumps(filterResponse(session, self.field_filter)))
         request.setResponseCode(http.OK)
+
         request.finish()
 
     # =========================================================================
-- 
GitLab