Skip to content
Snippets Groups Projects
Commit c73dc69b authored by Lucio Zambon's avatar Lucio Zambon
Browse files

Add new file

parent 74bce34e
No related branches found
No related tags found
No related merge requests found
from flask import Flask
from flask_caching import Cache
from flask_cors import CORS
import tango
import time
from werkzeug.routing import BaseConverter
class SignedIntegerConverter(BaseConverter):
def to_python(self, value):
number = int(value)
return number
def to_url(self, value):
return str(value)
def read_tango_attribute(device_name, attribute_name, decimation, offset, length, cache_key, token=-1):
debug = 0
try:
t0 = time.time()
# Create a device proxy for the specified device
device = tango.DeviceProxy(device_name)
t1 = time.time()
if debug: print("DeviceProxy", t1 - t0)
# Read the attribute value
attribute_value = device.read_attribute(attribute_name)
# Print the attribute value
t2 = time.time();
if debug: print("read_attribute", t2 - t1)
if cache_key:
cache.set(cache_key, attribute_value.value, timeout=900)
t3 = time.time();
timest = attribute_value.time.tv_sec*1000+attribute_value.time.tv_usec/1000
if debug: print("cache", t3 - t2)
mylen = len(attribute_value.value)
if offset > -1:
mylen = length
else:
offset = 0
if offset > len(attribute_value.value)-mylen:
offset = len(attribute_value.value)-mylen
if decimation == 0: decimation = 1
if decimation < 0: decimation = round(len(attribute_value.value)/(-decimation))
if debug: print("mylen:", mylen, " offset:", offset, "decimation:", decimation, range(offset, mylen, decimation))
array_string = ', '.join(f'{x:.5f}' for x in [attribute_value.value[i] for i in range(offset, offset+mylen, decimation)])
t4 = time.time();
if debug: print("stringify", t4 - t3)
return '{'+'"value": [' + array_string + '], "timestamp": ' + str(timest) + ', "decimation": ' + str(decimation) + ', "token": ' + str(token) + '}'
except tango.DevFailed as e:
print("Failed to read attribute: {e}")
app = Flask(__name__)
app.url_map.converters['signed_int'] = SignedIntegerConverter
CORS(app)
app.config['CACHE_TYPE'] = 'simple' # Use simple in-memory cache
cache = Cache(app)
@app.route('/bpmchart/src=<string:bpm>/offset=<int:offset>/length=<int:length>/cache=<int:token>')
@app.route('/bpmchart/src=<string:bpm>/offset=<int:offset>/cache=<int:token>')
@app.route('/bpmchart/src=<string:bpm>/decimation=<signed_int:decimation>/cache=<int:token>')
def get_cache(bpm, token, decimation=1, offset=-1, length=2000):
print('token:', token)
cache_key = bpm + str(token)
float_values = cache.get(cache_key)
mylen = len(float_values)
if offset > -1:
mylen = length
else:
offset = 0
if offset > len(float_values)-mylen:
offset = len(float_values)-mylen
if decimation == 0: decimation = 1
if decimation < 0: decimation = round(len(float_values)/(-decimation))
array_string = ', '.join(f'{x:.5f}' for x in [float_values[i] for i in range(offset, offset+mylen, decimation)])
return '{'+'"value": [' + array_string + '], "decimation": ' + str(decimation) + ', "token": ' + str(token) + '}'
@app.route('/bpmchart/src=<string:bpm>/offset=<int:offset>/length=<int:length>/token=<int:token>')
@app.route('/bpmchart/src=<string:bpm>/offset=<int:offset>/length=<int:length>')
@app.route('/bpmchart/src=<string:bpm>/offset=<int:offset>/token=<int:token>')
@app.route('/bpmchart/src=<string:bpm>/offset=<int:offset>')
@app.route('/bpmchart/src=<string:bpm>/decimation=<signed_int:decimation>/token=<int:token>')
@app.route('/bpmchart/src=<string:bpm>/token=<int:token>')
@app.route('/bpmchart/src=<string:bpm>/decimation=<signed_int:decimation>')
@app.route('/bpmchart/src=<string:bpm>')
def get_src(bpm, decimation=1, offset=-1, token=-1, length=2000):
attribute_name = "HorPosFastHistory"
# print(bpm)
if ';' in bpm:
myres = ''
for i in bpm.split(';'):
device_name = "srv-tango-sre-01:20000/sr/feedback/"+i
cache_key = i + str(token) if token>-1 else ""
myres = myres + ("," if myres else "[") + read_tango_attribute(device_name, attribute_name, decimation, offset, length, cache_key, token)
return myres + "]"
else:
device_name = "srv-tango-sre-01:20000/sr/feedback/"+bpm
attribute_name = "HorPosFastHistory"
cache_key = bpm + str(token) if token>-1 else ""
return read_tango_attribute(device_name, attribute_name, decimation, offset, length, cache_key, token)
if __name__ == '__main__':
app.run(debug=False, host='pwma.elettra.eu', port=8080)
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment