Commit a3f4daae authored by Francesco Tripaldi's avatar Francesco Tripaldi
Browse files

Changed DCCT device and use PyGal Module instead Matplotlib

parent 3b7171c0
......@@ -3,7 +3,6 @@
#
# Vgrid gun scan and find best value, pre saturation
#
# attr;interval_sec;double;0.04
# attr;scan_start;double;15
# attr;scan_stop;double;28
# attr;scan_step;double;0.5
......@@ -11,6 +10,8 @@
# attr;ScriptErr;string;''
# attr;publish_to_elog;bool;1
# attr;graph_title;string;GUN grid Scan
# attr;elog_text;string;Scan Graph
# attr;best_vgrid_result;double;0
# attr;plot_best_vgrid;bool;1
......@@ -21,23 +22,20 @@ import sys, os
import time
import numpy as np
import io
import matplotlib
import requests
matplotlib.use('Agg')
import matplotlib.pyplot as plt
def fig2elog(fig, text):
from datetime import datetime
from scipy.signal import find_peaks
def fig2elog(file_buffer, text,elog_url='http://elog.elettra.eu/upload/upload.php',text_after =''):
buf = io.BytesIO()
fig.savefig(buf)
buf.seek(0)
try:
rs = requests.post('http://elog.elettra.eu/upload/upload.php', files={'file': ('booster_performance.png', buf, 'image/png')}, data={'string':text})
rs = requests.post(elog_url, files=file_buffer, data={'string':text,'string_after':text_after})
# print(rs)
# print(rs.content.decode())
except Exception as e:
selfseq_dev.write_attribute("ScriptErr", f"{type(e).__name__} while connection to elog: {e}") # scrive l'eventuale errore sull'attributo "ScriptErr" della sequenza
sys.exit(1)
selfseq_dev.write_attribute("ScriptErr", f"{type(e).__name__} while connection to website: {e}") # scrive l'eventuale errore sull'attributo "ScriptErr" della sequenza
pass
def main():
......@@ -47,8 +45,8 @@ def main():
try:
selfseq_dev = PyTango.DeviceProxy(sys.argv[1])
gun_dev = PyTango.DeviceProxy('p/gun/guntiming_p1.1')
dcct_dev = PyTango.DeviceProxy('sr/diagnostics/topupdcct')
interval = selfseq_dev.read_attribute("interval_sec").value
dcct_dev = PyTango.DeviceProxy('b/diagnostics/dcct_b')
dcct_topup = PyTango.DeviceProxy('sr/DIAGNOSTICS/topupdcct')
scan_start = selfseq_dev.read_attribute("scan_start").value
scan_stop = selfseq_dev.read_attribute("scan_stop").value
scan_step = selfseq_dev.read_attribute("scan_step").value
......@@ -79,10 +77,10 @@ def main():
selfseq_dev.write_attribute("ScriptErr","Sequence not running")
gun_dev.write_attribute('FastGridVoltage',starting_gun)
sys.exit(1)
if bool(timing_enable.read_attribute("ExtractionEnabled").value) == False:
selfseq_dev.write_attribute("ScriptErr", 'SE Extraction DISABLED')
sys.exit(1)
plc_status = dcct_dev.read_attribute('Stat').value
# if bool(timing_enable.read_attribute("ExtractionEnabled").value) == False:
# selfseq_dev.write_attribute("ScriptErr", 'SE Extraction DISABLED')
# sys.exit(1)
plc_status = dcct_topup.read_attribute('Stat').value
if float(gun_dev.read_attribute('GridVoltage').value) < 0.5 and topup_test_mode == False:
selfseq_dev.write_attribute("ScriptErr","Grid disabled or zero voltage during scan")
gun_dev.write_attribute('FastGridVoltage',starting_gun)
......@@ -96,17 +94,19 @@ def main():
gun_dev.write_attribute('FastGridVoltage',x)
if not first_time:
#print('first time')
time.sleep(0.3)
time.sleep(2)
first_time = True
temp_store =0
for mean in range(0, dcct_samples,1):
if topup_test_mode == True and (plc_status[9] == True or bool(trigger_gun.read_attribute("gun_trigger_on").value) == False):
dcct_samples = mean+1
break
else:
temp_store += dcct_dev.read_attribute('BCurrent').value
time.sleep(interval)
dcct_value = temp_store/dcct_samples
# temp_store =0
# for mean in range(0, dcct_samples,1):
# if topup_test_mode == True and (plc_status[9] == True or bool(trigger_gun.read_attribute("gun_trigger_on").value) == False):
# dcct_samples = mean+1
# break
# else:
# temp_store += dcct_dev.read_attribute('BCurrent').value
time.sleep(dcct_samples*0.25)
dcct_value = np.mean(dcct_dev.command_inout('GetCurrent',[0,0,dcct_samples]))
X.append(x)
Y.append(dcct_value)
......@@ -124,31 +124,69 @@ def main():
yp=p(xp)
# calculate second order deriv.
y_conv = np.convolve(y1, yp, mode="same")
best_vgrid = xp[np.where(y_conv == np.amin(y_conv))[0][0]]
best_current = yp[np.where(y_conv == np.amin(y_conv))[0][0]]
# best_vgrid = xp[np.where(y_conv == np.amin(y_conv))[0][0]]
# best_current = yp[np.where(y_conv == np.amin(y_conv))[0][0]]
peaks, _ = find_peaks(y_conv*-1)
current_peaks = []
for peak in peaks:
current_peaks.append(yp[peak])
max_current= max(current_peaks)
max_index = current_peaks.index(max_current)
best_vgrid = xp[peaks[max_index]-4]
best_current = yp[peaks[max_index]-4]
gun_dev.write_attribute('FastGridVoltage',best_vgrid)
selfseq_dev.write_attribute("best_vgrid_result", float(best_vgrid))
if publish_to_elog == True:
plt.style.use(['seaborn-darkgrid'])
if plot_best_vgrid == True:
plt.plot(best_vgrid, best_current, marker="o", markersize=15, markeredgecolor="red", markerfacecolor="green")
plt.text(scan_start+((scan_stop-scan_start)/2), 0.05,'Best Vgrid: '+f'{best_vgrid:.2f}'+'V', fontsize = 12, bbox = dict(facecolor = 'red', alpha = 0.5))
plt.scatter(X, Y,label = "GUN Vgrid scan data", color="blue")
plt.plot(xp,p(xp), label = "Fitting", color="orange")
#plt.plot(xp,y_conv[:-3], label = "2nd order derivate")
plt.title(str(graph_title))
plt.xlabel('GUN Vgrid (V)')
plt.ylabel('Booster current (mA)')
import pygal
from pygal.style import Style
super_data = []
super_data2 = []
super_data3 = []
yrange =()
custom_style = Style(
#background='transparent',
#plot_background='transparent',
major_label_font_size = 12,
label_font_size = 12,
# foreground='#53E89B',
# foreground_strong='#53A0E8',
# foreground_subtle='#630C0D',
# opacity='.6',
# opacity_hover='.9',
# transition='400ms ease-in',
colors=('#0066ff', '#ff8000', '#ff3399', '#E87653', '#E89B53'))
if topup_test_mode == True:
plt.ylim(0,0.6)
y_range = (0,0.6)
else:
plt.ylim(0,1.6)
plt.legend(loc=0)
fig = plt.gcf()
fig2elog(fig,elog_text)
y_range = (0,1.2)
for index,data in enumerate(Y):
super_data.append((X[index],Y[index]))
line_chart = pygal.XY(explicit_size=True,height=550,width=500,legend_at_bottom=True,human_readable=True,style=custom_style ,x_title='GUN Vgrid (V)',y_title='Booster current (mA)',range=y_range )#, x_labels_major_count=5)#,show_minor_x_labels=False)
line_chart.add("GUN Vgrid scan data", super_data)
yp=p(xp)
for index2,data2 in enumerate(yp):
super_data2.append((xp[index2],yp[index2]))
for index3,data3 in enumerate(xp):
super_data3.append((xp[index3],y_conv[index3]))
#plt.plot(xp,y_conv[:-3], label = "2nd order derivate")
#line_chart.add("2nd derivate", super_data3,show_dots=False)
line_chart.add("Fitting", super_data2,show_dots=False)
line_chart.title = str(graph_title)
if plot_best_vgrid == True:
line_chart.add( 'Best Vgrid:'+"{:.2f}".format(best_vgrid) +'V', [(best_vgrid,best_current)],dots_size=12)
graph_svg=line_chart.render(style=custom_style)
file_buffer=[('file',('booster_performance'+datetime.now().strftime("%d-%m-%Y-%H-%M-%S-n")+'.svg',graph_svg,'image/svg+xml'))]
fig2elog(file_buffer,elog_text)
sys.exit(0)
......
#!/usr/bin/python3
#
#
# Vgrid gun scan and find best value, pre saturation
#
# attr;interval_sec;double;0.04
# attr;scan_start;double;15
# attr;scan_stop;double;28
# attr;scan_step;double;0.5
# attr;dcct_mean_samples;20
# attr;ScriptErr;string;''
# attr;publish_to_elog;bool;1
# attr;graph_title;string;GUN grid Scan
# attr;elog_text;string;Scan Graph
# attr;best_vgrid_result;double;0
# attr;plot_best_vgrid;bool;1
import PyTango
import sys, os
import time
import numpy as np
import io
import matplotlib
import requests
matplotlib.use('Agg')
import matplotlib.pyplot as plt
def fig2elog(fig, text):
buf = io.BytesIO()
fig.savefig(buf)
buf.seek(0)
try:
rs = requests.post('http://elog.elettra.eu/upload/upload.php', files={'file': ('booster_performance.png', buf, 'image/png')}, data={'string':text})
except Exception as e:
selfseq_dev.write_attribute("ScriptErr", f"{type(e).__name__} while connection to elog: {e}") # scrive l'eventuale errore sull'attributo "ScriptErr" della sequenza
sys.exit(1)
def main():
X = []
Y = []
try:
selfseq_dev = PyTango.DeviceProxy(sys.argv[1])
gun_dev = PyTango.DeviceProxy('p/gun/guntiming_p1.1')
dcct_dev = PyTango.DeviceProxy('sr/diagnostics/topupdcct')
interval = selfseq_dev.read_attribute("interval_sec").value
scan_start = selfseq_dev.read_attribute("scan_start").value
scan_stop = selfseq_dev.read_attribute("scan_stop").value
scan_step = selfseq_dev.read_attribute("scan_step").value
topup_test_mode = selfseq_dev.read_attribute("topup_test_mode").value
dcct_samples = int(selfseq_dev.read_attribute("dcct_mean_samples").value)
starting_gun = gun_dev.read_attribute('GridVoltage').value
selfseq_dev.write_attribute("ScriptErr","")
publish_to_elog = bool(selfseq_dev.read_attribute("publish_to_elog").value)
plot_best_vgrid = bool(selfseq_dev.read_attribute("plot_best_vgrid").value)
graph_title = selfseq_dev.read_attribute("graph_title").value
elog_text = str(selfseq_dev.read_attribute("elog_text").value)
trigger_gun = PyTango.DeviceProxy('booster/accessi/plc_accessi')
timing_enable = PyTango.DeviceProxy('b/timing/injexttiming_b')
except Exception as e:
#print(str(e))
selfseq_dev.write_attribute("ScriptErr", f"{type(e).__name__} while defining tango devices: {e}") # scrive l'eventuale errore sull'attributo "ScriptErr" della sequenza
sys.exit(1)
if bool(trigger_gun.read_attribute("gun_trigger_on").value) == False:
selfseq_dev.write_attribute("ScriptErr", 'GUN DISABLED')
sys.exit(1)
first_time = False
for x in np.arange(scan_start,scan_stop,scan_step):
if str(selfseq_dev.State()) != 'RUNNING':
selfseq_dev.write_attribute("ScriptErr","Sequence not running")
gun_dev.write_attribute('FastGridVoltage',starting_gun)
sys.exit(1)
if bool(timing_enable.read_attribute("ExtractionEnabled").value) == False:
selfseq_dev.write_attribute("ScriptErr", 'SE Extraction DISABLED')
sys.exit(1)
plc_status = dcct_dev.read_attribute('Stat').value
if float(gun_dev.read_attribute('GridVoltage').value) < 0.5 and topup_test_mode == False:
selfseq_dev.write_attribute("ScriptErr","Grid disabled or zero voltage during scan")
gun_dev.write_attribute('FastGridVoltage',starting_gun)
sys.exit(1)
if topup_test_mode == True and (plc_status[9] == True or bool(trigger_gun.read_attribute("gun_trigger_on").value) == False):
#print("Interlock dcct booster, stop")
scan_stop = x
break
#print("valore gun: " + str(x))
gun_dev.write_attribute('FastGridVoltage',x)
if not first_time:
#print('first time')
time.sleep(0.3)
first_time = True
temp_store =0
for mean in range(0, dcct_samples,1):
if topup_test_mode == True and (plc_status[9] == True or bool(trigger_gun.read_attribute("gun_trigger_on").value) == False):
dcct_samples = mean+1
break
else:
temp_store += dcct_dev.read_attribute('BCurrent').value
time.sleep(interval)
dcct_value = temp_store/dcct_samples
X.append(x)
Y.append(dcct_value)
smooth_width = 50
z = np.polyfit(X, Y, 5)
p = np.poly1d(z)
xp = np.linspace(scan_start, scan_stop, smooth_width)
# create convolution kernel for calculating
# the smoothed second order derivative
xp = xp[:-3]
x1 = np.linspace(-3,3,smooth_width)
norm = np.sum(np.exp(-x1**2)) * (x1[1]-x1[0]) # ad hoc normalization
y1 = (4*x1**2 - 2) * np.exp(-x1**2) / smooth_width *8#norm*(x1[1]-x1[0])
yp=p(xp)
# calculate second order deriv.
y_conv = np.convolve(y1, yp, mode="same")
best_vgrid = xp[np.where(y_conv == np.amin(y_conv))[0][0]]
best_current = yp[np.where(y_conv == np.amin(y_conv))[0][0]]
gun_dev.write_attribute('FastGridVoltage',best_vgrid)
selfseq_dev.write_attribute("best_vgrid_result", float(best_vgrid))
if publish_to_elog == True:
plt.style.use(['seaborn-darkgrid'])
if plot_best_vgrid == True:
plt.plot(best_vgrid, best_current, marker="o", markersize=15, markeredgecolor="red", markerfacecolor="green")
plt.text(scan_start+((scan_stop-scan_start)/2), 0.05,'Best Vgrid: '+f'{best_vgrid:.2f}'+'V', fontsize = 12, bbox = dict(facecolor = 'red', alpha = 0.5))
plt.scatter(X, Y,label = "GUN Vgrid scan data", color="blue")
plt.plot(xp,p(xp), label = "Fitting", color="orange")
#plt.plot(xp,y_conv[:-3], label = "2nd order derivate")
plt.title(str(graph_title))
plt.xlabel('GUN Vgrid (V)')
plt.ylabel('Booster current (mA)')
if topup_test_mode == True:
plt.ylim(0,0.6)
else:
plt.ylim(0,1.6)
plt.legend(loc=0)
fig = plt.gcf()
fig2elog(fig,elog_text)
sys.exit(0)
if __name__ == '__main__':
main()
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import numpy as np
import requests
from datetime import datetime
print('ou')
def fig2elog(file_buffer, text,elog_url='http://elog.elettra.eu/upload/upload.php',text_after =''):
try:
rs = requests.post(elog_url, files=file_buffer, data={'string':text,'string_after':text_after})
print(rs)
print(rs.content.decode())
except Exception as e:
selfseq_dev.write_attribute("ScriptErr", f"{type(e).__name__} while connection to website: {e}") # scrive l'eventuale errore sull'attributo "ScriptErr" della sequenza
pass
X = np.linspace(16.5, 28, 10)
Y = [0,0,0.2,0.3,0.5,0.5,0.4,0.3,0.3,0.3]
smooth_width = 50
z = np.polyfit(X, Y, 5)
p = np.poly1d(z)
xp = np.linspace(16.5, 28, smooth_width)
publish_to_elog = True
topup_test_mode = False
elog_text = 'Scan booster test'
graph_title = 'Booster scan'
best_vgrid = 22
best_current=0.5
plot_best_vgrid = True
if True:
if publish_to_elog == True:
import pygal
from pygal.style import Style
super_data = []
super_data2 = []
yrange =()
custom_style = Style(
#background='transparent',
#plot_background='transparent',
major_label_font_size = 12,
label_font_size = 12,
# foreground='#53E89B',
# foreground_strong='#53A0E8',
# foreground_subtle='#630C0D',
# opacity='.6',
# opacity_hover='.9',
# transition='400ms ease-in',
colors=('#0066ff', '#ff8000', '#ff3399', '#E87653', '#E89B53'))
if topup_test_mode == True:
y_range = (0,0.6)
else:
y_range = (0,1.6)
for index,data in enumerate(Y):
super_data.append((X[index],Y[index]))
line_chart = pygal.XY(explicit_size=True,height=550,width=500,legend_at_bottom=True,human_readable=True,style=custom_style ,x_title='GUN Vgrid (V)',y_title='Booster current (mA)',range=y_range )#, x_labels_major_count=5)#,show_minor_x_labels=False)
line_chart.add("GUN Vgrid scan data", super_data)
yp=p(xp)
for index2,data2 in enumerate(yp):
super_data2.append((xp[index2],yp[index2]))
line_chart.add("Fitting", super_data2,show_dots=False)
line_chart.title = str(graph_title)
if plot_best_vgrid == True:
line_chart.add( 'Best Vgrid:'+"{:.2f}".format(best_vgrid) +'V', [(best_vgrid,best_current)],dots_size=12)
graph_svg=line_chart.render(style=custom_style)
file_buffer=[('file',('booster_performance'+datetime.now().strftime("%d-%m-%Y-%H-%M-%S-n")+'.svg',graph_svg,'image/svg+xml'))]
fig2elog(file_buffer,elog_text)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment