/*
* libmplugin_mqtt.cpp
*
* Created on: 06.01.2017
* Author: kalle
*/
#include "libmplugin_mqtt.h"
#include <iostream>
#include "../MonitorLogging.h"
#include "../MonitorExceptions.h"
// Debug trace helper: prints "[<function name>]: <msg>" to std::cout.
// Only defined here when no DD macro is already in effect.
#ifndef DD
#define DD(msg) std::cout << "[" << __FUNCTION__ << "]: " << msg << std::endl
#endif
// Fallback that maps nullptr onto NULL.
// NOTE(review): from C++11 on nullptr is a keyword, not a macro, so this
// #ifndef cannot detect it and the replacement is defined regardless of the
// language standard in use - confirm this is still wanted.
#ifndef nullptr
#define nullptr NULL
#endif
/**
 * Builds the plug-in in its initial state: empty base topic, no MQTT client
 * object and the connection flag cleared.
 */
MonitorPlugInMQTT::MonitorPlugInMQTT()
{
    DD("start");

    // Members are assigned here instead of being initialised in the header.
    // TODO: use c++11 declaration in header file
    strTopic = "";
    pMQTT = nullptr;
    bConnected = false;
}
/**
 * Destroys the plug-in and frees the MQTT client object, if one was created.
 */
MonitorPlugInMQTT::~MonitorPlugInMQTT()
{
    DD("start");

    // initProcessing() allocates the client with new and nothing else in this
    // file frees it. Deleting a null pointer is a no-op, so no guard is
    // needed for the case where initProcessing() never ran.
    delete pMQTT;
    pMQTT = nullptr;
}
bool MonitorPlugInMQTT::initProcessing(class MonitorConfiguration* configPtr, XMLNode config)
{
std::string hostname;
unsigned int port;
int ret = 0;
hostname = getNodeText(config, "hostname", "localhost");
port = getNodeInt(config, "port", 3366);
strTopic = getNodeText(config, "topic", "monitord");
LOG_INFO("using mqtt host: " << hostname << " at port: " << port << " trying to connect");
if (pMQTT == nullptr)
{
pMQTT = new myMQTT();
}
// connect to mqtt server
check(pMQTT->connect_async(hostname.c_str(), port), "Could not connect to MQTT server!");
check(pMQTT->loop_start(), "Could not start loop!");
// transmit version string
std::string topic = strTopic + "/version";
std::string msg = VERSION;
pMQTT->publish(NULL, topic.c_str(), msg.length(), msg.c_str());
// logging output
LOG_INFO("connected successfull");
return true;
}
/**
 * Converts one decoder result into a small JSON object and publishes it on
 * "<strTopic>/<typ>", where typ is the result's "typ" entry.
 *
 * Only "zvei" results contribute a field ("zvei") at the moment; "fms",
 * "pocsag" and any other type are published as an empty object "{}".
 *
 * @param pRes  result to publish; a null pointer is rejected
 * @return false when @p pRes is null or no MQTT client exists, true otherwise
 *         (the outcome of the publish itself is not reflected)
 */
bool MonitorPlugInMQTT::processResult(class ModuleResultBase *pRes)
{
    LOG_DEBUG("MQTT processing result");

    if (pRes == nullptr)
    {
        LOG_ERROR("no result to process");
        return false;
    }
    if (pMQTT == nullptr)
    {
        LOG_ERROR("disconected from MQTT server");
        return false;
    }

    const std::string typ = (*pRes)["typ"];
    const std::string topic = strTopic + "/" + typ;

    std::string msg = "{";
    if (typ == "zvei")
    {
        // NOTE(review): the value is inserted without JSON escaping - fine as
        // long as it cannot contain quotes or backslashes; confirm.
        msg += "\"zvei\":\"" + (*pRes)["zvei"] + "\"";
    }
    // TODO: "fms" and "pocsag" results do not add any fields yet.
    msg += "}";
    LOG_DEBUG(msg);

    // transmit message
    pMQTT->send(topic, msg);
    return true;
}
/**
 * Shuts the MQTT connection down: requests a disconnect and then stops the
 * network loop. Failures of either step are logged only.
 *
 * @return always true
 */
bool MonitorPlugInMQTT::quitProcessing()
{
    // todo: check for failure
    if (pMQTT == nullptr)
    {
        return true;
    }

    LOG_INFO("disconnecting");

    // Disconnect first: loop_stop() without force waits for the network
    // thread to finish, and that thread only ends once a disconnect has been
    // requested. Stopping the loop first can therefore block indefinitely.
    check(pMQTT->disconnect(), "Could not disconnect", false);

    // disable loop
    check(pMQTT->loop_stop(), "Could not stop loop", false);
    return true;
}
/**
 * Emits a single debug trace line via DD(); does nothing else.
 */
void MonitorPlugInMQTT::Show()
{
    DD("start");
}
/**
 * Evaluates a mosquitto return code.
 *
 * Nothing happens when @p returnValue equals MOSQ_ERR_SUCCESS. Otherwise the
 * message is logged as an error and, if requested, raised as an exception.
 *
 * @param returnValue     return code to compare with MOSQ_ERR_SUCCESS
 * @param errorMessage    text that is logged and carried by the exception
 * @param throwException  true to throw a MonitorException after logging
 * @throws MonitorException on failure when @p throwException is true
 */
void MonitorPlugInMQTT::check(const int returnValue, const std::string& errorMessage, const bool throwException)
{
    if (returnValue == MOSQ_ERR_SUCCESS)
    {
        return;
    }

    LOG_ERROR(errorMessage);
    if (throwException)
    {
        // The throw keyword is essential: merely constructing the exception
        // object creates a temporary that is discarded without any effect.
        throw MonitorException(errorMessage.c_str());
    }
}
// +++++ myMQTT +++++ //
/**
 * Constructs the client wrapper, forwarding NULL and true to the
 * mosqpp::mosquittopp base-class constructor.
 */
myMQTT::myMQTT()
    : mosqpp::mosquittopp(NULL, true)
{
}
/**
 * Nothing to release here; the body is intentionally empty.
 */
myMQTT::~myMQTT()
{
}
void myMQTT::send(const std::string& topic, const std::string& message)
{
publish(NULL, topic.c_str(), message.length(), message.c_str());
}
void myMQTT::on_connect(int rc)
{
LOG_INFO("on_connect");
}
void myMQTT::on_disconnect(int rc)
{
LOG_INFO("on_disconnect");
}
void myMQTT::on_message(const struct mosquitto_message *message)
{
char payload[message->payloadlen];
memcpy(payload, message->payload, message->payloadlen);
std::string topic = message->topic;
std::string msg = payload;
LOG_DEBUG("Got message: " << topic << " - " << msg);
}
/**
 * Emits a debug trace line; none of the parameters are used.
 *
 * NOTE(review): the name is spelled "on_subcribe". If this is meant to
 * override the mosquittopp subscribe callback (presumably on_subscribe), the
 * spelling difference would prevent the library from ever calling it -
 * confirm against the class declaration.
 *
 * @param mid          unused
 * @param qos_count    unused
 * @param granted_qos  unused
 */
void myMQTT::on_subcribe(int mid, int qos_count, const int *granted_qos)
{
    DD("on_subcribe");
}