diff --git a/monitord/plugins/libmplugin_activemq.cpp b/monitord/plugins/libmplugin_activemq.cpp index 6c7fbd1..cdaecdd 100644 --- a/monitord/plugins/libmplugin_activemq.cpp +++ b/monitord/plugins/libmplugin_activemq.cpp @@ -1,7 +1,7 @@ /** - * monitord-activemq - * http://github.com/schakko/monitord-activemq - * Contains *untested* prototyped code + * A monitord plug-in for native sending of POOCSAG/FMS/ZVEI messages from the monitord core to ActiveMQs queues or topics. + * + * http://github.com/schakko/monitord */ #include "libmplugin_activemq.h" @@ -10,8 +10,12 @@ using namespace activemq::core; using namespace decaf::lang::exceptions; using namespace cms; +using namespace decaf::net; -MonitorPlugInActiveMQ::MonitorPlugInActiveMQ() +/** + * Important: the m_connectionFactory() must be explicitly called. Using the auto_ptr inside the header file does not work anymore. + */ +MonitorPlugInActiveMQ::MonitorPlugInActiveMQ() : m_connectionFactory(new ActiveMQConnectionFactory()) { m_bUseCompression = false; m_bClientAck = false; @@ -50,14 +54,17 @@ } void MonitorPlugInActiveMQ::onException(const CMSException& ex AMQCPP_UNUSED) { - LOG_ERROR("Something really bad happend to ActiveMQ:" << ex.getMessage()); + LOG_ERROR("Exception caught from ActiveMQ: " << ex.getMessage()); freeTopics(); freeConnection(); } +/** + * Interface method; called by the monitord core on arrival of a new message + */ bool MonitorPlugInActiveMQ::processResult(class ModuleResultBase *pRes) { - LOG_DEBUG("apachemq: processing Result...") + LOG_INFO("apachemq: processing Result...") std::string type = (*pRes)["typ"]; @@ -71,20 +78,19 @@ LOG_INFO("Preparing Ping message") - // Verbindung wurde durch das Versenden des letzten Alarms getrennt und kann nicht wiederhergestellt werden + // connection has been lost after/during the send of the last received internal message. The connection could not be recovered if (!establishConnection()) { LOG_ERROR("Connection could not be established, discarding this message"); return false; } - // Nachricht - falls etwas schief geht, wird der Exception-Listener aufgerufen und löscht die Verbindungen + // send a test message to enmsure we have a working connection TextMessage* ping = m_session->createTextMessage(); ping->setStringProperty("ping", "pong"); topicInfo->producer->send(ping); delete ping; - // Ping wurde gesendet, es könnte aber eine Exception aufgetreten sein - // Also Verbindung noch mal sicherstellen + // after the ping has been sent, we ensure that a successful has been established if (!establishConnection()) { LOG_ERROR("Connection could not be established after failed Ping, discarding this message"); return false; @@ -100,7 +106,7 @@ message->setStringProperty(i->first, i->second); } - // Wenn das Senden der Nachricht jetzt fehlschlägt, wird beim nächsten Mal probiert, die Verbindung wieder zu öffnen + // if sending the messasge item fails, the connection will be recovered on the next processResult call topicInfo->producer->send(message); LOG_INFO("message sent to topic") @@ -110,8 +116,14 @@ return true; } +/** + * Read the .. section from the configuration XML file. + * All parameters are stored as member variables of the MonitorPlugInActiveMQ instance. + */ void MonitorPlugInActiveMQ::initializeConfiguration(XMLNode config) -{ +{ + LOG_INFO("Initializing ActiveMQ configuration") + m_brokerUri = getNodeText(config, ACTIVEMQ_XMLNODE_BROKERURI, "tcp://127.0.0.1:61616"); m_username = getNodeText(config, ACTIVEMQ_XMLNODE_USERNAME, ""); m_password = getNodeText(config, ACTIVEMQ_XMLNODE_PASSWORD, ""); @@ -121,11 +133,18 @@ m_producerWindowSize = getNodeInt(config, ACTIVEMQ_XMLNODE_PRODUCERWINDOWSIZE, 0); m_bUseCompression = getNodeBool(config, ACTIVEMQ_XMLNODE_USECOMPRESSION, false); m_bClientAck = getNodeBool(config, ACTIVEMQ_XMLNODE_CLIENTACK, false); + + LOG_INFO("ActiveMQ configuration initialized") } +/** + * Initialize the ActiveMQ connection factory. The connection factory creates new connections. + */ void MonitorPlugInActiveMQ::initializeConnectionFactory() { - // Set Broker-URI first - otherwise username and password is lost + LOG_INFO("Initiailizing ActiveMQ connection factory") + + // Set Broker-URI first - otherwise username and password configuration are lost m_connectionFactory->setBrokerURI(m_brokerUri); if (!m_username.empty()) { @@ -145,11 +164,16 @@ m_connectionFactory->setCloseTimeout(m_closeTimeout); m_connectionFactory->setProducerWindowSize(m_producerWindowSize); - LOG_DEBUG("Connection factory initialized") + LOG_INFO("Connection factory initialized") } +/** + * Initialize the connection and its session instance + * @return cms::Connection + */ bool MonitorPlugInActiveMQ::initializeConnection() { + LOG_INFO("Initializing new ActiveMQ connection") // create a connection try { @@ -166,17 +190,17 @@ m_bConnected = true; } catch (CMSException& e) { - LOG_ERROR("Could not connect to messaging queue \"" << m_brokerUri << "\" with username=\"" << m_username << "\"") + LOG_ERROR("Could not connect to message queue \"" << m_brokerUri << "\" with username=\"" << m_username << "\"") m_bConnected = false; } - LOG_DEBUG("Connection initialized") + LOG_INFO("Connection initialized") return m_bConnected; } void MonitorPlugInActiveMQ::freeConnection() { - // Close open resources. + // close open resources try { if (m_session != NULL) { m_session->close(); @@ -220,8 +244,11 @@ activemq::library::ActiveMQCPP::initializeLibrary(); // read default configuration for topics + LOG_INFO("Parsing generic topic configuration"); parseTopic(config, m_genericTopic, m_genericTopic); + // read inherited topic configuration + LOG_INFO("Parsing individual topic configuration"); parseTopics(config, m_topics, m_genericTopic); initializeConfiguration(config); @@ -239,15 +266,22 @@ return m_bConnected; } +/** + * Iterate over every topic and initialize the topic/queue for the ActiveMQ connection + */ void MonitorPlugInActiveMQ::initializeTopics(Topics &topics) { + LOG_INFO("Initializing topics") + if (m_bConnected == false) { - throw RuntimeException(__FILE__, __LINE__, "Tried to initialize topics without established ActiveMQ connection. Call initializeActiveMqConnection() first."); + throw RuntimeException(__FILE__, __LINE__, "Tried to initialize topics without an established ActiveMQ connection. Call initializeActiveMqConnection() first."); } Topics::iterator i; TopicInfo *pTopicInfo; + LOG_INFO("Number of topics to initialize: " << topics.size()) + // create new producers for (i = topics.begin(); i != topics.end(); i++) { @@ -316,6 +350,9 @@ return true; } +/** + * Read the ... tag instances and produce a new Topic instance for every tag + */ void MonitorPlugInActiveMQ::parseTopics(XMLNode config, Topics &topics, TopicInfo &referenceTopic) { XMLNode topicNode; @@ -328,21 +365,25 @@ channels.push_back(ACTIVEMQ_KEY_ZVEI); channels.push_back(ACTIVEMQ_KEY_FMS); + // an ActiveMQ topic/queue must be mapped to valid soundcard channel for (unsigned int i = 0, m = channels.size(); i < m; i++) { pTopicInfo = new TopicInfo; + LOG_INFO("Initializing topic for channel num #" << i) initializeTopic(*pTopicInfo, referenceTopic); topics.insert(PairMapping(channels.at(i), pTopicInfo)); } int nTopic = config.nChildNode(ACTIVEMQ_XMLNODE_TOPIC); - + LOG_INFO("Available topics for channel: " << nTopic) + for (int num = 0; num < nTopic ; ++num) { if (!((topicNode = config.getChildNode(ACTIVEMQ_XMLNODE_TOPIC, num))).isEmpty()) { std::string type = topicNode.getAttribute(ACTIVEMQ_XMLATTR_TYPE) ; if ((type == ACTIVEMQ_KEY_POCSAG) || (type == ACTIVEMQ_KEY_FMS) || (type == ACTIVEMQ_KEY_ZVEI)) { + LOG_INFO("Topic definition has valid type \"" << type << "\"") pTopicInfo = (topics.find(type))->second; parseTopic(topicNode, *pTopicInfo, referenceTopic); } @@ -353,6 +394,7 @@ void MonitorPlugInActiveMQ::parseTopic(XMLNode config, TopicInfo &topicInfo, TopicInfo &referenceTopic) { initializeTopic(topicInfo, referenceTopic); + LOG_INFO("Parsing topic definition") if (config.nChildNode(ACTIVEMQ_XMLNODE_USETOPIC) >= 1) { topicInfo.bUseTopic = getNodeBool(config, ACTIVEMQ_XMLNODE_USETOPIC, false);