Newer
Older
monitord / monitord / plugins / libmplugin_activemq.h
#ifndef MPLUGINACTIVEMQ_H_
#define MPLUGINACTIVEMQ_H_

#ifdef PLUGINS
#include <typeinfo>
#include <iostream>

#ifdef WIN32
#define usleep Sleep
#include <windows.h>
#endif

#include "mplugin.h"
#include "base64.h"
#include "../MonitorLogging.h"
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/core/ActiveMQConnection.h>
#include <activemq/core/ActiveMQSession.h>
#include <activemq/util/Config.h>
#include <activemq/library/ActiveMQCPP.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <activemq/commands/ActiveMQTextMessage.h>

#define ACTIVEMQ_KEY_POCSAG "pocsag"
#define ACTIVEMQ_KEY_FMS "fms"
#define ACTIVEMQ_KEY_ZVEI "zvei"
#define ACTIVEMQ_XMLNODE_BROKERURI "brokerUri"
#define ACTIVEMQ_XMLNODE_USERNAME "username"
#define ACTIVEMQ_XMLNODE_PASSWORD "password"
#define ACTIVEMQ_XMLNODE_CLIENTID "clientId"
#define ACTIVEMQ_XMLNODE_SENDTIMEOUT "sendTimeout"
#define ACTIVEMQ_XMLNODE_CLOSETIMEOUT "closeTimeout"
#define ACTIVEMQ_XMLNODE_PRODUCERWINDOWSIZE "producerWindowSize"
#define ACTIVEMQ_XMLNODE_USECOMPRESSION "useCompression"
#define ACTIVEMQ_XMLNODE_CLIENTACK "clientAck"
#define ACTIVEMQ_XMLNODE_LOGFILE "logfile"
#define ACTIVEMQ_XMLNODE_LOGLEVEL "loglevel"
#define ACTIVEMQ_XMLNODE_TOPIC "topic"
#define ACTIVEMQ_XMLATTR_TYPE "type"
#define ACTIVEMQ_XMLNODE_USETOPIC "useTopic"
#define ACTIVEMQ_XMLNODE_DELIVERYMODEPERSISTENT "deliveryModePersistent"
#define ACTIVEMQ_XMLNODE_DESTURI "destUri"


typedef struct
{
	bool bUseTopic;
	bool bDeliveryModePersistent;
	std::string destUri;
	cms::Destination *destination;
	cms::MessageProducer *producer;
} TopicInfo;

typedef std::map<std::string,TopicInfo*> Topics;
typedef std::pair<std::string,TopicInfo*> PairMapping;

class MonitorPlugInActiveMQ : public MonitorPlugIn, public cms::ExceptionListener
{
public:
	MonitorPlugInActiveMQ();
	~MonitorPlugInActiveMQ();
 	std::string m_brokerUri;
 	std::string m_username;
 	std::string m_password;
 	std::string m_clientId;
	std::string m_destUri;
 	unsigned int m_sendTimeout;
	unsigned int m_closeTimeout;
	unsigned int m_producerWindowSize;
	bool m_bUseCompression;
	bool m_bClientAck;
	bool m_bDeliveryModePersistent;
 	bool m_bConnected;
	bool m_bTopicsInitialized;

	std::auto_ptr<activemq::core::ActiveMQConnectionFactory> m_connectionFactory;
	activemq::core::ActiveMQConnection* m_connection;
	activemq::core::ActiveMQSession* m_session;

	TopicInfo m_genericTopic;
	Topics m_topics;

	bool initProcessing(class MonitorConfiguration* configPtr,XMLNode config);
	bool processResult(class ModuleResultBase *pRes);
	void updateTextMessage(cms::TextMessage& textMessage, class ModuleResultBase& pRes);
	bool quitProcessing();
	void Show();

	void shutdownActiveMQCPPLibrary();
	void initializeActiveMQCPPLibrary();
	void initializeConfiguration(XMLNode config);
	void initializeConnectionFactory();

	bool establishConnection();
	bool initializeConnection();
	void freeConnection();

	void initializeTopics(Topics &topics);
	void freeTopics();

	void parseTopics(XMLNode config, Topics &topics, TopicInfo &referenceTopic);
	void parseTopic(XMLNode config, TopicInfo &topicInfo, TopicInfo &referenceTopic);
	void initializeTopic(TopicInfo &topicInfo, TopicInfo &referenceTopic);

	/** Exception-Listener */
	virtual void onException(const cms::CMSException& ex AMQCPP_UNUSED);
};

class MonitorPlugInActiveMQFactory : public MonitorPlugInFactory
{
public:
	MonitorPlugInActiveMQFactory();
	~MonitorPlugInActiveMQFactory();
	virtual MonitorPlugIn * CreatePlugIn();
};



#endif
#endif /*MPLUGINACTIVEMQ_H_*/