Commit 35408429 authored by Giacomo Strangolino's avatar Giacomo Strangolino
Browse files
parents b33956ec 42667343
......@@ -10,6 +10,8 @@ The supervision and failover relies on the contributions of
## Definitions
- *system*: the set of running cooperating services that make up the framework: several instances of caservers, ca-proxies, ca-supervisors, nginx.
- *caserver-delay*: the critical delay after the *expected* renewal of the *service* heartbeat. After this critical delay, a
recovery operation shall be undertaken.
......@@ -31,8 +33,9 @@ The supervision and failover relies on the contributions of
2. the dedicated caserver [plugin](https://gitlab.elettra.eu/puma/server/ca3-db-plugin) is loaded.
3. one *ca-proxy* talks with only one *caserver async* (and one only *caserver sync*, but the latter is not relevant in the
recovery context)
3. one *ca-proxy* talks with only one *caserver async* (and one only *caserver sync*, but the latter is not relevant in the recovery context)
4. *ca-proxies* do not store any state in memory, so that they can be restarted at any time without losing information, and failures have limited consequences, ideally compromising only the operation contextual to the failure.
## ca-proxy tasks
......@@ -71,10 +74,12 @@ of the *ca-proxy* depends on the request:
2. The *ca-supervisor* shall record a new *activity* into the *activities* database when a corresponding event (a new successfull source monitoring subscription) is received from [ca-proxy](https://gitlab.elettra.eu/puma/server/caserver-proxy).
3. The *ca-supervisor* shall delete an *activity* record from the *activities* database when a corresponding event (a successfull source unsubscribe) is received from [ca-proxy](https://gitlab.elettra.eu/puma/server/caserver-proxy).
3. The *ca-supervisor* shall delete an *activity* record from the *activities* database when a corresponding event (a source unsubscribe, successful or not) is received from [ca-proxy](https://gitlab.elettra.eu/puma/server/caserver-proxy).
4. The *ca-supervisor* shall support a *global* unsubscribe operation identified by the client ID indicated by the *ca-proxy*.
5. The *ca-supervisor* shall support multiple instances of the service. It must be designed so that two instances never modify the state of the *system* concurrently. In othe words, the design shall prevent two instances to make the same operation at the same time. For example, if a supervisor starts a recovery operation on a set of orphan activities, it shall delete them from the database before proceeding, so that another instance does not attempt to do the same operation on the same activities right after.
#### 1. fails
The *ca-supervisor* shall wait for some time (the failing *caserver* may be in the restart process), check again the *expected*
......@@ -84,7 +89,7 @@ timestamp and redistribute the load if the *expected* has not been updated
*caserver (async)* tasks are carried out by means of an additional [plugin](https://gitlab.elettra.eu/puma/server/ca3-db-plugin).
The caserver (*async* instance only) shall periodically update the *expected* timestamp so that *ca-supervisor* stays happy.
The caserver (*async* instance only) shall periodically update the *expected* timestamp so that *ca-supervisor* is aware the specific instance is active.
### caserver stopped
......
-std=c17
\ No newline at end of file
// Add predefined macros for your project here. For example:
// #define THE_ANSWER 42
-std=c++17
\ No newline at end of file
casupervisor-taeyang.conf
meson.build
src/ca-supervisor.cpp
src/ca-supervisor.h
src/casupcurl.cpp
src/casupcurl.h
src/casupdbfuncs.cpp
src/casupdbfuncs.h
src/casupdbmon.cpp
src/casupdbmon.h
src/casupjsoniz.cpp
src/casupjsoniz.h
src/casuplog.cpp
src/casuplog.h
src/casuprecoverxmitevent.cpp
src/casuprecoverxmitevent.h
src/casupredistrib.cpp
src/casupredistrib.h
src/config.h
src/main.cpp
src
src/modules
/usr/local/cumbia-libs/include/cumbia
/usr/include/openssl
/usr/local/include
src/engines
src/json
.
src/output
subproject
src/auth-impls
/usr/local/caserver-lib/include
# database for service and activity records
ca-supervisor:dbuser=www-data
ca-supervisor:dbpass=cadbdb
ca-supervisor:dbhost=localhost
ca-supervisor:dbnam=cadb
# bind to
ca-supervisor:host=192.168.205.25
ca-supervisor:port=9295
user=www-data
pass=cadbdb
host=pwma-dev.elettra.trieste.it
# database for service and activity records
dbuser=www-data
dbpass=cadbdb
dbhost=localhost
dbnam=cadb
# bind to
host=192.168.205.25
port=9295
......@@ -14,6 +14,7 @@ curldep = dependency('libcurl')
# for activities / threads
cumbiadep = dependency('cumbia', version: '>=1.4.0')
systemd_dep = dependency('systemd')
caserverlib_dep = dependency('caserver-lib')
nlohmann_json_dep = dependency('nlohmann_json', required : false)
if not nlohmann_json_dep.found()
......@@ -37,6 +38,7 @@ deps = [ dependency('threads'),
nlohmann_json_dep,
curldep,
cumbiadep,
caserverlib_dep,
systemd_dep ]
......@@ -94,10 +96,8 @@ endif
headers = [
'src/config.h',
'src/ca-supervisor.h',
'src/casupopt.h',
'src/casupdbmon.h',
'src/casupredistrib.h',
'src/casupdbhandle.h',
'src/casupdbfuncs.h',
'src/casupjsoniz.h',
'src/casupcurl.h',
......@@ -108,10 +108,8 @@ headers = [
sources = [
'src/ca-supervisor.cpp',
'src/main.cpp',
'src/casupopt.cpp',
'src/casupdbmon.cpp',
'src/casupredistrib.cpp',
'src/casupdbhandle.cpp',
'src/casupdbfuncs.cpp',
'src/casupjsoniz.cpp',
'src/casupcurl.cpp',
......
......@@ -3,3 +3,16 @@
CaSupervisor::CaSupervisor() {
}
void CaSupervisor::onProgress(int step, int total, const CuData &data) { }
void CaSupervisor::onResult(const CuData &data) {
printf("CaSupervisor.onResult: received %s\n", datos(data));
}
void CaSupervisor::onResult(const std::vector<CuData> &datalist) { }
CuData CaSupervisor::getToken() const {
return CuData("type", "ca-supervisor");
}
......@@ -2,10 +2,19 @@
#define CASUPERVISOR_H
class CaSupervisor
#include <cuthreadlistener.h>
class CaSupervisor : public CuThreadListener
{
public:
CaSupervisor();
// CuThreadListener interface
public:
void onProgress(int step, int total, const CuData &data);
void onResult(const CuData &data);
void onResult(const std::vector<CuData> &datalist);
CuData getToken() const;
};
#endif // CASUPERVISOR_H
#include "casupdbhandle.h"
#include <cumacros.h>
#include <algorithm>
#include <string>
#include <sstream>
#include <arpa/inet.h> // host for postgres is IP addr or name
CaSupDbRes::CaSupDbRes(PGresult *res)
{
int status = PQresultStatus(res);
affected_rows = atoi(PQcmdTuples(res));
bool ok = (status == PGRES_COMMAND_OK || status == PGRES_TUPLES_OK);
if (!ok)
errmsg = std::string(PQresultErrorMessage(res)) + " status: " + PQresStatus(PQresultStatus(res));
size_t nrows = PQntuples(res);
int ncols = PQnfields(res);
results.resize(nrows);
for(size_t i = 0; i < nrows; i++) {
for(int col = 0; col < ncols; col++)
results[i].push_back(std::string(PQgetvalue(res, i, col)));
for(int col = 0; col < ncols; col++)
header.push_back(PQfname(res, col));
}
}
void CaSupDbRes::print() const {
for(size_t i = 0; i < header.size(); i++)
printf("%20s|", header[i].c_str());
printf("\n");
for(size_t i = 0; i < results.size(); i++) {
for(size_t r = 0 ; r < results[i].size(); r++)
printf("%20s|", results[i][r].c_str());
printf("\n");
}
}
bool CaSupDbRes::error() const {
return errmsg.length() > 0;
}
std::string CaSupDbRes::value(int row, const std::string &col) const {
std::vector<std::string>::const_iterator it = std::find(header.begin(), header.end(), col);
if(it == header.end())
return std::string();
if(results.size() > static_cast<size_t>(row))
return results[row][std::distance(header.begin(), it)];
return std::string();
}
int CaSupDbRes::find_row(const std::string &col, const std::string &search) const {
int cidx = -1;
std::vector<std::string>::const_iterator it = std::find(header.begin(), header.end(), col);
if(it != header.end()) {
cidx = std::distance(header.begin(), it);
for(size_t i = 0; i < results.size(); i++)
if(results[i].size() > static_cast<size_t>(cidx) && results[i][cidx] == search)
return i;
}
return -1;
}
std::vector<std::string> CaSupDbRes::get_row(int idx) const {
return results.size() > static_cast<size_t>(idx) && idx >= 0 ? results[idx] : std::vector<std::string>();
}
bool CaSupDbH::connect() {
std::stringstream ss;
unsigned char buf[sizeof(struct in6_addr)];
if(user.length() > 0) ss << "user=" << user << " ";
if(password.length() > 0 ) ss << "password=" << password << " ";
if(dbnam.length() > 0) ss << "dbname=" << dbnam << " ";
// host as address or name ?
if(host.length() > 0 && inet_pton(AF_INET, host.c_str(), buf) > 0) ss << "hostaddr=" << host;
else if(host.length() > 0) ss << "host=" << host;
if(port.length() > 0 && atoi(port.c_str()) > 0) ss << "port=" << port << " ";
conn = PQconnectdb(ss.str().c_str());
if (PQstatus(conn) == CONNECTION_BAD) {
msg = "connection to database failed: " + std::string(PQerrorMessage(conn));
} else {
connected = true;
std::stringstream ss;
ss << "casupdbhandle: connected to server version " << PQserverVersion(conn) << " user \"" << PQuser(conn) << "\" db: \"" << PQdb(conn) << "\"";
if(host.length() > 0) ss << " host \"" << host << "\"";
if(port.length() > 0) ss << " port " << port.c_str();
msg = ss.str();
}
return connected;
}
CaSupDbRes CaSupDbH::execute(const std::string &stmt, const std::vector<std::string> &params) {
int nParams = static_cast<int>(params.size());
const char **paramValues = (const char **) malloc(sizeof(char *) * nParams);
for(int i = 0; i < nParams; i++)
paramValues[i] = params[i].c_str();
PGresult *res = PQexecParams(conn, stmt.c_str(), nParams, nullptr, paramValues, nullptr, nullptr, 0);
CaSupDbRes cares(res);
free(paramValues);
PQclear(res);
return cares;
}
CaSupDbH::~CaSupDbH() {
if(conn)
PQfinish(conn);
}
size_t CaSupDbRes::size() const {
return results.size();
}
#ifndef CASUPDBH_H
#define CASUPDBH_H
#include <string>
#include <vector>
#include <map>
#include <libpq-fe.h>
class CaSupDbRes {
public:
CaSupDbRes(PGresult *res);
virtual ~CaSupDbRes() {}
// stores the result of the previous execute
std::vector <std::vector < std::string> > results;
std::vector<std::string> header;
std::string errmsg;
int affected_rows;
std::string value(int row, const std::string& col) const;
int find_row(const std::string& col, const std::string& search) const;
std::vector<std::string> get_row(int idx) const;
size_t size() const;
void print() const;
bool error() const;
};
class CaSupDbH { // ca db handle
public:
CaSupDbH(const std::string& u, const std::string& p, const std::string& db, const std::string& ho = "localhost", const std::string& _port = "") :
conn(nullptr), connected(false), user(u), password(p) , dbnam(db), host(ho), port(_port) {
}
~CaSupDbH();
bool connect();
CaSupDbRes execute(const std::string& stmt, const std::vector<std::string> &params = std::vector<std::string>());
PGconn *conn;
bool connected;
std::string user, password, dbnam, host, port, msg;
};
#endif // CADBHANDLE_H
#include "casupopt.h"
#include <unistd.h>
#include <culog.h>
#include <string>
#include <iostream>
#include <fstream>
CaSupOpt::CaSupOpt() {
}
CuData CaSupOpt::parse_cmdline(int argc, char *argv[]) {
CuData res;
int ch;
errors.clear();
std::string log_where, user;
int log_lev = CuLog::LevelError;
while ((ch = getopt(argc, argv, "h:c:f:u:c:kiwvs:t:")) != -1) {
switch (ch) {
case 'c':
res["dbfile"] = std::string(optarg);
break;
case 'k':
res["curlopt_ssl_verifypeer"] = false;
break;
case 'u':
res["url"] = std::string(optarg);
break;
case 'f':
log_where = std::string(optarg);
break;
case 'i':
log_lev = CuLog::LevelInfo;
break;
case 'w':
log_lev = CuLog::LevelWarn;
break;
case 'v':
log_lev = CuLog::LevelDebug;
break;
case 's':
res["user"] = std::string(optarg);
break;
case 't':
try {
res["mon_interval"] = std::atoi(optarg);
} catch (const std::invalid_argument& ia) {
errors.push_back("option '-t': invalid argument " + std::string(optarg) + ": " + ia.what());
}
break;
case '?':
default:
usage(argv[0]);
}
}
argc -= optind;
argv += optind;
// defaults
if(!res.containsKey("curlopt_ssl_verifypeer"))
res["curlopt_ssl_verifypeer"] = true;
res["log_where"] = log_where.size() == 0 ? "console" : log_where;
res["log_level"] = log_lev;
return res;
}
std::string CaSupOpt::usage(const char *a) const
{
return std::string(a) + "-c file where file is the name of the database configuration file\n"
"-u \e[1;32mnginx_pub_url\e[0m is a compulsory argument in the form http[s]://nginx.server.url:PORT/pub\n"
" where PORT is the number configured under the http/server \"listen\" directive\n"
" in the nginx.conf file. The \"/pub\" section must match the \"nchan_publisher\n"
" location directive under the related http/server location in nginx.conf.\n"
"-k disable curl SSL verify peer option (default: verify SSL peer)\n"
"[-w (log level warn)] [-i (log level info)] [-v (log level debug)]\n\n";
}
CuData CaSupOpt::db_params(const std::string &file) {
errors.clear();
CuData res;
std::string line;
std::ifstream f(file);
if(f.is_open()) {
while(std::getline(f, line)) {
if(line.find("user=") == 0)
res["dbu"] = line.substr(strlen("user="));
else if(line.find("pass=") == 0)
res["dbp"] = line.substr(strlen("pass="));
else if(line.find("host=") == 0)
res["dbh"] = line.substr(strlen("host="));
else if(line.find("dbnam=") == 0)
res["dbna"] = line.substr(strlen("dbnam="));
else if(line.find("port=") == 0) {
try {
res["port"] = line.substr(strlen("port="));
} catch (const std::invalid_argument& ia) {
errors.push_back("invalid port in line " + line + ": " + ia.what());
}
}
}
}
else
errors.push_back("error opening file \"" + file + "\" in read only mode: " + std::string(strerror(errno)));
return res;
}
#ifndef CASUPOPTPARSER_H
#define CASUPOPTPARSER_H
#include <cudata.h>
class CaSupOpt
{
public:
CaSupOpt();
CuData parse_cmdline(int argc, char *argv[]);
std::string usage(const char *a) const;
CuData db_params(const std::string& file);
std::vector<std::string> errors;
};
#endif // CASUPOPTPARSER_H
......@@ -11,10 +11,13 @@
#include <unistd.h>
#include "config.h"
#include "casupopt.h"
#include "casupdbmon.h"
#include "casupredistrib.h"
#include "casuplog.h"
#include "ca-supervisor.h"
#include <ca-receiver-a.h>
#include <ca-opt-parser.h>
CuEventLoopService *loo_s = nullptr;
......@@ -48,8 +51,8 @@ int drop_privileges(const char* user, char* msg) {
}
int main(int argc, char *argv[]) {
CaSupOpt options;
CuData opts = options.parse_cmdline(argc, argv);
CaOptParser options;
CuData opts = options.get_options(argc, argv, "ca-supervisor");
if(options.errors.size() > 0) {
perr("main.cpp: error in command line args: \n");
for(const std::string& e : options.errors)
......@@ -85,18 +88,11 @@ int main(int argc, char *argv[]) {
}
}
if(opts.containsKey("dbfile")){
opts.merge(options.db_params(opts["dbfile"].toString()));
}
else {
perr("main.cpp: missing command line option \"-c db_conf_file\"\n");
return EXIT_FAILURE;
}
if(!opts.containsKey("url")) {
perr("main.cpp: missing command line option \"-u nginx_url\", i.e. \"-u http://woody.elettra.eu:8001\"");
return EXIT_FAILURE;
}
if(!opts.containsKey("dbu") || !opts.containsKey("dbp") || !opts.containsKey("dbh")) {
if(!opts.containsKey("dbuser") || !opts.containsKey("dbpass") || !opts.containsKey("dbhost")) {
perr("main.cpp: one or more options \"user=\" \"host=\" \"pass=\" are missing "
"in the database configuration file \"%s\"", vtoc2(opts, "dbfile"));
return EXIT_FAILURE;
......@@ -122,6 +118,13 @@ int main(int argc, char *argv[]) {
CaSupDbMon *sup = new CaSupDbMon(montok, log_i);
cumbia->registerActivity(sup, redistrib, opts, *thf_impl, *thread_eb_f);
CaSupervisor *supervisor = new CaSupervisor();
CaReceiver_A *server_a = nullptr;
printf("\e[1;32m # \e[0mmain.cpp: starting receiver on %s\n", datos(opts));
server_a = new CaReceiver_A(opts);
cumbia->registerActivity(server_a, supervisor, CuData("type", "ca-supervisor-receiver"), *thf_impl, *thread_eb_f);
printf("\e[1;32m # \e[0mmain.cpp: \e[1;32mcasupervisor version \e[2;32m%s\e[0m started\n", CASUPERVISOR_VERSION);
loo_s->exec(false);
cumbia->unregisterActivity(sup);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment