/** * A monitord plug-in for native sending of POOCSAG/FMS/ZVEI messages from the monitord core to ActiveMQs queues or topics. * * http://github.com/schakko/monitord */ #include "libmplugin_activemq.h" using namespace std; using namespace activemq; using namespace activemq::core; using namespace decaf::lang::exceptions; using namespace cms; using namespace decaf::net; /** * Important: the m_connectionFactory() must be explicitly called. Using the auto_ptr inside the header file does not work anymore. */ MonitorPlugInActiveMQ::MonitorPlugInActiveMQ() : m_connectionFactory(new ActiveMQConnectionFactory()) { m_bUseCompression = false; m_bClientAck = false; m_bConnected = false; m_brokerUri = "tcp://127.0.0.1:61616"; m_username = ""; m_password = ""; m_clientId = ""; m_destUri = ""; m_sendTimeout = 0; m_closeTimeout = 0; m_producerWindowSize = 0; m_genericTopic.bUseTopic = false; m_genericTopic.bDeliveryModePersistent = false; m_genericTopic.destUri = ""; m_bDeliveryModePersistent = false; m_bTopicsInitialized = false; m_bConnected = false; m_session = NULL; m_connection = NULL; activemq::library::ActiveMQCPP::initializeLibrary(); } MonitorPlugInActiveMQ::~MonitorPlugInActiveMQ() { freeTopics(); freeConnection(); } void MonitorPlugInActiveMQ::Show() { LOG_INFO("MonitorActiveMQPlugin successfully loaded" ) } void MonitorPlugInActiveMQ::onException(const CMSException& ex AMQCPP_UNUSED) { LOG_ERROR("Exception caught from ActiveMQ: " << ex.getMessage()); freeTopics(); freeConnection(); } /** * Interface method; called by the monitord core on arrival of a new message */ bool MonitorPlugInActiveMQ::processResult(class ModuleResultBase *pRes) { LOG_INFO("apachemq: processing Result...") std::string type = (*pRes)["typ"]; if (m_topics.find(type) == m_topics.end()) { LOG_ERROR("apachemq: received type " << type << " which is not registered") return false; } TopicInfo* topicInfo = (m_topics.find(type))->second; LOG_INFO("Preparing Ping message") // connection has been lost after/during the send of the last received internal message. The connection could not be recovered if (!establishConnection()) { LOG_ERROR("Connection could not be established, discarding this message"); return false; } // send a test message to enmsure we have a working connection TextMessage* ping = m_session->createTextMessage(); ping->setStringProperty("ping", "pong"); topicInfo->producer->send(ping); delete ping; // after the ping has been sent, we ensure that a successful has been established if (!establishConnection()) { LOG_ERROR("Connection could not be established after failed Ping, discarding this message"); return false; } LOG_INFO("Preparing final alarm message") TextMessage* message = m_session->createTextMessage(); updateTextMessage(*message, *pRes); // if sending the messasge item fails, the connection will be recovered on the next processResult call topicInfo->producer->send(message); LOG_INFO("message sent to topic") delete message; return true; } void MonitorPlugInActiveMQ::updateTextMessage(cms::TextMessage& textMessage, class ModuleResultBase& pRes) { ResultItemsMap::iterator i; for (i = pRes.m_Items.begin(); i != pRes.m_Items.end(); i++) { // ZABOS-150: Binary data (encrypted POCSAG messages) must be converted to Base64 so ActiveMQ can decode the data if (i->first == "text") { textMessage.setStringProperty( i->first, base64_encode( reinterpret_cast<const unsigned char*>(i->second.c_str()), i->second.length() ) ); } else { textMessage.setStringProperty(i->first, i->second); } } } /** * Read the <plugin name="activem">..</plugin> section from the configuration XML file. * All parameters are stored as member variables of the MonitorPlugInActiveMQ instance. */ void MonitorPlugInActiveMQ::initializeConfiguration(XMLNode config) { LOG_INFO("Initializing ActiveMQ configuration") m_brokerUri = getNodeText(config, ACTIVEMQ_XMLNODE_BROKERURI, "tcp://127.0.0.1:61616"); m_username = getNodeText(config, ACTIVEMQ_XMLNODE_USERNAME, ""); m_password = getNodeText(config, ACTIVEMQ_XMLNODE_PASSWORD, ""); m_clientId = getNodeText(config, ACTIVEMQ_XMLNODE_CLIENTID, ""); m_sendTimeout = getNodeInt(config, ACTIVEMQ_XMLNODE_SENDTIMEOUT, 0); m_closeTimeout = getNodeInt(config, ACTIVEMQ_XMLNODE_CLOSETIMEOUT, 0); m_producerWindowSize = getNodeInt(config, ACTIVEMQ_XMLNODE_PRODUCERWINDOWSIZE, 0); m_bUseCompression = getNodeBool(config, ACTIVEMQ_XMLNODE_USECOMPRESSION, false); m_bClientAck = getNodeBool(config, ACTIVEMQ_XMLNODE_CLIENTACK, false); LOG_INFO("ActiveMQ configuration initialized") } /** * Initialize the ActiveMQ connection factory. The connection factory creates new connections. */ void MonitorPlugInActiveMQ::initializeConnectionFactory() { LOG_INFO("Initiailizing ActiveMQ connection factory") // Set Broker-URI first - otherwise username and password configuration are lost m_connectionFactory->setBrokerURI(m_brokerUri); if (!m_username.empty()) { m_connectionFactory->setUsername(m_username); } if (!m_password.empty()) { m_connectionFactory->setPassword(m_password); } if (!m_clientId.empty()) { m_connectionFactory->setClientId(m_clientId); } m_connectionFactory->setUseCompression(m_bUseCompression); m_connectionFactory->setSendTimeout(m_sendTimeout); m_connectionFactory->setCloseTimeout(m_closeTimeout); m_connectionFactory->setProducerWindowSize(m_producerWindowSize); LOG_INFO("Connection factory initialized") } /** * Initialize the connection and its session instance * @return cms::Connection */ bool MonitorPlugInActiveMQ::initializeConnection() { LOG_INFO("Initializing new ActiveMQ connection") // create a connection try { m_connection = dynamic_cast<activemq::core::ActiveMQConnection*>(m_connectionFactory->createConnection()); LOG_INFO("Connection prepared") m_connection->start(); LOG_INFO("Connection started") if (m_bClientAck) { LOG_DEBUG("Setting: client acknowledge required") m_session = dynamic_cast<activemq::core::ActiveMQSession*>(m_connection->createSession(Session::CLIENT_ACKNOWLEDGE)); } else { LOG_DEBUG("Setting: auto acknowledge enabled") m_session = dynamic_cast<activemq::core::ActiveMQSession*>(m_connection->createSession(Session::AUTO_ACKNOWLEDGE)); } m_bConnected = true; } catch (CMSException& e) { LOG_ERROR("Could not connect to message queue \"" << m_brokerUri << "\" with username=\"" << m_username << "\"") m_bConnected = false; } LOG_INFO("Connection initialized") return m_bConnected; } void MonitorPlugInActiveMQ::freeConnection() { LOG_INFO("Freeing connection") // close open resources try { if (m_session != NULL) { LOG_DEBUG("Closing session..") m_session->close(); } if (m_connection != NULL) { LOG_DEBUG("Closing connection...") m_connection->close(); } } catch (CMSException& e) { e.printStackTrace(); } try { if (m_session != NULL) { delete m_session; } } catch (CMSException& e) { e.printStackTrace(); } m_session = NULL; try { if (m_connection != NULL) { delete m_connection; } } catch (CMSException& e) { e.printStackTrace(); } m_bConnected = false; m_connection = NULL; } bool MonitorPlugInActiveMQ::initProcessing(class MonitorConfiguration* configPtr, XMLNode config) { // initialize ActiveMQ activemq::library::ActiveMQCPP::initializeLibrary(); // read default configuration for topics LOG_INFO("Parsing generic topic configuration"); parseTopic(config, m_genericTopic, m_genericTopic); // read inherited topic configuration LOG_INFO("Parsing individual topic configuration"); parseTopics(config, m_topics, m_genericTopic); initializeConfiguration(config); initializeConnectionFactory(); bool r = establishConnection(); return r; } bool MonitorPlugInActiveMQ::establishConnection() { // if some connection issue has occured, try to reconnect to the broker try { if (m_connection != NULL) { LOG_DEBUG("Checking for closed connection") m_connection->checkClosed(); } if (m_session != NULL && (false == m_session->isStarted())) { throw CMSException("Connection open but session not started"); } } catch (CMSException& ex) { LOG_ERROR("Connection is broke:" << ex.getMessage()) LOG_INFO("Freeing resources for reconnecting...") freeConnection(); } if (m_bConnected == false) { LOG_DEBUG("m_bConnected is false: \"" << m_bConnected << "\"") if (initializeConnection()) { initializeTopics(m_topics); } } return m_bConnected; } /** * Iterate over every topic and initialize the topic/queue for the ActiveMQ connection */ void MonitorPlugInActiveMQ::initializeTopics(Topics &topics) { LOG_INFO("Initializing topics") if (m_bConnected == false) { throw RuntimeException(__FILE__, __LINE__, "Tried to initialize topics without an established ActiveMQ connection. Call initializeActiveMqConnection() first."); } Topics::iterator i; TopicInfo *pTopicInfo; LOG_INFO("Number of topics to initialize: " << topics.size()) // create new producers for (i = topics.begin(); i != topics.end(); i++) { pTopicInfo = i->second; if (pTopicInfo->bUseTopic) { pTopicInfo->destination = m_session->createTopic(pTopicInfo->destUri); } else { pTopicInfo->destination = m_session->createQueue(pTopicInfo->destUri); } pTopicInfo->producer = m_session->createProducer(pTopicInfo->destination); if (pTopicInfo->bDeliveryModePersistent) { pTopicInfo->producer->setDeliveryMode(DeliveryMode::PERSISTENT); } else { pTopicInfo->producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT); } LOG_INFO("Topic destination \"" << pTopicInfo->destination << "\" created") } LOG_INFO("Topics initialized") m_bTopicsInitialized = true; } void MonitorPlugInActiveMQ::freeTopics() { Topics::iterator i ; TopicInfo *pTopicInfo; // Destroy resources. for (i = m_topics.begin(); i != m_topics.end(); i++) { pTopicInfo = i->second; try { if (pTopicInfo->destination != NULL) { delete pTopicInfo->destination; } } catch (CMSException& e) { e.printStackTrace(); } pTopicInfo->destination = NULL; try { if (pTopicInfo->producer != NULL) { delete pTopicInfo->producer; } } catch (CMSException& e) { e.printStackTrace(); } pTopicInfo->producer = NULL; } } bool MonitorPlugInActiveMQ::quitProcessing() { return true; } /** * Read the <topic type="...">...</topic> tag instances and produce a new Topic instance for every tag */ void MonitorPlugInActiveMQ::parseTopics(XMLNode config, Topics &topics, TopicInfo &referenceTopic) { XMLNode topicNode; Topics::iterator i; TopicInfo *pTopicInfo; // defaults vector<string> channels; channels.push_back(ACTIVEMQ_KEY_POCSAG); channels.push_back(ACTIVEMQ_KEY_ZVEI); channels.push_back(ACTIVEMQ_KEY_FMS); // an ActiveMQ topic/queue must be mapped to valid soundcard channel for (unsigned int i = 0, m = channels.size(); i < m; i++) { pTopicInfo = new TopicInfo; LOG_INFO("Initializing topic for channel num #" << i) initializeTopic(*pTopicInfo, referenceTopic); topics.insert(PairMapping(channels.at(i), pTopicInfo)); } int nTopic = config.nChildNode(ACTIVEMQ_XMLNODE_TOPIC); LOG_INFO("Available topics for channel: " << nTopic) for (int num = 0; num < nTopic ; ++num) { if (!((topicNode = config.getChildNode(ACTIVEMQ_XMLNODE_TOPIC, num))).isEmpty()) { std::string type = topicNode.getAttribute(ACTIVEMQ_XMLATTR_TYPE) ; if ((type == ACTIVEMQ_KEY_POCSAG) || (type == ACTIVEMQ_KEY_FMS) || (type == ACTIVEMQ_KEY_ZVEI)) { LOG_INFO("Topic definition has valid type \"" << type << "\"") pTopicInfo = (topics.find(type))->second; parseTopic(topicNode, *pTopicInfo, referenceTopic); } } } } void MonitorPlugInActiveMQ::parseTopic(XMLNode config, TopicInfo &topicInfo, TopicInfo &referenceTopic) { initializeTopic(topicInfo, referenceTopic); LOG_INFO("Parsing topic definition") if (config.nChildNode(ACTIVEMQ_XMLNODE_USETOPIC) >= 1) { topicInfo.bUseTopic = getNodeBool(config, ACTIVEMQ_XMLNODE_USETOPIC, false); } if (config.nChildNode(ACTIVEMQ_XMLNODE_DELIVERYMODEPERSISTENT) >= 1) { topicInfo.bDeliveryModePersistent = getNodeBool(config, ACTIVEMQ_XMLNODE_DELIVERYMODEPERSISTENT, false); } if (config.nChildNode(ACTIVEMQ_XMLNODE_DESTURI) >= 1) { topicInfo.destUri = getNodeText(config, ACTIVEMQ_XMLNODE_DESTURI, "monitord"); } } void MonitorPlugInActiveMQ::initializeTopic(TopicInfo &topicInfo, TopicInfo &referenceTopic) { topicInfo.bUseTopic = referenceTopic.bUseTopic; topicInfo.bDeliveryModePersistent = referenceTopic.bDeliveryModePersistent; topicInfo.destUri = referenceTopic.destUri; topicInfo.destination = NULL; topicInfo.producer = NULL; } MonitorPlugInActiveMQFactory::MonitorPlugInActiveMQFactory() { } MonitorPlugInActiveMQFactory::~MonitorPlugInActiveMQFactory() { } MonitorPlugIn * MonitorPlugInActiveMQFactory::CreatePlugIn() { return new MonitorPlugInActiveMQ(); } DLL_EXPORT void * factory0( void ) { return new MonitorPlugInActiveMQFactory; }