From 6083f8e9c32792a2931cfebf925a67ba7c9c4fb5 Mon Sep 17 00:00:00 2001 From: gscalamera <graziano.scalamera@elettra.eu> Date: Tue, 16 Mar 2021 16:52:07 +0100 Subject: [PATCH] Fix various data races and deadlocks --- src/AlarmHandler.cpp | 256 +++++++++++++++++++++++++++++++--------- src/AlarmHandler.h | 2 + src/SubscribeThread.cpp | 4 +- src/alarm_table.cpp | 42 +++---- src/alarm_table.h | 64 +++++++--- src/event_table.cpp | 121 ++++++++++++------- src/event_table.h | 9 +- src/update-thread.cpp | 3 +- 8 files changed, 361 insertions(+), 140 deletions(-) diff --git a/src/AlarmHandler.cpp b/src/AlarmHandler.cpp index 62a0b4f..149793d 100644 --- a/src/AlarmHandler.cpp +++ b/src/AlarmHandler.cpp @@ -214,10 +214,14 @@ void AlarmHandler::delete_device() if(!shutting_down && !restarting) alarms.del_rwlock(); //otherwise moved in alarm_table destructor alarms.stop_cmdthread(); + events->stop_thread(); + usleep(50000); + DEBUG_STREAM << "AlarmHandler::delete_device(): stopped alarm and log threads!" << endl; delete events; + thread->join(nullptr); sleep(1); //wait for alarm_thread, update_thread and log_thread to exit //delete almloop; - DEBUG_STREAM << "AlarmHandler::delete_device(): stopped alarm and log threads!" << endl; + DEBUG_STREAM << "AlarmHandler::delete_device(): deleted events" << endl; //delete proxy for actions @@ -260,7 +264,6 @@ void AlarmHandler::delete_device() * clear all data structures */ alarms.v_alarm.clear(); - events->v_event.clear(); evlist.clear(); /* for (int i = ds_num - 1; i >= 0; i--) { CORBA::string_free(ds[i]); @@ -317,6 +320,7 @@ void AlarmHandler::delete_device() */ alarmed.clear(); delete alarmedlock; + delete savedlock; delete internallock; delete dslock; @@ -372,6 +376,7 @@ void AlarmHandler::init_device() alarmedlock = new(ReadersWritersLock); internallock = new(ReadersWritersLock); dslock = new(ReadersWritersLock); + savedlock = new(ReadersWritersLock); alarms.set_dev(this); /*----- PROTECTED REGION END -----*/ // AlarmHandler::init_device_before @@ -511,7 +516,7 @@ void AlarmHandler::init_device() } }*/ try { - alarms.get_alarm_list_db(tmp_alm_vec, saved_alarms); + alarms.get_alarm_list_db(tmp_alm_vec, saved_alarms, savedlock);//holds write locks savedlock } catch(string & e) { ERROR_STREAM << "AlarmHandler::init_device(): " << e << endl; @@ -1515,7 +1520,7 @@ void AlarmHandler::load(Tango::DevString argin) //std::transform(s.begin(), s.end(), s.begin(), (int(*)(int))tolower); //transform to lowercase Tango::TimeVal ts = gettime(); try { - load_alarm(s, alm, evn); + load_alarm(s, alm, evn);//doesn't hold locks } catch(Tango::DevFailed& e) { ostringstream err; @@ -1528,7 +1533,7 @@ void AlarmHandler::load(Tango::DevString argin) } try { - add_alarm(alm); + add_alarm(alm);//holds alarm writer lock + reader lock } catch (string& err) { //TODO: not throwing string exception WARN_STREAM << "AlarmHandler::load(): " << err << endl; Tango::Except::throw_exception( \ @@ -1565,18 +1570,23 @@ void AlarmHandler::load(Tango::DevString argin) (const char*)"AlarmHandler::load()", Tango::ERR); } #endif +#if 0//already saved attribute property by subscribe thread alarms.save_alarm_conf_db(alm.attr_name, alm.name, "", "", alm.enabled, //add new alarm on log before subscribe event alm.formula, alm.on_delay, alm.off_delay, alm.grp2str(), alm.lev, alm.msg, alm.url, alm.cmd_name_a, alm.cmd_name_n, alm.silent_time); //but if it fails remove it from table + DEBUG_STREAM << "AlarmHandler::LOAD save_alarm_conf_db-" << alm.attr_name << endl; + string conf_str; alm.confstr(conf_str); + savedlock->writerIn(); saved_alarms.insert(make_pair(alm.attr_name,conf_str)); - + savedlock->writerOut(); +#endif alarms.vlock->readerIn(); alarm_container_t::iterator i = alarms.v_alarm.find(alm.name); if(i != alarms.v_alarm.end()) { try { - add_event(i->second, evn);//moved after events->add + add_event(i->second, evn);//moved after events->add //holds alarm reader lock } catch (string& err) { WARN_STREAM << "AlarmHandler::"<<__func__<<": string exception=" << err << endl; //TODO: handle error @@ -3030,7 +3040,7 @@ void AlarmHandler::re_load_all() /*----- PROTECTED REGION ID(AlarmHandler::re_load_all) ENABLED START -----*/ // Add your own code vector<string> tmp_alm_vec; - alarms.get_alarm_list_db(tmp_alm_vec, saved_alarms); + alarms.get_alarm_list_db(tmp_alm_vec, saved_alarms, savedlock);//holds write locks savedlock for(const auto &it_al : tmp_alm_vec) { bool modify_err=false; @@ -3447,6 +3457,7 @@ void AlarmHandler::do_alarm(bei_t& e) ostringstream o; o << e.msg; WARN_STREAM << "AlarmHandler::"<<__func__<<": " << o.str() << endl; + events->veclock.readerIn(); vector<event>::iterator found_ev = \ find(events->v_event.begin(), events->v_event.end(), e.ev_name); if (found_ev == events->v_event.end()) @@ -3501,6 +3512,10 @@ void AlarmHandler::do_alarm(bei_t& e) found_ev->quality = e.quality; //LOOP ALARMS IN WHICH THIS EVENT IS USED list<string> m_alarm=found_ev->m_alarm.show(); + string found_ev_ex_reason = found_ev->ex_reason; + string found_ev_ex_desc = found_ev->ex_desc; + string found_ev_ex_origin = found_ev->ex_origin; + events->veclock.readerOut(); list<string>::iterator j = m_alarm.begin(); while (j != m_alarm.end()) { @@ -3516,11 +3531,11 @@ void AlarmHandler::do_alarm(bei_t& e) it->second.ts_err_delay = gettime(); //first occurrance of this error, now begin to wait for err delay } if(e.type == TYPE_TANGO_ERR) - it->second.ex_reason = found_ev->ex_reason; + it->second.ex_reason = found_ev_ex_reason; else - it->second.ex_reason = found_ev->ex_reason; - it->second.ex_desc = found_ev->ex_desc; - it->second.ex_origin = found_ev->ex_origin; + it->second.ex_reason = found_ev_ex_reason; + it->second.ex_desc = found_ev_ex_desc; + it->second.ex_origin = found_ev_ex_origin; if(errorDelay > 0) { if((ts.tv_sec - errorDelay) > it->second.ts_err_delay.tv_sec) //error is present and err delay has passed @@ -3599,6 +3614,7 @@ void AlarmHandler::do_alarm(bei_t& e) } DEBUG_STREAM << "AlarmHandler::"<<__func__<<": arrived event=" << e.ev_name << endl; formula_res_t res; + events->veclock.readerIn(); vector<event>::iterator found = \ find(events->v_event.begin(), events->v_event.end(), e.ev_name); if (found == events->v_event.end()) @@ -3655,19 +3671,22 @@ void AlarmHandler::do_alarm(bei_t& e) found->read_size = e.read_size; found->err_counter = 0; list<string> m_alarm=found->m_alarm.show(); + Tango::TimeVal ts = found->ts; + events->veclock.readerOut(); //do not hold events->veclock in do_alarm_eval which push events list<string>::iterator j = m_alarm.begin(); while (j != m_alarm.end()) { DEBUG_STREAM << "AlarmHandler::"<<__func__<<": before do_alarm_eval name=" << *j << " ev=" << e.ev_name << endl; - changed = do_alarm_eval(*j, e.ev_name, found->ts); + changed = do_alarm_eval(*j, e.ev_name, ts); if(changed) num_changed++; j++; } + if(num_changed==0) { - prepare_alm_mtx->lock(); alarms.vlock->readerIn(); + prepare_alm_mtx->lock(); alarm_container_t::iterator ai; size_t freq_ind = 0; for (ai = alarms.v_alarm.begin(); ai != alarms.v_alarm.end(); ai++) @@ -3675,8 +3694,8 @@ void AlarmHandler::do_alarm(bei_t& e) attr_alarmFrequency_read[freq_ind] = ai->second.freq_counter; freq_ind++; } - alarms.vlock->readerOut(); prepare_alm_mtx->unlock(); + alarms.vlock->readerOut(); push_change_event("alarmFrequency",&attr_alarmFrequency_read[0], listAlarms_sz); push_archive_event("alarmFrequency",&attr_alarmFrequency_read[0], listAlarms_sz); return; @@ -3729,9 +3748,7 @@ bool AlarmHandler::do_alarm_eval(string alm_name, string ev_name, Tango::TimeVal bool prev_error = false; formula_res_t res; //alarm_container_t::iterator it = alarms.v_alarm.find(j->first); - DEBUG_STREAM << "AlarmHandler::"<<__func__<<": before lock name=" << alm_name<< " ev=" << ev_name << endl; alarms.vlock->readerIn(); - DEBUG_STREAM << "AlarmHandler::"<<__func__<<": after lock name=" << alm_name<< " ev=" << ev_name << endl; alarm_container_t::iterator it = alarms.v_alarm.find(alm_name); if(it != alarms.v_alarm.end()) { @@ -4119,6 +4136,7 @@ bool AlarmHandler::remove_alarm(string& s) throw(string&) void AlarmHandler::set_internal_alarm(string name, Tango::TimeVal t, string msg, unsigned int count) { +#if 0 alarm_t alm; bool existing=false; ostringstream o; @@ -4196,6 +4214,7 @@ void AlarmHandler::set_internal_alarm(string name, Tango::TimeVal t, string msg, internal.push_back(alm); } internallock->writerOut(); +#endif } //============================================================== @@ -4223,8 +4242,10 @@ formula_res_t AlarmHandler::eval_expression(iter_t const& i, string &attr_values string val_d(i->value.begin(), i->value.end()); formula_res_t res; res.value = strtod(val_d.c_str(), 0); +#ifdef _DEBUG_FORMULA DEBUG_STREAM << " node value real = " << val_d << "(value="<<res.value<<" quality="<<res.quality<<")" << endl; - return res; +#endif + return res; } else if (i->value.id() == formula_grammar::val_hID) { @@ -4234,7 +4255,9 @@ formula_res_t AlarmHandler::eval_expression(iter_t const& i, string &attr_values throw err.str(); } string val_d(i->value.begin(), i->value.end()); +#ifdef _DEBUG_FORMULA DEBUG_STREAM << " node value hex = " << val_d << endl; +#endif formula_res_t res; res.value = strtod(val_d.c_str(), 0); return res; @@ -4248,7 +4271,9 @@ formula_res_t AlarmHandler::eval_expression(iter_t const& i, string &attr_values } string val_st(i->value.begin(), i->value.end()); double st = i->value.value(); //get value directly from node saved with access_node_d +#ifdef _DEBUG_FORMULA DEBUG_STREAM << " node value state : " << val_st << "=" << st << endl; +#endif formula_res_t res; res.value = st; return res; @@ -4262,7 +4287,9 @@ formula_res_t AlarmHandler::eval_expression(iter_t const& i, string &attr_values } string val_st(i->value.begin(), i->value.end()); double st = i->value.value(); //get value directly from node saved with access_node_d +#ifdef _DEBUG_FORMULA DEBUG_STREAM << " node value alarm enum state : " << val_st << "=" << st << endl; +#endif formula_res_t res; res.value = st; return res; @@ -4277,14 +4304,18 @@ formula_res_t AlarmHandler::eval_expression(iter_t const& i, string &attr_values string val_quality(i->value.begin(), i->value.end()); double quality = i->value.value(); //get value directly from node saved with access_node_d +#ifdef _DEBUG_FORMULA DEBUG_STREAM << " node value quality : " << val_quality << "=" << quality << endl; +#endif formula_res_t res; res.value = quality; return res; } else if (i->value.id() == formula_grammar::unary_exprID) { +#ifdef _DEBUG_FORMULA DEBUG_STREAM << " node unary expression: " << string(i->value.begin(), i->value.end()) << endl; +#endif if(i->children.size() != 1) { err << "in node unary_exprID(" << string(i->value.begin(), i->value.end()) << ") children=" << i->children.size(); @@ -4313,7 +4344,9 @@ formula_res_t AlarmHandler::eval_expression(iter_t const& i, string &attr_values } else if (i->value.id() == formula_grammar::mult_exprID) { +#ifdef _DEBUG_FORMULA DEBUG_STREAM << " node mult expression: " << string(i->value.begin(), i->value.end()) << endl; +#endif if(i->children.size() != 2) { err << "in node mult_exprID(" << string(i->value.begin(), i->value.end()) << ") children=" << i->children.size(); @@ -4337,7 +4370,9 @@ formula_res_t AlarmHandler::eval_expression(iter_t const& i, string &attr_values } else if (i->value.id() == formula_grammar::add_exprID) { +#ifdef _DEBUG_FORMULA DEBUG_STREAM << " node add expression: " << string(i->value.begin(), i->value.end()) << endl; +#endif if(i->children.size() != 2) { err << "in node add_exprID(" << string(i->value.begin(), i->value.end()) << ") children=" << i->children.size(); @@ -4361,7 +4396,9 @@ formula_res_t AlarmHandler::eval_expression(iter_t const& i, string &attr_values } else if (i->value.id() == formula_grammar::event_ID) { +#ifdef _DEBUG_FORMULA DEBUG_STREAM << " node event" << string(i->value.begin(), i->value.end()) << endl; +#endif formula_res_t ind; if(i->children.size() != 2) { @@ -4376,21 +4413,27 @@ formula_res_t AlarmHandler::eval_expression(iter_t const& i, string &attr_values { formula_res_t res = eval_expression(i->children.begin(), attr_values, (int)ind.value); res.value = res.quality; +#ifdef _DEBUG_FORMULA DEBUG_STREAM << " node event.quality -> " << res.value << endl; +#endif return res; } else if(string((i->children.begin()+1)->value.begin(), (i->children.begin()+1)->value.end()) == ".alarm") { formula_res_t res = eval_expression(i->children.begin(), attr_values, (int)ind.value); res.value = (res.value == _UNACK) || (res.value == _ACKED); +#ifdef _DEBUG_FORMULA DEBUG_STREAM << " node event.alarm -> " << res.value << endl; +#endif return res; } else if(string((i->children.begin()+1)->value.begin(), (i->children.begin()+1)->value.end()) == ".normal") { formula_res_t res = eval_expression(i->children.begin(), attr_values, (int)ind.value); res.value = (res.value == _NORM) || (res.value == _RTNUN); +#ifdef _DEBUG_FORMULA DEBUG_STREAM << " node event.normal -> " << res.value << endl; +#endif return res; } } @@ -4405,14 +4448,15 @@ formula_res_t AlarmHandler::eval_expression(iter_t const& i, string &attr_values { if(i->children.size() != 0) { - err << "in node nameID(" << string(i->value.begin(), i->value.end()) << ") children=" << i->children.size(); - throw err.str(); - } - vector<event>::iterator it = events->v_event.begin(); - string s(i->value.begin(), i->value.end()); - std::transform(s.begin(), s.end(), s.begin(), (int(*)(int))tolower); //transform to lowercase - while ((it != events->v_event.end()) && (it->name != s)) - it++; + err << "in node nameID(" << string(i->value.begin(), i->value.end()) << ") children=" << i->children.size(); + throw err.str(); + } + events->veclock.readerIn(); + vector<event>::iterator it = events->v_event.begin(); + string s(i->value.begin(), i->value.end()); + std::transform(s.begin(), s.end(), s.begin(), (int(*)(int))tolower); //transform to lowercase + while ((it != events->v_event.end()) && (it->name != s)) + it++; if (it != events->v_event.end()) { if(!it->valid) @@ -4421,6 +4465,7 @@ formula_res_t AlarmHandler::eval_expression(iter_t const& i, string &attr_values err << "attribute '" << string(i->value.begin(), i->value.end()) << "' exception: '" << it->ex_desc << "'"; else err << "attribute '" << string(i->value.begin(), i->value.end()) << "' value not valid!"; + events->veclock.readerOut(); throw err.str(); } else if(it->type != Tango::DEV_STRING && it->value.empty()) @@ -4429,6 +4474,7 @@ formula_res_t AlarmHandler::eval_expression(iter_t const& i, string &attr_values err << "attribute '" << string(i->value.begin(), i->value.end()) << "' exception: '" << it->ex_desc << "'"; else err << "attribute '" << string(i->value.begin(), i->value.end()) << "' value not initialized!!"; + events->veclock.readerOut(); throw err.str(); } ostringstream temp_attr_val; @@ -4442,12 +4488,16 @@ formula_res_t AlarmHandler::eval_expression(iter_t const& i, string &attr_values res.ex_reason = it->ex_reason; res.ex_desc = it->ex_desc; res.ex_origin = it->ex_origin; +#ifdef _DEBUG_FORMULA DEBUG_STREAM << " node name -> " << temp_attr_val.str() << " quality=" << res.quality << endl; +#endif res.value = it->value.at(ev_ind); //throw std::out_of_range + events->veclock.readerOut(); return res; } else { + events->veclock.readerOut(); err << "in event: (" << string(i->value.begin(), i->value.end()) << ") not found in event table"; throw err.str(); } @@ -4460,14 +4510,18 @@ formula_res_t AlarmHandler::eval_expression(iter_t const& i, string &attr_values throw err.str(); } string val_d(i->value.begin(), i->value.end()); +#ifdef _DEBUG_FORMULA DEBUG_STREAM << " node index = " << val_d << endl; +#endif formula_res_t res; res.value = strtod(val_d.c_str(), 0); return res; } else if (i->value.id() == formula_grammar::logical_exprID) { +#ifdef _DEBUG_FORMULA DEBUG_STREAM << " node logical expression: " << string(i->value.begin(), i->value.end()) << endl; +#endif if(i->children.size() != 2) { err << "in node logical_exprID(" << string(i->value.begin(), i->value.end()) << ") children=" << i->children.size(); @@ -4491,7 +4545,9 @@ formula_res_t AlarmHandler::eval_expression(iter_t const& i, string &attr_values } else if (i->value.id() == formula_grammar::bitwise_exprID) { +#ifdef _DEBUG_FORMULA DEBUG_STREAM << " node bitwise expression: " << string(i->value.begin(), i->value.end()) << endl; +#endif if(i->children.size() != 2) { err << "in node bitwise_exprID(" << string(i->value.begin(), i->value.end()) << ") children=" << i->children.size(); @@ -4546,7 +4602,9 @@ formula_res_t AlarmHandler::eval_expression(iter_t const& i, string &attr_values } else if (i->value.id() == formula_grammar::shift_exprID) { +#ifdef _DEBUG_FORMULA DEBUG_STREAM << " node shift expression: " << string(i->value.begin(), i->value.end()) << endl; +#endif if(i->children.size() != 2) { err << "in node shift_exprID(" << string(i->value.begin(), i->value.end()) << ") children=" << i->children.size(); @@ -4591,7 +4649,9 @@ formula_res_t AlarmHandler::eval_expression(iter_t const& i, string &attr_values } else if (i->value.id() == formula_grammar::equality_exprID) { +#ifdef _DEBUG_FORMULA DEBUG_STREAM << " node equality expression: " << string(i->value.begin(), i->value.end()) << endl; +#endif if(i->children.size() != 2) { err << "in node equality_exprID(" << string(i->value.begin(), i->value.end()) << ") children=" << i->children.size(); @@ -4612,6 +4672,7 @@ formula_res_t AlarmHandler::eval_expression(iter_t const& i, string &attr_values string name_id(i2_1->value.begin(), i2_1->value.end()); std::transform(name_id.begin(), name_id.end(), name_id.begin(), (int(*)(int))tolower); //transform to lowercase formula_res_t res; + events->veclock.readerIn(); vector<event>::iterator it = events->v_event.begin(); while ((it != events->v_event.end()) && (it->name != name_id)) @@ -4623,6 +4684,7 @@ formula_res_t AlarmHandler::eval_expression(iter_t const& i, string &attr_values err << "in node equality_exprID -> nameID(" << string(i2_1->value.begin(), i2_1->value.end()) << ") value not valid!"; if(it->ex_desc.length() > 0) err << " EX: '" << it->ex_desc << "'"; + events->veclock.readerOut(); throw err.str(); } else if(it->type != Tango::DEV_STRING && it->value.empty()) @@ -4630,6 +4692,7 @@ formula_res_t AlarmHandler::eval_expression(iter_t const& i, string &attr_values err << "in node nameID(" << string(i2_1->value.begin(), i2_1->value.end()) << ") value not initialized!!"; if(it->ex_desc.length() > 0) err << " EX: '" << it->ex_desc << "'"; + events->veclock.readerOut(); throw err.str(); } ostringstream temp_attr_val; @@ -4639,11 +4702,15 @@ formula_res_t AlarmHandler::eval_expression(iter_t const& i, string &attr_values res.ex_reason = it->ex_reason; res.ex_desc = it->ex_desc; res.ex_origin = it->ex_origin; +#ifdef _DEBUG_FORMULA DEBUG_STREAM << " node name -> " << temp_attr_val.str() << " quality=" << res.quality << endl; +#endif attr_val = string("'") + it->value_string + string("'"); + events->veclock.readerOut(); } else { + events->veclock.readerOut(); err << "in event: (" << string(i->value.begin(), i->value.end()) << ") not found in event table"; throw err.str(); } @@ -4689,7 +4756,9 @@ formula_res_t AlarmHandler::eval_expression(iter_t const& i, string &attr_values } else if (i->value.id() == formula_grammar::compare_exprID) { +#ifdef _DEBUG_FORMULA DEBUG_STREAM << " node compare expression: " << string(i->value.begin(), i->value.end()) << endl; +#endif if(i->children.size() != 2) { err << "in node compare_exprID(" << string(i->value.begin(), i->value.end()) << ") children=" << i->children.size(); @@ -4723,7 +4792,9 @@ formula_res_t AlarmHandler::eval_expression(iter_t const& i, string &attr_values } else if (i->value.id() == formula_grammar::funcID) { +#ifdef _DEBUG_FORMULA DEBUG_STREAM << " node function: " << string(i->value.begin(), i->value.end()) << endl; +#endif if(i->children.size() != 1) { err << "in node funcID(" << string(i->value.begin(), i->value.end()) << ") children=" << i->children.size(); @@ -4757,10 +4828,13 @@ formula_res_t AlarmHandler::eval_expression(iter_t const& i, string &attr_values } else if ((string(i->value.begin(), i->value.end()) == string("AND") || string(i->value.begin(), i->value.end()) == string("OR")) && i->children.begin()->value.id() == formula_grammar::nameID) { + events->veclock.readerIn(); vector<event>::iterator it = events->v_event.begin(); string s(i->children.begin()->value.begin(), i->children.begin()->value.end()); std::transform(s.begin(), s.end(), s.begin(), (int(*)(int))tolower); //transform to lowercase +#ifdef _DEBUG_FORMULA DEBUG_STREAM << " -> " << string(i->value.begin(), i->value.end()) << "(" << s << ")" << endl; +#endif while ((it != events->v_event.end()) && (it->name != s)) it++; if (it != events->v_event.end()) @@ -4772,6 +4846,7 @@ formula_res_t AlarmHandler::eval_expression(iter_t const& i, string &attr_values err << "attribute '" << string(i->value.begin(), i->value.end()) << "' exception: '" << it->ex_desc << "'"; else err << "attribute '" << string(i->value.begin(), i->value.end()) << "' value not valid!"; + events->veclock.readerOut(); throw err.str(); } else if(it->type != Tango::DEV_STRING && it->value.empty()) @@ -4780,6 +4855,7 @@ formula_res_t AlarmHandler::eval_expression(iter_t const& i, string &attr_values err << "attribute '" << string(i->value.begin(), i->value.end()) << "' exception: '" << it->ex_desc << "'"; else err << "attribute '" << string(i->value.begin(), i->value.end()) << "' value not initialized!!"; + events->veclock.readerOut(); throw err.str(); } @@ -4819,12 +4895,16 @@ formula_res_t AlarmHandler::eval_expression(iter_t const& i, string &attr_values res.ex_reason = it->ex_reason; res.ex_desc = it->ex_desc; res.ex_origin = it->ex_origin; +#ifdef _DEBUG_FORMULA DEBUG_STREAM << " node name -> " << temp_attr_val.str() << " quality=" << res.quality << endl; +#endif res.value = result; + events->veclock.readerOut(); return res; } else { + events->veclock.readerOut(); err << "in function " << string(i->value.begin(), i->value.end()) << " event (" << s << ") not found in event table" << ends; throw err.str(); } @@ -4837,7 +4917,9 @@ formula_res_t AlarmHandler::eval_expression(iter_t const& i, string &attr_values } else if (i->value.id() == formula_grammar::func_dualID) { +#ifdef _DEBUG_FORMULA DEBUG_STREAM << " node function dual: " << string(i->value.begin(), i->value.end()) << endl; +#endif if(i->children.size() != 2) { err << "in node func_dualID(" << string(i->value.begin(), i->value.end()) << ") children=" << i->children.size(); @@ -4883,7 +4965,9 @@ formula_res_t AlarmHandler::eval_expression(iter_t const& i, string &attr_values } else if (i->value.id() == formula_grammar::cond_exprID) { +#ifdef _DEBUG_FORMULA DEBUG_STREAM << " node ternary_if expression: " << string(i->value.begin(), i->value.end()) << endl; +#endif if(i->children.size() != 3) { err << "in node ternary_ifID(" << string(i->value.begin(), i->value.end()) << ") children=" << i->children.size(); @@ -4903,7 +4987,9 @@ formula_res_t AlarmHandler::eval_expression(iter_t const& i, string &attr_values } else { +#ifdef _DEBUG_FORMULA DEBUG_STREAM << " node unknown id: " << string(i->value.begin(), i->value.end()) << endl; +#endif { err << "node unknown!! value=" << string(i->value.begin(), i->value.end()); throw err.str(); @@ -4922,9 +5008,9 @@ void AlarmHandler::find_event_formula(tree_parse_info_t tree, vector<string> & e void AlarmHandler::eval_node_event(iter_t const& i, vector<string> & ev) { - DEBUG_STREAM << "In eval_node_event. i->value = '" << - string(i->value.begin(), i->value.end()) << - "' i->children.size() = " << i->children.size() << " NODE=" << rule_names[i->value.id()] << endl; + //DEBUG_STREAM << "In eval_node_event. i->value = '" << + // string(i->value.begin(), i->value.end()) << + // "' i->children.size() = " << i->children.size() << " NODE=" << rule_names[i->value.id()] << endl; ostringstream err; err << "Looking for event in formula tree: "; /*if (i->value.id() == formula_grammar::event_ID) @@ -4933,7 +5019,7 @@ void AlarmHandler::eval_node_event(iter_t const& i, vector<string> & ev) } else*/ if (i->value.id() == formula_grammar::nameID) { - DEBUG_STREAM << "eval_node_event(): find event name=" << string(i->value.begin(), i->value.end()) << endl; + //DEBUG_STREAM << "eval_node_event(): find event name=" << string(i->value.begin(), i->value.end()) << endl; if(i->children.size() != 0) { err << "in node nameID(" << string(i->value.begin(), i->value.end()) << ") children=" << i->children.size(); @@ -4953,11 +5039,11 @@ void AlarmHandler::eval_node_event(iter_t const& i, vector<string> & ev) void AlarmHandler::prepare_alarm_attr() { - prepare_alm_mtx->lock(); alarm_container_t::iterator ai; vector<alarm_t>::iterator aid; bool is_audible=false; alarms.vlock->readerIn(); + prepare_alm_mtx->lock(); outOfServiceAlarms_sz=0; shelvedAlarms_sz=0; acknowledgedAlarms_sz=0; @@ -4970,6 +5056,7 @@ void AlarmHandler::prepare_alarm_attr() string almstate; for (ai = alarms.v_alarm.begin(); ai != alarms.v_alarm.end(); ai++) { + //DEBUG_STREAM << __func__<<": looping v_alarm.size="<<alarms.v_alarm.size()<<", listAlarms_sz="<<listAlarms_sz<<" : "<<ai->second.name<<endl; #ifndef ALM_SUM_STR stringstream alm_summary; alm_summary << KEY(NAME_KEY) << ai->first << SEP; @@ -5176,8 +5263,13 @@ void AlarmHandler::prepare_alarm_attr() aid->lev = ai->second.lev; aid->is_new = ai->second.is_new; //copy is_new state //ai->second.is_new = 0; //and set state as not more new //12-06-08: StopNew command set it to 0 +#ifdef _CNT_ATOMIC + aid->on_counter.store(ai->second.on_counter); + aid->off_counter.store(ai->second.off_counter); +#else aid->on_counter = ai->second.on_counter; aid->off_counter = ai->second.off_counter; +#endif aid->ack = ai->second.ack; //if already acknowledged but has arrived new alarm ack is reset aid->silenced = ai->second.silenced; //update silenced from alarm table (maybe not necessary) aid->silent_time = ai->second.silent_time; //if already alarmed and not saved correctly in properties needed to update @@ -5220,8 +5312,13 @@ void AlarmHandler::prepare_alarm_attr() aid->lev = ai->second.lev; aid->is_new = ai->second.is_new; //copy is_new state //ai->second.is_new = 0; //and set state as not more new //12-06-08: StopNew command set it to 0 +#ifdef _CNT_ATOMIC + aid->on_counter.store(ai->second.on_counter); + aid->off_counter.store(ai->second.off_counter); +#else aid->on_counter = ai->second.on_counter; aid->off_counter = ai->second.off_counter; +#endif aid->ack = ai->second.ack; //if already acknowledged but has arrived new alarm ack is reset aid->silenced = ai->second.silenced; //update silenced from alarm table (maybe not necessary) aid->silent_time = ai->second.silent_time; //if already alarmed and not saved correctly in properties needed to update @@ -5257,8 +5354,13 @@ void AlarmHandler::prepare_alarm_attr() aid->url = ai->second.url; aid->grp = ai->second.grp; aid->lev = ai->second.lev; +#ifdef _CNT_ATOMIC + aid->on_counter.store(ai->second.on_counter); + aid->off_counter.store(ai->second.off_counter); +#else aid->on_counter = ai->second.on_counter; aid->off_counter = ai->second.off_counter; +#endif aid->ack = ai->second.ack; //if already acknowledged but has arrived new alarm ack is reset aid->is_new = ai->second.is_new; //copy is_new state aid->silenced = ai->second.silenced; //update silenced from alarm table (maybe not necessary) @@ -5300,8 +5402,8 @@ void AlarmHandler::prepare_alarm_attr() alarmSummary_sz++; } /* for */ *attr_alarmAudible_read = is_audible; - alarms.vlock->readerOut(); prepare_alm_mtx->unlock(); + alarms.vlock->readerOut(); vector<string> tmp_alarm_table; string is_new; ostringstream os1; @@ -5446,60 +5548,84 @@ bool AlarmHandler::compare_without_domain(string str1, string str2) void AlarmHandler::put_signal_property() { vector<string> prop; - alarms.vlock->readerIn();//TODO: avoid keeping lock - alarm_container_t::iterator it; - for(it = alarms.v_alarm.begin(); it != alarms.v_alarm.end(); it++) + alarms.vlock->readerIn(); + auto local_alarms(alarms.v_alarm); + alarms.vlock->readerOut(); + for(auto &it:local_alarms) { - prop.push_back(it->first); + prop.push_back(it.first); string conf_str; - it->second.confstr(conf_str); - map<string,string>::iterator itmap = saved_alarms.find(it->first); + it.second.confstr(conf_str); + savedlock->readerIn(); + auto itmap = saved_alarms.find(it.first); if(itmap == saved_alarms.end()) { - DEBUG_STREAM << __func__<<": SAVING '" << it->first << "'" << endl; - alarms.save_alarm_conf_db(it->second.attr_name, it->second.name, it->second.stat, it->second.ack, it->second.enabled, - it->second.formula, it->second.on_delay, it->second.off_delay, it->second.grp2str(), it->second.lev, it->second.msg, it->second.url, it->second.cmd_name_a, it->second.cmd_name_n, it->second.silent_time); - saved_alarms.insert(make_pair(it->first,conf_str)); + savedlock->readerOut(); + DEBUG_STREAM << __func__<<": SAVING '" <<it.first << "'" << endl; + DECLARE_TIME_VAR t0, t1; + GET_TIME(t0); + alarms.save_alarm_conf_db(it.second.attr_name,it.second.name,it.second.stat,it.second.ack,it.second.enabled, + it.second.formula,it.second.on_delay,it.second.off_delay,it.second.grp2str(),it.second.lev,it.second.msg,it.second.url,it.second.cmd_name_a,it.second.cmd_name_n,it.second.silent_time); + GET_TIME(t1); + DEBUG_STREAM << __func__ << ": SAVED '" <<it.first << "' in " << ELAPSED(t0, t1) << " ms" << endl; + //alarms.vlock->readerOut();//TODO: avoid keeping lock + savedlock->writerIn(); + saved_alarms.insert(make_pair(it.first,conf_str)); + savedlock->writerOut(); + //alarms.vlock->readerIn();//TODO: avoid keeping lock } else { string conf_string; - it->second.confstr(conf_string); + it.second.confstr(conf_string); //alarm found but configuration changed if(conf_string != itmap->second) { - DEBUG_STREAM << __func__<<": UPDATING " << it->first << endl; - alarms.save_alarm_conf_db(it->second.attr_name, it->second.name, it->second.stat, it->second.ack, it->second.enabled, - it->second.formula, it->second.on_delay, it->second.off_delay, it->second.grp2str(), it->second.lev, it->second.msg, it->second.url, it->second.cmd_name_a, it->second.cmd_name_n, it->second.silent_time); + DEBUG_STREAM << __func__<<": UPDATING " <<it.first << " because conf changed: \""<<conf_string<<"\"<<---"<< itmap->second << endl; + DECLARE_TIME_VAR t0, t1; + GET_TIME(t0); itmap->second = conf_string; + savedlock->readerOut(); + alarms.save_alarm_conf_db(it.second.attr_name,it.second.name,it.second.stat,it.second.ack,it.second.enabled, + it.second.formula,it.second.on_delay,it.second.off_delay,it.second.grp2str(),it.second.lev,it.second.msg,it.second.url,it.second.cmd_name_a,it.second.cmd_name_n,it.second.silent_time); + GET_TIME(t1); + DEBUG_STREAM << __func__ << ": UPDATED '" <<it.first << "' in " << ELAPSED(t0, t1) << " ms" << endl; + } + else + { + savedlock->readerOut(); } } } - alarms.vlock->readerOut(); + + savedlock->readerIn(); map<string, string>::iterator it2=saved_alarms.begin(); while(it2 != saved_alarms.end()) { if(!it2->first.empty())//TODO: should not be needed, bug if it happens { - alarms.vlock->readerIn(); - alarm_container_t::iterator found = alarms.v_alarm.find(it2->first); - if (found == alarms.v_alarm.end()) + auto found = local_alarms.find(it2->first); + if (found == local_alarms.end()) { - alarms.vlock->readerOut(); DEBUG_STREAM << __func__<<": DELETING '" << it2->first << "'" << endl; + DECLARE_TIME_VAR t0, t1; + GET_TIME(t0); alarms.delete_alarm_conf_db(it2->first); + GET_TIME(t1); + DEBUG_STREAM << __func__ << ": DELETED '" <<it2->first << "' in " << ELAPSED(t0, t1) << " ms" << endl; + //savedlock->readerOut();//TODO: with boost shared lock to be released to take exclusive + savedlock->writerIn(); saved_alarms.erase(it2); - } - else - { - alarms.vlock->readerOut(); + savedlock->writerOut(); + //savedlock->readerIn(); } } if(it2 != saved_alarms.end()) it2++; } - + savedlock->readerOut(); + Tango::DbData data; @@ -5543,6 +5669,24 @@ void AlarmHandler::put_signal_property() } delete db; } +//============================================================================= +//============================================================================= +bool AlarmHandler::check_signal_property() +{ + savedlock->readerIn(); + size_t saved_size=saved_alarms.size(); + savedlock->readerOut(); + + alarms.vlock->readerIn(); + size_t alarm_size=alarms.v_alarm.size(); + alarms.vlock->readerOut(); + if (saved_size<alarm_size) + { + DEBUG_STREAM << "AlarmHandler::"<<__func__<<": saved_size="<<saved_size<<" alarm_size="<< alarm_size << endl; + return true; + } + return false; +} //-------------------------------------------------------- /** * remove a AlarmState dynamic attribute without cleaning DB. diff --git a/src/AlarmHandler.h b/src/AlarmHandler.h index 9f670fd..e68f83a 100644 --- a/src/AlarmHandler.h +++ b/src/AlarmHandler.h @@ -561,6 +561,7 @@ private: ReadersWritersLock *dslock; int period; //subscribe thread period map<string, string> saved_alarms; + ReadersWritersLock *savedlock; static int instanceCounter; @@ -604,6 +605,7 @@ private: public: void put_signal_property(); + bool check_signal_property(); void do_alarm(bei_t& e); //public instead of protected for gcc 4 problem?? bool do_alarm_eval(string alm_name, string ev_name, Tango::TimeVal ts); void timer_update(); //public instead of protected for gcc 4 problem?? diff --git a/src/SubscribeThread.cpp b/src/SubscribeThread.cpp index d54d6dc..0e01254 100644 --- a/src/SubscribeThread.cpp +++ b/src/SubscribeThread.cpp @@ -77,12 +77,14 @@ void *SubscribeThread::run_undetached(void *ptr) updateProperty(); alarm_dev->events->subscribe_events(); int nb_to_subscribe = shared->nb_sig_to_subscribe(); + shared->check_signal_property(); //check if, while subscribing, new alarms to be saved in properties where added (update action) // And wait a bit before next time or // wait a long time if all signals subscribed { omni_mutex_lock sync(*shared); //shared->lock(); - if (nb_to_subscribe==0 && shared->action == NOTHING) + int act=shared->action.load(); + if (nb_to_subscribe==0 && act == NOTHING) { DEBUG_STREAM << "SubscribeThread::"<<__func__<<": going to wait nb_to_subscribe=0"<<endl; //shared->condition.wait(); diff --git a/src/alarm_table.cpp b/src/alarm_table.cpp index f794ae9..825fe91 100644 --- a/src/alarm_table.cpp +++ b/src/alarm_table.cpp @@ -26,22 +26,6 @@ static const char __FILE__rev[] = __FILE__ " $Revision: 1.5 $"; /* * alarm_t class methods */ -alarm_t::alarm_t() -{ - grp=0; - on_counter=0; - off_counter=0; - freq_counter=0; - stat = S_NORMAL; - ack = ACK; - on_delay = 0; - off_delay = 0; - silent_time = -1; - cmd_name_a=string(""); - cmd_name_n=string(""); - enabled=true; - shelved=false; -} bool alarm_t::operator==(const alarm_t &that) { @@ -60,7 +44,13 @@ void alarm_t::str2alm(const string &s) istringstream is(s); ostringstream temp_msg; string temp_grp; - is >> ts.tv_sec >> ts.tv_usec >> name >> stat >> ack >> on_counter >> lev >> silent_time >> temp_grp >> msg; //stop at first white space in msg + unsigned int on_cnt; + is >> ts.tv_sec >> ts.tv_usec >> name >> stat >> ack >> on_cnt >> lev >> silent_time >> temp_grp >> msg; //stop at first white space in msg +#ifdef _CNT_ATOMIC + on_counter.store(on_cnt); +#else + on_counter = on_cnt; +#endif temp_msg << is.rdbuf(); //read all remaining characters as msg msg += temp_msg.str(); str2grp(temp_grp); @@ -70,8 +60,13 @@ string alarm_t::alm2str(void) { ostringstream os; os.clear(); +#ifdef _CNT_ATOMIC + os << ts.tv_sec << "\t" << ts.tv_usec << "\t" << name << "\t" \ + << stat << "\t" << ack << "\t" << on_counter.load() << "\t" << lev << "\t" << silent_time << "\t" << grp2str() << "\t" << msg << ends; +#else os << ts.tv_sec << "\t" << ts.tv_usec << "\t" << name << "\t" \ << stat << "\t" << ack << "\t" << on_counter << "\t" << lev << "\t" << silent_time << "\t" << grp2str() << "\t" << msg << ends; +#endif return(os.str()); } @@ -189,7 +184,7 @@ void alarm_t::confstr(string &s) KEY(SILENT_TIME_KEY)<<silent_time << SEP << KEY(GROUP_KEY)<< grp2str() << SEP << KEY(MESSAGE_KEY)<< msg << SEP << - KEY(URL_KEY)<< msg << SEP << + KEY(URL_KEY)<< url << SEP << KEY(ON_COMMAND_KEY)<< cmd_name_a << SEP << KEY(OFF_COMMAND_KEY)<< cmd_name_n << SEP << KEY(ENABLED_KEY)<< (enabled ? "1" : "0"); @@ -819,7 +814,11 @@ vector<string> alarm_table::to_be_evaluated_list() void alarm_table::new_rwlock() { +#ifndef _USE_BOOST_LOCK vlock = new(ReadersWritersLock); +#else + vlock = new rwlock_t("VLOCK"); +#endif } void alarm_table::del_rwlock() { @@ -893,6 +892,7 @@ void alarm_table::save_alarm_conf_db(const string &att_name, const string &name, { Tango::DbDevice *db_dev = mydev->get_db_device(); db_dev->get_dbase()->put_device_attribute_property(dev_name,db_data); + //Tango::Util::instance()->get_database()->put_device_attribute_property(dev_name,db_data); } catch(Tango::DevFailed &e) { @@ -946,9 +946,8 @@ void alarm_table::delete_alarm_conf_db(string att_name) } } -void alarm_table::get_alarm_list_db(vector<string> &al_list, map<string, string> &saved_alarms) +void alarm_table::get_alarm_list_db(vector<string> &al_list, map<string, string> &saved_alarms, ReadersWritersLock *savedlock) { - saved_alarms.clear(); string dev_name(mydev->get_name()); vector<string> att_list; @@ -969,6 +968,8 @@ void alarm_table::get_alarm_list_db(vector<string> &al_list, map<string, string> { cout << __func__ << ": Exception reading configuration = " << e.errors[0].desc<<endl; } + savedlock->writerIn(); + saved_alarms.clear(); for (size_t i=0;i < db_data.size();/*i++*/) { Tango::DevLong64 nb_prop; @@ -1046,6 +1047,7 @@ void alarm_table::get_alarm_list_db(vector<string> &al_list, map<string, string> al_list.push_back(alm.str()); saved_alarms.insert(make_pair(alm_name,alm.str())); } + savedlock->writerOut(); #if 0 diff --git a/src/alarm_table.h b/src/alarm_table.h index d00b9c5..b4f01b3 100644 --- a/src/alarm_table.h +++ b/src/alarm_table.h @@ -45,6 +45,10 @@ #else #include <boost/spirit/include/classic_ast.hpp> //for ast parse trees (in tree_formula) #endif +//#define _USE_BOOST_LOCK +#ifdef _USE_BOOST_LOCK +#include <boost/thread/shared_mutex.hpp> +#endif //#include "log_thread.h" @@ -98,6 +102,20 @@ class alarm_table; class log_thread; class cmd_thread; +#ifdef _USE_BOOST_LOCK +struct rwlock_t +{ + string name; + rwlock_t(string n){name=n;}; + rwlock_t(){name=string("RWLOCK");}; + boost::shared_mutex mut; + void readerIn(){cout<<name<<": " << __func__<<endl;mut.lock_shared();} + void readerOut(){cout<<name<<": " << __func__<<endl;mut.unlock_shared();} + void writerIn(){cout<<name<<": " << __func__<<endl;mut.lock();} + void writerOut(){cout<<name<<": " << __func__<<endl;mut.unlock();} +}; +#endif + struct formula_res_t { @@ -274,15 +292,21 @@ class alarm_t { string ex_desc; string ex_origin; Tango::TimeVal ts; - string stat, - ack; + string stat{S_NORMAL}, + ack{ACK}; bool error; - bool enabled; - bool shelved; - unsigned int on_counter; - unsigned int off_counter; - unsigned int err_counter; - unsigned int freq_counter; + bool enabled{true}; + bool shelved{false}; +#ifdef _CNT_ATOMIC + atomic_uint on_counter= {0}; + atomic_uint off_counter= {0}; + atomic_uint err_counter= {0}; +#else + unsigned int on_counter= {0}; + unsigned int off_counter= {0}; + unsigned int err_counter= {0}; +#endif + unsigned int freq_counter{0}; tree_parse_info_t formula_tree; @@ -292,27 +316,27 @@ class alarm_t { bool to_be_evaluated; string msg; string url; - unsigned int grp; + unsigned int grp{0}; string lev; set<string> s_event; int is_new; Tango::TimeVal ts_on_delay; //says when it has gone in alarm status for the first time - unsigned int on_delay; //TODO: seconds, is it enough precision? + unsigned int on_delay{0}; //TODO: seconds, is it enough precision? Tango::TimeVal ts_off_delay; //says when it returned normal status - unsigned int off_delay; //TODO: seconds, is it enough precision? + unsigned int off_delay{0}; //TODO: seconds, is it enough precision? Tango::TimeVal ts_err_delay; //says when it has gone in error status for the first time Tango::TimeVal ts_time_silenced; //says when it has been silenced - int silent_time; //minutes max to be silent + int silent_time{-1}; //minutes max to be silent int silenced; //minutes still to be silent string attr_values; //attr_values string attr_values_delay; //attr_values of first occurrence of alarm waiting for on or off delay - string cmd_name_a; //action to execute: when NORMAL -> ALARM, cmd_name = cmd_dp_a/cmd_action_a + string cmd_name_a{""}; //action to execute: when NORMAL -> ALARM, cmd_name = cmd_dp_a/cmd_action_a string cmd_dp_a; //device proxy part of cmd_name_a string cmd_action_a; //action part of cmd_name_a bool send_arg_a; //send as string argument alarm name and attr values Tango::DeviceProxy *dp_a; - string cmd_name_n; //action to execute: when ALARM -> NORMAL, cmd_name_n = cmd_dp_n/cmd_action_n + string cmd_name_n{""}; //action to execute: when ALARM -> NORMAL, cmd_name_n = cmd_dp_n/cmd_action_n string cmd_dp_n; //device proxy part of cmd_name_n string cmd_action_n; //action part of cmd_name_n bool send_arg_n; //send as string argument alarm name and attr values @@ -320,7 +344,11 @@ class alarm_t { /* * methods */ - alarm_t(); //constructor + alarm_t() noexcept {}; //constructor +#ifdef _CNT_ATOMIC + alarm_t& operator=(const alarm_t& rhs) { on_counter = rhs.on_counter.load(); off_counter = rhs.off_counter.load(); err_counter = rhs.err_counter.load(); return *this; } + alarm_t(const alarm_t& rhs) { on_counter = rhs.on_counter.load(); off_counter = rhs.off_counter.load(); err_counter = rhs.err_counter.load();} +#endif void init_static_map(vector<string> &group_names); bool operator==(const alarm_t& that); bool operator==(const string& n); @@ -360,14 +388,18 @@ class alarm_table { vector<string> to_be_evaluated_list(); //vector<alarm_t> v_alarm; alarm_container_t v_alarm; +#ifndef _USE_BOOST_LOCK ReadersWritersLock *vlock; +#else + rwlock_t *vlock; +#endif void new_rwlock(); void del_rwlock(); void save_alarm_conf_db(const string &att_name, const string &name, const string &status, const string &ack, bool enabled, const string &formula, unsigned int on_delay, unsigned int off_delay, const string &grp, const string &url, const string &lev, const string &msg, const string &cmd_a, const string &cmd_n, int silent_time); void delete_alarm_conf_db(string att_name); - void get_alarm_list_db(vector<string> &al_list, map<string, string> &saved_alarms); + void get_alarm_list_db(vector<string> &al_list, map<string, string> &saved_alarms, ReadersWritersLock *savedlock); void init_cmdthread(); void stop_cmdthread(); Tango::TimeVal startup_complete; //to disable action execution at startup diff --git a/src/event_table.cpp b/src/event_table.cpp index 386740e..af43aa8 100644 --- a/src/event_table.cpp +++ b/src/event_table.cpp @@ -345,7 +345,7 @@ event_table::event_table(Tango::DeviceImpl *s):Tango::LogAdapter(s) { mydev = s; stop_it = false; - action = NOTHING; + action.store(NOTHING); } unsigned int event_table::size(void) @@ -734,10 +734,9 @@ void event_table::remove(string &signame, bool stop) } void event_table::update_property() { - DEBUG_STREAM <<"event_table::"<< __func__<<": going to increase action... action="<<action<<"++" << endl; - if(action <= UPDATE_PROP) - action++; - //put_signal_property(); //TODO: wakeup thread and let it do it? -> signal() + DEBUG_STREAM <<"event_table::"<< __func__<<": going to increase action... action="<<action.load()<<"++" << endl; + int expected=NOTHING; + action.compare_exchange_strong(expected, UPDATE_PROP); //if it is NOTHING, then change to UPDATE_PROP signal(); } //============================================================================= @@ -882,7 +881,7 @@ void event_table::add(string &signame, vector<string> contexts, int to_do, bool //DEBUG_STREAM << "created proxy to " << signame << endl; // create Attribute proxy signal->attr = new Tango::AttributeProxy(signal->name); //TODO: OK out of siglock? accessed only inside the same thread? - DEBUG_STREAM << "event_table::"<<__func__<<": signame="<<signame<<" created proxy"<< endl; + DEBUG_STREAM << "event_table::"<<__func__<<": signame="<<signame<<" created proxy"<< endl; } signal->event_id = SUB_ERR; signal->evstate = Tango::ALARM; @@ -916,9 +915,10 @@ void event_table::add(string &signame, vector<string> contexts, int to_do, bool { } - DEBUG_STREAM <<"event_table::"<< __func__<<": going to increase action... action="<<action<<" += " << to_do << endl; - if(action <= UPDATE_PROP) - action += to_do; + int act=action.load(); + DEBUG_STREAM <<"event_table::"<< __func__<<": going to increase action... action="<<act<<" += " << to_do << endl; + int expected=NOTHING; + action.compare_exchange_strong(expected, UPDATE_PROP); //if it is NOTHING, then change to UPDATE_PROP } DEBUG_STREAM <<"event_table::"<< __func__<<": exiting... " << signame << endl; signal(); @@ -939,12 +939,15 @@ void event_table::subscribe_events() if(ret == 0) pthread_rwlock_unlock(&sig2->siglock); }*/ //omni_mutex_lock sync(*this); - veclock.readerIn(); + list<string> l_events; + show(l_events); DEBUG_STREAM << "event_table::" << __func__ << ": going to subscribe " << v_event.size() << " attributes" << endl; - for (unsigned int i=0 ; i<v_event.size() ; i++) + for (auto it : l_events) { - event *sig = &v_event[i]; - sig->siglock->writerIn(); + veclock.readerIn(); + event *sig = get_signal(it); + sig->siglock->readerIn(); + string sig_name(sig->name); if (sig->event_id==SUB_ERR && !sig->stopped) { if(!sig->attr) @@ -967,26 +970,33 @@ void event_table::subscribe_events() o << "Error adding'" \ << sig->name << "' error=" << ex_desc; INFO_STREAM << "event_table::subscribe_events: " << o.str() << endl; - v_event[i].ex_reason = ex.ex_reason = ex_reason; - v_event[i].ex_desc = ex.ex_desc = ex_desc; -// v_event[i].ex_desc.erase(std::remove(v_event[i].ex_desc.begin(), v_event[i].ex_desc.end(), '\n'), v_event[i].ex_desc.end()); - v_event[i].ex_origin = ex.ex_origin = ex_origin; - v_event[i].ts = ex.ts = gettime(); - v_event[i].quality = ex.quality = Tango::ATTR_INVALID; + sig->ex_reason = ex.ex_reason = ex_reason; + sig->ex_desc = ex.ex_desc = ex_desc; +// sig->ex_desc.erase(std::remove(sig->ex_desc.begin(), sig->ex_desc.end(), '\n'), sig->ex_desc.end()); + sig->ex_origin = ex.ex_origin = ex_origin; + sig->ts = ex.ts = gettime(); + sig->quality = ex.quality = Tango::ATTR_INVALID; ex.ev_name = sig->name; - v_event[i].siglock->writerOut(); - //TODO: since event callback not called for this attribute, need to manually trigger do_alarm to update interlan structures ? + sig->siglock->readerOut(); + veclock.readerOut(); + //TODO: since event callback not called for this attribute, need to manually trigger do_alarm to update internal structures ? ex.type = TYPE_TANGO_ERR; ex.msg=o.str(); - static_cast<AlarmHandler_ns::AlarmHandler *>(mydev)->do_alarm(ex); + try + {//DevFailed for push events + static_cast<AlarmHandler_ns::AlarmHandler *>(mydev)->do_alarm(ex); + } catch(Tango::DevFailed & ee) + { + WARN_STREAM << "event_table::"<<__func__<<": " << sig_name << " - EXCEPTION PUSHING EVENTS: " << ee.errors[0].desc << endl; + } continue; } } sig->event_cb = new EventCallBack(static_cast<AlarmHandler_ns::AlarmHandler *>(mydev)); sig->first = true; sig->first_err = true; - DEBUG_STREAM << "event_table::"<<__func__<<":Subscribing for " << sig->name << " " << (sig->first ? "FIRST" : "NOT FIRST") << endl; - sig->siglock->writerOut(); + DEBUG_STREAM << "event_table::"<<__func__<<":Subscribing for " << sig_name << " " << (sig->first ? "FIRST" : "NOT FIRST") << endl; + sig->siglock->readerOut(); int event_id = SUB_ERR; bool isZMQ = true; bool err = false; @@ -1004,11 +1014,11 @@ void event_table::subscribe_events() DEBUG_STREAM << sig->name << " Subscribed" << endl;*/ // Check event source ZMQ/Notifd ? - Tango::ZmqEventConsumer *consumer = - Tango::ApiUtil::instance()->get_zmq_event_consumer(); - isZMQ = (consumer->get_event_system_for_event_id(event_id) == Tango::ZMQ); + /*Tango::ZmqEventConsumer *consumer = + Tango::ApiUtil::instance()->get_zmq_event_consumer();*/ + isZMQ = true;//(consumer->get_event_system_for_event_id(event_id) == Tango::ZMQ);//TODO: remove - DEBUG_STREAM << sig->name << "(id="<< event_id <<"): Subscribed " << ((isZMQ)? "ZMQ Event" : "NOTIFD Event") << endl; + DEBUG_STREAM << sig_name << "(id="<< event_id <<"): Subscribed " << ((isZMQ)? "ZMQ Event" : "NOTIFD Event") << endl; } catch (Tango::DevFailed &e) { @@ -1021,11 +1031,11 @@ void event_table::subscribe_events() bei_t ex; ostringstream o; o << "Event exception for'" \ - << sig->name << "' error=" << ex_desc; + << sig_name << "' error=" << ex_desc; INFO_STREAM <<"event_table::"<<__func__<<": sig->attr->subscribe_event: " << o.str() << endl; err = true; Tango::Except::print_exception(e); - sig->siglock->writerIn(); + //sig->siglock->writerIn(); //not yet subscribed, no one can modify sig->ex_reason = ex.ex_reason = ex_reason; sig->ex_desc = ex.ex_desc = ex_desc; sig->ex_origin = ex.ex_origin = ex_origin; @@ -1034,26 +1044,34 @@ void event_table::subscribe_events() sig->ts = ex.ts = gettime(); sig->quality = ex.quality = Tango::ATTR_INVALID; ex.ev_name = sig->name; - sig->siglock->writerOut(); + //sig->siglock->writerOut();//not yet subscribed, no one can modify + veclock.readerOut(); //since event callback not called for this attribute, need to manually trigger do_alarm to update interlan structures ex.type = TYPE_TANGO_ERR; ex.msg=o.str(); - static_cast<AlarmHandler_ns::AlarmHandler *>(mydev)->do_alarm(ex); + try + {//DevFailed for push events + static_cast<AlarmHandler_ns::AlarmHandler *>(mydev)->do_alarm(ex); + } catch(Tango::DevFailed & ee) + { + WARN_STREAM << "event_table::"<<__func__<<": " << ex.ev_name << " - EXCEPTION PUSHING EVENTS: " << ee.errors[0].desc << endl; + } + continue; } if(!err) { - sig->siglock->writerIn(); + //sig->siglock->writerIn(); //nobody else write event_id and isZMQ sig->event_id = event_id; sig->isZMQ = isZMQ; - sig->siglock->writerOut(); + //sig->siglock->writerOut();//nobody else write event_id and isZMQ } } else { - sig->siglock->writerOut(); + sig->siglock->readerOut(); } + veclock.readerOut(); } - veclock.readerOut(); initialized = true; } @@ -1230,15 +1248,34 @@ int event_table::nb_sig_to_subscribe() //============================================================================= void event_table::put_signal_property() { - DEBUG_STREAM << "event_table::"<<__func__<<": put_signal_property entering action=" << action << endl; - //ReaderLock lock(veclock); - if (action>NOTHING) + int act=action.load(); + DEBUG_STREAM << "event_table::"<<__func__<<": entering action=" << act << endl; + + if (act>NOTHING) { static_cast<AlarmHandler_ns::AlarmHandler *>(mydev)->put_signal_property(); - if(action >= UPDATE_PROP) - action--; + int expected=UPDATE_PROP; + action.compare_exchange_strong(expected, NOTHING); //if it is UPDATE_PROP, then change to NOTHING + } + DEBUG_STREAM << "event_table::"<<__func__<<": exiting action=" << action.load() << endl; +} +//============================================================================= +/** + * build a list of signal to set HDB device property + */ +//============================================================================= +void event_table::check_signal_property() +{ + int act=action.load(); + DEBUG_STREAM << "event_table::"<<__func__<<": entering action=" << act << endl; + if (act>NOTHING) + return; + if (static_cast<AlarmHandler_ns::AlarmHandler *>(mydev)->check_signal_property()) + { + int expected=NOTHING; + action.compare_exchange_strong(expected, UPDATE_PROP); //if it is NOTHING, then change to UPDATE_PROP } - DEBUG_STREAM << "event_table::"<<__func__<<": put_signal_property exiting action=" << action << endl; + DEBUG_STREAM << "event_table::"<<__func__<<": exiting action=" << action.load() << endl; } diff --git a/src/event_table.h b/src/event_table.h index 4e5988e..5d39215 100644 --- a/src/event_table.h +++ b/src/event_table.h @@ -19,7 +19,7 @@ #include <iostream> #include <string> #include <map> - +#include <atomic> #include <tango.h> @@ -29,8 +29,8 @@ using namespace std; #define TYPE_TANGO_ERR -2 #define TYPE_GENERIC_ERR -3 #define SUB_ERR -1 -#define NOTHING 0 -#define UPDATE_PROP 1 +constexpr int NOTHING = 0; +constexpr int UPDATE_PROP = 1; class alarm_list { public: @@ -185,6 +185,7 @@ class event_table : public Tango::TangoMonitor, public Tango::LogAdapter { * build a list of signal to set HDB device property */ void put_signal_property(); + void check_signal_property(); bool is_initialized(); bool get_if_stop(); void stop_thread(); @@ -192,7 +193,7 @@ class event_table : public Tango::TangoMonitor, public Tango::LogAdapter { ReadersWritersLock veclock; bool stop_it; bool initialized; - int action; + atomic_int action; private: Tango::DeviceImpl *mydev; }; /* class event_table */ diff --git a/src/update-thread.cpp b/src/update-thread.cpp index 1249e5f..9738e1c 100644 --- a/src/update-thread.cpp +++ b/src/update-thread.cpp @@ -36,7 +36,7 @@ update_thread::update_thread(AlarmHandler_ns::AlarmHandler *p) : p_Alarm(p),Tang */ update_thread::~update_thread() { - DEBUG_STREAM << __func__ << "update_thread::run(): exiting!" << endl; + DEBUG_STREAM << __func__ << "update_thread::~update_thread(): entering!" << endl; p_Alarm = NULL; } @@ -45,6 +45,7 @@ update_thread::~update_thread() */ void update_thread::run(void *) { + DEBUG_STREAM << __func__ << "update_thread::run(): entering!" << endl; //printf("update_thread::run(): running...\n"); unsigned int pausasec, pausanano; pausasec=1; -- GitLab