From 56a03ca188659f10991c677af1fd3f05e3a9f23f Mon Sep 17 00:00:00 2001 From: Badar Ahmed Date: Mon, 24 Mar 2008 18:23:44 +0500 Subject: [PATCH] Some code fixes done to multicast service classes. Previous commit was not compiling. Modifications made to make code compile correctly. --- App/CMakeLists.txt | 1 + App/include/Tourist/App/Application.h | 11 +- App/src/Application.cpp | 96 ++++++++++++++++ App/src/Multicast.cpp | 134 ---------------------- App/testsuite/CMakeLists.txt | 1 + App/testsuite/src/CoreTestSuite.cpp | 2 + App/testsuite/src/TestMulticast.cpp | 22 ---- Constants.h.in | 17 ++- Core/include/Tourist/Constants.h | 11 ++ Message/CMakeLists.txt | 1 + Message/include/Tourist/Message/MessageTypeData.h | 2 + Message/src/MessageTypeData.cpp | 5 + node.properties | 12 +- 13 files changed, 149 insertions(+), 166 deletions(-) diff --git a/App/CMakeLists.txt b/App/CMakeLists.txt index 9e2a606..cdb8298 100644 --- a/App/CMakeLists.txt +++ b/App/CMakeLists.txt @@ -17,6 +17,7 @@ src/PeerConnectionWorker src/Bootstrap src/Bootstrapper src/Application +src/Multicast ) ADD_LIBRARY(TouristApp SHARED ${APP_SRCS}) diff --git a/App/include/Tourist/App/Application.h b/App/include/Tourist/App/Application.h index f7543f4..a5887ac 100644 --- a/App/include/Tourist/App/Application.h +++ b/App/include/Tourist/App/Application.h @@ -64,7 +64,16 @@ public: int getRemoteCache(RemoteNode * contactNode, int cacheType, int startLevel, int endLevel, int timeout, Tourist::NodeSet &result); - //Interface (CallbackInterface) method for connection call back + bool searchRemoteLinks(Node& recipient, NodeSet& result, + unsigned int flags=HIGHER_LEVELS|EQUAL_LEVELS|LOWER_LEVELS| + FIND_SUFFIXES|FIND_PREFIXES|ONE_LEVEL_LOWER); + + Node* getNextHop(NodeSet &audienceSet, Node *source, int step, int type); + + void setLevel(int newLevel); + + + //Interface (CallbackInterface) method for connection call back void callback(void *pipe); void addNewNode(RemoteNode *node); diff --git a/App/src/Application.cpp b/App/src/Application.cpp index 10b2cba..74e8c5b 100644 --- a/App/src/Application.cpp +++ b/App/src/Application.cpp @@ -402,6 +402,102 @@ void Application::updatePipesCache(RemoteNode &rn, AbstractPipe *pipe) } } +bool Application::searchRemoteLinks(Node& query, NodeSet& result, unsigned int flags) +{ + return nodeCache.searchRemoteLinks(query, result, flags); +} + +Node* Application::getNextHop(NodeSet &audienceSet, Node *source, int step, int type) +{ + Node * returnVal = NULL; + NodeSet::Iter iter(audienceSet); + NodeSet prefixNodes; + unsigned int seqN = -1; + char *method = "proto_getNextHop"; + + /*DebugLevel(4, + seqN = logMessage(seqN, method, "audienceSize", toString(audienceSet.treeSize)); + );*/ + + while (iter) + { + Node * nextHop = (Node*) iter->key; + if (nextHop == source) + { + iter++; continue; + } + + elem_t localMask1, remoteMask1, localMask2, remoteMask2; + //step should be within the range of prefixIndex array. + + if (type == TYPE_SUFFIX) + { + localMask1 = nodeCache.suffixIndex(step-1); + remoteMask1 = nextHop->suffixIndex(step-1); + localMask2 = nodeCache.suffixIndex(step); + remoteMask2 = nextHop->suffixIndex(step); + } + else + { + localMask1 = nodeCache.prefixIndex(step-1); + remoteMask1 = nextHop->prefixIndex(step-1); + localMask2 = nodeCache.prefixIndex(step); + remoteMask2 = nextHop->prefixIndex(step); + } + + if (step == 1) + { + //first bit should be different for the recipient + if (localMask2 != remoteMask2) + { + prefixNodes.insert(nextHop); + } + } + else + { + if (localMask1 == remoteMask1 && localMask2 != remoteMask2) + { + prefixNodes.insert(nextHop); + } + } + iter++; + } + + /*if (returnVal) + logMessage(seqN, "getNextHop", "hop", returnVal->hexstring()); + else + logMessage(seqN, "getNextHop", "hop", "NULL");*/ + + //get node for highest level. + if (prefixNodes.treeSize >0) + { + iter = prefixNodes.first(); + int level = -1; + while (iter) + { + Node *nextHop = (Node*) iter->key; + if (nextHop -> getLevel() == 0) + { + returnVal = nextHop; + break; + } + if (nextHop->getLevel() < level) + { + returnVal = nextHop; + level = nextHop->getLevel(); + } + iter++; + } + } + + return returnVal; +} + +void Application::setLevel(int newLevel) +{ + nodeCache.setLevel(newLevel); +} + int Application::getBwUsage() { time_t now = time(NULL); diff --git a/App/src/Multicast.cpp b/App/src/Multicast.cpp index 30e8fd5..9900c7b 100644 --- a/App/src/Multicast.cpp +++ b/App/src/Multicast.cpp @@ -3,8 +3,6 @@ #include "Tourist/Constants.h" #include "Tourist/Util/Util.h" #include "Poco/Instantiator.h" -#include //Remove This - Only for testing - using namespace Tourist::Util; @@ -19,8 +17,6 @@ logger(Logger::get("Tourist.App.Multicast." + Util::toString(objCount) )) { this->app = _app; this->shouldBeRunning = true; - - //this->objCount = objCount; } Multicast::~Multicast() @@ -45,16 +41,6 @@ void Multicast::stop() this -> shouldBeRunning = false; } -/* -string Multicast::getObjCountStr() -{ - if(debugFlag) - { - return "[Node " + Util::toString( - } - -}*/ - void Multicast::processQueue() { //debug(logger, "Multicast::processQueue()"); @@ -62,15 +48,12 @@ void Multicast::processQueue() //consume dispatched message while (queue.size() != 0) { - //info(logger, (debugFlag?(Util::toString(objCount)):"") + "Processing multicast message"); info(logger, "Processing multicast message"); //We Got the message.. let's process it MessageNotification *msgNotify = dynamic_cast (queue.dequeueNotification()); if (msgNotify == NULL) { - //error(logger, debugFlag ? ("[Node " + Util::toString(objCount) + "]") : "" + - //"Notification for multicast response is NULL!"); error(logger, "Notification for multicast response is NULL!"); continue; //let's see if there is any other message in queue. } @@ -88,8 +71,6 @@ void Multicast::processQueue() // Call localNode method findRemoteLinks with approprate flags // in a loop send multicast event to all nodes in NodeSet // end condition: if findRemoteLinks reuturns no node then no sendEvent & multicast finished. - - // TODO: PRoblem, get Localnode RemoteNode initNode = msgMulticast->getInitiatingPeer(); multicastRouting(&initNode, msgMulticast->getEventType(), msgMulticast->getStep()); @@ -105,10 +86,8 @@ void Multicast::multicastRouting(Node* source, int event_type, int step) bool status=false; if (event_type >= 1 && event_type <= 4) { - //findCandidatesMulticast(this, source, audienceSet,LOWER_LEVELS|EQUAL_LEVELS|FIND_PREFIXES|FIND_ALL); status = app->searchRemoteLinks(*source, audienceSet, LOWER_LEVELS|EQUAL_LEVELS|FIND_PREFIXES|FIND_ALL); } else { - //findCandidatesMulticast(this, source, audienceSet,LOWER_LEVELS|EQUAL_LEVELS|FIND_SUFFIXES|FIND_ALL); status = app->searchRemoteLinks(*source, audienceSet, LOWER_LEVELS|EQUAL_LEVELS|FIND_SUFFIXES|FIND_ALL); type = TYPE_SUFFIX; } @@ -134,8 +113,6 @@ void Multicast::multicastRouting(Node* source, int event_type, int step) } -// TODO: LocalNode or Node* -- ??? -//void Multicast::levelChangeEvent(int newLevel, LocalNode *localNode) void Multicast::levelChangeEvent(int newLevel, Node *localNode) { //Initiating prefix multicast @@ -151,41 +128,22 @@ void Multicast::levelChangeEvent(int newLevel, Node *localNode) suffixEvent = EVENT_SUFFIX_LEVEL_DOWN; } -/* Node **prefixTopNode; - localNode -> getTopNode(*localNode, prefixTopNode, TYPE_PREFIX); - int prefixAck = sendEvent(localNode, *prefixTopNode, prefixEvent, 0); -*/ Node *pTopNode; if(! (app -> getTopNode(*localNode, &pTopNode, PREFIX))) error(logger, "Cannot find Top Node!"); else debug(logger, "Top Node found"); int prefixAck = sendEvent(localNode, pTopNode, prefixEvent, 0); - //Node *n=*prefixTopNode; - //debug(logger, "Prefix TN Level: " + Util::toString( n->getLevel() )); - //debug(logger, "Prefix TN Level: " + Util::toString(localNode->getLevel())); - - //pTopNode = NULL; if(! (app -> getTopNode(*localNode, &pTopNode, SUFFIX))) error(logger, "Cannot find Top Node!"); else debug(logger, "Top Node found"); int suffixAck = sendEvent(localNode, pTopNode, suffixEvent, 0); - -/* - Node **suffixTopNode; - localNode -> getTopNode(*localNode, suffixTopNode, TYPE_SUFFIX); - int suffixAck = sendEvent(localNode, *suffixTopNode, suffixEvent, 0); -*/ - } -//TODO: debug & error method's of Poco Logger lib to be used. int Multicast::sendEvent(Node *source, Node *target, int event_type, int step) { - - if (source == NULL || target == NULL) return -1; @@ -274,13 +232,8 @@ int Multicast::sendEvent(Node *source, Node *target, int event_type, int step) //------------------------------------------------------------------------------ void Multicast::receiveMulticastMsg() { - /*MessageDispatcher dispatcher; - dispatcher.registerNotification("onMulticastMsg", - NObserver(*this, &Multicast::onMulticastMsg));*/ - app->getConfig()->getMessageBus()->registerNotification("Multicast", NObserver(*this, &Multicast::onMulticastMsg)); - } void Multicast::onMulticastMsg(const AutoPtr& notification) @@ -290,94 +243,7 @@ void Multicast::onMulticastMsg(const AutoPtr& notification) MessageNotification *notificationObj = const_cast(notification.get()); queue.enqueueNotification(notificationObj); -/* - string component = notification -> message -> getComponentName(); - - string message; - notification -> message -> getString(message); - - debug(logger, "Message Received. Parsing message ...."); - - MsgMulticast *multicast = dynamic_cast (notification -> message); - debug(logger, "Multicast Initiated by: " + multicast->getInitiatingPeer().toString()); - - RemoteNode thisNode; - app->getLocalNodeInfo(thisNode); - debug(logger, "This node is: " + thisNode.toString()); - - - //Initiate Tree based Multicast - - */ -} - -/* -//------------------------------------------------------------------------------ -// Find the next recipient in the chain to forward a message to -Node * findRouter(LocalNode* sender, Node *recipient) { - unsigned int i; - AvliSetEl< Node *> *result; - Node *retval; - - // Test whether the sender is also the recipient - if (*sender == *recipient){ - DoutLevel(2,cerr,"Recipient is Sender"); - return recipient; - } - - // Now search the prefix table - DebugLevel(6, - for (int j=0; jlevel<<"] [findRouter] [backuptable]"; - netLogger.warning(outstream.str()); - outstream.str(string("")); - );*/ - /*for (i=1;i<=sender->level;i++){ - DoutLevel(6,cerr,i<<": ["<BackupTable[i]); - if (sender->BackupTable[i]){ - DoutLevel(6,cerr,"Forwarding to backup level "<< - sender->BackupTable[i]->level<<" "<BackupTable[i]); - DoutLevel(6,cerr,"BackupTable="<BackupTable); - remoteNodes.insert(sender->BackupTable[i]); - } - } - } - - if (remoteNodes.length()==0) retval=NULL; - else{ - NodeCompSet::Iter iter(remoteNodes); - retval=(Node *)iter->key; // Simply return the closest candidate in node space - } - remoteNodes.empty(); - return retval; } -i*/ } //namespace Tourist } //namespace App diff --git a/App/testsuite/CMakeLists.txt b/App/testsuite/CMakeLists.txt index 821ecc4..404876e 100644 --- a/App/testsuite/CMakeLists.txt +++ b/App/testsuite/CMakeLists.txt @@ -12,6 +12,7 @@ src/Driver src/CoreTestSuite src/TestPeerHandshake src/TestCacheXfer +src/TestMulticast ) FIND_LIBRARY(CPPUNIT_LIBRARY NAMES CppUnit) diff --git a/App/testsuite/src/CoreTestSuite.cpp b/App/testsuite/src/CoreTestSuite.cpp index 73de7c8..4345da0 100644 --- a/App/testsuite/src/CoreTestSuite.cpp +++ b/App/testsuite/src/CoreTestSuite.cpp @@ -23,6 +23,7 @@ #include "CppUnit/TestSuite.h" #include "TestPeerHandshake.h" #include "TestCacheXfer.h" +#include "TestMulticast.h" CppUnit::Test* CoreTestSuite::suite() @@ -31,5 +32,6 @@ CppUnit::Test* CoreTestSuite::suite() pSuite -> addTest(TestPeerHandshake::suite()); pSuite -> addTest(TestCacheXfer::suite()); + pSuite -> addTest(TestMulticast::suite()); return pSuite; } diff --git a/App/testsuite/src/TestMulticast.cpp b/App/testsuite/src/TestMulticast.cpp index d906224..388e985 100644 --- a/App/testsuite/src/TestMulticast.cpp +++ b/App/testsuite/src/TestMulticast.cpp @@ -30,8 +30,6 @@ void TestMulticast::tearDown() void TestMulticast::testSendEvent() { - //create Application-1, configure the transport - //create Application-2, configure transport with different port Config config_1; config_1.setTCPEnable(true); @@ -39,9 +37,6 @@ void TestMulticast::testSendEvent() Application app1(config_1); assert(app1.init() == 0); - //std::cout<<"\ncalling init()\n"; - //app1.init(); - //std::cout<<"\ninit() call complete\n"; Config config_2; config_2.setTCPEnable(true); @@ -51,9 +46,6 @@ void TestMulticast::testSendEvent() assert (app2.init() == 0); //---> Prob here . NEED IMMEDIATE FIX - //app1.getConfig()->getLocalNode()->setLevel(0); - //app2.getConfig()->getLocalNode()->setLevel(2); - //app1.setLevel(0); //app2.setLevel(2); @@ -61,19 +53,11 @@ void TestMulticast::testSendEvent() Thread t1, t2; t1.start(app1); t2.start(app2); - //t3.start(app3); Thread::sleep(20); int i=0; - //app1.getConfig()->getLocalNode()->setLevel(0); - //app2.getConfig()->getLocalNode()->setLevel(2); - //LocalNode *localNode = app2.getConfig()->getLocalNode(); - //localNode->setLevel(2); - //std::cout<<"----- Local Node= "<< localNode->getID()<<" : "<getLevel(); - - RemoteNode app1Node, app2Node; app1.getLocalNodeInfo(app1Node); app2.getLocalNodeInfo(app2Node); @@ -89,14 +73,8 @@ void TestMulticast::testSendEvent() tm1.start(multicast); tm2.start(multicast2); - //multicast.receiveMulticastMsg(); multicast2.levelChangeEvent(3, &app2Node); - /*while(i<10) { - Thread::sleep(10); - }*/ - //std::cout<<"Multicast MSG received"; - Thread::sleep(100); app1.stopApp(); diff --git a/Constants.h.in b/Constants.h.in index 8333093..a5ff252 100644 --- a/Constants.h.in +++ b/Constants.h.in @@ -9,11 +9,11 @@ typedef unsigned int elem_t; Internal variable as used by the Id class to create the arry of the size ID_SIZE for holding UUID of the Tourist's Node */ -#define ID_SIZE @ID_SIZE@ +#define ID_SIZE 4 -#define SIZEOF_UNSIGNED_INT @SIZEOF_UNSIGNED_INT@ +#define SIZEOF_UNSIGNED_INT 4 -#define MAX_LEVEL @MAX_LEVEL@ +#define MAX_LEVEL 32 #define PREFIX_BIT (INT_MAX+1) @@ -47,6 +47,17 @@ typedef unsigned int elem_t; #define ONE_LEVEL_LOWER 64 #define FIND_RANDOM 64 +//Event Types +#define EVENT_PREFIX_JOIN 1 +#define EVENT_PREFIX_DISCONNECT 2 +#define EVENT_PREFIX_LEVEL_UP 3 +#define EVENT_PREFIX_LEVEL_DOWN 4 +#define EVENT_SUFFIX_JOIN 5 +#define EVENT_SUFFIX_DISCONNECT 6 +#define EVENT_SUFFIX_LEVEL_UP 7 +#define EVENT_SUFFIX_LEVEL_DOWN 8 +#define TYPE_PREFIX 9 +#define TYPE_SUFFIX 10 // Path to the configuration file containing different customization // parameters for this node. diff --git a/Core/include/Tourist/Constants.h b/Core/include/Tourist/Constants.h index 24571d5..a5ff252 100644 --- a/Core/include/Tourist/Constants.h +++ b/Core/include/Tourist/Constants.h @@ -47,6 +47,17 @@ typedef unsigned int elem_t; #define ONE_LEVEL_LOWER 64 #define FIND_RANDOM 64 +//Event Types +#define EVENT_PREFIX_JOIN 1 +#define EVENT_PREFIX_DISCONNECT 2 +#define EVENT_PREFIX_LEVEL_UP 3 +#define EVENT_PREFIX_LEVEL_DOWN 4 +#define EVENT_SUFFIX_JOIN 5 +#define EVENT_SUFFIX_DISCONNECT 6 +#define EVENT_SUFFIX_LEVEL_UP 7 +#define EVENT_SUFFIX_LEVEL_DOWN 8 +#define TYPE_PREFIX 9 +#define TYPE_SUFFIX 10 // Path to the configuration file containing different customization // parameters for this node. diff --git a/Message/CMakeLists.txt b/Message/CMakeLists.txt index f9bba9c..6fd7fb1 100644 --- a/Message/CMakeLists.txt +++ b/Message/CMakeLists.txt @@ -18,6 +18,7 @@ src/MsgBootstrapReply src/MessageHeader src/MessageTypeData src/MsgPeerAnnouncement +src/MsgMulticast src/RemoteNodeUtil src/MessageProcessor src/PeerNotification diff --git a/Message/include/Tourist/Message/MessageTypeData.h b/Message/include/Tourist/Message/MessageTypeData.h index 1221e75..8ad2f27 100644 --- a/Message/include/Tourist/Message/MessageTypeData.h +++ b/Message/include/Tourist/Message/MessageTypeData.h @@ -43,6 +43,8 @@ public: const static int BOOTSTRAP_REP; const static int PEER_ANNOUNCE; + + const static int MULTICAST; private: DynamicFactory dynFactory; diff --git a/Message/src/MessageTypeData.cpp b/Message/src/MessageTypeData.cpp index e36ce96..43e8b6a 100644 --- a/Message/src/MessageTypeData.cpp +++ b/Message/src/MessageTypeData.cpp @@ -24,6 +24,7 @@ #include "Tourist/Message/MsgPeerAnnouncement.h" #include "Tourist/Message/MsgBootstrapRequest.h" #include "Tourist/Message/MsgBootstrapReply.h" +#include "Tourist/Message/MsgMulticast.h" #include "Tourist/Util/Util.h" #include "Poco/Exception.h" #include "Poco/Instantiator.h" @@ -38,6 +39,7 @@ const int MessageTypeData::CACHE_XFER_REQ = 1; const int MessageTypeData::CACHE_XFER_REP = 2; const int MessageTypeData::BOOTSTRAP_REQ = 3; const int MessageTypeData::BOOTSTRAP_REP = 4; +const int MessageTypeData::MULTICAST = 5; MessageTypeData::MessageTypeData() { @@ -93,7 +95,10 @@ void MessageTypeData::registerBuiltIns() this -> registerMessageObj(BOOTSTRAP_REP, new Instantiator); + + this -> registerMessageObj(MULTICAST, + new Instantiator); } } // namespace Message diff --git a/node.properties b/node.properties index a68e2e0..bba5547 100644 --- a/node.properties +++ b/node.properties @@ -2,7 +2,7 @@ tourist.name = boot #boot porperites -tourist.boot.node = localhost:50001 +tourist.boot.node = localhost:2000 # boot.role controls the behaviour of a node when it comes to # bootstrapping itself or others. If the role value is '1' then @@ -14,15 +14,15 @@ tourist.boot.node = localhost:50001 # and after that will switch to '1' mode. If the value is set to '3' # the node will only bootstrap itself but won't service any other node. -tourist.boot.role = 2 +tourist.boot.role = 1 #transport -tourist.transport.tcp = 0 -tourist.transport.tcp.port = 2020 +tourist.transport.tcp = 1 +tourist.transport.tcp.port = 2020 tourist.tranposrt.rudp = 1 -tourist.transport.rudp.port = 5111 +tourist.transport.rudp = 20001 -#network bandwidth cap in bytes per second +#network tourist.network.bwThreshold = 20 #remote log -- 2.11.4.GIT