Changeset 605 for trunk/avango-core
- Timestamp:
- 01/10/12 17:36:46 (4 months ago)
- File:
-
- 1 edited
-
trunk/avango-core/src/avango/network/NetNode.cpp (modified) (12 diffs)
Legend:
- Unmodified
- Added
- Removed
-
trunk/avango-core/src/avango/network/NetNode.cpp
r542 r605 38 38 #include "NetGroup.h" 39 39 40 #include <avango/Config.h> 41 42 #if defined(ZMQ_DISTRIBUTION_SUPPORT) 43 #include "NetNodeClient.h" 44 #include "NetNodeServer.h" 45 #include <boost/tokenizer.hpp> 46 #endif 47 40 48 namespace 41 49 { 42 50 av::Logger &logger(av::getLogger("av::NetNode")); 51 52 #if defined(ZMQ_DISTRIBUTION_SUPPORT) 53 av::NetNodeClient* gClient = 0; 54 av::NetNodeServer* gServer = 0; 55 const std::string gClientEndpoint("{Endpt:127.0.0.2:34818:215:0}"); 56 const std::string gServerEndpoint("{Endpt:127.0.0.2:34818:215:0}"); 57 #endif 43 58 } 44 59 … … 68 83 #endif 69 84 85 #ifdef ZMQ_DISTRIBUTION_SUPPORT 86 bool isServer; 87 std::string hostName; 88 std::string port; 89 uint64_t serverHWM = 2; 90 { 91 // from http://stackoverflow.com/questions/541561/using-boost-tokenizer-escaped-list-separator-with-different-parameters 92 std::string gn(groupName); 93 94 typedef boost::tokenizer< boost::escaped_list_separator<char> > Tokenizer; 95 boost::escaped_list_separator<char> Separator( ' ', '|' ); 96 Tokenizer tok( gn, Separator ); 97 unsigned numTokens = 0; 98 for (Tokenizer::iterator iter = tok.begin(); iter != tok.end(); ++iter) { 99 ++numTokens; 100 } 101 if (3 == numTokens) { 102 Tokenizer::iterator iter = tok.begin(); 103 isServer = ("AVSERVER" == *iter); 104 ++iter; 105 hostName = *iter; 106 ++iter; 107 port = *iter; 108 } else if (4 == numTokens) { 109 Tokenizer::iterator iter = tok.begin(); 110 isServer = ("AVSERVER" == *iter); 111 ++iter; 112 hostName = *iter; 113 ++iter; 114 port = *iter; 115 serverHWM = atoi((*iter).c_str()); 116 } else { 117 std::stringstream msg; 118 msg << "ERROR in av::NetNode::join(const std::string& groupName), could not join - invalid CONFIG"; 119 throw (std::runtime_error(msg.str())); 120 } 121 122 } 123 124 if (isServer) { 125 gServer = new NetNodeServer(hostName,port, this, gClientEndpoint, gServerEndpoint, serverHWM); 126 joined(gServerEndpoint); 127 128 av::Msg av_msg; 129 getStateFragment(gServerEndpoint,av_msg); 130 setStateFragment(gClientEndpoint,av_msg); 131 } else { 132 gClient = new NetNodeClient(hostName,port, this, gClientEndpoint, gServerEndpoint); 133 joined(gClientEndpoint); 134 gClient->start(); 135 } 136 //handleNetworkSends(); 137 //handleNetworkReceives(); 138 139 mIdCounter = 0; 140 141 #else 70 142 leave(); 71 143 … … 129 201 logger.debug() << "av::NetNode::join: completed."; 130 202 #endif 203 204 #endif // #ifdef ZMQ_DISTRIBUTION_SUPPORT 131 205 } 132 206 … … 148 222 { 149 223 assert(onAir()); 224 #ifdef ZMQ_DISTRIBUTION_SUPPORT 225 if(gServer) 226 return NetID(gServerEndpoint, ++mIdCounter); 227 else 228 return NetID(gClientEndpoint, ++mIdCounter); 229 #else 150 230 return NetID(netEID(), ++mIdCounter); 231 #endif 151 232 assert(mIdCounter > 0); // detect overflow 152 233 } … … 155 236 av::NetNode::onAir() const 156 237 { 238 #ifdef ZMQ_DISTRIBUTION_SUPPORT 239 return true; 240 #else 157 241 return (mMember != 0); 242 #endif 158 243 } 159 244 … … 170 255 { 171 256 // consume_received_messages(); 172 257 #ifdef ZMQ_DISTRIBUTION_SUPPORT 258 if (0 != gServer) { 259 notifyGroup(); 260 } 261 #else 173 262 if (!mBlocked) 174 263 notifyGroup(); 264 #endif 175 265 } 176 266 … … 178 268 av::NetNode::handleNetworkReceives() 179 269 { 270 #ifdef ZMQ_DISTRIBUTION_SUPPORT 271 if (gClient) { 272 consumeReceivedMessages(); 273 } 274 #else 180 275 while (mMember && mMember->upcallSerializer().handleNextUpcall(this)) 181 276 ; 277 #endif 182 278 } 183 279 … … 420 516 av_pushMsg(p_msg, sPackedMsg); 421 517 518 #ifdef ZMQ_DISTRIBUTION_SUPPORT 519 gServer->cast(p_msg); 520 #else 422 521 mMember->cast(p_msg); 522 #endif 423 523 } 424 524 … … 459 559 av_pushMsg(p_msg, sPackedMsg); 460 560 561 #ifdef ZMQ_DISTRIBUTION_SUPPORT 562 gServer->cast(p_msg); 563 #else 461 564 mMember->cast(p_msg); 565 #endif 462 566 } 463 567 … … 497 601 av_pushMsg(p_msg, sPackedMsg); 498 602 603 #ifdef ZMQ_DISTRIBUTION_SUPPORT 604 gServer->cast(p_msg); 605 #else 499 606 mMember->cast(p_msg); 607 #endif 500 608 } 501 609 … … 714 822 715 823 void 716 av::NetNode::receiveMessage(const std::string &/*origin*/, av::Msg& msg)824 av::NetNode::receiveMessage(const std::string &origin, av::Msg& msg) 717 825 { 718 826 // stack up the received messages in any case … … 720 828 // which received it. 721 829 boost::mutex::scoped_lock lock(mMessageMutex); 830 831 #ifdef ZMQ_DISTRIBUTION_SUPPORT 832 // we need to reset our old state fragment before we receive the new state fragment 833 if (!mObjectMap.slotExists(origin)) 834 { 835 logger.debug() << "ALARM: in av::NetNode::receiveMessage !!!!!!!!!!!!!!!! slot was empty for origin " << origin; 836 mObjectMap.addSlot(origin); 837 } 838 #endif 722 839 723 840 Msg av_msg(msg);
Note: See TracChangeset
for help on using the changeset viewer.
