Skip to content
Snippets Groups Projects
Commit ea99c75f authored by Graziano Scalamera's avatar Graziano Scalamera
Browse files

event subscription moved to thread

parent 00d6acd9
No related branches found
No related tags found
No related merge requests found
This diff is collapsed.
...@@ -49,10 +49,11 @@ ...@@ -49,10 +49,11 @@
#include "alarm_table.h" #include "alarm_table.h"
#include "event_table.h" #include "event_table.h"
#include "SubscribeThread.h"
#define MAX_ALARMS 1024 #define MAX_ALARMS 1024
#define _USE_ELETTRA_DB_RW //#define _USE_ELETTRA_DB_RW
//using namespace Tango; //using namespace Tango;
...@@ -60,6 +61,12 @@ class alarm_thread; ...@@ -60,6 +61,12 @@ class alarm_thread;
class log_thread; class log_thread;
class update_thread; class update_thread;
# define DECLARE_TIME_VAR struct timeval
# define GET_TIME(t) gettimeofday(&t, NULL);
# define ELAPSED(before, after) \
1000.0*(after.tv_sec-before.tv_sec) + \
((double)after.tv_usec-before.tv_usec) / 1000
/*----- PROTECTED REGION END -----*/ // Alarm.h /*----- PROTECTED REGION END -----*/ // Alarm.h
...@@ -84,7 +91,13 @@ class Alarm : public TANGO_BASE_CLASS ...@@ -84,7 +91,13 @@ class Alarm : public TANGO_BASE_CLASS
// Add your own data members // Add your own data members
public: public:
bool compare_without_domain(string str1, string str2);
string remove_domain(string str);
//TODO: real attributes
Tango::DevLong attr_AttributeStartedNumber_read;
Tango::DevLong attr_AttributePausedNumber_read;
Tango::DevLong attr_AttributeStoppedNumber_read;
Tango::DevLong attr_AttributeNumber_read;
/*----- PROTECTED REGION END -----*/ // Alarm::Data Members /*----- PROTECTED REGION END -----*/ // Alarm::Data Members
...@@ -108,6 +121,8 @@ public: ...@@ -108,6 +121,8 @@ public:
string dbPort; string dbPort;
// InstanceName: Name used to associate configured alarm rules to this instance // InstanceName: Name used to associate configured alarm rules to this instance
string instanceName; string instanceName;
// SubscribeRetryPeriod: retry period in seconds
Tango::DevLong subscribeRetryPeriod;
// Attribute data members // Attribute data members
public: public:
...@@ -283,6 +298,8 @@ public: ...@@ -283,6 +298,8 @@ public:
// Additional Method prototypes // Additional Method prototypes
friend class alarm_thread; friend class alarm_thread;
friend class SubscribeThread;
friend class event_table;
protected : protected :
...@@ -291,7 +308,7 @@ private: ...@@ -291,7 +308,7 @@ private:
alarm_table alarms; alarm_table alarms;
event_table* events; event_table* events;
// event_list evlist; /* producer/consumer events list */ //gcc 4 problem?? // event_list evlist; /* producer/consumer events list */ //gcc 4 problem??
EventCallBack ecb; /* callback handles */ // EventCallBack ecb; /* callback handles */
alarm_thread *almloop; alarm_thread *almloop;
update_thread *updateloop; update_thread *updateloop;
vector<alarm_t> alarmed; vector<alarm_t> alarmed;
...@@ -299,6 +316,7 @@ private: ...@@ -299,6 +316,7 @@ private:
vector<alarm_t> internal; vector<alarm_t> internal;
ReadersWritersLock *internallock; ReadersWritersLock *internallock;
ReadersWritersLock *dslock; ReadersWritersLock *dslock;
int period; //subscribe thread period
static int instanceCounter; static int instanceCounter;
...@@ -308,10 +326,14 @@ private: ...@@ -308,10 +326,14 @@ private:
char dss[MAX_ALARMS][10124]; char dss[MAX_ALARMS][10124];
void init_events(vector<string> &evn); void init_events(vector<string> &evn);
#if 0
void init_alarms(map< string,vector<string> > &alarm_events); void init_alarms(map< string,vector<string> > &alarm_events);
#endif
void add_alarm(alarm_t& a) throw(string&); void add_alarm(alarm_t& a) throw(string&);
void add_event(alarm_t& a, vector<string> &evn) throw(string&); void add_event(alarm_t& a, vector<string> &evn) throw(string&);
#if 0
void subscribe_event(alarm_t& a, EventCallBack& ecb, vector<string> &evn) throw(string&); void subscribe_event(alarm_t& a, EventCallBack& ecb, vector<string> &evn) throw(string&);
#endif
// void do_alarm(bei_t& e); //gcc 4 problem?? // void do_alarm(bei_t& e); //gcc 4 problem??
bool remove_alarm(string& s) throw(string&); bool remove_alarm(string& s) throw(string&);
//void add_to_database(alarm_t& a) throw(string&); //void add_to_database(alarm_t& a) throw(string&);
...@@ -327,7 +349,10 @@ private: ...@@ -327,7 +349,10 @@ private:
void prepare_alarm_attr(); //for read attribute alarm and push_change_event void prepare_alarm_attr(); //for read attribute alarm and push_change_event
SubscribeThread *thread;
public: public:
void put_signal_property();
void do_alarm(bei_t& e); //public instead of protected for gcc 4 problem?? void do_alarm(bei_t& e); //public instead of protected for gcc 4 problem??
void timer_update(); //public instead of protected for gcc 4 problem?? void timer_update(); //public instead of protected for gcc 4 problem??
event_list evlist; /* producer/consumer events list */ //public instead of protected for gcc 4 problem?? event_list evlist; /* producer/consumer events list */ //public instead of protected for gcc 4 problem??
......
<?xml version="1.0" encoding="ASCII"?> <?xml version="1.0" encoding="ASCII"?>
<pogoDsl:PogoSystem xmi:version="2.0" xmlns:xmi="http://www.omg.org/XMI" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:pogoDsl="http://www.esrf.fr/tango/pogo/PogoDsl"> <pogoDsl:PogoSystem xmi:version="2.0" xmlns:xmi="http://www.omg.org/XMI" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:pogoDsl="http://www.esrf.fr/tango/pogo/PogoDsl">
<classes name="Alarm" pogoRevision="9.1"> <classes name="Alarm" pogoRevision="9.1">
<description description="Elettra alarm device server" title="Elettra alarm device server" sourcePath="/home/graziano/workspace/git/alarm/src" language="Cpp" filestogenerate="XMI file,Code files,Protected Regions,Eclipse Project" license="GPL" copyright="" hasMandatoryProperty="false" hasConcreteProperty="true" hasAbstractCommand="false" hasAbstractAttribute="false"> <description description="Elettra alarm device server" title="Elettra alarm device server" sourcePath="/home/graziano/workspace/git/alarm/src" language="Cpp" filestogenerate="XMI file,Code files,Protected Regions" license="GPL" copyright="" hasMandatoryProperty="false" hasConcreteProperty="true" hasAbstractCommand="false" hasAbstractAttribute="false">
<inheritances classname="Device_4Impl" sourcePath=""/> <inheritances classname="Device_4Impl" sourcePath=""/>
<identification contact="at elettra.eu - graziano.scalamera" author="graziano.scalamera" emailDomain="elettra.eu" classFamily="SoftwareSystem" siteSpecific="" platform="Unix Like" bus="Not Applicable" manufacturer="" reference=""/> <identification contact="at elettra.eu - graziano.scalamera" author="graziano.scalamera" emailDomain="elettra.eu" classFamily="SoftwareSystem" siteSpecific="" platform="Unix Like" bus="Not Applicable" manufacturer="" reference=""/>
</description> </description>
...@@ -41,6 +41,10 @@ ...@@ -41,6 +41,10 @@
<type xsi:type="pogoDsl:StringType"/> <type xsi:type="pogoDsl:StringType"/>
<status abstract="false" inherited="false" concrete="true" concreteHere="true"/> <status abstract="false" inherited="false" concrete="true" concreteHere="true"/>
</deviceProperties> </deviceProperties>
<deviceProperties name="SubscribeRetryPeriod" description="retry period in seconds">
<type xsi:type="pogoDsl:IntType"/>
<status abstract="false" inherited="false" concrete="true" concreteHere="true"/>
</deviceProperties>
<commands name="State" description="This command gets the device state (stored in its &lt;i>device_state&lt;/i> data member) and returns it to the caller." execMethod="dev_state" displayLevel="OPERATOR" polledPeriod="0"> <commands name="State" description="This command gets the device state (stored in its &lt;i>device_state&lt;/i> data member) and returns it to the caller." execMethod="dev_state" displayLevel="OPERATOR" polledPeriod="0">
<argin description="none."> <argin description="none.">
<type xsi:type="pogoDsl:VoidType"/> <type xsi:type="pogoDsl:VoidType"/>
...@@ -137,6 +141,6 @@ ...@@ -137,6 +141,6 @@
<status abstract="false" inherited="false" concrete="true" concreteHere="true"/> <status abstract="false" inherited="false" concrete="true" concreteHere="true"/>
<properties description="" label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> <properties description="" label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/>
</dynamicAttributes> </dynamicAttributes>
<preferences docHome="./doc_html" makefileHome="/usr/local/tango-8.1.2.c/share/pogo/preferences"/> <preferences docHome="./doc_html" makefileHome="/usr/local/tango-9.2.2/share/pogo/preferences"/>
</classes> </classes>
</pogoDsl:PogoSystem> </pogoDsl:PogoSystem>
...@@ -483,6 +483,19 @@ void AlarmClass::set_default_property() ...@@ -483,6 +483,19 @@ void AlarmClass::set_default_property()
} }
else else
add_wiz_dev_prop(prop_name, prop_desc); add_wiz_dev_prop(prop_name, prop_desc);
prop_name = "SubscribeRetryPeriod";
prop_desc = "retry period in seconds";
prop_def = "";
vect_data.clear();
if (prop_def.length()>0)
{
Tango::DbDatum data(prop_name);
data << vect_data ;
dev_def_prop.push_back(data);
add_wiz_dev_prop(prop_name, prop_desc, prop_def);
}
else
add_wiz_dev_prop(prop_name, prop_desc);
} }
//-------------------------------------------------------- //--------------------------------------------------------
......
static const char *RcsId = "$Header: /home/cvsadm/cvsroot/fermi/servers/hdb++/hdb++es/src/SubscribeThread.cpp,v 1.6 2014-03-06 15:21:43 graziano Exp $";
//+=============================================================================
//
// file : HdbEventHandler.cpp
//
// description : C++ source for thread management
// project : TANGO Device Server
//
// $Author: graziano $
//
// $Revision: 1.6 $
//
// $Log: SubscribeThread.cpp,v $
// Revision 1.6 2014-03-06 15:21:43 graziano
// StartArchivingAtStartup,
// start_all and stop_all,
// archiving of first event received at subscribe
//
// Revision 1.5 2014-02-20 14:59:02 graziano
// name and path fixing
// removed start acquisition from add
//
// Revision 1.4 2013-09-24 08:42:21 graziano
// bug fixing
//
// Revision 1.3 2013-09-02 12:13:22 graziano
// cleaned
//
// Revision 1.2 2013-08-23 10:04:53 graziano
// development
//
// Revision 1.1 2013-07-17 13:37:43 graziano
// *** empty log message ***
//
//
//
// copyleft : European Synchrotron Radiation Facility
// BP 220, Grenoble 38043
// FRANCE
//
//-=============================================================================
#include "Alarm.h"
#include "event_table.h"
namespace Alarm_ns
{
//=============================================================================
//=============================================================================
SubscribeThread::SubscribeThread(Alarm *dev):Tango::LogAdapter(dev)
{
alarm_dev = dev;
period = 1;
shared = dev->events;
}
//=============================================================================
//=============================================================================
void SubscribeThread::updateProperty()
{
shared->put_signal_property();
}
//=============================================================================
//=============================================================================
void *SubscribeThread::run_undetached(void *ptr)
{
INFO_STREAM << "SubscribeThread id="<<omni_thread::self()->id()<<endl;
while(shared->get_if_stop()==false)
{
// Try to subscribe
DEBUG_STREAM << "SubscribeThread::"<<__func__<<": AWAKE"<<endl;
updateProperty();
alarm_dev->events->subscribe_events();
int nb_to_subscribe = shared->nb_sig_to_subscribe();
// And wait a bit before next time or
// wait a long time if all signals subscribed
{
omni_mutex_lock sync(*shared);
//shared->lock();
if (nb_to_subscribe==0 && shared->action == NOTHING)
{
DEBUG_STREAM << "SubscribeThread::"<<__func__<<": going to wait nb_to_subscribe=0"<<endl;
//shared->condition.wait();
shared->wait();
//shared->wait(3*period*1000);
}
else if(shared->action == NOTHING)
{
DEBUG_STREAM << "SubscribeThread::"<<__func__<<": going to wait period="<<period<<" nb_to_subscribe="<<nb_to_subscribe<<endl;
//unsigned long s,n;
//omni_thread::get_time(&s,&n,period,0);
//shared->condition.timedwait(s,n);
shared->wait(period*1000);
}
//shared->unlock();
}
}
shared->unsubscribe_events();
INFO_STREAM <<"SubscribeThread::"<< __func__<<": exiting..."<<endl;
return NULL;
}
//=============================================================================
//=============================================================================
} // namespace
//=============================================================================
//
// file : HdbEventHandler.h
//
// description : Include for the HDbDevice class.
//
// project : Tango Device Server
//
// $Author: graziano $
//
// $Revision: 1.5 $
//
// $Log: SubscribeThread.h,v $
// Revision 1.5 2014-03-06 15:21:43 graziano
// StartArchivingAtStartup,
// start_all and stop_all,
// archiving of first event received at subscribe
//
// Revision 1.4 2013-09-24 08:42:21 graziano
// bug fixing
//
// Revision 1.3 2013-09-02 12:11:32 graziano
// cleaned
//
// Revision 1.2 2013-08-23 10:04:53 graziano
// development
//
// Revision 1.1 2013-07-17 13:37:43 graziano
// *** empty log message ***
//
//
//
// copyleft : European Synchrotron Radiation Facility
// BP 220, Grenoble 38043
// FRANCE
//
//=============================================================================
#ifndef _SUBSCRIBE_THREAD_H
#define _SUBSCRIBE_THREAD_H
#include <tango.h>
#include <eventconsumer.h>
#include <stdint.h>
#include "event_table.h"
/**
* @author $Author: graziano $
* @version $Revision: 1.5 $
*/
// constants definitions here.
//-----------------------------------------------
namespace Alarm_ns
{
//class ArchiveCB;
class Alarm;
class SubscribeThread;
//=========================================================
/**
* Create a thread retry to subscribe event.
*/
//=========================================================
class SubscribeThread: public omni_thread, public Tango::LogAdapter
{
private:
/**
* Shared data
*/
event_table *shared;
/**
* HdbDevice object
*/
Alarm *alarm_dev;
public:
int period;
SubscribeThread(Alarm *dev);
void updateProperty();
/**
* Execute the thread loop.
* This thread is awaken when a command has been received
* and falled asleep when no command has been received from a long time.
*/
void *run_undetached(void *);
void start() {start_undetached();}
};
} // namespace_ns
#endif // _SUBSCRIBE_THREAD_H
This diff is collapsed.
...@@ -29,6 +29,10 @@ using namespace std; ...@@ -29,6 +29,10 @@ using namespace std;
#define TYPE_TANGO_ERR -2 #define TYPE_TANGO_ERR -2
#define TYPE_GENERIC_ERR -3 #define TYPE_GENERIC_ERR -3
#define SUB_ERR -1
#define NOTHING 0
#define UPDATE_PROP 1
class event; class event;
class event_list; class event_list;
class event_table; class event_table;
...@@ -41,9 +45,9 @@ typedef vector<Tango::DevDouble> value_t; ...@@ -41,9 +45,9 @@ typedef vector<Tango::DevDouble> value_t;
*/ */
class event { class event {
public: public:
string name, /* event name */ string name; /* event name */
device, /* device name */ string devname;
attribute; /* attribute name */ string attname;
value_t value; /* event value */ value_t value; /* event value */
int quality; int quality;
//Tango::DevErrorList errors; //Tango::DevErrorList errors;
...@@ -56,10 +60,25 @@ class event { ...@@ -56,10 +60,25 @@ class event {
err_counter; /* molteplicita' errore */ err_counter; /* molteplicita' errore */
//map<string, string> m_alarm; //map<string, string> m_alarm;
vector<string> m_alarm; vector<string> m_alarm;
bool valid; bool valid; //TODO: old
bool first;//TODO: new
Tango::DeviceProxy *dp; bool first_err;//TODO: new
unsigned int eid; //Tango::DeviceProxy *dp;
Tango::AttributeProxy *attr;
Tango::DevState evstate;
unsigned int event_id;
bool isZMQ;
EventCallBack *event_cb;
bool running;
bool paused;
bool stopped;
uint32_t okev_counter;
uint32_t okev_counter_freq;
timeval last_okev;
uint32_t nokev_counter;
uint32_t nokev_counter_freq;
timeval last_nokev;
timespec last_ev;
vector<string> filter; vector<string> filter;
/* /*
* methods * methods
...@@ -74,6 +93,7 @@ class event { ...@@ -74,6 +93,7 @@ class event {
bool operator==(const event& e); bool operator==(const event& e);
// bool event::operator==(const string& s); //TODO: gcc 4 problem?? // bool event::operator==(const string& s); //TODO: gcc 4 problem??
bool operator==(const string& s); bool operator==(const string& s);
ReadersWritersLock *siglock;
protected: protected:
private: private:
}; };
...@@ -114,36 +134,68 @@ class event_list : public omni_mutex { ...@@ -114,36 +134,68 @@ class event_list : public omni_mutex {
/* /*
* store all the events * store all the events
*/ */
class event_table : public event , Tango::LogAdapter { class event_table : public Tango::TangoMonitor, public Tango::LogAdapter {
public: public:
event_table(Tango::DeviceImpl *s):Tango::LogAdapter(s) {} event_table(Tango::DeviceImpl *s);//:Tango::LogAdapter(s) {mydev = s;}
~event_table(void) {} ~event_table(void) {}
void push_back(event e); //void push_back(event e);
void show(void); void show(void);
unsigned int size(void); unsigned int size(void);
#if 0
void init_proxy(void) throw(vector<string> &); void init_proxy(void) throw(vector<string> &);
void free_proxy(void); void free_proxy(void);
void subscribe(EventCallBack& ecb) throw(vector<string> &);//throw(string&); void subscribe(EventCallBack& ecb) throw(vector<string> &);//throw(string&);
void unsubscribe(void) throw(string&); void unsubscribe(void) throw(string&);
#endif
/**
* Add a new signal.
*/
void add(string &signame, vector<string> contexts);
void add(string &signame, vector<string> contexts, int to_do, bool start);
event *get_signal(string signame);
void stop(string &signame);
void remove(string &signame, bool stop);
void subscribe_events();
void unsubscribe_events();
void start(string &signame);
void start_all();
void update_events(bei_t& e) throw(string&); void update_events(bei_t& e) throw(string&);
void update_property();
/**
* return number of signals to be subscribed
*/
int nb_sig_to_subscribe();
/**
* build a list of signal to set HDB device property
*/
void put_signal_property();
bool is_initialized();
bool get_if_stop();
void stop_thread();
vector<event> v_event; vector<event> v_event;
protected: ReadersWritersLock veclock;
bool stop_it;
bool initialized;
int action;
private: private:
Tango::DeviceImpl *mydev;
}; /* class event_table */ }; /* class event_table */
/* /*
* event callback * event callback
*/ */
class EventCallBack : public Tango::CallBack { class EventCallBack : public Tango::CallBack, public Tango::LogAdapter
{
public: public:
EventCallBack(void); EventCallBack(Tango::DeviceImpl *s);
~EventCallBack(void); ~EventCallBack(void);
void push_event(Tango::EventData* ev); void push_event(Tango::EventData* ev);
void init(event_list* e); //void init(event_list* e);
void extract_values(Tango::DeviceAttribute *attr_value, vector<double> &val, int &type); void extract_values(Tango::DeviceAttribute *attr_value, vector<double> &val, int &type);
private: private:
event_list* e_ptr; //event_list* e_ptr;
Tango::DeviceImpl *mydev;
}; };
/* /*
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment