Skip to content
Snippets Groups Projects
Commit 5f822054 authored by Lucio Zambon's avatar Lucio Zambon
Browse files

Add new file

parent 1beb8bf7
No related branches found
No related tags found
No related merge requests found
import json
import tango
from tango.server import Device, attribute, command, run, device_property
from tango import Attr, AttrWriteType, AttrQuality, DevState
from tango.server import attribute
# Type mapping for Tango data types
TANGO_TYPE_MAPPING = {
"DEV_BOOLEAN": tango.DevBoolean,
"DEV_SHORT": tango.DevShort,
"DEV_LONG": tango.DevLong,
"DEV_FLOAT": tango.DevFloat,
"DEV_DOUBLE": tango.DevDouble,
"DEV_STRING": tango.DevString,
}
class JSONGateway(Device):
# JSON property to define dynamic attributes
dynamic_attributes_config = device_property(dtype=(str,), default_value=("[]"))
dynamic_attributes = {}
attributes_config = {}
def init_device(self):
print("init_device")
"""Initialize the device and create dynamic attributes."""
super().init_device()
try:
# Parse the JSON configuration for dynamic attributes
self.attributes_config = json.loads(' '.join(self.dynamic_attributes_config))
except Exception as e:
print("ERROR: not valid JSON string in property dynamic_attributes_config.\nMust be:\n[\n {\"name\": \"name1\", \"type\": \"DEV_STRING\", \"writable\": true},\n {\"name\": \"name2\", \"type\": \"DEV_STRING\", \"writable\": false, \"value\": {\"device\":\"a/b/c\", \"operation\":\"command\", \"name\":\"Status\"}}\n]")
self.error_stream("Failed to initialize dynamic attributes: {e}")
try:
# Iterate through attribute definitions and create attributes
for attr_def in self.attributes_config:
print("attr_def: ", attr_def)
name = attr_def["name"]
dtype_str = attr_def["type"]
writable = attr_def.get("writable", False)
self.add_dynamic_attribute(name, dtype_str, writable)
except Exception as e:
print("Failed to initialize dynamic attributes:", e)
self.error_stream("Failed to initialize dynamic attributes: {e}")
# Define getter method
def read_callback(self, attr_obj):
name = attr_obj.get_name()
# print(name)
for attr_def in self.attributes_config:
if attr_def["name"]==name:
request = attr_def["value"]
target_device = request.get("device", None)
operation = request["operation"] # "command", "attribute", or "dynamic"
name = request["name"] # Command or attribute name
value = request.get("value") # Optional value for write operations
# Other operations: Command or Attribute
target_device = target_device or self.default_tango_device
device = tango.DeviceProxy(target_device)
if operation == "command":
result = device.command_inout(name, json.loads(value)) if value else device.command_inout(name)
elif operation == "attribute":
if value is not None:
# Write attribute
device.write_attribute(name, json.loads(value))
result = {"status": "success", "message": "Attribute {name} written successfully"}
else:
# Read attribute
result = device.read_attribute(name).value
else:
raise ValueError("Invalid operation type. Use 'command', 'attribute', or 'dynamic'.")
self.info_stream("Reading attribute '{name}': {value}")
# print(result)
attr_obj.set_value(json.dumps(result.tolist()))
# Define setter method (only if writable)
def write_callback(self, attr_obj):
# e.g. "{\"device\":\"g/gun/guntiming-p1.1\", \"operation\":\"command\", \"name\":\"Status\"}"
# "{\"device\":\"sr/sim/srElettra2_high_betax_long_straights\", \"operation\":\"command\", \"name\":\"GetHistExtendedOpticsData\", \"value\": \"[[0,100],[\\\"SR__DIAGNOSTICS__BPM_S04.09__horpos\\\"]]\"}"
value = attr_obj.get_write_value()
name = attr_obj.get_name()
for attr_def in self.attributes_config:
if attr_def["name"]==name:
try:
attr_def["value"] = json.loads(value)
except Exception as e:
raise ValueError("Invalid JSON string, please use syntax: {\"device\":\"a/b/c\", \"operation\":\"command\", \"name\":\"Status\"}")
if not "device" in attr_def["value"]:
raise ValueError("Missing device, please use syntax: {\"device\":\"a/b/c\", \"operation\":\"command\", \"name\":\"Status\"}")
if not "operation" in attr_def["value"]:
raise ValueError("Missing operation, please use syntax: {\"device\":\"a/b/c\", \"operation\":\"command\", \"name\":\"Status\"}")
if not "name" in attr_def["value"]:
raise ValueError("Missing name, please use syntax: {\"device\":\"a/b/c\", \"operation\":\"command\", \"name\":\"Status\"}")
# self.dynamic_attributes[name] = value
# self.info_stream("Writing attribute '{name}': {value}")
print("Writing attribute ", name, " : ", value)
def add_dynamic_attribute(self, name, dtype_str, writable):
"""Dynamically add an attribute to the device."""
dtype = TANGO_TYPE_MAPPING.get(dtype_str)
if dtype is None:
self.error_stream(f"Unsupported Tango data type: {dtype_str}")
return
# Define the attribute using the Attr class
attr = Attr(name, dtype, AttrWriteType.READ_WRITE if writable else AttrWriteType.READ)
# Register the attribute with the device
self.add_attribute(attr, self.read_callback, self.write_callback if writable else None)
# Initialize storage for the attribute value
self.dynamic_attributes[name] = None
# Command to handle JSON requests
@command(dtype_in=str, dtype_out=str)
def TransformJSON(self, json_request):
"""
Transforms a JSON request into a Tango command or attribute operation.
"""
try:
# Parse the JSON request
request = json.loads(json_request)
# Extract details
target_device = request.get("device", None)
operation = request["operation"] # "command", "attribute", or "dynamic"
name = request["name"] # Command or attribute name
value = request.get("value") # Optional value for write operations
if operation == "dynamic":
# Handle dynamic attributes
if name in self.dynamic_attributes:
if value is not None:
self.dynamic_attributes[name] = value
result = {"status": "success", "message": "Dynamic attribute {name} set to {value}"}
else:
result = self.dynamic_attributes[name]
else:
raise KeyError("Dynamic attribute '{name}' not found")
else:
# Other operations: Command or Attribute
target_device = target_device or self.default_tango_device
device = tango.DeviceProxy(target_device)
if operation == "command":
result = device.command_inout(name, value) if value else device.command_inout(name)
elif operation == "attribute":
if value is not None:
# Write attribute
device.write_attribute(name, value)
result = {"status": "success", "message": "Attribute {name} written successfully"}
else:
# Read attribute
result = device.read_attribute(name).value
else:
raise ValueError("Invalid operation type. Use 'command', 'attribute', or 'dynamic'.")
# Format response
response = {"status": "success", "result": result}
except Exception as e:
response = {"status": "error", "message": str(e)}
return json.dumps(response)
# Entry point for the device server
if __name__ == "__main__":
run([JSONGateway])
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment