diff --git a/json_gateway.py b/json_gateway.py new file mode 100644 index 0000000000000000000000000000000000000000..cac4420e5260fee0eb827a2cdd691c5977cc8537 --- /dev/null +++ b/json_gateway.py @@ -0,0 +1,166 @@ +import json +import tango +from tango.server import Device, attribute, command, run, device_property +from tango import Attr, AttrWriteType, AttrQuality, DevState +from tango.server import attribute + +# Type mapping for Tango data types +TANGO_TYPE_MAPPING = { + "DEV_BOOLEAN": tango.DevBoolean, + "DEV_SHORT": tango.DevShort, + "DEV_LONG": tango.DevLong, + "DEV_FLOAT": tango.DevFloat, + "DEV_DOUBLE": tango.DevDouble, + "DEV_STRING": tango.DevString, +} +class JSONGateway(Device): + # JSON property to define dynamic attributes + dynamic_attributes_config = device_property(dtype=(str,), default_value=("[]")) + dynamic_attributes = {} + attributes_config = {} + + def init_device(self): + print("init_device") + """Initialize the device and create dynamic attributes.""" + super().init_device() + try: + # Parse the JSON configuration for dynamic attributes + self.attributes_config = json.loads(' '.join(self.dynamic_attributes_config)) + except Exception as e: + print("ERROR: not valid JSON string in property dynamic_attributes_config.\nMust be:\n[\n {\"name\": \"name1\", \"type\": \"DEV_STRING\", \"writable\": true},\n {\"name\": \"name2\", \"type\": \"DEV_STRING\", \"writable\": false, \"value\": {\"device\":\"a/b/c\", \"operation\":\"command\", \"name\":\"Status\"}}\n]") + self.error_stream("Failed to initialize dynamic attributes: {e}") + try: + # Iterate through attribute definitions and create attributes + for attr_def in self.attributes_config: + print("attr_def: ", attr_def) + name = attr_def["name"] + dtype_str = attr_def["type"] + writable = attr_def.get("writable", False) + self.add_dynamic_attribute(name, dtype_str, writable) + + except Exception as e: + print("Failed to initialize dynamic attributes:", e) + self.error_stream("Failed to initialize dynamic attributes: {e}") + + # Define getter method + def read_callback(self, attr_obj): + name = attr_obj.get_name() + # print(name) + for attr_def in self.attributes_config: + if attr_def["name"]==name: + request = attr_def["value"] + target_device = request.get("device", None) + operation = request["operation"] # "command", "attribute", or "dynamic" + name = request["name"] # Command or attribute name + value = request.get("value") # Optional value for write operations + # Other operations: Command or Attribute + target_device = target_device or self.default_tango_device + device = tango.DeviceProxy(target_device) + if operation == "command": + result = device.command_inout(name, json.loads(value)) if value else device.command_inout(name) + elif operation == "attribute": + if value is not None: + # Write attribute + device.write_attribute(name, json.loads(value)) + result = {"status": "success", "message": "Attribute {name} written successfully"} + else: + # Read attribute + result = device.read_attribute(name).value + else: + raise ValueError("Invalid operation type. Use 'command', 'attribute', or 'dynamic'.") + self.info_stream("Reading attribute '{name}': {value}") + # print(result) + attr_obj.set_value(json.dumps(result.tolist())) + + # Define setter method (only if writable) + def write_callback(self, attr_obj): + # e.g. "{\"device\":\"g/gun/guntiming-p1.1\", \"operation\":\"command\", \"name\":\"Status\"}" + # "{\"device\":\"sr/sim/srElettra2_high_betax_long_straights\", \"operation\":\"command\", \"name\":\"GetHistExtendedOpticsData\", \"value\": \"[[0,100],[\\\"SR__DIAGNOSTICS__BPM_S04.09__horpos\\\"]]\"}" + value = attr_obj.get_write_value() + name = attr_obj.get_name() + for attr_def in self.attributes_config: + if attr_def["name"]==name: + try: + attr_def["value"] = json.loads(value) + except Exception as e: + raise ValueError("Invalid JSON string, please use syntax: {\"device\":\"a/b/c\", \"operation\":\"command\", \"name\":\"Status\"}") + if not "device" in attr_def["value"]: + raise ValueError("Missing device, please use syntax: {\"device\":\"a/b/c\", \"operation\":\"command\", \"name\":\"Status\"}") + if not "operation" in attr_def["value"]: + raise ValueError("Missing operation, please use syntax: {\"device\":\"a/b/c\", \"operation\":\"command\", \"name\":\"Status\"}") + if not "name" in attr_def["value"]: + raise ValueError("Missing name, please use syntax: {\"device\":\"a/b/c\", \"operation\":\"command\", \"name\":\"Status\"}") + # self.dynamic_attributes[name] = value + # self.info_stream("Writing attribute '{name}': {value}") + print("Writing attribute ", name, " : ", value) + + def add_dynamic_attribute(self, name, dtype_str, writable): + """Dynamically add an attribute to the device.""" + dtype = TANGO_TYPE_MAPPING.get(dtype_str) + if dtype is None: + self.error_stream(f"Unsupported Tango data type: {dtype_str}") + return + + # Define the attribute using the Attr class + attr = Attr(name, dtype, AttrWriteType.READ_WRITE if writable else AttrWriteType.READ) + + # Register the attribute with the device + self.add_attribute(attr, self.read_callback, self.write_callback if writable else None) + + # Initialize storage for the attribute value + self.dynamic_attributes[name] = None + + # Command to handle JSON requests + @command(dtype_in=str, dtype_out=str) + def TransformJSON(self, json_request): + """ + Transforms a JSON request into a Tango command or attribute operation. + """ + try: + # Parse the JSON request + request = json.loads(json_request) + + # Extract details + target_device = request.get("device", None) + operation = request["operation"] # "command", "attribute", or "dynamic" + name = request["name"] # Command or attribute name + value = request.get("value") # Optional value for write operations + + if operation == "dynamic": + # Handle dynamic attributes + if name in self.dynamic_attributes: + if value is not None: + self.dynamic_attributes[name] = value + result = {"status": "success", "message": "Dynamic attribute {name} set to {value}"} + else: + result = self.dynamic_attributes[name] + else: + raise KeyError("Dynamic attribute '{name}' not found") + else: + # Other operations: Command or Attribute + target_device = target_device or self.default_tango_device + device = tango.DeviceProxy(target_device) + if operation == "command": + result = device.command_inout(name, value) if value else device.command_inout(name) + elif operation == "attribute": + if value is not None: + # Write attribute + device.write_attribute(name, value) + result = {"status": "success", "message": "Attribute {name} written successfully"} + else: + # Read attribute + result = device.read_attribute(name).value + else: + raise ValueError("Invalid operation type. Use 'command', 'attribute', or 'dynamic'.") + + # Format response + response = {"status": "success", "result": result} + except Exception as e: + response = {"status": "error", "message": str(e)} + + return json.dumps(response) + + +# Entry point for the device server +if __name__ == "__main__": + run([JSONGateway])