Commit 4af906a0 authored by Giacomo Strangolino's avatar Giacomo Strangolino
Browse files

supervisor code cleaning and restoring initial recovery functionality

parent b3b5ef7b
......@@ -6,5 +6,5 @@ dbhost=localhost
dbnam=cadb
# bind to
host=192.168.205.25
host=192.168.1.159
port=9295
......@@ -55,7 +55,7 @@ void CaSupDbActivity::stop() {
}
// Tells whether this activity matches the given token.
// Two tokens match when their "type" values compare equal; no other key is
// taken into account. Both tokens are printed to stdout for debugging before
// the comparison.
// NOTE(review): this view comes from a flattened diff, so the two printf
// lines below are presumably the pre- and post-commit variants of the same
// debug message; confirm which one is live in the real file.
bool CaSupDbActivity::matches(const CuData &token) const {
printf("CaSupDbActivity.matches token %s matches my tok %s?\n", datos(token), datos(getToken()));
printf("CaSupDbActivity.matches: %s with %s\n", datos(getToken()), datos(token));
return getToken()["type"] == token["type"];
}
......@@ -208,7 +208,7 @@ void CaSupDbActivity::onTimeout(CuTimer *) {
d->log->write("ca-supervisor", stat, CuLog::LevelError);
cuprintf("%s", stat.c_str());
if(s.has("state", "RECOVER")) {
if(s.has("state", "RECOVER")) { // need RECOVER: CaSupDbFuncs::get_active_services (2.)
srcs = f.get_srcs_for_srv_id(d->dbh, id);
/// print debug section
if(srcs->size() > 0) {
......@@ -218,22 +218,25 @@ void CaSupDbActivity::onTimeout(CuTimer *) {
for(const CuData& src : *srcs)
d->log->write("ca-supervisor", "in charge of " + src["source"].toString() + " chan " + src["chan"].toString(), CuLog::LevelInfo);
d->log->write("ca-supervisor", "dormant service: {" + s.toString() + "} was in charge of " + std::to_string(srcs->size()) + " sources");
//
// publish result on CaSupRedistrib
publishResult(srcs);
if(srcs->size() == 0) {
d->log->write("ca-supervisor", "sources of dormant service " + s["addr"].toString() + ":" + s["port"].toString() +
" have been taken charge of by one (or more) service instances", CuLog::LevelInfo);
}
else {
printf("[0x%lx] \e[1;32m recovering %ld activities for service id %d\e[0m\n", pthread_self(), srcs->size(), id);
bool ok = f.insert_recovery_data(d->dbh, id, d->superv_id);
if(ok)
ok = f.insert_recover_operation_start(d->dbh, d->superv_id, id, srcs);
if(ok) // remove the activities being recovered from the activity table to prevent concurrent supervisor operations on them
ok = f.remove_activities_for_srv_id(d->dbh, id); // get_srcs_for_srv_id counterpart
if(!ok)
d->log->write("ca-supevisor", "database error: " + f.last_db_error);
}
// now recover
// publish result on CaSupRedistrib
publishResult(srcs);
}
}
}
......@@ -255,5 +258,12 @@ int CaSupDbActivity::repeat() const {
return -1;
}
void CaSupDbActivity::event(CuActivityEvent *) {
// Receives an event posted to this activity.
// Every event is traced on stdout together with the calling thread id.
// When the event is a CaSupRecoverXmitEvent, its outcome flag and the number
// of sources it carries are printed as well; any other event type is ignored
// after the trace line.
void CaSupDbActivity::event(CuActivityEvent *e) {
    const int ev_type = e->getType();
    printf("[0x%lx] CaSupDbActivity.event: %d\n", pthread_self(), ev_type);

    // nothing more to do unless this is the recover transmit notification
    if(ev_type != static_cast<int>(CaSupRecoverXmitEvent::RecoverXmitEventType))
        return;

    CaSupRecoverXmitEvent* xmit_ev = static_cast<CaSupRecoverXmitEvent *>(e);
    const char *outcome = xmit_ev->success ? "YES" : "NO";
    printf("[0x%lx] CaSupDbActivity.event: recover event ok ? %s sources size %ld\n",
           pthread_self(), outcome, xmit_ev->srcs.size());
}
......@@ -18,22 +18,22 @@ CaSupDbFuncs::CaSupDbFuncs() { }
std::vector<CuData> CaSupDbFuncs::get_active_services(CaDbH *dbh) {
// 1. select ACTIVE ALIVE (expected >= current timestamp)
std::string stm = "SELECT service.*,srvconf.srvnam,srvconf.addr,srvconf.port,current_timestamp "
"FROM service,srvconf WHERE state=$1 AND expected >= (SELECT current_timestamp) AND service.conf_id=srvconf.conf_id";
CaDbRes res = dbh->execute(stm, std::vector<std::string> { "ACTIVE" });
"FROM service,srvconf WHERE expected >= (SELECT current_timestamp) AND service.conf_id=srvconf.conf_id";
CaDbRes res = dbh->execute(stm, std::vector<std::string>());
std::vector<CuData> v = m_res_to_data(res);
for(CuData &da : v) {
da["err"] = false;
da["state"] = "ACTIVE";
}
if(!res.error()) {
// 2. select ACTIVE "dormant" (expected < current_timestamp)
// excluded those services listed in the recovery table
// 2. select lagging (expected < current_timestamp) services in charge of activities.
// exclude services listed in the recovery table
//
stm = "SELECT service.*,srvconf.srvnam,srvconf.addr,srvconf.port,current_timestamp "
"FROM service,srvconf WHERE state=$1 AND expected < (SELECT current_timestamp) AND service.conf_id=srvconf.conf_id "
"FROM service,srvconf WHERE expected < (SELECT current_timestamp) AND service.conf_id=srvconf.conf_id "
"AND service.id IN (SELECT srv_id FROM activity) "
"AND id NOT IN (SELECT srv_id FROM recovery WHERE started > current_timestamp - interval '30m' )";
res = dbh->execute(stm, std::vector<std::string> { "ACTIVE" });
res = dbh->execute(stm, std::vector<std::string>());
std::vector<CuData> vlate_recov = m_res_to_data(res);
for(CuData &da : vlate_recov) {
da["err"] = true;
......@@ -46,36 +46,23 @@ std::vector<CuData> CaSupDbFuncs::get_active_services(CaDbH *dbh) {
// listed in the recovery table
//
stm = "SELECT service.*,srvconf.srvnam,srvconf.addr,srvconf.port,current_timestamp "
"FROM service,srvconf WHERE state=$1 AND expected < (SELECT current_timestamp) AND service.conf_id=srvconf.conf_id "
"FROM service,srvconf WHERE expected < (SELECT current_timestamp) AND service.conf_id=srvconf.conf_id "
"AND id IN (SELECT srv_id FROM recovery WHERE started > current_timestamp - interval '30m' )";
res = dbh->execute(stm, std::vector<std::string> { "ACTIVE" });
std::vector<CuData> vlate_norecov = m_res_to_data(res);
for(CuData &da : vlate_norecov) {
res = dbh->execute(stm, std::vector<std::string> ());
std::vector<CuData> vlate_recovering = m_res_to_data(res);
for(CuData &da : vlate_recovering) {
da["err"] = true;
da["state"] = "RECO_WAIT"; // in the recovery table, recent recover attempt
da["msg"] = "late: expected " + da["expected"].toString();
}
v.insert(v.end(), vlate_norecov.begin(), vlate_norecov.end());
stm = "SELECT service.*,srvconf.srvnam,srvconf.addr,srvconf.port,current_timestamp "
"FROM service,srvconf WHERE state=$1 AND expected < (SELECT current_timestamp) AND service.conf_id=srvconf.conf_id "
"AND id NOT IN (SELECT srv_id FROM recovery WHERE started > current_timestamp - interval '30m' )";
res = dbh->execute(stm, std::vector<std::string> { "ACTIVE" });
std::vector<CuData> vlate_zombie = m_res_to_data(res);
for(CuData &da : vlate_zombie) {
da["err"] = true;
da["state"] = "ZOMBIE"; // still ACTIVE, without any linked activity. useless
da["msg"] = "late: expected " + da["expected"].toString();
}
v.insert(v.end(), vlate_zombie.begin(), vlate_zombie.end());
v.insert(v.end(), vlate_recovering.begin(), vlate_recovering.end());
}
last_db_error = res.errmsg;
return v;
}
std::vector<CuData>* CaSupDbFuncs::get_srcs_for_srv_id(CaDbH *h, int id) {
std::string stm = "SELECT activity.source,activity.chan,service.conf_id from activity,service "
std::string stm = "SELECT activity.source,activity.chan,service.conf_id,activity.cli_id from activity,service "
" WHERE service.id=activity.srv_id AND service.id=$1";
CaDbRes res = h->execute(stm, std::vector<std::string> { std::to_string(id) });
std::vector<CuData> *v = new std::vector<CuData>();
......@@ -84,6 +71,31 @@ std::vector<CuData>* CaSupDbFuncs::get_srcs_for_srv_id(CaDbH *h, int id) {
return v;
}
// Deletes every row of the activity table whose srv_id equals the given id.
//
// h   database handle used to execute the statement
// id  service id whose activities must be removed
//
// Returns true if the statement executed without error, false otherwise.
// last_db_error is cleared on entry and set to the database error message on
// failure, so that callers reporting last_db_error after a false return get
// the message that belongs to this operation rather than a stale one.
bool CaSupDbFuncs::remove_activities_for_srv_id(CaDbH *h, int id) {
    last_db_error.clear();
    const std::string stm = "DELETE FROM activity WHERE srv_id=$1";
    CaDbRes r = h->execute(stm, std::vector<std::string> { std::to_string(id) } );
    if(r.error())
        last_db_error = r.errmsg;
    return !r.error();
}
// Re-inserts into the activity table the sources that had been removed in
// view of a recovery that eventually failed.
//
// h     database handle
// srcs  the sources to restore. Each element is read through the keys
//       "conf_id", "cli_id", "source" and "chan", i.e. the column names
//       selected by get_srcs_for_srv_id and the same keys used by
//       insert_recover_operation_start.
//
// For every source with a non empty conf_id, the id of the service having
// that conf_id is looked up and a row (srv_id, cli_id, source, chan) is
// inserted. Processing stops being effective at the first database error:
// the remaining sources are skipped.
//
// Returns true if the last executed statement reported no error. When no
// single service matches a conf_id, last_db_error is set but the return value
// is not affected. On database error last_db_error holds the error message.
//
// NOTE(review): a transaction is started here but no commit/rollback is
// visible in this function; confirm that the handle or the caller finalizes it.
bool CaSupDbFuncs::restore_activities_after_recover_failure(CaDbH *h, const std::vector<CuData> &srcs) {
    last_db_error.clear();
    const std::string stm = "INSERT INTO activity (srv_id,cli_id,source,chan) VALUES($1,$2,$3,$4)";
    CaDbRes r = h->begin_transaction();
    for(const CuData &s : srcs) {
        const std::string conf_id = s.s("conf_id");
        if(!r.error() && conf_id.length() > 0) {
            r = h->execute("SELECT id FROM service WHERE conf_id=$1", std::vector<std::string> { conf_id });
            if(r.size() == 1) {
                // keep the service id before r is overwritten by the INSERT result
                const std::string srv_id = r.value(0, "id");
                // cli_id, source and chan: same keys as the columns returned by
                // get_srcs_for_srv_id (the cli_id column must not receive conf_id)
                r = h->execute(stm, std::vector<std::string> { srv_id, s.s("cli_id"), s.s("source"), s.s("chan") });
            }
            else
                last_db_error = "restore_activities_after_recover_failure: no service with conf_id " + conf_id;
        }
    }
    if(r.error()) last_db_error = r.errmsg;
    return !r.error();
}
// clean all unreachable recovery records (whose service id is not in the activity records)
//
bool CaSupDbFuncs::clean_recovery_table(CaDbH *h) {
......@@ -131,8 +143,8 @@ bool CaSupDbFuncs::insert_recover_operation_start(CaDbH *h, int superv_id, int f
recover_op_id = r.value(0, "id");
if(recover_op_id.length() > 0) {
for (const CuData& s : *srcs) {
stm = "INSERT INTO recover_activities (operation_id,source,chan,from_srv_conf) VALUES ($1,$2,$3,$4)";
r = h->execute(stm, std::vector<std::string> { recover_op_id, s["source"].toString(), s["chan"].toString(), conf_id });
stm = "INSERT INTO recover_activities (operation_id,source,chan,cli_id,from_srv_conf) VALUES ($1,$2,$3,$4,$5)";
r = h->execute(stm, std::vector<std::string> { recover_op_id, s.s("source"), s.s("chan"),s.s("cli_id"), conf_id });
}
}
}
......
......@@ -14,6 +14,8 @@ public:
CaSupDbFuncs();
std::vector<CuData> get_active_services(CaDbH *dbh);
std::vector<CuData> *get_srcs_for_srv_id(CaDbH *h, int id);
bool remove_activities_for_srv_id(CaDbH *h, int id);
bool restore_activities_after_recover_failure(CaDbH *h, const std::vector<CuData>& srcs);
std::string last_db_error, msg;
......
......@@ -17,7 +17,7 @@ std::map<std::string, std::string> CaSupJsoniz::jsonize(const std::vector<CuData
// group by channel
std::map<std::string, std::vector<CuData>> chdmap;
for(const CuData& d : dl) {
chdmap[d["chan"].toString()].push_back(d);
chdmap[d["cli_id"].toString()].push_back(d);
}
for(std::map<std::string, std::vector<CuData>>::const_iterator it = chdmap.begin(); it != chdmap.end(); ++it) {
nlohmann::json src_array;
......@@ -47,11 +47,12 @@ std::map<std::string, std::string> CaSupJsoniz::jsonize(const std::vector<CuData
data_o["method"] = "s";
data_o["recovered-from-srv-conf"] = d["conf_id"].toString();
data_o["recovered-by"] = "casupervisor";
data_o["channel"] = d["channel"].toString();
src_array.push_back(data_o);
}
nlohmann::json req;
req["srcs"] = src_array;
req["channel"] = it->first;
req["id"] = it->first;
jsonma[it->first] = req.dump() + "\r\n\r\n";
}
return jsonma;
......
#include "casuprecoverxmitevent.h"
CaSupRecoverXmitEvent::CaSupRecoverXmitEvent(int recover_op_id, bool _success)
: recover_operation_id(recover_op_id), success(_success) {
// Builds the event that reports the outcome of a recover transmission.
//
// recover_op_id  stored in recover_operation_id
// _success       stored in success
// _srcs          copied into srcs
CaSupRecoverXmitEvent::CaSupRecoverXmitEvent(int recover_op_id,
                                             bool _success,
                                             const std::vector<CuData> &_srcs)
    : recover_operation_id(recover_op_id),
      success(_success),
      srcs(_srcs)
{
}
......
......@@ -8,13 +8,14 @@ class CaSupRecoverXmitEvent : public CuActivityEvent
public:
enum Type { RecoverXmitEventType = CuActivityEvent::User + 18 };
CaSupRecoverXmitEvent(int recover_op_id, bool _success);
CaSupRecoverXmitEvent(int recover_op_id, bool _success, const std::vector<CuData> &srcs);
// CuActivityEvent interface
CuActivityEvent::Type getType() const;
int recover_operation_id;
bool success;
const std::vector<CuData> srcs;
};
......
......@@ -6,6 +6,7 @@
#include <map>
#include <casuplog.h>
#include <cumbia.h>
#include <cuactivity.h>
class CaSupRedistribPrivate {
public:
......@@ -51,9 +52,11 @@ void CaSupRedistrib::onResult(const std::vector<CuData> &datalist)
d->log_i->write("ca-supervisor", k + " --> " + v, CuLog::LevelInfo);
}
bool ok = d->sucu->xmit(jsonma);
CuActivity *a = d->cumbia->findActivity(CuData("type", "dbmon"));
if(a)
d->cumbia->postEvent(a, new CaSupRecoverXmitEvent(-1, ok));
CuActivity *a = d->cumbia->findActivity(CuData("type", "supdba"));
if(a) {
printf("CaSupRedistrib.onResult: posting event to %p %s\n", a, datos(a->getToken()));
d->cumbia->postEvent(a, new CaSupRecoverXmitEvent(-1, ok, datalist));
}
}
......
......@@ -107,7 +107,7 @@ int main(int argc, char *argv[]) {
// register the service with the shared option set to true
cumbia->getServiceProvider()->registerSharedService(CuServices::EventLoop, loo_s);
CuData montok("type", "dbmon");
CuData montok("type", "supdba");
montok.merge(opts);
CuThreadFactoryImpl* thf_impl = new CuThreadFactoryImpl;
CuThreadsEventBridgeFactory *thread_eb_f = new CuThreadsEventBridgeFactory;
......@@ -115,7 +115,7 @@ int main(int argc, char *argv[]) {
CaSupRedistrib *redistrib = new CaSupRedistrib(opts["url"].toString(), opts["curlopt_ssl_verifypeer"].toBool(), log_i, cumbia);
CaSupDbActivity *sup = new CaSupDbActivity(montok, loo_s, log_i);
cumbia->registerActivity(sup, redistrib, opts, *thf_impl, *thread_eb_f);
cumbia->registerActivity(sup, redistrib, montok, *thf_impl, *thread_eb_f);
CaSupervisor *supervisor = new CaSupervisor(sup, log_i);
CaReceiver_A *server_a = nullptr;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment