Changeset 605 for trunk/avango-core


Ignore:
Timestamp:
01/10/12 17:36:46 (4 months ago)
Author:
acbernst
Message:

added ZMQ_DISTRIBUTION_SUPPORT

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/avango-core/src/avango/network/NetNode.cpp

    r542 r605  
    3838#include "NetGroup.h" 
    3939 
     40#include <avango/Config.h>  
     41 
     42#if defined(ZMQ_DISTRIBUTION_SUPPORT) 
     43#include "NetNodeClient.h" 
     44#include "NetNodeServer.h" 
     45#include <boost/tokenizer.hpp> 
     46#endif 
     47 
    4048namespace 
    4149{ 
    4250  av::Logger &logger(av::getLogger("av::NetNode")); 
     51 
     52#if defined(ZMQ_DISTRIBUTION_SUPPORT) 
     53  av::NetNodeClient* gClient = 0; 
     54  av::NetNodeServer* gServer = 0; 
     55  const std::string  gClientEndpoint("{Endpt:127.0.0.2:34818:215:0}"); 
     56  const std::string  gServerEndpoint("{Endpt:127.0.0.2:34818:215:0}"); 
     57#endif 
    4358} 
    4459 
     
    6883#endif 
    6984 
     85#ifdef ZMQ_DISTRIBUTION_SUPPORT 
     86  bool isServer; 
     87  std::string hostName; 
     88  std::string port; 
     89  uint64_t serverHWM = 2; 
     90  { 
     91    // from http://stackoverflow.com/questions/541561/using-boost-tokenizer-escaped-list-separator-with-different-parameters 
     92    std::string gn(groupName); 
     93     
     94    typedef boost::tokenizer< boost::escaped_list_separator<char> > Tokenizer; 
     95    boost::escaped_list_separator<char> Separator( ' ', '|' ); 
     96    Tokenizer tok( gn, Separator ); 
     97    unsigned numTokens = 0; 
     98    for (Tokenizer::iterator iter = tok.begin(); iter != tok.end(); ++iter) { 
     99      ++numTokens; 
     100    } 
     101    if (3 == numTokens) { 
     102      Tokenizer::iterator iter = tok.begin(); 
     103      isServer = ("AVSERVER" == *iter); 
     104      ++iter; 
     105      hostName = *iter; 
     106      ++iter; 
     107      port = *iter; 
     108    } else if (4 == numTokens) { 
     109      Tokenizer::iterator iter = tok.begin(); 
     110      isServer = ("AVSERVER" == *iter); 
     111      ++iter; 
     112      hostName = *iter; 
     113      ++iter; 
     114      port = *iter; 
     115      serverHWM = atoi((*iter).c_str()); 
     116    } else { 
     117      std::stringstream msg; 
     118      msg << "ERROR in av::NetNode::join(const std::string& groupName), could not join - invalid CONFIG"; 
     119      throw (std::runtime_error(msg.str())); 
     120    } 
     121 
     122  } 
     123   
     124  if (isServer) { 
     125    gServer = new NetNodeServer(hostName,port, this, gClientEndpoint, gServerEndpoint, serverHWM); 
     126    joined(gServerEndpoint); 
     127 
     128    av::Msg av_msg; 
     129    getStateFragment(gServerEndpoint,av_msg); 
     130    setStateFragment(gClientEndpoint,av_msg); 
     131  } else { 
     132    gClient = new NetNodeClient(hostName,port, this, gClientEndpoint, gServerEndpoint); 
     133    joined(gClientEndpoint); 
     134    gClient->start(); 
     135  } 
     136  //handleNetworkSends(); 
     137  //handleNetworkReceives(); 
     138 
     139  mIdCounter = 0; 
     140 
     141#else 
    70142  leave(); 
    71143 
     
    129201  logger.debug() << "av::NetNode::join: completed."; 
    130202#endif 
     203 
     204#endif // #ifdef ZMQ_DISTRIBUTION_SUPPORT 
    131205} 
    132206 
     
    148222{ 
    149223  assert(onAir()); 
     224#ifdef ZMQ_DISTRIBUTION_SUPPORT 
     225  if(gServer) 
     226     return NetID(gServerEndpoint, ++mIdCounter); 
     227  else 
     228     return NetID(gClientEndpoint, ++mIdCounter); 
     229#else 
    150230  return NetID(netEID(), ++mIdCounter); 
     231#endif 
    151232  assert(mIdCounter > 0); // detect overflow 
    152233} 
     
    155236av::NetNode::onAir() const 
    156237{ 
     238#ifdef ZMQ_DISTRIBUTION_SUPPORT 
     239  return true; 
     240#else 
    157241  return (mMember != 0); 
     242#endif 
    158243} 
    159244 
     
    170255{ 
    171256  // consume_received_messages(); 
    172  
     257#ifdef ZMQ_DISTRIBUTION_SUPPORT 
     258  if (0 != gServer) { 
     259    notifyGroup(); 
     260  } 
     261#else 
    173262  if (!mBlocked) 
    174263    notifyGroup(); 
     264#endif 
    175265} 
    176266 
     
    178268av::NetNode::handleNetworkReceives() 
    179269{ 
     270#ifdef ZMQ_DISTRIBUTION_SUPPORT 
     271  if (gClient) { 
     272    consumeReceivedMessages(); 
     273  } 
     274#else 
    180275  while (mMember && mMember->upcallSerializer().handleNextUpcall(this)) 
    181276    ; 
     277#endif 
    182278} 
    183279 
     
    420516  av_pushMsg(p_msg, sPackedMsg); 
    421517 
     518#ifdef ZMQ_DISTRIBUTION_SUPPORT 
     519  gServer->cast(p_msg); 
     520#else 
    422521  mMember->cast(p_msg); 
     522#endif 
    423523} 
    424524 
     
    459559  av_pushMsg(p_msg, sPackedMsg); 
    460560 
     561#ifdef ZMQ_DISTRIBUTION_SUPPORT 
     562  gServer->cast(p_msg); 
     563#else 
    461564  mMember->cast(p_msg); 
     565#endif 
    462566} 
    463567 
     
    497601  av_pushMsg(p_msg, sPackedMsg); 
    498602 
     603#ifdef ZMQ_DISTRIBUTION_SUPPORT 
     604  gServer->cast(p_msg); 
     605#else 
    499606  mMember->cast(p_msg); 
     607#endif 
    500608} 
    501609 
     
    714822 
    715823void 
    716   av::NetNode::receiveMessage(const std::string &/*origin*/, av::Msg& msg) 
     824av::NetNode::receiveMessage(const std::string &origin, av::Msg& msg) 
    717825{ 
    718826  // stack up the received messages in any case 
     
    720828  // which received it. 
    721829  boost::mutex::scoped_lock lock(mMessageMutex); 
     830 
     831#ifdef ZMQ_DISTRIBUTION_SUPPORT 
     832  // we need to reset our old state fragment before we receive the new state fragment 
     833  if (!mObjectMap.slotExists(origin)) 
     834  { 
     835    logger.debug() << "ALARM: in av::NetNode::receiveMessage !!!!!!!!!!!!!!!! slot was empty for origin " << origin; 
     836    mObjectMap.addSlot(origin); 
     837  } 
     838#endif 
    722839 
    723840  Msg av_msg(msg); 
Note: See TracChangeset for help on using the changeset viewer.