Commit 6761b3f1 authored by Giacomo Strangolino
Browse files

fixes to recovery section - database and operation

parent 4af906a0
......@@ -97,7 +97,6 @@ headers = [
'src/config.h',
'src/ca-supervisor.h',
'src/casupdbactivity.h',
'src/casupredistrib.h',
'src/casupdbfuncs.h',
'src/casupjsoniz.h',
'src/casupcurl.h',
......@@ -109,7 +108,6 @@ sources = [
'src/ca-supervisor.cpp',
'src/main.cpp',
'src/casupdbactivity.cpp',
'src/casupredistrib.cpp',
'src/casupdbfuncs.cpp',
'src/casupjsoniz.cpp',
'src/casupcurl.cpp',
......
#include "ca-supervisor.h"
#include "casupjsoniz.h"
#include "casupdbactivity.h"
#include "casupcurl.h"
#include "casuprecoverxmitevent.h"
#include <sys/types.h>
#include <sys/socket.h>
......@@ -20,13 +22,16 @@ public:
CaSupervisorPrivate(CaSupDbActivity* _dba, CuLogImplI *l) : log(l) , dba(_dba) {}
CuLogImplI *log;
CaSupDbActivity *dba;
CaSupCurl *sucu;
};
/*!
 * \brief Builds the supervisor listener.
 *
 * \param dba database activity that receives the events created by this listener
 * \param url address handed to the CaSupCurl transmitter
 * \param ssl_verify_peer forwarded unchanged to the CaSupCurl transmitter
 * \param log logger used for the messages written by this listener
 *
 * The stale two-argument signature that preceded this definition has been
 * dropped: only the four-argument constructor is declared in the class.
 */
CaSupervisor::CaSupervisor(CaSupDbActivity *dba, const std::string &url, bool ssl_verify_peer, CuLogImplI *log) {
    d = new CaSupervisorPrivate(dba, log);
    // the transmitter is owned by this object and released in the destructor
    d->sucu = new CaSupCurl(url, ssl_verify_peer);
}
/*!
 * \brief Releases the transmitter first, then the private data that referenced it.
 */
CaSupervisor::~CaSupervisor() {
    CaSupCurl *const transmitter = d->sucu;
    delete transmitter;
    delete d;
}
......@@ -35,7 +40,16 @@ void CaSupervisor::onProgress(int step, int total, const CuData &data) { }
void CaSupervisor::onResult(const CuData &data) {
printf("CaSupervisor.onResult: received %s\n", datos(data));
if(data.containsKey("data")) {
if(data.has("type", "init")) { // from CaSupDbActivity
bool err = data["err"].toBool();
d->log->write("ca-supervisor", data["msg"].toString(), err ? CuLog::LevelError : CuLog::LevelInfo);
if(err) {
perr("ca-supervisor: %s", vtoc2(data, "msg"));
exit(EXIT_FAILURE);
}
}
// from CaReceiver_A activity
else if(data.containsKey("data")) {
const std::string& s = data.s("data");
CaJsonSrcBundleExtract bux;
std::list<CuData> dl;
......@@ -53,8 +67,6 @@ void CaSupervisor::onResult(const CuData &data) {
CaSupDbAEvent *e = new CaSupDbAEvent(extram["srv_id"], id, extram["method"], chan, dl);
d->dba->new_event(e);
}
int fd = data.I("fd");
int byw = m_sock_write(fd, CaSupJsoniz().make_msg_ok());
printf("[0x%lx] \e[1;32mCaSupervisor::onResult \e[0;35m %d bytes written in reply \e[0m\n", pthread_self(), byw);
......@@ -70,7 +82,19 @@ void CaSupervisor::onResult(const CuData &data) {
}
}
/*!
 * \brief Receives the list of sources published by CaSupDbActivity and forwards
 *        them, grouped by channel, through the CaSupCurl transmitter.
 *
 * \param srcs sources to transmit
 *
 * The outcome is handed back to the database activity as a recover event
 * carrying the success flag, the same sources and an error message.
 *
 * When the transmission itself fails, the response is not parsed, so no
 * message would be extracted from it: a fallback text is supplied in that
 * case, so that the recover event never reports a failure with an empty reason.
 */
void CaSupervisor::onResult(const std::vector<CuData> &srcs) {
    // from CaSupDbActivity
    CaSupJsoniz jiz;
    const std::map<std::string, std::string> jsonma = jiz.jsonize(srcs);
    if(!jsonma.empty())
        d->log->write("ca-supervisor", "requests grouped by *channel*", CuLog::LevelInfo);
    for(const auto &[chan, json] : jsonma)
        d->log->write("ca-supervisor", chan + " --> " + json, CuLog::LevelInfo);

    std::string errmsg;
    const bool sent = d->sucu->xmit(jsonma);
    // the response is inspected only after a successful transfer
    const bool ok = sent && !jiz.is_err_msg(d->sucu->response(), errmsg);
    if(!sent && errmsg.empty())
        errmsg = "transmission of the recover request failed";
    // the event is handed over to the database activity
    d->dba->new_event(new CaSupDbAEvent(ok, std::list<CuData>(srcs.begin(), srcs.end()), errmsg));
}
CuData CaSupervisor::getToken() const {
return CuData("type", "ca-supervisor");
......
......@@ -12,14 +12,14 @@ class CaSupervisorPrivate;
class CaSupervisor : public CuThreadListener
{
public:
CaSupervisor(CaSupDbActivity *dba, CuLogImplI *log);
CaSupervisor(CaSupDbActivity *dba, const std::string& url, bool ssl_verify_peer, CuLogImplI *log);
virtual ~CaSupervisor();
// CuThreadListener interface
public:
void onProgress(int step, int total, const CuData &data);
void onResult(const CuData &data);
void onResult(const std::vector<CuData> &datalist);
void onResult(const std::vector<CuData> &srcs);
CuData getToken() const;
private:
int m_sock_write(int sofd, const std::string &buf);
......
......@@ -5,7 +5,7 @@
// Private data of CaSupCurl.
// The string members are declared once (the stale `url, message` line that
// duplicated them has been dropped) and the plain members get in-class
// initializers, so that they hold a defined value until they are first assigned.
class CaSupCurlPrivate {
public:
    CURL *curl = nullptr;
    // url: transfer target; message: text printed by xmit when a transfer fails;
    // response: buffer that xmit clears and registers as CURLOPT_WRITEDATA
    std::string url, message, response;
    bool ok = false;              // outcome of the last curl_easy_perform
    bool ssl_verify_peer = false; // NOTE(review): presumably assigned by the CaSupCurl constructor - confirm
};
......@@ -27,7 +27,7 @@ bool CaSupCurl::xmit(const std::map<std::string,std::string> &datamap) {
for(const auto& [chan, json] : datamap) {
++cnt;
struct curl_slist *slist = nullptr;
std::string response, chanhdr = "X-Channel: " + chan;
std::string chanhdr = "X-Channel: " + chan;
std::string content_len = "Content-Length: " + std::to_string(json.length());
char errbuf[CURL_ERROR_SIZE];
curl_slist_append(slist, "Accept: application/json");
......@@ -43,7 +43,8 @@ bool CaSupCurl::xmit(const std::map<std::string,std::string> &datamap) {
// suppress response to stdout
curl_easy_setopt(d->curl, CURLOPT_WRITEFUNCTION, CaSupCurl::write_callback);
CaSupCurlWriteFuncData *wd = new CaSupCurlWriteFuncData(std::to_string(cnt));
curl_easy_setopt(d->curl, CURLOPT_WRITEDATA, (void *) &response );
d->response.clear();
curl_easy_setopt(d->curl, CURLOPT_WRITEDATA, (void *) &d->response );
CURLcode cuco = curl_easy_perform(d->curl);
d->ok = cuco == CURLE_OK;
......@@ -53,24 +54,25 @@ bool CaSupCurl::xmit(const std::map<std::string,std::string> &datamap) {
curl_slist_free_all(slist);
if(!d->ok)
perr("CaSupCurl::xmit: %s", d->message.c_str());
printf("CaSupCurl::xmit: response: \"\e[1;32m %s\e[0m\"\n", response.c_str());
delete wd;
}
return d->ok;
}
// Write callback: appends the received chunk (size * nmemb bytes) to the
// std::string passed through userdata and reports the whole chunk as consumed.
size_t CaSupCurl::write_callback(char *contents, size_t size, size_t nmemb, void *userdata) {
    printf("CaSupCurl.write_callback ...\n");
    const size_t chunk_len = size * nmemb;
    std::string &sink = *static_cast<std::string *>(userdata);
    sink.append(contents, chunk_len);
    return chunk_len;
}
// Placeholder: ignores every argument and always returns 0.
// NOTE(review): the signature looks like a libcurl multi-interface socket
// callback - confirm whether it is registered anywhere.
int CaSupCurl::socket_callback(CURL *easy, curl_socket_t s, int what, void *userp, void *socketp) {
return 0;
}
// Placeholder: ignores every argument and always returns 0.
// NOTE(review): the signature looks like a libcurl multi-interface timer
// callback - confirm whether it is registered anywhere.
int CaSupCurl::start_timeout(CURLM *multi, long timeout_ms, void *userp) {
return 0;
}
// Returns a reference to the response buffer kept in the private data.
// xmit clears that buffer before each transfer, so after a call with several
// channels it holds only what was received during the last transfer.
const std::string &CaSupCurl::response() const {
return d->response;
}
......@@ -35,6 +35,7 @@ public:
void *userp, /* private callback pointer */
void *socketp);
static int start_timeout(CURLM *multi, long timeout_ms, void *userp);
const std::string& response() const;
private:
CaSupCurlPrivate *d;
......
......@@ -102,10 +102,22 @@ void CaSupDbActivity::execute() {
CaSupDbFuncs dbf;
printf(" [0x%lx ] CaSupDbActivity.execute: new event\n", pthread_self());
if(e->type == CaSupDbAEvent::RecoverSuccessful) {
if(e->type == CaSupDbAEvent::RecoverEv) {
dbf.set_last_recover_successful(d->dbh, e->success);
// in case of failure, activities (sources) that had been moved from the activity
// table to the recover_activities table must be restored back into the activity table
// (moving away from activity prevents concurrent operations from
// different instances of ca-supervisor on the same sources)
if(!e->success) {
d->log->write("ca-supervisor", "activity recover error: " + e->method);
ok = dbf.restore_activities_after_recover_failure(d->dbh, e->datalist);
printf("\e[1;35mCaSupDbActivity.execute: seems that recover operation failed: check if %ld activities have been"
" restored into the activity table\e[0m\n", e->datalist.size());
}
else
printf("CaSubDbActivity.execute \e[1;32mSuccessfully sent restore for %ld activities\e[0m\n", e->datalist.size());
}
else if (e->type == CaSupDbAEvent::DbQuery) {
else if (e->type == CaSupDbAEvent::DbQueryEv) {
const std::list<CuData> &dali = e->datalist;
printf(" [0x%lx ] CaSupDbActivity.execute: db query event data - server ID %s method %s\n",
pthread_self(), e->server_id.c_str(), e->method.c_str());
......@@ -113,10 +125,9 @@ void CaSupDbActivity::execute() {
ok = dbf.register_activities(d->dbh, e->server_id, e->cli_id, e->channel, e->datalist);
else if(e->method == "u")
ok = dbf.unregister_activities(d->dbh, e->server_id, e->cli_id, e->channel, e->datalist);
if(!ok)
d->log->write("ca-supervisor", "database error: " + dbf.last_db_error);
}
if(!ok) // database error
d->log->write("ca-supervisor", "database error: " + dbf.last_db_error);
delete e;
} // for event in events
......@@ -200,7 +211,8 @@ void CaSupDbActivity::onTimeout(CuTimer *) {
d->log->write("ca-supervisor", stat, CuLog::LevelInfo);
cuprintf("%s", stat.c_str());
// remove active services from the recovery table
f.remove_from_recovery(d->dbh, id);
// if(s.has("state", "ACTIVE"))
// f.remove_from_recovery(d->dbh, id);
}
else { // dormant
const std::vector<CuData> *srcs = nullptr;
......@@ -235,7 +247,7 @@ void CaSupDbActivity::onTimeout(CuTimer *) {
d->log->write("ca-supevisor", "database error: " + f.last_db_error);
}
// now recover
// publish result on CaSupRedistrib
// publish result on CaSupervisor
publishResult(srcs);
}
}
......@@ -259,11 +271,5 @@ int CaSupDbActivity::repeat() const {
}
// Activity event hook: prints the type of every event received and, for a
// CaSupRecoverXmitEvent, its outcome and the number of sources it carries.
void CaSupDbActivity::event(CuActivityEvent *e) {
    printf("[0x%lx] CaSupDbActivity.event: %d\n", pthread_self(), e->getType());
    const bool is_recover_xmit =
            e->getType() == static_cast<int>(CaSupRecoverXmitEvent::RecoverXmitEventType);
    if(!is_recover_xmit)
        return;
    CaSupRecoverXmitEvent *recover_ev = static_cast<CaSupRecoverXmitEvent *>(e);
    const char *outcome = recover_ev->success ? "YES" : "NO";
    printf("[0x%lx] CaSupDbActivity.event: recover event ok ? %s sources size %ld\n",
           pthread_self(), outcome, recover_ev->srcs.size());
}
......@@ -12,22 +12,23 @@ class CuEventLoopService;
class CaSupDbAEvent {
public:
enum CaSupDbAEventType { DbQuery, RecoverSuccessful };
enum CaSupDbAEventType { DbQueryEv, RecoverEv };
CaSupDbAEvent(const std::string& srv_id,
const std::string& clid,
const std::string& me,
const std::string& chan,
const std::list<CuData>& dlist)
: server_id(srv_id), cli_id(clid), channel(chan), method(me), type(DbQuery) {
: server_id(srv_id), cli_id(clid), channel(chan), method(me), type(DbQueryEv) {
datalist = std::move(dlist);
}
CaSupDbAEvent(bool su) : type(RecoverSuccessful), success(su) {}
CaSupDbAEvent(bool su, const std::list<CuData>& dali, const std::string& msg)
: type(RecoverEv), datalist(dali), success(su), method(msg) {} // use method to store msg
std::string server_id;
std::string cli_id;
std::string channel, method;
std::string channel, method; // method can store error message in type RecoverEv
CaSupDbAEventType type;
std::list<CuData> datalist;
bool success;
......
......@@ -77,27 +77,37 @@ bool CaSupDbFuncs::remove_activities_for_srv_id(CaDbH *h, int id) {
return !r.error();
}
/// NOTE ??
/// restore after interval 30 minutes
/*!
 * \brief Inserts the given sources back into the activity table, within a transaction.
 *
 * \param h database handle
 * \param srcs sources to restore; entries with an empty "conf_id" are skipped
 * \return true if no database error occurred; otherwise false, with the
 *         message stored in last_db_error
 *
 * For each source, the service id is looked up by conf_id and used as srv_id
 * of the new activity row. A conf_id without exactly one matching service is
 * recorded in last_db_error, but does not make the function fail.
 * Once a statement has failed, no further statement is executed and the
 * transaction is not committed.
 */
bool CaSupDbFuncs::restore_activities_after_recover_failure(CaDbH *h, const std::list<CuData> &srcs) {
    printf("CaSupDbFuncs::restore_activities_after_recover_failure sources size %ld\n", srcs.size());
    last_db_error.clear();
    const std::string stm = "INSERT INTO activity (srv_id,cli_id,source,chan) VALUES($1,$2,$3,$4)";
    CaDbRes r = h->begin_transaction();
    for(const CuData &s : srcs) {
        const std::string conf_id = s.s("conf_id");
        printf("CaSupDbFuncs::restore_activities_after_recover_failure conf_id %s src %s\n", conf_id.c_str(), datos(s));
        if(r.error() || conf_id.empty())
            continue;
        r = h->execute("SELECT id FROM service WHERE conf_id=$1", std::vector<std::string> {conf_id});
        if(r.size() == 1) {
            // keep the service id: r is overwritten by the INSERT result below
            const std::string srv_id = r.value(0, "id");
            const std::string source = s.s("source");
            // NOTE(review): conf_id is stored in the cli_id column and the channel is
            // read from key "chan" (jsonize reads "channel") - confirm both are intended
            r = h->execute(stm, std::vector<std::string> { srv_id, conf_id, source, s.s("chan")});
            printf("CaSupDbFuncs::restore_activities_after_recover_failure executed with ID %s src %s\n", srv_id.c_str(), source.c_str());
        }
        else
            last_db_error = "restore_activities_after_recover_failure: no service with conf_id " + conf_id;
    }
    if(!r.error())
        h->commit_transaction();
    else
        last_db_error = r.errmsg;
    return !r.error();
}
// clean all unreachable recovery records (whose service id is not in the activity records)
// NOTE: the problem is here
bool CaSupDbFuncs::clean_recovery_table(CaDbH *h) {
std::string stm = "DELETE FROM recovery WHERE srv_id NOT IN (SELECT srv_id FROM activity)";
CaDbRes res = h->execute(stm);
......@@ -286,9 +296,11 @@ bool CaSupDbFuncs::insert_recovery_data(CaDbH *h, int srv_id,int superv_id) {
std::string stm = "UPDATE recovery SET started=current_timestamp WHERE srv_id=$1";
CaDbRes res = h->execute(stm, std::vector<std::string> { std::to_string(srv_id) });
if(!res.error() && res.affected_rows == 0) {
printf("CaSupDbFuncs::insert_recovery_data: inserting new row id %d superv_id %d\n", srv_id, superv_id);
stm = "INSERT INTO recovery (id,srv_id,started,supervisor_id) VALUES(DEFAULT,$1,current_timestamp,$2)";
res = h->execute(stm, std::vector<std::string> { std::to_string(srv_id), std::to_string(superv_id) });
}
printf("CaSupDbFuncs::insert_recovery_data: error %d msg %s\n", res.error(), res.errmsg.c_str());
last_db_error = res.errmsg;
return !res.error();
}
......
......@@ -15,7 +15,7 @@ public:
std::vector<CuData> get_active_services(CaDbH *dbh);
std::vector<CuData> *get_srcs_for_srv_id(CaDbH *h, int id);
bool remove_activities_for_srv_id(CaDbH *h, int id);
bool restore_activities_after_recover_failure(CaDbH *h, const std::vector<CuData>& srcs);
bool restore_activities_after_recover_failure(CaDbH *h, const std::list<CuData> &srcs);
std::string last_db_error, msg;
......
......@@ -44,7 +44,7 @@ std::map<std::string, std::string> CaSupJsoniz::jsonize(const std::vector<CuData
//
// from source (db table name) to src
data_o["src"] = d["source"].toString();
data_o["method"] = "s";
data_o["method"] = "S";
data_o["recovered-from-srv-conf"] = d["conf_id"].toString();
data_o["recovered-by"] = "casupervisor";
data_o["channel"] = d["channel"].toString();
......@@ -63,3 +63,27 @@ const std::string CaSupJsoniz::make_msg_ok() const {
o["msg"] = "ok";
return o.dump();
}
bool CaSupJsoniz::is_err_msg(const std::string &json, std::string &errmsg) const
{
try {
nlohmann::json js = nlohmann::json::parse(json);
if(js.is_array()) {
for (nlohmann::json jse : js) {
if(jse.contains("err")) {
bool err = jse["err"];
if(err && jse.contains("msg"))
errmsg = jse["msg"];
}
}
}
} catch (const nlohmann::detail::parse_error& pe) {
perr("CaSupJsoniz::is_err_msg: JSON parse error: %s in \"%s\"", pe.what(), json.c_str());
errmsg = std::string(pe.what());
} catch (const nlohmann::detail::type_error& te) {
perr("CaSupJsoniz::is_err_msg: nlohmann::json type error: %s", te.what());
errmsg = std::string(te.what());
}
return errmsg.length() > 0;
}
......@@ -15,6 +15,7 @@ public:
CaSupJsoniz();
std::map<std::string, std::string> jsonize(const std::vector<CuData>&dl) const;
const std::string make_msg_ok() const;
bool is_err_msg(const std::string& json, std::string& errmsg) const;
};
#endif // CASUPJSONIZ_H
......@@ -10,7 +10,6 @@
class CaSupRedistribPrivate {
public:
CaSupCurl *sucu;
CuLogImplI *log_i;
Cumbia *cumbia;
};
......@@ -32,31 +31,19 @@ void CaSupRedistrib::onProgress(int step, int total, const CuData &data) {
}
void CaSupRedistrib::onResult(const CuData &data) {
if(data.has("type", "init")) {
bool err = data["err"].toBool();
d->log_i->write("ca-supervisor", data["msg"].toString(), err ? CuLog::LevelError : CuLog::LevelInfo);
if(err) {
perr("ca-supervisor: %s", vtoc2(data, "msg"));
exit(EXIT_FAILURE);
}
}
// if(data.has("type", "init")) {
// bool err = data["err"].toBool();
// d->log_i->write("ca-supervisor", data["msg"].toString(), err ? CuLog::LevelError : CuLog::LevelInfo);
// if(err) {
// perr("ca-supervisor: %s", vtoc2(data, "msg"));
// exit(EXIT_FAILURE);
// }
// }
}
void CaSupRedistrib::onResult(const std::vector<CuData> &datalist)
{
CaSupJsoniz jiz;
const std::map<std::string, std::string> &jsonma = jiz.jsonize(datalist);
if(jsonma.size() > 0)
d->log_i->write("ca-supervisor", "requests grouped by *channel*", CuLog::LevelInfo);
for(const auto &[k, v] : jsonma) {
d->log_i->write("ca-supervisor", k + " --> " + v, CuLog::LevelInfo);
}
bool ok = d->sucu->xmit(jsonma);
CuActivity *a = d->cumbia->findActivity(CuData("type", "supdba"));
if(a) {
printf("CaSupRedistrib.onResult: posting event to %p %s\n", a, datos(a->getToken()));
d->cumbia->postEvent(a, new CaSupRecoverXmitEvent(-1, ok, datalist));
}
}
......
......@@ -112,16 +112,14 @@ int main(int argc, char *argv[]) {
CuThreadFactoryImpl* thf_impl = new CuThreadFactoryImpl;
CuThreadsEventBridgeFactory *thread_eb_f = new CuThreadsEventBridgeFactory;
CaSupRedistrib *redistrib = new CaSupRedistrib(opts["url"].toString(), opts["curlopt_ssl_verifypeer"].toBool(), log_i, cumbia);
CaSupDbActivity *sup = new CaSupDbActivity(montok, loo_s, log_i);
cumbia->registerActivity(sup, redistrib, montok, *thf_impl, *thread_eb_f);
CaSupervisor *supervisor = new CaSupervisor(sup, opts["url"].toString(), opts["curlopt_ssl_verifypeer"].toBool(), log_i);
CaSupervisor *supervisor = new CaSupervisor(sup, log_i);
CaReceiver_A *server_a = nullptr;
printf("\e[1;32m # \e[0mmain.cpp: starting receiver on %s\n", datos(opts));
server_a = new CaReceiver_A(opts);
cumbia->registerActivity(server_a, supervisor, CuData("type", "ca-supervisor-receiver"), *thf_impl, *thread_eb_f);
cumbia->registerActivity(sup, supervisor, montok, *thf_impl, *thread_eb_f);
printf("\e[1;32m # \e[0mmain.cpp: \e[1;32mcasupervisor version \e[2;32m%s\e[0m started\n", CASUPERVISOR_VERSION);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment