Skip to content
Snippets Groups Projects
Commit 3b208e93 authored by Giacomo Strangolino's avatar Giacomo Strangolino
Browse files

imported project

parent 9c1702ad
No related branches found
No related tags found
No related merge requests found
#include "casupjsoniz.h"
#include <nlohmann/json.hpp>
#include <cudata.h>
CaSupJsoniz::CaSupJsoniz()
{
}
/*!
* \brief CaSupJsoniz::jsonize returns a vector of json requests grouped by cli_id
* \param dl vector of data as CuData
* \return a vector of json requests to use with caserver, each element refers to a different cli_id
*/
std::map<std::string, std::string> CaSupJsoniz::jsonize(const std::vector<CuData> &dl,
std::map<std::string, std::list<CuData>> & clidmap) const {
std::map<std::string, std::string> jsonma;
// group by cli_id
for(const CuData& d : dl) {
clidmap[d["cli_id"].toString()].push_back(d);
}
for(std::map<std::string, std::list<CuData>>::const_iterator it = clidmap.begin(); it != clidmap.end(); ++it) {
nlohmann::json src_array;
for(const CuData& d : it->second) {
nlohmann::json data_o;
nlohmann::json o, options;
// OPTIONS
data_o["subscribe-only"] = "true";
data_o["value-only"] = "true";
// option names list
options.push_back("subscribe-only");
options.push_back("value-only");
options.push_back("recovered-from-srv-conf");
options.push_back("recovered-by");
// more option names may follow:
//
// options.push_back("blabla");
// data_o["blabla"] = "blabbbla";
// "options" key contains the list of option keys
data_o["options"] = options;
// END OPTIONS
//
// from source (db table name) to src
data_o["src"] = d["source"].toString();
data_o["method"] = "S";
data_o["recovered-from-srv-conf"] = d["conf_id"].toString();
data_o["recovered-by"] = "casupervisor";
data_o["channel"] = d.s("chan").length() > 0 ? d.s("chan") : d.s("channel");
src_array.push_back(data_o);
}
nlohmann::json req;
req["srcs"] = src_array;
req["id"] = it->first;
jsonma[it->first] = req.dump() + "\r\n\r\n";
}
return jsonma;
}
void CaSupJsoniz::extract(const std::string &json, std::list<CuData> &dl) const {
}
const std::string CaSupJsoniz::make_msg_ok() const {
nlohmann::json o;
o["msg"] = "ok";
return o.dump();
}
const std::string CaSupJsoniz::make_err_msg(const std::string &msg) const
{
nlohmann::json o;
o["msg"] = msg.length() == 0 ? "ok" : msg;
o["err"] = msg.length() > 0;
return o.dump();
}
bool CaSupJsoniz::is_err_msg(const std::string &json, std::string &errmsg) const
{
errmsg.clear();
if(json.length() > 0)
{
try {
nlohmann::json js = nlohmann::json::parse(json);
if(js.is_array()) {
for (nlohmann::json jse : js) {
if(jse.contains("err")) {
bool err = jse["err"];
if(err && jse.contains("msg"))
errmsg = jse["msg"];
}
}
}
} catch (const nlohmann::detail::parse_error& pe) {
perr("CaSupJsoniz::is_err_msg: JSON parse error: %s in \"%s\"", pe.what(), json.c_str());
errmsg = std::string(pe.what());
} catch (const nlohmann::detail::type_error& te) {
perr("CaSupJsoniz::is_err_msg: nlohmann::json type error: %s", te.what());
errmsg = std::string(te.what());
}
}
return errmsg.length() > 0;
}
#ifndef CASUPJSONIZ_H
#define CASUPJSONIZ_H
#include <string>
#include <map>
#include <vector>
#include <list>
class CuData;
// Jsonizer
class CaSupJsoniz
{
public:
CaSupJsoniz();
std::map<std::string, std::string> jsonize(const std::vector<CuData>&dl,
std::map<std::string, std::list<CuData> > &clidmap) const;
void extract(const std::string& json, std::list<CuData>& dl) const;
const std::string make_msg_ok() const;
const std::string make_err_msg(const std::string& msg) const;
bool is_err_msg(const std::string& json, std::string& errmsg) const;
};
#endif // CASUPJSONIZ_H
#include "casupredistrib.h"
#include "casupjsoniz.h"
#include "casupcurl.h"
#include "casuprecoverxmitevent.h"
#include <cumacros.h>
#include <map>
#include <casuplog.h>
#include <cumbia.h>
#include <cuactivity.h>
class CaSupRedistribPrivate {
public:
CuLogImplI *log_i;
Cumbia *cumbia;
};
CaSupRedistrib::CaSupRedistrib(const std::string& url, bool ssl_verify_peer, CuLogImplI *li, Cumbia *cu) {
d = new CaSupRedistribPrivate;
d->sucu = new CaSupCurl(url, ssl_verify_peer);
d->log_i = li;
d->cumbia = cu;
}
CaSupRedistrib::~CaSupRedistrib() {
delete d->sucu;
delete d;
}
void CaSupRedistrib::onProgress(int step, int total, const CuData &data) {
}
void CaSupRedistrib::onResult(const CuData &data) {
// if(data.has("type", "init")) {
// bool err = data["err"].toBool();
// d->log_i->write("ca-supervisor", data["msg"].toString(), err ? CuLog::LevelError : CuLog::LevelInfo);
// if(err) {
// perr("ca-supervisor: %s", vtoc2(data, "msg"));
// exit(EXIT_FAILURE);
// }
// }
}
void CaSupRedistrib::onResult(const std::vector<CuData> &datalist)
{
}
CuData CaSupRedistrib::getToken() const {
return CuData("type", "casupredistrib");
}
#ifndef CASUPREDISTRIB_H
#define CASUPREDISTRIB_H
#include <cuthreadlistener.h>
class CaSupRedistribPrivate;
class CuLogImplI;
class Cumbia;
class CaSupRedistrib : public CuThreadListener
{
public:
CaSupRedistrib(const std::string& url, bool ssl_verify_peer, CuLogImplI *li, Cumbia *cu);
virtual ~CaSupRedistrib();
// CuThreadListener interface
public:
void onProgress(int step, int total, const CuData &data);
void onResult(const CuData &data);
void onResult(const std::vector<CuData> &datalist);
CuData getToken() const;
private:
CaSupRedistribPrivate *d;
};
#endif // CASUPREDISTRIB_H
#include "catdbcacheu.h"
#include <cudata.h>
CaTDBCacheU::CaTDBCacheU()
{
}
Tango::DeviceProxy *CaTDBCacheU::m_get_dev(const std::string &nam) {
error.clear();
Tango::DeviceProxy *dev = nullptr;
try {
dev = new Tango::DeviceProxy(nam.c_str());
}
catch(const Tango::DevFailed& e) {
dev = nullptr;
error = std::string("device ") + nam + " connection error: " + tg_strerror(e);
}
return dev;
}
int CaTDBCacheU::m_att_conf_change_subscribe(Tango::DeviceProxy *dev, const std::string&devna, const string &attna, Tango::CallBack* cb) {
int evid = -1;
error.clear();
try {
evid = dev->subscribe_event(attna, Tango::ATTR_CONF_EVENT, cb, true);
}
catch(const Tango::DevFailed& e) {
error = "device " + devna + " subscribe error: " + tg_strerror(e);
}
return evid;
}
std::string CaTDBCacheU::m_dev_get_name(Tango::DeviceProxy *dev) {
std::string n;
error.clear();
try {
n = dev->name();
}
catch(const Tango::DevFailed& e) {
error = "dev->name failed: " + tg_strerror(e);
}
return n;
}
std::string CaTDBCacheU::m_mkattsrc(const string &dev, const string &att) const {
return dev + "/" + att;
}
void CaTDBCacheU::m_fill_from_attconf(const Tango::AttributeInfoEx *ai, CuData &dat)
{
dat["type"] = "property";
dat["df"] = ai->data_format;
dat["dfs"] = format_to_str(ai->data_format); /* as string */
dat["dt"] = ai->data_type;
dat["description"] = ai->description;
ai->display_unit != std::string("No display unit") ? dat["display_unit"] = ai->display_unit : dat["display_unit"] = "";
dat["format"] = ai->format;
dat["label"] = ai->label;
dat["max_alarm"] = ai->max_alarm;
dat["max_dim_x"] = ai->max_dim_x;
dat["max_dim_y"] = ai->max_dim_y;
dat["max"] = ai->max_value;
dat["min"] = ai->min_value;
dat["min_alarm"] = ai->min_alarm;
dat["name"] = ai->name;
dat["standard_unit"] = ai->standard_unit;
dat["unit"] = ai->unit;
dat["writable"] = ai->writable;
dat["writable_attr_name"] = ai->writable_attr_name;
dat["disp_level"] = ai->disp_level;
dat["root_attr_name"] = ai->root_attr_name; // Root attribute name (in case of forwarded attribute)
Tango::AttributeAlarmInfo aai = ai->alarms;
dat["delta_t"] = aai.delta_t;
dat["delta_val"] = aai.delta_val;
dat["max_alarm"] = aai.max_alarm;
dat["min_alarm"] = aai.min_alarm;
dat["max_warning"] = aai.max_warning;
dat["min_warning"] = aai.min_warning;
Tango::AttributeEventInfo ei = ai->events;
dat["archive_abs_change"] = ei.arch_event.archive_abs_change;
dat["archive_period"] = ei.arch_event.archive_period;
dat["archive_rel_change"] = ei.arch_event.archive_rel_change;
dat["abs_change"] = ei.ch_event.abs_change;
dat["rel_change"] = ei.ch_event.rel_change;
dat["periodic_period"] = ei.per_event.period;
// dim_x property contains the actual number of x elements
long int dimx = dat["value"].getSize(); // if !contains value, empty variant, 0 dimx
if(dimx > 0)
dat["dim_x"] = dimx;
}
string CaTDBCacheU::format_to_str(Tango::AttrDataFormat f) const {
switch(f) {
case Tango::SCALAR:
return "scalar";
case Tango::SPECTRUM:
return "vector";
case Tango::IMAGE:
return "matrix";
default:
return "data format unknown";
}
}
std::string CaTDBCacheU::tg_strerror(const Tango::DevFailed &e) const
{
std::string msg;
if(e.errors.length() > 0)
msg = tg_strerror(e.errors);
return msg;
}
#ifndef CATDBCACHEU_H
#define CATDBCACHEU_H
#include <tango.h>
#include <string>
class CuData;
class CaTDBCacheU
{
public:
CaTDBCacheU();
Tango::DeviceProxy *m_get_dev(const std::string &nam);
int m_att_conf_change_subscribe(Tango::DeviceProxy *dev, const string &devna, const std::string& attna, Tango::CallBack *cb);
// -----------------------------------------------------------------------------+
// copied from CuTangoWorld (to avoid including cumbia-tango as dependency)
void m_fill_from_attconf(const Tango::AttributeInfoEx *ai, CuData &dat);
std::string tg_strerror(const Tango::DevFailed &e) const;
string format_to_str(Tango::AttrDataFormat f) const;
// -----------------------------------------------------------------------------+
std::string error;
std::string m_dev_get_name(Tango::DeviceProxy *dev);
std::string m_mkattsrc(const std::string& dev, const std::string& att) const;
std::string m_mkcmdsrc(const std::string& dev, const std::string& cmdnam) const;
};
#endif // CATDBCACHEU_H
#include "catdbcaredisu.h"
#include <cudata.h>
#include <optional>
#include <chrono>
class CaTDBCaRedisU_P {
public:
CaTDBCaRedisU_P() : redis(nullptr) {}
sw::redis::Redis *redis;
std::string error;
// attribute configuration keys used by cumbia apps
// see CuTangoWorld::fillFromAttributeConfig and
// CaTDBCacheU::m_fill_from_attconf
//
// NOTE
// keeping a fixed list of keys lets us use hmget
// avoiding hgetall() and hkeys():
// It's always a bad idea to call `hkeys` on a large hash, since it will block Redis.
// It's always a bad idea to call `hgetall` on a large hash, since it will block Redis.
// https://github.com/sewenew/redis-plus-plus/blob/master/src/sw/redis%2B%2B/redis.h
std::vector<std::string> attcnf_keys;
};
CaTDBCaRedisU::CaTDBCaRedisU(const CuData &o) {
d = new CaTDBCaRedisU_P;
// Connection options: see also https://github.com/sewenew/redis-plus-plus#connection-options
sw::redis::ConnectionOptions co;
if(o.containsKey("redis_ho")) co.host = o.s("redis_ho");
if(o.s("redis_po").length() > 0 && atoi(o.s("redis_po").c_str()) > 0) {
co.port = atoi(o.s("redis_po").c_str());
}
// You don't need to check whether Redis object connects to server successfully.
// If Redis fails to create a connection to Redis server, or the connection is
// broken at some time, it throws an exception of type Error when you try to send
// command with Redis
// (https://github.com/sewenew/redis-plus-plus#api-reference)
d->redis = new sw::redis::Redis(co);
}
CaTDBCaRedisU::~CaTDBCaRedisU() {
delete d->redis;
delete d;
}
bool CaTDBCaRedisU::update(const std::string &src, const CuData &c) {
d->error.clear();
if(d->attcnf_keys.size() == 0)
d->attcnf_keys = c.keys();
std::unordered_map<std::string, std::string> m;
for(const std::string& k : d->attcnf_keys)
m[k] = c.s(k);
try {
d->redis->hmset(src, m.begin(), m.end());
}
catch(const sw::redis::Error& e) {
d->error = std::string(e.what());
}
return d->error.length() == 0;
}
bool CaTDBCaRedisU::get(const std::string& src, CuData& out) {
d->error.clear();
std::vector<std::optional<std::string> > vals;
try {
d->attcnf_keys.push_back("crappuzzang");
std::chrono::steady_clock::time_point begin = std::chrono::steady_clock::now();
d->redis->hmget(src, d->attcnf_keys.begin(), d->attcnf_keys.end(), std::back_inserter(vals));
std::chrono::steady_clock::time_point end = std::chrono::steady_clock::now();
printf("CaTDBCaRedisU::get: hmget took %ldus\n", std::chrono::duration_cast<std::chrono::microseconds>(end - begin).count());
int i = 0;
for(const std::optional<std::string>& val : vals) {
const std::string& k = d->attcnf_keys[i++];
if(val) {
printf("CaTDBCaRedisU.get: %s --> %s\n", k.c_str(), val->c_str());
out[k] = val.value();
}
else {
out[k] = "";
printf("CaTDBCaRedisU.get: %s --> null value\n", k.c_str());
}
}
}
catch(const sw::redis::Error& e) {
d->error = std::string(e.what());
}
return d->error.length() == 0;
}
std::string CaTDBCaRedisU::error() const {
return d->error;
}
#ifndef CATDBCACHEREDISU_H
#define CATDBCACHEREDISU_H
#include <sw/redis++/redis++.h>
class CaTDBCaRedisU_P;
class CuData;
class CaTDBCaRedisU
{
public:
CaTDBCaRedisU(const CuData &o);
~CaTDBCaRedisU();
bool update(const std::string& src, const CuData& c);
std::string error() const;
bool get(const std::string &src, CuData &out);
private:
CaTDBCaRedisU_P *d;
};
#endif // CATDBCACHEREDISU_H
#ifndef CONFIG_H
#define CONFIG_H
#define CASUPERVISOR_VERSION __CASUPERVISOR_VERSION__
#endif // CONFIG_H
#include <stdio.h>
#include <stdlib.h>
#include <cumacros.h>
#include <cumbia.h>
#include <cueventloop.h>
#include <signal.h>
#include <cuserviceprovider.h>
#include <cuthreadfactoryimpl.h>
#include <cuthreadseventbridge.h>
#include <pwd.h>
#include <unistd.h>
#include <calog.h>
#include "config.h"
#include "casupredistrib.h"
#include "ca-tango-db-cache-mgr.h"
#include <ca-receiver-a.h>
#include <ca-opt-parser.h>
CuEventLoopService *loo_s = nullptr;
void on_int(int signo) {
if(signo == SIGINT || signo == SIGTERM) {
if(loo_s)
loo_s->exit();
}
}
int drop_privileges(const char* user, char* msg) {
struct passwd *p = getpwnam(user);
if(p == nullptr) {
snprintf(msg, 512, "failed to find UID for user \"%s\": %s", user, strerror(errno));
return 1;
}
if(setgid(p->pw_gid) != 0) {
snprintf(msg, 512, "unable to drop group privileges to GID %d: %s", p->pw_gid, strerror(errno));
return 1;
}
if(setuid(p->pw_uid) != 0) {
snprintf(msg, 512, "unable to drop user privileges to UID %d: %s", p->pw_uid, strerror(errno));
return 1;
}
if (setuid(0) != -1) {
snprintf(msg, 512, "setuid back to zero succeeded, quitting as this is a security risk");
return 1;
}
msg[0] = '\0';
return 0;
}
int main(int argc, char *argv[]) {
CaOptParser options;
CuData opts = options.get_options(argc, argv, "ca-tango-db-cache-mgr");
if(options.errors.size() > 0) {
perr("main.cpp: error in command line args: \n");
for(const std::string& e : options.errors)
perr("- %s\n", e.c_str());
return EXIT_FAILURE;
}
// logging - 1. level
CaLogFactory logfa;
CuLogImplI *log_i = nullptr;
CuLog::Level logle = static_cast<CuLog::Level>(opts["log_level"].toInt());
// logging - 2. where
// drop privileges after this so that the log file can be created anywhere
// CaFLog will change the file ownership to the unprivileged user after creation
if(opts.has("log_where", "syslog")) log_i = logfa.create(CaLogFactory::Syslog, logle);
else if(opts["log_where"].toString() != "console") log_i = logfa.create(CaLogFactory::LogFile, logle, opts["log_where"].toString().c_str(), opts.containsKey("user") ? vtoc2(opts, "user") : nullptr);
else log_i = logfa.create(CaLogFactory::LogConsole, logle);
// end logging setup
log_i->write("main.cpp", "enter: " + std::string(argv[0]) + " version " + std::string(CATANGODBCACHEMGR_VERSION), CuLog::LevelAll);
if(getuid() == 0 && !opts.containsKey("user")) {
log_i->write("main.cpp", "cannot run as root and \"-s username\" command line argument missing", CuLog::LevelError);
return EXIT_FAILURE;
}
else if(getuid() == 0 && opts.containsKey("user")) {
char msg[512];
if(drop_privileges(vtoc2(opts, "user"), msg) != 0) {
log_i->write("main.cpp", msg, CuLog::LevelError);
return EXIT_FAILURE;
}
}
printf("main.cpp: options %s\n", datos(opts));
if(!opts.containsKey("redis_ho") || !opts.containsKey("host") || !opts.containsKey("port")) {
perr("main.cpp: one or more options \"redis_ho=\", \"host\" or \"port\" missing in the configuration file \"%s\"",
vtoc2(opts, "dbfile"));
return EXIT_FAILURE;
}
else {
signal(SIGINT, on_int);
signal(SIGTERM, on_int);
Cumbia *cumbia = new Cumbia();
// The event loop is a single object shared across all Cumbia instances
loo_s = new CuEventLoopService();
// register the service with the shared option set to true
cumbia->getServiceProvider()->registerSharedService(CuServices::EventLoop, loo_s);
CuData camgrtok("type", "catgcachemgr");
camgrtok.merge(opts);
CuThreadFactoryImpl* thf_impl = new CuThreadFactoryImpl;
CuThreadsEventBridgeFactory *thread_eb_f = new CuThreadsEventBridgeFactory;
CaTgDbCacheMgr *cachemgr = new CaTgDbCacheMgr(opts, log_i);
CaReceiver_A *server_a = nullptr;
printf("\e[1;32m # \e[0mmain.cpp: starting receiver on %s\n", datos(opts));
server_a = new CaReceiver_A(opts);
cumbia->registerActivity(server_a, cachemgr, CuData("type", "catgcachemgr-receiver"), *thf_impl, *thread_eb_f);
printf("\e[1;32m # \e[0mmain.cpp: \e[1;32mca-tango-db-cache-mgr version \e[2;32m%s\e[0m started\n", CATANGODBCACHEMGR_VERSION);
loo_s->exec(false);
server_a->stop();
cumbia->unregisterActivity(server_a);
delete cumbia;
printf("\e[1;32m # \e[0mmain.cpp: ca-tango-db-cache-mgr exiting\n");
log_i->write("ca-tango-db-cache-mgr", "exiting", CuLog::LevelAll);
delete log_i;
}
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment