Commit b3b5ef7b authored by Giacomo Strangolino's avatar Giacomo Strangolino
Browse files

restored missing files into master

parent 73c352c3
#include "ca-supervisor.h"
#include "casupjsoniz.h"
#include "casupdbactivity.h"
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <errno.h>
#include <unistd.h>
#include <arpa/inet.h> // inet_ntop
#include <culog.h>
#include <cadbhandle.h>
#include <cajson-src-bundle-ex.h>
#define _BUFSIZ 512
class CaSupervisorPrivate {
public:
CaSupervisorPrivate(CaSupDbActivity* _dba, CuLogImplI *l) : log(l) , dba(_dba) {}
CuLogImplI *log;
CaSupDbActivity *dba;
};
CaSupervisor::CaSupervisor(CaSupDbActivity *dba, CuLogImplI *log) {
d = new CaSupervisorPrivate(dba, log);
}
CaSupervisor::~CaSupervisor() {
delete d;
}
void CaSupervisor::onProgress(int step, int total, const CuData &data) { }
void CaSupervisor::onResult(const CuData &data) {
printf("CaSupervisor.onResult: received %s\n", datos(data));
if(data.containsKey("data")) {
const std::string& s = data.s("data");
CaJsonSrcBundleExtract bux;
std::list<CuData> dl;
std::string chan, id, global_m;
std::map<std::string, std::string> extram;
extram["srv_id"] = "";
extram["method"] = "";
bux.extract(s, dl, &chan, &id, &global_m, &extram);
if(bux.error_msg.length() > 0) {
d->log->write("ca-supervisor", "Json error: " + bux.error_msg + " | msg payload " + s, CuLog::LevelError);
}
else {
for(const CuData& da : dl)
printf("ca-supervisor.onResult: extracted %s chan %s id %s global method \"%s\"\n", datos(da), chan.c_str(), id.c_str(), global_m.c_str());
CaSupDbAEvent *e = new CaSupDbAEvent(extram["srv_id"], id, extram["method"], chan, dl);
d->dba->new_event(e);
}
int fd = data.I("fd");
int byw = m_sock_write(fd, CaSupJsoniz().make_msg_ok());
printf("[0x%lx] \e[1;32mCaSupervisor::onResult \e[0;35m %d bytes written in reply \e[0m\n", pthread_self(), byw);
}
// connection / disconnection from ca-proxy
if(data.containsKey("fd") && data.containsKey("fdopen")) {
bool o = data.B("fdopen");
const int fd = data.I("fd");
d->log->write("ca-supervisor", "peer " + std::string(o ? "" : "dis") + "connected: sofd " + std::to_string(fd), CuLog::LevelInfo);
if(!o) {
close(fd);
}
}
}
void CaSupervisor::onResult(const std::vector<CuData> &datalist) { }
CuData CaSupervisor::getToken() const {
return CuData("type", "ca-supervisor");
}
int CaSupervisor::m_sock_write(int sofd, const std::string &buf) {
int totclibw = 0, clibw = 0, wlen = buf.length();
// write rbuf back to client fd
while(totclibw < wlen && clibw >= 0) {
clibw = send(sofd, buf.c_str() + totclibw, wlen - totclibw < _BUFSIZ ? wlen - totclibw : _BUFSIZ, MSG_NOSIGNAL);
if(clibw < 0) {
d->log->write("ca-supervisor", "write failed :" + std::string(strerror(errno)));
}
totclibw += clibw;
}
return clibw >=0 ? totclibw : clibw;
}
#include "casupdbactivity.h"
#include <cumacros.h>
#include <cadbhandle.h>
#include "casupdbfuncs.h"
#include "casuplog.h"
#include "casuprecoverxmitevent.h"
#include <sstream>
#include <locale>
#include <iomanip>
#include <chrono>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <cutimerservice.h>
#include <cueventloop.h>
#include <cutimer.h>
class CaSupDbActivityPrivate {
public:
CaDbH *dbh;
std::string msg;
bool error, quit;
int superv_id;
CuLogImplI *log;
CuTimerService *timer_srv;
CuTimer *timer;
std::queue <CaSupDbAEvent *> eq;
std::mutex mu;
std::condition_variable wait;
};
CaSupDbActivity::CaSupDbActivity(const CuData &tok,
CuEventLoopService *loo_s,
CuLogImplI* log) : CuActivity(tok) {
d = new CaSupDbActivityPrivate;
d->log = log;
d->superv_id = -1;
d->quit = d->error = false;
d->timer_srv = new CuTimerService();
int timeout = getToken().containsKey("mon_interval") ? getToken().I("mon_interval") : HEARTBEAT_INTERVAL;
d->timer = d->timer_srv->registerListener(this, timeout * 1000, loo_s);
}
CaSupDbActivity::~CaSupDbActivity() {
delete d;
}
void CaSupDbActivity::stop() {
std::unique_lock<std::mutex> lock(d->mu);
d->quit = true;
d->wait.notify_one();
}
bool CaSupDbActivity::matches(const CuData &token) const {
printf("CaSupDbActivity.matches token %s matches my tok %s?\n", datos(token), datos(getToken()));
return getToken()["type"] == token["type"];
}
void CaSupDbActivity::new_event(CaSupDbAEvent *e) const{
std::unique_lock<std::mutex> lock(d->mu);
d->eq.push(e);
d->wait.notify_one();
}
void CaSupDbActivity::init()
{
printf(" [0x%lx ] CaSupDbActivity.init enter\n", pthread_self());
const CuData&tok = getToken();
d->dbh = new CaDbH(tok["dbuser"].toString(), tok["dbpass"].toString(), tok["dbnam"].toString(), tok["dbhost"].toString());
d->error = !d->dbh->connect();
if(!d->error) {
CaSupDbFuncs dbf;
d->superv_id = dbf.register_instance(d->dbh, tok["url"].toString());
}
publishResult(CuData("type", "init").set("err", d->error).set("msg", d->dbh->msg));
}
void CaSupDbActivity::execute() {
bool ok;
printf(" [0x%lx ] CaSupDbActivity.execute enter\n", pthread_self());
if(!d->error) {
while(!d->quit) {
std::list<CaSupDbAEvent *> events;
{
std::unique_lock<std::mutex> lock(d->mu);
// copy messages locally while holding lock
while(d->eq.empty() && !d->quit)
d->wait.wait(lock);
while(!d->eq.empty() && !d->quit) {
events.emplace_back(std::move(d->eq.front()));
d->eq.pop();
}
}// end locked section
// process messages
for(CaSupDbAEvent *e : events) {
CaSupDbFuncs dbf;
printf(" [0x%lx ] CaSupDbActivity.execute: new event\n", pthread_self());
if(e->type == CaSupDbAEvent::RecoverSuccessful) {
dbf.set_last_recover_successful(d->dbh, e->success);
}
else if (e->type == CaSupDbAEvent::DbQuery) {
const std::list<CuData> &dali = e->datalist;
printf(" [0x%lx ] CaSupDbActivity.execute: db query event data - server ID %s method %s\n",
pthread_self(), e->server_id.c_str(), e->method.c_str());
if(e->method == "s")
ok = dbf.register_activities(d->dbh, e->server_id, e->cli_id, e->channel, e->datalist);
else if(e->method == "u")
ok = dbf.unregister_activities(d->dbh, e->server_id, e->cli_id, e->channel, e->datalist);
if(!ok)
d->log->write("ca-supervisor", "database error: " + dbf.last_db_error);
}
delete e;
} // for event in events
}
}
}
void CaSupDbActivity::onExit() {
printf(" [0x%lx ] CaSupDbActivity. exit superv id %d!\n", pthread_self(), d->superv_id);
if(d->superv_id > -1) {
CaSupDbFuncs dbf;
dbf.unregister_instance(d->dbh, d->superv_id);
}
}
std::string CaSupDbActivity::m_get_status(const CuData &da, CaSupDbFuncs *f) const
{
bool err = da["err"].toBool();
bool recover = da.has("state", "RECOVER");
int conf_id;
char msg[2048];
memset(msg, 0, sizeof(char ) * 2048);
da["conf_id"].to<int>(conf_id);
// convert expected to datetime
std::tm te = {}, ts = {}, tc = {}; // expected, started, current
std::istringstream exp(da["expected"].toString());
std::istringstream started(da["started"].toString());
std::istringstream current(da["current_timestamp"].toString());
// 2021-05-17 15:48:55.670696
// https://en.cppreference.com/w/cpp/io/manip/get_time
// Y parses full year as a 4 digit decimal number, leading zeroes permitted but not required
// m parses the month as a decimal number (range [01,12]), leading zeroes permitted but not required
// d parses the day of the month as a decimal number (range [01,31]), leading zeroes permitted but not required
// H parses the hour as a decimal number, 24 hour clock (range [00-23]), leading zeroes permitted but not required
// M parses minute as a decimal number (range [00,59]), leading zeroes permitted but not required
// S parses second as a decimal number (range [00,60]), leading zeroes permitted but not required
exp >> std::get_time(&te, "%Y-%m-%d %H:%M:%S");
if(exp.fail())
perr("CaSubDbMon.m_print_status: failed to parse expected date/time \"%s\"", vtoc2(da, "expected"));
started >> std::get_time(&ts, "%Y-%m-%d %H:%M:%S");
if(started.fail())
perr("CaSubDbMon.m_print_status: failed to parse started date/time \"%s\"", vtoc2(da, "started"));
current >> std::get_time(&tc, "%Y-%m-%d %H:%M:%S");
if(current.fail())
perr("CaSubDbMon.m_print_status: failed to parse current date/time \"%s\"", vtoc2(da, "started"));
if(!exp.fail() && !started.fail() && !current.fail()) {
double expected_in = std::chrono::duration<double, std::milli>(mktime(&te) - mktime(&tc)).count();
char de[256], ds[256];
char srvna[20];
char stat[32], stat_col[16], strike_stat_col[16];
memset(srvna, 0, sizeof(char) * 16);
snprintf(srvna, 12, "\"%s\"", da["srvnam"].toString().c_str());
if(da["srvnam"].toString().length() > 12)
strncat(srvna, "...", 4);
strcpy(stat, da["state"].toString().c_str());
err ? (!recover ? strcpy(stat_col, "\e[1;31m") : strcpy(stat_col, "\e[1;35m") ) : strcpy(stat_col, "\e[1;32m");
err ? strcpy(strike_stat_col, "\e[2;31;1m") : strcpy(strike_stat_col, "\e[1;32;3m");
strncpy(de, asctime(&te), 255);
strncpy(ds, asctime(&ts), 255);
if(strlen(de) > 0) de[strlen(de) - 1 ] = '\0';
if(strlen(ds) > 0) ds[strlen(ds) - 1 ] = '\0';
snprintf(msg, 2047, "%s #\e[0m%8s \e[1;34m%15s\e[0m \e[1;36m%15s:%-5s\e[0m"
"\e[1;33m|\e[0m expected: %s%5.0fs \e[0m [%s]\e[1;33m|\e[0mstarted: %s",
stat_col, stat, srvna, vtoc2(da, "addr"), vtoc2(da, "port"), strike_stat_col, expected_in, de, ds);
}
return std::string(msg);
}
void CaSupDbActivity::onTimeout(CuTimer *) {
std::string stat;
CaSupDbFuncs f;
const std::vector<CuData> &active_srv = f.get_active_services(d->dbh);
for(const CuData& s : active_srv) {
int id = -1;
s["id"].to<int>(id);
if(!s["err"].toBool()) {
stat = m_get_status(s, &f);
d->log->write("ca-supervisor", stat, CuLog::LevelInfo);
cuprintf("%s", stat.c_str());
// remove active services from the recovery table
f.remove_from_recovery(d->dbh, id);
}
else { // dormant
const std::vector<CuData> *srcs = nullptr;
stat = m_get_status(s, &f);
d->log->write("ca-supervisor", stat, CuLog::LevelError);
cuprintf("%s", stat.c_str());
if(s.has("state", "RECOVER")) {
srcs = f.get_srcs_for_srv_id(d->dbh, id);
/// print debug section
if(srcs->size() > 0) {
d->log->write("ca-supervisor", "dormant service: {" + s.toString() + "} expected: " + s["expected"].toString(), CuLog::LevelError);
}
for(const CuData& src : *srcs)
d->log->write("ca-supervisor", "in charge of " + src["source"].toString() + " chan " + src["chan"].toString(), CuLog::LevelInfo);
d->log->write("ca-supervisor", "dormant service: {" + s.toString() + "} was in charge of " + std::to_string(srcs->size()) + " sources");
//
// publish result on CaSupRedistrib
publishResult(srcs);
if(srcs->size() == 0) {
d->log->write("ca-supervisor", "sources of dormant service " + s["addr"].toString() + ":" + s["port"].toString() +
" have been taken charge of by one (or more) service instances", CuLog::LevelInfo);
}
else {
bool ok = f.insert_recovery_data(d->dbh, id, d->superv_id);
if(ok)
ok = f.insert_recover_operation_start(d->dbh, d->superv_id, id, srcs);
if(!ok)
d->log->write("ca-supevisor", "database error: " + f.last_db_error);
}
}
}
}
if(active_srv.size() == 0)
printf("\e[1;33m - \e[0m no active services\n");
// remove old entries from recovery (rows with stray service id, probably recovered by another service
// and no more usable when their service id is not among the activities srv_id
f.clean_recovery_table(d->dbh);
d->timer_srv->restart(d->timer, d->timer->timeout());
}
int CaSupDbActivity::getType() const {
return CaSupDbActivityType;
}
int CaSupDbActivity::repeat() const {
return -1;
}
void CaSupDbActivity::event(CuActivityEvent *) {
}
#ifndef CASUPDBACTIVITY_H
#define CASUPDBACTIVITY_H
#include <cuactivity.h>
#include <cutimerlistener.h>
#include <list>
class CaSupDbActivityPrivate;
class CaSupDbFuncs;
class CuLogImplI;
class CuEventLoopService;
class CaSupDbAEvent {
public:
enum CaSupDbAEventType { DbQuery, RecoverSuccessful };
CaSupDbAEvent(const std::string& srv_id,
const std::string& clid,
const std::string& me,
const std::string& chan,
const std::list<CuData>& dlist)
: server_id(srv_id), cli_id(clid), channel(chan), method(me), type(DbQuery) {
datalist = std::move(dlist);
}
CaSupDbAEvent(bool su) : type(RecoverSuccessful), success(su) {}
std::string server_id;
std::string cli_id;
std::string channel, method;
CaSupDbAEventType type;
std::list<CuData> datalist;
bool success;
};
class CaSupDbActivity : public CuActivity, public CuTimerListener
{
public:
enum Type { CaSupDbActivityType = 344 };
CaSupDbActivity(const CuData& tok,
CuEventLoopService *loo_s,
CuLogImplI *log);
~CaSupDbActivity();
void stop();
// CuActivity interface
public:
bool matches(const CuData &token) const;
void new_event(CaSupDbAEvent *e) const;
protected:
void init();
void execute();
void onExit();
int getType() const;
int repeat() const;
void event(CuActivityEvent *e);
private:
CaSupDbActivityPrivate *d;
std::string m_get_status(const CuData& da, CaSupDbFuncs *f) const;
// CuTimerListener interface
public:
void onTimeout(CuTimer *);
};
#endif // CaSupDbActivity_H
#include "casupdbfuncs.h"
#include <cadbhandle.h>
CaSupDbFuncs::CaSupDbFuncs() { }
/*!
* \brief Returns a vector of CuData where active and alive services are listed first, active
* likely down services (called *dormant*) follow, down services in recovery end the list
*
* Recovery period is set by default to 30 minutes.
*
* \param dbh handle to CaDbH
* \return vector of CuData with listed:
* 1. active and alive services
* 2. active dormant not yet in recovery phase
* 3. active dormant in recovery
*/
std::vector<CuData> CaSupDbFuncs::get_active_services(CaDbH *dbh) {
// 1. select ACTIVE ALIVE (expected >= current timestamp)
std::string stm = "SELECT service.*,srvconf.srvnam,srvconf.addr,srvconf.port,current_timestamp "
"FROM service,srvconf WHERE state=$1 AND expected >= (SELECT current_timestamp) AND service.conf_id=srvconf.conf_id";
CaDbRes res = dbh->execute(stm, std::vector<std::string> { "ACTIVE" });
std::vector<CuData> v = m_res_to_data(res);
for(CuData &da : v) {
da["err"] = false;
da["state"] = "ACTIVE";
}
if(!res.error()) {
// 2. select ACTIVE "dormant" (expected < current_timestamp)
// excluded those services listed in the recovery table
//
stm = "SELECT service.*,srvconf.srvnam,srvconf.addr,srvconf.port,current_timestamp "
"FROM service,srvconf WHERE state=$1 AND expected < (SELECT current_timestamp) AND service.conf_id=srvconf.conf_id "
"AND service.id IN (SELECT srv_id FROM activity) "
"AND id NOT IN (SELECT srv_id FROM recovery WHERE started > current_timestamp - interval '30m' )";
res = dbh->execute(stm, std::vector<std::string> { "ACTIVE" });
std::vector<CuData> vlate_recov = m_res_to_data(res);
for(CuData &da : vlate_recov) {
da["err"] = true;
da["state"] = "RECOVER"; // either not in the recovery table or recent recovery attempt: need recover
da["msg"] = "late: expected " + da["expected"].toString();
}
v.insert(v.end(), vlate_recov.begin(), vlate_recov.end());
// 3. select ACTIVE "dormant" (expected < current_timestamp)
// listed in the recovery table
//
stm = "SELECT service.*,srvconf.srvnam,srvconf.addr,srvconf.port,current_timestamp "
"FROM service,srvconf WHERE state=$1 AND expected < (SELECT current_timestamp) AND service.conf_id=srvconf.conf_id "
"AND id IN (SELECT srv_id FROM recovery WHERE started > current_timestamp - interval '30m' )";
res = dbh->execute(stm, std::vector<std::string> { "ACTIVE" });
std::vector<CuData> vlate_norecov = m_res_to_data(res);
for(CuData &da : vlate_norecov) {
da["err"] = true;
da["state"] = "RECO_WAIT"; // in the recovery table, recent recover attempt
da["msg"] = "late: expected " + da["expected"].toString();
}
v.insert(v.end(), vlate_norecov.begin(), vlate_norecov.end());
stm = "SELECT service.*,srvconf.srvnam,srvconf.addr,srvconf.port,current_timestamp "
"FROM service,srvconf WHERE state=$1 AND expected < (SELECT current_timestamp) AND service.conf_id=srvconf.conf_id "
"AND id NOT IN (SELECT srv_id FROM recovery WHERE started > current_timestamp - interval '30m' )";
res = dbh->execute(stm, std::vector<std::string> { "ACTIVE" });
std::vector<CuData> vlate_zombie = m_res_to_data(res);
for(CuData &da : vlate_zombie) {
da["err"] = true;
da["state"] = "ZOMBIE"; // still ACTIVE, without any linked activity. useless
da["msg"] = "late: expected " + da["expected"].toString();
}
v.insert(v.end(), vlate_zombie.begin(), vlate_zombie.end());
}
last_db_error = res.errmsg;
return v;
}
std::vector<CuData>* CaSupDbFuncs::get_srcs_for_srv_id(CaDbH *h, int id) {
std::string stm = "SELECT activity.source,activity.chan,service.conf_id from activity,service "
" WHERE service.id=activity.srv_id AND service.id=$1";
CaDbRes res = h->execute(stm, std::vector<std::string> { std::to_string(id) });
std::vector<CuData> *v = new std::vector<CuData>();
*v = m_res_to_data(res);
last_db_error = res.errmsg;
return v;
}
// clean all unreachable recovery records (whose service id is not in the activity records)
//
bool CaSupDbFuncs::clean_recovery_table(CaDbH *h) {
std::string stm = "DELETE FROM recovery WHERE srv_id NOT IN (SELECT srv_id FROM activity)";
CaDbRes res = h->execute(stm);
last_db_error = res.errmsg;
return !res.error();
}
bool CaSupDbFuncs::set_last_recover_successful(CaDbH *h, bool success) {
std::string stm = "UPDATE recover_operations SET success=$1,complete=current_timestamp WHERE id=(SELECT MAX(id) FROM recover_operations)";
CaDbRes r = h->execute(stm, std::vector<std::string> { success ? "TRUE" : "FALSE" });
last_db_error = r.errmsg;
return !r.error();
}
int CaSupDbFuncs::register_instance(CaDbH *h, const std::string& url)
{
std::string stm = "INSERT INTO supervisor (id,started,stopped,url) VALUES(DEFAULT, current_timestamp, NULL, $1) RETURNING id";
CaDbRes r = h->execute(stm, std::vector<std::string> { url });
if(!r.error() && r.size() > 0)
return atoi(r.value(0, "id").c_str());
last_db_error = r.errmsg;
return -1;
}
bool CaSupDbFuncs::unregister_instance(CaDbH *h, int id) {
std::string stm = "UPDATE supervisor SET stopped=current_timestamp WHERE id=$1";
CaDbRes r = h->execute(stm, std::vector<std::string> { std::to_string(id) });
last_db_error = r.errmsg;
return !r.error();
}
bool CaSupDbFuncs::insert_recover_operation_start(CaDbH *h, int superv_id, int from_srv_id, const std::vector<CuData> *srcs) {
std::string conf_id;
std::string stm = "SELECT srvconf.conf_id FROM srvconf,service where service.id=$1 AND srvconf.conf_id=service.conf_id";
CaDbRes r = h->execute(stm, std::vector<std::string> { std::to_string(from_srv_id) });
if(!r.error() && r.size() > 0)
conf_id = r.value(0, "conf_id");
if(conf_id.length() > 0) {
std::string recover_op_id;
stm = "INSERT INTO recover_operations (id,supervisor_id,started) VALUES (DEFAULT,$1,current_timestamp) RETURNING id";
r = h->execute(stm, std::vector<std::string> { std::to_string(superv_id) });
if(!r.error() && r.size() > 0)
recover_op_id = r.value(0, "id");
if(recover_op_id.length() > 0) {
for (const CuData& s : *srcs) {
stm = "INSERT INTO recover_activities (operation_id,source,chan,from_srv_conf) VALUES ($1,$2,$3,$4)";
r = h->execute(stm, std::vector<std::string> { recover_op_id, s["source"].toString(), s["chan"].toString(), conf_id });
}
}
}
last_db_error = r.errmsg;
return !r.error();
}
bool CaSupDbFuncs::register_activities(CaDbH *dbhan, const std::string &srv_id,
const std::string& cli_id,
const std::string &channel,
const std::list<CuData> &datalist) {
std::ostringstream o;
std::string stm;
bool err;
last_db_error.clear();
CaDbRes res = dbhan->begin_transaction();
err = res.error();
printf("CaSupDbFuncs::register_activities datalist siz %ld chennel %s\n", datalist.size(), channel.c_str());
for(const CuData& da : datalist) {
if(!err) {
const std::string& src = da.s("src");
/// NOTE:
// see Notes on activities in the caserver database plugin design rationale
//
// is there an activity recorded with src and chan? Yes, update srv_id
stm = "SELECT srv_id FROM activity WHERE source=$1 AND chan=$2 AND cli_id=$3";
res = dbhan->execute(stm, std::vector<std::string> { src, channel, cli_id });
if(!res.error() && res.size() > 0) {
// same source, same chan, same client id
stm = "UPDATE activity SET srv_id=$1 WHERE source=$2 AND chan=$3";
res = dbhan->execute(stm, std::vector<std::string> { srv_id, src, channel });
if(res.error())
printf("\e[1;31mupdate fails %s\e[0m\n", res.errmsg.c_str());
}
else if(!res.error()) {
stm = "INSERT INTO activity (srv_id,cli_id,source,chan) VALUES($1,$2,$3,$4)";
res = dbhan->execute(stm, std::vector<std::string> { srv_id, cli_id, src, channel });
if(res.error())