//============================================================================
//  Copyright (c) Kitware, Inc.
//  All rights reserved.
//  See LICENSE.txt for details.
//
//  This software is distributed WITHOUT ANY WARRANTY; without even
//  the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
//  PURPOSE.  See the above copyright notice for more information.
//============================================================================

#include <vtkm/cont/RuntimeDeviceTracker.h>
#include <vtkm/cont/cuda/DeviceAdapterCuda.h>
#include <vtkm/cont/openmp/DeviceAdapterOpenMP.h>
#include <vtkm/cont/tbb/DeviceAdapterTBB.h>

#include <vtkm/filter/Gradient.h>


namespace
{
// Query how many CUDA-capable GPUs are visible to this process.
// Returns 0 when VTK-m was built without CUDA support or when the
// runtime device query fails for any reason.
int determine_cuda_gpu_count()
{
  int gpuCount = 0;
#if defined(VTKM_ENABLE_CUDA)
  int detected = 0;
  if (cudaGetDeviceCount(&detected) == cudaSuccess)
  {
    gpuCount = detected;
  }
#endif
  return gpuCount;
}

// Worker loop for a thread dedicated to the TBB backend: drains tasks
// from the shared queue until it is shut down.
void process_block_tbb(RuntimeTaskQueue& queue)
{
  //Step 1. Restrict this worker thread to the TBB device adapter.
  //Any vtkm::filter executed by a task on this thread will therefore
  //run only on TBB. The "global" runtime tracker is actually
  //thread-local, so forcing it here does not affect other workers.
  //
  vtkm::cont::GetRuntimeDeviceTracker().ForceDevice(vtkm::cont::DeviceAdapterTagTBB{});

  while (queue.hasTasks())
  {
    //Step 2. Fetch the next task destined for TBB.
    auto work = queue.pop();

    //Step 3. Execute it on TBB. pop() can hand back an empty task
    //while the queue is draining during shutdown, so validate it
    //before invoking.
    if (work != nullptr)
    {
      work();
    }

    //Step 4. Report back to the queue that this task is finished.
    queue.completedTask();
    std::cout << "finished a block on tbb (" << std::this_thread::get_id() << ")" << std::endl;
  }
}

// Worker loop for a thread dedicated to the OpenMP backend: drains tasks
// from the shared queue until it is shut down.
// NOTE(fix): the comments and the log message previously said "TBB" —
// a copy-paste leftover from process_block_tbb.
void process_block_openMP(RuntimeTaskQueue& queue)
{
  //Step 1. Set the device adapter of this thread to OpenMP.
  //This makes sure that any vtkm::filters used by our
  //task operate only on OpenMP. The "global" thread tracker
  //is actually thread-local, so we can use that.
  //
  vtkm::cont::GetRuntimeDeviceTracker().ForceDevice(vtkm::cont::DeviceAdapterTagOpenMP{});

  while (queue.hasTasks())
  {
    //Step 2. Get the task to run on OpenMP
    auto task = queue.pop();

    //Step 3. Run the task on OpenMP. We check the validity
    //of the task since we could be given an empty task
    //when the queue is empty and we are shutting down
    if (task != nullptr)
    {
      task();
    }

    //Step 4. Notify the queue that we finished processing this task
    queue.completedTask();
    std::cout << "finished a block on openMP (" << std::this_thread::get_id() << ")" << std::endl;
  }
}

// Worker loop for a thread dedicated to the CUDA backend: binds the
// thread to its assigned GPU and drains tasks from the shared queue
// until it is shut down.
void process_block_cuda(RuntimeTaskQueue& queue, int gpuId)
{
  //Step 1. Set the device adapter of this thread to cuda and bind the
  //thread to the GPU it was assigned. This makes sure that any
  //vtkm::filters used by our task operate only on cuda. The "global"
  //thread tracker is actually thread-local, so we can use that.
  //
#if defined(VTKM_ENABLE_CUDA)
  //NOTE(fix): gpuId used to be discarded, so every worker ran on the
  //default device; select the assigned GPU before any CUDA work occurs.
  cudaSetDevice(gpuId);
#else
  (void)gpuId;
#endif
  vtkm::cont::GetRuntimeDeviceTracker().ForceDevice(vtkm::cont::DeviceAdapterTagCuda{});

  while (queue.hasTasks())
  {
    //Step 2. Get the task to run on cuda
    auto task = queue.pop();

    //Step 3. Run the task on cuda. We check the validity
    //of the task since we could be given an empty task
    //when the queue is empty and we are shutting down
    if (task != nullptr)
    {
      task();
    }

    //Step 4. Notify the queue that we finished processing this task
    queue.completedTask();
    std::cout << "finished a block on cuda (" << std::this_thread::get_id() << ")" << std::endl;
  }
}

} //namespace

//-----------------------------------------------------------------------------
VTKM_CONT MultiDeviceGradient::MultiDeviceGradient()
  : ComputePointGradient(false)
  , Queue()
  , Workers()
{
  //Step 1. Determine the number of workers we want
  auto& tracker = vtkm::cont::GetRuntimeDeviceTracker();
  const bool runOnCuda = tracker.CanRunOn(vtkm::cont::DeviceAdapterTagCuda{});
  const bool runOnOpenMP = tracker.CanRunOn(vtkm::cont::DeviceAdapterTagOpenMP{});
  const bool runOnTbb = tracker.CanRunOn(vtkm::cont::DeviceAdapterTagTBB{});

  //Note currently the virtual implementation has some issues
  //In a multi-threaded environment only cuda can be used or
  //all SMP backends ( Serial, TBB, OpenMP ).
  //Once this issue is resolved we can enable CUDA + TBB in
  //this example

  //Step 2. Launch workers that will use cuda (if enabled).
  //The threads share a queue object so we need to explicitly pass it
  //by reference (the std::ref call)
  if (runOnCuda)
  {
    std::cout << "adding cuda workers" << std::endl;
    //The number of workers per GPU is purely arbitrary currently,
    //but in general we want multiple of them so we can overlap compute
    //and transfer. (Replaces four copy-pasted emplace_back calls.)
    constexpr int workersPerGPU = 4;
    const int gpu_count = determine_cuda_gpu_count();
    for (int i = 0; i < gpu_count; ++i)
    {
      for (int w = 0; w < workersPerGPU; ++w)
      {
        this->Workers.emplace_back(std::bind(process_block_cuda, std::ref(this->Queue), i));
      }
    }
  }
  //Step 3. Launch a worker that will use openMP (if enabled).
  //The threads share a queue object so we need to explicitly pass it
  //by reference (the std::ref call)
  else if (runOnOpenMP)
  {
    std::cout << "adding a openMP worker" << std::endl;
    this->Workers.emplace_back(std::bind(process_block_openMP, std::ref(this->Queue)));
  }
  //Step 4. Launch a worker that will use tbb (if enabled).
  //The threads share a queue object so we need to explicitly pass it
  //by reference (the std::ref call)
  else if (runOnTbb)
  {
    std::cout << "adding a tbb worker" << std::endl;
    this->Workers.emplace_back(std::bind(process_block_tbb, std::ref(this->Queue)));
  }
}

//-----------------------------------------------------------------------------
VTKM_CONT MultiDeviceGradient::~MultiDeviceGradient()
{
  //Signal the shared queue that no more work will arrive so the
  //worker loops can exit once they finish their current task.
  this->Queue.shutdown();

  //Block until every worker thread has fully exited.
  for (auto it = this->Workers.begin(); it != this->Workers.end(); ++it)
  {
    it->join();
  }
}

//-----------------------------------------------------------------------------
// Compute the gradient of every block in `mb`, fanning the work out to
// the worker threads via the shared task queue, and return a MultiBlock
// holding the per-block results in the same order.
template <typename DerivedPolicy>
inline VTKM_CONT vtkm::cont::MultiBlock MultiDeviceGradient::PrepareForExecution(
  const vtkm::cont::MultiBlock& mb,
  const vtkm::filter::PolicyBase<DerivedPolicy>& policy)
{
  //Step 1. Say that we have no more to submit for this multi block.
  //This needs to happen on every execute so the same filter instance
  //can be reused for multiple inputs.
  this->Queue.reset();

  //Step 2. Construct the multi-block we are going to fill. The size
  //signature to MultiBlock just reserves size.
  vtkm::cont::MultiBlock output;
  output.AddBlocks(std::vector<vtkm::cont::DataSet>(static_cast<size_t>(mb.GetNumberOfBlocks())));
  vtkm::cont::MultiBlock* outputPtr = &output;


  //Step 3. Construct the filter we want to run on each block.
  vtkm::filter::Gradient gradient;
  gradient.SetComputePointGradient(this->GetComputePointGradient());
  gradient.SetActiveField(this->GetActiveFieldName());

  //Step 3b. Post the first block as work and wait for it to finish.
  //This is required because constructing the virtual Point Coordinates
  //is currently not thread safe, so the first execution must complete
  //before any other tasks run concurrently.
  auto blockIter = mb.cbegin();
  {
    vtkm::cont::DataSet firstInput = *blockIter;
    this->Queue.push( //lambda describing the work for the first block
      [=]() {
        vtkm::filter::Gradient perThreadGrad = gradient;

        vtkm::cont::DataSet result = perThreadGrad.Execute(firstInput, policy);
        outputPtr->ReplaceBlock(0, result);
      });
    this->Queue.waitForAllTasksToComplete();
    ++blockIter;
  }

  //Step 4. Queue up the remaining blocks without blocking, so multiple
  //workers can execute them concurrently. Each lambda captures its own
  //copies of the input block and destination index.
  vtkm::Id blockIndex = 1;
  while (blockIter != mb.cend())
  {
    vtkm::cont::DataSet nextInput = *blockIter;
    this->Queue.push( //lambda describing the work for this block
      [=]() {
        vtkm::filter::Gradient perThreadGrad = gradient;

        vtkm::cont::DataSet result = perThreadGrad.Execute(nextInput, policy);
        outputPtr->ReplaceBlock(blockIndex, result);
      });
    ++blockIndex;
    ++blockIter;
  }

  // Step 5. Wait on all workers to finish.
  this->Queue.waitForAllTasksToComplete();

  return output;
}