Newer
Older
monitord / monitord / .svn / text-base / SocketServer.cpp.svn-base
@root root on 23 Jan 2012 19 KB Migration from SVN revision 455
#include "SocketServer.h"
#include "SocketThreadMonitord.h"
#include "SocketThreadFMS32.h"
#include "SocketThreadCrusader.h"

#ifndef WIN32
	#include <sys/param.h>
	#include <arpa/inet.h> // fuer inet_ntoa
#else
	#define usleep Sleep
	#define socklen_t int
#endif

#include "memlock.h"
#include "MonitorModulesResults.h"
#include "base64.h"
#include "convert.h"
#include "Monitor.h"
#include "MonitorLogging.h"

#include <stdio.h>
#include <fcntl.h>
#include <errno.h>

#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

using namespace std;

// Vielleicht sollte man das hier spaeter mal auslagern ...
// TODO Auslagern in externe Dateien (.h, .cpp)-Paar
//

const int SERVER_PORT = 9333 ;

typedef struct {long tv_sec; long tv_usec;} timval;




SocketServer::SocketServer(MonitorConfiguration *config, int iLockStartwert)
{
	m_MonitorConfiguration=config ;
	m_ServerModus=SocketThread::monitord ;
	m_iLockStartwert=iLockStartwert ;
	m_bWantStop=false ;
}


SocketServer::SocketServer(MonitorConfiguration *config, std::string FilterFileName, int iLockStartwert)
{
	m_MonitorConfiguration=config ;
	m_ServerModus=SocketThread::monitord ;
	m_iLockStartwert=iLockStartwert ;
	m_bWantStop=false ;
        #ifdef LUA
	m_bUseLUAScript=false ;
        #endif

	if (! FilterFileName.empty()) {

		#ifdef LUA
			// LUA TEST
			try
			{
				L = lua_open() ;
				luaL_openlibs(L) ;


				if(luaL_loadfile(L, FilterFileName.c_str()))
				{
						throw std::string(std::string(lua_tostring(L, -1)));
				}

				if (lua_pcall(L, 0, 0, 0))
				{
					FILE_LOG(logERROR) << "LUA test fehlgeschlagen" << endl ;
				}

				m_bUseLUAScript=true ;
				FILE_LOG(logINFO) << "Successfully loaded LUA filter: " << FilterFileName ;
			}
			catch (const std::string &e)
			{
				FILE_LOG(logERROR) << "Error loading lua script: "  << e;
			}

		#endif
	}

}


SocketServer::~SocketServer()
{
	// Beim SocketManager abmelden
	GetSocketsManager()->removeModule(this);

	#ifdef LUA
		if (L!=NULL)
		{
			lua_close(L) ;
		}
	#endif
}


bool SocketServer::createListeningSocket()
{
		bool result=false;
#ifdef _WIN32
    /* Initialisiere TCP fuer Windows ("winsock") */
    short wVersionRequested;
    WSADATA wsaData;
    wVersionRequested = MAKEWORD (1, 1);
    if (WSAStartup (wVersionRequested, &wsaData) != 0) {
    	ThrowMonitorException("Failed to init windows sockets (WSAStartup)") ;
        //fprintf( stderr, "Failed to init windows sockets\n");
        //exit(1);
    }
#endif

    /* Erzeuge das Socket */
    m_sock = socket( PF_INET, SOCK_STREAM, 0);
    if (m_sock < 0) {
    	ThrowMonitorException("Failed to create listening socket") ;
        //perror( "failed to create socket");
        //exit(1);
    }


	switch (m_ServerModus)
	{
	case SocketThread::fms32pro:
		m_iPort=m_MonitorConfiguration->m_PortFMS32Pro ;
		break ;
	case SocketThread::crusader:
		m_iPort=m_MonitorConfiguration->m_PortCrusader ;
		break ;
	case SocketThread::monitord:
	default:
		m_iPort=m_MonitorConfiguration->m_Port ;
		break ;
	}

	if ( (m_iPort<=0) && (m_iPort >65535))
	{
		if (m_ServerModus==SocketThread::monitord)
		{
			m_iPort=SERVER_PORT ;
		} else {
			m_iPort=0 ;
		}
	}
    /* Erzeuge die Socketadresse des Servers
     * Sie besteht aus Typ und Portnummer */
    memset( &m_server, 0, sizeof (m_server));
    m_server.sin_family = AF_INET;

    if (m_MonitorConfiguration->m_BindIP=="")
    {
    	m_server.sin_addr.s_addr = htonl( INADDR_ANY);
    } else {
    	m_server.sin_addr.s_addr = (inet_addr(m_MonitorConfiguration->m_BindIP.c_str()));
    }
    m_server.sin_port = htons( m_iPort);

	int on = 1 ;
#ifndef WIN32
    setsockopt(m_sock,SOL_SOCKET,SO_REUSEADDR,&on,sizeof(on)) ;
#else
	setsockopt(m_sock,SOL_SOCKET,SO_REUSEADDR,(char*) &on,sizeof(on)) ;
#endif

    /* Erzeuge die Bindung an die Serveradresse
     * (d.h. an einen bestimmten Port) */
    if (bind( m_sock, (struct sockaddr*)&m_server, sizeof( m_server)) < 0) {
		// Verbindungsfehler ignorieren, damit die Nachrichten auf jeden Fall
		// aus dem ModuleResultSet "gepumpt" werden !
		//
        //perror( "can't bind socket");
        //exit(1);
	} else {
		/* Teile dem Socket mit, dass Verbindungswuensche
		 * von Clients entgegengenommen werden */
		listen( m_sock, 10);
		result=true ;

		/**
		 * In globaler ManagerListe eintragen/anmelden
		 */
		GetSocketsManager()->addModule(this) ;
	}


	return result ;
}

void *SocketServer::Thread()
{
	this->ThreadStarted() ; // Erstmal Bescheid geben, dass wir laufen

	if (createListeningSocket())
	{
		initSocketThreads() ;
	}


	if (memLockOpen( 12345, & m_queueLock) < 0) {
		ThrowMonitorException("memLockOpen failed") ;
    }

	while (!m_bWantStop)
	{
	    /* Bearbeite die Verbindungswünsche von Clients
	     * in einer Endlosschleife
	     * Der Aufruf von accept() blockiert solange,
	     * bis ein Client Verbindung aufnimmt, oder der timeout abgelaufen ist */

	   	fd_set fdset ;
	    timeval tv ;
	    int result=0 ;
	    int maxHandle ;
	    m_bWantStop=false ;
	    while ((result==0) && (!m_bWantStop))
	    {
	    	tv.tv_sec=0; // Wartezeit auf Socketereignis, danach Kontrolle, ob ein "Sendeauftrag" vorliegt
	    	tv.tv_usec=100000 ;
	    	FD_ZERO(&fdset) ;   // Den Socket m_sock ueberwachen ...
	   		FD_SET(m_sock,&fdset);

	   		// Diese Zeile sollte man NICHT mit "1" uebersetzen !
	   		// Das geht voll daneben. Es muss m_sock+1 sein ! Auch bei einem (!) Socket !
	   		maxHandle = m_sock + 1;

	    	result= select(maxHandle, &fdset, &fdset, &fdset, &tv); // wartet Zeitraum wie in tv definiert

			if (FD_ISSET(m_sock,&fdset)>0) // Socket ereignis ?
			{
				result=1 ;
			}

			// Neue Verbindungsanfrage ?
			// Dann annehmen und als eigenen Thread laufen lassen
			//
			if (result>0)
			{
				int useSocket=-1 ;

				// Jetzt freien SocketThread suchen
				for (int i=0;i<MAX_CLIENTS;++i)
				{
					if (!socketThread[i]->IsRunning())
					{
						useSocket=i ;
						i=MAX_CLIENTS ;
					}
				}

				if ((useSocket>=MAX_CLIENTS) || (useSocket<0))
				{
					// TODO zu viele Clients !
					ThrowMonitorException("Too many client connects") ;
					perror("Zu viele Clients !\r\n") ;
					exit (1) ;
				}

				socklen_t sin_size = sizeof (sockaddr_in);
				int fd = accept(m_sock,(sockaddr*) &socketThread[useSocket]->m_client,&sin_size);
				FILE_LOG(logINFO) << "new connection from " << inet_ntoa(socketThread[useSocket]->m_client.sin_addr) ;

				// Thread mit dem oben angenommenen Socket starten
				socketThread[useSocket]->setFD(fd) ;
				socketThread[useSocket]->Start() ;
			}
		}
	}

	// Listening Socket beenden
	closesocket( m_sock);

	// Alle Clients beenden
	FILE_LOG(logINFO) << "Beende alle Clients"  ;
	for (int i=0;i<MAX_CLIENTS;++i)
	{
		FILE_LOG(logDEBUG) << "beende client " << i  ;
		if (socketThread[i]->IsRunning())
		{
			socketThread[i]->closeSocket() ;
			FILE_LOG(logINFO) << i << ": closesocket done" ;
			socketThread[i]->Kill() ;
			FILE_LOG(logINFO) << i << ": kill done" ;
		}
	}

	// uns selbst beenden
	usleep(1000) ;
	//JThread::Kill() ;
	return NULL ;
}


void SocketServer::initSocketThreads()
{
	int i ;
	for (i=0;i<MAX_CLIENTS;i++)
	{
		switch (m_ServerModus)
		{
			case SocketThread::monitord:
				socketThread[i]=new SocketThreadMonitord(m_MonitorConfiguration, m_iLockStartwert+i,i+1) ;
				break ;
			case SocketThread::fms32pro:
				socketThread[i]=new SocketThreadFMS32(m_MonitorConfiguration,m_iLockStartwert+i,i+1) ;
				break ;
			case SocketThread::crusader:
				socketThread[i]=new SocketThreadCrusader(m_MonitorConfiguration,m_iLockStartwert+i,i+1) ;
				break;
			default:
				break ;
		}
	}
}


void SocketServer::addResult(ModuleResultBase* pRes)
{
	// Nun f?r die nachgeordneten Threads die Ausgabe nach den Uebergabewerte zusammenbauen
	//
	#ifdef LUA
		char eins[255],zwei[255] ;
	#endif

	// Wenn ja, Ereignis an alle Threads weitergeben
	for (int i=0;i<MAX_CLIENTS;++i)
	{
		if (socketThread[i]->IsRunning())
		{
			m_bSkipDispatching=false ;
			// erstmal ggf. LUA Filter aufrufen
				#ifdef LUA

				if (m_bUseLUAScript==true)
				{
					int z ;
				      /* push functions and arguments */
				     lua_getglobal(L, "filter");  /* function to be called */
				     // start array structure
					   lua_newtable( L );
					   int numCount=1 ;
						for (ResultItemsMap::iterator iter=pRes->m_Items.begin(); iter!=pRes->m_Items.end(); ++iter)
		 				{
		 				  {
						   memset(eins,0,200) ;
						   memset(zwei,0,200) ;
						   strncpy(eins,iter->first.c_str(),199) ;
						   strncpy(zwei,iter->second.c_str(),199) ;

						   lua_pushstring( L, eins );
						   lua_pushstring( L, zwei );
						   lua_rawset( L, -3 );
						   numCount++ ;
		 				  }
		 				}

		 				// noch clientIP, authenticated und loginname dazupacken

		 				// Authenticated
		 				memset(eins,0,200) ;
						memset(zwei,0,200) ;
						strncpy(eins,"client_authenticated",199) ;
						if (socketThread[i]->isClientAuthenticated())
						{
							strncpy(zwei,"1",199) ;
						} else {
							strncpy(zwei,"0",199) ;
						}
						lua_pushstring( L, eins );
						lua_pushstring( L, zwei );
						lua_rawset( L, -3 );
						numCount++ ;

		 				// ClientIP
		 				memset(eins,0,200) ;
						memset(zwei,0,200) ;
						strncpy(eins,"client_ip",199) ;
						strncpy(zwei,socketThread[i]->getClientIP().c_str(),199) ;
						lua_pushstring( L, eins );
						lua_pushstring( L, zwei );
						lua_rawset( L, -3 );
						numCount++ ;

		 				// Loginname
		 				memset(eins,0,200) ;
						memset(zwei,0,200) ;
						strncpy(eins,"client_loginname",199) ;
						strncpy(zwei,socketThread[i]->getClientLogin().c_str(),199) ;
						lua_pushstring( L, eins );
						lua_pushstring( L, zwei );
						lua_rawset( L, -3 );
						numCount++ ;

		 				// Clienttype
		 				memset(eins,0,200) ;
						memset(zwei,0,200) ;
						strncpy(eins,"client_type",199) ;
						switch (m_ServerModus)
						{
							case SocketThread::monitord:
								strncpy(zwei,"monitord",199) ;
								break ;
							case SocketThread::fms32pro:
								strncpy(zwei,"fms32",199) ;
								break ;
							case SocketThread::crusader:
								strncpy(zwei,"crusader",199) ;
								break ;
							default:
								strncpy(zwei,"unknown",199) ;
								break ;
						}
						lua_pushstring( L, eins );
						lua_pushstring( L, zwei );
						lua_rawset( L, -3 );
						numCount++ ;

					   // set the number of elements (index to the last array element)
					   lua_pushliteral( L, "n" );
					   lua_pushnumber( L, numCount-1 );
					   lua_rawset( L, -3 );

					   // set the name of the array that the script will access
					   lua_setglobal( L, "arg" );

				      /* do the call (2 arguments, 1 result) */
				      if (lua_pcall(L, 0, LUA_MULTRET, 0) != 0) {
				      	FILE_LOG(logERROR) << "Fehler beim Aufruf lua dispatcher script:" << lua_tostring(L, -1);
				        //error(L, "error running function `f': %s",
				        //         lua_tostring(L, -1));
					  }

				      /* retrieve result */
				      if (!lua_isnumber(L, -1)) {
				      	FILE_LOG(logERROR) << "nicht-numerische Antwort vom lua dispatcher script" ;
				        //error(L, "function `f' must return a number");
					  }
				      z = lua_tonumber(L, -1);
				      lua_pop(L, 1);  /* pop returned value */
					  FILE_LOG(logDEBUG1) << "lua Result (global dispatcher)" << z ;

					  if (z==1) m_bSkipDispatching=true ;
				}
				#endif

				if (m_bSkipDispatching==false)
				{
					socketThread[i]->addResult(pRes) ;
				}
		}

	}
}


/*
 * -----------------------------------------------------------
 * ThreadBase
 * -----------------------------------------------------------
 */

ThreadBase::~ThreadBase()
{
}

ThreadBase::ThreadBase(int locknum)
{
	m_iLockNum= locknum ;
	m_exitThread=false ;
}

bool ThreadBase::createLock()
{
	if ( memLockCreate( m_iLockNum, &m_Lock) < 0) {

		ThrowMonitorException("memLockCreate failed for " + convertIntToString(m_iLockNum)) ;
    }
    return true;
}

void ThreadBase::releaseLock()
{
	 memLockDestroy(m_Lock) ;

}


/*
 * -----------------------------------------------------------
 * SocketThread
 * -----------------------------------------------------------
 */

SocketThread::~SocketThread()
{
}

SocketThread::SocketThread(MonitorConfiguration *config, int locknum, int PortNum, SocketMode ServerMode)
: ThreadBase(locknum)
{
	m_MonitorConfiguration = config;
	m_iPortNum = PortNum ;
	m_ServerMode = ServerMode ;
	ResetThreadVars() ;
}

void SocketThread::setFD(int fd)
{
	m_fd=fd ;
}

void SocketThread::ResetThreadVars()
{
	// Zuruecksetzen:
	// Kein Socket zugewiesen
	// Client ist nicht angemeldet
	// Thread soll sich nicht beenden
	//

	m_fd=0 ;
	m_authenticated=false ;
	m_exitThread=false ;

	// Commandbuffer loeschen
	memset(m_CommandBuffer,0,MAX_COMMANDLINE) ;

}

void *SocketThread::Thread()
{
	if (m_fd==0)
	{
		ThrowMonitorException("SocketThread started wird fd=NULL") ;
		return NULL ;
	}

	this->ThreadStarted() ; // Erstmal Bescheid geben, dass wir laufen
	createLock() ;
	createSocket() ;

	FILE_LOG(logINFO) << "SocketThreads exits" ;

	releaseLock() ;
	return NULL;
}


std::string SocketThread::createZVEIOutputString(ModuleResultBase Result)
{
	std::string socketText="" ;
	return socketText ;
}


std::string SocketThread::createPOCSAGOutputString(ModuleResultBase Result)
{
	std::string socketText="" ;
	return socketText ;
}

std::string SocketThread::createFMSOutputString(ModuleResultBase Result)
{
	std::string socketText="" ;

	return socketText ;
}

std::string SocketThread::createOutputString(ModuleResultBase Result)
{
	std::string socketText="" ;

	if (Result["typ"]=="fms")
	{
		socketText=createFMSOutputString(Result) ;
	}
	else if (Result["typ"]== "pocsag")
	{
		socketText=createPOCSAGOutputString(Result) ;
	}
	else if (Result["typ"]=="zvei")
	{
		socketText=createZVEIOutputString(Result) ;
	}

	return socketText ;
}
void SocketThread::addResult(ModuleResultBase* pRes)
{
	std::string outText ;

	outText=createOutputString(*pRes);
	addOutputText(outText) ;
}

void SocketThread::addOutputText(std::string outText)
{
	memLock(m_Lock) ;
	if (!(outText==""))
	{
		m_outputStrings.insert(m_outputStrings.begin(),outText) ;
	}
	memUnlock(m_Lock) ;
}

void SocketThread::say(const std::string& something)
{
	unsigned int len=send( m_fd, something.c_str(), something.length(), 0);
	if (len!=something.length())
	{
		FILE_LOG(logERROR) << "error sending date to client. thread exiting"  ;
		doLogout() ;
	}
}

void SocketThread::say(const char *something)
{
	unsigned int len = send( m_fd, something, strlen(something), 0);
	if (len!=strlen(something))
	{
		FILE_LOG(logERROR) << "error sending date to client. thread exiting"  ;
		doLogout() ;
	}
}

void SocketThread::processInput()
{

}


void SocketThread::doLogout()
{
	m_exitThread=true ;
}


void SocketThread::sayWelcome()
{
}

void SocketThread::sayGoodbye()
{
}

void SocketThread::createSocket()
{
	timeval tv ;

    if (m_fd < 0) {
    	ThrowMonitorException("accept failed") ;
    	//perror( "accept failed");
        //exit(1);
    }

	// Hier ist die Sitzung noch nicht "authenticated"
	fd_set fdset ;

    FD_ZERO(&fdset) ;
    FD_SET(m_fd,&fdset);

    m_sClientIP= inet_ntoa(m_client.sin_addr) ;

	m_authenticated=false ;
	// Gültige IP Adresse, die sich nicht anmelden muss ?
	if (m_MonitorConfiguration->IsValidLogin("","",this->m_sClientIP))
	{
		FILE_LOG(logINFO) << "login authentication (ip allowed): " << m_sClientIP ;
		this->m_authenticated=true ;
	}

	// FMS32Pro ist dafuer zu doof ;-)
	//
	if ((m_ServerMode==SocketThread::fms32pro))
	{
		// TODO: IP Liste auch bei FMS32 anwenden ? dann nachfolgendes auskommentiert lassen
		// this->m_authenticated=true ;
	}

	m_CommandBuffer[0]='\0';

	// Wir sagen natuerlich brav "Guten Tag" ...
	sayWelcome() ;

	fd_set fdset_write ;
	fd_set fdset_exceptions ;


    for (;;) {
    	if (m_exitThread)
    	{
    		break ;
    	}

        // Fuer eine Sekunde auf Meldung am Port warten
        tv.tv_sec=0 ;
        tv.tv_usec=10000 ;
        FD_ZERO(&fdset) ;
        FD_SET(m_fd,&fdset);

        // Neu: write + exceptions
        FD_ZERO(&fdset_write) ;
    	FD_ZERO(&fdset_exceptions) ;
		//FD_SET(m_fd,&fdset_write);
    	FD_SET(m_fd,&fdset_exceptions);


        int result = select(1, &fdset, NULL, &fdset_exceptions, &tv);

		if (FD_ISSET(m_fd,&fdset)>0) // Socket ereignis ?
		{
			FILE_LOG(logDEBUG) << "Socket reports read event" ;
			result=1 ;
		}

		if (FD_ISSET(m_fd,&fdset_write)>0) // Problem ?
		{
			FILE_LOG(logDEBUG) << "Socket reports write event" ;
			result=0 ;
			//m_exitThread=true ;
		}

		if (FD_ISSET(m_fd,&fdset_exceptions)>0) // Problem ?
		{
			FILE_LOG(logDEBUG) << "Socket reports exception event" ;
			result=0 ;
			m_exitThread=true ;
		}

        if (result>0)
        {
        	char buffer[RECV_BUFFER] ;
			char * posPtr ;
			int gelesen ;
			gelesen=recv (m_fd, buffer, RECV_BUFFER-1, 0) ;
        	if (gelesen<=0) // Nix am Port, aber doch Port Event ?
			{
				FILE_LOG(logINFO) << "recv()<=0 => socketthread exiting"  ;
				m_exitThread=true ;
			}
			buffer[gelesen]='\0';

			/* gelesenen Puffer an evtl. vorhanden Reste im Kommandopuffer anhängen */
        	strncat(m_CommandBuffer,buffer,MAX_COMMANDLINE-strlen(m_CommandBuffer)-1) ;

			/* Erst wenn mindestens eine Eingabezeile empfangen wurde, werden alle vollständigen Zeilen ausgewertet */
			while ((posPtr=strstr(m_CommandBuffer,"\r\n"))>0)
			{
				/* Puffer nach dem CRLF zwischenspeichern */
				char tempbuffer[MAX_COMMANDLINE] ;
				tempbuffer[0]='\0';
				strncpy(tempbuffer,&posPtr[2],MAX_COMMANDLINE) ;

				/* Kommando mit dem Zeilenende beenden */
				posPtr[0]='\0';

				processInput() ;

				/* restlichen Puffer nach dem CRLF wiederherstellen */
				strncpy(m_CommandBuffer,tempbuffer,MAX_COMMANDLINE) ;
			}

        } else
        {
     		// Nix tun
        }

        // Gibt es etwas zu senden ?
        //
		memLock(m_Lock) ;
		while (!m_outputStrings.empty())
		{
			std::string retString=m_outputStrings.back() ;

			if (m_authenticated)
			{
				retString.push_back('\r') ;
				retString.push_back('\n') ;
				say(retString) 	;
			}
			m_outputStrings.pop_back() ;
		}
		memUnlock(m_Lock) ;
    }

    closesocket(m_fd);
    ResetThreadVars() ;
}

