Skip to content
Snippets Groups Projects
PilatusXM.py 59.1 KiB
Newer Older
#!/usr/bin/env python
# -*- coding:utf-8 -*-


# ############################################################################
#  license :
# ============================================================================
#
#  File :        PilatusXM.pyf
#
#  Project :     Interface class for the Pilatus detectors
#
# This file is part of Tango device class.
# 
# Tango is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# 
# Tango is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# 
# You should have received a copy of the GNU General Public License
# along with Tango.  If not, see <http://www.gnu.org/licenses/>.
# 
#
#  $Author :      sci.comp$
#
#  $Revision :    $
#
#  $Date :        $
#
#  $HeadUrl :     $
# ============================================================================
#            This file is generated by POGO
#     (Program Obviously used to Generate tango Object)
# ############################################################################

__all__ = ["PilatusXM", "PilatusXMClass", "main"]

__docformat__ = 'restructuredtext'

import PyTango
import sys
# Add additional import
#----- PROTECTED REGION ID(PilatusXM.additionnal_import) ENABLED START -----#
import socket
import traceback
import time
import os
END_MESSAGE = chr(0x18)
DBUG = False

class CommunicationClass():
 
    def __init__(self, ipaddr_in):
        self.ipaddr_in = ipaddr_in
        self.socket = None
        self._stop = False
        self.connect()
 
    def disconnect(self):
         try:
             if self.socket is not None:
                 self.socket.close()
         except:
             self.socket = None

    def connect(self):
         try:
             if self.socket is not None:
                 self.socket.close()
         except:
             self.socket = None
         self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         self.socket.settimeout(0.1)
         port = 41234
         host = self.ipaddr_in
         try:
             self.socket.connect((host,port))
         except:
             self.socket = None
         
    def send_to_detector(self, msg, wait_reply = True):
        # check state, Pilatus won't responde if it's RUNNING
        # only STOP command can be sent in this case
        self.socket.sendall(msg + END_MESSAGE)
        t0 = time.time()
        if DBUG:
            print ("Sent", msg)
        if not wait_reply:
            return
        if "CamSetup" in msg:
            mandatory_code = "2 OK"
        else:
            mandatory_code = ""
        try:
            reply = self.get_detector_reply(timeout = 2, wait_string = mandatory_code)
        except:
            reply = ''
        if reply is None:
            self.disconnect()
            return None
        return reply.split(END_MESSAGE)[0]

    def get_detector_reply(self, timeout = 2, wait_string = ''):
        t0 = time.time()
        reply = ''
        while (END_MESSAGE not in reply) or (wait_string not in reply):
            if (time.time() - t0) > timeout:
                break
            tmpreply = self.socket.recv(1024).strip()
            reply += tmpreply
        if DBUG:
            print ("Received", reply)
        if END_MESSAGE not in reply:
            reply = None
        return reply

class fakeAttr():

    def __init__(self):
        self.value = None

    def set_value(self,value_in):
        self.value = value_in

    def get_write_value(self):
        return self.value
        
#----- PROTECTED REGION END -----#    //    PilatusXM.additionnal_import

# Device States Description
# ON : The detecor is ready to take images.
# DISABLE : The device is disconnected from the camserver.
# RUNNING : An acquisition is running.
# FAULT : An error occured on the detector during an acquisition.


class PilatusXM (PyTango.LatestDeviceImpl):
    """Pilatus detectors are a series pixel detecors build by DECTRIS
    <br />
    http://www.dectris.com.
    <br />
    All detectors of this series can talk to the outside world via a socket
    connection. An ASCI protocol is used on this sockect connection to
    communicate with the detector.
    <p>
    The server process which handles the socket on the detecor PC is
    called camserver. Only one client can commumicate with camserver.
    If the native client tvx is connected, the device server cannot connect until
    tvx gets disconnected.
    </p>"""
    
    # -------- Add you global variables here --------------------------
    #----- PROTECTED REGION ID(PilatusXM.global_variables) ENABLED START -----#
    
    #----- PROTECTED REGION END -----#    //    PilatusXM.global_variables

    def __init__(self, cl, name):
        PyTango.LatestDeviceImpl.__init__(self,cl,name)
        self.debug_stream("In __init__()")
        PilatusXM.init_device(self)
        #----- PROTECTED REGION ID(PilatusXM.__init__) ENABLED START -----#
        
        #----- PROTECTED REGION END -----#    //    PilatusXM.__init__
        
    def delete_device(self):
        self.debug_stream("In delete_device()")
        #----- PROTECTED REGION ID(PilatusXM.delete_device) ENABLED START -----#
        
        #----- PROTECTED REGION END -----#    //    PilatusXM.delete_device

    def init_device(self):
        self.debug_stream("In init_device()")
        self.get_device_properties(self.get_device_class())
        self.attr_ExposureTime_read = 0.0
        self.attr_ExposurePeriod_read = 0.0
        self.attr_NbFrames_read = 0
        self.attr_NbExposures_read = 0
        self.attr_DelayTime_read = 0.0
        self.attr_ShutterEnable_read = False
        self.attr_TriggerMode_read = 0
        self.attr_UseRamDisk_read = False
        self.attr_FileDir_read = ""
        self.attr_FilePrefix_read = ""
        self.attr_FileStartNum_read = 0
        self.attr_FilePostfix_read = ""
        self.attr_LastImageTaken_read = ""
        self.attr_Energy_read = 0
        self.attr_Threshold_read = 0
        self.attr_Gain_read = 0
        self.attr_LastImagePath_read = ""
        self.attr_MxSettings_read = ""
        self.attr_GapFill_read = 0
        self.attr_LdBadPixMap_read = ""
        #----- PROTECTED REGION ID(PilatusXM.init_device) ENABLED START -----#
        self.attr_FilePrefix_read = 'test'
Loading full blame...