Skip to content
Snippets Groups Projects
Commit 70a99d5a authored by Nick Laurenson's avatar Nick Laurenson
Browse files

Add a generic PacketFileSender which is based on pcap timestamp

- use wall clock instead of CPU clock (bugfix)
- make the CLI more user friendly
parent 16ce1d6a
No related branches found
No related tags found
No related merge requests found
......@@ -83,7 +83,7 @@ endif(WIN32)
set(Boost_USE_MULTITHREADED ON)
# the expected behavior of find_package when used wiht REQUIRED and OPTIONAL_COMPONENTS
# it not the one expected, as it also required the optional component...
find_package(Boost REQUIRED COMPONENTS system thread chrono regex date_time filesystem)
find_package(Boost REQUIRED COMPONENTS system thread chrono regex date_time filesystem program_options)
include_directories(${SYSTEM_OPTION} ${Boost_INCLUDE_DIRS})
set(ALL_BOOST_LIBRARIES ${Boost_LIBRARIES})
find_package(Boost OPTIONAL_COMPONENTS atomic timer)
......
......@@ -13,125 +13,88 @@
// limitations under the License.
#include "vvPacketSender.h"
#include "vtkDataPacket.h"
#include "vtkPacketFileReader.h"
#include <boost/asio.hpp>
#include <boost/thread/thread.hpp>
using namespace DataPacketFixedLength;
//-----------------------------------------------------------------------------
class vvPacketSender::vvInternal
{
public:
vvInternal(std::string destinationIp, int lidarPort, int positionPort)
: LIDARSocket(0)
, PositionSocket(0)
, PacketReader(0)
, Done(false)
, lastTimestamp(0)
, PacketCount(0)
, LIDAREndpoint(boost::asio::ip::address_v4::from_string(destinationIp), lidarPort)
, PositionEndpoint(boost::asio::ip::address_v4::from_string(destinationIp), positionPort)
{
}
boost::asio::ip::udp::socket* LIDARSocket;
boost::asio::ip::udp::socket* PositionSocket;
vtkPacketFileReader* PacketReader;
bool Done;
unsigned int lastTimestamp;
size_t PacketCount;
boost::asio::ip::udp::endpoint LIDAREndpoint;
boost::asio::ip::udp::endpoint PositionEndpoint;
boost::asio::io_service IOService;
};
//-----------------------------------------------------------------------------
vvPacketSender::vvPacketSender(
std::string pcapfile, std::string destinationIp, int lidarPort, int positionPort)
: Internal(new vvPacketSender::vvInternal(destinationIp, lidarPort, positionPort))
: LIDARSocket(0)
, LIDAREndpoint(boost::asio::ip::address_v4::from_string(destinationIp), lidarPort)
, PositionSocket(0)
, PositionEndpoint(boost::asio::ip::address_v4::from_string(destinationIp), positionPort)
, PacketReader(0)
, Done(false)
, PacketCount(0)
{
this->Internal->PacketReader = new vtkPacketFileReader;
this->Internal->PacketReader->Open(pcapfile);
if (!this->Internal->PacketReader->IsOpen())
this->PacketReader = new vtkPacketFileReader;
this->PacketReader->Open(pcapfile);
if (!this->PacketReader->IsOpen())
{
throw std::runtime_error("Unable to open packet file");
}
this->Internal->LIDARSocket = new boost::asio::ip::udp::socket(this->Internal->IOService);
this->Internal->LIDARSocket->open(this->Internal->LIDAREndpoint.protocol());
this->Internal->LIDARSocket->set_option(boost::asio::ip::udp::socket::reuse_address(true));
this->LIDARSocket = new boost::asio::ip::udp::socket(this->IOService);
this->LIDARSocket->open(this->LIDAREndpoint.protocol());
this->LIDARSocket->set_option(boost::asio::ip::udp::socket::reuse_address(true));
// Allow to send the packet on the same machine
this->Internal->LIDARSocket->set_option(boost::asio::ip::multicast::enable_loopback(true));
this->LIDARSocket->set_option(boost::asio::ip::multicast::enable_loopback(true));
this->Internal->PositionSocket = new boost::asio::ip::udp::socket(this->Internal->IOService);
this->Internal->PositionSocket->open(this->Internal->PositionEndpoint.protocol());
this->Internal->PositionSocket->set_option(boost::asio::ip::udp::socket::reuse_address(true));
this->PositionSocket = new boost::asio::ip::udp::socket(this->IOService);
this->PositionSocket->open(this->PositionEndpoint.protocol());
this->PositionSocket->set_option(boost::asio::ip::udp::socket::reuse_address(true));
// Allow to send the packet on the same machine
this->Internal->PositionSocket->set_option(boost::asio::ip::multicast::enable_loopback(true));
this->PositionSocket->set_option(boost::asio::ip::multicast::enable_loopback(true));
}
//-----------------------------------------------------------------------------
vvPacketSender::~vvPacketSender()
{
delete this->Internal->LIDARSocket;
delete this->Internal->PositionSocket;
delete this->Internal;
delete this->LIDARSocket;
delete this->PositionSocket;
}
//-----------------------------------------------------------------------------
int vvPacketSender::pumpPacket()
double vvPacketSender::pumpPacket()
{
if (this->Internal->Done)
if (this->Done)
{
return std::numeric_limits<int>::max();
}
// some return value
const unsigned char* data = 0;
unsigned int dataLength = 0;
double timeSinceStart = 0;
int timeDiff = 0;
if (!this->Internal->PacketReader->NextPacket(data, dataLength, timeSinceStart))
// Get the next packet
if (!this->PacketReader->NextPacket(data, dataLength, timeSinceStart))
{
this->Internal->Done = true;
this->Done = true;
return timeSinceStart;
}
// Position packet
if ((dataLength == 512))
{
this->Internal->PositionSocket->send_to(
boost::asio::buffer(data, dataLength), this->Internal->PositionEndpoint);
this->PositionSocket->send_to(
boost::asio::buffer(data, dataLength), this->PositionEndpoint);
}
// Recurse until we get to the right kind of packet
else
// if (dataLength == 1206)
else // Lidar packet
{
const HDLDataPacket* dataPacket = reinterpret_cast<const HDLDataPacket*>(data);
timeDiff =
static_cast<int>(dataPacket->gpsTimestamp) - static_cast<int>(this->Internal->lastTimestamp);
this->Internal->lastTimestamp = dataPacket->gpsTimestamp;
++this->Internal->PacketCount;
this->Internal->LIDARSocket->send_to(
boost::asio::buffer(data, dataLength), this->Internal->LIDAREndpoint);
this->LIDARSocket->send_to(
boost::asio::buffer(data, dataLength), this->LIDAREndpoint);
}
return timeDiff;
this->PacketCount++;
return timeSinceStart;
}
//-----------------------------------------------------------------------------
size_t vvPacketSender::packetCount() const
size_t vvPacketSender::GetPacketCount() const
{
return this->Internal->PacketCount;
return this->PacketCount;
}
//-----------------------------------------------------------------------------
bool vvPacketSender::done() const
bool vvPacketSender::IsDone() const
{
return this->Internal->Done;
return this->Done;
}
......@@ -15,6 +15,11 @@
#include <string>
#include <vtkSystemIncludes.h>
#include "vtkPacketFileReader.h"
#include <boost/asio.hpp>
#include <boost/thread/thread.hpp>
class vtkPacketFileReader;
class VTK_EXPORT vvPacketSender
......@@ -24,11 +29,34 @@ public:
int lidarport = 2368, int positionport = 8308);
~vvPacketSender();
int pumpPacket();
bool done() const;
size_t packetCount() const;
/**
* @brief pumpPacket
* @return the timestamp of the packet send
*/
double pumpPacket();
/**
* @copydoc Done
*/
bool IsDone() const;
/**
* @brief PacketCount return the number of packet already send
*/
size_t GetPacketCount() const;
private:
class vvInternal;
vvInternal* Internal;
boost::asio::ip::udp::socket* LIDARSocket;
boost::asio::ip::udp::endpoint LIDAREndpoint;
boost::asio::ip::udp::socket* PositionSocket;
boost::asio::ip::udp::endpoint PositionEndpoint;
boost::asio::io_service IOService;
vtkPacketFileReader* PacketReader;
//! Indicate if the end of the pcap file has been reach
bool Done;
//! Number of packet already send
size_t PacketCount;
};
......@@ -28,6 +28,7 @@
// .NAME PacketFileSender -
// .SECTION Description
// This program reads a pcap file and sends the packets using UDP.
// The default playback speed is based on the timestamps specified in the pcap file
#include "vtkPacketFileReader.h"
#include "vvPacketSender.h"
......@@ -36,135 +37,123 @@
#include <ctime>
#include <iostream>
#include <string>
#include <chrono>
#include <boost/thread/thread.hpp>
#include <boost/program_options.hpp>
namespace po = boost::program_options;
const int OUTPUT_WIDTH = 15; // width of the column (#packet, duration, ...) in the output stream
int main(int argc, char* argv[])
{
if (argc < 2)
{
std::cout << "Usage: " << argv[0]
<< " <packet file> [loop] [ip] [dataPort] [position Port] [PlaybackSpeedMultiplier]"
<< std::endl;
return 1;
bool loop = false; // run the capture 1 time or in loop
// parse the command line options
po::options_description visible("Allowed options");
visible.add_options()
("help", "produce help message")
("ip", po::value<std::string>()->default_value("127.0.0.1"), "destination ip adress")
("loop", po::bool_switch(&loop), "run the capture in loop")
("lidarPort", po::value<unsigned int>()->default_value(2368), "destination port for lidar packets")
("GPSPort", po::value<unsigned int>()->default_value(8308), "destination port for GPS packets")
("speed", po::value<double>()->default_value(1), "playback speed")
("display-frequency", po::value<unsigned int>()->default_value(1000), "print information after every interval of X sent packets")
;
po::options_description hidden("Hidden options");
hidden.add_options()
("input-file", po::value<std::string>(), "input file")
;
po::positional_options_description p;
p.add("input-file", -1);
po::options_description cmdline_options;
cmdline_options.add(visible).add(hidden);
po::variables_map vm;
po::store(po::command_line_parser(argc, argv).
options(cmdline_options).positional(p).run(), vm);
po::notify(vm);
if (vm.count("help") || argc < 2) {
cout << "Usage: PacketFileSender <pcap_file> [options]\n";
cout << visible << "\n";
return 1;
}
std::string filename(argv[1]);
int loop = 0;
double speed = 1;
std::string destinationIp = "127.0.0.1";
int dataPort = 2368;
int positionPort = 8308;
if (argc > 2)
{
loop = atoi(argv[2]);
}
if (argc > 3)
{
destinationIp = argv[3];
}
if (argc > 5)
{
dataPort = atoi(argv[4]);
positionPort = atoi(argv[5]);
}
if (argc > 6)
{
speed = static_cast<double>(atof(argv[6]));
}
// Default time to wait -> it is the elapsed
// time between two firing of a HDL-32 sensor
const int defaultTimeToWait = 553;
// The sensor send one packet every hundreds microseconds
// thus, the first initialization of timeToWait is defaultTimeToWait
int timeToWaitPerPacket = static_cast<int>(1.0 / speed * defaultTimeToWait);
// convert to the right type
std::string filename = vm["input-file"].as<std::string>();
double speed = vm["speed"].as<double>();
std::string destinationIp = vm["ip"].as<std::string>();
unsigned int lidarPort = vm["lidarPort"].as<unsigned int>();
unsigned int GPSPort = vm["GPSPort"].as<unsigned int>();
unsigned int display_frequency = vm["display-frequency"].as<unsigned int>();
// The timer's resolution is only 1000 microsecond
// But we need to send packets every X microseconds
// Thus, we send packet by group of NumberPacketsByPool
// and then wait the necessary time
int NumberPacketsByPool = 40;
// The minimalTimeToWait resolution so that
// timeToWait * NumberPacketsByPool > 1000
double minimalTimeToWait = 1000.0 / NumberPacketsByPool;
// Measurement of the sleep
clock_t T1, T2;
T1 = std::clock();
double elapsedTimeMeasured;
double elapsedTimeBetweenPackets = 0;
const int microSecondsPerSecond = 1e6;
std::cout << "Start sending" << std::endl;
try
{
do
{
vvPacketSender sender(filename, destinationIp, dataPort, positionPort);
// socket.connect(destinationEndpoint);
auto replayStartTime = std::chrono::steady_clock::now();
// output the column header for the displayed values
std::cout << "----------------------------------------------------------------------------" << std::endl
<< std::right << std::setw(OUTPUT_WIDTH) << "# packets"
<< std::right << std::setw(OUTPUT_WIDTH) << "duration (s)"
<< std::right << std::setw(OUTPUT_WIDTH) << "f (Hz)"
<< std::right << std::setw(OUTPUT_WIDTH) << "delay (us)"
<< std::endl
<< "----------------------------------------------------------------------------" << std::endl;
// Create a Packet Sender
vvPacketSender sender(filename, destinationIp, lidarPort, GPSPort);
// Case starting time
double pcapStartTime = sender.pumpPacket();
sender.pumpPacket();
while (!sender.done())
while (!sender.IsDone())
{
// Get the timestamp of the packet to compute
// the next timeToWait
int currentTimeDiff = sender.pumpPacket();
// Elapsed time measured from the timestamp of the packets
// This timestamp will be used to compute the next time to wait
elapsedTimeBetweenPackets += currentTimeDiff;
// Every 1000 packets sent, give some information
// about the number of packet sent, the elapsed time
// and the next time to wait for the 1000 next packets
if ((sender.packetCount() % 1000) == 0)
{
// Elapsed time measured from the clock of the computer
// This timestamp will be used to inform the user
// This timestamp should be greater than elapsedTime2
T2 = std::clock();
elapsedTimeMeasured = T2 - T1;
std::cout << "total sent packets : " << sender.packetCount() << std::endl
<< " Elapsed time per packets asked : " << timeToWaitPerPacket
<< " microseconds" << std::endl
<< " Elapsed time per packets measured : " << elapsedTimeMeasured
<< " microseconds" << std::endl
<< std::endl;
// time from the pcap file
double pcapCurrentTime = sender.pumpPacket();
double pcapmicroSecondsSinceStart = (pcapCurrentTime - pcapStartTime) * microSecondsPerSecond;
// time from the wall clock
auto replayCurrentTime = std::chrono::steady_clock::now();
int replaymicroSecondsSinceStart =
std::chrono::duration_cast<std::chrono::microseconds>(replayCurrentTime - replayStartTime).count();
timeToWaitPerPacket = static_cast<int>(1.0 / speed * elapsedTimeBetweenPackets / 1000);
// If the computed timeToWait is too high we assume that
// there is a corruption into the packets timestamp
// then we set the timeToWait to the HDL-32 firing rate
if (timeToWaitPerPacket > 2500)
{
timeToWaitPerPacket = static_cast<int>(1.0 / speed * defaultTimeToWait);
}
// If the computed timeToWait is under the minimal resolution
// we set the timeToWait to this minimal resolution
if (timeToWaitPerPacket < minimalTimeToWait)
{
timeToWaitPerPacket = static_cast<int>(minimalTimeToWait);
}
// Refresh the measured timestamps
T1 = std::clock();
elapsedTimeBetweenPackets = 0;
// check if the replay is to much in advance compared to the pcap time step
// if this is the case, we sleep the thread until the pcap catch up the replay
double time_delay = static_cast<double>(pcapmicroSecondsSinceStart) / speed - replaymicroSecondsSinceStart;
if (time_delay > 0)
{
boost::this_thread::sleep(
boost::posix_time::microseconds(static_cast<int>(time_delay)));
}
// boost::this_thread::sleep(boost::posix_time::microseconds(553));
// Every NumberPacketsByPool packets we wait to be consistent with
// the real recording of the data. The number of packets should not be
// too small -> the clock resolution is 1000 microsecond. Hence,
// NumberPacketsByPool * timeToWait should be greater than 1000
if ((sender.packetCount() % NumberPacketsByPool) == 0)
// Display the user some information
if ((sender.GetPacketCount() % display_frequency) == 0)
{
boost::this_thread::sleep(
boost::posix_time::microseconds(NumberPacketsByPool * timeToWaitPerPacket));
// Compute nb of packets sended including the one from previous loops
int nbPacketSended = sender.GetPacketCount();
// Compute time since the replay began
double secondSinceStart = static_cast<double>(replaymicroSecondsSinceStart) / microSecondsPerSecond;
// Nice output
std::cout << std::fixed
<< std::right << std::setw(OUTPUT_WIDTH) << nbPacketSended
<< std::fixed << std::right << std::setw(OUTPUT_WIDTH) << secondSinceStart
<< std::defaultfloat << std::right << std::setw(OUTPUT_WIDTH)
<< static_cast<double>(nbPacketSended) / secondSinceStart
<< std::right << std::setw(OUTPUT_WIDTH) << time_delay
<< std::endl;
}
}
} while (loop);
......
......@@ -76,7 +76,7 @@ int main(int argc, char* argv[])
vvPacketSender sender(pcapFileName, destinationIp, dataPort);
boost::this_thread::sleep(boost::posix_time::microseconds(1000));
sender.pumpPacket();
while (!sender.done())
while (!sender.IsDone())
{
sender.pumpPacket();
boost::this_thread::sleep(boost::posix_time::microseconds(1000));
......@@ -108,7 +108,7 @@ int main(int argc, char* argv[])
boost::this_thread::sleep(boost::posix_time::microseconds(1000));
sender.pumpPacket();
while (!sender.done())
while (!sender.IsDone())
{
sender.pumpPacket();
boost::this_thread::sleep(boost::posix_time::microseconds(1000));
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment