Newer
Older
monitord / monitord / plugins / libmplugin_activemq.cpp
@ckl ckl on 23 Jan 2012 10 KB added monitord-activemq
/**
 * monitord-activemq
 * http://github.com/schakko/monitord-activemq
 * Contains *untested* prototyped code
 */
#include "libmplugin_activemq.h"

using namespace std;
using namespace activemq;
using namespace activemq::core;
using namespace decaf::lang::exceptions;
using namespace cms;

MonitorPlugInActiveMQ::MonitorPlugInActiveMQ()
{
	m_bUseCompression = false;
	m_bClientAck = false;
	m_bConnected = false;
 	m_brokerUri = "tcp://127.0.0.1:61616";
 	m_username = "";
 	m_password = "";
 	m_clientId = "";
	m_destUri = "";
 	m_sendTimeout = 0;
	m_closeTimeout = 0;
	m_producerWindowSize = 0;

	m_genericTopic.bUseTopic = false;
	m_genericTopic.bDeliveryModePersistent = false;
	m_genericTopic.destUri = "";

	m_bDeliveryModePersistent = false;
	m_bTopicsInitialized = false;

	m_session = NULL;
	m_connection = NULL;

	activemq::library::ActiveMQCPP::initializeLibrary();
}

MonitorPlugInActiveMQ::~MonitorPlugInActiveMQ()
{
	Topics::iterator i ;
	TopicInfo *pTopicInfo;
	
  	// Destroy resources.
	for (i = m_topics.begin(); i != m_topics.end(); i++)
	{
		pTopicInfo = i->second;

		try {
			if (pTopicInfo->destination != NULL) { 
				delete pTopicInfo->destination;
			}
       	}
		catch (CMSException& e) { 
			e.printStackTrace(); 
		}
			
		pTopicInfo->destination = NULL;

		try {
			if (pTopicInfo->producer != NULL) {
				delete pTopicInfo->producer;
			}
		}
		catch (CMSException& e) { 
			e.printStackTrace(); 
		}
		
		pTopicInfo->producer = NULL;
	}

	// Close open resources.
	try {
		if (m_session != NULL) {
			m_session->close();
		}

		if (m_connection != NULL) {
			m_connection->close();
		}
	}
	catch (CMSException& e) {
		e.printStackTrace(); 
	}

	try {
		if (m_session != NULL) {
			delete m_session;
		}
	}
	catch (CMSException& e) { 
		e.printStackTrace(); 
	}

	m_session = NULL;

	try {
		if (m_connection != NULL) {
			delete m_connection;
		}
	} 
	catch (CMSException& e) { 
		e.printStackTrace(); 
	}

	m_connection = NULL;
}

void MonitorPlugInActiveMQ::Show()
{
	FILE_LOG(logINFO) << "MonitorActiveMQPlugin successfully loaded" ;
}

bool MonitorPlugInActiveMQ::processResult(class ModuleResultBase *pRes)
{
	FILE_LOG(logDEBUG) << "apachemq: processing Result...";

	if (m_bConnected == false) {
		FILE_LOG(logERROR) << "apachmq: ignoring message 'cause no active connection";
		return false;
	}

	if (m_bTopicsInitialized == false) {
		throw RuntimeException(__FILE__, __LINE__, "processResult must be called AFTER initializeTopics()");
	}


	std::string type = (*pRes)["typ"];

	if (m_topics.find(type) == m_topics.end()) {
		FILE_LOG(logERROR) << "apachemq: received type " << type << " which is not registered";
		return false;
	}
	
	FILE_LOG(logINFO) << "Preparing new message";
	
	TopicInfo* topicInfo = (m_topics.find(type))->second;
	TextMessage* message = m_session->createTextMessage();

	ResultItemsMap::iterator i;

	for (i = (*pRes).m_Items.begin(); i != (*pRes).m_Items.end(); i++) {
		message->setStringProperty(i->first, i->second);
	}


	topicInfo->producer->send(message);
	
	FILE_LOG(logINFO) << "message sent";

	delete message;

	return true;
}

void MonitorPlugInActiveMQ::initializeConfiguration(XMLNode config)
{	
	FILE_LOG(logDEBUG) << "Reading broker URI";
	m_brokerUri 	= getNodeText(config, ACTIVEMQ_XMLNODE_BROKERURI, "tcp://127.0.0.1:61616");
	FILE_LOG(logDEBUG) << "Reading username:";
	m_username 		= getNodeText(config, ACTIVEMQ_XMLNODE_USERNAME, "");
	FILE_LOG(logDEBUG) << "Reading password";
	m_password 		= getNodeText(config, ACTIVEMQ_XMLNODE_PASSWORD, "");
	FILE_LOG(logDEBUG) << "Reading clientId";
	m_clientId 		= getNodeText(config, ACTIVEMQ_XMLNODE_CLIENTID, "");
	FILE_LOG(logDEBUG) << "Reading sendTimeout";
	m_sendTimeout 	= getNodeInt(config, ACTIVEMQ_XMLNODE_SENDTIMEOUT, 0);
	FILE_LOG(logDEBUG) << "Reading closeTimeout";
	m_closeTimeout 	= getNodeInt(config, ACTIVEMQ_XMLNODE_CLOSETIMEOUT, 0);
	FILE_LOG(logDEBUG) << "Reading producerWindowSize";
	m_producerWindowSize 	= getNodeInt(config, ACTIVEMQ_XMLNODE_PRODUCERWINDOWSIZE, 0);
	FILE_LOG(logDEBUG) << "Reading bUseCompression";
	m_bUseCompression 		= getNodeBool(config, ACTIVEMQ_XMLNODE_USECOMPRESSION, false);
	FILE_LOG(logDEBUG) << "Reading bClientAck";
	m_bClientAck 	= getNodeBool(config, ACTIVEMQ_XMLNODE_CLIENTACK, false);
	FILE_LOG(logDEBUG) << "Reading logFile";
	std::string logFile		= getNodeText(config, ACTIVEMQ_XMLNODE_LOGFILE, "screen");
	FILE_LOG(logDEBUG) << "Reading logLevel";
	std::string logLevel	= getNodeText(config, ACTIVEMQ_XMLNODE_LOGLEVEL, "INFO");

	#ifdef WIN32
	if (!(logFile == "screen")) {
		FILE* pFile = fopen(logFile.c_str(), "a");
		Output2FILE::Stream() = pFile;
	}

	FILELog::ReportingLevel() = FILELog::FromString(logLevel);
	FILE_LOG(logINFO) << "logging started";
	#endif
}

Topics MonitorPlugInActiveMQ::getTopics()
{
	return m_topics;
}

void MonitorPlugInActiveMQ::setTopics(Topics& topics)
{
	m_topics = topics;
}

void MonitorPlugInActiveMQ::initializeConnectionFactory(ActiveMQConnectionFactory *connectionFactory)
{
	// Set Broker-URI first - otherwise username and password is lost
	connectionFactory->setBrokerURI(m_brokerUri);

	if (!m_username.empty()) {
		connectionFactory->setUsername(m_username);
	}

	if (!m_password.empty()) {
		connectionFactory->setPassword(m_password);
	}

	if (!m_clientId.empty()) {
		connectionFactory->setClientId(m_clientId);
	}

	connectionFactory->setUseCompression(m_bUseCompression);
	connectionFactory->setSendTimeout(m_sendTimeout);
	connectionFactory->setCloseTimeout(m_closeTimeout);
	connectionFactory->setProducerWindowSize(m_producerWindowSize);
	
	FILE_LOG(logDEBUG) << "Connection factory initialized";
}

bool MonitorPlugInActiveMQ::initializeActiveMqConnection()
{
	auto_ptr<ActiveMQConnectionFactory> connectionFactory(new ActiveMQConnectionFactory());
	initializeConnectionFactory(connectionFactory.get());

	// create a connection
	try {
        m_connection = connectionFactory->createConnection();
       	m_connection->start();

		if (m_bClientAck) {
			m_session = m_connection->createSession(Session::CLIENT_ACKNOWLEDGE);
		} 
		else {
			m_session = m_connection->createSession(Session::AUTO_ACKNOWLEDGE);
		}

		m_bConnected = true;
	} 
	catch (CMSException& e) {
		FILE_LOG(logERROR) << "Could not connect to messaging queue \"" << m_brokerUri << "\" with username=\"" << m_username << "\"";
		m_bConnected = false;
	}

	FILE_LOG(logDEBUG) << "Connection initialized";

	return m_bConnected;
}

bool MonitorPlugInActiveMQ::initProcessing(class MonitorConfiguration* configPtr, XMLNode config)
{
	// initialize ActiveMQ
	activemq::library::ActiveMQCPP::initializeLibrary();

	// read default configuration for topics
	parseTopic(config, m_genericTopic, m_genericTopic);

	initializeConfiguration(config);

	if (initializeActiveMqConnection()) {
		parseTopics(config, m_topics, m_genericTopic);
		initializeTopics(m_topics);
	}

	return m_bConnected;
}

void MonitorPlugInActiveMQ::initializeTopics(Topics &topics)
{
	if (m_bConnected == false) {
		throw RuntimeException(__FILE__, __LINE__, "Tried to initialize topics without established ActiveMQ connection. Call initializeActiveMqConnection() first.");
	}

	Topics::iterator i;
	TopicInfo *pTopicInfo;

	// create new producers
	for (i = topics.begin(); i != topics.end(); i++)
	{
		pTopicInfo = i->second;
		
		if (pTopicInfo->bUseTopic) {
			pTopicInfo->destination = m_session->createTopic(pTopicInfo->destUri);
		} 
		else {
			pTopicInfo->destination = m_session->createQueue(pTopicInfo->destUri);
		}

		pTopicInfo->producer = m_session->createProducer(pTopicInfo->destination);

		if (pTopicInfo->bDeliveryModePersistent) {
			pTopicInfo->producer->setDeliveryMode(DeliveryMode::PERSISTENT);
		} 
		else {
			pTopicInfo->producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
		}
		
		FILE_LOG(logINFO) << "Topic destination \"" << pTopicInfo->destination << "\" created";
	}

	FILE_LOG(logINFO) << "Topics initialized";
		
	m_bTopicsInitialized = true;
}

bool MonitorPlugInActiveMQ::quitProcessing() 
{
	return true;
}

void MonitorPlugInActiveMQ::parseTopics(XMLNode config, Topics &topics, TopicInfo &referenceTopic)
{
	XMLNode topicNode;
	Topics::iterator i;
	TopicInfo *pTopicInfo;
	
	// defaults
	vector<string> channels;
	channels.push_back(ACTIVEMQ_KEY_POCSAG);
	channels.push_back(ACTIVEMQ_KEY_ZVEI);
	channels.push_back(ACTIVEMQ_KEY_FMS);
	
	for (unsigned int i = 0, m = channels.size(); i < m; i++)
	{
		pTopicInfo = new TopicInfo;
		initializeTopic(*pTopicInfo, referenceTopic);
		topics.insert(PairMapping(channels.at(i), pTopicInfo));
	}

	int nTopic = config.nChildNode(ACTIVEMQ_XMLNODE_TOPIC);
	
	for (int num = 0; num < nTopic ; ++num)
	{
		if (!((topicNode = config.getChildNode(ACTIVEMQ_XMLNODE_TOPIC, num))).isEmpty()) {
			std::string type = topicNode.getAttribute(ACTIVEMQ_XMLATTR_TYPE) ;
			
			if ((type == ACTIVEMQ_KEY_POCSAG) || (type == ACTIVEMQ_KEY_FMS) || (type == ACTIVEMQ_KEY_ZVEI)) {
				pTopicInfo = (topics.find(type))->second;
				parseTopic(topicNode, *pTopicInfo, referenceTopic);
			}
		}
	}
}

void MonitorPlugInActiveMQ::initializeTopic(TopicInfo &topicInfo, TopicInfo &referenceTopic)
{
	topicInfo.bUseTopic = referenceTopic.bUseTopic;
	topicInfo.bDeliveryModePersistent = referenceTopic.bDeliveryModePersistent;
	topicInfo.destUri = referenceTopic.destUri;
	topicInfo.destination = NULL;
	topicInfo.producer = NULL;
}

void MonitorPlugInActiveMQ::parseTopic(XMLNode config, TopicInfo &topicInfo, TopicInfo &referenceTopic)
{
	initializeTopic(topicInfo, referenceTopic);

	if (config.nChildNode(ACTIVEMQ_XMLNODE_USETOPIC) >= 1) {
		topicInfo.bUseTopic = getNodeBool(config, ACTIVEMQ_XMLNODE_USETOPIC, false);
	}

	if (config.nChildNode(ACTIVEMQ_XMLNODE_DELIVERYMODEPERSISTENT) >= 1) {
		topicInfo.bDeliveryModePersistent = getNodeBool(config, ACTIVEMQ_XMLNODE_DELIVERYMODEPERSISTENT, false);
	}

	if (config.nChildNode(ACTIVEMQ_XMLNODE_DESTURI) >= 1) {
		topicInfo.destUri = getNodeText(config, ACTIVEMQ_XMLNODE_DESTURI, "monitord");
	}
}

MonitorPlugInActiveMQFactory::MonitorPlugInActiveMQFactory()
{
}

MonitorPlugInActiveMQFactory::~MonitorPlugInActiveMQFactory()
{
}

MonitorPlugIn * MonitorPlugInActiveMQFactory::CreatePlugIn()
{
	return new MonitorPlugInActiveMQ();
}


DLL_EXPORT void * factory0( void )
{
	return new MonitorPlugInActiveMQFactory;
}