diff --git a/monitord/plugins/libmplugin_activemq.cpp b/monitord/plugins/libmplugin_activemq.cpp index 8e05e15..6c7fbd1 100644 --- a/monitord/plugins/libmplugin_activemq.cpp +++ b/monitord/plugins/libmplugin_activemq.cpp @@ -40,37 +40,142 @@ MonitorPlugInActiveMQ::~MonitorPlugInActiveMQ() { - Topics::iterator i ; - TopicInfo *pTopicInfo; + freeTopics(); + freeConnection(); +} + +void MonitorPlugInActiveMQ::Show() +{ + LOG_INFO("MonitorActiveMQPlugin successfully loaded" ) +} + +void MonitorPlugInActiveMQ::onException(const CMSException& ex AMQCPP_UNUSED) { + LOG_ERROR("Something really bad happend to ActiveMQ:" << ex.getMessage()); + freeTopics(); + freeConnection(); +} + +bool MonitorPlugInActiveMQ::processResult(class ModuleResultBase *pRes) +{ + LOG_DEBUG("apachemq: processing Result...") - // Destroy resources. - for (i = m_topics.begin(); i != m_topics.end(); i++) - { - pTopicInfo = i->second; + + std::string type = (*pRes)["typ"]; - try { - if (pTopicInfo->destination != NULL) { - delete pTopicInfo->destination; - } - } - catch (CMSException& e) { - e.printStackTrace(); - } - - pTopicInfo->destination = NULL; + if (m_topics.find(type) == m_topics.end()) { + LOG_ERROR("apachemq: received type " << type << " which is not registered") + return false; + } + + TopicInfo* topicInfo = (m_topics.find(type))->second; - try { - if (pTopicInfo->producer != NULL) { - delete pTopicInfo->producer; - } - } - catch (CMSException& e) { - e.printStackTrace(); - } - - pTopicInfo->producer = NULL; + LOG_INFO("Preparing Ping message") + + // Verbindung wurde durch das Versenden des letzten Alarms getrennt und kann nicht wiederhergestellt werden + if (!establishConnection()) { + LOG_ERROR("Connection could not be established, discarding this message"); + return false; } + // Nachricht - falls etwas schief geht, wird der Exception-Listener aufgerufen und löscht die Verbindungen + TextMessage* ping = m_session->createTextMessage(); + ping->setStringProperty("ping", "pong"); + topicInfo->producer->send(ping); + delete ping; + + // Ping wurde gesendet, es könnte aber eine Exception aufgetreten sein + // Also Verbindung noch mal sicherstellen + if (!establishConnection()) { + LOG_ERROR("Connection could not be established after failed Ping, discarding this message"); + return false; + } + + + LOG_INFO("Preparing final alarm message") + TextMessage* message = m_session->createTextMessage(); + + ResultItemsMap::iterator i; + + for (i = (*pRes).m_Items.begin(); i != (*pRes).m_Items.end(); i++) { + message->setStringProperty(i->first, i->second); + } + + // Wenn das Senden der Nachricht jetzt fehlschlägt, wird beim nächsten Mal probiert, die Verbindung wieder zu öffnen + topicInfo->producer->send(message); + + LOG_INFO("message sent to topic") + + delete message; + + return true; +} + +void MonitorPlugInActiveMQ::initializeConfiguration(XMLNode config) +{ + m_brokerUri = getNodeText(config, ACTIVEMQ_XMLNODE_BROKERURI, "tcp://127.0.0.1:61616"); + m_username = getNodeText(config, ACTIVEMQ_XMLNODE_USERNAME, ""); + m_password = getNodeText(config, ACTIVEMQ_XMLNODE_PASSWORD, ""); + m_clientId = getNodeText(config, ACTIVEMQ_XMLNODE_CLIENTID, ""); + m_sendTimeout = getNodeInt(config, ACTIVEMQ_XMLNODE_SENDTIMEOUT, 0); + m_closeTimeout = getNodeInt(config, ACTIVEMQ_XMLNODE_CLOSETIMEOUT, 0); + m_producerWindowSize = getNodeInt(config, ACTIVEMQ_XMLNODE_PRODUCERWINDOWSIZE, 0); + m_bUseCompression = getNodeBool(config, ACTIVEMQ_XMLNODE_USECOMPRESSION, false); + m_bClientAck = getNodeBool(config, ACTIVEMQ_XMLNODE_CLIENTACK, false); +} + +void MonitorPlugInActiveMQ::initializeConnectionFactory() +{ + // Set Broker-URI first - otherwise username and password is lost + m_connectionFactory->setBrokerURI(m_brokerUri); + + if (!m_username.empty()) { + m_connectionFactory->setUsername(m_username); + } + + if (!m_password.empty()) { + m_connectionFactory->setPassword(m_password); + } + + if (!m_clientId.empty()) { + m_connectionFactory->setClientId(m_clientId); + } + + m_connectionFactory->setUseCompression(m_bUseCompression); + m_connectionFactory->setSendTimeout(m_sendTimeout); + m_connectionFactory->setCloseTimeout(m_closeTimeout); + m_connectionFactory->setProducerWindowSize(m_producerWindowSize); + + LOG_DEBUG("Connection factory initialized") +} + +bool MonitorPlugInActiveMQ::initializeConnection() +{ + + // create a connection + try { + m_connection = m_connectionFactory->createConnection(); + m_connection->start(); + + if (m_bClientAck) { + m_session = m_connection->createSession(Session::CLIENT_ACKNOWLEDGE); + } + else { + m_session = m_connection->createSession(Session::AUTO_ACKNOWLEDGE); + } + + m_bConnected = true; + } + catch (CMSException& e) { + LOG_ERROR("Could not connect to messaging queue \"" << m_brokerUri << "\" with username=\"" << m_username << "\"") + m_bConnected = false; + } + + LOG_DEBUG("Connection initialized") + + return m_bConnected; +} + +void MonitorPlugInActiveMQ::freeConnection() { // Close open resources. try { if (m_session != NULL) { @@ -105,145 +210,10 @@ e.printStackTrace(); } + m_bConnected = false; m_connection = NULL; } -void MonitorPlugInActiveMQ::Show() -{ - LOG_INFO("MonitorActiveMQPlugin successfully loaded" ) -} - -bool MonitorPlugInActiveMQ::processResult(class ModuleResultBase *pRes) -{ - LOG_DEBUG("apachemq: processing Result...") - - if (m_bConnected == false) { - LOG_ERROR("apachmq: ignoring message 'cause no active connection") - return false; - } - - if (m_bTopicsInitialized == false) { - throw RuntimeException(__FILE__, __LINE__, "processResult must be called AFTER initializeTopics()"); - } - - - std::string type = (*pRes)["typ"]; - - if (m_topics.find(type) == m_topics.end()) { - LOG_ERROR("apachemq: received type " << type << " which is not registered") - return false; - } - - LOG_INFO("Preparing new message") - - TopicInfo* topicInfo = (m_topics.find(type))->second; - TextMessage* message = m_session->createTextMessage(); - - ResultItemsMap::iterator i; - - for (i = (*pRes).m_Items.begin(); i != (*pRes).m_Items.end(); i++) { - message->setStringProperty(i->first, i->second); - } - - - topicInfo->producer->send(message); - - LOG_INFO("message sent") - - delete message; - - return true; -} - -void MonitorPlugInActiveMQ::initializeConfiguration(XMLNode config) -{ - m_brokerUri = getNodeText(config, ACTIVEMQ_XMLNODE_BROKERURI, "tcp://127.0.0.1:61616"); - m_username = getNodeText(config, ACTIVEMQ_XMLNODE_USERNAME, ""); - m_password = getNodeText(config, ACTIVEMQ_XMLNODE_PASSWORD, ""); - m_clientId = getNodeText(config, ACTIVEMQ_XMLNODE_CLIENTID, ""); - m_sendTimeout = getNodeInt(config, ACTIVEMQ_XMLNODE_SENDTIMEOUT, 0); - m_closeTimeout = getNodeInt(config, ACTIVEMQ_XMLNODE_CLOSETIMEOUT, 0); - m_producerWindowSize = getNodeInt(config, ACTIVEMQ_XMLNODE_PRODUCERWINDOWSIZE, 0); - m_bUseCompression = getNodeBool(config, ACTIVEMQ_XMLNODE_USECOMPRESSION, false); - m_bClientAck = getNodeBool(config, ACTIVEMQ_XMLNODE_CLIENTACK, false); - std::string logFile = getNodeText(config, ACTIVEMQ_XMLNODE_LOGFILE, "screen"); - std::string logLevel = getNodeText(config, ACTIVEMQ_XMLNODE_LOGLEVEL, "INFO"); - - #ifdef WIN32 - if (!(logFile == "screen")) { - FILE* pFile = fopen(logFile.c_str(), "a"); - Output2FILE::Stream() = pFile; - } - - FILELog::ReportingLevel() = FILELog::FromString(logLevel); - LOG_INFO("logging started") - #endif -} - -Topics MonitorPlugInActiveMQ::getTopics() -{ - return m_topics; -} - -void MonitorPlugInActiveMQ::setTopics(Topics& topics) -{ - m_topics = topics; -} - -void MonitorPlugInActiveMQ::initializeConnectionFactory(ActiveMQConnectionFactory *connectionFactory) -{ - // Set Broker-URI first - otherwise username and password is lost - connectionFactory->setBrokerURI(m_brokerUri); - - if (!m_username.empty()) { - connectionFactory->setUsername(m_username); - } - - if (!m_password.empty()) { - connectionFactory->setPassword(m_password); - } - - if (!m_clientId.empty()) { - connectionFactory->setClientId(m_clientId); - } - - connectionFactory->setUseCompression(m_bUseCompression); - connectionFactory->setSendTimeout(m_sendTimeout); - connectionFactory->setCloseTimeout(m_closeTimeout); - connectionFactory->setProducerWindowSize(m_producerWindowSize); - - LOG_DEBUG("Connection factory initialized") -} - -bool MonitorPlugInActiveMQ::initializeActiveMqConnection() -{ - auto_ptr connectionFactory(new ActiveMQConnectionFactory()); - initializeConnectionFactory(connectionFactory.get()); - - // create a connection - try { - m_connection = connectionFactory->createConnection(); - m_connection->start(); - - if (m_bClientAck) { - m_session = m_connection->createSession(Session::CLIENT_ACKNOWLEDGE); - } - else { - m_session = m_connection->createSession(Session::AUTO_ACKNOWLEDGE); - } - - m_bConnected = true; - } - catch (CMSException& e) { - LOG_ERROR("Could not connect to messaging queue \"" << m_brokerUri << "\" with username=\"" << m_username << "\"") - m_bConnected = false; - } - - LOG_DEBUG("Connection initialized") - - return m_bConnected; -} - bool MonitorPlugInActiveMQ::initProcessing(class MonitorConfiguration* configPtr, XMLNode config) { // initialize ActiveMQ @@ -251,11 +221,18 @@ // read default configuration for topics parseTopic(config, m_genericTopic, m_genericTopic); + // read inherited topic configuration + parseTopics(config, m_topics, m_genericTopic); initializeConfiguration(config); + initializeConnectionFactory(); + bool r = establishConnection(); - if (initializeActiveMqConnection()) { - parseTopics(config, m_topics, m_genericTopic); + return r; +} + +bool MonitorPlugInActiveMQ::establishConnection() { + if (initializeConnection()) { initializeTopics(m_topics); } @@ -300,6 +277,40 @@ m_bTopicsInitialized = true; } +void MonitorPlugInActiveMQ::freeTopics() { + Topics::iterator i ; + TopicInfo *pTopicInfo; + + // Destroy resources. + for (i = m_topics.begin(); i != m_topics.end(); i++) + { + pTopicInfo = i->second; + + try { + if (pTopicInfo->destination != NULL) { + delete pTopicInfo->destination; + } + } + catch (CMSException& e) { + e.printStackTrace(); + } + + pTopicInfo->destination = NULL; + + try { + if (pTopicInfo->producer != NULL) { + delete pTopicInfo->producer; + } + } + catch (CMSException& e) { + e.printStackTrace(); + } + + pTopicInfo->producer = NULL; + } +} + + bool MonitorPlugInActiveMQ::quitProcessing() { return true; @@ -339,15 +350,6 @@ } } -void MonitorPlugInActiveMQ::initializeTopic(TopicInfo &topicInfo, TopicInfo &referenceTopic) -{ - topicInfo.bUseTopic = referenceTopic.bUseTopic; - topicInfo.bDeliveryModePersistent = referenceTopic.bDeliveryModePersistent; - topicInfo.destUri = referenceTopic.destUri; - topicInfo.destination = NULL; - topicInfo.producer = NULL; -} - void MonitorPlugInActiveMQ::parseTopic(XMLNode config, TopicInfo &topicInfo, TopicInfo &referenceTopic) { initializeTopic(topicInfo, referenceTopic); @@ -365,6 +367,16 @@ } } +void MonitorPlugInActiveMQ::initializeTopic(TopicInfo &topicInfo, TopicInfo &referenceTopic) +{ + topicInfo.bUseTopic = referenceTopic.bUseTopic; + topicInfo.bDeliveryModePersistent = referenceTopic.bDeliveryModePersistent; + topicInfo.destUri = referenceTopic.destUri; + topicInfo.destination = NULL; + topicInfo.producer = NULL; +} + + MonitorPlugInActiveMQFactory::MonitorPlugInActiveMQFactory() { } diff --git a/monitord/plugins/libmplugin_activemq.h b/monitord/plugins/libmplugin_activemq.h index 7410fa1..cdde046 100644 --- a/monitord/plugins/libmplugin_activemq.h +++ b/monitord/plugins/libmplugin_activemq.h @@ -56,7 +56,7 @@ typedef std::map Topics; typedef std::pair PairMapping; -class MonitorPlugInActiveMQ : public MonitorPlugIn +class MonitorPlugInActiveMQ : public MonitorPlugIn, public cms::ExceptionListener { public: MonitorPlugInActiveMQ(); @@ -75,6 +75,7 @@ bool m_bConnected; bool m_bTopicsInitialized; + std::auto_ptr m_connectionFactory; cms::Connection* m_connection; cms::Session* m_session; @@ -86,17 +87,24 @@ bool quitProcessing(); void Show(); - Topics getTopics(); - void setTopics(Topics &topics); void shutdownActiveMQCPPLibrary(); void initializeActiveMQCPPLibrary(); void initializeConfiguration(XMLNode config); - void initializeConnectionFactory(activemq::core::ActiveMQConnectionFactory *connectionFactory); - bool initializeActiveMqConnection(); + void initializeConnectionFactory(); + + bool establishConnection(); + bool initializeConnection(); + void freeConnection(); + void initializeTopics(Topics &topics); + void freeTopics(); + void parseTopics(XMLNode config, Topics &topics, TopicInfo &referenceTopic); void parseTopic(XMLNode config, TopicInfo &topicInfo, TopicInfo &referenceTopic); void initializeTopic(TopicInfo &topicInfo, TopicInfo &referenceTopic); + + /** Exception-Listener */ + virtual void onException(const cms::CMSException& ex AMQCPP_UNUSED); }; class MonitorPlugInActiveMQFactory : public MonitorPlugInFactory diff --git a/monitord/plugins/libmplugin_activemq.la b/monitord/plugins/libmplugin_activemq.la deleted file mode 100644 index ab5cfe3..0000000 --- a/monitord/plugins/libmplugin_activemq.la +++ /dev/null @@ -1,41 +0,0 @@ -# libmplugin_activemq.la - a libtool library file -# Generated by ltmain.sh (GNU libtool) 2.2.6b Debian-2.2.6b-2ubuntu1 -# -# Please DO NOT delete this file! -# It is necessary for linking the library. - -# The name that we can dlopen(3). -dlname='libmplugin_activemq.so.0' - -# Names of this library. -library_names='libmplugin_activemq.so.0.0.0 libmplugin_activemq.so.0 libmplugin_activemq.so' - -# The name of the static archive. -old_library='' - -# Linker flags that can not go in dependency_libs. -inherited_linker_flags='' - -# Libraries that this one depends upon. -dependency_libs='' - -# Names of additional weak libraries provided by this library -weak_library_names='' - -# Version information for libmplugin_activemq. -current=0 -age=0 -revision=0 - -# Is this an already installed library? -installed=no - -# Should we warn about portability when linking against -modules? -shouldnotlink=no - -# Files to dlopen/dlpreopen -dlopen='' -dlpreopen='' - -# Directory that this library needs to be installed in: -libdir='/usr/local/lib/monitord'