Commit e5156829 authored by Giacomo Strangolino's avatar Giacomo Strangolino
Browse files

minor fixes

parent df26c6d2
......@@ -12,6 +12,7 @@
#include <culog.h>
#include <cadbhandle.h>
#include <cajson-src-bundle-ex.h>
#include <stdexcept>
#define _BUFSIZ 512
......@@ -49,8 +50,17 @@ void CaSupervisor::onResult(const CuData &data) {
else {
for(const CuData& da : dl)
printf("ca-supervisor.onResult: extracted %s chan %s id %s global method \"%s\"\n", datos(da), chan.c_str(), id.c_str(), global_m.c_str());
CaSupDbAEvent *e = new CaSupDbAEvent(atol(extram["srv_id"].c_str()), std::stoull(id.c_str()), chan, dl);
d->dba->new_event(e);
try {
const unsigned long long clid = id.length() > 0 ? std::stoull(id.c_str()) : 0;
const long srvid = extram["srv_id"].length() > 0 ? std::stol(extram["srv_id"].c_str()) : -1;
// ca-proxy sets the method globally for "u" and "s" so that it's easy for us
// to determine whether to insert or remove activities (see CaSupDbActivity::execute)
CaSupDbAEvent *e = new CaSupDbAEvent(srvid, clid, chan, global_m, dl);
d->dba->new_event(e);
}
catch(const std::invalid_argument& ia) {
d->log->write("ca-supervisor", std::string("invalid argument: ") + ia.what());
}
}
......
......@@ -105,10 +105,12 @@ void CaSupDbActivity::execute() {
dbf.set_last_recover_successful(d->dbh, e->success);
}
else if (e->type == CaSupDbAEvent::DbQuery) {
const std::list<CuData> &dali = e->datalist;
printf(" [0x%lx ] CaSupDbActivity.execute: db query event data - server ID %ld\n", pthread_self(), e->server_id);
for(const CuData& da : dali)
dbf.register_activity(d->dbh, e->server_id, e->cli_id, da.s("src"), da.s("channel"), "", "");
// ca-proxy divides subscribe requests from unsubscribes
printf(" [0x%lx ] CaSupDbActivity.execute: db query event data - server ID %ld method %s\n", pthread_self(), e->server_id, e->method.c_str());
if(e->method == "s")
dbf.register_activities(d->dbh, e->server_id, e->cli_id, e->channel, e->datalist, "", "");
else if(e->method == "u")
dbf.unregister_activities(d->dbh, e->server_id, e->cli_id, e->channel, e->datalist);
}
delete e;
......
......@@ -14,8 +14,12 @@ class CaSupDbAEvent {
public:
enum CaSupDbAEventType { DbQuery, RecoverSuccessful };
CaSupDbAEvent(long srv_id, const unsigned long long &clid, const std::string& chan, const std::list<CuData>& dlist)
: server_id(srv_id), cli_id(clid), channel(chan), type(DbQuery) {
CaSupDbAEvent(long srv_id,
const unsigned long long &clid,
const std::string& chan,
const std::string& met,
const std::list<CuData>& dlist)
: server_id(srv_id), cli_id(clid), channel(chan), method(met), type(DbQuery) {
datalist = std::move(dlist);
}
......@@ -23,7 +27,7 @@ public:
long server_id;
unsigned long long cli_id;
std::string channel;
std::string channel, method;
CaSupDbAEventType type;
std::list<CuData> datalist;
bool success;
......
......@@ -18,7 +18,7 @@ CaSupDbFuncs::CaSupDbFuncs() { }
std::vector<CuData> CaSupDbFuncs::get_active_services(CaDbH *dbh) {
// 1. select ACTIVE ALIVE (expected >= current timestamp)
std::string stm = "SELECT service.*,srvconf.srvnam,srvconf.addr,srvconf.port,current_timestamp "
"FROM service,srvconf WHERE state=$1 AND expected >= (SELECT current_timestamp) AND service.conf_id=srvconf.conf_id";
"FROM service,srvconf WHERE state=$1 AND expected >= (SELECT current_timestamp) AND service.conf_id=srvconf.conf_id";
CaDbRes res = dbh->execute(stm, std::vector<std::string> { "ACTIVE" });
std::vector<CuData> v = m_res_to_data(res);
for(CuData &da : v) {
......@@ -30,9 +30,9 @@ std::vector<CuData> CaSupDbFuncs::get_active_services(CaDbH *dbh) {
// excluded those services listed in the recovery table
//
stm = "SELECT service.*,srvconf.srvnam,srvconf.addr,srvconf.port,current_timestamp "
"FROM service,srvconf WHERE state=$1 AND expected < (SELECT current_timestamp) AND service.conf_id=srvconf.conf_id "
"AND service.id IN (SELECT srv_id FROM activity) "
"AND id NOT IN (SELECT srv_id FROM recovery WHERE started > current_timestamp - interval '30m' )";
"FROM service,srvconf WHERE state=$1 AND expected < (SELECT current_timestamp) AND service.conf_id=srvconf.conf_id "
"AND service.id IN (SELECT srv_id FROM activity) "
"AND id NOT IN (SELECT srv_id FROM recovery WHERE started > current_timestamp - interval '30m' )";
res = dbh->execute(stm, std::vector<std::string> { "ACTIVE" });
std::vector<CuData> vlate_recov = m_res_to_data(res);
for(CuData &da : vlate_recov) {
......@@ -46,8 +46,8 @@ std::vector<CuData> CaSupDbFuncs::get_active_services(CaDbH *dbh) {
// listed in the recovery table
//
stm = "SELECT service.*,srvconf.srvnam,srvconf.addr,srvconf.port,current_timestamp "
"FROM service,srvconf WHERE state=$1 AND expected < (SELECT current_timestamp) AND service.conf_id=srvconf.conf_id "
"AND id IN (SELECT srv_id FROM recovery WHERE started > current_timestamp - interval '30m' )";
"FROM service,srvconf WHERE state=$1 AND expected < (SELECT current_timestamp) AND service.conf_id=srvconf.conf_id "
"AND id IN (SELECT srv_id FROM recovery WHERE started > current_timestamp - interval '30m' )";
res = dbh->execute(stm, std::vector<std::string> { "ACTIVE" });
std::vector<CuData> vlate_norecov = m_res_to_data(res);
for(CuData &da : vlate_norecov) {
......@@ -58,8 +58,8 @@ std::vector<CuData> CaSupDbFuncs::get_active_services(CaDbH *dbh) {
v.insert(v.end(), vlate_norecov.begin(), vlate_norecov.end());
stm = "SELECT service.*,srvconf.srvnam,srvconf.addr,srvconf.port,current_timestamp "
"FROM service,srvconf WHERE state=$1 AND expected < (SELECT current_timestamp) AND service.conf_id=srvconf.conf_id "
"AND id NOT IN (SELECT srv_id FROM recovery WHERE started > current_timestamp - interval '30m' )";
"FROM service,srvconf WHERE state=$1 AND expected < (SELECT current_timestamp) AND service.conf_id=srvconf.conf_id "
"AND id NOT IN (SELECT srv_id FROM recovery WHERE started > current_timestamp - interval '30m' )";
res = dbh->execute(stm, std::vector<std::string> { "ACTIVE" });
std::vector<CuData> vlate_zombie = m_res_to_data(res);
for(CuData &da : vlate_zombie) {
......@@ -76,7 +76,7 @@ std::vector<CuData> CaSupDbFuncs::get_active_services(CaDbH *dbh) {
std::vector<CuData>* CaSupDbFuncs::get_srcs_for_srv_id(CaDbH *h, int id) {
std::string stm = "SELECT activity.source,activity.chan,service.conf_id from activity,service "
" WHERE service.id=activity.srv_id AND service.id=$1";
" WHERE service.id=activity.srv_id AND service.id=$1";
CaDbRes res = h->execute(stm, std::vector<std::string> { std::to_string(id) });
std::vector<CuData> *v = new std::vector<CuData>();
*v = m_res_to_data(res);
......@@ -140,75 +140,113 @@ bool CaSupDbFuncs::insert_recover_operation_start(CaDbH *h, int superv_id, int f
return !r.error();
}
bool CaSupDbFuncs::register_activity(CaDbH *dbhan, long s_id,
unsigned long long cli_id,
const std::string &src,
const std::string &chan,
const std::string& amsg,
const std::string &recovered_from_conf_id) {
printf("CaSupDbFuncs::register_activity \e[1;33m!!!\e[0m check logic and optimize transaction %ld %llu \"%s\" chan \"%s\"\n", s_id, cli_id, src.c_str(), chan.c_str());
std::ostringstream o;
std::string stm;
/// NOTE:
// see Notes on activities in the caserver database plugin design rationale
//
// is there an activity recorded with src and chan? Yes, update srv_id
stm = "SELECT srv_id FROM activity WHERE source=$1 AND chan=$2 AND cli_id=$3";
CaDbRes res = dbhan->execute(stm, std::vector<std::string> { src, chan, std::to_string(cli_id) });
if(!res.error() && res.size() > 0) {
// same source, same chan, same client id
stm = "UPDATE activity SET srv_id=$1 WHERE source=$2 AND chan=$3";
res = dbhan->execute(stm, std::vector<std::string> { std::to_string(s_id), src, chan });
}
else if(!res.error()) {
stm = "INSERT INTO activity (srv_id,cli_id,source,chan) VALUES($1,$2,$3,$4)";
res = dbhan->execute(stm, std::vector<std::string> { std::to_string(s_id), std::to_string(cli_id), src, chan });
bool CaSupDbFuncs::register_activities(CaDbH *dbhan, long s_id,
unsigned long long cli_id,
const std::string& chan,
const std::list<CuData>& dali,
const std::string& amsg,
const std::string &recovered_from_conf_id) {
last_db_error.clear();
CaDbRes res = dbhan->begin_transaction();
for(const CuData& da : dali) {
if(!res.error()) {
const std::string& src = da.s("src");
printf("CaSupDbFuncs::register_activity \e[1;33m!!!\e[0m check logic and optimize transaction %ld %llu \"%s\" chan \"%s\"\n", s_id, cli_id, src.c_str(), chan.c_str());
std::ostringstream o;
std::string stm;
/// NOTE:
// see Notes on activities in the caserver database plugin design rationale
//
// is there an activity recorded with src and chan? Yes, update srv_id
stm = "SELECT srv_id FROM activity WHERE source=$1 AND chan=$2 AND cli_id=$3";
res = dbhan->execute(stm, std::vector<std::string> { src, chan, std::to_string(cli_id) });
if(!res.error() && res.size() > 0) {
// same source, same chan, same client id
stm = "UPDATE activity SET srv_id=$1 WHERE source=$2 AND chan=$3";
res = dbhan->execute(stm, std::vector<std::string> { std::to_string(s_id), src, chan });
}
else if(!res.error()) {
stm = "INSERT INTO activity (srv_id,cli_id,source,chan) VALUES($1,$2,$3,$4)";
res = dbhan->execute(stm, std::vector<std::string> { std::to_string(s_id), std::to_string(cli_id), src, chan });
}
msg = o.str();
last_db_error = res.errmsg;
} // ! err
}
// if(!res.error()) {
// // history
// if(recovered_from_conf_id.length() > 0) {
// stm = "INSERT INTO activity_history (srv_conf_id,source,chan,started,stopped,state,message,recovered_from_srv_conf_id) VALUES "
// "( (SELECT conf_id FROM service WHERE id=$1),$2,$3,current_timestamp,NULL,$4,$5,$6)";
// res = dbhan->execute(stm, std::vector<std::string> {std::to_string(s_id), src, chan, "SUBSCRIBED", amsg, recovered_from_conf_id } );
// }
// else {
// stm = "INSERT INTO activity_history (srv_conf_id,source,chan,started,stopped,state) VALUES "
// "( (SELECT conf_id FROM service WHERE id=$1),$2,$3,current_timestamp,NULL,$4)";
// res = dbhan->execute(stm, std::vector<std::string> {std::to_string(s_id), src, chan, "SUBSCRIBED" } );
// }
// }
msg = o.str();
last_db_error = res.errmsg;
return !res.error() && msg.length() == 0;
if(!res.error())
res = dbhan->commit_transaction();
return !res.error() && last_db_error.length() == 0;
}
bool CaSupDbFuncs::unregister_activity(CaDbH *dbhan, long s_id, unsigned long long cli_id, const std::string &src, const std::string& chan) {
std::ostringstream o;
std::string stm = "SELECT srv_id FROM activity WHERE srv_id=$1 AND source=$2 AND chan=$3 AND cli_id=$4";
CaDbRes res = dbhan->execute(stm, std::vector<std::string> { std::to_string(s_id), src, chan, std::to_string(cli_id) });
// printf("CaDbUtils::unregister_activity query %s id %d src %s chan %s cli id %ull, result size %ld\n", stm.c_str(), s_id, src.c_str(), chan.c_str(), res.size(), cli_id);
bool CaSupDbFuncs::m_do_unregister_a(CaDbH *dbhan, const std::string& s_id,
const std::string& s_clid, const std::string &chan, const CuData &da) {
const std::string& src = da.s("src");
// std::ostringstream o;
std::string stm;
CaDbRes res;
if(src.length() > 0 && chan.length() > 0 && s_clid.length() > 0 && s_id.length() > 0) {
stm = "SELECT srv_id FROM activity WHERE srv_id=$1 AND source=$2 AND chan=$3 AND cli_id=$4";
res = dbhan->execute(stm, std::vector<std::string> { s_id, src, chan, s_clid });
} else if(src.length() > 0 && s_clid.length() > 0 && s_id.length() > 0) {
stm = "SELECT srv_id FROM activity WHERE srv_id=$1 AND source=$2 AND cli_id=$3";
res = dbhan->execute(stm, std::vector<std::string> { s_id, src, s_clid });
}
else if(s_clid.length() > 0 && s_id.length() > 0) {
stm = "SELECT srv_id FROM activity WHERE srv_id=$1 AND cli_id=$2";
res = dbhan->execute(stm, std::vector<std::string> { s_id, s_clid });
}
if(!res.error() && res.size() == 1) {
stm = "DELETE FROM activity WHERE srv_id=$1 AND source=$2 AND chan=$3 AND cli_id=$4";
res = dbhan->execute(stm, std::vector<std::string> { std::to_string(s_id), src, chan, std::to_string(cli_id)});
// printf("CaDbUtils::unregister_activity: deleted '%s' chan '%s' from activity table for service %d\n", src.c_str(), chan.c_str(), s_id);
if(src.length() > 0 && chan.length() > 0) {
stm = "DELETE FROM activity WHERE srv_id=$1 AND source=$2 AND chan=$3 AND cli_id=$4";
res = dbhan->execute(stm, std::vector<std::string> { s_id, src, chan, s_clid});
} else if(src.length() > 0) {
stm = "DELETE FROM activity WHERE srv_id=$1 AND source=$2 AND cli_id=$3";
res = dbhan->execute(stm, std::vector<std::string> { s_id, s_clid});
} else {
stm = "DELETE FROM activity WHERE srv_id=$1 AND cli_id=$2";
res = dbhan->execute(stm, std::vector<std::string> { s_id, s_clid});
}
}
else if(!res.error()) {
o << "unregister_activity failed: expected activity for source " << src << " on server ID " << s_id << " not recorded on database";
// msg = o.str();
if(res.error())
last_db_error = res.errmsg;
return !res.error();
}
bool CaSupDbFuncs::unregister_activities(CaDbH *dbhan,
long s_id,
unsigned long long cli_id,
const std::string& chan,
const std::list<CuData> &dali) {
last_db_error.clear();
CaDbRes res = dbhan->begin_transaction();
bool ok = !res.error();
for(const CuData& da : dali) {
if(ok) {
const std::string& src = da.s("src");
std::string s_clid;
if(cli_id > 0) { // global cli_id
s_clid = std::to_string(cli_id);
printf("CaSupDbFuncs.unregister_activities (with global cli ID): s_id %s client %s chan %s data %s\n", std::to_string(s_id).c_str(), s_clid.c_str(), chan.c_str(), datos(da));
ok = m_do_unregister_a(dbhan, std::to_string(s_id), s_clid, chan, da);
}
else if(da.containsKey("ids")) {
// multiple client ids within a single CuData (see cajson-src-bundle-ex in caserver-lib)
// protocol recommends when a client exits sends a global unsubscribe by client id
// {"ids":["27"],"method":"u","srv_id":"1"}
const std::vector<std::string> & vids = da["ids"].toStringVector();
for(const std::string& id : vids) {
if(ok) {
printf("CaSupDbFuncs.unregister_activities (with IDs): s_id %s client %s chan %s data %s\n", std::to_string(s_id).c_str(), id.c_str(), chan.c_str(), datos(da));
ok = m_do_unregister_a(dbhan, std::to_string(s_id), id, chan, da);
}
}
}
} // !err
}
// if(!res.error()) {
// // history
// stm = "UPDATE activity_history SET stopped=current_timestamp,state=$1 WHERE "
// "srv_conf_id=(SELECT conf_id FROM service WHERE id=$2) AND source=$3 AND stopped IS NULL AND chan=$4";
// res = dbhan->execute(stm, std::vector<std::string> { "UNSUBSCRIBED", std::to_string(s_id), src, chan });
// if(res.affected_rows != 1)
// o << "unregister_activity unexpected database history entries for source " << src << " on service id " << s_id
// << " expected 1 found " << res.affected_rows;
// }
msg = o.str();
last_db_error = res.errmsg;
return !res.error() && msg.length() == 0;
if(ok)
res = dbhan->commit_transaction();
return ok && !res.error() && last_db_error.length() == 0;
}
......@@ -230,19 +268,19 @@ std::vector<CuData> CaSupDbFuncs::m_res_to_data(const CaDbRes &res) {
* \return true if database operation is successful, false otherwise
*/
bool CaSupDbFuncs::insert_recovery_data(CaDbH *h, int srv_id,int superv_id) {
std::string stm = "UPDATE recovery SET started=current_timestamp WHERE srv_id=$1";
CaDbRes res = h->execute(stm, std::vector<std::string> { std::to_string(srv_id) });
if(!res.error() && res.affected_rows == 0) {
stm = "INSERT INTO recovery (id,srv_id,started,supervisor_id) VALUES(DEFAULT,$1,current_timestamp,$2)";
res = h->execute(stm, std::vector<std::string> { std::to_string(srv_id), std::to_string(superv_id) });
}
last_db_error = res.errmsg;
return !res.error();
std::string stm = "UPDATE recovery SET started=current_timestamp WHERE srv_id=$1";
CaDbRes res = h->execute(stm, std::vector<std::string> { std::to_string(srv_id) });
if(!res.error() && res.affected_rows == 0) {
stm = "INSERT INTO recovery (id,srv_id,started,supervisor_id) VALUES(DEFAULT,$1,current_timestamp,$2)";
res = h->execute(stm, std::vector<std::string> { std::to_string(srv_id), std::to_string(superv_id) });
}
last_db_error = res.errmsg;
return !res.error();
}
bool CaSupDbFuncs::remove_from_recovery(CaDbH *h, int srv_id) {
std::string stm = "DELETE FROM recovery WHERE srv_id=$1";
CaDbRes res = h->execute(stm, std::vector<std::string> { std::to_string(srv_id) });
last_db_error = res.errmsg;
return !res.error();
std::string stm = "DELETE FROM recovery WHERE srv_id=$1";
CaDbRes res = h->execute(stm, std::vector<std::string> { std::to_string(srv_id) });
last_db_error = res.errmsg;
return !res.error();
}
......@@ -2,6 +2,7 @@
#define CASUPDBFUNCS_H
#include <vector>
#include <list>
#include <cudata.h>
class CaDbH;
......@@ -24,8 +25,22 @@ public:
int register_instance(CaDbH *h, const std::string &url);
bool unregister_instance(CaDbH *h, int id);
bool insert_recover_operation_start(CaDbH *h, int superv_id, int from_srv_id, const std::vector<CuData> *srcs);
bool unregister_activity(CaDbH *dbhan, long s_id, unsigned long long cli_id, const std::string &src, const std::string &chan);
bool register_activity(CaDbH *dbhan, long s_id, unsigned long long cli_id, const std::string &src, const std::string &chan, const std::string &amsg, const std::string &recovered_from_conf_id);
bool unregister_activities(CaDbH *dbhan, long s_id, unsigned long long cli_id,
const std::string& chan, const std::list<CuData> &dali);
bool register_activities(CaDbH *dbhan,
long s_id,
unsigned long long cli_id,
const std::string& chan,
const std::list<CuData> &dali,
const std::string &amsg,
const std::string &recovered_from_conf_id);
private:
bool m_do_unregister_a(CaDbH *dbhan,
const std::string &s_id,
const std::string &cli_id,
const std::string& chan,
const CuData &da);
};
#endif // CASUPDBFUNCS_H
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment