Commit 14b56b82 authored by Sebastien Jourdain's avatar Sebastien Jourdain

Add ThreadedImageWriter

This writer support writting an image data onto a drive in a threaded manner so the encoding and IO time does not lock the application.
The supported extension/format are: .Z, .png, .jpg, .jpeg, .bmp, .ppm, .tif, .tiff, .vti
parent 853b3dbc
set(Module_SRCS
vtkThreadedImageWriter.cxx
)
vtk_module_library(vtkIOAsynchronous ${Module_SRCS})
vtk_add_test_python(
TestThreadedWriter.py,NO_VALID
)
#!/usr/bin/env python
import sys
import os
import time
import vtk
from vtk.util.misc import vtkGetTempDir
VTK_TEMP_DIR = vtkGetTempDir()
# Write all file types: tif, tiff, bpm, png, jpg, jpeg, vti, Z, ppm, raw
fileNames = [
'threaded-writer.vti',
# 'threaded-writer.ppm', # Only for unsigned char
'threaded-writer.Z',
# 'threaded-writer.jpg', # Only for unsigned char
# 'threaded-writer.png', # Only for unsigned char
# 'threaded-writer.bpm', # Only for unsigned char
# 'threaded-writer.tif', # Only for unsigned char
'threaded-writer.raw'
]
# Generate Data
source = vtk.vtkRTAnalyticSource()
source.Update()
image = source.GetOutput()
# Initialize writer
writer = vtk.vtkThreadedImageWriter()
# Reduce the number of worker threads
writer.SetMaxThreads(2)
writer.Initialize()
# Write all files
t0 = time.time()
for i in range(10):
for fileName in fileNames:
filePath = '%s/%s-%s' % (VTK_TEMP_DIR, i, fileName)
print('write:', filePath)
writer.EncodeAndWrite(image, filePath)
t1 = time.time()
# Wait for the work to be done
writer.Finalize()
t2 = time.time()
print('Write time', t1 - t0)
print('Wait time', t2 - t1)
print('Total time', t2 - t0)
if t1 - t0 > (t2 - t0) / 5.0: # less than 1/5 of the total time
print('Calling Write should be like a NoOp and therefore should be fast')
sys.exit(1)
# Validate data checksum
# ...TODO
print("All good...")
sys.exit(0)
vtk_module(vtkIOAsynchronous
GROUPS
StandAlone
TEST_DEPENDS
vtkTestingCore
KIT
vtkIO
DEPENDS
vtkCommonCore
vtkCommonExecutionModel
vtkIOCore
vtkIOImage
vtkIOXML
PRIVATE_DEPENDS
vtkCommonDataModel
vtkCommonMath
vtkCommonMisc
vtkCommonSystem
vtkjpeg
vtkpng
vtksys
vtktiff
vtkzlib
)
/*=========================================================================
Program: Visualization Toolkit
Module: vtkThreadedImageWriter.cxx
Copyright (c) Ken Martin, Will Schroeder, Bill Lorensen
All rights reserved.
See Copyright.txt or http://www.kitware.com/Copyright.htm for details.
This software is distributed WITHOUT ANY WARRANTY; without even
the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
PURPOSE. See the above copyright notice for more information.
=========================================================================*/
#include "vtkThreadedImageWriter.h"
#include "vtkBMPWriter.h"
#include "vtkCommand.h"
#include "vtkConditionVariable.h"
#include "vtkFloatArray.h"
#include "vtkImageData.h"
#include "vtkJPEGWriter.h"
#include "vtkMultiThreader.h"
#include "vtkMutexLock.h"
#include "vtkNew.h"
#include "vtkObjectFactory.h"
#include "vtkPNGWriter.h"
#include "vtkPNMWriter.h"
#include "vtkPointData.h"
#include "vtkTIFFWriter.h"
#include "vtkUnsignedCharArray.h"
#include "vtkXMLImageDataWriter.h"
#include "vtkZLibDataCompressor.h"
#include <cassert>
#include <cmath>
#include <iostream>
#include <queue>
#include <vtksys/SystemTools.hxx>
#define MAX_NUMBER_OF_THREADS_IN_POOL 32
//****************************************************************************
namespace
{
class vtkSharedData
{
public:
struct InputValueType
{
public:
vtkSmartPointer<vtkImageData> Image;
std::string FileName;
InputValueType()
: Image(nullptr)
, FileName("")
{
}
};
typedef std::queue<InputValueType> InputQueueType;
typedef std::map<vtkTypeUInt32, InputValueType> InputMapType;
private:
bool Done;
vtkSimpleMutexLock DoneLock;
vtkSimpleMutexLock ThreadDoneLock;
vtkSimpleConditionVariable ThreadDone;
int ActiveThreadCount;
//------------------------------------------------------------------------
// Constructs used to synchronization.
vtkSimpleMutexLock InputsLock;
vtkSimpleConditionVariable InputsAvailable;
//------------------------------------------------------------------------
// InputsLock must be held before accessing any of the following members.
InputQueueType Inputs;
public:
//------------------------------------------------------------------------
vtkSharedData()
: Done(false)
, ActiveThreadCount(0)
{
}
//------------------------------------------------------------------------
// Each thread should call this method when it starts. It helps us clean up
// threads when they are done.
void BeginWorker()
{
this->ThreadDoneLock.Lock();
this->ActiveThreadCount++;
this->ThreadDoneLock.Unlock();
}
//------------------------------------------------------------------------
// Each thread should call this method when it ends.
void EndWorker()
{
this->ThreadDoneLock.Lock();
this->ActiveThreadCount--;
bool last_thread = (this->ActiveThreadCount == 0);
this->ThreadDoneLock.Unlock();
if (last_thread)
{
this->ThreadDone.Signal();
}
}
//------------------------------------------------------------------------
void RequestAndWaitForWorkersToEnd()
{
// Get the done lock so we other threads don't end up testing the Done
// flag and quitting before this thread starts to wait for them to quit.
this->DoneLock.Lock();
this->Done = true;
// Grab the ThreadDoneLock. so even if any thread ends up check this->Done
// as soon as we release the lock, it won't get a chance to terminate.
this->ThreadDoneLock.Lock();
// release the done lock. Let threads test for this->Done flag.
this->DoneLock.Unlock();
// Tell all workers that inputs are available, so they will try to check
// the input as well as the done flag.
this->InputsAvailable.Broadcast();
// Now wait for thread to terminate releasing this->ThreadDoneLock as soon
// as we start waiting. Thus, no threads have got a chance to call
// EndWorker() till the main thread starts waiting for them.
this->ThreadDone.Wait(this->ThreadDoneLock);
this->ThreadDoneLock.Unlock();
// reset Done flag since all threads have died.
this->Done = false;
}
//------------------------------------------------------------------------
bool IsDone()
{
this->DoneLock.Lock();
bool val = this->Done;
this->DoneLock.Unlock();
return val;
}
//------------------------------------------------------------------------
void PushImageToQueue(vtkImageData*& data, char* fileName)
{
this->InputsLock.Lock();
{
vtkSharedData::InputValueType value;
value.Image = data;
value.FileName = fileName;
this->Inputs.push(value);
data = nullptr;
}
this->InputsLock.Unlock();
this->InputsAvailable.Signal();
}
//------------------------------------------------------------------------
// NOTE: This method may suspend the calling thread until inputs become
// available.
void GetNextInputToProcess(vtkSmartPointer<vtkImageData>& image, std::string& fileName)
{
this->InputsLock.Lock();
do
{
// Check if we have an input available, if so, return it.
if (this->Inputs.size() > 0)
{
InputValueType& input = this->Inputs.front();
image = input.Image;
input.Image = nullptr;
fileName = input.FileName;
this->Inputs.pop();
}
if (image == nullptr && !this->IsDone())
{
// No data is available, let's wait till it becomes available.
this->InputsAvailable.Wait(this->InputsLock);
}
} while (image == nullptr && !this->IsDone());
this->InputsLock.Unlock();
}
};
VTK_THREAD_RETURN_TYPE Worker(void* calldata)
{
vtkMultiThreader::ThreadInfo* info = reinterpret_cast<vtkMultiThreader::ThreadInfo*>(calldata);
vtkSharedData* sharedData = reinterpret_cast<vtkSharedData*>(info->UserData);
sharedData->BeginWorker();
while (true)
{
vtkSmartPointer<vtkImageData> image;
std::string fileName = "";
sharedData->GetNextInputToProcess(image, fileName);
if (image == nullptr)
{
// end thread.
break;
}
std::size_t pos = fileName.rfind(".");
std::string extension = fileName.substr(pos + 1);
if (extension == "Z")
{
vtkNew<vtkZLibDataCompressor> zLib;
float* zBuf = static_cast<vtkFloatArray*>(image->GetPointData()->GetScalars())->GetPointer(0);
size_t bufSize = image->GetNumberOfPoints() * sizeof(float);
unsigned char* cBuffer = new unsigned char[bufSize];
size_t compressSize = zLib->Compress((unsigned char*)zBuf, bufSize, cBuffer, bufSize);
ofstream fileHandler(fileName.c_str(), ios::out | ios::binary);
fileHandler.write((const char*)cBuffer, compressSize);
delete[] cBuffer;
}
else if (extension == "png")
{
vtkNew<vtkPNGWriter> writer;
writer->SetFileName(fileName.c_str());
writer->SetInputData(image);
writer->Write();
}
else if (extension == "jpg" || extension == "jpeg")
{
vtkNew<vtkJPEGWriter> writer;
writer->SetFileName(fileName.c_str());
writer->SetInputData(image);
writer->Write();
}
else if (extension == "bmp")
{
vtkNew<vtkBMPWriter> writer;
writer->SetFileName(fileName.c_str());
writer->SetInputData(image);
writer->Write();
}
else if (extension == "ppm")
{
vtkNew<vtkPNMWriter> writer;
writer->SetFileName(fileName.c_str());
writer->SetInputData(image);
writer->Write();
}
else if (extension == "tif" || extension == "tiff")
{
vtkNew<vtkTIFFWriter> writer;
writer->SetFileName(fileName.c_str());
writer->SetInputData(image);
writer->Write();
}
else if (extension == "vti")
{
vtkNew<vtkXMLImageDataWriter> writer;
writer->SetFileName(fileName.c_str());
writer->SetInputData(image);
writer->Write();
}
else
{
vtkDataArray* scalars = image->GetPointData()->GetScalars();
int scalarSize = scalars->GetDataTypeSize();
const char* scalarPtr = static_cast<const char*>(scalars->GetVoidPointer(0));
size_t numberOfScalars = image->GetNumberOfPoints();
ofstream fileHandler(fileName.c_str(), ios::out | ios::binary);
fileHandler.write(scalarPtr, numberOfScalars * scalarSize);
}
}
sharedData->EndWorker();
return VTK_THREAD_RETURN_VALUE;
}
}
//****************************************************************************
class vtkThreadedImageWriter::vtkInternals
{
private:
std::map<vtkTypeUInt32, vtkSmartPointer<vtkUnsignedCharArray> > ClonedOutputs;
std::vector<int> RunningThreadIds;
public:
vtkNew<vtkMultiThreader> Threader;
vtkSharedData SharedData;
vtkTypeUInt64 Counter;
vtkSmartPointer<vtkUnsignedCharArray> lastBase64Image;
vtkInternals()
: Counter(0)
{
lastBase64Image = vtkSmartPointer<vtkUnsignedCharArray>::New();
}
void TerminateAllWorkers()
{
// request and wait for all threads to close.
if (!this->RunningThreadIds.empty())
{
this->SharedData.RequestAndWaitForWorkersToEnd();
}
// Stop threads
while (!this->RunningThreadIds.empty())
{
this->Threader->TerminateThread(this->RunningThreadIds.back());
this->RunningThreadIds.pop_back();
}
}
void SpawnWorkers(vtkTypeUInt32 numberOfThreads)
{
for (vtkTypeUInt32 cc = 0; cc < numberOfThreads; cc++)
{
this->RunningThreadIds.push_back(this->Threader->SpawnThread(&Worker, &this->SharedData));
}
}
};
vtkStandardNewMacro(vtkThreadedImageWriter);
//----------------------------------------------------------------------------
vtkThreadedImageWriter::vtkThreadedImageWriter()
: Internals(new vtkInternals())
{
this->MaxThreads = MAX_NUMBER_OF_THREADS_IN_POOL;
}
//----------------------------------------------------------------------------
vtkThreadedImageWriter::~vtkThreadedImageWriter()
{
this->Internals->TerminateAllWorkers();
delete this->Internals;
this->Internals = nullptr;
}
//----------------------------------------------------------------------------
void vtkThreadedImageWriter::SetMaxThreads(vtkTypeUInt32 maxThreads)
{
if (maxThreads < MAX_NUMBER_OF_THREADS_IN_POOL && maxThreads > 0)
{
this->MaxThreads = maxThreads;
}
}
//----------------------------------------------------------------------------
void vtkThreadedImageWriter::Initialize()
{
// Stop any started thread first
this->Internals->TerminateAllWorkers();
// Make sure we don't keep adding new threads
// this->Internals->TerminateAllWorkers();
// Register new worker threads
this->Internals->SpawnWorkers(this->MaxThreads);
}
//----------------------------------------------------------------------------
void vtkThreadedImageWriter::EncodeAndWrite(vtkImageData* image, char* fileName)
{
// Error checking
if (image == nullptr)
{
vtkErrorMacro(<< "Write:Please specify an input!");
return;
}
this->PushImageToQueue(image, fileName);
}
//----------------------------------------------------------------------------
void vtkThreadedImageWriter::PushImageToQueue(vtkImageData*& data, char* fileName)
{
this->Internals->SharedData.PushImageToQueue(data, fileName);
assert(data == nullptr);
}
//----------------------------------------------------------------------------
void vtkThreadedImageWriter::PrintSelf(ostream& os, vtkIndent indent)
{
this->Superclass::PrintSelf(os, indent);
}
//----------------------------------------------------------------------------
void vtkThreadedImageWriter::Finalize()
{
this->Internals->TerminateAllWorkers();
}
/*=========================================================================
Program: Visualization Toolkit
Module: vtkThreadedImageWriter.h
Copyright (c) Ken Martin, Will Schroeder, Bill Lorensen
All rights reserved.
See Copyright.txt or http://www.kitware.com/Copyright.htm for details.
This software is distributed WITHOUT ANY WARRANTY; without even
the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
PURPOSE. See the above copyright notice for more information.
=========================================================================*/
/**
* @class vtkThreadedImageWriter
* @brief class used to compress/write images using threads to prevent
* locking while encoding data.
*
* @details This writer allow to encode an image data based on its file
* extension: tif, tiff, bpm, png, jpg, jpeg, vti, Z, ppm, raw
*
* @author Patricia Kroll Fasel @ LANL
*/
#ifndef vtkThreadedImageWriter_h
#define vtkThreadedImageWriter_h
#include "vtkIOAsynchronousModule.h" // For export macro
#include "vtkObject.h"
class vtkImageData;
class VTKIOASYNCHRONOUS_EXPORT vtkThreadedImageWriter : public vtkObject
{
public:
static vtkThreadedImageWriter* New();
vtkTypeMacro(vtkThreadedImageWriter, vtkObject);
void PrintSelf(ostream& os, vtkIndent indent) override;
/**
* Need to be called at least once before using the class.
* Then it should be called again after any change on the
* thread count or if Finalize() was called.
*
* This method will wait for any running thread to terminate and start
* a new pool with the given number of threads.
*/
void Initialize();
/**
* Use vtkErrorMacro to check that image is valid then
* call PushImageToQueue() internally if image is provided.
*/
void EncodeAndWrite(vtkImageData* image, char* fileName);
/**
* Push an image into the threaded writer. It is not safe to modify the image
* after this point, including changing the reference counts for it.
* You may run into thread safety issues. Typically, the caller code will
* simply release reference to the data and stop using it.
*/
void PushImageToQueue(vtkImageData*& data, char* fileName);
/**
* Define the number of worker thread to use.
* Initialize() need to be called after any thread count change.
*/
void SetMaxThreads(vtkTypeUInt32);
vtkGetMacro(MaxThreads, vtkTypeUInt32);
/**
* This method will wait for any running thread to terminate.
*/
void Finalize();
protected:
vtkThreadedImageWriter();
~vtkThreadedImageWriter() override;
private:
vtkThreadedImageWriter(const vtkThreadedImageWriter&) = delete;
void operator=(const vtkThreadedImageWriter&) = delete;
class vtkInternals;
vtkInternals* Internals;
vtkTypeUInt32 MaxThreads;
};
#endif
......@@ -19,6 +19,7 @@ set(Module_SRCS
vtkMedicalImageReader2.cxx
vtkMetaImageReader.cxx
vtkMetaImageWriter.cxx
vtkMRCReader.cxx
vtkNIFTIImageHeader.cxx
vtkNIFTIImageReader.cxx
vtkNIFTIImageWriter.cxx
......@@ -33,7 +34,6 @@ set(Module_SRCS
vtkTIFFWriter.cxx
vtkVolume16Reader.cxx
vtkVolumeReader.cxx
vtkMRCReader.cxx
)
vtk_module_library(vtkIOImage ${Module_SRCS})
Markdown is supported
0%