Newer
Older
#include "ca-tango-db-cache-mgr.h"
#include "casupjsoniz.h"
#include "casupcurl.h"
#include "catdbcacheu.h"
#include "catdbcaredisu.h"
#include <map>
#include <set>
#include <culog.h>
#include <cajson-src-bundle-ex.h>
#include <canetmsg.h>
#include <queue>
#include <cutimer.h>
#include <cutimerservice.h>
#include <cumbia.h>
#include <cueventloop.h>
#include <queue>
class CaTgDbCSrcData {
public:
CaTgDbCSrcData(const std::string& s) : src(s) {}
std::string src;
};
class CaTgDbCacheMgrPrivate {
public:
CaTgDbCacheMgrPrivate(CuLogImplI *l, const CuData& op) :
log(l), cachedim(1e6L), redisu(op), ltag("ca-tango-db-cache-mgr") {
}
CuLogImplI *log;
std::map<std::string, Tango::DeviceProxy *>devmap;
// std::set is an associative container that contains a __sorted__ set of unique objects
std::map<std::string, int> evidmap;
CaTDBCacheU tgu; // tango utils
CaTDBCaRedisU redisu; // redis utils
const std::string ltag; // log "tag"
CuTimerService tmr_s;
CuTimer* tmr;
std::queue<CaTgDbCSrcData> srcq;
CaTgDbCacheMgr::CaTgDbCacheMgr(Cumbia* cu,CuEventLoopService*loos,
const CuData &opts, CuLogImplI *log) {
d = new CaTgDbCacheMgrPrivate(log, opts);
if(opts.containsKey("cachedim")) opts["cachedim"].to<long>(d->cachedim);
if(opts.containsKey("subscribe-rate")) opts["subscribe-rate"].to<long>(d->sub_rate);
else d->sub_rate = 2; // default 2 per second
d->tmr = d->tmr_s.registerListener(this, 1000/d->sub_rate, loos);
}
CaTgDbCacheMgr::~CaTgDbCacheMgr() {
delete d;
}
void CaTgDbCacheMgr::onProgress(int step, int total, const CuData &data) { }
void CaTgDbCacheMgr::onResult(const CuData &data) {
printf("CaTgDbCacheMgr.onResult: received %s\n", datos(data));
// from CaReceiver_A activity
//
// curl -v http://woody.elettra.eu:9296 -d $'{"srcs":[{"src":"test/device/1/double_scalar"}]}'
//
if(data.containsKey("data")) {
bool ok = true;
std::vector<std::string> errors;
int schedcnt = 0, in_queue = d->srcq.size();
const std::string& s = data.s("data");
CaNetMsg nm(s);
printf("ca-tango-db-cache-mgr.onResult: received \e[1;36m%s\e[0m\n", s.c_str());
CaJsonSrcBundleExtract bux;
std::list<CuData> dl;
std::string chan, id, global_m;
bux.extract(nm.payload, dl, &chan, &id);
if(bux.error_msg.length() > 0) {
d->log->write(d->ltag, "Json error: " + bux.error_msg + " | msg payload " + s, CuLog::LevelError);
}
else {
for(const CuData& da : dl) {
if(da.containsKey("src")) {
const std::string& src = da.s("src");
if(d->evidmap.find(src) != d->evidmap.end()) {
d->log->write(d->ltag, src + " already monitored: event id " + std::to_string(d->evidmap[src]), CuLog::LevelInfo);
} else {
// >= 3 to contemplate tango:://host:port/a/b/c/att or host:port/a/b/c/att
ok = src.length() > 0 && std::count(src.begin(), src.end(), '/') >= 3
&& src.find("->") == std::string::npos;
if(!ok)
errors.push_back("invalid source " + src);
else
d->srcq.emplace(CaTgDbCSrcData(src));
schedcnt++;
}
}
}
if(d->srcq.size() > 0)
d->tmr_s.restart(d->tmr, 1000/d->sub_rate);
printf("ca-tango-db-cache-mgr.onResult: scheduled %s\n", datos(da));
for(const std::string& e : errors)
d->log->write(d->ltag, e);
nm.setStatus(ok ? 200 : 400).setPayload(CaSupJsoniz().make_scheduled_subscribe("ok", schedcnt, in_queue, d->srcq.size(), errors));
int byw = m_sock_write(fd, nm.raw());
printf("[0x%lx] \e[1;32mCaSupervisor::onResult \e[0;35m %d bytes written in reply \e[0m\n", pthread_self(), byw);
}
// connection / disconnection from ca-proxy
if(data.containsKey("fd") && data.containsKey("fdopen")) {
bool o = data.B("fdopen");
const int fd = data.I("fd");
d->log->write(d->ltag, "peer " + std::string(o ? "" : "dis") + "connected: sofd " + std::to_string(fd), CuLog::LevelInfo);
if(!o)
if(close(fd) < 0) d->log->write(d->ltag, "error closing peer socket: " + std::string(strerror(errno)));
}
}
void CaTgDbCacheMgr::onResult(const std::vector<CuData> &srcs) {
}
CuData CaTgDbCacheMgr::getToken() const {
return CuData("type", "ca-tango-db-cache-mgr");
}
void CaTgDbCacheMgr::push_event(Tango::AttrConfEventData *ed) {
if(ed->err) {
d->log->write(d->ltag, "push_event error: " + d->tgu.tg_strerror(ed->errors) + " on attribute " + ed->attr_name.c_str());
}
std::string dnam = d->tgu.m_dev_get_name(ed->device);
if(dnam.length() == 0)
d->log->write("ca-tg-db-cache-mgr", "push_event: dev->name failed: " + d->tgu.error);
CuData co;
d->tgu.m_fill_from_attconf(ed->attr_conf, co);
d->log->write(d->ltag, "configuration changed for " + ed->attr_name + ": " + datos(co), CuLog::LevelInfo );
bool redisok = d->redisu.update(ed->attr_name, co);
if(!redisok)
d->log->write(d->ltag, "error updating conf data on redis: " + d->redisu.error());
else {
CuData out;
d->redisu.get(ed->attr_name, out);
printf("CaTgDbCacheMgr::push_event read just updated data for \e[1;36m%s\e[0m : \e[1;32m%s\e[0m\n",
ed->attr_name.c_str(), datos(out));
}
}
}
int CaTgDbCacheMgr::m_sock_write(int sofd, const std::string &buf) {
int totclibw = 0, clibw = 0, wlen = buf.length();
// write rbuf back to client fd
while(totclibw < wlen && clibw >= 0) {
clibw = send(sofd, buf.c_str() + totclibw, wlen - totclibw < _BUFSIZ ? wlen - totclibw : _BUFSIZ, MSG_NOSIGNAL);
if(clibw < 0) {
d->log->write(d->ltag, "write failed :" + std::string(strerror(errno)));
}
totclibw += clibw;
}
return clibw >=0 ? totclibw : clibw;
}
int CaTgDbCacheMgr::m_monitor(const std::string &src) {
// src length, !contains("->") and count('/') already checked by onTimeout
// src not already monitored (checked within onResult)
Tango::DeviceProxy *dev = nullptr;
std::map<std::string, Tango::DeviceProxy *>::const_iterator it = d->devmap.find(src);
size_t lastsep = src.rfind("/");
const std::string &devnam = src.substr(0, lastsep);
const std::string &attnam = src.substr(lastsep + 1);
if(it != d->devmap.end())
dev = it->second;
else {
printf("CaTgDbCacheMgr: got a new srcs dev \"%s\" attr \"%s\"\n", devnam.c_str(), attnam.c_str());
dev = d->tgu.m_get_dev(devnam);
if(dev != nullptr)
d->devmap[devnam] = dev;
}
evid = d->tgu.m_att_conf_change_subscribe(dev, devnam, attnam, this);
if(evid > -1) {
d->evidmap[src] = evid;
d->log->write(d->ltag,
src + " successfully subscribed to att_conf_event: id "
+ std::to_string(evid) +
"[" + std::to_string(d->evidmap.size()) + "/" +
std::to_string(d->cachedim) + "]", CuLog::LevelInfo);
} else {
d->log->write(d->ltag, "error subscribing " + src + ": " + d->tgu.error);
void CaTgDbCacheMgr::onTimeout(CuTimer *t) {
bool ok = false;
// dequeue one src only, skipping invalid srcs
while(d->srcq.size() > 0 && !ok) {
const CaTgDbCSrcData& sd = d->srcq.front();
const std::string& src = sd.src;
m_monitor(src);
d->srcq.pop();
}
if(d->srcq.size() > 0)
d->tmr_s.restart(t, 1000/d->sub_rate);
}