Commit 0de47fa5 authored by Roberto Borghes's avatar Roberto Borghes
Browse files

Added demo script (provided by Arinax)

parent 0d275ffa
#!/usr/bin/env python
import sys
sys.path.append(sys.path[0]+'/arinaxbzoom')
from ArinaxBzoom import main
if __name__ == '__main__':
main()
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# ############################################################################
# license :
# ============================================================================
#
# File : ArinaxBzoom.py
#
# Project : Arinax B-zoom
#
# This file is part of Tango device class.
#
# Tango is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Tango is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Tango. If not, see <http://www.gnu.org/licenses/>.
#
#
# $Author : sci.comp$
#
# $Revision : $
#
# $Date : $
#
# $HeadUrl : $
# ============================================================================
# This file is generated by POGO
# (Program Obviously used to Generate tango Object)
# ############################################################################
__all__ = ["ArinaxBzoom", "ArinaxBzoomClass", "main"]
__docformat__ = 'restructuredtext'
import PyTango
import sys
# Add additional import
#----- PROTECTED REGION ID(ArinaxBzoom.additionnal_import) ENABLED START -----#
import traceback
import struct
from PIL import Image
import numpy
class TangoTestClient():
attributes = {'camera_name': None,
'camera_[*]_pixelsize': None,
'image_[*]_width': None,
'image_[*]_height': None,
'video_[*]_exposure': [2000, 50000, 22200],
'video_[*]_exposure_range': None,
'video_[*]_gain': [0.1, 0.2, 0.5],
'video_[*]_gain_range': None,
'video_[*]_scale': None,
'num_zoom_levels': None,
'video_zoom_idx': 5,
'video_zoom_percent': 0.6,
'video_predefined_zoom_list': None,
'video_zoom_list': None,
'video_[*]_mode': ['RGB8Packed', 'BayerBG8', 'BayerRG8'],
'video_[*]_white_balance': ['Continuous', 'Auto', 'Off'],
'video_[*]_white_balance_red': [0.1, 0.15, 0.5],
'video_[*]_white_balance_blue': [0.5, 0.2, 0.1],
'width_magnification_ratio': None,
'height_magnification_ratio': None,
'width_optic_center_offset': None,
'height_optic_center_offset': None,
'video_live': None,
'video_[*]_last_image_counter': None,
'video_[*]_fps': None,
'endianness': None}
# ------------------
header_format = '>IHHqiiHHHH'
header_size = struct.calcsize(header_format)
hybrid = 'bzoom'
first = 'acA2500-x5'
second = 'acA2440-x30'
#----- PROTECTED REGION END -----# // ArinaxBzoom.additionnal_import
# Device States Description
# No states for this device
class ArinaxBzoom (PyTango.Device_4Impl):
"""Arinax B-zoom"""
# -------- Add you global variables here --------------------------
#----- PROTECTED REGION ID(ArinaxBzoom.global_variables) ENABLED START -----#
#----- PROTECTED REGION END -----# // ArinaxBzoom.global_variables
def __init__(self, cl, name):
PyTango.Device_4Impl.__init__(self,cl,name)
self.debug_stream("In __init__()")
ArinaxBzoom.init_device(self)
#----- PROTECTED REGION ID(ArinaxBzoom.__init__) ENABLED START -----#
#----- PROTECTED REGION END -----# // ArinaxBzoom.__init__
def delete_device(self):
self.debug_stream("In delete_device()")
#----- PROTECTED REGION ID(ArinaxBzoom.delete_device) ENABLED START -----#
#----- PROTECTED REGION END -----# // ArinaxBzoom.delete_device
def init_device(self):
self.debug_stream("In init_device()")
self.get_device_properties(self.get_device_class())
self.attr_Image_read = [[0]]
#----- PROTECTED REGION ID(ArinaxBzoom.init_device) ENABLED START -----#
try:
self.__device = PyTango.DeviceProxy(self.bzoom_tango_url)
self.__device.ping()
self.set_state(PyTango.DevState.ON)
except:
traceback.print_exc()
self.set_state(PyTango.DevState.FAULT)
#----- PROTECTED REGION END -----# // ArinaxBzoom.init_device
def always_executed_hook(self):
self.debug_stream("In always_excuted_hook()")
#----- PROTECTED REGION ID(ArinaxBzoom.always_executed_hook) ENABLED START -----#
#----- PROTECTED REGION END -----# // ArinaxBzoom.always_executed_hook
# -------------------------------------------------------------------------
# ArinaxBzoom read/write attribute methods
# -------------------------------------------------------------------------
def read_Image(self, attr):
self.debug_stream("In read_Image()")
#----- PROTECTED REGION ID(ArinaxBzoom.Image_read) ENABLED START -----#
img_data = self.__device.video_last_image
_, _, _, frame_number, width, height, _, _, _, _ = \
struct.unpack(TangoTestClient.header_format, img_data[1][:TangoTestClient.header_size])
raw = img_data[1][TangoTestClient.header_size:]
img = Image.frombytes(mode='RGB', size=(width, height), data=raw)
#self.attr_Image_read = numpy.array(img.getdata()).reshape(width, height, 3)
self.attr_Image_read = numpy.array(img.convert("P"))
attr.set_value(self.attr_Image_read)
#----- PROTECTED REGION END -----# // ArinaxBzoom.Image_read
def read_attr_hardware(self, data):
self.debug_stream("In read_attr_hardware()")
#----- PROTECTED REGION ID(ArinaxBzoom.read_attr_hardware) ENABLED START -----#
#----- PROTECTED REGION END -----# // ArinaxBzoom.read_attr_hardware
# -------------------------------------------------------------------------
# ArinaxBzoom command methods
# -------------------------------------------------------------------------
#----- PROTECTED REGION ID(ArinaxBzoom.programmer_methods) ENABLED START -----#
#----- PROTECTED REGION END -----# // ArinaxBzoom.programmer_methods
class ArinaxBzoomClass(PyTango.DeviceClass):
# -------- Add you global class variables here --------------------------
#----- PROTECTED REGION ID(ArinaxBzoom.global_class_variables) ENABLED START -----#
#----- PROTECTED REGION END -----# // ArinaxBzoom.global_class_variables
# Class Properties
class_property_list = {
}
# Device Properties
device_property_list = {
'bzoom_tango_url':
[PyTango.DevString,
"Arinax Tango server complete URL",
[] ],
}
# Command definitions
cmd_list = {
}
# Attribute definitions
attr_list = {
'Image':
[[PyTango.DevLong,
PyTango.IMAGE,
PyTango.READ, 3000, 3000]],
}
def main():
try:
py = PyTango.Util(sys.argv)
py.add_class(ArinaxBzoomClass, ArinaxBzoom, 'ArinaxBzoom')
#----- PROTECTED REGION ID(ArinaxBzoom.add_classes) ENABLED START -----#
#----- PROTECTED REGION END -----# // ArinaxBzoom.add_classes
U = PyTango.Util.instance()
U.server_init()
U.server_run()
except PyTango.DevFailed as e:
print ('-------> Received a DevFailed exception:', e)
except Exception as e:
print ('-------> An unforeseen exception occured....', e)
if __name__ == '__main__':
main()
#!/usr/local/bin/python3
"""
This script tests the video server when using the Tango protocol.
It reads some of the attributes and displays them. It also tests image retrieval from the video server
Usage:
* Options:
+ host: IP address of the machine running the Tango server
+ port: communication port used by the video server's protocol
+ hybrid: name of the hybrid (bzoom) camera. Default is bzoom
+ first: name of the large field camera of the bzoom, defined in videoserver's config file (VideoStreamer.ini)
+ second: name of the small field camera of the bzoom
+ capture: boolean (True or False) indicating if images should be captured
+ write_image: boolean indicating if captured images should be stored. If true, capture is also true
./TestTango.py --host=... --hybrid=... --first=... --second=... --capture=... --write_image=...
Author:
Johann Fotsing, ARINAX - 25/10/2021
"""
import os
import sys
import time
import struct
import PyTango
import datetime
from PIL import Image
from PyTango import DeviceProxy
# IP ADDRESS :
IP = '10.3.52.30'
class TangoTestClient():
attributes = {'camera_name': None,
'camera_[*]_pixelsize': None,
'image_[*]_width': None,
'image_[*]_height': None,
'video_[*]_exposure': [2000, 50000, 22200],
'video_[*]_exposure_range': None,
'video_[*]_gain': [0.1, 0.2, 0.5],
'video_[*]_gain_range': None,
'video_[*]_scale': None,
'num_zoom_levels': None,
'video_zoom_idx': 5,
'video_zoom_percent': 0.6,
'video_predefined_zoom_list': None,
'video_zoom_list': None,
'video_[*]_mode': ['RGB8Packed', 'BayerBG8', 'BayerRG8'],
'video_[*]_white_balance': ['Continuous', 'Auto', 'Off'],
'video_[*]_white_balance_red': [0.1, 0.15, 0.5],
'video_[*]_white_balance_blue': [0.5, 0.2, 0.1],
'width_magnification_ratio': None,
'height_magnification_ratio': None,
'width_optic_center_offset': None,
'height_optic_center_offset': None,
'video_live': None,
'video_[*]_last_image_counter': None,
'video_[*]_fps': None,
'endianness': None}
# ------------------
header_format = '>IHHqiiHHHH'
header_size = struct.calcsize(header_format)
img_dir = 'TestTangoImages'
num_acq = 25
num_write_img = 25
# ------------------
log_file = 'test_server_tango.log'
separator = '-' * 10
pre_header = '# '
post_header = ' #'
# ------------------
default_args = {'host': IP,
'port': 9999,
'hybrid': 'bzoom',
'first': 'acA2500-x5',
'second': 'acA2440-x30',
'capture': False,
'write_image': False}
def __init__(self, args):
self.__host = args['host']
self.__port = args['port']
self.__cameras = [args['hybrid'], args['first'], args['second']]
self.__capture = args['capture']
self.__write_image = args['write_image']
self.__zoomLevels = None
self.__device = None
def __init(self):
"""
This function initializes the Tango test client. It creates a device proxy to interact with the videoserver's
Tango device
:rtype: bool
:return: A boolean indicating if the test client was correctly initialized
"""
try:
self.__device = DeviceProxy('tango://%s:%d/md/oav/bzoom#dbase=no' % (self.__host, self.__port))
self.__device.ping()
except PyTango.DevFailed as traceback:
print(traceback[-1])
return False
return True
def __read_attribute(self, name, cam_idx=0):
"""
This function reads the value of an attribute of one of the cameras of the video server.
:param str name: attribute's name
:param int cam_idx: index of the camera which attribute is read
:rtype: int | str | float | list
:return: attribute's value (str, int, float or list)
"""
return self.__device.read_attribute(name.replace('[*]', self.__cameras[cam_idx])).value
def __write_attribute(self, name, value, cam_idx=0):
"""
This function sets a specific camera's attribute to a desired value.
:param str name: name of the attribute
:param str | int | float value: value to set to the attribute
:param int cam_idx: index of the camera which attribute is modified
:rtype: bool
:return: True if write successful, false otherwise
"""
self.__device.write_attribute(name.replace('[*]', self.__cameras[cam_idx]), value)
@staticmethod
def __output(line):
"""
This function logs some information from the test running. Text is printed in Terminal and in log file.
:param str line: line to be logged.
:return: None
"""
print(line)
with open(TangoTestClient.log_file, 'a') as out:
out.write(line + '\n')
out.close()
@staticmethod
def __generate_header(txt):
"""
This function decorates text to be displayed as a header.
:param txt: Text to be decorated
:return: None
"""
n = len(txt) + len(TangoTestClient.pre_header) + len(TangoTestClient.post_header)
return '*' * n + '\n' + TangoTestClient.pre_header + txt + TangoTestClient.post_header + '\n' + '*' * n + '\n'
def __time_tag(self, tag):
"""
This function displays a time tag in the log file of this test. Mainly used to indicate start and end of test.
:param str tag: A meaningful description for the tag.
:return: None
"""
time_tag = '\n' + '#' * 5 + ' {} :: '.format(tag) + \
datetime.datetime.now().strftime('%c') + \
' VideoServer tests with Tango ' + '#' * 5 + '\n'
self.__output(time_tag)
return
def __check_attribute(self, name, value):
"""
The check performed on writable attributes consists in writing an attribute and reading back its value.
The initial value of the attribute is saved and restored after the test is performed.
:param str name: Name of the attribute to be tested
:param str | int | float value: value to write in attribute
:return: None
"""
init_value = self.__device.read_attribute(name).value
ok = 'OK' if self.__device.write_attribute(name, value) \
and self.__device.read_attribute(name).value == value else 'NOK'
self.__device.write_attribute(name, init_value)
line = 'Set attribute %s to value %s %s' % (name, value, ok)
self.__output(line)
return
def __poll_image(self):
"""
This function is used to retrieve one image from the Tango video server.
:rtype tuple(list[Number], int, int):
:return: Raw image as retrieved with its dimensions (width and length).
"""
img_data = self.__device.video_last_image
_, _, _, frame_number, width, height, _, _, _, _ = \
struct.unpack(TangoTestClient.header_format, img_data[1][:TangoTestClient.header_size])
raw = img_data[1][TangoTestClient.header_size:]
return raw, width, height, frame_number
def __log_all_attributes(self):
"""
This function reads all attributes available in the Tango video server and displays them with their values.
:return: None
"""
# Generate header
self.__output(self.__generate_header('Logging all attributes'))
for attr in TangoTestClient.attributes:
self.__output(TangoTestClient.separator)
if '[*]' not in attr:
lines = ['%s = %s' % (attr, self.__read_attribute(attr))]
else:
lines = ['%s = %s' % (attr.replace('[*]', cam), self.__read_attribute(attr, i))
for i, cam in enumerate(self.__cameras)]
for line in lines:
self.__output(line)
self.__output('\n')
return
def __test_attributes(self):
"""
This function is used to test all writable attributes. Tests are better described in self.__check_attribute().
:return: None
"""
# Generate header
self.__output(self.__generate_header('Testing writable attributes'))
# Start testing
for attr in TangoTestClient.attributes:
test_values = TangoTestClient.attributes[attr]
if test_values is not None:
self.__output(TangoTestClient.separator)
if '[*]' not in attr:
assert not isinstance(test_values, list)
self.__check_attribute(attr, test_values)
else:
for i, cam in enumerate(self.__cameras):
self.__check_attribute(attr.replace('[*]', cam), test_values[i])
self.__output('\n')
return
def __test_images(self):
"""
This function is used to test image retrieval from the video server.
Based on the capture and write_image options, images will be polled and stored in TangoTestClient.img_dir
:return: None
"""
if not self.__capture:
self.__output(self.__generate_header('No image capture.'))
return
self.__output(self.__generate_header('Testing video streamer'))
# Prepare image and time counters
last_img_time = time.time()
img_counter = 0
# Create image directory
if self.__write_image:
if not os.path.exists(TangoTestClient.img_dir):
os.mkdir(TangoTestClient.img_dir)
# Enter a loop to read images
tango_last_img_counter = self.__device.video_last_image_counter
while self.__capture and img_counter < TangoTestClient.num_acq:
if self.__device.video_last_image_counter == tango_last_img_counter:
continue
tango_last_img_counter = self.__device.video_last_image_counter
raw, width, height, frame_number = self.__poll_image()
if self.__write_image:
img = Image.frombytes(mode='RGB', size=(width, height), data=raw)
img.convert('1')
img.save("./%s/image%d.bmp" % (TangoTestClient.img_dir, img_counter % TangoTestClient.num_write_img))
img_counter += 1
current_time = time.time()
process_time = current_time - last_img_time
last_img_time = current_time
fps = self.__read_attribute('video_fps')
fps_calc = float(1. / process_time)
line = "Image number: %2d, Frame number: %d, FPS: %6.3f, Server FPS: %6.3f, Process Time: %6.3fms" % \
(img_counter, frame_number, fps_calc, fps, process_time * 1000)
self.__output(line)
return
def run(self):
"""
This function is called to run the test client.
:return: None
"""
if not self.__init():
print('Test client could not be initialized, test is aborted.')
return
self.__time_tag('Start')
self.__log_all_attributes()
# self.__test_attributes()
self.__test_images()
self.__time_tag('End')
return
def parse_test_args():
"""
This function parses the arguments given by the user when launching the scripts:
--host=...
--port=...
etc. see TangoTestClient.default_args dictionary for all available arguments
:rtype: dict[str, Any]
:return: A dictionary mapping argument names to their values input by user
"""
args = TangoTestClient.default_args
if len(sys.argv) > 1:
for arg_exp in sys.argv[1:]:
[arg_name, arg_value] = arg_exp.split('=')
arg_name = arg_name.replace('-', '')
if arg_name == 'port':
arg_value = int(arg_value)
elif arg_name == 'capture' or arg_name == 'write_image':
arg_value = bool(arg_value)
args[arg_name] = arg_value
if args['write_image'] is True:
args['capture'] = True
return args
def main():
"""
Main function to be executed by this script.
:return:
"""
test_client = TangoTestClient(parse_test_args())
test_client.run()
exit(0)
if __name__ == '__main__':
main()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment