From ea99c75fb8f4d568497c9068fd2b01a6cb641385 Mon Sep 17 00:00:00 2001 From: gscalamera <graziano.scalamera@elettra.eu> Date: Thu, 15 Sep 2016 16:05:36 +0200 Subject: [PATCH] event subscription moved to thread --- src/Alarm.cpp | 323 ++++++++++++--- src/Alarm.h | 31 +- src/Alarm.xmi | 8 +- src/AlarmClass.cpp | 13 + src/SubscribeThread.cpp | 109 +++++ src/SubscribeThread.h | 97 +++++ src/event_table.cpp | 860 +++++++++++++++++++++++++++++++++++++++- src/event_table.h | 82 +++- 8 files changed, 1429 insertions(+), 94 deletions(-) create mode 100644 src/SubscribeThread.cpp create mode 100644 src/SubscribeThread.h diff --git a/src/Alarm.cpp b/src/Alarm.cpp index 2d7c109..93180ff 100644 --- a/src/Alarm.cpp +++ b/src/Alarm.cpp @@ -180,13 +180,11 @@ void Alarm::delete_device() abortflag = true; DEBUG_STREAM << "Alarm::delete_device(): after abortflag=true..." << endl; try { - events->unsubscribe(); + events->unsubscribe_events(); } catch (string& err) { ERROR_STREAM << err << endl; } DEBUG_STREAM << "Alarm::delete_device(): events unsubscribed!" << endl; - events->free_proxy(); - DEBUG_STREAM << "Alarm::delete_device(): device proxy deleted!" << endl; /* * kill alarm thread */ @@ -290,6 +288,7 @@ void Alarm::init_device() abortflag = false; instanceCounter++; events = new event_table(this); + thread = new SubscribeThread(this); //because of static map<string, unsigned int> grp_str and of exception while subscribing //more than one time the same event in the same executable, control the number of instances if(instanceCounter > 1) @@ -313,6 +312,7 @@ void Alarm::init_device() /*----- PROTECTED REGION ID(Alarm::init_device) ENABLED START -----*/ // Initialize device + thread->period = subscribeRetryPeriod; #ifdef _USE_ELETTRA_DB_RW host_rw = ""; @@ -598,17 +598,39 @@ void Alarm::init_device() alarms.startup_complete = gettime(); //enable actions execution in 10 seconds - ecb.init(&evlist); + //TODO:ecb.init(&evlist); for(map<string, vector<string> >::iterator al_ev_it=alarm_event.begin(); \ al_ev_it!=alarm_event.end(); al_ev_it++) { alarm_container_t::iterator i = alarms.v_alarm.find(al_ev_it->first); if(i != alarms.v_alarm.end()) { -#if TANGO_VER < 611 +#if 1 try { - add_event(i->second, al_ev_it->second); - subscribe_event(i->second, ecb, al_ev_it->second); + //add_event(i->second, al_ev_it->second); + //TODO:subscribe_event(i->second, ecb, al_ev_it->second); + for (vector<string>::iterator j = al_ev_it->second.begin(); j != al_ev_it->second.end(); j++) + { + vector<event>::iterator k = \ + find(events->v_event.begin(), events->v_event.end(), *j); + if (k == events->v_event.end()) //if not already present + { + string name=*j; + vector<string> context;//TODO + events->add(name, context); + } + } + add_event(i->second, al_ev_it->second);//moved after events->add + for (vector<string>::iterator j = al_ev_it->second.begin(); j != al_ev_it->second.end(); j++) + { + vector<event>::iterator k = \ + find(events->v_event.begin(), events->v_event.end(), *j); + if (k == events->v_event.end()) //if not already present + { + string name=*j; + events->start(name); + } + } } catch (string& err) { WARN_STREAM << "Alarm::init_device(): " << err << endl; for(vector<string>::iterator j=al_ev_it->second.begin(); j!=al_ev_it->second.end(); j++) @@ -646,6 +668,19 @@ void Alarm::init_device() } } +#if 0 + //now subscribe all events + for (vector<string>::iterator j = evn.begin(); j != evn.end(); j++) + { + vector<event>::iterator k = \ + find(events->v_event.begin(), events->v_event.end(), *j); + if (k != events->v_event.end()) + { + + } + } +#endif + /* * update event table with fresh-subscribed event[s] data @@ -676,6 +711,10 @@ void Alarm::init_device() updateloop = new update_thread(this); updateloop->start(); + + thread->start(); + + events->start_all(); set_state(Tango::RUNNING); set_status("Alarm server is running"); @@ -717,6 +756,7 @@ void Alarm::get_device_property() dev_prop.push_back(Tango::DbDatum("DbName")); dev_prop.push_back(Tango::DbDatum("DbPort")); dev_prop.push_back(Tango::DbDatum("InstanceName")); + dev_prop.push_back(Tango::DbDatum("SubscribeRetryPeriod")); // is there at least one property to be read ? if (dev_prop.size()>0) @@ -830,6 +870,17 @@ void Alarm::get_device_property() // And try to extract InstanceName value from database if (dev_prop[i].is_empty()==false) dev_prop[i] >> instanceName; + // Try to initialize SubscribeRetryPeriod from class property + cl_prop = ds_class->get_class_property(dev_prop[++i].name); + if (cl_prop.is_empty()==false) cl_prop >> subscribeRetryPeriod; + else { + // Try to initialize SubscribeRetryPeriod from default device value + def_prop = ds_class->get_default_device_property(dev_prop[i].name); + if (def_prop.is_empty()==false) def_prop >> subscribeRetryPeriod; + } + // And try to extract SubscribeRetryPeriod value from database + if (dev_prop[i].is_empty()==false) dev_prop[i] >> subscribeRetryPeriod; + } /*----- PROTECTED REGION ID(Alarm::get_device_property_after) ENABLED START -----*/ @@ -1377,6 +1428,7 @@ void Alarm::load(Tango::DevString argin) (const char*)err.c_str(), \ (const char*)"Alarm::load()", Tango::ERR); } +#if 0 try { add_event(alm, evn); } catch (string& err) { @@ -1399,11 +1451,83 @@ void Alarm::load(Tango::DevString argin) (const char*)err.c_str(), \ (const char*)"Alarm::load()", Tango::ERR); } +#endif string cmd_name_full = alm.cmd_name_a + string(";") + alm.cmd_name_n; alarms.log_alarm_db(TYPE_LOG_DESC_ADD, ts, alm.name, "", "", //add new alarm on log before subscribe event alm.formula, alm.time_threshold, alm.grp2str(), alm.lev, alm.msg, cmd_name_full, alm.silent_time); //but if it fails remove it from table + + + + + alarm_container_t::iterator i = alarms.v_alarm.find(alm.name); + if(i != alarms.v_alarm.end()) + { + try { + //add_event(i->second, al_ev_it->second); + + for(vector<string>::iterator j = evn.begin(); j != evn.end(); j++) + { + vector<event>::iterator k = \ + find(events->v_event.begin(), events->v_event.end(), *j); + if (k == events->v_event.end()) //if not already present + { + string name=*j; + vector<string> context;//TODO + events->add(name, context, UPDATE_PROP, false);//throws exception if already present + } + } + add_event(i->second, evn);//moved after events->add + for(vector<string>::iterator j = evn.begin(); j != evn.end(); j++) + { + vector<event>::iterator k = \ + find(events->v_event.begin(), events->v_event.end(), *j); + if (k != events->v_event.end()) //if already present + { + string name=*j; + events->start(name);//throws exception if not found + } + } + } catch (string& err) { + WARN_STREAM << "Alarm::"<<__func__<<": string exception=" << err << endl; + //TODO: handle error +#if 0 + for(vector<string>::iterator j = evn.begin(); j != evn.end(); j++) + { + DEBUG_STREAM << "Alarm::"<<__func__<<": Removing alarm=" << i->second.name << " from event=" << *j << endl; + vector<event>::iterator k = \ + find(events->v_event.begin(), events->v_event.end(), *j); + if (k != events->v_event.end()) + { + k->pop_alarm(i->second.name); //remove alarm/formula just added to event + DEBUG_STREAM << "Alarm::"<<__func__<<": Removed!!!! alarm=" << i->second.name << " from event=" << *j << endl; + if(k->m_alarm.empty()) + { + events->v_event.erase(k); //remove event just added to event_table + DEBUG_STREAM << "Alarm::"<<__func__<<": event=" << *j << " no more used, REMOVED!!!" << endl; + } + } + } + set_internal_alarm(INTERNAL_ERROR, gettime(), err); +#endif + } + catch (Tango::DevFailed &e) { + WARN_STREAM << "Alarm::"<<__func__<<": Tango exception=" << e.errors[0].desc << endl; + } + } + + + + + + + + +#if 0//TODO + try { - subscribe_event(alm, ecb, evn); + //TODO:subscribe_event(alm, ecb, evn); + vector<string> contexts;//TODO + events->add(alm.name, contexts, UPDATE_PROP, false); } catch (string& err) { WARN_STREAM << "Alarm::load(): " << err << endl; #ifndef _RW_LOCK @@ -1427,6 +1551,8 @@ void Alarm::load(Tango::DevString argin) (const char*)"Alarm::load()", Tango::ERR); } +#endif + if(alm.cmd_name_a.length() > 0) { #ifndef _RW_LOCK @@ -1921,7 +2047,7 @@ void Alarm::modify(Tango::DevString argin) { DEBUG_STREAM << "Alarm::Modify() - " << device_name << endl; /*----- PROTECTED REGION ID(Alarm::modify) ENABLED START -----*/ - + DEBUG_STREAM << "Alarm::Modify: " << argin << endl; // Add your own code //------------------------------ //1: parse to get alarm name @@ -2012,7 +2138,7 @@ void Alarm::modify(Tango::DevString argin) (const char*)__func__, Tango::ERR); } - + DEBUG_STREAM << "Alarm::Modify: parsing ended: alm name=" << alm.name << endl; //------------------------------ //2: if alarm already exist and // formula is not changed @@ -2172,6 +2298,7 @@ void Alarm::modify(Tango::DevString argin) //3: remove (set active=0 on db) //------------------------------ remove((Tango::DevString)alm.name.c_str()); + DEBUG_STREAM << "Alarm::Modify: removed alm name=" << alm.name << endl; //------------------------------ //4: load modified alarm //------------------------------ @@ -2329,7 +2456,7 @@ void Alarm::load_alarm(string alarm_string, alarm_t &alm, vector<string> &evn) Tango::Except::throw_exception( \ (const char*)"Parsing Failed!", \ (const char*)o.str().c_str(), \ - (const char*)"Alarm::load()", Tango::ERR); + (const char*)"Alarm::load_alarm()", Tango::ERR); } alm.ts = gettime(); DEBUG_STREAM << "Alarm::load_alarm(): name = '" << alm.name << "'" << endl; @@ -2375,6 +2502,7 @@ void Alarm::load_alarm(string alarm_string, alarm_t &alm, vector<string> &evn) (const char*)"Alarm::load_alarm()", Tango::ERR); } } +#if 0 void Alarm::init_alarms(map< string,vector<string> > &alarm_events) { #ifndef _RW_LOCK @@ -2415,6 +2543,7 @@ void Alarm::init_alarms(map< string,vector<string> > &alarm_events) alarms.vlock->readerOut(); #endif } +#endif void Alarm::init_events(vector<string> &evn) { if (evn.empty() == false) { @@ -2426,7 +2555,7 @@ void Alarm::init_events(vector<string> &evn) } vector<string>::iterator j = evn.begin(); while (j != evn.end()) { - events->push_back(event(*j)); + //TODOevents->push_back(event(*j)); j++; } } /* if */ @@ -2481,8 +2610,8 @@ void Alarm::add_event(alarm_t& a, vector<string> &evn) throw(string&) /* * new event; add to event table */ - event e(*j); - events->push_back(e); + //event e(*j); + //events->push_back(e); /* * update per-alarm event list */ @@ -2509,29 +2638,13 @@ void Alarm::add_event(alarm_t& a, vector<string> &evn) throw(string&) alarms.vlock->readerOut(); #endif /* - * now, for the just-added event, subscribe + * now, for the just-added event */ k = find(events->v_event.begin(), events->v_event.end(), *j); if (k != events->v_event.end()) { k->push_alarm(a.name); - try { - k->dp = new Tango::DeviceProxy(k->device); - } catch(Tango::DevFailed& e) - { - TangoSys_MemStream out_stream; - out_stream << "Failed to connect device proxy=" << k->device << ends; - k->pop_alarm(a.name); //remove alarm/formula just added to event - //events->v_event.pop_back(); - events->v_event.erase(k); //remove event just added to event_table - throw out_stream.str(); - /*Tango::Except::re_throw_exception(e, - (const char *) "Error writing serial", - out_stream.str(), - (const char *) "Alarm::add_event()", Tango::ERR);*/ - } - DEBUG_STREAM << "Alarm::add_event(): connected to DeviceProxy: " \ - << k->device << endl; +#if 0 //now initialize value of this attribute try { Tango::DeviceAttribute attr_value; @@ -2567,10 +2680,12 @@ void Alarm::add_event(alarm_t& a, vector<string> &evn) throw(string&) //delete attr_value; throw out_stream.str(); } +#endif } } } //for (vector<string>::iterator j = evn.begin(); ... } +#if 0 void Alarm::subscribe_event(alarm_t& a, EventCallBack& ecb, vector<string> &evn) throw(string&) { //now subscribe all events @@ -2610,6 +2725,7 @@ void Alarm::subscribe_event(alarm_t& a, EventCallBack& ecb, vector<string> &evn) } // if (k != events->v_event.end())/ } // for } +#endif /* * because called asynchronously by alarm evaluating thread * will use an alarm to report errors @@ -2623,7 +2739,7 @@ void Alarm::do_alarm(bei_t& e) { ostringstream o; o << e.msg << endl; - WARN_STREAM << "Alarm::do_alarm(): " << o.str() << endl; + WARN_STREAM << "Alarm::"<<__func__<<": " << o.str() << endl; vector<event>::iterator found_ev = \ find(events->v_event.begin(), events->v_event.end(), e.ev_name); if (found_ev == events->v_event.end()) @@ -2640,14 +2756,14 @@ void Alarm::do_alarm(bei_t& e) if(pos_dot < pos_slash && pos_dot != string::npos && pos_colon != string::npos && pos_slash != string::npos) //dot is in the TANGO_HOST part { string ev_name_str_no_domain = ev_name_str.substr(0,pos_dot) + ev_name_str.substr(pos_colon); - DEBUG_STREAM << __FUNCTION__ << " event "<< e.ev_name << " not found, trying without domain: " << ev_name_str_no_domain; + //DEBUG_STREAM << "Alarm::"<<__func__<<": event "<< e.ev_name << " not found, trying without domain: " << ev_name_str_no_domain; found_ev = \ find(events->v_event.begin(), events->v_event.end(), ev_name_str_no_domain); } if (found_ev == events->v_event.end() && pos_slash != string::npos) { ev_name_str = ev_name_str.substr(pos_slash + 1);//remove FQDN - DEBUG_STREAM << __FUNCTION__ << " event "<< e.ev_name << " not found, trying without fqdn: " << ev_name_str; + //DEBUG_STREAM << "Alarm::"<<__func__<<": event "<< e.ev_name << " not found, trying without fqdn: " << ev_name_str; found_ev = \ find(events->v_event.begin(), events->v_event.end(), ev_name_str); } @@ -2661,7 +2777,7 @@ void Alarm::do_alarm(bei_t& e) ostringstream o; o << "TANGO Error but event '" \ << e.ev_name << "' not found in event table!" << ends; - WARN_STREAM << "Alarm::do_alarm(): " << o.str() << endl; + WARN_STREAM << "Alarm::"<<__func__<<": " << o.str() << endl; set_internal_alarm(e.ev_name, gettime(), o.str()); } } @@ -2705,7 +2821,7 @@ void Alarm::do_alarm(bei_t& e) errors[0].reason = CORBA::string_dup(it->second.ex_reason.c_str()); errors[0].origin = CORBA::string_dup(it->second.ex_origin.c_str()); Tango::DevFailed except(errors); - DEBUG_STREAM << "PUSHING EXCEPTION FOR " << it->second.attr_name << " " << it->second.ex_desc << "-" << it->second.ex_reason << "-" << it->second.ex_origin << endl; + DEBUG_STREAM << "Alarm::"<<__func__<<": PUSHING EXCEPTION FOR " << it->second.attr_name << " " << it->second.ex_desc << "-" << it->second.ex_reason << "-" << it->second.ex_origin << endl; push_change_event(it->second.attr_name, &except); push_archive_event(it->second.attr_name, &except); }catch(Tango::DevFailed &ex) @@ -2721,7 +2837,7 @@ void Alarm::do_alarm(bei_t& e) } return; } - DEBUG_STREAM << "Alarm::do_alarm(): arrived event=" << e.ev_name << endl; + DEBUG_STREAM << "Alarm::"<<__func__<<": arrived event=" << e.ev_name << endl; formula_res_t res; vector<event>::iterator found = \ @@ -2740,14 +2856,14 @@ void Alarm::do_alarm(bei_t& e) if(pos_dot < pos_slash && pos_dot != string::npos && pos_colon != string::npos && pos_slash != string::npos) //dot is in the TANGO_HOST part { string ev_name_str_no_domain = ev_name_str.substr(0,pos_dot) + ev_name_str.substr(pos_colon); - DEBUG_STREAM << __FUNCTION__ << " event "<< e.ev_name << " not found, trying without domain: " << ev_name_str_no_domain; + //DEBUG_STREAM << "Alarm::"<<__func__<<": event "<< e.ev_name << " not found, trying without domain: " << ev_name_str_no_domain; found = \ find(events->v_event.begin(), events->v_event.end(), ev_name_str_no_domain); } if (found == events->v_event.end() && pos_slash != string::npos) { ev_name_str = ev_name_str.substr(pos_slash + 1);//remove FQDN - DEBUG_STREAM << __FUNCTION__ << " event "<< e.ev_name << " not found, trying without fqdn: " << ev_name_str; + //DEBUG_STREAM << "Alarm::"<<__func__<<": event "<< e.ev_name << " not found, trying without fqdn: " << ev_name_str; found = \ find(events->v_event.begin(), events->v_event.end(), ev_name_str); } @@ -2761,7 +2877,7 @@ void Alarm::do_alarm(bei_t& e) ostringstream o; o << "event '" \ << e.ev_name << "' not found in event table!" << ends; - WARN_STREAM << "Alarm::do_alarm(): " << o.str() << endl; + WARN_STREAM << "Alarm::"<<__func__<<": " << o.str() << endl; set_internal_alarm(INTERNAL_ERROR, gettime(), o.str()); } } @@ -2793,7 +2909,7 @@ void Alarm::do_alarm(bei_t& e) try { string attr_values; res = eval_formula(it->second.formula_tree, attr_values); - DEBUG_STREAM << "Alarm::do_alarm(): Evaluation of " << it->second.formula << "; result=" << res.value << " quality=" << res.quality << endl; + DEBUG_STREAM << "Alarm::"<<__func__<<": Evaluation of " << it->second.formula << "; result=" << res.value << " quality=" << res.quality << endl; #ifndef _RW_LOCK alarms.unlock(); #else @@ -2827,7 +2943,7 @@ void Alarm::do_alarm(bei_t& e) } } catch(Tango::DevFailed & ex) { - WARN_STREAM << "Alarm::do_alarm(): EXCEPTION PUSHING EVENTS: " << ex.errors[0].desc << endl; + WARN_STREAM << "Alarm::"<<__func__<<": EXCEPTION PUSHING EVENTS: " << ex.errors[0].desc << endl; } } catch(std::out_of_range& ex) { @@ -2838,7 +2954,7 @@ void Alarm::do_alarm(bei_t& e) #endif ostringstream o; o << tmpname << ": in formula array index out of range!" << ends; - WARN_STREAM << "Alarm::do_alarm(): " << o.str() << endl; + WARN_STREAM << "Alarm::"<<__func__<<": " << o.str() << endl; set_internal_alarm(INTERNAL_ERROR, gettime(), o.str()); try { //DevFailed for push events @@ -2856,7 +2972,7 @@ void Alarm::do_alarm(bei_t& e) push_archive_event(it->second.attr_name, &except); } catch(Tango::DevFailed & ex) { - WARN_STREAM << "Alarm::do_alarm(): EXCEPTION PUSHING EVENTS: " << ex.errors[0].desc << endl; + WARN_STREAM << "Alarm::"<<__func__<<": EXCEPTION PUSHING EVENTS: " << ex.errors[0].desc << endl; } } catch(string & ex) { @@ -2867,7 +2983,7 @@ void Alarm::do_alarm(bei_t& e) #endif ostringstream o; o << tmpname << ": in formula err=" << ex << ends; - WARN_STREAM << "Alarm::do_alarm(): " << o.str() << endl; + WARN_STREAM << "Alarm::"<<__func__<<": " << o.str() << endl; set_internal_alarm(INTERNAL_ERROR, gettime(), o.str()); try { //DevFailed for push events @@ -2885,7 +3001,7 @@ void Alarm::do_alarm(bei_t& e) push_archive_event(it->second.attr_name, &except); } catch(Tango::DevFailed & ex) { - WARN_STREAM << "Alarm::do_alarm(): EXCEPTION PUSHING EVENTS: " << ex.errors[0].desc << endl; + WARN_STREAM << "Alarm::"<<__func__<<": EXCEPTION PUSHING EVENTS: " << ex.errors[0].desc << endl; } } } @@ -2899,7 +3015,7 @@ void Alarm::do_alarm(bei_t& e) ostringstream o; //o << j->first << ": not found formula in alarm table" << ends; o << (*j) << ": not found formula in alarm table" << ends; - WARN_STREAM << "Alarm::do_alarm(): " << o.str() << endl; + WARN_STREAM << "Alarm::"<<__func__<<": " << o.str() << endl; set_internal_alarm(INTERNAL_ERROR, gettime(), o.str()); try { //DevFailed for push events @@ -2917,7 +3033,7 @@ void Alarm::do_alarm(bei_t& e) push_archive_event(it->second.attr_name, &except); } catch(Tango::DevFailed & ex) { - WARN_STREAM << "Alarm::do_alarm(): EXCEPTION PUSHING EVENTS: " << ex.errors[0].desc << endl; + WARN_STREAM << "Alarm::"<<__func__<<": EXCEPTION PUSHING EVENTS: " << ex.errors[0].desc << endl; } } j++; @@ -2991,15 +3107,18 @@ void Alarm::timer_update() bool Alarm::remove_alarm(string& s) throw(string&) { + DEBUG_STREAM << "Alarm::"<<__func__<<": entering alm name=" << s << endl; #ifndef _RW_LOCK alarms.lock(); #else alarms.vlock->writerIn(); #endif alarm_container_t::iterator i = alarms.v_alarm.find(s); - if (i != alarms.v_alarm.end()) { + if (i != alarms.v_alarm.end()) { + DEBUG_STREAM << "Alarm::"<<__func__<<": found in table alm name=" << s << endl; for (set<string>::iterator j = i->second.s_event.begin(); \ j != i->second.s_event.end(); j++) { + DEBUG_STREAM << "Alarm::"<<__func__<<": looping event =" << *j << endl; /* * for each event into the per-alarm event list find * the event table entry and remove this alarm from @@ -3008,10 +3127,12 @@ bool Alarm::remove_alarm(string& s) throw(string&) vector<event>::iterator k = \ find(events->v_event.begin(), events->v_event.end(), *j); if (k != events->v_event.end()) { + DEBUG_STREAM << "Alarm::"<<__func__<<": found event =" << *j << " in vector events, removing from its alarm list name=" << i->second.name << endl; /* * remove alarm */ k->pop_alarm(i->second.name); + DEBUG_STREAM << "Alarm::"<<__func__<<": after pop_alarm" << endl; if (k->m_alarm.empty()) { /* * no more alarms associated to this event, unsubscribe @@ -3020,7 +3141,8 @@ bool Alarm::remove_alarm(string& s) throw(string&) DEBUG_STREAM << "Alarm::remove_alarm(): removing event '" \ << k->name << "' from event table" << endl; try { - k->dp->unsubscribe_event(k->eid); + events->stop(k->name); + events->remove(k->name, false); } catch (...) { ostringstream o; o << "unsubscribe_event() failed for " \ @@ -3034,9 +3156,7 @@ bool Alarm::remove_alarm(string& s) throw(string&) throw o.str(); //return false; } - delete k->dp; - k->dp = NULL; - events->v_event.erase(k); + //events->v_event.erase(k); } } else { /* @@ -3055,6 +3175,7 @@ bool Alarm::remove_alarm(string& s) throw(string&) //return false; } } /* for */ + events->update_property(); //delete proxy for actions if(i->second.dp_a) delete i->second.dp_a; @@ -3074,6 +3195,10 @@ bool Alarm::remove_alarm(string& s) throw(string&) #endif return true; } + else + { + WARN_STREAM << "Alarm::"<<__func__<<": NOT found in table alm name=" << s << endl; + } #ifndef _RW_LOCK alarms.unlock(); #else @@ -3874,6 +3999,96 @@ void Alarm::prepare_alarm_attr() dslock->writerOut(); } +//============================================================================= +string Alarm::remove_domain(string str) +{ + string::size_type end1 = str.find("."); + if (end1 == string::npos) + { + return str; + } + else + { + string::size_type start = str.find("tango://"); + if (start == string::npos) + { + start = 0; + } + else + { + start = 8; //tango:// len + } + string::size_type end2 = str.find(":", start); + if(end1 > end2) //'.' not in the tango host part + return str; + string th = str.substr(0, end1); + th += str.substr(end2, str.size()-end2); + return th; + } +} +//============================================================================= +//============================================================================= +bool Alarm::compare_without_domain(string str1, string str2) +{ + string str1_nd = remove_domain(str1); + string str2_nd = remove_domain(str2); + return (str1_nd==str2_nd); +} + +//============================================================================= +//============================================================================= +void Alarm::put_signal_property() +{ + vector<string> prop; +#ifndef _RW_LOCK + alarms.lock(); +#else + alarms.vlock->readerIn(); +#endif + alarm_container_t::iterator it; + for(it = alarms.v_alarm.begin(); it != alarms.v_alarm.end(); it++) + { + prop.push_back(it->first); + } +#ifndef _RW_LOCK + alarms.unlock(); +#else + alarms.vlock->readerOut(); +#endif + + + Tango::DbData data; + data.push_back(Tango::DbDatum("AlarmList")); + data[0] << prop; +#ifndef _USE_ELETTRA_DB_RW + Tango::Database *db = new Tango::Database(); +#else + //save properties using host_rw e port_rw to connect to database + Tango::Database *db; + if(host_rw != "") + db = new Tango::Database(host_rw,port_rw); + else + db = new Tango::Database(); + DEBUG_STREAM << __func__<<": connecting to db "<<host_rw<<":"<<port_rw; +#endif + try + { + DECLARE_TIME_VAR t0, t1; + GET_TIME(t0); + db->set_timeout_millis(10000); + db->put_device_property(get_name(), data); + GET_TIME(t1); + DEBUG_STREAM << __func__ << ": saving properties size="<<prop.size()<<" -> " << ELAPSED(t0, t1) << " ms" << endl; + } + catch(Tango::DevFailed &e) + { + stringstream o; + o << " Error saving properties='" << e.errors[0].desc << "'"; + WARN_STREAM << __FUNCTION__<< o.str(); + } + delete db; +} + /*----- PROTECTED REGION END -----*/ // Alarm::namespace_ending } // namespace diff --git a/src/Alarm.h b/src/Alarm.h index 681eb2d..29ec800 100644 --- a/src/Alarm.h +++ b/src/Alarm.h @@ -49,10 +49,11 @@ #include "alarm_table.h" #include "event_table.h" +#include "SubscribeThread.h" #define MAX_ALARMS 1024 -#define _USE_ELETTRA_DB_RW +//#define _USE_ELETTRA_DB_RW //using namespace Tango; @@ -60,6 +61,12 @@ class alarm_thread; class log_thread; class update_thread; +# define DECLARE_TIME_VAR struct timeval +# define GET_TIME(t) gettimeofday(&t, NULL); +# define ELAPSED(before, after) \ + 1000.0*(after.tv_sec-before.tv_sec) + \ + ((double)after.tv_usec-before.tv_usec) / 1000 + /*----- PROTECTED REGION END -----*/ // Alarm.h @@ -84,7 +91,13 @@ class Alarm : public TANGO_BASE_CLASS // Add your own data members public: - + bool compare_without_domain(string str1, string str2); + string remove_domain(string str); + //TODO: real attributes + Tango::DevLong attr_AttributeStartedNumber_read; + Tango::DevLong attr_AttributePausedNumber_read; + Tango::DevLong attr_AttributeStoppedNumber_read; + Tango::DevLong attr_AttributeNumber_read; /*----- PROTECTED REGION END -----*/ // Alarm::Data Members @@ -108,6 +121,8 @@ public: string dbPort; // InstanceName: Name used to associate configured alarm rules to this instance string instanceName; + // SubscribeRetryPeriod: retry period in seconds + Tango::DevLong subscribeRetryPeriod; // Attribute data members public: @@ -283,6 +298,8 @@ public: // Additional Method prototypes friend class alarm_thread; +friend class SubscribeThread; +friend class event_table; protected : @@ -291,7 +308,7 @@ private: alarm_table alarms; event_table* events; // event_list evlist; /* producer/consumer events list */ //gcc 4 problem?? - EventCallBack ecb; /* callback handles */ +// EventCallBack ecb; /* callback handles */ alarm_thread *almloop; update_thread *updateloop; vector<alarm_t> alarmed; @@ -299,6 +316,7 @@ private: vector<alarm_t> internal; ReadersWritersLock *internallock; ReadersWritersLock *dslock; + int period; //subscribe thread period static int instanceCounter; @@ -308,10 +326,14 @@ private: char dss[MAX_ALARMS][10124]; void init_events(vector<string> &evn); +#if 0 void init_alarms(map< string,vector<string> > &alarm_events); +#endif void add_alarm(alarm_t& a) throw(string&); void add_event(alarm_t& a, vector<string> &evn) throw(string&); +#if 0 void subscribe_event(alarm_t& a, EventCallBack& ecb, vector<string> &evn) throw(string&); +#endif // void do_alarm(bei_t& e); //gcc 4 problem?? bool remove_alarm(string& s) throw(string&); //void add_to_database(alarm_t& a) throw(string&); @@ -327,7 +349,10 @@ private: void prepare_alarm_attr(); //for read attribute alarm and push_change_event + SubscribeThread *thread; + public: + void put_signal_property(); void do_alarm(bei_t& e); //public instead of protected for gcc 4 problem?? void timer_update(); //public instead of protected for gcc 4 problem?? event_list evlist; /* producer/consumer events list */ //public instead of protected for gcc 4 problem?? diff --git a/src/Alarm.xmi b/src/Alarm.xmi index 1623e05..07e0ecb 100644 --- a/src/Alarm.xmi +++ b/src/Alarm.xmi @@ -1,7 +1,7 @@ <?xml version="1.0" encoding="ASCII"?> <pogoDsl:PogoSystem xmi:version="2.0" xmlns:xmi="http://www.omg.org/XMI" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:pogoDsl="http://www.esrf.fr/tango/pogo/PogoDsl"> <classes name="Alarm" pogoRevision="9.1"> - <description description="Elettra alarm device server" title="Elettra alarm device server" sourcePath="/home/graziano/workspace/git/alarm/src" language="Cpp" filestogenerate="XMI file,Code files,Protected Regions,Eclipse Project" license="GPL" copyright="" hasMandatoryProperty="false" hasConcreteProperty="true" hasAbstractCommand="false" hasAbstractAttribute="false"> + <description description="Elettra alarm device server" title="Elettra alarm device server" sourcePath="/home/graziano/workspace/git/alarm/src" language="Cpp" filestogenerate="XMI file,Code files,Protected Regions" license="GPL" copyright="" hasMandatoryProperty="false" hasConcreteProperty="true" hasAbstractCommand="false" hasAbstractAttribute="false"> <inheritances classname="Device_4Impl" sourcePath=""/> <identification contact="at elettra.eu - graziano.scalamera" author="graziano.scalamera" emailDomain="elettra.eu" classFamily="SoftwareSystem" siteSpecific="" platform="Unix Like" bus="Not Applicable" manufacturer="" reference=""/> </description> @@ -41,6 +41,10 @@ <type xsi:type="pogoDsl:StringType"/> <status abstract="false" inherited="false" concrete="true" concreteHere="true"/> </deviceProperties> + <deviceProperties name="SubscribeRetryPeriod" description="retry period in seconds"> + <type xsi:type="pogoDsl:IntType"/> + <status abstract="false" inherited="false" concrete="true" concreteHere="true"/> + </deviceProperties> <commands name="State" description="This command gets the device state (stored in its <i>device_state</i> data member) and returns it to the caller." execMethod="dev_state" displayLevel="OPERATOR" polledPeriod="0"> <argin description="none."> <type xsi:type="pogoDsl:VoidType"/> @@ -137,6 +141,6 @@ <status abstract="false" inherited="false" concrete="true" concreteHere="true"/> <properties description="" label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </dynamicAttributes> - <preferences docHome="./doc_html" makefileHome="/usr/local/tango-8.1.2.c/share/pogo/preferences"/> + <preferences docHome="./doc_html" makefileHome="/usr/local/tango-9.2.2/share/pogo/preferences"/> </classes> </pogoDsl:PogoSystem> diff --git a/src/AlarmClass.cpp b/src/AlarmClass.cpp index e4c2fa1..eba9661 100644 --- a/src/AlarmClass.cpp +++ b/src/AlarmClass.cpp @@ -483,6 +483,19 @@ void AlarmClass::set_default_property() } else add_wiz_dev_prop(prop_name, prop_desc); + prop_name = "SubscribeRetryPeriod"; + prop_desc = "retry period in seconds"; + prop_def = ""; + vect_data.clear(); + if (prop_def.length()>0) + { + Tango::DbDatum data(prop_name); + data << vect_data ; + dev_def_prop.push_back(data); + add_wiz_dev_prop(prop_name, prop_desc, prop_def); + } + else + add_wiz_dev_prop(prop_name, prop_desc); } //-------------------------------------------------------- diff --git a/src/SubscribeThread.cpp b/src/SubscribeThread.cpp new file mode 100644 index 0000000..0ee1711 --- /dev/null +++ b/src/SubscribeThread.cpp @@ -0,0 +1,109 @@ +static const char *RcsId = "$Header: /home/cvsadm/cvsroot/fermi/servers/hdb++/hdb++es/src/SubscribeThread.cpp,v 1.6 2014-03-06 15:21:43 graziano Exp $"; +//+============================================================================= +// +// file : HdbEventHandler.cpp +// +// description : C++ source for thread management +// project : TANGO Device Server +// +// $Author: graziano $ +// +// $Revision: 1.6 $ +// +// $Log: SubscribeThread.cpp,v $ +// Revision 1.6 2014-03-06 15:21:43 graziano +// StartArchivingAtStartup, +// start_all and stop_all, +// archiving of first event received at subscribe +// +// Revision 1.5 2014-02-20 14:59:02 graziano +// name and path fixing +// removed start acquisition from add +// +// Revision 1.4 2013-09-24 08:42:21 graziano +// bug fixing +// +// Revision 1.3 2013-09-02 12:13:22 graziano +// cleaned +// +// Revision 1.2 2013-08-23 10:04:53 graziano +// development +// +// Revision 1.1 2013-07-17 13:37:43 graziano +// *** empty log message *** +// +// +// +// copyleft : European Synchrotron Radiation Facility +// BP 220, Grenoble 38043 +// FRANCE +// +//-============================================================================= + + +#include "Alarm.h" +#include "event_table.h" + + +namespace Alarm_ns +{ + +//============================================================================= +//============================================================================= +SubscribeThread::SubscribeThread(Alarm *dev):Tango::LogAdapter(dev) +{ + alarm_dev = dev; + period = 1; + shared = dev->events; +} +//============================================================================= +//============================================================================= +void SubscribeThread::updateProperty() +{ + shared->put_signal_property(); +} +//============================================================================= +//============================================================================= +void *SubscribeThread::run_undetached(void *ptr) +{ + INFO_STREAM << "SubscribeThread id="<<omni_thread::self()->id()<<endl; + while(shared->get_if_stop()==false) + { + // Try to subscribe + DEBUG_STREAM << "SubscribeThread::"<<__func__<<": AWAKE"<<endl; + updateProperty(); + alarm_dev->events->subscribe_events(); + int nb_to_subscribe = shared->nb_sig_to_subscribe(); + // And wait a bit before next time or + // wait a long time if all signals subscribed + { + omni_mutex_lock sync(*shared); + //shared->lock(); + if (nb_to_subscribe==0 && shared->action == NOTHING) + { + DEBUG_STREAM << "SubscribeThread::"<<__func__<<": going to wait nb_to_subscribe=0"<<endl; + //shared->condition.wait(); + shared->wait(); + //shared->wait(3*period*1000); + } + else if(shared->action == NOTHING) + { + DEBUG_STREAM << "SubscribeThread::"<<__func__<<": going to wait period="<<period<<" nb_to_subscribe="<<nb_to_subscribe<<endl; + //unsigned long s,n; + //omni_thread::get_time(&s,&n,period,0); + //shared->condition.timedwait(s,n); + shared->wait(period*1000); + } + //shared->unlock(); + } + } + shared->unsubscribe_events(); + INFO_STREAM <<"SubscribeThread::"<< __func__<<": exiting..."<<endl; + return NULL; +} +//============================================================================= +//============================================================================= + + + +} // namespace diff --git a/src/SubscribeThread.h b/src/SubscribeThread.h new file mode 100644 index 0000000..e34ddfd --- /dev/null +++ b/src/SubscribeThread.h @@ -0,0 +1,97 @@ +//============================================================================= +// +// file : HdbEventHandler.h +// +// description : Include for the HDbDevice class. +// +// project : Tango Device Server +// +// $Author: graziano $ +// +// $Revision: 1.5 $ +// +// $Log: SubscribeThread.h,v $ +// Revision 1.5 2014-03-06 15:21:43 graziano +// StartArchivingAtStartup, +// start_all and stop_all, +// archiving of first event received at subscribe +// +// Revision 1.4 2013-09-24 08:42:21 graziano +// bug fixing +// +// Revision 1.3 2013-09-02 12:11:32 graziano +// cleaned +// +// Revision 1.2 2013-08-23 10:04:53 graziano +// development +// +// Revision 1.1 2013-07-17 13:37:43 graziano +// *** empty log message *** +// +// +// +// copyleft : European Synchrotron Radiation Facility +// BP 220, Grenoble 38043 +// FRANCE +// +//============================================================================= + +#ifndef _SUBSCRIBE_THREAD_H +#define _SUBSCRIBE_THREAD_H + +#include <tango.h> +#include <eventconsumer.h> +#include <stdint.h> +#include "event_table.h" + +/** + * @author $Author: graziano $ + * @version $Revision: 1.5 $ + */ + + // constants definitions here. + //----------------------------------------------- + +namespace Alarm_ns +{ + +//class ArchiveCB; +class Alarm; + +class SubscribeThread; + +//========================================================= +/** + * Create a thread retry to subscribe event. + */ +//========================================================= +class SubscribeThread: public omni_thread, public Tango::LogAdapter +{ +private: + /** + * Shared data + */ + event_table *shared; + /** + * HdbDevice object + */ + Alarm *alarm_dev; + + +public: + int period; + SubscribeThread(Alarm *dev); + void updateProperty(); + /** + * Execute the thread loop. + * This thread is awaken when a command has been received + * and falled asleep when no command has been received from a long time. + */ + void *run_undetached(void *); + void start() {start_undetached();} +}; + + +} // namespace_ns + +#endif // _SUBSCRIBE_THREAD_H diff --git a/src/event_table.cpp b/src/event_table.cpp index da0c2cc..40a3b63 100644 --- a/src/event_table.cpp +++ b/src/event_table.cpp @@ -16,6 +16,10 @@ #include <sys/time.h> #include <tango.h> #include "event_table.h" +#include "Alarm.h" + +//for get_event_system_for_event_id, to know if ZMQ +#include <eventconsumer.h> static const char __FILE__rev[] = __FILE__ " $Revision: 1.5 $"; @@ -136,13 +140,13 @@ event::event(string& s, value_t& v, Tango::TimeVal& t) : \ if (*c == '/') j++; if (j < num_slashes) - device.push_back(*c); + devname.push_back(*c); else if (*c != '/') - attribute.push_back(*c); + attname.push_back(*c); c++; } type = -1; - eid = 0; + event_id = SUB_ERR; err_counter = 0; valid = false; } @@ -158,13 +162,13 @@ event::event(string& s) : name(s) if (*c == '/') j++; if (j < num_slashes) - device.push_back(*c); + devname.push_back(*c); else if (*c != '/') - attribute.push_back(*c); + attname.push_back(*c); c++; } type = -1; - eid = 0; + event_id = SUB_ERR; err_counter = 0; valid = false; } @@ -172,12 +176,17 @@ event::event(string& s) : name(s) void event::push_alarm(string& n) { m_alarm.push_back(n); + cout << "event::"<<__func__<< ": event="<<name<<" alm="<< n << " size="<< m_alarm.size() << endl; } void event::pop_alarm(string& n) { + cout << "event::"<<__func__<< ": event="<<name<<" alm="<< n << " size="<< m_alarm.size() << endl; vector<string>::iterator it = find(m_alarm.begin(), m_alarm.end(), n); - m_alarm.erase(it); + if(it != m_alarm.end()) + m_alarm.erase(it); + else + cout << "event::"<<__func__<< ": event="<<name<<" ALARM '"<< n << "' NOT FOUND!"<< endl; } @@ -194,10 +203,10 @@ bool event::operator==(const string& s) /* * event_table class methods */ -void event_table::push_back(event e) +/*void event_table::push_back(event e) { - v_event.push_back(e); -} +// v_event.push_back(e);//TODO: replaced with add +}*/ void event_table::show(void) { @@ -211,11 +220,16 @@ void event_table::show(void) } } +event_table::event_table(Tango::DeviceImpl *s):Tango::LogAdapter(s) +{ + mydev = s; +} + unsigned int event_table::size(void) { return(v_event.size()); } - +#if 0 void event_table::init_proxy(void) throw(vector<string> &) { vector<string> proxy_error; @@ -304,6 +318,744 @@ void event_table::unsubscribe(void) throw(string&) if(o.str().length() > 0) throw o.str(); } +#endif +//============================================================================= +/** + * get signal by name. + */ +//============================================================================= +event *event_table::get_signal(string signame) +{ + //omni_mutex_lock sync(*this); + for (unsigned int i=0 ; i<v_event.size() ; i++) + { + event *sig = &v_event[i]; + if (sig->name==signame) + return sig; + } + for (unsigned int i=0 ; i<v_event.size() ; i++) + { + event *sig = &v_event[i]; + if (static_cast<Alarm_ns::Alarm *>(mydev)->compare_without_domain(sig->name,signame)) + return sig; + } + return NULL; +} + + +//============================================================================= +/** + * Stop saving on DB a signal. + */ +//============================================================================= +void event_table::stop(string &signame) +{ + DEBUG_STREAM <<"event_table::"<< __func__<<": entering signame="<< signame << endl; + ReaderLock lock(veclock); + for (unsigned int i=0 ; i<v_event.size() ; i++) + { + if (v_event[i].name==signame) + { + v_event[i].siglock->writerIn(); + if(!v_event[i].stopped) + { + v_event[i].stopped=true; + static_cast<Alarm_ns::Alarm *>(mydev)->attr_AttributeStoppedNumber_read++; + if(v_event[i].running) + { + static_cast<Alarm_ns::Alarm *>(mydev)->attr_AttributeStartedNumber_read--; + try + { + remove(signame, true); + } + catch (Tango::DevFailed &e) + { + //Tango::Except::print_exception(e); + INFO_STREAM << "event_table::stop: error removing " << signame << endl; + } + } + if(v_event[i].paused) + { + static_cast<Alarm_ns::Alarm *>(mydev)->attr_AttributePausedNumber_read--; + try + { + remove(signame, true); + } + catch (Tango::DevFailed &e) + { + //Tango::Except::print_exception(e); + INFO_STREAM << "event_table::stop: error removing " << signame << endl; + } + } + v_event[i].running=false; + v_event[i].paused=false; + } + v_event[i].siglock->writerOut(); + return; + } + } + for (unsigned int i=0 ; i<v_event.size() ; i++) + { +#ifndef _MULTI_TANGO_HOST + if (static_cast<Alarm_ns::Alarm *>(mydev)->compare_without_domain(v_event[i].name,signame)) +#else + if (!static_cast<Alarm_ns::Alarm *>(mydev)->compare_tango_names(v_event[i].name,signame)) +#endif + { + v_event[i].siglock->writerIn(); + if(!v_event[i].stopped) + { + v_event[i].stopped=true; + static_cast<Alarm_ns::Alarm *>(mydev)->attr_AttributeStoppedNumber_read++; + if(v_event[i].running) + { + static_cast<Alarm_ns::Alarm *>(mydev)->attr_AttributeStartedNumber_read--; + try + { + remove(signame, true); + } + catch (Tango::DevFailed &e) + { + //Tango::Except::print_exception(e); + INFO_STREAM << "event_table::stop: error removing " << signame << endl; + } + } + if(v_event[i].paused) + { + static_cast<Alarm_ns::Alarm *>(mydev)->attr_AttributePausedNumber_read--; + try + { + remove(signame, true); + } + catch (Tango::DevFailed &e) + { + //Tango::Except::print_exception(e); + INFO_STREAM << "event_table::stop: error removing " << signame << endl; + } + } + v_event[i].running=false; + v_event[i].paused=false; + } + v_event[i].siglock->writerOut(); + return; + } + } + + // if not found + Tango::Except::throw_exception( + (const char *)"BadSignalName", + "Signal " + signame + " NOT subscribed", + (const char *)"event_table::stop()"); +} + + +//============================================================================= +/** + * Remove a signal in the list. + */ +//============================================================================= +void event_table::remove(string &signame, bool stop) +{ + DEBUG_STREAM <<"event_table::"<< __func__<<": entering signame="<< signame << endl; + // Remove in signals list (vector) + { + if(!stop) + veclock.readerIn(); + event *sig = get_signal(signame); + int event_id = sig->event_id; + Tango::AttributeProxy *attr = sig->attr; + if(!stop) + veclock.readerOut(); + if(stop) + { + try + { + if(event_id != SUB_ERR && attr) + { + DEBUG_STREAM <<"event_table::"<< __func__<<": unsubscribing... "<< signame << endl; + //unlocking, locked in event_table::stop but possible deadlock if unsubscribing remote attribute with a faulty event connection + sig->siglock->writerOut(); + attr->unsubscribe_event(event_id); + sig->siglock->writerIn(); + DEBUG_STREAM <<"event_table::"<< __func__<<": unsubscribed... "<< signame << endl; + } + } + catch (Tango::DevFailed &e) + { + // Do nothing + // Unregister failed means Register has also failed + sig->siglock->writerIn(); + INFO_STREAM <<"event_table::"<< __func__<<": Exception unsubscribing " << signame << " err=" << e.errors[0].desc << endl; + } + } + + if(!stop) + veclock.writerIn(); + vector<event>::iterator pos = v_event.begin(); + + bool found = false; + for (unsigned int i=0 ; i<v_event.size() && !found ; i++, pos++) + { + event *sig = &v_event[i]; + if (sig->name==signame) + { + found = true; + if(stop) + { + DEBUG_STREAM <<"event_table::"<<__func__<< ": removing " << signame << endl; + //sig->siglock->writerIn(); //: removed, already locked in event_table::stop + try + { + if(sig->event_id != SUB_ERR) + { + delete sig->event_cb; + } + if(sig->attr) + delete sig->attr; + } + catch (Tango::DevFailed &e) + { + // Do nothing + // Unregister failed means Register has also failed + INFO_STREAM <<"event_table::"<< __func__<<": Exception deleting " << signame << " err=" << e.errors[0].desc << endl; + } + //sig->siglock->writerOut(); + DEBUG_STREAM <<"event_table::"<< __func__<<": stopped " << signame << endl; + } + if(!stop) + { + if(sig->running) + static_cast<Alarm_ns::Alarm *>(mydev)->attr_AttributeStartedNumber_read--; + if(sig->paused) + static_cast<Alarm_ns::Alarm *>(mydev)->attr_AttributePausedNumber_read--; + if(sig->stopped) + static_cast<Alarm_ns::Alarm *>(mydev)->attr_AttributeStoppedNumber_read--; + static_cast<Alarm_ns::Alarm *>(mydev)->attr_AttributeNumber_read--; + delete sig->siglock; + v_event.erase(pos); + DEBUG_STREAM <<"event_table::"<< __func__<<": removed " << signame << endl; + } + break; + } + } + pos = v_event.begin(); + if (!found) + { + for (unsigned int i=0 ; i<v_event.size() && !found ; i++, pos++) + { + event *sig = &v_event[i]; +#ifndef _MULTI_TANGO_HOST + if (static_cast<Alarm_ns::Alarm *>(mydev)->compare_without_domain(sig->name,signame)) +#else + if (!static_cast<Alarm_ns::Alarm *>(mydev)->compare_tango_names(sig->name,signame)) +#endif + { + found = true; + DEBUG_STREAM <<"event_table::"<<__func__<< ": removing " << signame << endl; + if(stop) + { + sig->siglock->writerIn(); + try + { + if(sig->event_id != SUB_ERR) + { + delete sig->event_cb; + } + if(sig->attr) + delete sig->attr; + } + catch (Tango::DevFailed &e) + { + // Do nothing + // Unregister failed means Register has also failed + INFO_STREAM <<"event_table::"<< __func__<<": Exception unsubscribing " << signame << " err=" << e.errors[0].desc << endl; + } + sig->siglock->writerOut(); + DEBUG_STREAM <<"event_table::"<< __func__<<": stopped " << signame << endl; + } + if(!stop) + { + if(sig->running) + static_cast<Alarm_ns::Alarm *>(mydev)->attr_AttributeStartedNumber_read--; + if(sig->paused) + static_cast<Alarm_ns::Alarm *>(mydev)->attr_AttributePausedNumber_read--; + if(sig->stopped) + static_cast<Alarm_ns::Alarm *>(mydev)->attr_AttributeStoppedNumber_read--; + static_cast<Alarm_ns::Alarm *>(mydev)->attr_AttributeNumber_read--; + delete sig->siglock; + v_event.erase(pos); + DEBUG_STREAM <<"event_table::"<< __func__<<": removed " << signame << endl; + } + break; + } + } + } + if(!stop) + veclock.writerOut(); + if (!found) + Tango::Except::throw_exception( + (const char *)"BadSignalName", + "Signal " + signame + " NOT subscribed", + (const char *)"event_table::remove()"); + } + // then, update property +/* if(!stop) + { + DEBUG_STREAM <<"event_table::"<< __func__<<": going to increase action... action="<<action<<"++" << endl; + if(action <= UPDATE_PROP) + action++; + //put_signal_property(); //TODO: wakeup thread and let it do it? -> signal() + signal(); + }*/ +} +void event_table::update_property() +{ + DEBUG_STREAM <<"event_table::"<< __func__<<": going to increase action... action="<<action<<"++" << endl; + if(action <= UPDATE_PROP) + action++; + //put_signal_property(); //TODO: wakeup thread and let it do it? -> signal() + signal(); +} +//============================================================================= +/** + * Remove a signal in the list. + */ +//============================================================================= +void event_table::unsubscribe_events() +{ + DEBUG_STREAM <<"event_table::"<<__func__<< " entering..."<< endl; + veclock.readerIn(); + vector<event> local_signals(v_event); + veclock.readerOut(); + for (unsigned int i=0 ; i<local_signals.size() ; i++) + { + event *sig = &local_signals[i]; + if (local_signals[i].event_id != SUB_ERR && sig->attr) + { + DEBUG_STREAM <<"event_table::"<<__func__<< " unsubscribe " << sig->name << " id="<<omni_thread::self()->id()<< endl; + try + { + sig->attr->unsubscribe_event(sig->event_id); + DEBUG_STREAM <<"event_table::"<<__func__<< " unsubscribed " << sig->name << endl; + } + catch (Tango::DevFailed &e) + { + // Do nothing + // Unregister failed means Register has also failed + INFO_STREAM <<"event_table::"<<__func__<< " ERROR unsubscribing " << sig->name << " err="<<e.errors[0].desc<< endl; + } + } + } + veclock.writerIn(); + for (unsigned int i=0 ; i<v_event.size() ; i++) + { + event *sig = &v_event[i]; + sig->siglock->writerIn(); + if (v_event[i].event_id != SUB_ERR && sig->attr) + { + delete sig->event_cb; + DEBUG_STREAM <<"event_table::"<<__func__<< " deleted cb " << sig->name << endl; + } + if(sig->attr) + { + delete sig->attr; + DEBUG_STREAM <<"event_table::"<<__func__<< " deleted proxy " << sig->name << endl; + } + sig->siglock->writerOut(); + delete sig->siglock; + DEBUG_STREAM <<"event_table::"<<__func__<< " deleted lock " << sig->name << endl; + } + DEBUG_STREAM <<"event_table::"<<__func__<< " ended loop, deleting vector" << endl; + + /*for (unsigned int j=0 ; j<signals.size() ; j++, pos++) + { + signals[j].event_id = SUB_ERR; + signals[j].event_conf_id = SUB_ERR; + signals[j].archive_cb = NULL; + signals[j].attr = NULL; + }*/ + v_event.clear(); + veclock.writerOut(); + DEBUG_STREAM <<"event_table::"<< __func__<< ": exiting..."<<endl; +} +//============================================================================= +/** + * Add a new signal. + */ +//============================================================================= +void event_table::add(string &signame, vector<string> contexts) +{ + add(signame, contexts, NOTHING, false); +} +//============================================================================= +/** + * Add a new signal. + */ +//============================================================================= +void event_table::add(string &signame, vector<string> contexts, int to_do, bool start) +{ + DEBUG_STREAM << "event_table::"<<__func__<<": Adding " << signame << " to_do="<<to_do<<" start="<<(start ? "Y" : "N")<< endl; + { + veclock.readerIn(); + event *sig; + // Check if already subscribed + bool found = false; + for (unsigned int i=0 ; i<v_event.size() && !found ; i++) + { + sig = &v_event[i]; + found = (sig->name==signame); + } + for (unsigned int i=0 ; i<v_event.size() && !found ; i++) + { + sig = &v_event[i]; + found = static_cast<Alarm_ns::Alarm *>(mydev)->compare_without_domain(sig->name,signame); + } + veclock.readerOut(); + //DEBUG_STREAM << "event_table::"<<__func__<<": signame="<<signame<<" found="<<(found ? "Y" : "N") << " start="<<(start ? "Y" : "N")<< endl; + if (found && !start) + Tango::Except::throw_exception( + (const char *)"BadSignalName", + "Signal " + signame + " already subscribed", + (const char *)"event_table::add()"); + event *signal; + if (!found && !start) + { + // on name, split device name and attrib name + string::size_type idx = signame.find_last_of("/"); + if (idx==string::npos) + { + Tango::Except::throw_exception( + (const char *)"SyntaxError", + "Syntax error in signal name " + signame, + (const char *)"event_table::add()"); + } + signal = new event(); + // Build Hdb Signal object + signal->name = signame; + signal->siglock = new(ReadersWritersLock); + signal->devname = signal->name.substr(0, idx); + signal->attname = signal->name.substr(idx+1); + signal->ex_reason = "NOT_connected"; + signal->ex_desc = "Attribute not subscribed"; + signal->ex_origin = "..."; + signal->attr = NULL; + signal->running = false; + signal->stopped = true; + signal->paused = false; + //DEBUG_STREAM << "event_table::"<<__func__<<": signame="<<signame<<" created signal"<< endl; + } + else if(found && start) + { + signal = sig; + signal->siglock->writerIn(); + signal->ex_reason = "NOT_connected"; + signal->ex_desc = "Attribute not subscribed"; + signal->ex_origin = "..."; + signal->siglock->writerOut(); + //DEBUG_STREAM << "created proxy to " << signame << endl; + // create Attribute proxy + signal->attr = new Tango::AttributeProxy(signal->name); //TODO: OK out of siglock? accessed only inside the same thread? + DEBUG_STREAM << "event_table::"<<__func__<<": signame="<<signame<<" created proxy"<< endl; + } + signal->event_id = SUB_ERR; + signal->evstate = Tango::ALARM; + signal->isZMQ = false; + signal->okev_counter = 0; + signal->okev_counter_freq = 0; + signal->nokev_counter = 0; + signal->nokev_counter_freq = 0; + signal->first = true; + signal->first_err = true; + clock_gettime(CLOCK_MONOTONIC, &signal->last_ev); + + if(found && start) + { + try + { + Tango::AttributeInfo info; + if(signal->attr) + { + info = signal->attr->get_config(); + } + } + catch (Tango::DevFailed &e) + { + INFO_STREAM <<"event_table::"<<__func__<< " ERROR for " << signame << " in get_config err=" << e.errors[0].desc << endl; + } + } + + //DEBUG_STREAM <<"event_table::"<< __func__<< " created proxy to " << signame << endl; + if (!found && !start) + { + veclock.writerIn(); + // Add in vector + v_event.push_back(*signal); + delete signal; + static_cast<Alarm_ns::Alarm *>(mydev)->attr_AttributeNumber_read++; + static_cast<Alarm_ns::Alarm *>(mydev)->attr_AttributeStoppedNumber_read++; + veclock.writerOut(); + //DEBUG_STREAM << "event_table::"<<__func__<<": signame="<<signame<<" push_back signal"<< endl; + } + else if(found && start) + { + + } + DEBUG_STREAM <<"event_table::"<< __func__<<": going to increase action... action="<<action<<" += " << to_do << endl; + if(action <= UPDATE_PROP) + action += to_do; + } + DEBUG_STREAM <<"event_table::"<< __func__<<": exiting... " << signame << endl; + signal(); + //condition.signal(); +} +//============================================================================= +/** + * Subscribe archive event for each signal + */ +//============================================================================= +void event_table::subscribe_events() +{ + /*for (unsigned int ii=0 ; ii<v_event.size() ; ii++) + { + event *sig2 = &v_event[ii]; + int ret = pthread_rwlock_trywrlock(&sig2->siglock); + DEBUG_STREAM << __func__<<": pthread_rwlock_trywrlock i="<<ii<<" name="<<sig2->name<<" just entered " << ret << endl; + if(ret == 0) pthread_rwlock_unlock(&sig2->siglock); + }*/ + //omni_mutex_lock sync(*this); + veclock.readerIn(); + for (unsigned int i=0 ; i<v_event.size() ; i++) + { + event *sig = &v_event[i]; + sig->siglock->writerIn(); + if (sig->event_id==SUB_ERR && !sig->stopped) + { + if(!sig->attr) + { + try + { + vector<string> contexts; //TODO!!! + add(sig->name, contexts, NOTHING, true); + } + catch (Tango::DevFailed &e) + { + //Tango::Except::print_exception(e); + INFO_STREAM << "event_table::subscribe_events: error adding " << sig->name <<" err="<< e.errors[0].desc << endl; + v_event[i].ex_reason = e.errors[0].reason; + v_event[i].ex_desc = e.errors[0].desc; + v_event[i].ex_origin = e.errors[0].origin; + v_event[i].siglock->writerOut(); + continue; + } + } + sig->event_cb = new EventCallBack(static_cast<Alarm_ns::Alarm *>(mydev)); + Tango::AttributeInfo info; + try + { + sig->siglock->writerOut(); + sig->siglock->readerIn(); + info = sig->attr->get_config(); + sig->siglock->readerOut(); + sig->siglock->writerIn(); + } + catch (Tango::DevFailed &e) + { + Tango::Except::print_exception(e); + //sig->siglock->writerOut(); + sig->siglock->readerOut(); + sig->siglock->writerIn(); + sig->event_id = SUB_ERR; + delete sig->event_cb; + sig->ex_reason = e.errors[0].reason; + sig->ex_desc = e.errors[0].desc; + sig->ex_origin = e.errors[0].origin; + sig->siglock->writerOut(); + continue; + } + sig->first = true; + sig->first_err = true; + DEBUG_STREAM << "event_table::"<<__func__<<":Subscribing for " << sig->name << " " << (sig->first ? "FIRST" : "NOT FIRST") << endl; + sig->siglock->writerOut(); + int event_id = SUB_ERR; + bool isZMQ = true; + bool err = false; + + try + { + event_id = sig->attr->subscribe_event( + Tango::CHANGE_EVENT, + sig->event_cb, + /*stateless=*/false); + /*sig->evstate = Tango::ON; + //sig->first = false; //first event already arrived at subscribe_event + sig->status.clear(); + sig->status = "Subscribed"; + DEBUG_STREAM << sig->name << " Subscribed" << endl;*/ + + // Check event source ZMQ/Notifd ? + Tango::ZmqEventConsumer *consumer = + Tango::ApiUtil::instance()->get_zmq_event_consumer(); + isZMQ = (consumer->get_event_system_for_event_id(event_id) == Tango::ZMQ); + + DEBUG_STREAM << sig->name << "(id="<< event_id <<"): Subscribed " << ((isZMQ)? "ZMQ Event" : "NOTIFD Event") << endl; + } + catch (Tango::DevFailed &e) + { + INFO_STREAM <<"event_table::"<<__func__<<": sig->attr->subscribe_event EXCEPTION:" << endl; + err = true; + Tango::Except::print_exception(e); + sig->siglock->writerIn(); + sig->ex_reason = e.errors[0].reason; + sig->ex_desc = e.errors[0].desc; + sig->ex_origin = e.errors[0].origin; + sig->event_id = SUB_ERR; + delete sig->event_cb; + sig->siglock->writerOut(); + } + if(!err) + { + sig->siglock->writerIn(); + sig->event_id = event_id; + sig->isZMQ = isZMQ; + sig->siglock->writerOut(); + } + } + else + { + sig->siglock->writerOut(); + } + } + veclock.readerOut(); + initialized = true; +} + +void event_table::start(string &signame) +{ + ReaderLock lock(veclock); + vector<string> contexts; + for (unsigned int i=0 ; i<v_event.size() ; i++) + { + if (v_event[i].name==signame) + { + v_event[i].siglock->writerIn(); + if(!v_event[i].running) + { + if(v_event[i].stopped) + { + static_cast<Alarm_ns::Alarm *>(mydev)->attr_AttributeStoppedNumber_read--; + try + { + add(signame, contexts, NOTHING, true); + } + catch (Tango::DevFailed &e) + { + //Tango::Except::print_exception(e); + INFO_STREAM << "event_table::start: error adding " << signame <<" err="<< e.errors[0].desc << endl; + v_event[i].ex_reason = e.errors[0].reason; + v_event[i].ex_desc = e.errors[0].desc; + v_event[i].ex_origin = e.errors[0].origin; + /*v_event[i].siglock->writerOut(); + return;*/ + } + } + v_event[i].running=true; + if(v_event[i].paused) + static_cast<Alarm_ns::Alarm *>(mydev)->attr_AttributePausedNumber_read--; + static_cast<Alarm_ns::Alarm *>(mydev)->attr_AttributeStartedNumber_read++; + v_event[i].paused=false; + v_event[i].stopped=false; + } + v_event[i].siglock->writerOut(); + return; + } + } + for (unsigned int i=0 ; i<v_event.size() ; i++) + { +#ifndef _MULTI_TANGO_HOST + if (static_cast<Alarm_ns::Alarm *>(mydev)->compare_without_domain(v_event[i].name,signame)) +#else + if (!static_cast<Alarm_ns::Alarm *>(mydev)->compare_tango_names(v_event[i].name,signame)) +#endif + { + v_event[i].siglock->writerIn(); + if(!v_event[i].running) + { + if(v_event[i].stopped) + { + static_cast<Alarm_ns::Alarm *>(mydev)->attr_AttributeStoppedNumber_read--; + try + { + add(signame, contexts, NOTHING, true); + } + catch (Tango::DevFailed &e) + { + //Tango::Except::print_exception(e); + INFO_STREAM << "event_table::start: error adding " << signame << endl; + v_event[i].ex_reason = e.errors[0].reason; + v_event[i].ex_desc = e.errors[0].desc; + v_event[i].ex_origin = e.errors[0].origin; + /*signals[i].siglock->writerOut(); + return;*/ + } + } + v_event[i].running=true; + if(v_event[i].paused) + static_cast<Alarm_ns::Alarm *>(mydev)->attr_AttributePausedNumber_read--; + static_cast<Alarm_ns::Alarm *>(mydev)->attr_AttributeStartedNumber_read++; + v_event[i].paused=false; + v_event[i].stopped=false; + } + v_event[i].siglock->writerOut(); + return; + } + } + + // if not found + Tango::Except::throw_exception( + (const char *)"BadSignalName", + "Signal " + signame + " NOT subscribed", + (const char *)"event_table::start()"); +} + +void event_table::start_all() +{ + ReaderLock lock(veclock); + vector<string> contexts; + for (unsigned int i=0 ; i<v_event.size() ; i++) + { + v_event[i].siglock->writerIn(); + if(!v_event[i].running) + { + if(v_event[i].stopped) + { + string signame = v_event[i].name; + static_cast<Alarm_ns::Alarm *>(mydev)->attr_AttributeStoppedNumber_read--; + try + { + add(signame, contexts, NOTHING, true); + } + catch (Tango::DevFailed &e) + { + //Tango::Except::print_exception(e); + INFO_STREAM << "event_table::start: error adding " << signame <<" err="<< e.errors[0].desc << endl; + v_event[i].ex_reason = e.errors[0].reason; + v_event[i].ex_desc = e.errors[0].desc; + v_event[i].ex_origin = e.errors[0].origin; + /*v_event[i].siglock->writerOut(); + return;*/ + } + } + v_event[i].running=true; + if(v_event[i].paused) + static_cast<Alarm_ns::Alarm *>(mydev)->attr_AttributePausedNumber_read--; + static_cast<Alarm_ns::Alarm *>(mydev)->attr_AttributeStartedNumber_read++; + v_event[i].paused=false; + v_event[i].stopped=false; + } + v_event[i].siglock->writerOut(); + } +} + void event_table::update_events(bei_t &e) throw(string&) { @@ -359,18 +1111,74 @@ void event_table::update_events(bei_t &e) throw(string&) } } +//============================================================================= +//============================================================================= +bool event_table::get_if_stop() +{ + //omni_mutex_lock sync(*this); + return stop_it; +} +//============================================================================= +//============================================================================= +void event_table::stop_thread() +{ + //omni_mutex_lock sync(*this); + stop_it = true; + signal(); + //condition.signal(); +} +//============================================================================= +/** + * return number of signals to be subscribed + */ +//============================================================================= +int event_table::nb_sig_to_subscribe() +{ + ReaderLock lock(veclock); + + int nb = 0; + for (unsigned int i=0 ; i<v_event.size() ; i++) + { + v_event[i].siglock->readerIn(); + if (v_event[i].event_id == SUB_ERR && !v_event[i].stopped) + { + nb++; + } + v_event[i].siglock->readerOut(); + } + return nb; +} +//============================================================================= +/** + * build a list of signal to set HDB device property + */ +//============================================================================= +void event_table::put_signal_property() +{ + DEBUG_STREAM << "event_table::"<<__func__<<": put_signal_property entering action=" << action << endl; + //ReaderLock lock(veclock); + if (action>NOTHING) + { + static_cast<Alarm_ns::Alarm *>(mydev)->put_signal_property(); + if(action >= UPDATE_PROP) + action--; + } + DEBUG_STREAM << "event_table::"<<__func__<<": put_signal_property exiting action=" << action << endl; +} + /* * EventCallBack class methods */ -EventCallBack::EventCallBack(void) +EventCallBack::EventCallBack(Tango::DeviceImpl *s):Tango::LogAdapter(s) { - e_ptr = NULL; + //e_ptr = NULL; + mydev = s; } EventCallBack::~EventCallBack(void) { - e_ptr = NULL; + //e_ptr = NULL; } void EventCallBack::push_event(Tango::EventData* ev) @@ -457,14 +1265,14 @@ void EventCallBack::push_event(Tango::EventData* ev) e.msg = o.str(); //cerr << o.str() << endl; } - e_ptr->push_back(e); + static_cast<Alarm_ns::Alarm *>(mydev)->evlist.push_back(e); } /* push_event() */ void EventCallBack::extract_values(Tango::DeviceAttribute *attr_value, vector<double> &val, int &type) { Tango::DevState stval; vector<Tango::DevState> v_st; -#if TANGO_VER >= 600 +#if 1//TANGO_VER >= 600 vector<Tango::DevULong> v_ulo; #endif vector<Tango::DevUChar> v_uch; @@ -474,6 +1282,8 @@ void EventCallBack::extract_values(Tango::DeviceAttribute *attr_value, vector<do vector<Tango::DevDouble> v_do; vector<Tango::DevFloat> v_fl; vector<Tango::DevBoolean> v_bo; + vector<Tango::DevLong64> v_lo64; + vector<Tango::DevULong64> v_ulo64; if (attr_value->get_type() == Tango::DEV_UCHAR) { *(attr_value) >> v_uch; @@ -502,7 +1312,7 @@ void EventCallBack::extract_values(Tango::DeviceAttribute *attr_value, vector<do for(vector<Tango::DevState>::iterator it = v_st.begin(); it != v_st.end(); it++) val.push_back((double)(*it)); //convert all to double type = Tango::DEV_STATE; -#if TANGO_VER >= 600 +#if 1//TANGO_VER >= 600 } else if (attr_value->get_type() == Tango::DEV_ULONG) { *(attr_value) >> v_ulo; for(vector<Tango::DevULong>::iterator it = v_ulo.begin(); it != v_ulo.end(); it++) @@ -524,6 +1334,16 @@ void EventCallBack::extract_values(Tango::DeviceAttribute *attr_value, vector<do for(vector<Tango::DevBoolean>::iterator it = v_bo.begin(); it != v_bo.end(); it++) val.push_back((double)(*it)); //convert all to double type = Tango::DEV_BOOLEAN; + } else if (attr_value->get_type() == Tango::DEV_LONG64) { + *(attr_value) >> v_lo64; + for(vector<Tango::DevLong64>::iterator it = v_lo64.begin(); it != v_lo64.end(); it++) + val.push_back((double)(*it)); //convert all to double + type = Tango::DEV_LONG64; + } else if (attr_value->get_type() == Tango::DEV_ULONG64) { + *(attr_value) >> v_ulo64; + for(vector<Tango::DevULong64>::iterator it = v_ulo64.begin(); it != v_ulo64.end(); it++) + val.push_back((double)(*it)); //convert all to double + type = Tango::DEV_ULONG64; } else { ostringstream o; @@ -532,10 +1352,10 @@ void EventCallBack::extract_values(Tango::DeviceAttribute *attr_value, vector<do } } -void EventCallBack::init(event_list* e) +/*void EventCallBack::init(event_list* e) { - e_ptr = e; -} + //e_ptr = e; +}*/ Tango::TimeVal gettime(void) diff --git a/src/event_table.h b/src/event_table.h index 8e2b5cb..6d995f9 100644 --- a/src/event_table.h +++ b/src/event_table.h @@ -29,6 +29,10 @@ using namespace std; #define TYPE_TANGO_ERR -2 #define TYPE_GENERIC_ERR -3 +#define SUB_ERR -1 +#define NOTHING 0 +#define UPDATE_PROP 1 + class event; class event_list; class event_table; @@ -41,9 +45,9 @@ typedef vector<Tango::DevDouble> value_t; */ class event { public: - string name, /* event name */ - device, /* device name */ - attribute; /* attribute name */ + string name; /* event name */ + string devname; + string attname; value_t value; /* event value */ int quality; //Tango::DevErrorList errors; @@ -56,10 +60,25 @@ class event { err_counter; /* molteplicita' errore */ //map<string, string> m_alarm; vector<string> m_alarm; - bool valid; - - Tango::DeviceProxy *dp; - unsigned int eid; + bool valid; //TODO: old + bool first;//TODO: new + bool first_err;//TODO: new + //Tango::DeviceProxy *dp; + Tango::AttributeProxy *attr; + Tango::DevState evstate; + unsigned int event_id; + bool isZMQ; + EventCallBack *event_cb; + bool running; + bool paused; + bool stopped; + uint32_t okev_counter; + uint32_t okev_counter_freq; + timeval last_okev; + uint32_t nokev_counter; + uint32_t nokev_counter_freq; + timeval last_nokev; + timespec last_ev; vector<string> filter; /* * methods @@ -74,6 +93,7 @@ class event { bool operator==(const event& e); // bool event::operator==(const string& s); //TODO: gcc 4 problem?? bool operator==(const string& s); + ReadersWritersLock *siglock; protected: private: }; @@ -114,36 +134,68 @@ class event_list : public omni_mutex { /* * store all the events */ -class event_table : public event , Tango::LogAdapter { +class event_table : public Tango::TangoMonitor, public Tango::LogAdapter { public: - event_table(Tango::DeviceImpl *s):Tango::LogAdapter(s) {} + event_table(Tango::DeviceImpl *s);//:Tango::LogAdapter(s) {mydev = s;} ~event_table(void) {} - void push_back(event e); + //void push_back(event e); void show(void); unsigned int size(void); +#if 0 void init_proxy(void) throw(vector<string> &); void free_proxy(void); void subscribe(EventCallBack& ecb) throw(vector<string> &);//throw(string&); void unsubscribe(void) throw(string&); +#endif + /** + * Add a new signal. + */ + void add(string &signame, vector<string> contexts); + void add(string &signame, vector<string> contexts, int to_do, bool start); + event *get_signal(string signame); + void stop(string &signame); + void remove(string &signame, bool stop); + void subscribe_events(); + void unsubscribe_events(); + void start(string &signame); + void start_all(); void update_events(bei_t& e) throw(string&); + void update_property(); + /** + * return number of signals to be subscribed + */ + int nb_sig_to_subscribe(); + /** + * build a list of signal to set HDB device property + */ + void put_signal_property(); + bool is_initialized(); + bool get_if_stop(); + void stop_thread(); vector<event> v_event; - protected: + ReadersWritersLock veclock; + bool stop_it; + bool initialized; + int action; private: + Tango::DeviceImpl *mydev; }; /* class event_table */ /* * event callback */ -class EventCallBack : public Tango::CallBack { +class EventCallBack : public Tango::CallBack, public Tango::LogAdapter +{ public: - EventCallBack(void); + EventCallBack(Tango::DeviceImpl *s); ~EventCallBack(void); void push_event(Tango::EventData* ev); - void init(event_list* e); + //void init(event_list* e); void extract_values(Tango::DeviceAttribute *attr_value, vector<double> &val, int &type); private: - event_list* e_ptr; + //event_list* e_ptr; + Tango::DeviceImpl *mydev; }; /* -- GitLab