diff --git a/Makefile.am b/Makefile.am
index e6c82cb..63d3c0d 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -151,6 +151,33 @@
endif # mysql Support ?
# --------------------------------------------------------
+# Monitor-Plugin: ActiveMQ
+#
+if ACTIVEMQ
+pkglib_LTLIBRARIES+= \
+ monitord/plugins/libmplugin_activemq.la
+
+monitord_plugins_libmplugin_activemq_la_SOURCES=\
+ ${monitord_plugins_common} \
+ monitord/plugins/mplugin.cpp \
+ monitord/plugins/libmplugin_activemq.cpp
+
+monitord_plugins_libmplugin_activemq_la_CPPFLAGS =\
+ ${monitord_monitord_CPPFLAGS} \
+ ${ACTIVEMQ_CFLAGS} \
+ ${DLL}
+
+monitord_plugins_libmplugin_activemq_la_LDFLAGS=\
+ -no-undefined \
+ ${LIBACTIVEMQ} \
+ ${DLL}
+
+monitord_plugin_libmplugin_activemq_la_LIBS=\
+ ${ACTIVEMQ_LIBS}
+
+endif
+
+# --------------------------------------------------------
# Audio-Plugin: Recorder
#
monitord_plugins_libmplugin_audiorecorder_la_SOURCES=\
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..e4248e3
--- /dev/null
+++ b/README.md
@@ -0,0 +1,52 @@
+# monitord-activemq
+
+## Dependency
+monitord-activemq depends on activemq-cpp.
+
+## Installation
+Just push back this directory into /monitord/trunk.
+
+## Compiling on CentOS 6.2
+
+ yum install autoconf automake pkg-config alsa-lib-devel lua-devel
+ aclocal -Im4
+ ./configure --enable-plugins --with-activemq
+
+## Configuration
+Edit your monitord.xml:
+
+
+
+ plugins/libmplugin_activemq.so
+
+ activemq.log
+ DEBUG
+ tcp://127.0.0.1:61616
+ your_username_or_empty
+ your_password_or_empty
+ your_clientid_or_empty
+ 5
+ 5
+ 5
+ 1
+ 0
+
+
+ 1
+ 0
+ zabos
+
+
+
+ zabos.fms
+
+
+
+
+ 0
+
+
+
+
+
+
diff --git a/configure.ac b/configure.ac
index d2f5d32..64db5e0 100644
--- a/configure.ac
+++ b/configure.ac
@@ -11,7 +11,6 @@
AX_LUA_HEADERS
AX_LUA_LIBS
-
case "$host_os" in
linux*)
LINUX=true
@@ -26,6 +25,8 @@
SOXLIBDLL=[libsox.so]
MYSQLLIBNAME=[mysqlclient]
MYSQLLIBDLL=[libmysqlclient.so]
+ ACTIVEMQLIBNAME=[activemq-cpp]
+ ACTIVEMQDLL=[libactivemq-cpp.so]
;;
mingw32*)
WINDOWS=true
@@ -40,6 +41,8 @@
VORBISENCLIBNAME=[vorbisenc-0]
MYSQLLIBNAME=[mysql]
MYSQLLIBDLL=[libmysql.dll]
+ ACTIVEMQNAME=]activemq-cpp]
+ ACTIVEMQDLL=[libactivemq-cpp.dll]
AC_LIBTOOL_WIN32_DLL
;;
esac
@@ -73,6 +76,11 @@
[enable experimental mysql support (default is no)])],
[],
[use_mysql=no])
+AC_ARG_WITH([activemq],
+ [AC_HELP_STRING([--with-activemq],
+ [enable experimental ActiveMQ support (default is no)])],
+ [use_als=$withval],
+ [use_activemq=no])
AC_ARG_WITH([lame],
[AC_HELP_STRING([--with-lame],
[enable experimental mp3 lame support (default is no)])],
@@ -107,6 +115,9 @@
if test "x$use_vorbis" != xno; then
AC_MSG_ERROR([[vorbis support is not supported without --enable-plugins]])
fi
+ if test "x$use_activemq" != xno; then
+ AC_MSG_ERROR([[ActiveMQ support is not supported without --enable-plugins]])
+ fi
else
if test "x$use_mysql" != xno; then
AC_CHECK_HEADER([mysql/mysql.h],
@@ -148,12 +159,36 @@
])
if test "x$use_sox" != xno; then
- AC_CHECK_HEADER([sox.h],
- ,
- [AC_MSG_ERROR([sox.h not found])])
+ AC_CHECK_HEADER([sox.h],
+ ,
+ [AC_MSG_ERROR([sox not found])])
fi
+
+ AS_IF([test "x$use_activemq" != xno],
+ [PKG_CHECK_MODULES(
+ [ACTIVEMQ],
+ [$ACTIVEMQLIBNAME],
+ [
+ AC_MSG_NOTICE([activemq is present - ActiveMQ plugin will be created])
+ AC_DEFINE([HAVE_ACTIVEMQ], [1], [Define if you have activemq-cpp])
+ activemq=true
+ ],
+ [AC_MSG_FAILURE([$ACTIVEMQLIBNAME is not installed as package])]
+ )])
+# [AC_CHECK_LIB([$ACTIVEMQLIBNAME],[main],
+# [
+# AC_SUBST([LIBACTIVEMQ], ["-l$ACTIVEMQLIBNAME"])
+# AC_DEFINE([HAVE_ACTIVEMQ], [1], [Define if you have activemq])
+# activemq=true
+# ],
+# [if test "x$use_activemq" != xcheck; then
+# AC_MSG_FAILURE([--with-activemq but $ACTIVEMQLIBNAME failed])
+# fi
+# ])
+# ])
- LIBSOX=
+
+ LIBS=
AS_IF([test "x$use_sox" != xno],
[AC_CHECK_LIB([$SOXLIBNAME],[sox_init],
[AC_SUBST([LIBSOX], ["-l$SOXLIBNAME"])
@@ -195,6 +230,7 @@
AM_CONDITIONAL(LAME, test x$lame = xtrue)
AM_CONDITIONAL(SOX, test x$sox = xtrue)
AM_CONDITIONAL(VORBIS, test x$vorbis = xtrue)
+AM_CONDITIONAL(ACTIVEMQ, test x$activemq = xtrue)
AC_SUBST(LUA_LIB)
AC_SUBST(LUA_INCLUDE)
diff --git a/monitord/plugins/libmplugin_activemq.cpp b/monitord/plugins/libmplugin_activemq.cpp
new file mode 100644
index 0000000..cacb55a
--- /dev/null
+++ b/monitord/plugins/libmplugin_activemq.cpp
@@ -0,0 +1,396 @@
+/**
+ * monitord-activemq
+ * http://github.com/schakko/monitord-activemq
+ * Contains *untested* prototyped code
+ */
+#include "libmplugin_activemq.h"
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::core;
+using namespace decaf::lang::exceptions;
+using namespace cms;
+
+MonitorPlugInActiveMQ::MonitorPlugInActiveMQ()
+{
+ m_bUseCompression = false;
+ m_bClientAck = false;
+ m_bConnected = false;
+ m_brokerUri = "tcp://127.0.0.1:61616";
+ m_username = "";
+ m_password = "";
+ m_clientId = "";
+ m_destUri = "";
+ m_sendTimeout = 0;
+ m_closeTimeout = 0;
+ m_producerWindowSize = 0;
+
+ m_genericTopic.bUseTopic = false;
+ m_genericTopic.bDeliveryModePersistent = false;
+ m_genericTopic.destUri = "";
+
+ m_bDeliveryModePersistent = false;
+ m_bTopicsInitialized = false;
+
+ m_session = NULL;
+ m_connection = NULL;
+
+ activemq::library::ActiveMQCPP::initializeLibrary();
+}
+
+MonitorPlugInActiveMQ::~MonitorPlugInActiveMQ()
+{
+ Topics::iterator i ;
+ TopicInfo *pTopicInfo;
+
+ // Destroy resources.
+ for (i = m_topics.begin(); i != m_topics.end(); i++)
+ {
+ pTopicInfo = i->second;
+
+ try {
+ if (pTopicInfo->destination != NULL) {
+ delete pTopicInfo->destination;
+ }
+ }
+ catch (CMSException& e) {
+ e.printStackTrace();
+ }
+
+ pTopicInfo->destination = NULL;
+
+ try {
+ if (pTopicInfo->producer != NULL) {
+ delete pTopicInfo->producer;
+ }
+ }
+ catch (CMSException& e) {
+ e.printStackTrace();
+ }
+
+ pTopicInfo->producer = NULL;
+ }
+
+ // Close open resources.
+ try {
+ if (m_session != NULL) {
+ m_session->close();
+ }
+
+ if (m_connection != NULL) {
+ m_connection->close();
+ }
+ }
+ catch (CMSException& e) {
+ e.printStackTrace();
+ }
+
+ try {
+ if (m_session != NULL) {
+ delete m_session;
+ }
+ }
+ catch (CMSException& e) {
+ e.printStackTrace();
+ }
+
+ m_session = NULL;
+
+ try {
+ if (m_connection != NULL) {
+ delete m_connection;
+ }
+ }
+ catch (CMSException& e) {
+ e.printStackTrace();
+ }
+
+ m_connection = NULL;
+}
+
+void MonitorPlugInActiveMQ::Show()
+{
+ FILE_LOG(logINFO) << "MonitorActiveMQPlugin successfully loaded" ;
+}
+
+bool MonitorPlugInActiveMQ::processResult(class ModuleResultBase *pRes)
+{
+ FILE_LOG(logDEBUG) << "apachemq: processing Result...";
+
+ if (m_bConnected == false) {
+ FILE_LOG(logERROR) << "apachmq: ignoring message 'cause no active connection";
+ return false;
+ }
+
+ if (m_bTopicsInitialized == false) {
+ throw RuntimeException(__FILE__, __LINE__, "processResult must be called AFTER initializeTopics()");
+ }
+
+
+ std::string type = (*pRes)["typ"];
+
+ if (m_topics.find(type) == m_topics.end()) {
+ FILE_LOG(logERROR) << "apachemq: received type " << type << " which is not registered";
+ return false;
+ }
+
+ FILE_LOG(logINFO) << "Preparing new message";
+
+ TopicInfo* topicInfo = (m_topics.find(type))->second;
+ TextMessage* message = m_session->createTextMessage();
+
+ ResultItemsMap::iterator i;
+
+ for (i = (*pRes).m_Items.begin(); i != (*pRes).m_Items.end(); i++) {
+ message->setStringProperty(i->first, i->second);
+ }
+
+
+ topicInfo->producer->send(message);
+
+ FILE_LOG(logINFO) << "message sent";
+
+ delete message;
+
+ return true;
+}
+
+void MonitorPlugInActiveMQ::initializeConfiguration(XMLNode config)
+{
+ FILE_LOG(logDEBUG) << "Reading broker URI";
+ m_brokerUri = getNodeText(config, ACTIVEMQ_XMLNODE_BROKERURI, "tcp://127.0.0.1:61616");
+ FILE_LOG(logDEBUG) << "Reading username:";
+ m_username = getNodeText(config, ACTIVEMQ_XMLNODE_USERNAME, "");
+ FILE_LOG(logDEBUG) << "Reading password";
+ m_password = getNodeText(config, ACTIVEMQ_XMLNODE_PASSWORD, "");
+ FILE_LOG(logDEBUG) << "Reading clientId";
+ m_clientId = getNodeText(config, ACTIVEMQ_XMLNODE_CLIENTID, "");
+ FILE_LOG(logDEBUG) << "Reading sendTimeout";
+ m_sendTimeout = getNodeInt(config, ACTIVEMQ_XMLNODE_SENDTIMEOUT, 0);
+ FILE_LOG(logDEBUG) << "Reading closeTimeout";
+ m_closeTimeout = getNodeInt(config, ACTIVEMQ_XMLNODE_CLOSETIMEOUT, 0);
+ FILE_LOG(logDEBUG) << "Reading producerWindowSize";
+ m_producerWindowSize = getNodeInt(config, ACTIVEMQ_XMLNODE_PRODUCERWINDOWSIZE, 0);
+ FILE_LOG(logDEBUG) << "Reading bUseCompression";
+ m_bUseCompression = getNodeBool(config, ACTIVEMQ_XMLNODE_USECOMPRESSION, false);
+ FILE_LOG(logDEBUG) << "Reading bClientAck";
+ m_bClientAck = getNodeBool(config, ACTIVEMQ_XMLNODE_CLIENTACK, false);
+ FILE_LOG(logDEBUG) << "Reading logFile";
+ std::string logFile = getNodeText(config, ACTIVEMQ_XMLNODE_LOGFILE, "screen");
+ FILE_LOG(logDEBUG) << "Reading logLevel";
+ std::string logLevel = getNodeText(config, ACTIVEMQ_XMLNODE_LOGLEVEL, "INFO");
+
+ #ifdef WIN32
+ if (!(logFile == "screen")) {
+ FILE* pFile = fopen(logFile.c_str(), "a");
+ Output2FILE::Stream() = pFile;
+ }
+
+ FILELog::ReportingLevel() = FILELog::FromString(logLevel);
+ FILE_LOG(logINFO) << "logging started";
+ #endif
+}
+
+Topics MonitorPlugInActiveMQ::getTopics()
+{
+ return m_topics;
+}
+
+void MonitorPlugInActiveMQ::setTopics(Topics& topics)
+{
+ m_topics = topics;
+}
+
+void MonitorPlugInActiveMQ::initializeConnectionFactory(ActiveMQConnectionFactory *connectionFactory)
+{
+ // Set Broker-URI first - otherwise username and password is lost
+ connectionFactory->setBrokerURI(m_brokerUri);
+
+ if (!m_username.empty()) {
+ connectionFactory->setUsername(m_username);
+ }
+
+ if (!m_password.empty()) {
+ connectionFactory->setPassword(m_password);
+ }
+
+ if (!m_clientId.empty()) {
+ connectionFactory->setClientId(m_clientId);
+ }
+
+ connectionFactory->setUseCompression(m_bUseCompression);
+ connectionFactory->setSendTimeout(m_sendTimeout);
+ connectionFactory->setCloseTimeout(m_closeTimeout);
+ connectionFactory->setProducerWindowSize(m_producerWindowSize);
+
+ FILE_LOG(logDEBUG) << "Connection factory initialized";
+}
+
+bool MonitorPlugInActiveMQ::initializeActiveMqConnection()
+{
+ auto_ptr connectionFactory(new ActiveMQConnectionFactory());
+ initializeConnectionFactory(connectionFactory.get());
+
+ // create a connection
+ try {
+ m_connection = connectionFactory->createConnection();
+ m_connection->start();
+
+ if (m_bClientAck) {
+ m_session = m_connection->createSession(Session::CLIENT_ACKNOWLEDGE);
+ }
+ else {
+ m_session = m_connection->createSession(Session::AUTO_ACKNOWLEDGE);
+ }
+
+ m_bConnected = true;
+ }
+ catch (CMSException& e) {
+ FILE_LOG(logERROR) << "Could not connect to messaging queue \"" << m_brokerUri << "\" with username=\"" << m_username << "\"";
+ m_bConnected = false;
+ }
+
+ FILE_LOG(logDEBUG) << "Connection initialized";
+
+ return m_bConnected;
+}
+
+bool MonitorPlugInActiveMQ::initProcessing(class MonitorConfiguration* configPtr, XMLNode config)
+{
+ // initialize ActiveMQ
+ activemq::library::ActiveMQCPP::initializeLibrary();
+
+ // read default configuration for topics
+ parseTopic(config, m_genericTopic, m_genericTopic);
+
+ initializeConfiguration(config);
+
+ if (initializeActiveMqConnection()) {
+ parseTopics(config, m_topics, m_genericTopic);
+ initializeTopics(m_topics);
+ }
+
+ return m_bConnected;
+}
+
+void MonitorPlugInActiveMQ::initializeTopics(Topics &topics)
+{
+ if (m_bConnected == false) {
+ throw RuntimeException(__FILE__, __LINE__, "Tried to initialize topics without established ActiveMQ connection. Call initializeActiveMqConnection() first.");
+ }
+
+ Topics::iterator i;
+ TopicInfo *pTopicInfo;
+
+ // create new producers
+ for (i = topics.begin(); i != topics.end(); i++)
+ {
+ pTopicInfo = i->second;
+
+ if (pTopicInfo->bUseTopic) {
+ pTopicInfo->destination = m_session->createTopic(pTopicInfo->destUri);
+ }
+ else {
+ pTopicInfo->destination = m_session->createQueue(pTopicInfo->destUri);
+ }
+
+ pTopicInfo->producer = m_session->createProducer(pTopicInfo->destination);
+
+ if (pTopicInfo->bDeliveryModePersistent) {
+ pTopicInfo->producer->setDeliveryMode(DeliveryMode::PERSISTENT);
+ }
+ else {
+ pTopicInfo->producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
+ }
+
+ FILE_LOG(logINFO) << "Topic destination \"" << pTopicInfo->destination << "\" created";
+ }
+
+ FILE_LOG(logINFO) << "Topics initialized";
+
+ m_bTopicsInitialized = true;
+}
+
+bool MonitorPlugInActiveMQ::quitProcessing()
+{
+ return true;
+}
+
+void MonitorPlugInActiveMQ::parseTopics(XMLNode config, Topics &topics, TopicInfo &referenceTopic)
+{
+ XMLNode topicNode;
+ Topics::iterator i;
+ TopicInfo *pTopicInfo;
+
+ // defaults
+ vector channels;
+ channels.push_back(ACTIVEMQ_KEY_POCSAG);
+ channels.push_back(ACTIVEMQ_KEY_ZVEI);
+ channels.push_back(ACTIVEMQ_KEY_FMS);
+
+ for (unsigned int i = 0, m = channels.size(); i < m; i++)
+ {
+ pTopicInfo = new TopicInfo;
+ initializeTopic(*pTopicInfo, referenceTopic);
+ topics.insert(PairMapping(channels.at(i), pTopicInfo));
+ }
+
+ int nTopic = config.nChildNode(ACTIVEMQ_XMLNODE_TOPIC);
+
+ for (int num = 0; num < nTopic ; ++num)
+ {
+ if (!((topicNode = config.getChildNode(ACTIVEMQ_XMLNODE_TOPIC, num))).isEmpty()) {
+ std::string type = topicNode.getAttribute(ACTIVEMQ_XMLATTR_TYPE) ;
+
+ if ((type == ACTIVEMQ_KEY_POCSAG) || (type == ACTIVEMQ_KEY_FMS) || (type == ACTIVEMQ_KEY_ZVEI)) {
+ pTopicInfo = (topics.find(type))->second;
+ parseTopic(topicNode, *pTopicInfo, referenceTopic);
+ }
+ }
+ }
+}
+
+void MonitorPlugInActiveMQ::initializeTopic(TopicInfo &topicInfo, TopicInfo &referenceTopic)
+{
+ topicInfo.bUseTopic = referenceTopic.bUseTopic;
+ topicInfo.bDeliveryModePersistent = referenceTopic.bDeliveryModePersistent;
+ topicInfo.destUri = referenceTopic.destUri;
+ topicInfo.destination = NULL;
+ topicInfo.producer = NULL;
+}
+
+void MonitorPlugInActiveMQ::parseTopic(XMLNode config, TopicInfo &topicInfo, TopicInfo &referenceTopic)
+{
+ initializeTopic(topicInfo, referenceTopic);
+
+ if (config.nChildNode(ACTIVEMQ_XMLNODE_USETOPIC) >= 1) {
+ topicInfo.bUseTopic = getNodeBool(config, ACTIVEMQ_XMLNODE_USETOPIC, false);
+ }
+
+ if (config.nChildNode(ACTIVEMQ_XMLNODE_DELIVERYMODEPERSISTENT) >= 1) {
+ topicInfo.bDeliveryModePersistent = getNodeBool(config, ACTIVEMQ_XMLNODE_DELIVERYMODEPERSISTENT, false);
+ }
+
+ if (config.nChildNode(ACTIVEMQ_XMLNODE_DESTURI) >= 1) {
+ topicInfo.destUri = getNodeText(config, ACTIVEMQ_XMLNODE_DESTURI, "monitord");
+ }
+}
+
+MonitorPlugInActiveMQFactory::MonitorPlugInActiveMQFactory()
+{
+}
+
+MonitorPlugInActiveMQFactory::~MonitorPlugInActiveMQFactory()
+{
+}
+
+MonitorPlugIn * MonitorPlugInActiveMQFactory::CreatePlugIn()
+{
+ return new MonitorPlugInActiveMQ();
+}
+
+
+DLL_EXPORT void * factory0( void )
+{
+ return new MonitorPlugInActiveMQFactory;
+}
diff --git a/monitord/plugins/libmplugin_activemq.h b/monitord/plugins/libmplugin_activemq.h
new file mode 100644
index 0000000..7410fa1
--- /dev/null
+++ b/monitord/plugins/libmplugin_activemq.h
@@ -0,0 +1,113 @@
+#ifndef MPLUGINACTIVEMQ_H_
+#define MPLUGINACTIVEMQ_H_
+
+#ifdef PLUGINS
+#include
+#include
+
+#ifdef WIN32
+#define usleep Sleep
+#include
+#endif
+
+#include "mplugin.h"
+#include "../MonitorLogging.h"
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#define ACTIVEMQ_KEY_POCSAG "pocsag"
+#define ACTIVEMQ_KEY_FMS "fms"
+#define ACTIVEMQ_KEY_ZVEI "zvei"
+#define ACTIVEMQ_XMLNODE_BROKERURI "brokerUri"
+#define ACTIVEMQ_XMLNODE_USERNAME "username"
+#define ACTIVEMQ_XMLNODE_PASSWORD "password"
+#define ACTIVEMQ_XMLNODE_CLIENTID "clientId"
+#define ACTIVEMQ_XMLNODE_SENDTIMEOUT "sendTimeout"
+#define ACTIVEMQ_XMLNODE_CLOSETIMEOUT "closeTimeout"
+#define ACTIVEMQ_XMLNODE_PRODUCERWINDOWSIZE "producerWindowSize"
+#define ACTIVEMQ_XMLNODE_USECOMPRESSION "useCompression"
+#define ACTIVEMQ_XMLNODE_CLIENTACK "clientAck"
+#define ACTIVEMQ_XMLNODE_LOGFILE "logfile"
+#define ACTIVEMQ_XMLNODE_LOGLEVEL "loglevel"
+#define ACTIVEMQ_XMLNODE_TOPIC "topic"
+#define ACTIVEMQ_XMLATTR_TYPE "type"
+#define ACTIVEMQ_XMLNODE_USETOPIC "useTopic"
+#define ACTIVEMQ_XMLNODE_DELIVERYMODEPERSISTENT "deliveryModePersistent"
+#define ACTIVEMQ_XMLNODE_DESTURI "destUri"
+
+
+typedef struct
+{
+ bool bUseTopic;
+ bool bDeliveryModePersistent;
+ std::string destUri;
+ cms::Destination *destination;
+ cms::MessageProducer *producer;
+} TopicInfo;
+
+typedef std::map Topics;
+typedef std::pair PairMapping;
+
+class MonitorPlugInActiveMQ : public MonitorPlugIn
+{
+public:
+ MonitorPlugInActiveMQ();
+ ~MonitorPlugInActiveMQ();
+ std::string m_brokerUri;
+ std::string m_username;
+ std::string m_password;
+ std::string m_clientId;
+ std::string m_destUri;
+ unsigned int m_sendTimeout;
+ unsigned int m_closeTimeout;
+ unsigned int m_producerWindowSize;
+ bool m_bUseCompression;
+ bool m_bClientAck;
+ bool m_bDeliveryModePersistent;
+ bool m_bConnected;
+ bool m_bTopicsInitialized;
+
+ cms::Connection* m_connection;
+ cms::Session* m_session;
+
+ TopicInfo m_genericTopic;
+ Topics m_topics;
+
+ bool initProcessing(class MonitorConfiguration* configPtr,XMLNode config);
+ bool processResult(class ModuleResultBase *pRes);
+ bool quitProcessing();
+ void Show();
+
+ Topics getTopics();
+ void setTopics(Topics &topics);
+ void shutdownActiveMQCPPLibrary();
+ void initializeActiveMQCPPLibrary();
+ void initializeConfiguration(XMLNode config);
+ void initializeConnectionFactory(activemq::core::ActiveMQConnectionFactory *connectionFactory);
+ bool initializeActiveMqConnection();
+ void initializeTopics(Topics &topics);
+ void parseTopics(XMLNode config, Topics &topics, TopicInfo &referenceTopic);
+ void parseTopic(XMLNode config, TopicInfo &topicInfo, TopicInfo &referenceTopic);
+ void initializeTopic(TopicInfo &topicInfo, TopicInfo &referenceTopic);
+};
+
+class MonitorPlugInActiveMQFactory : public MonitorPlugInFactory
+{
+public:
+ MonitorPlugInActiveMQFactory();
+ ~MonitorPlugInActiveMQFactory();
+ virtual MonitorPlugIn * CreatePlugIn();
+};
+
+
+
+#endif
+#endif /*MPLUGINACTIVEMQ_H_*/
diff --git a/monitord/plugins/libmplugin_activemq.la b/monitord/plugins/libmplugin_activemq.la
new file mode 100644
index 0000000..ab5cfe3
--- /dev/null
+++ b/monitord/plugins/libmplugin_activemq.la
@@ -0,0 +1,41 @@
+# libmplugin_activemq.la - a libtool library file
+# Generated by ltmain.sh (GNU libtool) 2.2.6b Debian-2.2.6b-2ubuntu1
+#
+# Please DO NOT delete this file!
+# It is necessary for linking the library.
+
+# The name that we can dlopen(3).
+dlname='libmplugin_activemq.so.0'
+
+# Names of this library.
+library_names='libmplugin_activemq.so.0.0.0 libmplugin_activemq.so.0 libmplugin_activemq.so'
+
+# The name of the static archive.
+old_library=''
+
+# Linker flags that can not go in dependency_libs.
+inherited_linker_flags=''
+
+# Libraries that this one depends upon.
+dependency_libs=''
+
+# Names of additional weak libraries provided by this library
+weak_library_names=''
+
+# Version information for libmplugin_activemq.
+current=0
+age=0
+revision=0
+
+# Is this an already installed library?
+installed=no
+
+# Should we warn about portability when linking against -modules?
+shouldnotlink=no
+
+# Files to dlopen/dlpreopen
+dlopen=''
+dlpreopen=''
+
+# Directory that this library needs to be installed in:
+libdir='/usr/local/lib/monitord'
diff --git a/monitord/plugins/libmplugin_activemq/MonitorPluginActiveMqTestSuite.h b/monitord/plugins/libmplugin_activemq/MonitorPluginActiveMqTestSuite.h
new file mode 100644
index 0000000..de10d74
--- /dev/null
+++ b/monitord/plugins/libmplugin_activemq/MonitorPluginActiveMqTestSuite.h
@@ -0,0 +1,191 @@
+#define PLUGINS
+#include "../libmplugin_activemq.h"
+#include
+
+#define XMLSETTING(xmlString, xmlResults, xmlNode, CHARS) \
+ const char *xmlString = CHARS; \
+ XMLResults xmlResults;\
+ XMLNode xmlNode; \
+ xmlNode = XMLNode::parseString(xmlString, NULL, &xmlResults); \
+
+#define XML_PARAMS_DEFAULT "tcp://localhostnew_usernamenew_passwordnew_client_id22211"
+
+#define XML_PARAMS_TOPIC_DEFAULT "11activemq_test"
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::core;
+using namespace cms;
+
+class MonitorPlugInActiveMqTestSuite : public CxxTest::TestSuite
+{
+public:
+ void testMainConfiguration() {
+ MonitorPlugInActiveMQ mock;
+ TS_ASSERT_EQUALS(mock.m_bConnected, false);
+ TS_ASSERT_EQUALS(mock.m_bUseCompression, false);
+ TS_ASSERT_EQUALS(mock.m_bClientAck, false);
+ TS_ASSERT_EQUALS(mock.m_bDeliveryModePersistent, false);
+
+ XMLSETTING(xmlString, xmlResults, xmlNode, XML_PARAMS_DEFAULT);
+ mock.initializeConfiguration(xmlNode);
+
+ TS_ASSERT_EQUALS(mock.m_brokerUri, "tcp://localhost");
+ TS_ASSERT_EQUALS(mock.m_username, "new_username");
+ TS_ASSERT_EQUALS(mock.m_password, "new_password");
+ TS_ASSERT_EQUALS(mock.m_clientId, "new_client_id");
+ TS_ASSERT_EQUALS(mock.m_sendTimeout, 2);
+ TS_ASSERT_EQUALS(mock.m_closeTimeout, 2);
+ TS_ASSERT_EQUALS(mock.m_producerWindowSize, 2);
+ TS_ASSERT_EQUALS(mock.m_bUseCompression, true);
+ TS_ASSERT_EQUALS(mock.m_bClientAck, true);
+ }
+
+ void testInitialzeConnectionFactory() {
+ MonitorPlugInActiveMQ mock;
+
+ auto_ptr connectionFactory(new ActiveMQConnectionFactory());
+
+ XMLSETTING(xmlString, xmlResults, xmlNode, XML_PARAMS_DEFAULT);
+ mock.initializeConfiguration(xmlNode);
+ ActiveMQConnectionFactory *pCF = connectionFactory.get();
+ mock.initializeConnectionFactory(pCF);
+
+ TS_ASSERT_EQUALS(pCF->getBrokerURI().toString(), "tcp://localhost");
+ TS_ASSERT_EQUALS(pCF->getClientId(), "new_client_id");
+ TS_ASSERT_EQUALS(pCF->getPassword(), "new_password");
+ TS_ASSERT_EQUALS(pCF->getUsername(), "new_username");
+ TS_ASSERT_EQUALS(pCF->isUseCompression(), true);
+ TS_ASSERT_EQUALS(pCF->getSendTimeout(), 2);
+ TS_ASSERT_EQUALS(pCF->getCloseTimeout(), 2);
+ TS_ASSERT_EQUALS(pCF->getProducerWindowSize(), 2);
+ }
+
+ void testParseTopic() {
+ MonitorPlugInActiveMQ mock;
+ XMLSETTING(xmlString, xmlResults, xmlNode, XML_PARAMS_TOPIC_DEFAULT);
+ TopicInfo topicInfo;
+ topicInfo.bUseTopic = false;
+ topicInfo.bDeliveryModePersistent = false;
+ topicInfo.destUri = "";
+
+ mock.parseTopic(xmlNode, topicInfo, topicInfo);
+
+ TS_ASSERT_EQUALS(topicInfo.bUseTopic, true);
+ TS_ASSERT_EQUALS(topicInfo.bDeliveryModePersistent, true);
+ TS_ASSERT_EQUALS(topicInfo.destUri, "activemq_test");
+
+
+ XMLSETTING(xmlString2, xmlResults2, xmlNode2, "0overwritten");
+ TopicInfo topicInfo2;
+
+ mock.parseTopic(xmlNode2, topicInfo2, topicInfo);
+ TS_ASSERT_EQUALS(topicInfo2.bUseTopic, true); // Taken from first
+ TS_ASSERT_EQUALS(topicInfo2.bDeliveryModePersistent, false); // Taken from first
+ TS_ASSERT_EQUALS(topicInfo2.destUri, "overwritten"); // overwritten
+ }
+
+ void testGetTopics() {
+ MonitorPlugInActiveMQ mock;
+ Topics topics = mock.getTopics();
+ TS_ASSERT_EQUALS(topics.size(), 0);
+ }
+
+ void testParseTopics() {
+ MonitorPlugInActiveMQ mock;
+ TopicInfo referenceTopic;
+ // Daten initalisieren
+ referenceTopic.bDeliveryModePersistent = false;
+ referenceTopic.bUseTopic = false;
+ Topics topics;
+
+ XMLSETTING(xmlString, xmlResults, xmlNode, "\
+default\
+\
+ 1\
+ 1\
+ fms\
+\
+\
+ 0\
+ pocsag\
+");
+ mock.parseTopic(xmlNode, referenceTopic, referenceTopic);
+ TS_ASSERT_EQUALS(referenceTopic.destUri, "default");
+
+ mock.parseTopics(xmlNode, topics, referenceTopic);
+ TS_ASSERT_EQUALS(topics.size(), 3);
+ TS_ASSERT_EQUALS((*(topics.find(ACTIVEMQ_KEY_FMS))->second).destUri, "fms");
+ TS_ASSERT_EQUALS((*(topics.find(ACTIVEMQ_KEY_FMS))->second).bDeliveryModePersistent, true);
+ TS_ASSERT_EQUALS((*(topics.find(ACTIVEMQ_KEY_FMS))->second).bUseTopic, true);
+ TS_ASSERT_EQUALS((*(topics.find(ACTIVEMQ_KEY_POCSAG))->second).destUri, "pocsag");
+ TS_ASSERT_EQUALS((*(topics.find(ACTIVEMQ_KEY_POCSAG))->second).bDeliveryModePersistent, false);
+ TS_ASSERT_EQUALS((*(topics.find(ACTIVEMQ_KEY_POCSAG))->second).bUseTopic, false);
+ TS_ASSERT_EQUALS((*(topics.find(ACTIVEMQ_KEY_ZVEI))->second).destUri, "default");
+ TS_ASSERT_EQUALS((*(topics.find(ACTIVEMQ_KEY_ZVEI))->second).bUseTopic, false);
+ TS_ASSERT_EQUALS((*(topics.find(ACTIVEMQ_KEY_ZVEI))->second).bDeliveryModePersistent, false);
+ }
+
+ void testInitializeActiveMqConnection() {
+ MonitorPlugInActiveMQ mock;
+ XMLSETTING(xmlString, xmlResults, xmlNode, "default\
+tcp://192.168.1.101:61616");
+ mock.initializeConfiguration(xmlNode);
+
+ bool success = mock.initializeActiveMqConnection();
+ TS_ASSERT_EQUALS(success, false);
+
+ XMLSETTING(xmlString2, xmlResults2, xmlNode2, "default\
+tcp://192.168.1.100:61616");
+ mock.initializeConfiguration(xmlNode2);
+
+ success = mock.initializeActiveMqConnection();
+ TS_ASSERT_EQUALS(success, true);
+ }
+
+ void testInitializeTopics() {
+ MonitorPlugInActiveMQ mock;
+ ModuleResultBase mrb;
+ TopicInfo referenceTopic;
+ Topics topics = mock.getTopics();
+ // Daten initalisieren
+ referenceTopic.bDeliveryModePersistent = false;
+ referenceTopic.bUseTopic = false;
+
+ XMLSETTING(xmlString, xmlResults, xmlNode, "default\
+tcp://192.168.1.100:61616");
+ mock.initializeConfiguration(xmlNode);
+
+ bool success = mock.initializeActiveMqConnection();
+ TS_ASSERT_EQUALS(success, true);
+
+ mrb.set("key1", "val1");
+ mrb.set("key2", "val2");
+ mrb.set("typ", "unknown");
+
+ TS_ASSERT_EQUALS(mrb.get("key1"), "val1");
+ TS_ASSERT_EQUALS(mrb.get("key2"), "val2");
+
+ // fail -> not initialized
+ TS_ASSERT_THROWS_ANYTHING(mock.processResult(&mrb));
+
+ // load default topic
+ mock.parseTopic(xmlNode, referenceTopic, referenceTopic);
+ // parse all topics
+ mock.parseTopics(xmlNode, topics, referenceTopic);
+ mock.initializeTopics(topics);
+
+ TS_ASSERT_EQUALS(topics.size(), 3);
+ mock.setTopics(topics);
+ TS_ASSERT_EQUALS(mock.getTopics().size(), 3);
+
+ success = mock.processResult(&mrb);
+ TS_TRACE("all initialized");
+ TS_ASSERT_EQUALS(success, false); // could not send, unknown type
+
+ mrb.set("typ", "fms");
+ TS_ASSERT_EQUALS(mrb.get("typ"), "fms");
+ TS_ASSERT_EQUALS(mock.processResult(&mrb), true);
+
+ }
+};
diff --git a/monitord/plugins/libmplugin_activemq/unittest.sh b/monitord/plugins/libmplugin_activemq/unittest.sh
new file mode 100755
index 0000000..560a4cd
--- /dev/null
+++ b/monitord/plugins/libmplugin_activemq/unittest.sh
@@ -0,0 +1,24 @@
+#!/bin/bash
+# Dreckiges Script zum Ausfuehren der Unittests
+CXXTEST=~/dev/app/cxxtest
+$CXXTEST/cxxtestgen.pl --error-printer -o runner.cpp MonitorPluginActiveMqTestSuite.h
+echo Unittest generated
+
+cd ../../../
+echo "Compiling ..."
+make
+cd -
+find ../../../ -name "*.o" | xargs -i cp -v {} o/
+# Doppelte Abhaengigkeiten eentfernen
+rm o/monitord_plugins_libmplugin_activemq_la-mplugin.o
+rm o/monitord_plugins_libmplugin_activemq_la-xml*.o
+rm o/monitord_plugins_libmplugin_audiorecorder_la-xml*.o
+rm o/monitord_plugins_libmplugin_audiorecorder_la-m*.o
+# main() entfernen
+rm o/monitord_monitord-Monitor.o
+g++ -I /usr/local/include/activemq-cpp-3.4.0 -I $CXXTEST -I ~/dev/app/monitord/trunk -lasound -ldl -lpthread -lm -lactivemq-cpp -o runner o/*.o runner.cpp
+echo "Unittest compiled"
+# libactivemq-cpp importieren
+export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/lib
+
+./runner