| | /** |
---|
| | * monitord-activemq |
---|
| | * http://github.com/schakko/monitord-activemq |
---|
| | * Contains *untested* prototyped code |
---|
| | * A monitord plug-in for native sending of POOCSAG/FMS/ZVEI messages from the monitord core to ActiveMQs queues or topics. |
---|
| | * |
---|
| | * http://github.com/schakko/monitord |
---|
| | */ |
---|
| | #include "libmplugin_activemq.h" |
---|
| | |
---|
| | using namespace std; |
---|
| | using namespace activemq; |
---|
| | using namespace activemq::core; |
---|
| | using namespace decaf::lang::exceptions; |
---|
| | using namespace cms; |
---|
| | |
---|
| | MonitorPlugInActiveMQ::MonitorPlugInActiveMQ() |
---|
| | using namespace decaf::net; |
---|
| | |
---|
| | /** |
---|
| | * Important: the m_connectionFactory() must be explicitly called. Using the auto_ptr inside the header file does not work anymore. |
---|
| | */ |
---|
| | MonitorPlugInActiveMQ::MonitorPlugInActiveMQ() : m_connectionFactory(new ActiveMQConnectionFactory()) |
---|
| | { |
---|
| | m_bUseCompression = false; |
---|
| | m_bClientAck = false; |
---|
| | m_bConnected = false; |
---|
| |
---|
| | LOG_INFO("MonitorActiveMQPlugin successfully loaded" ) |
---|
| | } |
---|
| | |
---|
| | void MonitorPlugInActiveMQ::onException(const CMSException& ex AMQCPP_UNUSED) { |
---|
| | LOG_ERROR("Something really bad happend to ActiveMQ:" << ex.getMessage()); |
---|
| | LOG_ERROR("Exception caught from ActiveMQ: " << ex.getMessage()); |
---|
| | freeTopics(); |
---|
| | freeConnection(); |
---|
| | } |
---|
| | |
---|
| | /** |
---|
| | * Interface method; called by the monitord core on arrival of a new message |
---|
| | */ |
---|
| | bool MonitorPlugInActiveMQ::processResult(class ModuleResultBase *pRes) |
---|
| | { |
---|
| | LOG_DEBUG("apachemq: processing Result...") |
---|
| | LOG_INFO("apachemq: processing Result...") |
---|
| | |
---|
| | |
---|
| | std::string type = (*pRes)["typ"]; |
---|
| | |
---|
| |
---|
| | TopicInfo* topicInfo = (m_topics.find(type))->second; |
---|
| | |
---|
| | LOG_INFO("Preparing Ping message") |
---|
| | |
---|
| | // Verbindung wurde durch das Versenden des letzten Alarms getrennt und kann nicht wiederhergestellt werden |
---|
| | // connection has been lost after/during the send of the last received internal message. The connection could not be recovered |
---|
| | if (!establishConnection()) { |
---|
| | LOG_ERROR("Connection could not be established, discarding this message"); |
---|
| | return false; |
---|
| | } |
---|
| | |
---|
| | // Nachricht - falls etwas schief geht, wird der Exception-Listener aufgerufen und löscht die Verbindungen |
---|
| | // send a test message to enmsure we have a working connection |
---|
| | TextMessage* ping = m_session->createTextMessage(); |
---|
| | ping->setStringProperty("ping", "pong"); |
---|
| | topicInfo->producer->send(ping); |
---|
| | delete ping; |
---|
| | |
---|
| | // Ping wurde gesendet, es könnte aber eine Exception aufgetreten sein |
---|
| | // Also Verbindung noch mal sicherstellen |
---|
| | // after the ping has been sent, we ensure that a successful has been established |
---|
| | if (!establishConnection()) { |
---|
| | LOG_ERROR("Connection could not be established after failed Ping, discarding this message"); |
---|
| | return false; |
---|
| | } |
---|
| |
---|
| | for (i = (*pRes).m_Items.begin(); i != (*pRes).m_Items.end(); i++) { |
---|
| | message->setStringProperty(i->first, i->second); |
---|
| | } |
---|
| | |
---|
| | // Wenn das Senden der Nachricht jetzt fehlschlägt, wird beim nächsten Mal probiert, die Verbindung wieder zu öffnen |
---|
| | // if sending the messasge item fails, the connection will be recovered on the next processResult call |
---|
| | topicInfo->producer->send(message); |
---|
| | |
---|
| | LOG_INFO("message sent to topic") |
---|
| | |
---|
| |
---|
| | |
---|
| | return true; |
---|
| | } |
---|
| | |
---|
| | /** |
---|
| | * Read the <plugin name="activem">..</plugin> section from the configuration XML file. |
---|
| | * All parameters are stored as member variables of the MonitorPlugInActiveMQ instance. |
---|
| | */ |
---|
| | void MonitorPlugInActiveMQ::initializeConfiguration(XMLNode config) |
---|
| | { |
---|
| | { |
---|
| | LOG_INFO("Initializing ActiveMQ configuration") |
---|
| | |
---|
| | m_brokerUri = getNodeText(config, ACTIVEMQ_XMLNODE_BROKERURI, "tcp://127.0.0.1:61616"); |
---|
| | m_username = getNodeText(config, ACTIVEMQ_XMLNODE_USERNAME, ""); |
---|
| | m_password = getNodeText(config, ACTIVEMQ_XMLNODE_PASSWORD, ""); |
---|
| | m_clientId = getNodeText(config, ACTIVEMQ_XMLNODE_CLIENTID, ""); |
---|
| |
---|
| | m_closeTimeout = getNodeInt(config, ACTIVEMQ_XMLNODE_CLOSETIMEOUT, 0); |
---|
| | m_producerWindowSize = getNodeInt(config, ACTIVEMQ_XMLNODE_PRODUCERWINDOWSIZE, 0); |
---|
| | m_bUseCompression = getNodeBool(config, ACTIVEMQ_XMLNODE_USECOMPRESSION, false); |
---|
| | m_bClientAck = getNodeBool(config, ACTIVEMQ_XMLNODE_CLIENTACK, false); |
---|
| | } |
---|
| | |
---|
| | |
---|
| | LOG_INFO("ActiveMQ configuration initialized") |
---|
| | } |
---|
| | |
---|
| | /** |
---|
| | * Initialize the ActiveMQ connection factory. The connection factory creates new connections. |
---|
| | */ |
---|
| | void MonitorPlugInActiveMQ::initializeConnectionFactory() |
---|
| | { |
---|
| | // Set Broker-URI first - otherwise username and password is lost |
---|
| | LOG_INFO("Initiailizing ActiveMQ connection factory") |
---|
| | |
---|
| | // Set Broker-URI first - otherwise username and password configuration are lost |
---|
| | m_connectionFactory->setBrokerURI(m_brokerUri); |
---|
| | |
---|
| | if (!m_username.empty()) { |
---|
| | m_connectionFactory->setUsername(m_username); |
---|
| |
---|
| | m_connectionFactory->setSendTimeout(m_sendTimeout); |
---|
| | m_connectionFactory->setCloseTimeout(m_closeTimeout); |
---|
| | m_connectionFactory->setProducerWindowSize(m_producerWindowSize); |
---|
| | |
---|
| | LOG_DEBUG("Connection factory initialized") |
---|
| | } |
---|
| | |
---|
| | LOG_INFO("Connection factory initialized") |
---|
| | } |
---|
| | |
---|
| | /** |
---|
| | * Initialize the connection and its session instance |
---|
| | * @return cms::Connection |
---|
| | */ |
---|
| | bool MonitorPlugInActiveMQ::initializeConnection() |
---|
| | { |
---|
| | LOG_INFO("Initializing new ActiveMQ connection") |
---|
| | |
---|
| | // create a connection |
---|
| | try { |
---|
| | m_connection = m_connectionFactory->createConnection(); |
---|
| |
---|
| | |
---|
| | m_bConnected = true; |
---|
| | } |
---|
| | catch (CMSException& e) { |
---|
| | LOG_ERROR("Could not connect to messaging queue \"" << m_brokerUri << "\" with username=\"" << m_username << "\"") |
---|
| | LOG_ERROR("Could not connect to message queue \"" << m_brokerUri << "\" with username=\"" << m_username << "\"") |
---|
| | m_bConnected = false; |
---|
| | } |
---|
| | |
---|
| | LOG_DEBUG("Connection initialized") |
---|
| | LOG_INFO("Connection initialized") |
---|
| | |
---|
| | return m_bConnected; |
---|
| | } |
---|
| | |
---|
| | void MonitorPlugInActiveMQ::freeConnection() { |
---|
| | // Close open resources. |
---|
| | // close open resources |
---|
| | try { |
---|
| | if (m_session != NULL) { |
---|
| | m_session->close(); |
---|
| | } |
---|
| |
---|
| | // initialize ActiveMQ |
---|
| | activemq::library::ActiveMQCPP::initializeLibrary(); |
---|
| | |
---|
| | // read default configuration for topics |
---|
| | LOG_INFO("Parsing generic topic configuration"); |
---|
| | parseTopic(config, m_genericTopic, m_genericTopic); |
---|
| | |
---|
| | // read inherited topic configuration |
---|
| | LOG_INFO("Parsing individual topic configuration"); |
---|
| | parseTopics(config, m_topics, m_genericTopic); |
---|
| | |
---|
| | initializeConfiguration(config); |
---|
| | initializeConnectionFactory(); |
---|
| |
---|
| | |
---|
| | return m_bConnected; |
---|
| | } |
---|
| | |
---|
| | /** |
---|
| | * Iterate over every topic and initialize the topic/queue for the ActiveMQ connection |
---|
| | */ |
---|
| | void MonitorPlugInActiveMQ::initializeTopics(Topics &topics) |
---|
| | { |
---|
| | LOG_INFO("Initializing topics") |
---|
| | |
---|
| | if (m_bConnected == false) { |
---|
| | throw RuntimeException(__FILE__, __LINE__, "Tried to initialize topics without established ActiveMQ connection. Call initializeActiveMqConnection() first."); |
---|
| | throw RuntimeException(__FILE__, __LINE__, "Tried to initialize topics without an established ActiveMQ connection. Call initializeActiveMqConnection() first."); |
---|
| | } |
---|
| | |
---|
| | Topics::iterator i; |
---|
| | TopicInfo *pTopicInfo; |
---|
| | |
---|
| | LOG_INFO("Number of topics to initialize: " << topics.size()) |
---|
| | |
---|
| | // create new producers |
---|
| | for (i = topics.begin(); i != topics.end(); i++) |
---|
| | { |
---|
| |
---|
| | { |
---|
| | return true; |
---|
| | } |
---|
| | |
---|
| | /** |
---|
| | * Read the <topic type="...">...</topic> tag instances and produce a new Topic instance for every tag |
---|
| | */ |
---|
| | void MonitorPlugInActiveMQ::parseTopics(XMLNode config, Topics &topics, TopicInfo &referenceTopic) |
---|
| | { |
---|
| | XMLNode topicNode; |
---|
| | Topics::iterator i; |
---|
| |
---|
| | channels.push_back(ACTIVEMQ_KEY_POCSAG); |
---|
| | channels.push_back(ACTIVEMQ_KEY_ZVEI); |
---|
| | channels.push_back(ACTIVEMQ_KEY_FMS); |
---|
| | |
---|
| | // an ActiveMQ topic/queue must be mapped to valid soundcard channel |
---|
| | for (unsigned int i = 0, m = channels.size(); i < m; i++) |
---|
| | { |
---|
| | pTopicInfo = new TopicInfo; |
---|
| | LOG_INFO("Initializing topic for channel num #" << i) |
---|
| | initializeTopic(*pTopicInfo, referenceTopic); |
---|
| | topics.insert(PairMapping(channels.at(i), pTopicInfo)); |
---|
| | } |
---|
| | |
---|
| | int nTopic = config.nChildNode(ACTIVEMQ_XMLNODE_TOPIC); |
---|
| | |
---|
| | LOG_INFO("Available topics for channel: " << nTopic) |
---|
| | |
---|
| | for (int num = 0; num < nTopic ; ++num) |
---|
| | { |
---|
| | if (!((topicNode = config.getChildNode(ACTIVEMQ_XMLNODE_TOPIC, num))).isEmpty()) { |
---|
| | std::string type = topicNode.getAttribute(ACTIVEMQ_XMLATTR_TYPE) ; |
---|
| | |
---|
| | if ((type == ACTIVEMQ_KEY_POCSAG) || (type == ACTIVEMQ_KEY_FMS) || (type == ACTIVEMQ_KEY_ZVEI)) { |
---|
| | LOG_INFO("Topic definition has valid type \"" << type << "\"") |
---|
| | pTopicInfo = (topics.find(type))->second; |
---|
| | parseTopic(topicNode, *pTopicInfo, referenceTopic); |
---|
| | } |
---|
| | } |
---|
| |
---|
| | |
---|
| | void MonitorPlugInActiveMQ::parseTopic(XMLNode config, TopicInfo &topicInfo, TopicInfo &referenceTopic) |
---|
| | { |
---|
| | initializeTopic(topicInfo, referenceTopic); |
---|
| | LOG_INFO("Parsing topic definition") |
---|
| | |
---|
| | if (config.nChildNode(ACTIVEMQ_XMLNODE_USETOPIC) >= 1) { |
---|
| | topicInfo.bUseTopic = getNodeBool(config, ACTIVEMQ_XMLNODE_USETOPIC, false); |
---|
| | } |
---|
| |
---|
| | |