Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
B
BCS
Manage
Activity
Members
Labels
Plan
Issues
0
Issue boards
Milestones
Wiki
Code
Merge requests
0
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package Registry
Container Registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
spe
ds
BCS
Commits
b34fca46
Commit
b34fca46
authored
3 years ago
by
Roberto Borghes
Browse files
Options
Downloads
Patches
Plain Diff
Added source files
parent
a7777f5a
No related branches found
No related tags found
No related merge requests found
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
Makefile
+19
-0
19 additions, 0 deletions
Makefile
src/bcs.py
+436
-0
436 additions, 0 deletions
src/bcs.py
with
455 additions
and
0 deletions
Makefile
0 → 100644
+
19
−
0
View file @
b34fca46
NAME
=
bcs-srv
MAIN
=
bcs.py
DIRNAME
=
$(
NAME:-srv
=
)
MODNAME
=
$(
MAIN:.py
=
)
default
:
bin
@
cp
src/
*
.py bin/
${
DIRNAME
}
@
echo
"#!/usr/bin/env python
\n
import sys
\n
sys.path.append(sys.path[0]+'/
${
DIRNAME
}
')
\n
from
${
MODNAME
}
import main
\n
if __name__ == '__main__':
\n
main()
\n
"
>
bin/
${
NAME
}
@
chmod
+x bin/
${
NAME
}
bin/
${
DIRNAME
}
/
${
MAIN
}
bin
:
@
test
-d
$@
||
mkdir
-p
$@
/
${
DIRNAME
}
clean
:
@
rm
-fr
bin/ src/
*
~ src/
*
.pyc
.PHONY
:
clean
This diff is collapsed.
Click to expand it.
src/bcs.py
0 → 100755
+
436
−
0
View file @
b34fca46
#!/usr/bin/python
# "$Name: $";
# "$Header: $";
#=============================================================================
#
# file : bcs-srv.py
#
# description : Python source for the BCS and its commands.
# The class is derived from Device. It represents the
# CORBA servant object which will be accessed from the
# network. All commands which can be executed on the
# BCS are implemented in this file.
#
# project : TANGO Device Server
#
# $Author: $
#
# $Revision: $
#
# $Log: $
#
# copyleft : European Synchrotron Radiation Facility
# BP 220, Grenoble 38043
# FRANCE
#
#=============================================================================
# This file is generated by POGO
# (Program Obviously used to Generate tango Object)
#
# (c) - Software Engineering Group - ESRF
#=============================================================================
#
import
PyTango
import
sys
import
numpy
as
np
import
socket
from
select
import
select
import
time
#==================================================================
# BCS Class Description:
#
# High level management of the LDM power supplies
#
#==================================================================
# Device States Description:
#
# DevState.ON :
# DevState.FAULT :
#==================================================================
class
BCS
(
PyTango
.
Device_4Impl
):
#--------- Add you global variables here --------------------------
#------------------------------------------------------------------
# Device constructor
#------------------------------------------------------------------
def
__init__
(
self
,
cl
,
name
):
PyTango
.
Device_4Impl
.
__init__
(
self
,
cl
,
name
)
BCS
.
init_device
(
self
)
#------------------------------------------------------------------
# Device destructor
#------------------------------------------------------------------
def
delete_device
(
self
):
#print "[Device delete_device method] for device",self.get_name()
if
self
.
connectionOk
:
self
.
s
.
close
()
del
self
.
s
#------------------------------------------------------------------
# Device initialization
#------------------------------------------------------------------
def
init_device
(
self
):
#print "In ", self.get_name(), "::init_device()"
self
.
set_state
(
PyTango
.
DevState
.
ON
)
self
.
get_device_properties
(
self
.
get_device_class
())
self
.
connectionOk
=
False
self
.
s
=
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
self
.
load_db
()
#------------------------------------------------------------------
# Always excuted hook method
#------------------------------------------------------------------
def
always_executed_hook
(
self
):
return
#==================================================================
#
# BCS read/write attribute methods
#
#==================================================================
#------------------------------------------------------------------
# Read Attribute Hardware
#------------------------------------------------------------------
def
read_attr_hardware
(
self
,
data
):
#print "In ", self.get_name(), "::read_attr_hardware()"
return
#------------------------------------------------------------------
# Read Monitoring attribute
#------------------------------------------------------------------
def
read_ConnectionOK
(
self
,
attr
):
#print "In ", self.get_name(), "::read_Monitoring()"
# Add your own code here
attr
.
set_value
(
self
.
connectionOk
)
#==================================================================
#
# BCS command methods
#
#==================================================================
#------------------------------------------------------------------
# WriteRead command:
#
# Description:
# argin:
#------------------------------------------------------------------
def
WriteRead
(
self
,
argin
):
#print "In ", self.get_name(), "::Acquire()"
# Add your own code here
reply
=
None
if
not
self
.
connectToBCS
():
return
reply
try
:
self
.
s
.
sendall
(
argin
+
"
\r\n
"
)
# Sockets from which we expect to read
timeout
=
2.5
readable
=
select
([
self
.
s
],[],[],
timeout
)
if
readable
:
reply
=
self
.
s
.
recv
(
256
)
except
Exception
,
e
:
self
.
failures
+=
1
if
self
.
failures
==
10
:
self
.
connectionOk
=
False
self
.
s
.
close
()
self
.
error_stream
(
"
BCS Communication error %s
"
%
str
(
reply
))
PyTango
.
Except
.
throw_exception
(
"
BCS Communication error
"
,
e
.
strip
(),
"
readUesFromBCS
"
)
return
reply
.
split
(
"
\r\n
"
)[
0
]
#==================================================================
#
# BCSClass class utilities
#
#==================================================================
#------------------------------------------------------------------
# readFromBCS attribute
#------------------------------------------------------------------
def
readFromBCS
(
self
,
attr
):
# Add your own code here
bcs_full_cmd
=
"
GET
"
+
self
.
BCS_cmds
[
attr
.
get_name
()]
reply
=
self
.
WriteRead
(
bcs_full_cmd
)
tokens
=
reply
.
strip
().
split
(
"
,
"
)
if
len
(
tokens
)
!=
5
:
self
.
error_stream
(
"
BCS Communication error %s
"
%
str
(
reply
))
PyTango
.
Except
.
throw_exception
(
"
BCS Communication error
"
,
reply
,
"
readFromBCS
"
)
if
attr
.
get_data_type
()
==
PyTango
.
DevLong
:
value
=
int
(
tokens
[
3
])
else
:
value
=
float
(
tokens
[
3
])
attr
.
set_value
(
value
)
#------------------------------------------------------------------
# writeToBCS attribute
#------------------------------------------------------------------
def
writeToBCS
(
self
,
attr
):
# Add your own code here
value_in
=
attr
.
get_write_value
()
if
attr
.
get_max_dim_x
()
>
1
:
bcs_full_cmd
=
"
SET
"
+
(
self
.
BCS_cmds
[
attr
.
get_name
()]
%
tuple
(
value_in
))
else
:
bcs_full_cmd
=
"
SET
"
+
(
self
.
BCS_cmds
[
attr
.
get_name
()]
%
(
value_in
))
reply
=
self
.
WriteRead
(
bcs_full_cmd
)
if
not
"
OK
"
in
reply
:
self
.
error_stream
(
"
BCS Communication error %s
"
%
str
(
reply
))
PyTango
.
Except
.
throw_exception
(
"
BCS Communication error
"
,
reply
,
"
writeToBCS
"
)
#------------------------------------------------------------------
# readUesFromBCS attribute
#------------------------------------------------------------------
def
readUesFromBCS
(
self
,
attr
):
# Add your own code here
bcs_full_cmd
=
"
GETUES
"
+
self
.
BCS_cmds
[
attr
.
get_name
()]
reply
=
self
.
WriteRead
(
bcs_full_cmd
)
tokens
=
reply
.
strip
().
split
(
"
,
"
)
if
len
(
tokens
)
!=
2
:
self
.
error_stream
(
"
BCS Communication error %s
"
%
str
(
reply
))
yTango
.
Except
.
throw_exception
(
"
BCS Communication error
"
,
reply
,
"
readUesFromBCS
"
)
value
=
int
(
tokens
[
0
])
attr
.
set_value
(
value
)
#------------------------------------------------------------------
# writedUesToBCS attribute
#------------------------------------------------------------------
def
writedUesToBCS
(
self
,
attr
):
# Add your own code here
value_in
=
attr
.
get_write_value
()
bcs_full_cmd
=
"
UES %s %d
"
%
(
self
.
BCS_cmds
[
attr
.
get_name
()],
value_in
)
reply
=
self
.
WriteRead
(
bcs_full_cmd
)
if
not
"
OK
"
in
reply
:
self
.
error_stream
(
"
BCS Communication error %s
"
%
str
(
reply
))
PyTango
.
Except
.
throw_exception
(
"
BCS Communication error
"
,
reply
,
"
writedUesToBCS
"
)
#------------------------------------------------------------------
# readFromBCS attribute
#------------------------------------------------------------------
def
connectToBCS
(
self
):
try
:
if
not
self
.
connectionOk
:
self
.
failures
=
0
self
.
info_stream
(
"
BCS connection
"
)
self
.
s
.
connect
((
self
.
Bcs_server
,
self
.
Bcs_port
))
self
.
connectionOk
=
True
self
.
set_state
(
PyTango
.
DevState
.
ON
)
except
Exception
,
e
:
self
.
connectionOk
=
False
self
.
set_state
(
PyTango
.
DevState
.
FAULT
)
return
self
.
connectionOk
#------------------------------------------------------------------
# load_db:
#
# Description:
# argin:
#------------------------------------------------------------------
def
load_db
(
self
):
self
.
BCS_cmds
=
{}
if
self
.
Use_local_file_DB
:
fo
=
open
(
self
.
Bcs_db
[
0
],
"
r
"
)
self
.
Bcs_db
=
fo
.
readlines
()
fo
.
close
()
self
.
Bcs_db
=
filter
(
None
,
self
.
Bcs_db
)
UES_lines
=
[]
GET_lines
=
[]
SET_lines
=
[]
section
=
None
UES_idx
=
self
.
Bcs_db
.
index
(
"
#UES
"
)
GET_idx
=
self
.
Bcs_db
.
index
(
"
#GET
"
)
SET_idx
=
self
.
Bcs_db
.
index
(
"
#SET
"
)
#
if
UES_idx
==
min
(
UES_idx
,
GET_idx
,
SET_idx
):
UES_list_full
=
self
.
Bcs_db
[
UES_idx
+
1
:
min
(
GET_idx
,
SET_idx
)]
elif
UES_idx
==
max
(
UES_idx
,
GET_idx
,
SET_idx
):
UES_list_full
=
self
.
Bcs_db
[
UES_idx
+
1
:]
else
:
UES_list_full
=
self
.
Bcs_db
[
UES_idx
+
1
:
max
(
GET_idx
,
SET_idx
)]
#
if
GET_idx
==
min
(
UES_idx
,
GET_idx
,
SET_idx
):
GET_list_full
=
self
.
Bcs_db
[
GET_idx
+
1
:
min
(
UES_idx
,
SET_idx
)]
elif
GET_idx
==
max
(
UES_idx
,
GET_idx
,
SET_idx
):
GET_list_full
=
self
.
Bcs_db
[
GET_idx
+
1
:]
else
:
GET_list_full
=
self
.
Bcs_db
[
GET_idx
+
1
:
max
(
UES_idx
,
SET_idx
)]
#
if
SET_idx
==
min
(
UES_idx
,
GET_idx
,
SET_idx
):
SET_list_full
=
self
.
Bcs_db
[
SET_idx
+
1
:
min
(
UES_idx
,
GET_idx
)]
elif
SET_idx
==
max
(
UES_idx
,
GET_idx
,
SET_idx
):
SET_list_full
=
self
.
Bcs_db
[
SET_idx
+
1
:]
else
:
SET_list_full
=
self
.
Bcs_db
[
SET_idx
+
1
:
max
(
UES_idx
,
GET_idx
)]
# UES Sub-lists of 3 lines
UES_list
=
[
UES_list_full
[
i
:
i
+
3
]
for
i
in
range
(
0
,
len
(
UES_list_full
),
3
)]
# GET Sub-lists of 3 lines
GET_list
=
[
GET_list_full
[
i
:
i
+
3
]
for
i
in
range
(
0
,
len
(
GET_list_full
),
3
)]
# SET Sub-lists of different numbero of lines
count
=
0
SET_list
=
[]
is_new
=
True
tmp_list
=
None
for
line
in
SET_list_full
:
if
(
is_new
)
and
(
"
,
"
not
in
line
):
if
tmp_list
:
SET_list
.
append
(
tmp_list
)
tmp_list
=
[
line
]
is_new
=
False
continue
tmp_list
.
append
(
line
)
if
(
"
,
"
in
line
):
is_new
=
True
if
tmp_list
:
SET_list
.
append
(
tmp_list
)
#
for
getpoint
in
GET_list
:
attrName
=
getpoint
[
0
]
attrinfo
=
getpoint
[
2
].
split
(
"
,
"
)
if
attrinfo
[
0
]
==
"
A
"
:
attrtype
=
PyTango
.
ArgType
.
DevFloat
else
:
attrtype
=
PyTango
.
ArgType
.
DevLong
self
.
BCS_cmds
[
attrName
]
=
"
%s
"
%
getpoint
[
1
]
dynAttr
=
PyTango
.
Attr
(
attrName
,
attrtype
)
if
attrinfo
[
1
]
!=
""
:
def_prop
=
PyTango
.
UserDefaultAttrProp
()
def_prop
.
set_unit
(
attrinfo
[
1
])
def_prop
.
set_display_unit
(
attrinfo
[
1
])
def_prop
.
set_standard_unit
(
attrinfo
[
1
])
def_prop
.
set_label
(
attrName
);
dynAttr
.
set_default_properties
(
def_prop
);
a
=
self
.
add_attribute
(
dynAttr
,
self
.
readFromBCS
)
if
(
PyTango
.
__version_info__
[
0
]
==
8
)
and
(
attrinfo
[
-
2
]
==
"
C
"
):
# "Easy" Attribute Polling available only with PyTango 8.x
self
.
poll_attribute
(
attrName
,
int
(
attrinfo
[
-
1
]))
#
for
uespoint
in
UES_list
:
attrName
=
uespoint
[
0
]
attrtype
=
PyTango
.
ArgType
.
DevLong
self
.
BCS_cmds
[
attrName
]
=
uespoint
[
1
]
dynAttr
=
PyTango
.
Attr
(
attrName
,
attrtype
,
PyTango
.
AttrWriteType
.
READ_WRITE
)
a
=
self
.
add_attribute
(
dynAttr
,
self
.
readUesFromBCS
,
self
.
writedUesToBCS
)
if
(
PyTango
.
__version_info__
[
0
]
==
8
)
and
(
attrinfo
[
-
2
]
==
"
C
"
):
# "Easy" Attribute Polling available only with PyTango 8.x
self
.
poll_attribute
(
attrName
,
int
(
attrinfo
[
-
1
]))
#
for
setpoint
in
SET_list
:
attrName
=
setpoint
[
0
]
num_args
=
int
(
setpoint
[
2
])
attrinfo
=
setpoint
[
3
].
split
(
"
,
"
)
if
attrinfo
[
0
]
==
"
A
"
:
attrtype
=
PyTango
.
ArgType
.
DevFloat
else
:
attrtype
=
PyTango
.
ArgType
.
DevLong
self
.
BCS_cmds
[
attrName
]
=
"
%s
"
%
setpoint
[
1
]
for
line
in
setpoint
[
3
:]:
attrinfo
=
line
.
split
(
"
,
"
)
self
.
BCS_cmds
[
attrName
]
+=
"
"
self
.
BCS_cmds
[
attrName
]
+=
attrinfo
[
-
1
]
if
attrinfo
[
0
]
==
"
A
"
:
self
.
BCS_cmds
[
attrName
]
+=
"
,%g
"
else
:
self
.
BCS_cmds
[
attrName
]
+=
"
,%d
"
if
num_args
>
1
:
dynAttr
=
PyTango
.
SpectrumAttr
(
attrName
,
attrtype
,
PyTango
.
AttrWriteType
.
WRITE
,
num_args
)
else
:
dynAttr
=
PyTango
.
Attr
(
attrName
,
attrtype
,
PyTango
.
AttrWriteType
.
WRITE
)
a
=
self
.
add_attribute
(
dynAttr
,
None
,
self
.
writeToBCS
)
#==================================================================
#
# BCSClass class definition
#
#==================================================================
class
BCSClass
(
PyTango
.
DeviceClass
):
# Class Properties
class_property_list
=
{
}
# Device Properties
device_property_list
=
{
'
Bcs_db
'
:
[
PyTango
.
DevVarStringArray
,
"
File or input for the configuration.
"
,
[
"
bcs_db.cfg
"
]
],
'
Bcs_server
'
:
[
PyTango
.
DevString
,
"
BCS IP address or hostname.
"
,
[
"
192.168.30.115
"
]
],
'
Bcs_port
'
:
[
PyTango
.
DevShort
,
"
BCS TCP port.
"
,
[
"
20000
"
]
],
'
Use_local_file_DB
'
:
[
PyTango
.
DevBoolean
,
"
Use cfg file or device property content.
"
,
[
"
0
"
]
],
}
# Command definitions
cmd_list
=
{
'
WriteRead
'
:
[[
PyTango
.
DevString
,
""
],
[
PyTango
.
DevString
,
""
]],
}
# Attribute definitions
attr_list
=
{
'
ConnectionOK
'
:
[[
PyTango
.
DevBoolean
,
PyTango
.
SCALAR
,
PyTango
.
READ
],
{
'
description
'
:
"
Connection status
"
,
}
],
}
#------------------------------------------------------------------
# BCSClass Constructor
#------------------------------------------------------------------
def
__init__
(
self
,
name
):
PyTango
.
DeviceClass
.
__init__
(
self
,
name
)
self
.
set_type
(
name
);
#print "In BCSClass constructor"
#==================================================================
#
# BCS class main method
#
#==================================================================
def
main
():
try
:
py
=
PyTango
.
Util
(
sys
.
argv
)
py
.
add_TgClass
(
BCSClass
,
BCS
,
'
BCS
'
)
U
=
PyTango
.
Util
.
instance
()
U
.
server_init
()
U
.
server_run
()
except
PyTango
.
DevFailed
,
e
:
print
'
-------> Received a DevFailed exception:
'
,
e
except
Exception
,
e
:
print
'
-------> An unforeseen exception occured....
'
,
e
if
__name__
==
'
__main__
'
:
main
()
This diff is collapsed.
Click to expand it.
Preview
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment