diff --git a/Makefile.am b/Makefile.am index e6c82cb..63d3c0d 100644 --- a/Makefile.am +++ b/Makefile.am @@ -151,6 +151,33 @@ endif # mysql Support ? # -------------------------------------------------------- +# Monitor-Plugin: ActiveMQ +# +if ACTIVEMQ +pkglib_LTLIBRARIES+= \ + monitord/plugins/libmplugin_activemq.la + +monitord_plugins_libmplugin_activemq_la_SOURCES=\ + ${monitord_plugins_common} \ + monitord/plugins/mplugin.cpp \ + monitord/plugins/libmplugin_activemq.cpp + +monitord_plugins_libmplugin_activemq_la_CPPFLAGS =\ + ${monitord_monitord_CPPFLAGS} \ + ${ACTIVEMQ_CFLAGS} \ + ${DLL} + +monitord_plugins_libmplugin_activemq_la_LDFLAGS=\ + -no-undefined \ + ${LIBACTIVEMQ} \ + ${DLL} + +monitord_plugin_libmplugin_activemq_la_LIBS=\ + ${ACTIVEMQ_LIBS} + +endif + +# -------------------------------------------------------- # Audio-Plugin: Recorder # monitord_plugins_libmplugin_audiorecorder_la_SOURCES=\ diff --git a/README.md b/README.md new file mode 100644 index 0000000..e4248e3 --- /dev/null +++ b/README.md @@ -0,0 +1,52 @@ +# monitord-activemq + +## Dependency +monitord-activemq depends on activemq-cpp. + +## Installation +Just push back this directory into /monitord/trunk. + +## Compiling on CentOS 6.2 + + yum install autoconf automake pkg-config alsa-lib-devel lua-devel + aclocal -Im4 + ./configure --enable-plugins --with-activemq + +## Configuration +Edit your monitord.xml: + + + + plugins/libmplugin_activemq.so + + activemq.log + DEBUG + tcp://127.0.0.1:61616 + your_username_or_empty + your_password_or_empty + your_clientid_or_empty + 5 + 5 + 5 + 1 + 0 + + + 1 + 0 + zabos + + + + zabos.fms + + + + + 0 + + + + + + diff --git a/configure.ac b/configure.ac index d2f5d32..64db5e0 100644 --- a/configure.ac +++ b/configure.ac @@ -11,7 +11,6 @@ AX_LUA_HEADERS AX_LUA_LIBS - case "$host_os" in linux*) LINUX=true @@ -26,6 +25,8 @@ SOXLIBDLL=[libsox.so] MYSQLLIBNAME=[mysqlclient] MYSQLLIBDLL=[libmysqlclient.so] + ACTIVEMQLIBNAME=[activemq-cpp] + ACTIVEMQDLL=[libactivemq-cpp.so] ;; mingw32*) WINDOWS=true @@ -40,6 +41,8 @@ VORBISENCLIBNAME=[vorbisenc-0] MYSQLLIBNAME=[mysql] MYSQLLIBDLL=[libmysql.dll] + ACTIVEMQNAME=]activemq-cpp] + ACTIVEMQDLL=[libactivemq-cpp.dll] AC_LIBTOOL_WIN32_DLL ;; esac @@ -73,6 +76,11 @@ [enable experimental mysql support (default is no)])], [], [use_mysql=no]) +AC_ARG_WITH([activemq], + [AC_HELP_STRING([--with-activemq], + [enable experimental ActiveMQ support (default is no)])], + [use_als=$withval], + [use_activemq=no]) AC_ARG_WITH([lame], [AC_HELP_STRING([--with-lame], [enable experimental mp3 lame support (default is no)])], @@ -107,6 +115,9 @@ if test "x$use_vorbis" != xno; then AC_MSG_ERROR([[vorbis support is not supported without --enable-plugins]]) fi + if test "x$use_activemq" != xno; then + AC_MSG_ERROR([[ActiveMQ support is not supported without --enable-plugins]]) + fi else if test "x$use_mysql" != xno; then AC_CHECK_HEADER([mysql/mysql.h], @@ -148,12 +159,36 @@ ]) if test "x$use_sox" != xno; then - AC_CHECK_HEADER([sox.h], - , - [AC_MSG_ERROR([sox.h not found])]) + AC_CHECK_HEADER([sox.h], + , + [AC_MSG_ERROR([sox not found])]) fi + + AS_IF([test "x$use_activemq" != xno], + [PKG_CHECK_MODULES( + [ACTIVEMQ], + [$ACTIVEMQLIBNAME], + [ + AC_MSG_NOTICE([activemq is present - ActiveMQ plugin will be created]) + AC_DEFINE([HAVE_ACTIVEMQ], [1], [Define if you have activemq-cpp]) + activemq=true + ], + [AC_MSG_FAILURE([$ACTIVEMQLIBNAME is not installed as package])] + )]) +# [AC_CHECK_LIB([$ACTIVEMQLIBNAME],[main], +# [ +# AC_SUBST([LIBACTIVEMQ], ["-l$ACTIVEMQLIBNAME"]) +# AC_DEFINE([HAVE_ACTIVEMQ], [1], [Define if you have activemq]) +# activemq=true +# ], +# [if test "x$use_activemq" != xcheck; then +# AC_MSG_FAILURE([--with-activemq but $ACTIVEMQLIBNAME failed]) +# fi +# ]) +# ]) - LIBSOX= + + LIBS= AS_IF([test "x$use_sox" != xno], [AC_CHECK_LIB([$SOXLIBNAME],[sox_init], [AC_SUBST([LIBSOX], ["-l$SOXLIBNAME"]) @@ -195,6 +230,7 @@ AM_CONDITIONAL(LAME, test x$lame = xtrue) AM_CONDITIONAL(SOX, test x$sox = xtrue) AM_CONDITIONAL(VORBIS, test x$vorbis = xtrue) +AM_CONDITIONAL(ACTIVEMQ, test x$activemq = xtrue) AC_SUBST(LUA_LIB) AC_SUBST(LUA_INCLUDE) diff --git a/monitord/plugins/libmplugin_activemq.cpp b/monitord/plugins/libmplugin_activemq.cpp new file mode 100644 index 0000000..cacb55a --- /dev/null +++ b/monitord/plugins/libmplugin_activemq.cpp @@ -0,0 +1,396 @@ +/** + * monitord-activemq + * http://github.com/schakko/monitord-activemq + * Contains *untested* prototyped code + */ +#include "libmplugin_activemq.h" + +using namespace std; +using namespace activemq; +using namespace activemq::core; +using namespace decaf::lang::exceptions; +using namespace cms; + +MonitorPlugInActiveMQ::MonitorPlugInActiveMQ() +{ + m_bUseCompression = false; + m_bClientAck = false; + m_bConnected = false; + m_brokerUri = "tcp://127.0.0.1:61616"; + m_username = ""; + m_password = ""; + m_clientId = ""; + m_destUri = ""; + m_sendTimeout = 0; + m_closeTimeout = 0; + m_producerWindowSize = 0; + + m_genericTopic.bUseTopic = false; + m_genericTopic.bDeliveryModePersistent = false; + m_genericTopic.destUri = ""; + + m_bDeliveryModePersistent = false; + m_bTopicsInitialized = false; + + m_session = NULL; + m_connection = NULL; + + activemq::library::ActiveMQCPP::initializeLibrary(); +} + +MonitorPlugInActiveMQ::~MonitorPlugInActiveMQ() +{ + Topics::iterator i ; + TopicInfo *pTopicInfo; + + // Destroy resources. + for (i = m_topics.begin(); i != m_topics.end(); i++) + { + pTopicInfo = i->second; + + try { + if (pTopicInfo->destination != NULL) { + delete pTopicInfo->destination; + } + } + catch (CMSException& e) { + e.printStackTrace(); + } + + pTopicInfo->destination = NULL; + + try { + if (pTopicInfo->producer != NULL) { + delete pTopicInfo->producer; + } + } + catch (CMSException& e) { + e.printStackTrace(); + } + + pTopicInfo->producer = NULL; + } + + // Close open resources. + try { + if (m_session != NULL) { + m_session->close(); + } + + if (m_connection != NULL) { + m_connection->close(); + } + } + catch (CMSException& e) { + e.printStackTrace(); + } + + try { + if (m_session != NULL) { + delete m_session; + } + } + catch (CMSException& e) { + e.printStackTrace(); + } + + m_session = NULL; + + try { + if (m_connection != NULL) { + delete m_connection; + } + } + catch (CMSException& e) { + e.printStackTrace(); + } + + m_connection = NULL; +} + +void MonitorPlugInActiveMQ::Show() +{ + FILE_LOG(logINFO) << "MonitorActiveMQPlugin successfully loaded" ; +} + +bool MonitorPlugInActiveMQ::processResult(class ModuleResultBase *pRes) +{ + FILE_LOG(logDEBUG) << "apachemq: processing Result..."; + + if (m_bConnected == false) { + FILE_LOG(logERROR) << "apachmq: ignoring message 'cause no active connection"; + return false; + } + + if (m_bTopicsInitialized == false) { + throw RuntimeException(__FILE__, __LINE__, "processResult must be called AFTER initializeTopics()"); + } + + + std::string type = (*pRes)["typ"]; + + if (m_topics.find(type) == m_topics.end()) { + FILE_LOG(logERROR) << "apachemq: received type " << type << " which is not registered"; + return false; + } + + FILE_LOG(logINFO) << "Preparing new message"; + + TopicInfo* topicInfo = (m_topics.find(type))->second; + TextMessage* message = m_session->createTextMessage(); + + ResultItemsMap::iterator i; + + for (i = (*pRes).m_Items.begin(); i != (*pRes).m_Items.end(); i++) { + message->setStringProperty(i->first, i->second); + } + + + topicInfo->producer->send(message); + + FILE_LOG(logINFO) << "message sent"; + + delete message; + + return true; +} + +void MonitorPlugInActiveMQ::initializeConfiguration(XMLNode config) +{ + FILE_LOG(logDEBUG) << "Reading broker URI"; + m_brokerUri = getNodeText(config, ACTIVEMQ_XMLNODE_BROKERURI, "tcp://127.0.0.1:61616"); + FILE_LOG(logDEBUG) << "Reading username:"; + m_username = getNodeText(config, ACTIVEMQ_XMLNODE_USERNAME, ""); + FILE_LOG(logDEBUG) << "Reading password"; + m_password = getNodeText(config, ACTIVEMQ_XMLNODE_PASSWORD, ""); + FILE_LOG(logDEBUG) << "Reading clientId"; + m_clientId = getNodeText(config, ACTIVEMQ_XMLNODE_CLIENTID, ""); + FILE_LOG(logDEBUG) << "Reading sendTimeout"; + m_sendTimeout = getNodeInt(config, ACTIVEMQ_XMLNODE_SENDTIMEOUT, 0); + FILE_LOG(logDEBUG) << "Reading closeTimeout"; + m_closeTimeout = getNodeInt(config, ACTIVEMQ_XMLNODE_CLOSETIMEOUT, 0); + FILE_LOG(logDEBUG) << "Reading producerWindowSize"; + m_producerWindowSize = getNodeInt(config, ACTIVEMQ_XMLNODE_PRODUCERWINDOWSIZE, 0); + FILE_LOG(logDEBUG) << "Reading bUseCompression"; + m_bUseCompression = getNodeBool(config, ACTIVEMQ_XMLNODE_USECOMPRESSION, false); + FILE_LOG(logDEBUG) << "Reading bClientAck"; + m_bClientAck = getNodeBool(config, ACTIVEMQ_XMLNODE_CLIENTACK, false); + FILE_LOG(logDEBUG) << "Reading logFile"; + std::string logFile = getNodeText(config, ACTIVEMQ_XMLNODE_LOGFILE, "screen"); + FILE_LOG(logDEBUG) << "Reading logLevel"; + std::string logLevel = getNodeText(config, ACTIVEMQ_XMLNODE_LOGLEVEL, "INFO"); + + #ifdef WIN32 + if (!(logFile == "screen")) { + FILE* pFile = fopen(logFile.c_str(), "a"); + Output2FILE::Stream() = pFile; + } + + FILELog::ReportingLevel() = FILELog::FromString(logLevel); + FILE_LOG(logINFO) << "logging started"; + #endif +} + +Topics MonitorPlugInActiveMQ::getTopics() +{ + return m_topics; +} + +void MonitorPlugInActiveMQ::setTopics(Topics& topics) +{ + m_topics = topics; +} + +void MonitorPlugInActiveMQ::initializeConnectionFactory(ActiveMQConnectionFactory *connectionFactory) +{ + // Set Broker-URI first - otherwise username and password is lost + connectionFactory->setBrokerURI(m_brokerUri); + + if (!m_username.empty()) { + connectionFactory->setUsername(m_username); + } + + if (!m_password.empty()) { + connectionFactory->setPassword(m_password); + } + + if (!m_clientId.empty()) { + connectionFactory->setClientId(m_clientId); + } + + connectionFactory->setUseCompression(m_bUseCompression); + connectionFactory->setSendTimeout(m_sendTimeout); + connectionFactory->setCloseTimeout(m_closeTimeout); + connectionFactory->setProducerWindowSize(m_producerWindowSize); + + FILE_LOG(logDEBUG) << "Connection factory initialized"; +} + +bool MonitorPlugInActiveMQ::initializeActiveMqConnection() +{ + auto_ptr connectionFactory(new ActiveMQConnectionFactory()); + initializeConnectionFactory(connectionFactory.get()); + + // create a connection + try { + m_connection = connectionFactory->createConnection(); + m_connection->start(); + + if (m_bClientAck) { + m_session = m_connection->createSession(Session::CLIENT_ACKNOWLEDGE); + } + else { + m_session = m_connection->createSession(Session::AUTO_ACKNOWLEDGE); + } + + m_bConnected = true; + } + catch (CMSException& e) { + FILE_LOG(logERROR) << "Could not connect to messaging queue \"" << m_brokerUri << "\" with username=\"" << m_username << "\""; + m_bConnected = false; + } + + FILE_LOG(logDEBUG) << "Connection initialized"; + + return m_bConnected; +} + +bool MonitorPlugInActiveMQ::initProcessing(class MonitorConfiguration* configPtr, XMLNode config) +{ + // initialize ActiveMQ + activemq::library::ActiveMQCPP::initializeLibrary(); + + // read default configuration for topics + parseTopic(config, m_genericTopic, m_genericTopic); + + initializeConfiguration(config); + + if (initializeActiveMqConnection()) { + parseTopics(config, m_topics, m_genericTopic); + initializeTopics(m_topics); + } + + return m_bConnected; +} + +void MonitorPlugInActiveMQ::initializeTopics(Topics &topics) +{ + if (m_bConnected == false) { + throw RuntimeException(__FILE__, __LINE__, "Tried to initialize topics without established ActiveMQ connection. Call initializeActiveMqConnection() first."); + } + + Topics::iterator i; + TopicInfo *pTopicInfo; + + // create new producers + for (i = topics.begin(); i != topics.end(); i++) + { + pTopicInfo = i->second; + + if (pTopicInfo->bUseTopic) { + pTopicInfo->destination = m_session->createTopic(pTopicInfo->destUri); + } + else { + pTopicInfo->destination = m_session->createQueue(pTopicInfo->destUri); + } + + pTopicInfo->producer = m_session->createProducer(pTopicInfo->destination); + + if (pTopicInfo->bDeliveryModePersistent) { + pTopicInfo->producer->setDeliveryMode(DeliveryMode::PERSISTENT); + } + else { + pTopicInfo->producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT); + } + + FILE_LOG(logINFO) << "Topic destination \"" << pTopicInfo->destination << "\" created"; + } + + FILE_LOG(logINFO) << "Topics initialized"; + + m_bTopicsInitialized = true; +} + +bool MonitorPlugInActiveMQ::quitProcessing() +{ + return true; +} + +void MonitorPlugInActiveMQ::parseTopics(XMLNode config, Topics &topics, TopicInfo &referenceTopic) +{ + XMLNode topicNode; + Topics::iterator i; + TopicInfo *pTopicInfo; + + // defaults + vector channels; + channels.push_back(ACTIVEMQ_KEY_POCSAG); + channels.push_back(ACTIVEMQ_KEY_ZVEI); + channels.push_back(ACTIVEMQ_KEY_FMS); + + for (unsigned int i = 0, m = channels.size(); i < m; i++) + { + pTopicInfo = new TopicInfo; + initializeTopic(*pTopicInfo, referenceTopic); + topics.insert(PairMapping(channels.at(i), pTopicInfo)); + } + + int nTopic = config.nChildNode(ACTIVEMQ_XMLNODE_TOPIC); + + for (int num = 0; num < nTopic ; ++num) + { + if (!((topicNode = config.getChildNode(ACTIVEMQ_XMLNODE_TOPIC, num))).isEmpty()) { + std::string type = topicNode.getAttribute(ACTIVEMQ_XMLATTR_TYPE) ; + + if ((type == ACTIVEMQ_KEY_POCSAG) || (type == ACTIVEMQ_KEY_FMS) || (type == ACTIVEMQ_KEY_ZVEI)) { + pTopicInfo = (topics.find(type))->second; + parseTopic(topicNode, *pTopicInfo, referenceTopic); + } + } + } +} + +void MonitorPlugInActiveMQ::initializeTopic(TopicInfo &topicInfo, TopicInfo &referenceTopic) +{ + topicInfo.bUseTopic = referenceTopic.bUseTopic; + topicInfo.bDeliveryModePersistent = referenceTopic.bDeliveryModePersistent; + topicInfo.destUri = referenceTopic.destUri; + topicInfo.destination = NULL; + topicInfo.producer = NULL; +} + +void MonitorPlugInActiveMQ::parseTopic(XMLNode config, TopicInfo &topicInfo, TopicInfo &referenceTopic) +{ + initializeTopic(topicInfo, referenceTopic); + + if (config.nChildNode(ACTIVEMQ_XMLNODE_USETOPIC) >= 1) { + topicInfo.bUseTopic = getNodeBool(config, ACTIVEMQ_XMLNODE_USETOPIC, false); + } + + if (config.nChildNode(ACTIVEMQ_XMLNODE_DELIVERYMODEPERSISTENT) >= 1) { + topicInfo.bDeliveryModePersistent = getNodeBool(config, ACTIVEMQ_XMLNODE_DELIVERYMODEPERSISTENT, false); + } + + if (config.nChildNode(ACTIVEMQ_XMLNODE_DESTURI) >= 1) { + topicInfo.destUri = getNodeText(config, ACTIVEMQ_XMLNODE_DESTURI, "monitord"); + } +} + +MonitorPlugInActiveMQFactory::MonitorPlugInActiveMQFactory() +{ +} + +MonitorPlugInActiveMQFactory::~MonitorPlugInActiveMQFactory() +{ +} + +MonitorPlugIn * MonitorPlugInActiveMQFactory::CreatePlugIn() +{ + return new MonitorPlugInActiveMQ(); +} + + +DLL_EXPORT void * factory0( void ) +{ + return new MonitorPlugInActiveMQFactory; +} diff --git a/monitord/plugins/libmplugin_activemq.h b/monitord/plugins/libmplugin_activemq.h new file mode 100644 index 0000000..7410fa1 --- /dev/null +++ b/monitord/plugins/libmplugin_activemq.h @@ -0,0 +1,113 @@ +#ifndef MPLUGINACTIVEMQ_H_ +#define MPLUGINACTIVEMQ_H_ + +#ifdef PLUGINS +#include +#include + +#ifdef WIN32 +#define usleep Sleep +#include +#endif + +#include "mplugin.h" +#include "../MonitorLogging.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define ACTIVEMQ_KEY_POCSAG "pocsag" +#define ACTIVEMQ_KEY_FMS "fms" +#define ACTIVEMQ_KEY_ZVEI "zvei" +#define ACTIVEMQ_XMLNODE_BROKERURI "brokerUri" +#define ACTIVEMQ_XMLNODE_USERNAME "username" +#define ACTIVEMQ_XMLNODE_PASSWORD "password" +#define ACTIVEMQ_XMLNODE_CLIENTID "clientId" +#define ACTIVEMQ_XMLNODE_SENDTIMEOUT "sendTimeout" +#define ACTIVEMQ_XMLNODE_CLOSETIMEOUT "closeTimeout" +#define ACTIVEMQ_XMLNODE_PRODUCERWINDOWSIZE "producerWindowSize" +#define ACTIVEMQ_XMLNODE_USECOMPRESSION "useCompression" +#define ACTIVEMQ_XMLNODE_CLIENTACK "clientAck" +#define ACTIVEMQ_XMLNODE_LOGFILE "logfile" +#define ACTIVEMQ_XMLNODE_LOGLEVEL "loglevel" +#define ACTIVEMQ_XMLNODE_TOPIC "topic" +#define ACTIVEMQ_XMLATTR_TYPE "type" +#define ACTIVEMQ_XMLNODE_USETOPIC "useTopic" +#define ACTIVEMQ_XMLNODE_DELIVERYMODEPERSISTENT "deliveryModePersistent" +#define ACTIVEMQ_XMLNODE_DESTURI "destUri" + + +typedef struct +{ + bool bUseTopic; + bool bDeliveryModePersistent; + std::string destUri; + cms::Destination *destination; + cms::MessageProducer *producer; +} TopicInfo; + +typedef std::map Topics; +typedef std::pair PairMapping; + +class MonitorPlugInActiveMQ : public MonitorPlugIn +{ +public: + MonitorPlugInActiveMQ(); + ~MonitorPlugInActiveMQ(); + std::string m_brokerUri; + std::string m_username; + std::string m_password; + std::string m_clientId; + std::string m_destUri; + unsigned int m_sendTimeout; + unsigned int m_closeTimeout; + unsigned int m_producerWindowSize; + bool m_bUseCompression; + bool m_bClientAck; + bool m_bDeliveryModePersistent; + bool m_bConnected; + bool m_bTopicsInitialized; + + cms::Connection* m_connection; + cms::Session* m_session; + + TopicInfo m_genericTopic; + Topics m_topics; + + bool initProcessing(class MonitorConfiguration* configPtr,XMLNode config); + bool processResult(class ModuleResultBase *pRes); + bool quitProcessing(); + void Show(); + + Topics getTopics(); + void setTopics(Topics &topics); + void shutdownActiveMQCPPLibrary(); + void initializeActiveMQCPPLibrary(); + void initializeConfiguration(XMLNode config); + void initializeConnectionFactory(activemq::core::ActiveMQConnectionFactory *connectionFactory); + bool initializeActiveMqConnection(); + void initializeTopics(Topics &topics); + void parseTopics(XMLNode config, Topics &topics, TopicInfo &referenceTopic); + void parseTopic(XMLNode config, TopicInfo &topicInfo, TopicInfo &referenceTopic); + void initializeTopic(TopicInfo &topicInfo, TopicInfo &referenceTopic); +}; + +class MonitorPlugInActiveMQFactory : public MonitorPlugInFactory +{ +public: + MonitorPlugInActiveMQFactory(); + ~MonitorPlugInActiveMQFactory(); + virtual MonitorPlugIn * CreatePlugIn(); +}; + + + +#endif +#endif /*MPLUGINACTIVEMQ_H_*/ diff --git a/monitord/plugins/libmplugin_activemq.la b/monitord/plugins/libmplugin_activemq.la new file mode 100644 index 0000000..ab5cfe3 --- /dev/null +++ b/monitord/plugins/libmplugin_activemq.la @@ -0,0 +1,41 @@ +# libmplugin_activemq.la - a libtool library file +# Generated by ltmain.sh (GNU libtool) 2.2.6b Debian-2.2.6b-2ubuntu1 +# +# Please DO NOT delete this file! +# It is necessary for linking the library. + +# The name that we can dlopen(3). +dlname='libmplugin_activemq.so.0' + +# Names of this library. +library_names='libmplugin_activemq.so.0.0.0 libmplugin_activemq.so.0 libmplugin_activemq.so' + +# The name of the static archive. +old_library='' + +# Linker flags that can not go in dependency_libs. +inherited_linker_flags='' + +# Libraries that this one depends upon. +dependency_libs='' + +# Names of additional weak libraries provided by this library +weak_library_names='' + +# Version information for libmplugin_activemq. +current=0 +age=0 +revision=0 + +# Is this an already installed library? +installed=no + +# Should we warn about portability when linking against -modules? +shouldnotlink=no + +# Files to dlopen/dlpreopen +dlopen='' +dlpreopen='' + +# Directory that this library needs to be installed in: +libdir='/usr/local/lib/monitord' diff --git a/monitord/plugins/libmplugin_activemq/MonitorPluginActiveMqTestSuite.h b/monitord/plugins/libmplugin_activemq/MonitorPluginActiveMqTestSuite.h new file mode 100644 index 0000000..de10d74 --- /dev/null +++ b/monitord/plugins/libmplugin_activemq/MonitorPluginActiveMqTestSuite.h @@ -0,0 +1,191 @@ +#define PLUGINS +#include "../libmplugin_activemq.h" +#include + +#define XMLSETTING(xmlString, xmlResults, xmlNode, CHARS) \ + const char *xmlString = CHARS; \ + XMLResults xmlResults;\ + XMLNode xmlNode; \ + xmlNode = XMLNode::parseString(xmlString, NULL, &xmlResults); \ + +#define XML_PARAMS_DEFAULT "tcp://localhostnew_usernamenew_passwordnew_client_id22211" + +#define XML_PARAMS_TOPIC_DEFAULT "11activemq_test" + +using namespace std; +using namespace activemq; +using namespace activemq::core; +using namespace cms; + +class MonitorPlugInActiveMqTestSuite : public CxxTest::TestSuite +{ +public: + void testMainConfiguration() { + MonitorPlugInActiveMQ mock; + TS_ASSERT_EQUALS(mock.m_bConnected, false); + TS_ASSERT_EQUALS(mock.m_bUseCompression, false); + TS_ASSERT_EQUALS(mock.m_bClientAck, false); + TS_ASSERT_EQUALS(mock.m_bDeliveryModePersistent, false); + + XMLSETTING(xmlString, xmlResults, xmlNode, XML_PARAMS_DEFAULT); + mock.initializeConfiguration(xmlNode); + + TS_ASSERT_EQUALS(mock.m_brokerUri, "tcp://localhost"); + TS_ASSERT_EQUALS(mock.m_username, "new_username"); + TS_ASSERT_EQUALS(mock.m_password, "new_password"); + TS_ASSERT_EQUALS(mock.m_clientId, "new_client_id"); + TS_ASSERT_EQUALS(mock.m_sendTimeout, 2); + TS_ASSERT_EQUALS(mock.m_closeTimeout, 2); + TS_ASSERT_EQUALS(mock.m_producerWindowSize, 2); + TS_ASSERT_EQUALS(mock.m_bUseCompression, true); + TS_ASSERT_EQUALS(mock.m_bClientAck, true); + } + + void testInitialzeConnectionFactory() { + MonitorPlugInActiveMQ mock; + + auto_ptr connectionFactory(new ActiveMQConnectionFactory()); + + XMLSETTING(xmlString, xmlResults, xmlNode, XML_PARAMS_DEFAULT); + mock.initializeConfiguration(xmlNode); + ActiveMQConnectionFactory *pCF = connectionFactory.get(); + mock.initializeConnectionFactory(pCF); + + TS_ASSERT_EQUALS(pCF->getBrokerURI().toString(), "tcp://localhost"); + TS_ASSERT_EQUALS(pCF->getClientId(), "new_client_id"); + TS_ASSERT_EQUALS(pCF->getPassword(), "new_password"); + TS_ASSERT_EQUALS(pCF->getUsername(), "new_username"); + TS_ASSERT_EQUALS(pCF->isUseCompression(), true); + TS_ASSERT_EQUALS(pCF->getSendTimeout(), 2); + TS_ASSERT_EQUALS(pCF->getCloseTimeout(), 2); + TS_ASSERT_EQUALS(pCF->getProducerWindowSize(), 2); + } + + void testParseTopic() { + MonitorPlugInActiveMQ mock; + XMLSETTING(xmlString, xmlResults, xmlNode, XML_PARAMS_TOPIC_DEFAULT); + TopicInfo topicInfo; + topicInfo.bUseTopic = false; + topicInfo.bDeliveryModePersistent = false; + topicInfo.destUri = ""; + + mock.parseTopic(xmlNode, topicInfo, topicInfo); + + TS_ASSERT_EQUALS(topicInfo.bUseTopic, true); + TS_ASSERT_EQUALS(topicInfo.bDeliveryModePersistent, true); + TS_ASSERT_EQUALS(topicInfo.destUri, "activemq_test"); + + + XMLSETTING(xmlString2, xmlResults2, xmlNode2, "0overwritten"); + TopicInfo topicInfo2; + + mock.parseTopic(xmlNode2, topicInfo2, topicInfo); + TS_ASSERT_EQUALS(topicInfo2.bUseTopic, true); // Taken from first + TS_ASSERT_EQUALS(topicInfo2.bDeliveryModePersistent, false); // Taken from first + TS_ASSERT_EQUALS(topicInfo2.destUri, "overwritten"); // overwritten + } + + void testGetTopics() { + MonitorPlugInActiveMQ mock; + Topics topics = mock.getTopics(); + TS_ASSERT_EQUALS(topics.size(), 0); + } + + void testParseTopics() { + MonitorPlugInActiveMQ mock; + TopicInfo referenceTopic; + // Daten initalisieren + referenceTopic.bDeliveryModePersistent = false; + referenceTopic.bUseTopic = false; + Topics topics; + + XMLSETTING(xmlString, xmlResults, xmlNode, "\ +default\ +\ + 1\ + 1\ + fms\ +\ +\ + 0\ + pocsag\ +"); + mock.parseTopic(xmlNode, referenceTopic, referenceTopic); + TS_ASSERT_EQUALS(referenceTopic.destUri, "default"); + + mock.parseTopics(xmlNode, topics, referenceTopic); + TS_ASSERT_EQUALS(topics.size(), 3); + TS_ASSERT_EQUALS((*(topics.find(ACTIVEMQ_KEY_FMS))->second).destUri, "fms"); + TS_ASSERT_EQUALS((*(topics.find(ACTIVEMQ_KEY_FMS))->second).bDeliveryModePersistent, true); + TS_ASSERT_EQUALS((*(topics.find(ACTIVEMQ_KEY_FMS))->second).bUseTopic, true); + TS_ASSERT_EQUALS((*(topics.find(ACTIVEMQ_KEY_POCSAG))->second).destUri, "pocsag"); + TS_ASSERT_EQUALS((*(topics.find(ACTIVEMQ_KEY_POCSAG))->second).bDeliveryModePersistent, false); + TS_ASSERT_EQUALS((*(topics.find(ACTIVEMQ_KEY_POCSAG))->second).bUseTopic, false); + TS_ASSERT_EQUALS((*(topics.find(ACTIVEMQ_KEY_ZVEI))->second).destUri, "default"); + TS_ASSERT_EQUALS((*(topics.find(ACTIVEMQ_KEY_ZVEI))->second).bUseTopic, false); + TS_ASSERT_EQUALS((*(topics.find(ACTIVEMQ_KEY_ZVEI))->second).bDeliveryModePersistent, false); + } + + void testInitializeActiveMqConnection() { + MonitorPlugInActiveMQ mock; + XMLSETTING(xmlString, xmlResults, xmlNode, "default\ +tcp://192.168.1.101:61616"); + mock.initializeConfiguration(xmlNode); + + bool success = mock.initializeActiveMqConnection(); + TS_ASSERT_EQUALS(success, false); + + XMLSETTING(xmlString2, xmlResults2, xmlNode2, "default\ +tcp://192.168.1.100:61616"); + mock.initializeConfiguration(xmlNode2); + + success = mock.initializeActiveMqConnection(); + TS_ASSERT_EQUALS(success, true); + } + + void testInitializeTopics() { + MonitorPlugInActiveMQ mock; + ModuleResultBase mrb; + TopicInfo referenceTopic; + Topics topics = mock.getTopics(); + // Daten initalisieren + referenceTopic.bDeliveryModePersistent = false; + referenceTopic.bUseTopic = false; + + XMLSETTING(xmlString, xmlResults, xmlNode, "default\ +tcp://192.168.1.100:61616"); + mock.initializeConfiguration(xmlNode); + + bool success = mock.initializeActiveMqConnection(); + TS_ASSERT_EQUALS(success, true); + + mrb.set("key1", "val1"); + mrb.set("key2", "val2"); + mrb.set("typ", "unknown"); + + TS_ASSERT_EQUALS(mrb.get("key1"), "val1"); + TS_ASSERT_EQUALS(mrb.get("key2"), "val2"); + + // fail -> not initialized + TS_ASSERT_THROWS_ANYTHING(mock.processResult(&mrb)); + + // load default topic + mock.parseTopic(xmlNode, referenceTopic, referenceTopic); + // parse all topics + mock.parseTopics(xmlNode, topics, referenceTopic); + mock.initializeTopics(topics); + + TS_ASSERT_EQUALS(topics.size(), 3); + mock.setTopics(topics); + TS_ASSERT_EQUALS(mock.getTopics().size(), 3); + + success = mock.processResult(&mrb); + TS_TRACE("all initialized"); + TS_ASSERT_EQUALS(success, false); // could not send, unknown type + + mrb.set("typ", "fms"); + TS_ASSERT_EQUALS(mrb.get("typ"), "fms"); + TS_ASSERT_EQUALS(mock.processResult(&mrb), true); + + } +}; diff --git a/monitord/plugins/libmplugin_activemq/unittest.sh b/monitord/plugins/libmplugin_activemq/unittest.sh new file mode 100755 index 0000000..560a4cd --- /dev/null +++ b/monitord/plugins/libmplugin_activemq/unittest.sh @@ -0,0 +1,24 @@ +#!/bin/bash +# Dreckiges Script zum Ausfuehren der Unittests +CXXTEST=~/dev/app/cxxtest +$CXXTEST/cxxtestgen.pl --error-printer -o runner.cpp MonitorPluginActiveMqTestSuite.h +echo Unittest generated + +cd ../../../ +echo "Compiling ..." +make +cd - +find ../../../ -name "*.o" | xargs -i cp -v {} o/ +# Doppelte Abhaengigkeiten eentfernen +rm o/monitord_plugins_libmplugin_activemq_la-mplugin.o +rm o/monitord_plugins_libmplugin_activemq_la-xml*.o +rm o/monitord_plugins_libmplugin_audiorecorder_la-xml*.o +rm o/monitord_plugins_libmplugin_audiorecorder_la-m*.o +# main() entfernen +rm o/monitord_monitord-Monitor.o +g++ -I /usr/local/include/activemq-cpp-3.4.0 -I $CXXTEST -I ~/dev/app/monitord/trunk -lasound -ldl -lpthread -lm -lactivemq-cpp -o runner o/*.o runner.cpp +echo "Unittest compiled" +# libactivemq-cpp importieren +export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/lib + +./runner