Commit f30c0cf9 authored by Giacomo Strangolino's avatar Giacomo Strangolino
Browse files

fixes to recovery operations

parent 0ff4afef
......@@ -32,7 +32,7 @@ CaSupervisor::CaSupervisor(CaSupDbActivity *dba, const std::string &url, bool ss
CaSupervisor::~CaSupervisor() {
delete d->sucu;
delete d;
delete d;
}
......@@ -42,11 +42,11 @@ void CaSupervisor::onResult(const CuData &data) {
printf("CaSupervisor.onResult: received %s\n", datos(data));
if(data.has("type", "init")) { // from CaSupDbActivity
bool err = data["err"].toBool();
d->log->write("ca-supervisor", data["msg"].toString(), err ? CuLog::LevelError : CuLog::LevelInfo);
if(err) {
perr("ca-supervisor: %s", vtoc2(data, "msg"));
exit(EXIT_FAILURE);
}
d->log->write("ca-supervisor", data["msg"].toString(), err ? CuLog::LevelError : CuLog::LevelInfo);
if(err) {
perr("ca-supervisor: %s", vtoc2(data, "msg"));
exit(EXIT_FAILURE);
}
}
// from CaReceiver_A activity
else if(data.containsKey("data")) {
......@@ -85,15 +85,23 @@ void CaSupervisor::onResult(const CuData &data) {
void CaSupervisor::onResult(const std::vector<CuData> &srcs) {
// from CaSupDbActivity
CaSupJsoniz jiz;
const std::map<std::string, std::string> &jsonma = jiz.jsonize(srcs);
bool ok;
std::string errmsg;
std::map<std::string, std::list<CuData>> clidmap; // client id -> CuData map
// jsonize: return map client id->json request with requests divided by channel
// jsonize: chdmap will hold client id->CuData map useful for later error reporting
const std::map<std::string, std::string> &jsonma = jiz.jsonize(srcs, clidmap);
if(jsonma.size() > 0)
d->log->write("ca-supervisor", "requests grouped by *channel*", CuLog::LevelInfo);
for(const auto &[k, v] : jsonma) {
d->log->write("ca-supervisor", k + " --> " + v, CuLog::LevelInfo);
}
std::string errmsg;
bool ok = d->sucu->xmit(jsonma) && !jiz.is_err_msg(d->sucu->response(), errmsg);
d->dba->new_event(new CaSupDbAEvent(ok, std::list<CuData>(srcs.begin(), srcs.end()), errmsg));
// returns a map client id --> json error message (json error message is empty if ok)
std::map<std::string, std::string> msgmap = d->sucu->xmit(jsonma);
for(std::map<std::string, std::string>::const_iterator it = msgmap.begin(); it != msgmap.end(); ++it) {
ok = !jiz.is_err_msg(it->second, errmsg);
d->dba->new_event(new CaSupDbAEvent(ok, clidmap[it->first], errmsg));
}
}
CuData CaSupervisor::getToken() const {
......
#include "casupcurl.h"
#include "casupjsoniz.h"
#include <curl/curl.h>
#include <cumacros.h>
......@@ -22,8 +23,12 @@ CaSupCurl::~CaSupCurl() {
delete d;
}
bool CaSupCurl::xmit(const std::map<std::string,std::string> &datamap) {
std::map<std::string, std::string> CaSupCurl::xmit(const std::map<std::string,std::string> &datamap) {
std::map<std::string, std::string> resultmap;
int cnt = 0;
bool ok;
std::string message;
CaSupJsoniz jiz;
for(const auto& [chan, json] : datamap) {
++cnt;
struct curl_slist *slist = nullptr;
......@@ -47,16 +52,16 @@ bool CaSupCurl::xmit(const std::map<std::string,std::string> &datamap) {
curl_easy_setopt(d->curl, CURLOPT_WRITEDATA, (void *) &d->response );
CURLcode cuco = curl_easy_perform(d->curl);
d->ok = cuco == CURLE_OK;
cuco != CURLE_OK ? d->message = "CaSupCurl::xmit: " +
std::string(curl_easy_strerror(cuco)) + ": { " + errbuf + ": code" + std::to_string(cuco) + " }"
: d->message = std::string();
ok = (cuco == CURLE_OK);
// if CURL successful let message hold the server reply
ok ? message = d->response :
message = jiz.make_err_msg(std::string("CaSupCurl::xmit: ") + curl_easy_strerror(cuco) + ": "
+ errbuf + ": code " + std::to_string(cuco));
curl_slist_free_all(slist);
if(!d->ok)
perr("CaSupCurl::xmit: %s", d->message.c_str());
resultmap[chan] = message;
delete wd;
}
return d->ok;
return resultmap;
}
size_t CaSupCurl::write_callback(char *contents, size_t size, size_t nmemb, void *userdata) {
......
......@@ -25,7 +25,7 @@ public:
CaSupCurl(const std::string &url, bool ssl_verifypeer);
virtual ~CaSupCurl();
bool xmit(const std::map<std::string, std::string> &datamap);
std::map<std::string, std::string> xmit(const std::map<std::string, std::string> &datamap);
static size_t write_callback(char *ptr, size_t size, size_t nmemb, void *userdata);
......
......@@ -99,12 +99,14 @@ void CaSupDbActivity::execute() {
// process messages
for(CaSupDbAEvent *e : events) {
CaSupDbFuncs dbf;
printf(" [0x%lx ] CaSupDbActivity.execute: new event\n", pthread_self());
if(e->type == CaSupDbAEvent::RecoverEv) {
dbf.set_last_recover_successful(d->dbh, e->success);
// in case of failure, activities (sources) that had been moved from the activity
printf("[0x%lx ] CaSupDbActivity::execute %s : ", pthread_self(), e->success ? "\e[1;32mrecover successful" : "\e[1;31mrecover failed" );
for(const CuData& da : e->datalist)
printf("\t- %s\n", datos(da));
// in case of failure, activities (sources) that had been moved from the activity
// table to the recover_activities table must be restored back into the activity table
// (moving away from activity prevents concurrent operations from
// different instances of ca-supervisor on the same sources)
......@@ -114,8 +116,11 @@ void CaSupDbActivity::execute() {
printf("\e[1;35mCaSupDbActivity.execute: seems that recover operation failed: check if %ld activities have been"
" restored into the activity table\e[0m\n", e->datalist.size());
}
else
printf("CaSubDbActivity.execute \e[1;32mSuccessfully sent restore for %ld activities\e[0m\n", e->datalist.size());
else {
ok = dbf.remove_from_recovery(d->dbh, e->datalist);
}
printf("CaSubDbActivity.execute \e[1;32mIN CASE OF SUCCESSFUL RESTORE sent restore for %ld activities\e[0m\n", e->datalist.size());
}
else if (e->type == CaSupDbAEvent::DbQueryEv) {
const std::list<CuData> &dali = e->datalist;
......@@ -142,63 +147,6 @@ void CaSupDbActivity::onExit() {
dbf.unregister_instance(d->dbh, d->superv_id);
}
}
std::string CaSupDbActivity::m_get_status(const CuData &da, CaSupDbFuncs *f) const
{
bool err = da["err"].toBool();
bool recover = da.has("state", "RECOVER");
int conf_id;
char msg[2048];
memset(msg, 0, sizeof(char ) * 2048);
da["conf_id"].to<int>(conf_id);
// convert expected to datetime
std::tm te = {}, ts = {}, tc = {}; // expected, started, current
std::istringstream exp(da["expected"].toString());
std::istringstream started(da["started"].toString());
std::istringstream current(da["current_timestamp"].toString());
// 2021-05-17 15:48:55.670696
// https://en.cppreference.com/w/cpp/io/manip/get_time
// Y parses full year as a 4 digit decimal number, leading zeroes permitted but not required
// m parses the month as a decimal number (range [01,12]), leading zeroes permitted but not required
// d parses the day of the month as a decimal number (range [01,31]), leading zeroes permitted but not required
// H parses the hour as a decimal number, 24 hour clock (range [00-23]), leading zeroes permitted but not required
// M parses minute as a decimal number (range [00,59]), leading zeroes permitted but not required
// S parses second as a decimal number (range [00,60]), leading zeroes permitted but not required
exp >> std::get_time(&te, "%Y-%m-%d %H:%M:%S");
if(exp.fail())
perr("CaSubDbMon.m_print_status: failed to parse expected date/time \"%s\"", vtoc2(da, "expected"));
started >> std::get_time(&ts, "%Y-%m-%d %H:%M:%S");
if(started.fail())
perr("CaSubDbMon.m_print_status: failed to parse started date/time \"%s\"", vtoc2(da, "started"));
current >> std::get_time(&tc, "%Y-%m-%d %H:%M:%S");
if(current.fail())
perr("CaSubDbMon.m_print_status: failed to parse current date/time \"%s\"", vtoc2(da, "started"));
if(!exp.fail() && !started.fail() && !current.fail()) {
double expected_in = std::chrono::duration<double, std::milli>(mktime(&te) - mktime(&tc)).count();
char de[256], ds[256];
char srvna[20];
char stat[32], stat_col[16], strike_stat_col[16];
memset(srvna, 0, sizeof(char) * 16);
snprintf(srvna, 12, "\"%s\"", da["srvnam"].toString().c_str());
if(da["srvnam"].toString().length() > 12)
strncat(srvna, "...", 4);
strcpy(stat, da["state"].toString().c_str());
err ? (!recover ? strcpy(stat_col, "\e[1;31m") : strcpy(stat_col, "\e[1;35m") ) : strcpy(stat_col, "\e[1;32m");
err ? strcpy(strike_stat_col, "\e[2;31;1m") : strcpy(strike_stat_col, "\e[1;32;3m");
strncpy(de, asctime(&te), 255);
strncpy(ds, asctime(&ts), 255);
if(strlen(de) > 0) de[strlen(de) - 1 ] = '\0';
if(strlen(ds) > 0) ds[strlen(ds) - 1 ] = '\0';
snprintf(msg, 2047, "%s #\e[0m%8s \e[1;34m%15s\e[0m \e[1;36m%15s:%-5s\e[0m"
"\e[1;33m|\e[0m expected: %s%5.0fs \e[0m [%s]\e[1;33m|\e[0mstarted: %s",
stat_col, stat, srvna, vtoc2(da, "addr"), vtoc2(da, "port"), strike_stat_col, expected_in, de, ds);
}
return std::string(msg);
}
void CaSupDbActivity::onTimeout(CuTimer *) {
std::string stat;
CaSupDbFuncs f;
......@@ -211,8 +159,9 @@ void CaSupDbActivity::onTimeout(CuTimer *) {
d->log->write("ca-supervisor", stat, CuLog::LevelInfo);
cuprintf("%s", stat.c_str());
// remove active services from the recovery table
// if(s.has("state", "ACTIVE"))
// f.remove_from_recovery(d->dbh, id);
// they are expected to recover their stray activities at startup
if(s.has("state", "ACTIVE"))
f.remove_from_recovery(d->dbh, id);
}
else { // dormant
const std::vector<CuData> *srcs = nullptr;
......@@ -241,22 +190,25 @@ void CaSupDbActivity::onTimeout(CuTimer *) {
bool ok = f.insert_recovery_data(d->dbh, id, d->superv_id);
if(ok)
ok = f.insert_recover_operation_start(d->dbh, d->superv_id, id, srcs);
if(ok) // remove the activities being recovered from the activity table to prevent concurrent supervisor operations on them
if(ok)
// remove the activities being recovered from the activity table to prevent
// concurrent supervisor operations on them
ok = f.remove_activities_for_srv_id(d->dbh, id); // get_srcs_for_srv_id counterpart
if(!ok)
d->log->write("ca-supevisor", "database error: " + f.last_db_error);
}
// now recover
// publish result on CaSupervisor
// publish result on CaSupervisor (onResult)
publishResult(srcs);
}
}
}
if(active_srv.size() == 0)
printf("\e[1;33m - \e[0m no active services\n");
// old impl:
// remove old entries from recovery (rows with stray service id, probably recovered by another service
// and no more usable when their service id is not among the activities srv_id
f.clean_recovery_table(d->dbh);
// f.clean_recovery_table(d->dbh);
d->timer_srv->restart(d->timer, d->timer->timeout());
}
......@@ -273,3 +225,59 @@ int CaSupDbActivity::repeat() const {
void CaSupDbActivity::event(CuActivityEvent *e) {
}
std::string CaSupDbActivity::m_get_status(const CuData &da, CaSupDbFuncs *f) const
{
bool err = da["err"].toBool();
bool recover = da.has("state", "RECOVER");
int conf_id;
char msg[2048];
memset(msg, 0, sizeof(char ) * 2048);
da["conf_id"].to<int>(conf_id);
// convert expected to datetime
std::tm te = {}, ts = {}, tc = {}; // expected, started, current
std::istringstream exp(da["expected"].toString());
std::istringstream started(da["started"].toString());
std::istringstream current(da["current_timestamp"].toString());
// 2021-05-17 15:48:55.670696
// https://en.cppreference.com/w/cpp/io/manip/get_time
// Y parses full year as a 4 digit decimal number, leading zeroes permitted but not required
// m parses the month as a decimal number (range [01,12]), leading zeroes permitted but not required
// d parses the day of the month as a decimal number (range [01,31]), leading zeroes permitted but not required
// H parses the hour as a decimal number, 24 hour clock (range [00-23]), leading zeroes permitted but not required
// M parses minute as a decimal number (range [00,59]), leading zeroes permitted but not required
// S parses second as a decimal number (range [00,60]), leading zeroes permitted but not required
exp >> std::get_time(&te, "%Y-%m-%d %H:%M:%S");
if(exp.fail())
perr("CaSubDbMon.m_print_status: failed to parse expected date/time \"%s\"", vtoc2(da, "expected"));
started >> std::get_time(&ts, "%Y-%m-%d %H:%M:%S");
if(started.fail())
perr("CaSubDbMon.m_print_status: failed to parse started date/time \"%s\"", vtoc2(da, "started"));
current >> std::get_time(&tc, "%Y-%m-%d %H:%M:%S");
if(current.fail())
perr("CaSubDbMon.m_print_status: failed to parse current date/time \"%s\"", vtoc2(da, "started"));
if(!exp.fail() && !started.fail() && !current.fail()) {
double expected_in = std::chrono::duration<double, std::milli>(mktime(&te) - mktime(&tc)).count();
char de[256], ds[256];
char srvna[20];
char stat[32], stat_col[16], strike_stat_col[16];
memset(srvna, 0, sizeof(char) * 16);
snprintf(srvna, 12, "\"%s\"", da["srvnam"].toString().c_str());
if(da["srvnam"].toString().length() > 12)
strncat(srvna, "...", 4);
strcpy(stat, da["state"].toString().c_str());
err ? (!recover ? strcpy(stat_col, "\e[1;31m") : strcpy(stat_col, "\e[1;35m") ) : strcpy(stat_col, "\e[1;32m");
err ? strcpy(strike_stat_col, "\e[2;31;1m") : strcpy(strike_stat_col, "\e[1;32;3m");
strncpy(de, asctime(&te), 255);
strncpy(ds, asctime(&ts), 255);
if(strlen(de) > 0) de[strlen(de) - 1 ] = '\0';
if(strlen(ds) > 0) ds[strlen(ds) - 1 ] = '\0';
snprintf(msg, 2047, "%s #\e[0m%8s \e[1;34m%15s\e[0m \e[1;36m%15s:%-5s\e[0m"
"\e[1;33m|\e[0m expected: %s%5.0fs \e[0m [%s]\e[1;33m|\e[0mstarted: %s",
stat_col, stat, srvna, vtoc2(da, "addr"), vtoc2(da, "port"), strike_stat_col, expected_in, de, ds);
}
return std::string(msg);
}
......@@ -61,9 +61,13 @@ std::vector<CuData> CaSupDbFuncs::get_active_services(CaDbH *dbh) {
return v;
}
// return sources (=activities) for the given service id
// sources client id must be valid (i.e. in the register table)
std::vector<CuData>* CaSupDbFuncs::get_srcs_for_srv_id(CaDbH *h, int id) {
std::string stm = "SELECT activity.source,activity.chan,service.conf_id,activity.cli_id from activity,service "
" WHERE service.id=activity.srv_id AND service.id=$1";
std::string stm = "SELECT activity.source,activity.chan,service.conf_id,activity.cli_id "
"FROM activity,service,register "
"WHERE service.id=activity.srv_id AND service.id=$1 AND activity.cli_id "
"IN (SELECT client_id FROM register)";
CaDbRes res = h->execute(stm, std::vector<std::string> { std::to_string(id) });
std::vector<CuData> *v = new std::vector<CuData>();
*v = m_res_to_data(res);
......@@ -77,8 +81,7 @@ bool CaSupDbFuncs::remove_activities_for_srv_id(CaDbH *h, int id) {
return !r.error();
}
/// NOTE ??
/// restore after interval 30 minutes
//
bool CaSupDbFuncs::restore_activities_after_recover_failure(CaDbH *h, const std::list<CuData> &srcs) {
printf("CaSupDbFuncs::restore_activities_after_recover_failure sources size %ld\n", srcs.size());
last_db_error.clear();
......@@ -86,11 +89,13 @@ bool CaSupDbFuncs::restore_activities_after_recover_failure(CaDbH *h, const std:
CaDbRes r = h->begin_transaction();
for(const CuData &s : srcs) {
const std::string& conf_id = s.s("conf_id");
printf("CaSupDbFuncs::restore_activities_after_recover_failure conf_id %s src %s\n", conf_id.c_str(), datos(s));
printf("CaSupDbFuncs::restore_activities_after_recover_failure conf_id %s src %s client id %s\n",
conf_id.c_str(), datos(s), s.s("cli_id").c_str());
if(!r.error() && conf_id.length() > 0) {
r = h->execute("SELECT id FROM service WHERE conf_id=$1", std::vector<std::string> {conf_id});
r = h->execute("SELECT id FROM service,register "
"WHERE service.conf_id=$1 AND register.client_id=$2", std::vector<std::string> {conf_id, s.s("cli_id")});
if(r.size() == 1) {
r = h->execute(stm, std::vector<std::string> { r.value(0, "id"), s.s("conf_id"), s.s("source"), s.s("chan")});
r = h->execute(stm, std::vector<std::string> { r.value(0, "id"), s.s("cli_id"), s.s("source"), s.s("chan")});
printf("CaSupDbFuncs::restore_activities_after_recover_failure executed with ID %s src %s\n", r.value(0, "id").c_str(), s.s("src").c_str());
}
else
......@@ -107,7 +112,6 @@ bool CaSupDbFuncs::restore_activities_after_recover_failure(CaDbH *h, const std:
// clean all unreachable recovery records (whose service id is not in the activity records)
//
qua il problema
bool CaSupDbFuncs::clean_recovery_table(CaDbH *h) {
std::string stm = "DELETE FROM recovery WHERE srv_id NOT IN (SELECT srv_id FROM activity)";
CaDbRes res = h->execute(stm);
......@@ -139,29 +143,6 @@ bool CaSupDbFuncs::unregister_instance(CaDbH *h, int id) {
return !r.error();
}
bool CaSupDbFuncs::insert_recover_operation_start(CaDbH *h, int superv_id, int from_srv_id, const std::vector<CuData> *srcs) {
std::string conf_id;
std::string stm = "SELECT srvconf.conf_id FROM srvconf,service where service.id=$1 AND srvconf.conf_id=service.conf_id";
CaDbRes r = h->execute(stm, std::vector<std::string> { std::to_string(from_srv_id) });
if(!r.error() && r.size() > 0)
conf_id = r.value(0, "conf_id");
if(conf_id.length() > 0) {
std::string recover_op_id;
stm = "INSERT INTO recover_operations (id,supervisor_id,started) VALUES (DEFAULT,$1,current_timestamp) RETURNING id";
r = h->execute(stm, std::vector<std::string> { std::to_string(superv_id) });
if(!r.error() && r.size() > 0)
recover_op_id = r.value(0, "id");
if(recover_op_id.length() > 0) {
for (const CuData& s : *srcs) {
stm = "INSERT INTO recover_activities (operation_id,source,chan,cli_id,from_srv_conf) VALUES ($1,$2,$3,$4,$5)";
r = h->execute(stm, std::vector<std::string> { recover_op_id, s.s("source"), s.s("chan"),s.s("cli_id"), conf_id });
}
}
}
last_db_error = r.errmsg;
return !r.error();
}
bool CaSupDbFuncs::register_activities(CaDbH *dbhan, const std::string &srv_id,
const std::string& cli_id,
const std::string &channel,
......@@ -172,31 +153,30 @@ bool CaSupDbFuncs::register_activities(CaDbH *dbhan, const std::string &srv_id,
last_db_error.clear();
CaDbRes res = dbhan->begin_transaction();
err = res.error();
printf("CaSupDbFuncs::register_activities datalist siz %ld chennel %s\n", datalist.size(), channel.c_str());
for(const CuData& da : datalist) {
if(!err) {
const std::string& src = da.s("src");
const std::string& src = da.s("src"), &_chan = da.containsKey("channel") ? da.s("channel") : channel;
/// NOTE:
// see Notes on activities in the caserver database plugin design rationale
//
// is there an activity recorded with src and chan? Yes, update srv_id
stm = "SELECT srv_id FROM activity WHERE source=$1 AND chan=$2 AND cli_id=$3";
res = dbhan->execute(stm, std::vector<std::string> { src, channel, cli_id });
res = dbhan->execute(stm, std::vector<std::string> { src, _chan, cli_id });
if(!res.error() && res.size() > 0) {
// same source, same chan, same client id
stm = "UPDATE activity SET srv_id=$1 WHERE source=$2 AND chan=$3";
res = dbhan->execute(stm, std::vector<std::string> { srv_id, src, channel });
res = dbhan->execute(stm, std::vector<std::string> { srv_id, src, _chan });
if(res.error())
printf("\e[1;31mupdate fails %s\e[0m\n", res.errmsg.c_str());
}
else if(!res.error()) {
stm = "INSERT INTO activity (srv_id,cli_id,source,chan) VALUES($1,$2,$3,$4)";
res = dbhan->execute(stm, std::vector<std::string> { srv_id, cli_id, src, channel });
res = dbhan->execute(stm, std::vector<std::string> { srv_id, cli_id, src, _chan });
if(res.error())
printf("\e[1;31mINSERT fails %s\e[0m\n", res.errmsg.c_str());
}
if(res.error())
printf("\e[1;31mISELECtT fails %s\e[0m\n", res.errmsg.c_str());
printf("CaSupDbFuncs::register_activities \e[1;31mSELECT fails %s\e[0m\n", res.errmsg.c_str());
err = res.error();
}
}
......@@ -305,9 +285,88 @@ bool CaSupDbFuncs::insert_recovery_data(CaDbH *h, int srv_id,int superv_id) {
return !res.error();
}
bool CaSupDbFuncs::insert_recover_operation_start(CaDbH *h,
int superv_id,
int from_srv_id,
const std::vector<CuData> *srcs) {
std::string conf_id;
std::string stm = "SELECT srvconf.conf_id FROM srvconf,service where service.id=$1 AND srvconf.conf_id=service.conf_id";
CaDbRes r = h->execute(stm, std::vector<std::string> { std::to_string(from_srv_id) });
if(!r.error() && r.size() > 0)
conf_id = r.value(0, "conf_id");
if(conf_id.length() > 0) {
std::string recover_op_id;
stm = "INSERT INTO recover_operations (id,supervisor_id,started) VALUES (DEFAULT,$1,current_timestamp) RETURNING id";
r = h->execute(stm, std::vector<std::string> { std::to_string(superv_id) });
if(!r.error() && r.size() > 0)
recover_op_id = r.value(0, "id");
if(recover_op_id.length() > 0) {
for (const CuData& s : *srcs) {
printf("\e[1;33minsert_recover_operation_start: data ras %s\e[0m\n", datos(s));
stm = "INSERT INTO recover_activities (operation_id,source,chan,cli_id,from_srv_conf) VALUES ($1,$2,$3,$4,$5)";
r = h->execute(stm, std::vector<std::string> { recover_op_id, s.s("source"), s.s("chan"),s.s("cli_id"), conf_id });
}
}
}
last_db_error = r.errmsg;
return !r.error();
}
// invoked when a service is active
bool CaSupDbFuncs::remove_from_recovery(CaDbH *h, int srv_id) {
std::string stm = "DELETE FROM recovery WHERE srv_id=$1";
CaDbRes res = h->execute(stm, std::vector<std::string> { std::to_string(srv_id) });
CaDbRes res = h->begin_transaction();
// get all the operation IDs associated to srv_id
std::string stm = "SELECT operation_id FROM recover_activities WHERE from_srv_conf="
"(SELECT conf_id FROM service WHERE id=$1)";
if(!res.error()) // begin transaction successful
res = h->execute(stm, std::vector<std::string> { std::to_string(srv_id) });
// now delete from recover_activities and recover_operations by operation_id
for(int i = 0; !res.error() && i < res.size(); i++) {
const std::string& opid = res.value(0, "operation_id");
res = h->execute("DELETE FROM recover_activities WHERE operation_id=$1", std::vector<std::string> { opid });
if(!res.error())
res = h->execute("DELETE FROM recover_operations WHERE id=$1", std::vector<std::string> { opid });
}
if(!res.error()) {
stm = "DELETE FROM recovery WHERE srv_id=$1";
res = h->execute(stm, std::vector<std::string> { std::to_string(srv_id) });
}
if(!res.error())
h->commit_transaction();
last_db_error = res.errmsg;
return !res.error();
}
bool CaSupDbFuncs::remove_from_recovery(CaDbH *h, const std::list<CuData> &datali) {
CaDbRes res = h->begin_transaction();
std::vector<std::string> opids, srvconfs;
// get involved operation IDs to remove from recover_activities and recover_operations
// get involved srv conf to delete from recovery by srv id
std::string stm = "SELECT operation_id,from_srv_conf FROM recover_activities WHERE source=$1 AND chan=$2"
" AND cli_id=$3 AND from_srv_conf=$4";
for(const CuData& da : datali) {
if(!res.error()) {
res = h->execute(stm, std::vector<std::string> { da.s("source"), da.s("chan"), da.s("cli_id"), da.s("conf_id")});
printf("CaSupDbFuncs::remove_from_recovery \e[1;35m removing from recovery %s\e[0m\n", datos(da));
for(int i = 0; !res.error() && i < res.size(); i++) {
opids.push_back(res.value(i, "operation_id"));
srvconfs.push_back(res.value(i, "from_srv_conf"));
}
}
} // for datali
for(const std::string& opid : opids) {
if(!res.error())
res = h->execute("DELETE FROM recover_activities WHERE operation_id=$1", std::vector<std::string> {opid});
if(!res.error())
res = h->execute("DELETE FROM recover_operations WHERE id=$1", std::vector<std::string> {opid});
}
for(const std::string& srvco : srvconfs) {
if(!res.error())
res = h->execute("DELETE FROM recovery WHERE srv_id=(SELECT id FROM service WHERE conf_id=$1)",
std::vector<std::string> {srvco});
}
if(!res.error())
h->commit_transaction();
last_db_error = res.errmsg;
return !res.error();
}
......@@ -22,6 +22,7 @@ public:
std::vector<CuData> m_res_to_data(const CaDbRes& res);
bool insert_recovery_data(CaDbH *h, int srv_id, int superv_id);
bool remove_from_recovery(CaDbH *h, int srv_id);
bool remove_from_recovery(CaDbH *h, const std::list<CuData>& datali);
bool clean_recovery_table(CaDbH *h);
bool set_last_recover_successful(CaDbH *h, bool success);
int register_instance(CaDbH *h, const std::string &url);
......
......@@ -8,18 +8,18 @@ CaSupJsoniz::CaSupJsoniz()
}
/*!
* \brief CaSupJsoniz::jsonize returns a vector of json requests grouped by channel
* \brief CaSupJsoniz::jsonize returns a vector of json requests grouped by cli_id
* \param dl vector of data as CuData
* \return a vector of json requests to use with caserver, each element refers to a different channel
* \return a vector of json requests to use with caserver, each element refers to a different cli_id
*/
std::map<std::string, std::string> CaSupJsoniz::jsonize(const std::vector<CuData> &dl) const {
std::map<std::string, std::string> CaSupJsoniz::jsonize(const std::vector<CuData> &dl,
std::map<std::string, std::list<CuData>> & clidmap) const {
std::map<std::string, std::string> jsonma;
// group by channel
std::map<std::string, std::vector<CuData>> chdmap;
// group by cli_id
for(const CuData& d : dl) {
chdmap[d["cli_id"].toString()].push_back(d);
clidmap[d["cli_id"].toString()].push_back(d);
}
for(std::map<std::string, std::vector<CuData>>::const_iterator it = chdmap.begin(); it != chdmap.end(); ++it) {
for(std::map<std::string, std::list<CuData>>::const_iterator it = clidmap.begin(); it != clidmap.end(); ++it) {
nlohmann::json src_array;
for(const CuData& d : it->second) {
nlohmann::json data_o;
......@@ -47,7 +47,7 @@ std::map<std::string, std::string> CaSupJsoniz::jsonize(const std::vector<CuData
data_o["method"] = "S";
data_o["recovered-from-srv-conf"] = d["conf_id"].toString();
data_o["recovered-by"] = "casupervisor";
data_o["channel"] = d["channel"].toString();
data_o["channel"] = d.s("chan").length() > 0 ? d.s("chan") : d.s("channel");
src_array.push_back(data_o);
}
nlohmann::json req;
......@@ -64,26 +64,38 @@ const std::string CaSupJsoniz::make_msg_ok() const {
return o.dump();
}
const std::string CaSupJsoniz::make_err_msg(const std::string &msg) const
{
nlohmann::json o;
o["msg"] = msg.length() == 0 ? "ok" : msg;
o["err"] = msg.length() > 0;
return o.dump();
}
bool CaSupJsoniz::is_err_msg(const std::string &json, std::string &errmsg) const
{
try {
nlohmann::json js = nlohmann::json::parse(json);
if(js.is_array()) {
for (nlohmann::json jse : js) {
if(jse.contains("err")) {
bool err = jse["err"];
if(err && jse.contains("msg"))
errmsg = jse["msg"];
errmsg.clear();
if(json.length() > 0)
{
try {
nlohmann::json js = nlohmann::json::parse(json);
if(js.is_array()) {
for (nlohmann::json jse : js) {
if(jse.contains("err")) {