Skip to content
Snippets Groups Projects
Commit 8c109163 authored by Martin Scarcia's avatar Martin Scarcia
Browse files

more robust handling of cfg + reset cmd

parent 1593dc24
No related branches found
No related tags found
No related merge requests found
......@@ -21,7 +21,7 @@ def createDynAttrsDict():
# add further info specifically
dyn_attrs["IntShortAttr"].default = 1 # default value
dyn_attrs["IntShortAttr"].memorized = True # memorized in DB
#dyn_attrs["IntShortAttr"].memorized = True # memorized in DB
dyn_attrs["FloatAttr"].default = 3.66 # default value
dyn_attrs["FloatAttr"].permission = PyTango.AttrWriteType.READ
......
......@@ -249,82 +249,88 @@ class Dynamic(PyTango.Device_4Impl):
self.cfg_module = importlib.import_module(self.cfg_module_name)
#print("imported cfg")
except Exception as ex:
sys.stderr.write(str(ex)+"\n")
self.cfg_module = None
#self.warn_stream("Invalid cfg script file\n")
print("Invalid cfg script file\n",ex)
self.set_state(PyTango.DevState.OFF)
self.set_status("Configurator script file not found")
self.set_state(PyTango.DevState.ON)
return
#creation of dynamic attributes from cfg script
dynattrs = self.cfg_module.createDynAttrsDict()
for attr in dynattrs:
attr_obj = dynattrs[attr]
try:
dynattrs = self.cfg_module.createDynAttrsDict()
if not (attr_obj.handler.lower() == self.get_name().lower() or attr_obj.handler.lower() == 'self'):
try:
attr_proxy = PyTango.AttributeProxy(str(attr_obj.tango_name))
attr_config = attr_proxy.get_config()
attr = PyTango.Attr(str(attr_obj.name), attr_config.data_type, attr_config.writable)
self.add_attribute(attr, self.read_proxy, self.write_proxy)
self.myattrs[attr_obj.name] = attr_proxy
except PyTango.DevFailed as ex:
self.warn_stream("%s" % str(ex))
continue
if attr_obj.name in self.myattrs:
PyTango.Except.throw_exception("Exist",
"Attribute "+attr_obj.name+ " already defined",
"NewAttribute")
# It's possible to define python methods for calculating the attribute value
self.read_functions[attr_obj.name] = attr_obj.read_function
self.write_functions[attr_obj.name] = attr_obj.write_function
if attr_obj.dimension == 1:
attr = PyTango.Attr(str(attr_obj.name), attr_obj.datatype, attr_obj.permission)
if attr_obj.memorized == 1:
attr.set_memorized()
self.add_attribute(attr, self.read_Scalar,self.write_Scalar)
if attr_obj.datatype == PyTango.ArgType.DevString:
self.myattrs[attr_obj.name] = ""
else:
self.myattrs[attr_obj.name] = 0
if attr_obj.default:
self.myattrs[attr_obj.name] = attr_obj.default
if attr_obj.memorized == 1:
dev_attr_props = self.db.get_device_attribute_property(self.get_name(),[attr_obj.name])
attr_prop = dev_attr_props[attr_obj.name]
for pr in attr_prop.keys():
if (pr == '__value') and (attr_obj.datatype == PyTango.ArgType.DevString):
self.myattrs[attr_obj.name] = attr_prop['__value'][0]
elif (pr == '__value') and (attr_obj.datatype == PyTango.ArgType.DevFloat):
self.myattrs[attr_obj.name] = float(attr_prop['__value'][0])
elif (pr == '__value') and (attr_obj.datatype == PyTango.ArgType.DevDouble):
self.myattrs[attr_obj.name] = float(attr_prop['__value'][0])
elif (pr == '__value') and (attr_obj.datatype == PyTango.ArgType.DevBoolean):
self.myattrs[attr_obj.name] = bool(attr_prop['__value'][0] == "True")
elif (pr == '__value'):
self.myattrs[attr_obj.name] = int(attr_prop['__value'][0])
elif isinstance(attr_obj.dimension, tuple):
sizx = attr_obj.dimension[0]
sizy = attr_obj.dimension[1]
attr = PyTango.ImageAttr(str(attr_obj.name), attr_obj.datatype, attr_obj.permission,sizx, sizy)
self.add_attribute(attr, self.read_Image, self.write_Image)
if attr_obj.default:
self.myattrs[attr_obj.name] = eval(attr_obj.default)
elif attr_obj.datatype == PyTango.ArgType.DevString:
self.myattrs[attr_obj.name] = [[],[]]
else:
self.myattrs[attr_obj.name] = np.zeros((sizy,sizx),fromPyTango2NumpyType(attr_obj.datatype))
else :
self.debug_stream("Creation of attribute %s, dimension = %s" % (attr_obj.name, str(attr_obj.dimension)))
attr = PyTango.SpectrumAttr(str(attr_obj.name), attr_obj.datatype, attr_obj.permission,attr_obj.dimension)
self.add_attribute(attr, self.read_Spectrum, self.write_Spectrum)
if attr_obj.default:
self.myattrs[attr_obj.name] = eval(attr_obj.default)
elif attr_obj.datatype == PyTango.ArgType.DevString:
self.myattrs[attr_obj.name] = []
else:
self.myattrs[attr_obj.name] = np.zeros(attr_obj.dimension, fromPyTango2NumpyType(attr_obj.datatype))
for attribute in dynattrs:
attr_obj = dynattrs[attribute]
if not (attr_obj.handler.lower() == self.get_name().lower() or attr_obj.handler.lower() == 'self'):
try:
attr_proxy = PyTango.AttributeProxy(str(attr_obj.tango_name))
attr_config = attr_proxy.get_config()
attr = PyTango.Attr(str(attr_obj.name), attr_config.data_type, attr_config.writable)
self.add_attribute(attr, self.read_proxy, self.write_proxy)
self.myattrs[attr_obj.name] = attr_proxy
except PyTango.DevFailed as ex:
self.warn_stream("%s" % str(ex))
continue
if attr_obj.name in self.myattrs:
PyTango.Except.throw_exception("Exist",
"Attribute "+attr_obj.name+ " already defined",
"NewAttribute")
# It's possible to define python methods for calculating the attribute value
self.read_functions[attr_obj.name] = attr_obj.read_function
self.write_functions[attr_obj.name] = attr_obj.write_function
if attr_obj.dimension == 1:
attr = PyTango.Attr(str(attr_obj.name), attr_obj.datatype, attr_obj.permission)
if attr_obj.memorized == 1:
attr.set_memorized()
self.add_attribute(attr, self.read_Scalar,self.write_Scalar)
if attr_obj.datatype == PyTango.ArgType.DevString:
self.myattrs[attr_obj.name] = ""
else:
self.myattrs[attr_obj.name] = 0
if attr_obj.default:
self.myattrs[attr_obj.name] = attr_obj.default
if attr_obj.memorized == 1:
dev_attr_props = self.db.get_device_attribute_property(self.get_name(),[attr_obj.name])
attr_prop = dev_attr_props[attr_obj.name]
for pr in attr_prop.keys():
if (pr == '__value') and (attr_obj.datatype == PyTango.ArgType.DevString):
self.myattrs[attr_obj.name] = attr_prop['__value'][0]
elif (pr == '__value') and (attr_obj.datatype == PyTango.ArgType.DevFloat):
self.myattrs[attr_obj.name] = float(attr_prop['__value'][0])
elif (pr == '__value') and (attr_obj.datatype == PyTango.ArgType.DevDouble):
self.myattrs[attr_obj.name] = float(attr_prop['__value'][0])
elif (pr == '__value') and (attr_obj.datatype == PyTango.ArgType.DevBoolean):
self.myattrs[attr_obj.name] = bool(attr_prop['__value'][0] == "True")
elif (pr == '__value'):
self.myattrs[attr_obj.name] = int(attr_prop['__value'][0])
elif isinstance(attr_obj.dimension, tuple):
sizx = attr_obj.dimension[0]
sizy = attr_obj.dimension[1]
attr = PyTango.ImageAttr(str(attr_obj.name), attr_obj.datatype, attr_obj.permission,sizx, sizy)
self.add_attribute(attr, self.read_Image, self.write_Image)
if attr_obj.default:
self.myattrs[attr_obj.name] = eval(attr_obj.default)
elif attr_obj.datatype == PyTango.ArgType.DevString:
self.myattrs[attr_obj.name] = [[],[]]
else:
self.myattrs[attr_obj.name] = np.zeros((sizy,sizx),fromPyTango2NumpyType(attr_obj.datatype))
else :
self.debug_stream("Creation of attribute %s, dimension = %s" % (attr_obj.name, str(attr_obj.dimension)))
attr = PyTango.SpectrumAttr(str(attr_obj.name), attr_obj.datatype, attr_obj.permission,attr_obj.dimension)
self.add_attribute(attr, self.read_Spectrum, self.write_Spectrum)
if attr_obj.default:
self.myattrs[attr_obj.name] = eval(attr_obj.default)
elif attr_obj.datatype == PyTango.ArgType.DevString:
self.myattrs[attr_obj.name] = []
else:
self.myattrs[attr_obj.name] = np.zeros(attr_obj.dimension, fromPyTango2NumpyType(attr_obj.datatype))
except Exception as ex:
sys.stderr.write(str(ex)+"\n")
self.set_status("Invalid cfg script file skipped")
self.set_state(PyTango.DevState.ON)
......@@ -533,6 +539,22 @@ class Dynamic(PyTango.Device_4Impl):
self.set_status('Unable to write file ' + self.ConfiguratorScript)
finally:
conf_file.close()
#------------------------------------------------------------------
# Load config from remote location
#------------------------------------------------------------------
def ResetCfgFile(self):
""" Restore default cfg file. Overwrite the existing cfg file.
:param argin: PyTango.DevVoid
:return:
:rtype: PyTango.DevVoid
"""
default_fn = self.DefaultCfgScript
file_name = self.ConfiguratorScript
if os.path.isfile(default_fn):
# if default file exists
shutil.copyfile(default_fn, file_name)
#==================================================================
#
# DynamicClass class utilities
......@@ -602,10 +624,10 @@ class DynamicClass(PyTango.DeviceClass):
[PyTango.DevString,
"Path to the python script that will be executed.",
[] ],
'PythonExecutable':
'DefaultCfgScript':
[PyTango.DevString,
"Path to the python executable.",
[ "python"] ],
"Path to the default config file",
[] ] ,
}
......@@ -626,6 +648,9 @@ class DynamicClass(PyTango.DeviceClass):
'LoadCfgFile':
[[PyTango.DevString,""],
[PyTango.DevVoid,""]],
'ResetCfgFile':
[[PyTango.DevVoid,""],
[PyTango.DevVoid,""]],
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment