#include "ca-tango-db-cache-mgr.h"
#include "casupjsoniz.h"
#include "casupcurl.h"
#include "catdbcacheu.h"
#include "catdbcaredisu.h"

#include <map>
#include <set>
#include <culog.h>
#include <cajson-src-bundle-ex.h>
#include <canetmsg.h>
#include <queue>
#include <cutimer.h>
#include <cutimerservice.h>
#include <cumbia.h>
#include <cueventloop.h>
#include <queue>

#define _BUFSIZ 512

class CaTgDbCSrcData {
public:
    CaTgDbCSrcData(const std::string& s) : src(s) {}
    std::string src;
};

class CaTgDbCacheMgrPrivate {
public:
    CaTgDbCacheMgrPrivate(CuLogImplI *l, const CuData& op) :
        log(l), cachedim(1e6L), redisu(op), ltag("ca-tango-db-cache-mgr") {
    }

    long cachedim, sub_rate;
    CuLogImplI *log;
    std::map<std::string, Tango::DeviceProxy *>devmap;
    // std::set is an associative container that contains a __sorted__ set of unique objects
    std::map<std::string, int> evidmap;
    CaTDBCacheU tgu; // tango utils
    CaTDBCaRedisU redisu; // redis utils
    const std::string ltag; // log "tag"

    CuTimerService tmr_s;
    CuTimer* tmr;

    std::queue<CaTgDbCSrcData> srcq;
};

CaTgDbCacheMgr::CaTgDbCacheMgr(Cumbia* cu,CuEventLoopService*loos,
                               const CuData &opts, CuLogImplI *log) {
    d = new CaTgDbCacheMgrPrivate(log, opts);
    if(opts.containsKey("cachedim")) opts["cachedim"].to<long>(d->cachedim);
    if(opts.containsKey("subscribe-rate")) opts["subscribe-rate"].to<long>(d->sub_rate);
    else d->sub_rate = 2; // default 2 per second
    d->tmr = d->tmr_s.registerListener(this, 1000/d->sub_rate, loos);
}

CaTgDbCacheMgr::~CaTgDbCacheMgr() {
    delete d;
}

void CaTgDbCacheMgr::onProgress(int step, int total, const CuData &data) { }

void CaTgDbCacheMgr::onResult(const CuData &data) {
    printf("CaTgDbCacheMgr.onResult: received %s\n", datos(data));

    // from CaReceiver_A activity
    //
    // curl -v http://woody.elettra.eu:9296 -d $'{"srcs":[{"src":"test/device/1/double_scalar"}]}'
    //
    if(data.containsKey("data")) {
        bool ok = true;
        std::vector<std::string> errors;
        int schedcnt = 0, in_queue = d->srcq.size();
        const std::string& s = data.s("data");
        CaNetMsg nm(s);
        printf("ca-tango-db-cache-mgr.onResult: received \e[1;36m%s\e[0m\n", s.c_str());
        CaJsonSrcBundleExtract bux;
        std::list<CuData> dl;
        std::string chan, id, global_m;
        bux.extract(nm.payload, dl, &chan, &id);
        if(bux.error_msg.length() > 0) {
            d->log->write(d->ltag, "Json error: " + bux.error_msg + " | msg payload " + s, CuLog::LevelError);
        }
        else {
            for(const CuData& da : dl) {
                if(da.containsKey("src")) {
                    const std::string& src = da.s("src");
                    if(d->evidmap.find(src) != d->evidmap.end()) {
                        d->log->write(d->ltag, src + " already monitored: event id " + std::to_string(d->evidmap[src]), CuLog::LevelInfo);
                    } else {
                        // >= 3 to contemplate tango:://host:port/a/b/c/att or host:port/a/b/c/att
                        ok = src.length() > 0 && std::count(src.begin(), src.end(), '/') >= 3
                                && src.find("->") == std::string::npos;
                        if(!ok)
                            errors.push_back("invalid source " + src);
                        else
                            d->srcq.emplace(CaTgDbCSrcData(src));
                        schedcnt++;
                    }
                }
            }
            if(d->srcq.size() > 0)
                d->tmr_s.restart(d->tmr, 1000/d->sub_rate);
            for(const CuData& da : dl)
                printf("ca-tango-db-cache-mgr.onResult: scheduled %s\n", datos(da));

        }
        for(const std::string& e : errors)
            d->log->write(d->ltag, e);
        int fd = data.I("fd");
        nm.setStatus(ok ? 200 : 400).setPayload(CaSupJsoniz().make_scheduled_subscribe("ok", schedcnt, in_queue, d->srcq.size(), errors));
        int byw = m_sock_write(fd, nm.raw());
        printf("[0x%lx] \e[1;32mCaSupervisor::onResult \e[0;35m %d bytes written in reply \e[0m\n", pthread_self(),  byw);
    }
    // connection / disconnection from ca-proxy
    if(data.containsKey("fd") && data.containsKey("fdopen")) {
        bool o =  data.B("fdopen");
        const int fd = data.I("fd");
        d->log->write(d->ltag, "peer " + std::string(o ? "" : "dis") + "connected: sofd " + std::to_string(fd), CuLog::LevelInfo);
        if(!o)
            if(close(fd) < 0) d->log->write(d->ltag, "error closing peer socket: " + std::string(strerror(errno)));
    }
}

void CaTgDbCacheMgr::onResult(const std::vector<CuData> &srcs) {

}

CuData CaTgDbCacheMgr::getToken() const {
    return CuData("type", "ca-tango-db-cache-mgr");
}

void CaTgDbCacheMgr::push_event(Tango::AttrConfEventData *ed) {
    if(ed->err) {
        d->log->write(d->ltag, "push_event error: " + d->tgu.tg_strerror(ed->errors) + " on attribute " + ed->attr_name.c_str());
    }
    else {
        std::string dnam = d->tgu.m_dev_get_name(ed->device);
        if(dnam.length() == 0)
            d->log->write("ca-tg-db-cache-mgr", "push_event: dev->name failed: " + d->tgu.error);
        CuData co;
        d->tgu.m_fill_from_attconf(ed->attr_conf, co);
        d->log->write(d->ltag, "configuration changed for " + ed->attr_name + ": " + datos(co), CuLog::LevelInfo );
        bool redisok = d->redisu.update(ed->attr_name, co);
        if(!redisok)
            d->log->write(d->ltag, "error updating conf data on redis: " + d->redisu.error());
        else {
            CuData out;
            d->redisu.get(ed->attr_name, out);
            printf("CaTgDbCacheMgr::push_event read just updated data for \e[1;36m%s\e[0m : \e[1;32m%s\e[0m\n",
                   ed->attr_name.c_str(), datos(out));
        }
    }
}

int CaTgDbCacheMgr::m_sock_write(int sofd, const std::string &buf) {
    int totclibw = 0, clibw = 0, wlen = buf.length();
    // write rbuf back to client fd
    while(totclibw < wlen && clibw >= 0) {
        clibw = send(sofd, buf.c_str() + totclibw, wlen - totclibw < _BUFSIZ ? wlen - totclibw : _BUFSIZ, MSG_NOSIGNAL);
        if(clibw < 0) {
            d->log->write(d->ltag, "write failed :" + std::string(strerror(errno)));
        }
        totclibw += clibw;
    }
    return clibw >=0 ? totclibw : clibw;
}

int CaTgDbCacheMgr::m_monitor(const std::string &src) {
    int evid = -1;
    // src length, !contains("->") and count('/') already checked by onTimeout
    // src not already monitored (checked within onResult)
    Tango::DeviceProxy *dev = nullptr;
    std::map<std::string, Tango::DeviceProxy *>::const_iterator it = d->devmap.find(src);
    size_t lastsep = src.rfind("/");
    const std::string &devnam = src.substr(0, lastsep);
    const std::string &attnam = src.substr(lastsep + 1);

    if(it != d->devmap.end())
        dev = it->second;
    else {
        printf("CaTgDbCacheMgr: got a new srcs dev \"%s\" attr \"%s\"\n", devnam.c_str(), attnam.c_str());
        dev = d->tgu.m_get_dev(devnam);
        if(dev != nullptr)
            d->devmap[devnam] = dev;
    }

    evid = d->tgu.m_att_conf_change_subscribe(dev, devnam, attnam, this);
    if(evid > -1) {
        d->evidmap[src] = evid;
        d->log->write(d->ltag,
                      src + " successfully subscribed to att_conf_event: id "
                      + std::to_string(evid) +
                      "[" + std::to_string(d->evidmap.size()) + "/" +
                      std::to_string(d->cachedim) + "]", CuLog::LevelInfo);
    } else {
        d->log->write(d->ltag, "error subscribing " + src + ": " + d->tgu.error);
    }

    return evid;
}

void CaTgDbCacheMgr::onTimeout(CuTimer *t) {
    bool ok = false;
    // dequeue one src only, skipping invalid srcs
    while(d->srcq.size() > 0 && !ok) {
        const CaTgDbCSrcData& sd = d->srcq.front();
        const std::string& src = sd.src;
        m_monitor(src);
        d->srcq.pop();
    }
    if(d->srcq.size() > 0)
        d->tmr_s.restart(t, 1000/d->sub_rate);
}