Skip to content

Instantly share code, notes, and snippets.

@zhehaowang
Created December 8, 2016 23:45
Show Gist options
  • Save zhehaowang/500cf1c1c2440df29c8b5c19c6f97b24 to your computer and use it in GitHub Desktop.
Save zhehaowang/500cf1c1c2440df29c8b5c19c6f97b24 to your computer and use it in GitHub Desktop.
NDN-opt test publisher
#include <cstdlib>
#include <iostream>
#include <time.h>
#include <unistd.h>
/*
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
*/
#include <ndn-cpp/common.hpp>
#include <ndn-cpp/interest.hpp>
#include <ndn-cpp/data.hpp>
#include <ndn-cpp/transport/tcp-transport.hpp>
#include <ndn-cpp/transport/udp-transport.hpp>
#include <ndn-cpp/face.hpp>
#include <ndn-cpp/util/memory-content-cache.hpp>
#include <ndn-cpp/security/key-chain.hpp>
using namespace std;
using namespace ndn;
// Fixed name components that main() appends to every published data name:
//   <base>/<timestamp>/tracks/<trackId>/<sequence number>
// Both are hard-coded placeholder values for this test publisher.
std::string timestamp{"1000"};
std::string trackId{"10"};
/*
namespace ndn
{
class NdnController {
public:
NdnController();
~NdnController();
int
startNdnProcessing();
int
stopNdnProcessing();
int
publishMessage(const std::string& name, const int& dataFreshnessMs, const void* message, const int& messageLength);
std::string
getBasePrefix();
static std::string
getInstancePrefix(const std::string& hubPrefix, const std::string& nodeName);
static int
getInstanceStartTime();
static void
instanceStart();
private:
//ros::WallTimer faceEventsTimer_;
boost::mutex faceMutex_;
Parameters parameters_;
ptr_lib::shared_ptr<ndn::KeyChain> keyChain_;
ptr_lib::shared_ptr<ndn::Face> face_;
ptr_lib::shared_ptr<MemoryContentCache> memoryCache_;
void
startProcessEventsLoop();
void
processFaceEventsCallback(const ros::WallTimerEvent& timerEvent);
void
onRegisterFailed(const ptr_lib::shared_ptr<const Name>& prefix);
};
}
static int instanceStartTime_ = 0;
NdnController::NdnController():
keyChain_(new KeyChain())
{
face_.reset(new Face());
face_->setCommandSigningInfo(*keyChain_, keyChain_->getDefaultCertificateName());
// create memory content cache
memoryCache_.reset(new MemoryContentCache(face_.get()));
}
NdnController::~NdnController()
{
stopNdnProcessing();
cout << "NdnController dtor" << endl;
}
int
NdnController::startNdnProcessing()
{
cout << "start NDN processing..." << endl;
// register prefix
Name instancePrefix("/ndn/edu/ucla/remap");
memoryCache_->registerPrefix(instancePrefix, bind(&NdnController::onRegisterFailed,
this, _1));
cout << "prefix registration intiated. starting process events loop..." << endl;
// start processEvents loop
startProcessEventsLoop();
cout << "NDN processing started" << endl;
return 0;
}
int
NdnController::stopNdnProcessing()
{
cout << "stop NDN processing..." << endl;
faceEventsTimer_.stop();
memoryCache_->unregisterAll();
return 0;
}
int
NdnController::publishMessage(const string& name, const int& dataFreshnessMs, const void* message, const int& messageLength)
{
Name dataName(name);
Data ndnData(dataName);
ndnData.getMetaInfo().setFreshnessPeriod(dataFreshnessMs);
int ndnDataLength = (messageLength > parameters_.segmentLength)? parameters_.segmentLength : messageLength;
ndnData.setContent((const uint8_t*)message, ndnDataLength);
keyChain_->sign(ndnData, keyChain_->getDefaultCertificateName());
{
boost::mutex::scoped_lock scopedLock(faceMutex_);
memoryCache_->add(ndnData);
}
cout << "published data " << name << endl;
return 0;
}
string
NdnController::getBasePrefix()
{
stringstream ss;
ss << parameters_.prefix << "/" << getInstanceStartTime();
return ss.str();
}
string
NdnController::getInstancePrefix(const string& hubPrefix, const string& nodeName)
{
stringstream ss;
ss << hubPrefix << "/" << NameComponents::NameComponentApp << "/" << nodeName;
return ss.str();
}
void
NdnController::instanceStart()
{
if (instanceStartTime_ == 0)
{
ros::Time time = ros::Time::now();
instanceStartTime_ = time.sec;
}
}
int
NdnController::getInstanceStartTime()
{
return instanceStartTime_;
}
// private
void
NdnController::onRegisterFailed(const ptr_lib::shared_ptr<const Name>& prefix)
{
cout << "prefix registration failed" << endl;
throw new std::runtime_error("prefix registration failed");
}
void
NdnController::startProcessEventsLoop()
{
cout << "starting face events timer..." << endl;
//faceEventsTimer_ = parameters_.nh.createWallTimer(ros::WallDuration(0.001), &NdnController::processFaceEventsCallback, this);
cout << "timer started" << endl;
}
void
NdnController::processFaceEventsCallback(const ros::WallTimerEvent& timerEvent)
{
boost::mutex::scoped_lock scopedLock(faceMutex_);
face_->processEvents();
}
*/
/**
 * Callback functor for the memory content cache's prefix registration.
 *
 * main() passes a single instance (by reference wrapper) as both callback
 * arguments of MemoryContentCache::registerPrefix, so the two operator()
 * overloads below are told apart by their parameter lists:
 *   - the four-argument overload handles an incoming interest;
 *   - the one-argument overload handles a failed prefix registration.
 *
 * At present the interest handler only logs; the code that would build and
 * send a signed Data reply is kept below in a comment for reference.
 */
class Publisher {
public:
    /**
     * @param keyChain         Key chain to sign replies with; a copy is stored.
     * @param certificateName  Certificate name to sign replies with; a copy is stored.
     */
    Publisher(KeyChain &keyChain, const Name& certificateName)
    : keyChain_(keyChain),
      certificateName_(certificateName),
      responseCount_(0),
      seq_(0)   // previously left uninitialised
    {
    }

    /**
     * onInterest: logs the name of the received interest and classifies it.
     *
     * NOTE(review): the "Data not found" log text suggests this runs when the
     * cache holds no data matching the interest — confirm against the
     * MemoryContentCache documentation.
     *
     * @param prefix              Unused.
     * @param interest            The received interest; only its name is read.
     * @param transport           Unused while the reply code is disabled.
     * @param registeredPrefixId  Unused.
     */
    void operator()
        (const ptr_lib::shared_ptr<const Name>& prefix,
         const ptr_lib::shared_ptr<const Interest>& interest, Transport& transport,
         uint64_t registeredPrefixId)
    {
        const Name& interestName = interest->getName();
        std::cout << "Data not found" << interestName.toUri() << std::endl;

        if (interestName.size() == 6) {
            // Six components is the length of the base name main() publishes
            // under (/ndn/edu/ucla/remap/opt/node0) — presumably an interest
            // for the node itself; TODO confirm the intended protocol.
            std::cout << "Initial interest" << std::endl;
        } else if (interestName.get(-1).toEscapedString() == "track_hint") {
            // get(-1) addresses the last name component.
            std::cout << "Fetch track hint interest" << std::endl;
        }

        // Make and sign a Data packet (currently disabled).
        /*
        Data data(interest->getName());
        string content(string("Publisher ") + interest->getName().toUri());
        data.setContent((const uint8_t *)&content[0], content.size());
        keyChain_.sign(data, certificateName_);
        Blob encodedData = data.wireEncode();
        cout << "Sent content " << content << endl;
        transport.send(*encodedData);
        */
    }

    /**
     * onRegisterFailed: counts the failure and logs the prefix.
     *
     * @param prefix  The prefix whose registration failed.
     */
    void operator()(const ptr_lib::shared_ptr<const Name>& prefix)
    {
        ++responseCount_;
        std::cout << "Register failed for prefix " << prefix->toUri() << std::endl;
    }

    KeyChain keyChain_;        // copy of the key chain given to the constructor
    Name certificateName_;     // copy of the certificate name given to the constructor
    int responseCount_;        // incremented once per registration failure
    int seq_;                  // not used by any code in this class; starts at 0
};
int main(int argc, char** argv)
{
try {
// The default Face will connect using a Unix socket, or to "localhost".
Face face;
// Use the system default key chain and certificate name to sign commands.
KeyChain keyChain;
face.setCommandSigningInfo(keyChain, keyChain.getDefaultCertificateName());
ptr_lib::shared_ptr<MemoryContentCache> memoryCache;
memoryCache.reset(new MemoryContentCache(&face));
// Also use the default certificate name to sign data packets.
Publisher Publisher(keyChain, keyChain.getDefaultCertificateName());
Name prefix("/ndn/edu/ucla/remap");
memoryCache->registerPrefix(prefix, func_lib::ref(Publisher), func_lib::ref(Publisher));
int cnt = 0;
int seq = 0;
// The main event loop.
// Wait forever to receive one interest for the prefix.
while (true) {
face.processEvents();
// We need to sleep for a few milliseconds so we don't use 100% of the CPU.
usleep(10);
cnt ++;
if (cnt == 3333) {
Name dataName("/ndn/edu/ucla/remap/opt/node0");
dataName.append(timestamp).append("tracks").append(trackId).append(std::to_string(seq));
Data data(dataName);
string content(string("Stub"));
data.setContent((const uint8_t *)&content[0], content.size());
data.getMetaInfo().setFreshnessPeriod(5000);
keyChain.sign(data, keyChain.getDefaultCertificateName());
cout << "Add content " << data.getName().toUri() << endl;
memoryCache->add(data);
cnt = 0;
seq ++;
}
}
} catch (std::exception& e) {
cout << "exception: " << e.what() << endl;
}
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment