Skip to content
Snippets Groups Projects
event_table.cpp 44.5 KiB
Newer Older
			bool	isZMQ = true;
			bool	err = false;

			try
			{
				event_id = sig->attr->subscribe_event(
												Tango::CHANGE_EVENT,
												sig->event_cb,
												/*stateless=*/false);
				/*sig->evstate  = Tango::ON;
				//sig->first  = false;	//first event already arrived at subscribe_event
				sig->status.clear();
				sig->status = "Subscribed";
				DEBUG_STREAM << sig->name <<  "  Subscribed" << endl;*/

				//	Check event source  ZMQ/Notifd ?
				/*Tango::ZmqEventConsumer	*consumer =
						Tango::ApiUtil::instance()->get_zmq_event_consumer();*/
				isZMQ = true;//(consumer->get_event_system_for_event_id(event_id) == Tango::ZMQ);//TODO: remove
				DEBUG_STREAM << sig_name << "(id="<< event_id <<"):	Subscribed " << ((isZMQ)? "ZMQ Event" : "NOTIFD Event") << endl;
			}
			catch (Tango::DevFailed &e)
			{
				string ex_reason(e.errors[0].reason);
				ex_reason = std::regex_replace(ex_reason, std::regex(R"((\n)|(\r)|(\t)|(\0))"), " "); //match raw string "\n" or "\t" and replace with " "
				string ex_desc(e.errors[0].desc);
				ex_desc = std::regex_replace(ex_desc, std::regex(R"((\n)|(\r)|(\t)|(\0))"), " "); //match raw string "\n" or "\t" and replace with " "
				string ex_origin(e.errors[0].origin);
				ex_origin = std::regex_replace(ex_origin, std::regex(R"((\n)|(\r)|(\t)|(\0))"), " "); //match raw string "\n" or "\t" and replace with " "
				bei_t ex;
				ostringstream o;
				o << "Event exception for'" \
					<< sig_name << "' error=" << ex_desc;
				INFO_STREAM <<"event_table::"<<__func__<<": sig->attr->subscribe_event: " << o.str() << endl;
				err = true;
				Tango::Except::print_exception(e);
				//sig->siglock->writerIn(); //not yet subscribed, no one can modify
				sig->ex_reason = ex.ex_reason = ex_reason;
				sig->ex_desc = ex.ex_desc = ex_desc;
				sig->ex_origin = ex.ex_origin = ex_origin;
				sig->event_id = SUB_ERR;
				delete sig->event_cb;
				sig->ts = ex.ts = gettime();
				sig->quality = ex.quality = Tango::ATTR_INVALID;
				ex.ev_name = sig->name;
				//sig->siglock->writerOut();//not yet subscribed, no one can modify
				veclock.readerOut();
				//since event callback not called for this attribute, need to manually trigger do_alarm to update interlan structures
				ex.type = TYPE_TANGO_ERR;
				ex.msg=o.str();
				try
				{//DevFailed for push events
					static_cast<AlarmHandler_ns::AlarmHandler *>(mydev)->do_alarm(ex);
				} catch(Tango::DevFailed & ee)
				{
					WARN_STREAM << "event_table::"<<__func__<<": " << ex.ev_name << " - EXCEPTION PUSHING EVENTS: " << ee.errors[0].desc << endl;
				}
				continue;
				//sig->siglock->writerIn(); //nobody else write event_id and isZMQ
				sig->event_id = event_id;
				sig->isZMQ = isZMQ;
				//sig->siglock->writerOut();//nobody else write event_id and isZMQ
			sig->siglock->readerOut();
		veclock.readerOut();
	}
	initialized = true;
}

void event_table::start(string &signame)
{
	DEBUG_STREAM << "event_table::"<<__func__<< " entering signame=" << signame << endl;
	ReaderLock lock(veclock);
	vector<string> contexts;
	for (unsigned int i=0 ; i<v_event.size() ; i++)
	{
		if (v_event[i].name==signame)
		{
			v_event[i].siglock->writerIn();
			if(!v_event[i].running)
			{
				if(v_event[i].stopped)
				{
					static_cast<AlarmHandler_ns::AlarmHandler *>(mydev)->attr_AttributeStoppedNumber_read--;
					try
					{
						add(signame, contexts, NOTHING, true);
					}
					catch (Tango::DevFailed &e)
					{
						//Tango::Except::print_exception(e);
						INFO_STREAM << "event_table::start: error adding  " << signame <<" err="<< e.errors[0].desc << endl;
						v_event[i].ex_reason = e.errors[0].reason;
						v_event[i].ex_desc = e.errors[0].desc;
						v_event[i].ex_origin = e.errors[0].origin;
						/*v_event[i].siglock->writerOut();
						return;*/
					}
				}
				v_event[i].running=true;
				if(v_event[i].paused)
					static_cast<AlarmHandler_ns::AlarmHandler *>(mydev)->attr_AttributePausedNumber_read--;
				static_cast<AlarmHandler_ns::AlarmHandler *>(mydev)->attr_AttributeStartedNumber_read++;
				v_event[i].paused=false;
				v_event[i].stopped=false;
			}
			v_event[i].siglock->writerOut();
			return;
		}
	}
	for (unsigned int i=0 ; i<v_event.size() ; i++)
	{
#ifndef _MULTI_TANGO_HOST
		if (static_cast<AlarmHandler_ns::AlarmHandler *>(mydev)->compare_without_domain(v_event[i].name,signame))
		if (!static_cast<AlarmHandler_ns::AlarmHandler *>(mydev)->compare_tango_names(v_event[i].name,signame))
#endif
		{
			v_event[i].siglock->writerIn();
			if(!v_event[i].running)
			{
				if(v_event[i].stopped)
				{
					static_cast<AlarmHandler_ns::AlarmHandler *>(mydev)->attr_AttributeStoppedNumber_read--;
					try
					{
						add(signame, contexts, NOTHING, true);
					}
					catch (Tango::DevFailed &e)
					{
						//Tango::Except::print_exception(e);
						INFO_STREAM << "event_table::start: error adding  " << signame << endl;
						v_event[i].ex_reason = e.errors[0].reason;
						v_event[i].ex_desc = e.errors[0].desc;
						v_event[i].ex_origin = e.errors[0].origin;
						/*signals[i].siglock->writerOut();
						return;*/
					}
				}
				v_event[i].running=true;
				if(v_event[i].paused)
					static_cast<AlarmHandler_ns::AlarmHandler *>(mydev)->attr_AttributePausedNumber_read--;
				static_cast<AlarmHandler_ns::AlarmHandler *>(mydev)->attr_AttributeStartedNumber_read++;
				v_event[i].paused=false;
				v_event[i].stopped=false;
			}
			v_event[i].siglock->writerOut();
			return;
		}
	}

	//	if not found
	Tango::Except::throw_exception(
				(const char *)"BadSignalName",
				"Signal " + signame + " NOT subscribed",
				(const char *)"event_table::start()");
}

void event_table::start_all()
{
	ReaderLock lock(veclock);
	vector<string> contexts;
	for (unsigned int i=0 ; i<v_event.size() ; i++)
	{
		v_event[i].siglock->writerIn();
		if(!v_event[i].running)
		{
			if(v_event[i].stopped)
			{
				string signame = v_event[i].name;
				static_cast<AlarmHandler_ns::AlarmHandler *>(mydev)->attr_AttributeStoppedNumber_read--;
				try
				{
					add(signame, contexts, NOTHING, true);
				}
				catch (Tango::DevFailed &e)
				{
					//Tango::Except::print_exception(e);
					INFO_STREAM << "event_table::start: error adding  " << signame <<" err="<< e.errors[0].desc << endl;
					v_event[i].ex_reason = e.errors[0].reason;
					v_event[i].ex_desc = e.errors[0].desc;
					v_event[i].ex_origin = e.errors[0].origin;
					/*v_event[i].siglock->writerOut();
					return;*/
				}
			}
			v_event[i].running=true;
			if(v_event[i].paused)
				static_cast<AlarmHandler_ns::AlarmHandler *>(mydev)->attr_AttributePausedNumber_read--;
			static_cast<AlarmHandler_ns::AlarmHandler *>(mydev)->attr_AttributeStartedNumber_read++;
			v_event[i].paused=false;
			v_event[i].stopped=false;
		}
		v_event[i].siglock->writerOut();
	}
}

//=============================================================================
//=============================================================================
bool event_table::get_if_stop()
{
	//omni_mutex_lock sync(*this);
	return stop_it;
}
//=============================================================================
//=============================================================================
void event_table::stop_thread()
{
	//omni_mutex_lock sync(*this);
	stop_it = true;
	signal();
	//condition.signal();
}
//=============================================================================
/**
 *	return number of signals to be subscribed
 */
//=============================================================================
int event_table::nb_sig_to_subscribe()
{
	ReaderLock lock(veclock);

	int	nb = 0;
	for (unsigned int i=0 ; i<v_event.size() ; i++)
	{
		v_event[i].siglock->readerIn();
		if (v_event[i].event_id == SUB_ERR && !v_event[i].stopped)
		{
			nb++;
		}
		v_event[i].siglock->readerOut();
	}
	return nb;
}
//=============================================================================
/**
 *	build a list of signal to set HDB device property
 */
//=============================================================================
void event_table::put_signal_property()
{
	int act=action.load();
	DEBUG_STREAM << "event_table::"<<__func__<<": entering action=" << act << endl;

	if (act>NOTHING)
		static_cast<AlarmHandler_ns::AlarmHandler *>(mydev)->put_signal_property();
		int expected=UPDATE_PROP;
		action.compare_exchange_strong(expected, NOTHING); //if it is UPDATE_PROP, then change to NOTHING
	}
	DEBUG_STREAM << "event_table::"<<__func__<<": exiting action=" << action.load() << endl;
}
//=============================================================================
/**
 *	build a list of signal to set HDB device property
 */
//=============================================================================
void event_table::check_signal_property()
{
	int act=action.load();
	DEBUG_STREAM << "event_table::"<<__func__<<": entering action=" << act << endl;
	if (act>NOTHING)
		return;
	if (static_cast<AlarmHandler_ns::AlarmHandler *>(mydev)->check_signal_property())
	{
		int expected=NOTHING;
		action.compare_exchange_strong(expected, UPDATE_PROP); //if it is NOTHING, then change to UPDATE_PROP
	DEBUG_STREAM << "event_table::"<<__func__<<": exiting action=" << action.load() << endl;
EventCallBack::EventCallBack(Tango::DeviceImpl *s):Tango::LogAdapter(s)
	//e_ptr = NULL;
	mydev = s;
	//e_ptr = NULL;
}

void EventCallBack::push_event(Tango::EventData* ev)
{
	string temp_name;	
	bei_t e;
	e.ex_reason = string("");
	e.ex_desc = string("");
	e.ex_origin = string("");
		//e.errors = ev->errors;
		e.quality = Tango::ATTR_VALID;
		//cout << "EVENT="<<ev->attr_name<<" quality="<<e.quality<<endl;
			e.quality = (int)ev->attr_value->get_quality();
#if 0//TANGO_VER >= 711
 			string ev_name_str(ev->attr_name);
 			string::size_type pos = ev_name_str.find("tango://");
 			if (pos != string::npos)
 			{
 				pos = ev_name_str.find('/',8);
 				ev_name_str = ev_name_str.substr(pos + 1);
 			}
 			e.ev_name = ev_name_str.c_str();
#else			
			e.ev_name = ev->attr_name;
#endif
			e.ts = ev->attr_value->time;
			extract_values(ev->attr_value, e.value, e.value_string, e.type, e.read_size);
			e.quality = Tango::ATTR_INVALID;
#if 0//TANGO_VER >= 711
 			string ev_name_str(ev->attr_name);
 			string::size_type pos = ev_name_str.find("tango://");
 			if (pos != string::npos)
 			{
 				pos = ev_name_str.find('/',8);
 				ev_name_str = ev_name_str.substr(pos + 1);
 			}
 			temp_name = ev_name_str.c_str() + string(".") + ev->event;
#else
			temp_name = ev->attr_name + string(".") + ev->event;		//TODO: BUG IN TANGO: part of attr_name after first dot continues in field event
#endif
			size_t pos_change = temp_name.find(".change");
			if(pos_change != string::npos)
			{
				temp_name = temp_name.substr(0,pos_change);
			}
			ostringstream o;
			o << "Tango error for '" << temp_name << "'=" << ev->errors[0].desc.in();			
			e.ev_name = temp_name;
			e.type = TYPE_TANGO_ERR;
			//e.ev_name = INTERNAL_ERROR;
			//e.type = -1;
			e.msg = o.str();
			e.msg = std::regex_replace(e.msg, std::regex(R"((\n)|(\r)|(\t)|(\0))"), " "); //match raw string "\n" or "\t" and replace with " "
		}
	} 
	catch (string &err) {
		e.msg = err + " for event '" + ev->attr_name + "'";
		e.ev_name = ev->attr_name;
		e.type = TYPE_GENERIC_ERR;
		//e.value.i = 0;
		e.ts = gettime();
		//cerr << o.str() << endl;		
	} catch(Tango::DevFailed& Terr)
	{
		ostringstream o;
		o << "Event exception for'" \
			<< ev->attr_name << "' error=" << Terr.errors[0].desc;
		e.ev_name = ev->attr_name;
		e.type = TYPE_GENERIC_ERR;
		//e.value.i = 0;
		e.ts = gettime();
		e.msg = o.str();
		e.msg = std::regex_replace(e.msg, std::regex(R"((\n)|(\r)|(\t)|(\0))"), " "); //match raw string "\n" or "\t" and replace with " "
	}	
	catch (...) {
		ostringstream o;
		o << "Generic Event exception for'" \
			<< ev->attr_name << "'";
		e.ev_name = ev->attr_name;
		e.type = TYPE_GENERIC_ERR;
		//e.value.i = 0;
		e.ts = gettime();
		e.msg = o.str();
		//cerr << o.str() << endl;		
	}
	static_cast<AlarmHandler_ns::AlarmHandler *>(mydev)->evlist.push_back(e);
void EventCallBack::extract_values(Tango::DeviceAttribute *attr_value, vector<double> &val, string &val_string, int &type, int &read_size)
{
	Tango::DevState stval;
	vector<Tango::DevState> v_st;
	vector<Tango::DevULong> v_ulo;
	vector<Tango::DevUChar> v_uch;
	vector<Tango::DevShort> v_sh;
	vector<Tango::DevUShort> v_ush;
	vector<Tango::DevLong> v_lo;
	vector<Tango::DevDouble> v_do;
	vector<Tango::DevFloat> v_fl;
	vector<Tango::DevBoolean> v_bo;
	vector<Tango::DevLong64> v_lo64;
	vector<Tango::DevULong64> v_ulo64;
Graziano Scalamera's avatar
Graziano Scalamera committed
	vector<Tango::DevEnum> v_enum;
	vector<string> v_string;
	val_string = string("");
	//Tango::AttributeDimension attr_w_dim;
	Tango::AttributeDimension attr_r_dim;

        //attr_value->reset_exceptions(Tango::DeviceAttribute::isempty_flag); //disable is_empty exception //commented to throw exceptions if empty
        if(!attr_value->is_empty())
        {
                //attr_w_dim = data->attr_value->get_w_dimension();
                attr_r_dim = attr_value->get_r_dimension();
        }
        else
        {
                attr_r_dim.dim_x = 0;
                //attr_w_dim.dim_x = 0;
                attr_r_dim.dim_y = 0;
                //attr_w_dim.dim_y = 0;
        }
        read_size = attr_r_dim.dim_x;
        if(attr_r_dim.dim_y > 1)
                read_size *= attr_r_dim.dim_y;

	if (attr_value->get_type() == Tango::DEV_UCHAR) {
		*(attr_value) >> v_uch;
		for(vector<Tango::DevUChar>::iterator it = v_uch.begin(); it != v_uch.end(); it++)
			val.push_back((double)(*it));		//convert all to double
		type = Tango::DEV_UCHAR;		
	} else if (attr_value->get_type() == Tango::DEV_SHORT) {
		*(attr_value) >> v_sh;
		for(vector<Tango::DevShort>::iterator  it = v_sh.begin(); it != v_sh.end(); it++)
			val.push_back((double)(*it));		//convert all to double				
		type = Tango::DEV_SHORT;
	} else if (attr_value->get_type() == Tango::DEV_USHORT) {
		*(attr_value) >> v_ush;
		for(vector<Tango::DevUShort>::iterator  it = v_ush.begin(); it != v_ush.end(); it++)
			val.push_back((double)(*it));		//convert all to double						
		type = Tango::DEV_USHORT;			
	} else if (attr_value->get_type() == Tango::DEV_LONG) {
		*(attr_value) >> v_lo;
		for(vector<Tango::DevLong>::iterator  it = v_lo.begin(); it != v_lo.end(); it++)
			val.push_back((double)(*it));		//convert all to double						
		type = Tango::DEV_LONG;
	} else if (attr_value->get_type() == Tango::DEV_STATE) {
		//*(attr_value) >> v_st;		//doesn't work in tango 5
		*(attr_value) >> stval;
		v_st.push_back(stval);
		for(vector<Tango::DevState>::iterator it = v_st.begin(); it != v_st.end(); it++)
			val.push_back((double)(*it));		//convert all to double
		type = Tango::DEV_STATE;
#if 1//TANGO_VER >= 600
	} else if (attr_value->get_type() == Tango::DEV_ULONG) {
		*(attr_value) >> v_ulo;
		for(vector<Tango::DevULong>::iterator  it = v_ulo.begin(); it != v_ulo.end(); it++)
			val.push_back((double)(*it));		//convert all to double						
		type = Tango::DEV_ULONG;
#endif  //TANGO_VER >= 600								
	} else if (attr_value->get_type() == Tango::DEV_DOUBLE) {
		*(attr_value) >> v_do;
		for(vector<Tango::DevDouble>::iterator  it = v_do.begin(); it != v_do.end(); it++)
			val.push_back((double)(*it));		//convert all to double						
		type = Tango::DEV_DOUBLE;
	} else if (attr_value->get_type() == Tango::DEV_FLOAT) {
		*(attr_value) >> v_fl;
		for(vector<Tango::DevFloat>::iterator  it = v_fl.begin(); it != v_fl.end(); it++)
			val.push_back((double)(*it));		//convert all to double						
		type = Tango::DEV_FLOAT;
	} else if (attr_value->get_type() == Tango::DEV_BOOLEAN) {
		*(attr_value) >> v_bo;
		for(vector<Tango::DevBoolean>::iterator  it = v_bo.begin(); it != v_bo.end(); it++)
			val.push_back((double)(*it));		//convert all to double		
		type = Tango::DEV_BOOLEAN;
	} else if (attr_value->get_type() == Tango::DEV_LONG64) {
		*(attr_value) >> v_lo64;
		for(vector<Tango::DevLong64>::iterator  it = v_lo64.begin(); it != v_lo64.end(); it++)
			val.push_back((double)(*it));		//convert all to double
		type = Tango::DEV_LONG64;
	} else if (attr_value->get_type() == Tango::DEV_ULONG64) {
		*(attr_value) >> v_ulo64;
		for(vector<Tango::DevULong64>::iterator  it = v_ulo64.begin(); it != v_ulo64.end(); it++)
			val.push_back((double)(*it));		//convert all to double
		type = Tango::DEV_ULONG64;
Graziano Scalamera's avatar
Graziano Scalamera committed
	} else if (attr_value->get_type() == Tango::DEV_ENUM) {
		*(attr_value) >> v_enum;
		for(vector<Tango::DevEnum>::iterator  it = v_enum.begin(); it != v_enum.end(); it++)
			val.push_back((double)(*it));		//convert all to double
		type = Tango::DEV_ENUM;
	} else if (attr_value->get_type() == Tango::DEV_STRING) {
		*(attr_value) >> v_string;
		val_string = *(v_string.begin());	//TODO: support string spectrum attrbutes
		type = Tango::DEV_STRING;
		o << "unknown type";
/*void EventCallBack::init(event_list* e)


Tango::TimeVal gettime(void)
{
	struct timeval tv;
	struct timezone tz;
	Tango::TimeVal t;
	
	gettimeofday(&tv, &tz);
	t.tv_sec = tv.tv_sec;
	t.tv_usec = tv.tv_usec;
	t.tv_nsec = 0;
	return t;
}

/* EOF */