diff --git a/monitord/plugins/libmplugin_activemq.cpp b/monitord/plugins/libmplugin_activemq.cpp
index 6c7fbd1..cdaecdd 100644
--- a/monitord/plugins/libmplugin_activemq.cpp
+++ b/monitord/plugins/libmplugin_activemq.cpp
@@ -1,7 +1,7 @@
/**
- * monitord-activemq
- * http://github.com/schakko/monitord-activemq
- * Contains *untested* prototyped code
+ * A monitord plug-in for native sending of POOCSAG/FMS/ZVEI messages from the monitord core to ActiveMQs queues or topics.
+ *
+ * http://github.com/schakko/monitord
*/
#include "libmplugin_activemq.h"
@@ -10,8 +10,12 @@
using namespace activemq::core;
using namespace decaf::lang::exceptions;
using namespace cms;
+using namespace decaf::net;
-MonitorPlugInActiveMQ::MonitorPlugInActiveMQ()
+/**
+ * Important: the m_connectionFactory() must be explicitly called. Using the auto_ptr inside the header file does not work anymore.
+ */
+MonitorPlugInActiveMQ::MonitorPlugInActiveMQ() : m_connectionFactory(new ActiveMQConnectionFactory())
{
m_bUseCompression = false;
m_bClientAck = false;
@@ -50,14 +54,17 @@
}
void MonitorPlugInActiveMQ::onException(const CMSException& ex AMQCPP_UNUSED) {
- LOG_ERROR("Something really bad happend to ActiveMQ:" << ex.getMessage());
+ LOG_ERROR("Exception caught from ActiveMQ: " << ex.getMessage());
freeTopics();
freeConnection();
}
+/**
+ * Interface method; called by the monitord core on arrival of a new message
+ */
bool MonitorPlugInActiveMQ::processResult(class ModuleResultBase *pRes)
{
- LOG_DEBUG("apachemq: processing Result...")
+ LOG_INFO("apachemq: processing Result...")
std::string type = (*pRes)["typ"];
@@ -71,20 +78,19 @@
LOG_INFO("Preparing Ping message")
- // Verbindung wurde durch das Versenden des letzten Alarms getrennt und kann nicht wiederhergestellt werden
+ // connection has been lost after/during the send of the last received internal message. The connection could not be recovered
if (!establishConnection()) {
LOG_ERROR("Connection could not be established, discarding this message");
return false;
}
- // Nachricht - falls etwas schief geht, wird der Exception-Listener aufgerufen und löscht die Verbindungen
+ // send a test message to enmsure we have a working connection
TextMessage* ping = m_session->createTextMessage();
ping->setStringProperty("ping", "pong");
topicInfo->producer->send(ping);
delete ping;
- // Ping wurde gesendet, es könnte aber eine Exception aufgetreten sein
- // Also Verbindung noch mal sicherstellen
+ // after the ping has been sent, we ensure that a successful has been established
if (!establishConnection()) {
LOG_ERROR("Connection could not be established after failed Ping, discarding this message");
return false;
@@ -100,7 +106,7 @@
message->setStringProperty(i->first, i->second);
}
- // Wenn das Senden der Nachricht jetzt fehlschlägt, wird beim nächsten Mal probiert, die Verbindung wieder zu öffnen
+ // if sending the messasge item fails, the connection will be recovered on the next processResult call
topicInfo->producer->send(message);
LOG_INFO("message sent to topic")
@@ -110,8 +116,14 @@
return true;
}
+/**
+ * Read the .. section from the configuration XML file.
+ * All parameters are stored as member variables of the MonitorPlugInActiveMQ instance.
+ */
void MonitorPlugInActiveMQ::initializeConfiguration(XMLNode config)
-{
+{
+ LOG_INFO("Initializing ActiveMQ configuration")
+
m_brokerUri = getNodeText(config, ACTIVEMQ_XMLNODE_BROKERURI, "tcp://127.0.0.1:61616");
m_username = getNodeText(config, ACTIVEMQ_XMLNODE_USERNAME, "");
m_password = getNodeText(config, ACTIVEMQ_XMLNODE_PASSWORD, "");
@@ -121,11 +133,18 @@
m_producerWindowSize = getNodeInt(config, ACTIVEMQ_XMLNODE_PRODUCERWINDOWSIZE, 0);
m_bUseCompression = getNodeBool(config, ACTIVEMQ_XMLNODE_USECOMPRESSION, false);
m_bClientAck = getNodeBool(config, ACTIVEMQ_XMLNODE_CLIENTACK, false);
+
+ LOG_INFO("ActiveMQ configuration initialized")
}
+/**
+ * Initialize the ActiveMQ connection factory. The connection factory creates new connections.
+ */
void MonitorPlugInActiveMQ::initializeConnectionFactory()
{
- // Set Broker-URI first - otherwise username and password is lost
+ LOG_INFO("Initiailizing ActiveMQ connection factory")
+
+ // Set Broker-URI first - otherwise username and password configuration are lost
m_connectionFactory->setBrokerURI(m_brokerUri);
if (!m_username.empty()) {
@@ -145,11 +164,16 @@
m_connectionFactory->setCloseTimeout(m_closeTimeout);
m_connectionFactory->setProducerWindowSize(m_producerWindowSize);
- LOG_DEBUG("Connection factory initialized")
+ LOG_INFO("Connection factory initialized")
}
+/**
+ * Initialize the connection and its session instance
+ * @return cms::Connection
+ */
bool MonitorPlugInActiveMQ::initializeConnection()
{
+ LOG_INFO("Initializing new ActiveMQ connection")
// create a connection
try {
@@ -166,17 +190,17 @@
m_bConnected = true;
}
catch (CMSException& e) {
- LOG_ERROR("Could not connect to messaging queue \"" << m_brokerUri << "\" with username=\"" << m_username << "\"")
+ LOG_ERROR("Could not connect to message queue \"" << m_brokerUri << "\" with username=\"" << m_username << "\"")
m_bConnected = false;
}
- LOG_DEBUG("Connection initialized")
+ LOG_INFO("Connection initialized")
return m_bConnected;
}
void MonitorPlugInActiveMQ::freeConnection() {
- // Close open resources.
+ // close open resources
try {
if (m_session != NULL) {
m_session->close();
@@ -220,8 +244,11 @@
activemq::library::ActiveMQCPP::initializeLibrary();
// read default configuration for topics
+ LOG_INFO("Parsing generic topic configuration");
parseTopic(config, m_genericTopic, m_genericTopic);
+
// read inherited topic configuration
+ LOG_INFO("Parsing individual topic configuration");
parseTopics(config, m_topics, m_genericTopic);
initializeConfiguration(config);
@@ -239,15 +266,22 @@
return m_bConnected;
}
+/**
+ * Iterate over every topic and initialize the topic/queue for the ActiveMQ connection
+ */
void MonitorPlugInActiveMQ::initializeTopics(Topics &topics)
{
+ LOG_INFO("Initializing topics")
+
if (m_bConnected == false) {
- throw RuntimeException(__FILE__, __LINE__, "Tried to initialize topics without established ActiveMQ connection. Call initializeActiveMqConnection() first.");
+ throw RuntimeException(__FILE__, __LINE__, "Tried to initialize topics without an established ActiveMQ connection. Call initializeActiveMqConnection() first.");
}
Topics::iterator i;
TopicInfo *pTopicInfo;
+ LOG_INFO("Number of topics to initialize: " << topics.size())
+
// create new producers
for (i = topics.begin(); i != topics.end(); i++)
{
@@ -316,6 +350,9 @@
return true;
}
+/**
+ * Read the ... tag instances and produce a new Topic instance for every tag
+ */
void MonitorPlugInActiveMQ::parseTopics(XMLNode config, Topics &topics, TopicInfo &referenceTopic)
{
XMLNode topicNode;
@@ -328,21 +365,25 @@
channels.push_back(ACTIVEMQ_KEY_ZVEI);
channels.push_back(ACTIVEMQ_KEY_FMS);
+ // an ActiveMQ topic/queue must be mapped to valid soundcard channel
for (unsigned int i = 0, m = channels.size(); i < m; i++)
{
pTopicInfo = new TopicInfo;
+ LOG_INFO("Initializing topic for channel num #" << i)
initializeTopic(*pTopicInfo, referenceTopic);
topics.insert(PairMapping(channels.at(i), pTopicInfo));
}
int nTopic = config.nChildNode(ACTIVEMQ_XMLNODE_TOPIC);
-
+ LOG_INFO("Available topics for channel: " << nTopic)
+
for (int num = 0; num < nTopic ; ++num)
{
if (!((topicNode = config.getChildNode(ACTIVEMQ_XMLNODE_TOPIC, num))).isEmpty()) {
std::string type = topicNode.getAttribute(ACTIVEMQ_XMLATTR_TYPE) ;
if ((type == ACTIVEMQ_KEY_POCSAG) || (type == ACTIVEMQ_KEY_FMS) || (type == ACTIVEMQ_KEY_ZVEI)) {
+ LOG_INFO("Topic definition has valid type \"" << type << "\"")
pTopicInfo = (topics.find(type))->second;
parseTopic(topicNode, *pTopicInfo, referenceTopic);
}
@@ -353,6 +394,7 @@
void MonitorPlugInActiveMQ::parseTopic(XMLNode config, TopicInfo &topicInfo, TopicInfo &referenceTopic)
{
initializeTopic(topicInfo, referenceTopic);
+ LOG_INFO("Parsing topic definition")
if (config.nChildNode(ACTIVEMQ_XMLNODE_USETOPIC) >= 1) {
topicInfo.bUseTopic = getNodeBool(config, ACTIVEMQ_XMLNODE_USETOPIC, false);