Commit c1547133 authored by Giacomo Strangolino's avatar Giacomo Strangolino
Browse files

invert-expired-app-record-management saved first adaptation

parent 135b6085
......@@ -14,6 +14,7 @@
#include <culog.h>
#include <cadbhandle.h>
#include <cajson-src-bundle-ex.h>
#include <canetmsg.h>
#define _BUFSIZ 512
......@@ -50,7 +51,7 @@ void CaSupervisor::onResult(const CuData &data) {
}
// from CaReceiver_A activity
else if(data.containsKey("data")) {
const std::string& s = data.s("data");
const std::string& s = CaNetMsg(data.s("data")).payload;
printf("ca-supervisor.onResult: received \e[1;36m%s\e[0m\n", s.c_str());
CaJsonSrcBundleExtract bux;
std::list<CuData> dl;
......@@ -70,7 +71,8 @@ void CaSupervisor::onResult(const CuData &data) {
d->dba->new_event(e);
}
int fd = data.I("fd");
int byw = m_sock_write(fd, CaSupJsoniz().make_msg_ok());
CaNetMsg m(200, CaSupJsoniz().make_msg_ok());
int byw = m_sock_write(fd, m.raw());
printf("[0x%lx] \e[1;32mCaSupervisor::onResult \e[0;35m %d bytes written in reply \e[0m\n", pthread_self(), byw);
}
// connection / disconnection from ca-proxy
......@@ -85,6 +87,9 @@ void CaSupervisor::onResult(const CuData &data) {
}
void CaSupervisor::onResult(const std::vector<CuData> &srcs) {
pretty_pri("data siz %ld\n", srcs.size());
for(const CuData &da : srcs)
printf("- %s\n", datos(da));
// from CaSupDbActivity
CaSupJsoniz jiz;
bool ok;
......
......@@ -100,11 +100,12 @@ void CaSupDbActivity::execute() {
for(CaSupDbAEvent *e : events) {
CaSupDbFuncs dbf;
if(e->type == CaSupDbAEvent::RecoverEv) {
// "UPDATE recover_operations SET success=$1,complete=current_timestamp WHERE id=(SELECT MAX(id) FROM recover_operations)"
dbf.set_last_recover_successful(d->dbh, e->success);
printf("[0x%lx ] CaSupDbActivity::execute %s : ", pthread_self(), e->success ? "\e[1;32mrecover successful" : "\e[1;31mrecover failed" );
for(const CuData& da : e->datalist)
printf("\t- %s\n", datos(da));
// in case of failure, activities (sources) that had been moved from the activity
// in case of failure, activities (sources) that had been moved from the activity
// table to the recover_activities table must be restored back into the activity table
// (moving away from activity prevents concurrent operations from
// different instances of ca-supervisor on the same sources)
......@@ -114,9 +115,10 @@ void CaSupDbActivity::execute() {
printf("\e[1;35mCaSupDbActivity.execute: seems that recover operation failed: check if %ld activities have been"
" restored into the activity table\e[0m\n", e->datalist.size());
}
else {
ok = dbf.remove_from_recovery(d->dbh, e->datalist);
}
// remove from recovery in every case (success or not)
ok = dbf.remove_from_recovery(d->dbh, e->datalist);
printf("CaSubDbActivity.execute \e[1;32mIN CASE OF SUCCESSFUL RESTORE sent restore for %ld activities\e[0m\n", e->datalist.size());
}
else if (e->type == CaSupDbAEvent::DbQueryEv) {
......@@ -127,6 +129,12 @@ void CaSupDbActivity::execute() {
ok = dbf.register_activities(d->dbh, e->server_id, e->cli_id, e->channel, e->datalist);
else if(e->method == "u")
ok = dbf.unregister_activities(d->dbh, e->cli_id, e->channel, e->datalist);
else if(e->method == "upd") {
ok = dbf.unregister_activities_not_in(d->dbh, e->datalist);
pretty_pri("up to date list siz %ld cli id %s channel %s", e->datalist.size(), e->cli_id.c_str(), e->channel.c_str());
for(const CuData& da : e->datalist)
printf("- %s\n", datos(da));
}
}
if(!ok) // database error
d->log->write("ca-supervisor", "database error: " + dbf.last_db_error);
......@@ -184,9 +192,11 @@ void CaSupDbActivity::onTimeout(CuTimer *) {
}
else {
printf("[0x%lx] \e[1;32m recovering %ld activities for service id %d\e[0m\n", pthread_self(), srcs.size(), id);
bool ok = f.insert_recovery_data(d->dbh, id, d->superv_id);
if(ok)
bool ok = f.insert_recovery_data(d->dbh, id, d->superv_id); // updates recovery table
if(ok) {
// insert into recover_operations and recover_activities
ok = f.insert_recover_operation_start(d->dbh, d->superv_id, id, srcs);
}
if(ok)
// remove the activities being recovered from the activity table to prevent
// concurrent supervisor operations on them
......
#include "casupdbfuncs.h"
#include <cadbhandle.h>
#include <caidlisth.h>
CaSupDbFuncs::CaSupDbFuncs() { }
......@@ -220,6 +221,44 @@ bool CaSupDbFuncs::m_unregister_a(CaDbH *dbhan,
return !res.error();
}
bool CaSupDbFuncs::unregister_activities_not_in(CaDbH *dbhan, const std::list<CuData> &datalist) {
bool ok = false;
CaDbRes res;
last_db_error.clear();
if(datalist.size() > 0) {
const CuData& da = *(datalist.begin());
pretty_pri("processing data %s", datos(da));
std::string stmde;
res = dbhan->begin_transaction();
ok = !res.error();
std::string ids;
if(ok && da.containsKey("ids")) {
const std::vector<unsigned long long> & vi = CaIDListH().uncompress(da.s("ids"));
if(vi.size() > 0) {
size_t i;
ids = "(";
for(i = 0; i < vi.size() - 1; i++)
ids += std::to_string(vi[i]) + ",";
ids += std::to_string(vi[i]) + ")";
stmde = "DELETE FROM activity WHERE cli_id NOT IN " + ids;
}
else {
// if vi is empty, no active client
stmde = "DELETE FROM activity";
}
res = dbhan->execute(stmde, std::vector<std::string>());
ok = !res.error();
if(res.error())
last_db_error = res.errmsg;
}
if(ok)
res = dbhan->commit_transaction();
last_db_error = res.errmsg;
}
return ok && !res.error() && last_db_error.length() == 0;
}
bool CaSupDbFuncs::unregister_activities(CaDbH *dbhan,
const std::string & cli_id,
const std::string &chan,
......@@ -328,15 +367,15 @@ bool CaSupDbFuncs::remove_from_recovery(CaDbH *h, int srv_id) {
// get all the operation IDs associated to srv_id
std::string stm = "SELECT operation_id FROM recover_activities WHERE from_srv_conf="
"(SELECT conf_id FROM service WHERE id=$1)";
// if(!res.error()) // begin transaction successful
// res = h->execute(stm, std::vector<std::string> { std::to_string(srv_id) });
// // now delete from recover_activities and recover_operations by operation_id
// for(int i = 0; !res.error() && i < res.size(); i++) {
// const std::string& opid = res.value(0, "operation_id");
// res = h->execute("DELETE FROM recover_activities WHERE operation_id=$1", std::vector<std::string> { opid });
if(!res.error()) // begin transaction successful
res = h->execute(stm, std::vector<std::string> { std::to_string(srv_id) });
// now delete from recover_activities and recover_operations by operation_id
for(int i = 0; !res.error() && i < res.size(); i++) {
const std::string& opid = res.value(0, "operation_id");
res = h->execute("DELETE FROM recover_activities WHERE operation_id=$1", std::vector<std::string> { opid });
// if(!res.error())
// res = h->execute("DELETE FROM recover_operations WHERE id=$1", std::vector<std::string> { opid });
// }
}
if(!res.error()) {
stm = "DELETE FROM recovery WHERE srv_id=$1";
res = h->execute(stm, std::vector<std::string> { std::to_string(srv_id) });
......@@ -364,12 +403,12 @@ bool CaSupDbFuncs::remove_from_recovery(CaDbH *h, const std::list<CuData> &datal
}
}
} // for datali
// for(const std::string& opid : opids) {
// if(!res.error())
// res = h->execute("DELETE FROM recover_activities WHERE operation_id=$1", std::vector<std::string> {opid});
for(const std::string& opid : opids) {
if(!res.error())
res = h->execute("DELETE FROM recover_activities WHERE operation_id=$1", std::vector<std::string> {opid});
// if(!res.error())
// res = h->execute("DELETE FROM recover_operations WHERE id=$1", std::vector<std::string> {opid});
// }
}
for(const std::string& srvco : srvconfs) {
if(!res.error())
res = h->execute("DELETE FROM recovery WHERE srv_id=(SELECT id FROM service WHERE conf_id=$1)",
......
......@@ -42,6 +42,8 @@ public:
const std::string& cid,
const std::string& src,
const std::string& chan);
bool unregister_activities_not_in(CaDbH *dbhan, const std::list<CuData> &datalist);
};
#endif // CASUPDBFUNCS_H
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment