From 01c0cec08a5227bfe91b37fe01970e60999b9e51 Mon Sep 17 00:00:00 2001
From: Yohann Bearzi <yohann.bearzi@kitware.com>
Date: Thu, 5 Oct 2023 10:59:10 -0400
Subject: [PATCH] Fixing vtkThreadedCallbackQueue deadlock

When one is calling `Wait()` on a future, the future will try to un its
priors if they are enqueued (not on hold) using a constant-time lookup
through the `InvokerIndex` member. There was a race condition with the
regular path for running tasks. The queue could try to run the prior
while the user does it as well. When this happens, the computed index in
the queue is negative (because the prior has been popped in the regular
pathe). When such occurenc happens, we abandon trying to run the prior.

Setting the invoker's status to `RUNNING` was done in `Invoke()`. It
turns out that there are cases when we want to set it befor calling
`Invoke()`. From now on, each part of the code about to invoke is
responsible for updating its status.
---
 Parallel/Core/vtkThreadedCallbackQueue.cxx | 55 ++++++++++++++++------
 Parallel/Core/vtkThreadedCallbackQueue.h   |  3 +-
 Parallel/Core/vtkThreadedCallbackQueue.txx |  9 ++--
 3 files changed, 47 insertions(+), 20 deletions(-)

diff --git a/Parallel/Core/vtkThreadedCallbackQueue.cxx b/Parallel/Core/vtkThreadedCallbackQueue.cxx
index 07dd38404cf..9c88e75123c 100644
--- a/Parallel/Core/vtkThreadedCallbackQueue.cxx
+++ b/Parallel/Core/vtkThreadedCallbackQueue.cxx
@@ -61,11 +61,14 @@ private:
     SharedFutureBasePointer invoker = std::move(invokerQueue.front());
     invokerQueue.pop_front();
 
+    // invoker was in the queue which is locked. No one will be touching Status, so we do not need
+    // to lock invoker->Mutex
+    invoker->Status = RUNNING;
+
     this->Queue->PopFrontNullptr();
     lock.unlock();
 
-    std::unique_lock<std::mutex> stateLock(invoker->Mutex);
-    this->Queue->Invoke(std::move(invoker), stateLock);
+    this->Queue->Invoke(std::move(invoker));
 
     return true;
   }
@@ -201,11 +204,9 @@ void vtkThreadedCallbackQueue::PopFrontNullptr()
 }
 
 //-----------------------------------------------------------------------------
-void vtkThreadedCallbackQueue::Invoke(
-  vtkSharedFutureBase* invoker, std::unique_lock<std::mutex>& lock)
+void vtkThreadedCallbackQueue::Invoke(vtkSharedFutureBase* invoker)
 {
   invoker->Status = RUNNING;
-  lock.unlock();
   (*invoker)();
   this->SignalDependentSharedFutures(invoker);
 }
@@ -221,6 +222,7 @@ void vtkThreadedCallbackQueue::SignalDependentSharedFutures(vtkSharedFutureBase*
     // this container. At this point we're "ready" anyway so no dependents should be waiting in most
     // cases.
     std::lock_guard<std::mutex> lock(invoker->Mutex);
+
     for (auto& dependent : invoker->Dependents)
     {
       // We're locking the dependent future. When the lock is released, either the future is not
@@ -234,7 +236,9 @@ void vtkThreadedCallbackQueue::SignalDependentSharedFutures(vtkSharedFutureBase*
         // Invoker is high priority if it comes from vtkThreadedCallbackQueue::Wait for example.
         if (dependent->IsHighPriority)
         {
-          this->Invoke(dependent, dependentLock);
+          dependent->Status = RUNNING;
+          dependentLock.unlock();
+          this->Invoke(dependent);
         }
         else
         {
@@ -251,8 +255,8 @@ void vtkThreadedCallbackQueue::SignalDependentSharedFutures(vtkSharedFutureBase*
     // We need to handle the invoker index.
     // If the InvokerQueue is empty, then we set it such that after this look, the front has index
     // 0.
-    std::size_t index = this->InvokerQueue.empty() ? invokersToLaunch.size()
-                                                   : this->InvokerQueue.front()->InvokerIndex;
+    vtkIdType index = this->InvokerQueue.empty() ? static_cast<vtkIdType>(invokersToLaunch.size())
+                                                 : this->InvokerQueue.front()->InvokerIndex;
     for (SharedFutureBasePointer& inv : invokersToLaunch)
     {
       assert(inv->Status == ON_HOLD && "Status should be ON_HOLD");
@@ -275,26 +279,48 @@ void vtkThreadedCallbackQueue::SignalDependentSharedFutures(vtkSharedFutureBase*
 //-----------------------------------------------------------------------------
 bool vtkThreadedCallbackQueue::TryInvoke(vtkSharedFutureBase* invoker)
 {
-  std::unique_lock<std::mutex> invokerLock(invoker->Mutex);
   if (![this, &invoker] {
         // We need to check again if we cannot run in case the thread worker just popped this
         // invoker. We are guarded by this->Mutex so there cannot be a conflict here.
         if (invoker->Status != ENQUEUED)
         {
           // Someone picked up the invoker right before us, we can abort.
-          return SharedFutureBasePointer(nullptr);
+          return false;
         }
 
         std::lock_guard<std::mutex> lock(this->Mutex);
 
         if (this->InvokerQueue.empty())
         {
-          return SharedFutureBasePointer(nullptr);
+          return false;
+        }
+
+        std::lock_guard<std::mutex> invLock(invoker->Mutex);
+
+        if (invoker->Status != ENQUEUED)
+        {
+          // Recheck if someone picked up the invoker right before us.
+          return false;
         }
 
         // There has to be a front if we are here.
         vtkIdType index = invoker->InvokerIndex - this->InvokerQueue.front()->InvokerIndex;
-        SharedFutureBasePointer result = std::move(this->InvokerQueue[index]);
+
+        // When index is negative, it means that the invoker we want to run is already being
+        // handled by the "normal" path of the queue. The invoker is locked by a mutex, we must
+        // release it.
+        if (index < 0)
+        {
+          return false;
+        }
+
+        SharedFutureBasePointer& result = this->InvokerQueue[index];
+
+        // Someone has reinserted in the front another invoker. invoker is already running.
+        if (result != invoker)
+        {
+          return false;
+        }
 
         // If we just picked the front invoker, let's pop the queue.
         if (index == 0)
@@ -302,13 +328,14 @@ bool vtkThreadedCallbackQueue::TryInvoke(vtkSharedFutureBase* invoker)
           this->InvokerQueue.pop_front();
           this->PopFrontNullptr();
         }
-        return result;
+        invoker->Status.store(RUNNING, std::memory_order_release);
+        return true;
       }())
   {
     return false;
   }
 
-  this->Invoke(invoker, invokerLock);
+  this->Invoke(invoker);
   return true;
 }
 
diff --git a/Parallel/Core/vtkThreadedCallbackQueue.h b/Parallel/Core/vtkThreadedCallbackQueue.h
index 70d032a0aad..1e13ff63f0e 100644
--- a/Parallel/Core/vtkThreadedCallbackQueue.h
+++ b/Parallel/Core/vtkThreadedCallbackQueue.h
@@ -423,9 +423,8 @@ private:
 
   /**
    * This function should always be used to invoke.
-   * lock should be locked upon calling this function.
    */
-  void Invoke(vtkSharedFutureBase* invoker, std::unique_lock<std::mutex>& lock);
+  void Invoke(vtkSharedFutureBase* invoker);
 
   /**
    * This will try to invoke the invoker owning a reference of `state`. The invoker will be ran if
diff --git a/Parallel/Core/vtkThreadedCallbackQueue.txx b/Parallel/Core/vtkThreadedCallbackQueue.txx
index b532cae391f..506921f5218 100644
--- a/Parallel/Core/vtkThreadedCallbackQueue.txx
+++ b/Parallel/Core/vtkThreadedCallbackQueue.txx
@@ -468,7 +468,9 @@ void vtkThreadedCallbackQueue::HandleDependentInvoker(
   }
   else
   {
-    this->Invoke(std::forward<InvokerT>(invoker), lock);
+    invoker->Status = RUNNING;
+    lock.unlock();
+    this->Invoke(std::forward<InvokerT>(invoker));
   }
 }
 
@@ -658,9 +660,8 @@ vtkThreadedCallbackQueue::Push(FT&& f, ArgsT&&... args)
 
   {
     std::lock_guard<std::mutex> lock(this->Mutex);
-    invoker->InvokerIndex = this->InvokerQueue.empty() ? 0
-                                                       : this->InvokerQueue.front()->InvokerIndex +
-        static_cast<vtkIdType>(this->InvokerQueue.size());
+    invoker->InvokerIndex =
+      this->InvokerQueue.empty() ? 0 : this->InvokerQueue.back()->InvokerIndex + 1;
     this->InvokerQueue.emplace_back(invoker);
   }
 
-- 
GitLab