std::string SocketThread::getClientIP()
{
	return (m_sClientIP) ;
}

std::string SocketThread::getClientLogin()
{
	return (m_loginname) ;
}

bool SocketThread::isClientAuthenticated()
{
	return (m_authenticated) ;
}

void SocketThread::closeSocket()
{
	closesocket( m_fd);
}


bool SocketThread::paramIsHex(int param)
{
	// TODO
	return true ;
}

bool SocketThread::paramIsBase64(int param)
{
	// TODO
	return true ;
}


bool SocketThread::HexToString(int param, std::string &result)
{
	return convertHexToString(m_cmdParam[param],result);
}

// ----------------------------------------
//MonitorSocketsManager

MonitorSocketsManager *GlobalMonitorSocketsManager=NULL;
MonitorSocketsManager* GetSocketsManager() {
	if (GlobalMonitorSocketsManager==NULL) {
		GlobalMonitorSocketsManager = new MonitorSocketsManager();
	}
	return (GlobalMonitorSocketsManager);
}

MonitorSocketsManager::MonitorSocketsManager()
{
	FILE_LOG(logDEBUG) << "SocketManager erstellt"  ;

	if ( memLockCreate( 12347, & m_MemLock) < 0) {
   		ThrowMonitorException("SocketsManager: memLockCreate failed") ;
    }

    m_bStop=false ;
}


MonitorSocketsManager::~MonitorSocketsManager()
{
}


bool MonitorSocketsManager::addModule(SocketServer* pServer)
{
	m_Modules.push_back(pServer) ;

	return true ;
}


bool MonitorSocketsManager::dispatchResult(ModuleResultBase *pRes)
{
	tMonitorSocketServerVector::iterator i ;

	for (i= m_Modules.begin(); i< m_Modules.end(); i++)
	{
			(*i)->addResult(pRes) ;
	}
	return true ;
}


bool MonitorSocketsManager::removeModule(SocketServer* pServer)
{
	return true ;
}