Commit fa61066b authored by Giacomo Strangolino's avatar Giacomo Strangolino
Browse files

supervisor receives messages from ca-proxy

parent 35408429
......@@ -12,6 +12,8 @@ src/casupjsoniz.cpp
src/casupjsoniz.h
src/casuplog.cpp
src/casuplog.h
src/casupreceiver.cpp
src/casupreceiver.h
src/casuprecoverxmitevent.cpp
src/casuprecoverxmitevent.h
src/casupredistrib.cpp
......
user=giacomo
host=localhost
pass=giacomo
dbnam=cadb
#include "ca-supervisor.h"
#include "casupjsoniz.h"
CaSupervisor::CaSupervisor() {
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <errno.h>
#include <unistd.h>
#include <arpa/inet.h> // inet_ntop
#include <culog.h>
#include <cadbhandle.h>
#include <cajson-src-bundle-ex.h>
#define _BUFSIZ 512
class CaSupervisorPrivate {
public:
CaSupervisorPrivate(CaDbH* h, CuLogImplI *l) : log(l) , dbh(h) {}
CuLogImplI *log;
CaDbH *dbh;
};
CaSupervisor::CaSupervisor(CaDbH *dbh, CuLogImplI *log) {
d = new CaSupervisorPrivate(dbh, log);
}
CaSupervisor::~CaSupervisor() {
delete d;
}
......@@ -9,6 +34,34 @@ void CaSupervisor::onProgress(int step, int total, const CuData &data) { }
void CaSupervisor::onResult(const CuData &data) {
printf("CaSupervisor.onResult: received %s\n", datos(data));
if(data.containsKey("data")) {
const std::string& s = data.s("data");
CaJsonSrcBundleExtract bux;
std::list<CuData> dl;
std::string chan, id, global_m;
bux.extract(s, dl, &chan, &id, &global_m);
if(bux.error_msg.length() > 0) {
d->log->write("ca-supervisor", "Json error: " + bux.error_msg + " | msg payload " + s, CuLog::LevelError);
}
else {
for(const CuData& da : dl)
printf("ca-supervisor.onResult: extracted %s chan %s id %s global method \"%s\"\n", datos(da), chan.c_str(), id.c_str(), global_m.c_str());
}
int fd = data.I("fd");
int byw = m_sock_write(fd, CaSupJsoniz().make_msg_ok());
printf("[0x%lx] \e[1;32mCaSupervisor::onResult \e[0;35m %d bytes written in reply \e[0m\n", pthread_self(), byw);
}
// connection / disconnection from ca-proxy
if(data.containsKey("fd") && data.containsKey("fdopen")) {
bool o = data.B("fdopen");
const int fd = data.I("fd");
d->log->write("ca-supervisor", "peer " + std::string(o ? "" : "dis") + "connected: sofd " + std::to_string(fd), CuLog::LevelInfo);
if(!o) {
close(fd);
}
}
}
void CaSupervisor::onResult(const std::vector<CuData> &datalist) { }
......@@ -16,3 +69,16 @@ void CaSupervisor::onResult(const std::vector<CuData> &datalist) { }
CuData CaSupervisor::getToken() const {
return CuData("type", "ca-supervisor");
}
int CaSupervisor::m_sock_write(int sofd, const std::string &buf) {
int totclibw = 0, clibw = 0, wlen = buf.length();
// write rbuf back to client fd
while(totclibw < wlen && clibw >= 0) {
clibw = send(sofd, buf.c_str() + totclibw, wlen - totclibw < _BUFSIZ ? wlen - totclibw : _BUFSIZ, MSG_NOSIGNAL);
if(clibw < 0) {
d->log->write("ca-supervisor", "write failed :" + std::string(strerror(errno)));
}
totclibw += clibw;
}
return clibw >=0 ? totclibw : clibw;
}
......@@ -4,10 +4,16 @@
#include <cuthreadlistener.h>
class CuLogImplI;
class CaDbH;
class CaSupervisorPrivate;
class CaSupervisor : public CuThreadListener
{
public:
CaSupervisor();
CaSupervisor(CaDbH *dbh, CuLogImplI *log);
virtual ~CaSupervisor();
// CuThreadListener interface
public:
......@@ -15,6 +21,9 @@ public:
void onResult(const CuData &data);
void onResult(const std::vector<CuData> &datalist);
CuData getToken() const;
private:
int m_sock_write(int sofd, const std::string &buf);
CaSupervisorPrivate *d;
};
#endif // CASUPERVISOR_H
#include "casupdbfuncs.h"
#include "casupdbhandle.h"
#include <cadbhandle.h>
CaSupDbFuncs::CaSupDbFuncs() { }
......@@ -9,17 +9,17 @@ CaSupDbFuncs::CaSupDbFuncs() { }
*
* Recovery period is set by default to 30 minutes.
*
* \param dbh handle to CaSupDbH
* \param dbh handle to CaDbH
* \return vector of CuData with listed:
* 1. active and alive services
* 2. active dormant not yet in recovery phase
* 3. active dormant in recovery
*/
std::vector<CuData> CaSupDbFuncs::get_active_services(CaSupDbH *dbh) {
std::vector<CuData> CaSupDbFuncs::get_active_services(CaDbH *dbh) {
// 1. select ACTIVE ALIVE (expected >= current timestamp)
std::string stm = "SELECT service.*,srvconf.srvnam,srvconf.addr,srvconf.port,current_timestamp "
"FROM service,srvconf WHERE state=$1 AND expected >= (SELECT current_timestamp) AND service.conf_id=srvconf.conf_id";
CaSupDbRes res = dbh->execute(stm, std::vector<std::string> { "ACTIVE" });
CaDbRes res = dbh->execute(stm, std::vector<std::string> { "ACTIVE" });
std::vector<CuData> v = m_res_to_data(res);
for(CuData &da : v) {
da["err"] = false;
......@@ -74,10 +74,10 @@ std::vector<CuData> CaSupDbFuncs::get_active_services(CaSupDbH *dbh) {
return v;
}
std::vector<CuData>* CaSupDbFuncs::get_srcs_for_srv_id(CaSupDbH *h, int id) {
std::vector<CuData>* CaSupDbFuncs::get_srcs_for_srv_id(CaDbH *h, int id) {
std::string stm = "SELECT activity.source,activity.chan,service.conf_id from activity,service "
" WHERE service.id=activity.srv_id AND service.id=$1";
CaSupDbRes res = h->execute(stm, std::vector<std::string> { std::to_string(id) });
CaDbRes res = h->execute(stm, std::vector<std::string> { std::to_string(id) });
std::vector<CuData> *v = new std::vector<CuData>();
*v = m_res_to_data(res);
last_db_error = res.errmsg;
......@@ -86,41 +86,41 @@ std::vector<CuData>* CaSupDbFuncs::get_srcs_for_srv_id(CaSupDbH *h, int id) {
// clean all unreachable recovery records (whose service id is not in the activity records)
//
bool CaSupDbFuncs::clean_recovery_table(CaSupDbH *h) {
bool CaSupDbFuncs::clean_recovery_table(CaDbH *h) {
std::string stm = "DELETE FROM recovery WHERE srv_id NOT IN (SELECT srv_id FROM activity)";
CaSupDbRes res = h->execute(stm);
CaDbRes res = h->execute(stm);
last_db_error = res.errmsg;
return !res.error();
}
bool CaSupDbFuncs::set_last_recover_successful(CaSupDbH *h, bool success) {
bool CaSupDbFuncs::set_last_recover_successful(CaDbH *h, bool success) {
std::string stm = "UPDATE recover_operations SET success=$1,complete=current_timestamp WHERE id=(SELECT MAX(id) FROM recover_operations)";
CaSupDbRes r = h->execute(stm, std::vector<std::string> { success ? "TRUE" : "FALSE" });
CaDbRes r = h->execute(stm, std::vector<std::string> { success ? "TRUE" : "FALSE" });
last_db_error = r.errmsg;
return !r.error();
}
int CaSupDbFuncs::register_instance(CaSupDbH *h, const std::string& url)
int CaSupDbFuncs::register_instance(CaDbH *h, const std::string& url)
{
std::string stm = "INSERT INTO supervisor (id,started,stopped,url) VALUES(DEFAULT, current_timestamp, NULL, $1) RETURNING id";
CaSupDbRes r = h->execute(stm, std::vector<std::string> { url });
CaDbRes r = h->execute(stm, std::vector<std::string> { url });
if(!r.error() && r.size() > 0)
return atoi(r.value(0, "id").c_str());
last_db_error = r.errmsg;
return -1;
}
bool CaSupDbFuncs::unregister_instance(CaSupDbH *h, int id) {
bool CaSupDbFuncs::unregister_instance(CaDbH *h, int id) {
std::string stm = "UPDATE supervisor SET stopped=current_timestamp WHERE id=$1";
CaSupDbRes r = h->execute(stm, std::vector<std::string> { std::to_string(id) });
CaDbRes r = h->execute(stm, std::vector<std::string> { std::to_string(id) });
last_db_error = r.errmsg;
return !r.error();
}
bool CaSupDbFuncs::insert_recover_operation_start(CaSupDbH *h, int superv_id, int from_srv_id, const std::vector<CuData> *srcs) {
bool CaSupDbFuncs::insert_recover_operation_start(CaDbH *h, int superv_id, int from_srv_id, const std::vector<CuData> *srcs) {
std::string conf_id;
std::string stm = "SELECT srvconf.conf_id FROM srvconf,service where service.id=$1 AND srvconf.conf_id=service.conf_id";
CaSupDbRes r = h->execute(stm, std::vector<std::string> { std::to_string(from_srv_id) });
CaDbRes r = h->execute(stm, std::vector<std::string> { std::to_string(from_srv_id) });
if(!r.error() && r.size() > 0)
conf_id = r.value(0, "conf_id");
if(conf_id.length() > 0) {
......@@ -140,7 +140,7 @@ bool CaSupDbFuncs::insert_recover_operation_start(CaSupDbH *h, int superv_id, in
return !r.error();
}
std::vector<CuData> CaSupDbFuncs::m_res_to_data(const CaSupDbRes &res) {
std::vector<CuData> CaSupDbFuncs::m_res_to_data(const CaDbRes &res) {
std::vector<CuData> v;
for(size_t r = 0; r < res.size(); r++) { // fetch rows
CuData da;
......@@ -157,9 +157,9 @@ std::vector<CuData> CaSupDbFuncs::m_res_to_data(const CaSupDbRes &res) {
* \param srv_id the server which activities are being recovered
* \return true if database operation is successful, false otherwise
*/
bool CaSupDbFuncs::insert_recovery_data(CaSupDbH *h, int srv_id,int superv_id) {
bool CaSupDbFuncs::insert_recovery_data(CaDbH *h, int srv_id,int superv_id) {
std::string stm = "UPDATE recovery SET started=current_timestamp WHERE srv_id=$1";
CaSupDbRes res = h->execute(stm, std::vector<std::string> { std::to_string(srv_id) });
CaDbRes res = h->execute(stm, std::vector<std::string> { std::to_string(srv_id) });
if(!res.error() && res.affected_rows == 0) {
stm = "INSERT INTO recovery (id,srv_id,started,supervisor_id) VALUES(DEFAULT,$1,current_timestamp,$2)";
res = h->execute(stm, std::vector<std::string> { std::to_string(srv_id), std::to_string(superv_id) });
......@@ -168,9 +168,9 @@ bool CaSupDbFuncs::insert_recovery_data(CaSupDbH *h, int srv_id,int superv_id)
return !res.error();
}
bool CaSupDbFuncs::remove_from_recovery(CaSupDbH *h, int srv_id) {
bool CaSupDbFuncs::remove_from_recovery(CaDbH *h, int srv_id) {
std::string stm = "DELETE FROM recovery WHERE srv_id=$1";
CaSupDbRes res = h->execute(stm, std::vector<std::string> { std::to_string(srv_id) });
CaDbRes res = h->execute(stm, std::vector<std::string> { std::to_string(srv_id) });
last_db_error = res.errmsg;
return !res.error();
}
......@@ -4,26 +4,26 @@
#include <vector>
#include <cudata.h>
class CaSupDbH;
class CaSupDbRes;
class CaDbH;
class CaDbRes;
class CaSupDbFuncs
{
public:
CaSupDbFuncs();
std::vector<CuData> get_active_services(CaSupDbH *dbh);
std::vector<CuData> *get_srcs_for_srv_id(CaSupDbH *h, int id);
std::vector<CuData> get_active_services(CaDbH *dbh);
std::vector<CuData> *get_srcs_for_srv_id(CaDbH *h, int id);
std::string last_db_error;
std::vector<CuData> m_res_to_data(const CaSupDbRes& res);
bool insert_recovery_data(CaSupDbH *h, int srv_id, int superv_id);
bool remove_from_recovery(CaSupDbH *h, int srv_id);
bool clean_recovery_table(CaSupDbH *h);
bool set_last_recover_successful(CaSupDbH *h, bool success);
int register_instance(CaSupDbH *h, const std::string &url);
bool unregister_instance(CaSupDbH *h, int id);
bool insert_recover_operation_start(CaSupDbH *h, int superv_id, int from_srv_id, const std::vector<CuData> *srcs);
std::vector<CuData> m_res_to_data(const CaDbRes& res);
bool insert_recovery_data(CaDbH *h, int srv_id, int superv_id);
bool remove_from_recovery(CaDbH *h, int srv_id);
bool clean_recovery_table(CaDbH *h);
bool set_last_recover_successful(CaDbH *h, bool success);
int register_instance(CaDbH *h, const std::string &url);
bool unregister_instance(CaDbH *h, int id);
bool insert_recover_operation_start(CaDbH *h, int superv_id, int from_srv_id, const std::vector<CuData> *srcs);
};
#endif // CASUPDBFUNCS_H
#include "casupdbmon.h"
#include <cumacros.h>
#include "casupdbhandle.h"
#include <cadbhandle.h>
#include "casupdbfuncs.h"
#include "casuplog.h"
#include "casuprecoverxmitevent.h"
......@@ -13,7 +13,7 @@
class CaSupDbMonPrivate {
public:
CaSupDbH *dbh;
CaDbH *dbh;
std::string msg;
bool error;
int superv_id;
......@@ -43,7 +43,7 @@ void CaSupDbMon::init()
else
setInterval(HEARTBEAT_INTERVAL * 1000);
const CuData&tok = getToken();
d->dbh = new CaSupDbH(tok["dbu"].toString(), tok["dbp"].toString(), tok["dbna"].toString(), tok["dbh"].toString());
d->dbh = new CaDbH(tok["dbuser"].toString(), tok["dbpass"].toString(), tok["dbnam"].toString(), tok["dbhost"].toString());
d->error = !d->dbh->connect();
if(!d->error) {
CaSupDbFuncs dbf;
......
......@@ -56,3 +56,9 @@ std::map<std::string, std::string> CaSupJsoniz::jsonize(const std::vector<CuData
}
return jsonma;
}
const std::string CaSupJsoniz::make_msg_ok() const {
nlohmann::json o;
o["msg"] = "ok";
return o.dump();
}
......@@ -14,6 +14,7 @@ class CaSupJsoniz
public:
CaSupJsoniz();
std::map<std::string, std::string> jsonize(const std::vector<CuData>&dl) const;
const std::string make_msg_ok() const;
};
#endif // CASUPJSONIZ_H
#include "casupreceiver.h"
CaSupReceiver::CaSupReceiver(const CuData& tok) : CaReceiver_A(tok) {
}
bool CaSupReceiver::buf_complete(const std::string &buf, int expected_len) const
{
return buf[buf.length() - 1] == '\0';
}
#ifndef CASUPRECEIVER_H
#define CASUPRECEIVER_H
#include <ca-receiver-a.h>
class CaSupReceiver : public CaReceiver_A
{
public:
CaSupReceiver(const CuData &tok);
// CaReceiver_A interface
public:
bool buf_complete(const std::string &buf, int expected_len) const;
};
#endif // CASUPRECEIVER_H
......@@ -118,7 +118,7 @@ int main(int argc, char *argv[]) {
CaSupDbMon *sup = new CaSupDbMon(montok, log_i);
cumbia->registerActivity(sup, redistrib, opts, *thf_impl, *thread_eb_f);
CaSupervisor *supervisor = new CaSupervisor();
CaSupervisor *supervisor = new CaSupervisor(nullptr, log_i);
CaReceiver_A *server_a = nullptr;
printf("\e[1;32m # \e[0mmain.cpp: starting receiver on %s\n", datos(opts));
server_a = new CaReceiver_A(opts);
......@@ -128,6 +128,8 @@ int main(int argc, char *argv[]) {
printf("\e[1;32m # \e[0mmain.cpp: \e[1;32mcasupervisor version \e[2;32m%s\e[0m started\n", CASUPERVISOR_VERSION);
loo_s->exec(false);
cumbia->unregisterActivity(sup);
server_a->stop();
cumbia->unregisterActivity(server_a);
delete cumbia;
printf("\e[1;32m # \e[0mmain.cpp: casupervisor exiting\n");
log_i->write("casupervisor", "exiting", CuLog::LevelAll);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment