Source code for sardana.taurus.core.tango.sardana.pool

#!/usr/bin/env python

##############################################################################
##
# This file is part of Sardana
##
# http://www.sardana-controls.org/
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana.  If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################

"""The device pool submodule.
It contains specific part of sardana device pool"""


import collections

__all__ = ["InterruptException", "StopException", "AbortException",
           "BaseElement", "ControllerClass", "ControllerLibrary",
           "PoolElement", "Controller", "ComChannel", "ExpChannel",
           "CTExpChannel", "ZeroDExpChannel", "OneDExpChannel",
           "TwoDExpChannel", "PseudoCounter", "Motor", "PseudoMotor",
           "MotorGroup", "TriggerGate",
           "MeasurementGroup", "IORegister", "Instrument", "Pool",
           "registerExtensions", "getChannelConfigs"]

__docformat__ = 'restructuredtext'

import copy
import operator
import os
import sys
import time
import traceback
import weakref
import numpy

import PyTango

from PyTango import DevState, AttrDataFormat, AttrQuality, DevFailed, \
    DeviceProxy
from taurus import Factory, Device, Attribute
from taurus.core.taurusbasetypes import TaurusEventType

from taurus.core.tango.tangovalidator import TangoAttributeNameValidator
from taurus.core.util.log import Logger
from taurus.core.util.codecs import CodecFactory
from taurus.core.util.containers import CaselessDict
from taurus.core.util.event import EventGenerator, AttributeEventWait, \
    AttributeEventIterator
from taurus.core.tango import TangoDevice, FROM_TANGO_TO_STR_TYPE

from sardana import sardanacustomsettings
from .sardana import BaseSardanaElementContainer, BaseSardanaElement
from .motion import Moveable, MoveableSource

Ready = Standby = DevState.ON
Counting = Acquiring = Moving = DevState.MOVING
Alarm = DevState.ALARM
Fault = DevState.FAULT

CHANGE_EVT_TYPES = TaurusEventType.Change, TaurusEventType.Periodic

MOVEABLE_TYPES = 'Motor', 'PseudoMotor', 'MotorGroup'

QUALITY = {
    AttrQuality.ATTR_VALID: 'VALID',
    AttrQuality.ATTR_INVALID: 'INVALID',
    AttrQuality.ATTR_CHANGING: 'CHANGING',
    AttrQuality.ATTR_WARNING: 'WARNING',
    AttrQuality.ATTR_ALARM: 'ALARM',
    None: 'UNKNOWN'
}


def _is_referable(channel):
    # Equivalent to ExpChannel.isReferable.
    # Use DeviceProxy instead of taurus to avoid crashes in Py3
    # See: tango-controls/pytango#292
    if isinstance(channel, str):
        channel = DeviceProxy(channel)
    return "valueref" in list(map(str.lower, channel.get_attribute_list()))


class InterruptException(Exception):
    pass


class StopException(InterruptException):
    pass


class AbortException(InterruptException):
    pass


[docs]class BaseElement(object): """ The base class for elements in the Pool (Pool itself, Motor, ControllerClass, ExpChannel all should inherit from this class directly or indirectly) """ def __repr__(self): pd = self.getPoolData() return "{0}({1})".format(pd['type'], pd['full_name']) def __str__(self): return self.getName()
[docs] def serialize(self): return self.getPoolData()
[docs] def str(self, n=0): """Returns a sequence of strings representing the object in 'consistent' way. Default is to return <name>, <controller name>, <axis> :param n: the number of elements in the tuple.""" if n == 0: return CodecFactory.encode(('json'), self.serialize()) return self._str_tuple[:n]
def __lt__(self, o): return self.getPoolData()['full_name'] < o.getPoolData()['full_name']
[docs] def getName(self): return self.getPoolData()['name']
[docs] def getPoolObj(self): """Get reference to this object's Pool.""" return self._pool_obj
[docs] def getPoolData(self): """Get reference to this object's Pool data.""" return self._pool_data
[docs]class ControllerClass(BaseElement): def __init__(self, **kw): self.__dict__.update(kw) self.path, self.f_name = os.path.split(self.file_name) self.lib_name, self.ext = os.path.splitext(self.f_name) def __repr__(self): pd = self.getPoolData() return "ControllerClass({0})".format(pd['full_name'])
[docs] def getSimpleFileName(self): return self.f_name
[docs] def getFileName(self): return self.file_name
[docs] def getClassName(self): return self.getName()
[docs] def getType(self): return self.getTypes()[0]
[docs] def getTypes(self): return self.types
[docs] def getLib(self): return self.f_name
[docs] def getGender(self): return self.gender
[docs] def getModel(self): return self.model
[docs] def getOrganization(self): return self.organization
def __lt__(self, o): if self.getType() != o.getType(): return self.getType() < o.getType() if self.getGender() != o.getGender(): return self.getGender() < o.getGender() return self.getClassName() < o.getClassName()
class ControllerLibrary(BaseElement): def __init__(self, **kw): self.__dict__.update(kw) def getType(self): return self.getTypes()[0] def getTypes(self): return self.type class TangoAttributeEG(Logger, EventGenerator): """An event generator for a 'State' attribute""" def __init__(self, attr): self._attr = attr self.call__init__(Logger, 'EG', attr) event_name = '%s EG' % (attr.getParentObj().getNormalName()) self.call__init__(EventGenerator, event_name) self._attr.addListener(self) def getAttribute(self): return self._attr def eventReceived(self, evt_src, evt_type, evt_value): """Event handler from Taurus""" if evt_type not in CHANGE_EVT_TYPES: return if evt_value is None: v = None else: v = evt_value.value EventGenerator.fireEvent(self, v) def read(self, force=False): try: self.last_val = self._attr.read(cache=not force).value except: self.error("Read error") self.debug("Details:", exc_info=1) self.last_val = None return EventGenerator.read(self) def readValue(self, force=False): r = self.read(force=force) if r is None: # do a retry r = self.read(force=force) return r def write(self, value): self._attr.write(value, with_read=False) def __getattr__(self, name): return getattr(self._attr, name) def reservedOperation(fn): def new_fn(*args, **kwargs): self = args[0] wr = self.getReservedWR() if wr is not None: if wr().isStopped(): raise StopException("stopped before calling %s" % fn.__name__) elif wr().isAborted(): raise AbortException("aborted before calling %s" % fn.__name__) try: return fn(*args, **kwargs) except: print("Exception occurred in reserved operation:" " clearing events...") self._clearEventWait() raise return new_fn def get_pool_for_device(db, device): server_devs = db.get_device_class_list(device.info().server_id) for dev_name, klass_name in zip(server_devs[0::2], server_devs[1::2]): if klass_name == "Pool": return Device(dev_name)
[docs]class PoolElement(BaseElement, TangoDevice): """Base class for a Pool element device.""" def __init__(self, name, **kwargs): """PoolElement initialization.""" self._reserved = None self._evt_wait = None self.__go_start_time = 0 self.__go_end_time = 0 self.__go_time = 0 self._total_go_time = 0 self.call__init__(TangoDevice, name, **kwargs) # dict<string, TangoAttributeEG> # key : the attribute name # value : the corresponding TangoAttributeEG self._attrEG = CaselessDict() # force the creation of a state attribute self.getStateEG() def _find_pool_obj(self): pool = get_pool_for_device(self.getParentObj(), self.getHWObj()) return pool def _find_pool_data(self): pool = self._find_pool_obj() return pool.getElementInfo(self.getFullName())._data # Override BaseElement.getPoolObj because the reference to pool object may # not be filled. This reference is filled when the element is obtained # using Pool.getObject. If one obtain the element directly using Taurus # e.g. mot = taurus.Device(<mot_name>) it won't be filled. In this case # look for the pool object using the database information.
[docs] def getPoolObj(self): try: return self._pool_obj except AttributeError: self._pool_obj = self._find_pool_obj() return self._pool_obj
# Override BaseElement.getPoolData because the reference to pool data may # not be filled. This reference is filled when the element is obtained # using Pool.getPoolData. If one obtain the element directly using Taurus # e.g. mot = taurus.Device(<mot_name>) it won't be filled. In this case # look for the pool object and its data using the database information.
[docs] def getPoolData(self): try: return self._pool_data except AttributeError: self._pool_data = self._find_pool_data() return self._pool_data
[docs] def cleanUp(self): TangoDevice.cleanUp(self) self._reserved = None f = self.factory() attr_map = self._attrEG for attr_name in list(attr_map.keys()): attrEG = attr_map.pop(attr_name) attr = attrEG.getAttribute() attrEG = None f.removeExistingAttribute(attr)
[docs] def reserve(self, obj): if obj is None: self._reserved = None return self._reserved = weakref.ref(obj, self._unreserveCB)
def _unreserveCB(self, obj): self.unreserve()
[docs] def unreserve(self): self._reserved = None
[docs] def isReserved(self, obj=None): if obj is None: return self._reserved is not None else: o = self._reserved() return o == obj
[docs] def getReservedWR(self): return self._reserved
[docs] def getReserved(self): if self._reserved is None: return None return self._reserved()
[docs] def dump_attributes(self): attr_names = self.get_attribute_list() req_id = self.read_attributes_asynch(attr_names) return self.read_attributes_reply(req_id, 2000)
def _getAttrValue(self, name, force=False): attrEG = self._getAttrEG(name) if attrEG is None: return None return attrEG.readValue(force=force) def _getAttrEG(self, name): attrEG = self.getAttrEG(name) if attrEG is None: attrEG = self._createAttribute(name) return attrEG def _createAttribute(self, name): attrObj = self.getAttribute(name) if attrObj is None: self.warning("Unable to create attribute %s" % name) return None, None attrEG = TangoAttributeEG(attrObj) self._attrEG[name] = attrEG return attrEG def _getEventWait(self): if self._evt_wait is None: # create an object that waits for attribute events. # each time we use it we have to connect and disconnect to an # attribute self._evt_wait = AttributeEventWait() return self._evt_wait def _clearEventWait(self): self._evt_wait = None
[docs] def getStateEG(self): return self._getAttrEG('state')
[docs] def getControllerName(self): return self.getControllerObj().name
[docs] def getControllerObj(self): full_ctrl_name = self.getPoolData()['controller'] return self.getPoolObj().getObj(full_ctrl_name, "Controller")
[docs] def getAxis(self): return self.getPoolData()['axis']
[docs] def getType(self): return self.getPoolData()['type']
[docs] def waitReady(self, timeout=None): return self.getStateEG().waitEvent(Moving, equal=False, timeout=timeout)
[docs] def getAttrEG(self, name): """Returns the TangoAttributeEG object""" return self._attrEG.get(name)
[docs] def getAttrObj(self, name): """Returns the taurus.core.tangoattribute.TangoAttribute object""" attrEG = self._attrEG.get(name) if attrEG is None: return None return attrEG.getAttribute()
[docs] def getInstrumentObj(self): return self._getAttrEG('instrument')
[docs] def getInstrumentName(self, force=False): instr_name = self._getAttrValue('instrument', force=force) if not instr_name: return '' # instr_name = instr_name[:instr_name.index('(')] return instr_name
[docs] def setInstrumentName(self, instr_name): self.getInstrumentObj().write(instr_name)
[docs] def getInstrument(self): instr_name = self.getInstrumentName() if not instr_name: return None return self.getPoolObj().getObj("Instrument", instr_name)
[docs] @reservedOperation def start(self, *args, **kwargs): evt_wait = self._getEventWait() evt_wait.connect(self.getAttribute("state")) evt_wait.lock() try: evt_wait.waitEvent(DevState.MOVING, equal=False) self.__go_time = 0 self.__go_start_time = ts1 = time.time() self._start(*args, **kwargs) ts2 = time.time() evt_wait.waitEvent(DevState.MOVING, after=ts1) except: evt_wait.disconnect() raise finally: evt_wait.unlock() ts2 = evt_wait.getRecordedEvents().get(DevState.MOVING, ts2) return (ts2,)
[docs] def waitFinish(self, timeout=None, id=None): """Wait for the operation to finish :param timeout: optional timeout (seconds) :type timeout: float :param id: id of the opertation returned by start :type id: tuple(float) """ # Due to taurus-org/taurus #573 we need to divide the timeout # in two intervals if timeout is not None: timeout = timeout / 2 if id is not None: id = id[0] evt_wait = self._getEventWait() evt_wait.lock() try: evt_wait.waitEvent(DevState.MOVING, after=id, equal=False, timeout=timeout, retries=1) finally: self.__go_end_time = time.time() self.__go_time = self.__go_end_time - self.__go_start_time evt_wait.unlock() evt_wait.disconnect()
[docs] @reservedOperation def go(self, *args, **kwargs): self._total_go_time = 0 start_time = time.time() eid = self.start(*args, **kwargs) timeout = kwargs.get('timeout') self.waitFinish(id=eid, timeout=timeout) self._total_go_time = time.time() - start_time
[docs] def getLastGoTime(self): """Returns the time it took for last go operation""" return self.__go_time
[docs] def getTotalLastGoTime(self): """Returns the time it took for last go operation, including dead time to prepare, wait for events, etc""" return self._total_go_time
[docs] def abort(self, wait_ready=True, timeout=None): state = self.getStateEG() state.lock() try: self.command_inout("Abort") if wait_ready: self.waitReady(timeout=timeout) finally: state.unlock()
[docs] def stop(self, wait_ready=True, timeout=None): state = self.getStateEG() state.lock() try: self.command_inout("Stop") if wait_ready: self.waitReady(timeout=timeout) finally: state.unlock()
[docs] def information(self, tab=' '): msg = self._information(tab=tab) return "\n".join(msg)
def _information(self, tab=' '): indent = "\n" + tab + 10 * ' ' msg = [self.getName() + ":"] try: state_value = self.stateObj.read().rvalue # state_value is DevState enumeration (IntEnum) state = state_value.name.capitalize() except DevFailed as df: if len(df.args): state = df.args[0].desc else: e_info = sys.exc_info()[:2] state = traceback.format_exception_only(*e_info) except: e_info = sys.exc_info()[:2] state = traceback.format_exception_only(*e_info) try: msg.append(tab + " State: " + state) except TypeError: msg.append(tab + " State: " + state[0]) try: e_info = sys.exc_info()[:2] status = self.status() status = status.replace('\n', indent) except DevFailed as df: if len(df.args): status = df.args[0].desc else: e_info = sys.exc_info()[:2] status = traceback.format_exception_only(*e_info) except: e_info = sys.exc_info()[:2] status = traceback.format_exception_only(*e_info) msg.append(tab + " Status: " + status) return msg
[docs]class Controller(PoolElement): """ Class encapsulating Controller functionality.""" def __init__(self, name, **kw): """PoolElement initialization.""" self.call__init__(PoolElement, name, **kw)
[docs] def getModuleName(self): return self.getPoolData()['module']
[docs] def getClassName(self): return self.getPoolData()['klass']
[docs] def getTypes(self): return self.getPoolData()['types']
[docs] def getMainType(self): return self.getPoolData()['main_type']
[docs] def addElement(self, elem): axis = elem.getAxis() self._elems[axis] = elem self._last_axis = max(self._last_axis, axis)
[docs] def removeElement(self, elem): axis = elem.getAxis() del self._elems[elem.getAxis()] if axis == self._last_axis: self._last_axis = max(self._elems)
[docs] def getElementByAxis(self, axis): pool = self.getPoolObj() for _, elem in \ list(pool.getElementsOfType(self.getMainType()).items()): if (elem.controller != self.getFullName() or elem.getAxis() != axis): continue return elem
[docs] def getElementByName(self, name): pool = self.getPoolObj() for _, elem in \ list(pool.getElementsOfType(self.getMainType()).items()): if (elem.controller != self.getFullName() or elem.getName() != name): continue return elem
[docs] def getUsedAxes(self): """Return axes in use by this controller :return: list of axes :rtype: list<int> """ pool = self.getPoolObj() axes = [] for _, elem in \ list(pool.getElementsOfType(self.getMainType()).items()): if elem.controller != self.getFullName(): continue axes.append(elem.getAxis()) return sorted(axes)
[docs] def getUsedAxis(self): msg = ("getUsedAxis is deprecated since version 2.5.0. ", "Use getUsedAxes instead.") self.warning(msg) self.getUsedAxes()
[docs] def getLastUsedAxis(self): """Return the last used axis (the highest axis) in this controller :return: last used axis :rtype: int or None """ used_axes = self.getUsedAxes() if len(used_axes) == 0: return None return max(used_axes)
def __lt__(self, o): return self.getName() < o.getName()
class ComChannel(PoolElement): """ Class encapsulating CommunicationChannel functionality.""" pass
[docs]class ExpChannel(PoolElement): """ Class encapsulating ExpChannel functionality.""" def __init__(self, name, **kw): """ExpChannel initialization.""" self.call__init__(PoolElement, name, **kw) self._last_integ_time = None self._last_value_ref_pattern = None self._last_value_ref_enabled = None self._value_buffer = {} self._value_buffer_cb = None codec_name = getattr(sardanacustomsettings, "VALUE_BUFFER_CODEC") self._value_buffer_codec = CodecFactory().getCodec(codec_name) self._value_ref_buffer = {} self._value_ref_buffer_cb = None codec_name = getattr(sardanacustomsettings, "VALUE_REF_BUFFER_CODEC") self._value_ref_buffer_codec = CodecFactory().getCodec(codec_name)
[docs] def isReferable(self): if "valueref" in list(map(str.lower, self.get_attribute_list())): return True return False
[docs] def getIntegrationTime(self): return self._getAttrValue('IntegrationTime')
[docs] def getIntegrationTimeObj(self): return self._getAttrEG('IntegrationTime')
[docs] def setIntegrationTime(self, integ_time): self.getIntegrationTimeObj().write(integ_time)
[docs] def putIntegrationTime(self, integ_time): if self._last_integ_time == integ_time: return self._last_integ_time = integ_time self.getIntegrationTimeObj().write(integ_time)
[docs] def getValueObj_(self): """Retrurns Value attribute event generator object. :return: Value attribute event generator :rtype: TangoAttributeEG ..todo:: When support to Taurus 3 will be dropped provide getValueObj. Taurus 3 TaurusDevice class already uses this name. """ return self._getAttrEG('value')
[docs] def getValue(self, force=False): return self._getAttrValue('value', force=force)
[docs] def getValueBufferObj(self): return self._getAttrEG('valuebuffer')
[docs] def getValueBuffer(self): return self._value_buffer
[docs] def valueBufferChanged(self, value_buffer): if value_buffer is None: return _, value_buffer = self._value_buffer_codec.decode(value_buffer) indexes = value_buffer["index"] values = value_buffer["value"] for index, value in zip(indexes, values): self._value_buffer[index] = value
[docs] def getValueRefObj(self): """Return ValueRef attribute event generator object. :return: ValueRef attribute event generator :rtype: TangoAttributeEG """ return self._getAttrEG('value')
[docs] def getValueRef(self, force=False): return self._getAttrValue('valueref', force=force)
[docs] def getValueRefBufferObj(self): return self._getAttrEG('valuerefbuffer')
[docs] def getValueRefBuffer(self): return self._value_ref_buffer
[docs] def valueBufferRefChanged(self, value_ref_buffer): if value_ref_buffer is None: return _, value_ref_buffer = self._value_ref_buffercodec.decode( value_ref_buffer) indexes = value_ref_buffer["index"] value_refs = value_ref_buffer["value_ref"] for index, value_ref in zip(indexes, value_refs): self._value_ref_buffer[index] = value_ref
[docs] def getValueRefPattern(self): return self._getAttrValue('ValueRefPattern')
[docs] def getValueRefPatternObj(self): return self._getAttrEG('ValueRefPattern')
[docs] def setValueRefPattern(self, value_ref_pattern): self.getValueRefPatternObj().write(value_ref_pattern)
[docs] def putValueRefPattern(self, value_ref_pattern): if self._last_value_ref_pattern == value_ref_pattern: return self._last_value_ref_pattern = value_ref_pattern self.getValueRefPatternObj().write(value_ref_pattern)
[docs] def isValueRefEnabled(self): return self._getAttrValue('ValueRefEnabled')
[docs] def getValueRefEnabledObj(self): return self._getAttrEG('ValueRefEnabled')
[docs] def setValueRefEnabled(self, value_ref_enabled): self.getValueRefEnabledObj().write(value_ref_enabled)
[docs] def putValueRefEnabled(self, value_ref_enabled): if self._last_value_ref_enabled == value_ref_enabled: return self._last_value_ref_enabled = value_ref_enabled self.getValueRefEnabledObj().write(value_ref_enabled)
def _start(self, *args, **kwargs): self.Start()
[docs] def go(self, *args, **kwargs): """Count and report count result. Configuration measurement, then start and wait until finish. .. note:: The count (go) method API is partially experimental (value references may be changed to values whenever possible in the future). Backwards incompatible changes may occur if deemed necessary by the core developers. :return: state and value (or value reference - experimental) :rtype: :obj:`tuple` """ start_time = time.time() integration_time = args[0] self.putIntegrationTime(integration_time) PoolElement.go(self) state = self.getStateEG().readValue() if self.isReferable() and self.isValueRefEnabled(): result = self.getValueRef() else: result = self.getValue() ret = state, result self._total_go_time = time.time() - start_time return ret
startCount = PoolElement.start waitCount = PoolElement.waitFinish count = go stopCount = PoolElement.abort stop = PoolElement.stop
class TimerableExpChannel(ExpChannel): def getTimer(self): return self._getAttrValue('Timer') def getTimerObj(self): return self._getAttrEG('Timer') def setTimer(self, timer): self.getTimerObj().write(timer)
[docs]class CTExpChannel(TimerableExpChannel): """ Class encapsulating CTExpChannel functionality.""" pass
[docs]class ZeroDExpChannel(ExpChannel): """ Class encapsulating ZeroDExpChannel functionality.""" pass
[docs]class OneDExpChannel(TimerableExpChannel): """ Class encapsulating OneDExpChannel functionality.""" pass
[docs]class TwoDExpChannel(TimerableExpChannel): """ Class encapsulating TwoDExpChannel functionality.""" pass
[docs]class PseudoCounter(ExpChannel): """ Class encapsulating PseudoCounter functionality.""" pass
[docs]class TriggerGate(PoolElement): """ Class encapsulating TriggerGate functionality.""" pass
[docs]class Motor(PoolElement, Moveable): """ Class encapsulating Motor functionality.""" def __init__(self, name, **kw): """PoolElement initialization.""" self.call__init__(PoolElement, name, **kw) self.call__init__(Moveable)
[docs] def getPosition(self, force=False): return self._getAttrValue('position', force=force)
[docs] def getDialPosition(self, force=False): return self._getAttrValue('dialposition', force=force)
[docs] def getVelocity(self, force=False): return self._getAttrValue('velocity', force=force)
[docs] def getAcceleration(self, force=False): return self._getAttrValue('acceleration', force=force)
[docs] def getDeceleration(self, force=False): return self._getAttrValue('deceleration', force=force)
[docs] def getBaseRate(self, force=False): return self._getAttrValue('base_rate', force=force)
[docs] def getBacklash(self, force=False): return self._getAttrValue('backlash', force=force)
[docs] def getLimitSwitches(self, force=False): return self._getAttrValue('limit_switches', force=force)
[docs] def getOffset(self, force=False): return self._getAttrValue('offset', force=force)
[docs] def getStepPerUnit(self, force=False): return self._getAttrValue('step_per_unit', force=force)
[docs] def getSign(self, force=False): return self._getAttrValue('Sign', force=force)
[docs] def getSimulationMode(self, force=False): return self._getAttrValue('SimulationMode', force=force)
[docs] def getPositionObj(self): return self._getAttrEG('position')
[docs] def getDialPositionObj(self): return self._getAttrEG('dialposition')
[docs] def getVelocityObj(self): return self._getAttrEG('velocity')
[docs] def getAccelerationObj(self): return self._getAttrEG('acceleration')
[docs] def getDecelerationObj(self): return self._getAttrEG('deceleration')
[docs] def getBaseRateObj(self): return self._getAttrEG('base_rate')
[docs] def getBacklashObj(self): return self._getAttrEG('backlash')
[docs] def getLimitSwitchesObj(self): return self._getAttrEG('limit_switches')
[docs] def getOffsetObj(self): return self._getAttrEG('offset')
[docs] def getStepPerUnitObj(self): return self._getAttrEG('step_per_unit')
[docs] def getSimulationModeObj(self): return self._getAttrEG('step_per_unit')
[docs] def setVelocity(self, value): return self.getVelocityObj().write(value)
[docs] def setAcceleration(self, value): return self.getAccelerationObj().write(value)
[docs] def setDeceleration(self, value): return self.getDecelerationObj().write(value)
[docs] def setBaseRate(self, value): return self.getBaseRateObj().write(value)
[docs] def setBacklash(self, value): return self.getBacklashObj().write(value)
[docs] def setOffset(self, value): return self.getOffsetObj().write(value)
[docs] def setStepPerUnit(self, value): return self.getStepPerUnitObj().write(value)
[docs] def setSign(self, value): return self.getSignObj().write(value)
# -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~- # Moveable interface # def _start(self, *args, **kwargs): new_pos = args[0] if isinstance(new_pos, collections.Sequence): new_pos = new_pos[0] try: self.write_attribute('position', new_pos) except DevFailed as df: for err in df.args: if err.reason == 'API_AttrNotAllowed': raise RuntimeError('%s is already moving' % self) else: raise self.final_pos = new_pos
[docs] def go(self, *args, **kwargs): start_time = time.time() PoolElement.go(self, *args, **kwargs) ret = self.getStateEG().readValue(), self.readPosition() self._total_go_time = time.time() - start_time return ret
startMove = PoolElement.start waitMove = PoolElement.waitFinish move = go getLastMotionTime = PoolElement.getLastGoTime getTotalLastMotionTime = PoolElement.getTotalLastGoTime
[docs] @reservedOperation def iterMove(self, new_pos, timeout=None): if isinstance(new_pos, collections.Sequence): new_pos = new_pos[0] state, pos = self.getAttribute("state"), self.getAttribute("position") evt_wait = self._getEventWait() evt_wait.connect(state) evt_wait.lock() try: # evt_wait.waitEvent(DevState.MOVING, equal=False) time_stamp = time.time() try: self.getPositionObj().write(new_pos) except DevFailed as err_traceback: for err in err_traceback.args: if err.reason == 'API_AttrNotAllowed': raise RuntimeError('%s is already moving' % self) else: raise self.final_pos = new_pos # putting timeout=0.1 and retries=1 is a patch for the case when # the initial moving event doesn't arrive do to an unknown # tango/pytango error at the time evt_wait.waitEvent(DevState.MOVING, time_stamp, timeout=0.1, retries=1) finally: evt_wait.unlock() evt_wait.disconnect() evt_iter_wait = AttributeEventIterator(state, pos) evt_iter_wait.lock() try: for evt_data in evt_iter_wait.events(): src, value = evt_data if src == state and value != DevState.MOVING: raise StopIteration yield value finally: evt_iter_wait.unlock() evt_iter_wait.disconnect()
[docs] def readPosition(self, force=False): return [self.getPosition(force=force)]
[docs] def getMoveableSource(self): return self.getPoolObj()
[docs] def getSize(self): return 1
[docs] def getIndex(self, name): if name.lower() == self.getName().lower(): return 0 return -1
# # End of Moveable interface # -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~- def _information(self, tab=' '): msg = PoolElement._information(self, tab=tab) try: position = self.read_attribute("position") pos = str(position.value) if position.quality != AttrQuality.ATTR_VALID: pos += " [" + QUALITY[position.quality] + "]" except DevFailed as df: if len(df.args): pos = df.args[0].desc else: e_info = sys.exc_info()[:2] pos = traceback.format_exception_only(*e_info) except: e_info = sys.exc_info()[:2] pos = traceback.format_exception_only(*e_info) msg.append(tab + "Position: " + str(pos)) return msg
[docs]class PseudoMotor(PoolElement, Moveable): """ Class encapsulating PseudoMotor functionality.""" def __init__(self, name, **kw): """PoolElement initialization.""" self.call__init__(PoolElement, name, **kw) self.call__init__(Moveable)
[docs] def getPosition(self, force=False): return self._getAttrValue('position', force=force)
[docs] def getDialPosition(self, force=False): return self.getPosition(force=force)
[docs] def getPositionObj(self): return self._getAttrEG('position')
[docs] def getDialPositionObj(self): return self.getPositionObj()
# -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~- # Moveable interface # def _start(self, *args, **kwargs): new_pos = args[0] if isinstance(new_pos, collections.Sequence): new_pos = new_pos[0] try: self.write_attribute('position', new_pos) except DevFailed as df: for err in df.args: if err.reason == 'API_AttrNotAllowed': raise RuntimeError('%s is already moving' % self) else: raise self.final_pos = new_pos
[docs] def go(self, *args, **kwargs): start_time = time.time() PoolElement.go(self, *args, **kwargs) ret = self.getStateEG().readValue(), self.readPosition() self._total_go_time = time.time() - start_time return ret
startMove = PoolElement.start waitMove = PoolElement.waitFinish move = go getLastMotionTime = PoolElement.getLastGoTime getTotalLastMotionTime = PoolElement.getTotalLastGoTime
[docs] def readPosition(self, force=False): return [self.getPosition(force=force)]
[docs] def getMoveableSource(self): return self.getPoolObj()
[docs] def getSize(self): return 1
[docs] def getIndex(self, name): if name.lower() == self.getName().lower(): return 0 return -1
# # End of Moveable interface # -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~- def _information(self, tab=' '): msg = PoolElement._information(self, tab=tab) try: position = self.read_attribute("position") pos = str(position.value) if position.quality != AttrQuality.ATTR_VALID: pos += " [" + QUALITY[position.quality] + "]" except DevFailed as df: if len(df.args): pos = df.args[0].desc else: e_info = sys.exc_info()[:2] pos = traceback.format_exception_only(*e_info) except: e_info = sys.exc_info()[:2] pos = traceback.format_exception_only(*e_info) msg.append(tab + "Position: " + str(pos)) return msg
[docs]class MotorGroup(PoolElement, Moveable): """ Class encapsulating MotorGroup functionality.""" def __init__(self, name, **kw): """PoolElement initialization.""" self.call__init__(PoolElement, name, **kw) self.call__init__(Moveable) def _create_str_tuple(self): return 3 * ["TODO"]
[docs] def getMotorNames(self): return self.getPoolData()['elements']
[docs] def hasMotor(self, name): motor_names = list(map(str.lower, self.getMotorNames())) return name.lower() in motor_names
[docs] def getPosition(self, force=False): return self._getAttrValue('position', force=force)
[docs] def getPositionObj(self): return self._getAttrEG('position')
# -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~- # Moveable interface # def _start(self, *args, **kwargs): new_pos = args[0] try: self.write_attribute('position', new_pos) except DevFailed as df: for err in df.args: if err.reason == 'API_AttrNotAllowed': raise RuntimeError('%s is already moving' % self) else: raise self.final_pos = new_pos
[docs] def go(self, *args, **kwargs): start_time = time.time() PoolElement.go(self, *args, **kwargs) ret = self.getStateEG().readValue(), self.readPosition() self._total_go_time = time.time() - start_time return ret
startMove = PoolElement.start waitMove = PoolElement.waitFinish move = go getLastMotionTime = PoolElement.getLastGoTime getTotalLastMotionTime = PoolElement.getTotalLastGoTime
[docs] def readPosition(self, force=False): return self.getPosition(force=force)
[docs] def getMoveableSource(self): return self.getPoolObj()
[docs] def getSize(self): return len(self.getMotorNames())
[docs] def getIndex(self, name): try: motor_names = list(map(str.lower, self.getMotorNames())) return motor_names.index(name.lower()) except: return -1
# # End of Moveable interface # -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~- def _information(self, tab=' '): msg = PoolElement._information(self, tab=tab) try: position = self.read_attribute("position") pos = str(position.value) if position.quality != AttrQuality.ATTR_VALID: pos += " [" + QUALITY[position.quality] + "]" except DevFailed as df: if len(df.args): pos = df.args[0].desc else: e_info = sys.exc_info()[:2] pos = traceback.format_exception_only(*e_info) except: e_info = sys.exc_info()[:2] pos = traceback.format_exception_only(*e_info) msg.append(tab + "Position: " + str(pos)) return msg
class BaseChannelInfo(object): def __init__(self, data): # dict<str, obj> # channel data self.raw_data = data self.__dict__.update(data) class TangoChannelInfo(BaseChannelInfo): def __init__(self, data, info): BaseChannelInfo.__init__(self, data) # PyTango.AttributeInfoEx self.set_info(info) def has_info(self): return self.raw_info is not None def set_info(self, info): self.raw_info = info if info is None: return data = self.raw_data if 'data_type' not in data: data_type = info.data_type try: self.data_type = FROM_TANGO_TO_STR_TYPE[data_type] except KeyError as e: # For backwards compatibility: # starting from Taurus 4.3.0 DevVoid was added to the dict if data_type == PyTango.DevVoid: self.data_type = None else: raise e if 'shape' not in data: shape = () if info.data_format == AttrDataFormat.SPECTRUM: shape = (info.max_dim_x,) elif info.data_format == AttrDataFormat.IMAGE: shape = (info.max_dim_x, info.max_dim_y) self.shape = shape else: shape = self.shape self.shape = list(shape) def __getattr__(self, name): if self.has_info(): return getattr(self.raw_info, name) cls_name = self.__class__.__name__ raise AttributeError("'%s' has no attribute '%s'" % (cls_name, name)) def getChannelConfigs(mgconfig, ctrls=None, sort=True): ''' gets a list of channel configurations of the controllers of the given measurement group configuration. It optionally filters to those channels matching given lists of controller. :param ctrls: (seq<str> or None) a sequence of strings to filter the controllers. If None given, all controllers will be used :param sort: (bool) If True (default) the returned list will be sorted according to channel index (if given in channeldata) and then by channelname. :return: (list<tuple>) A list of channelname,channeldata pairs. ''' chconfigs = [] if not mgconfig: return [] for ctrl_name, ctrl_data in list(mgconfig['controllers'].items()): if ctrls is None or ctrl_name in ctrls: for ch_name, ch_data in list(ctrl_data['channels'].items()): ch_data.update({'_controller_name': ctrl_name}) chconfigs.append((ch_name, ch_data)) if sort: # sort the channel configs by index (primary sort) and then by channel # name. # sort by channel_name chconfigs = sorted(chconfigs, key=lambda c: c[0]) # sort by index (give a very large index for those which don't have it) chconfigs = sorted(chconfigs, key=lambda c: c[1].get('index', 1e16)) return chconfigs class MGConfiguration(object): def __init__(self, mg, data): self._mg = weakref.ref(mg) if isinstance(data, str): data = CodecFactory().decode(('json', data)) self.raw_data = data self.__dict__.update(data) # dict<str, dict> # where key is the channel name and value is the channel data in form # of a dict as receveid by the MG configuration attribute self.channels = channels = CaselessDict() for _, ctrl_data in list(self.controllers.items()): for channel_name, channel_data in \ list(ctrl_data['channels'].items()): channels[channel_name] = channel_data ##################### # @todo: the for-loops above could be replaced by something like: # self.channels = channels = \ # CaselessDict(getChannelConfigs(data, sort=False)) ##################### # seq<dict> each element is the channel data in form of a dict as # receveid by the MG configuration attribute. This seq is just a cache # ordered by channel index in the MG. self.channel_list = len(channels) * [None] for channel in list(channels.values()): self.channel_list[channel['index']] = channel # dict<str, list[DeviceProxy, CaselessDict<str, dict>]> # where key is a device name and value is a list with two elements: # - A device proxy or None if there was an error building it # - A dict where keys are attribute names and value is a reference to # a dict representing channel data as received in raw data self.tango_dev_channels = None # Number of elements in tango_dev_channels in error (could not build # DeviceProxy, probably) self.tango_dev_channels_in_error = 0 # dict<str, tuple<str, str, TangoChannelInfo>> # where key is a channel name and value is a tuple of three elements: # - device name # - attribute name # - attribute information or None if there was an error trying to get # the information self.tango_channels_info = None # Number of elements in tango_channels_info_in_error in error # (could not build attribute info, probably) self.tango_channels_info_in_error = 0 # dict<str, dict> # where key is a channel name and data is a reference to a dict # representing channel data as received in raw data self.non_tango_channels = None self.initialized = False def _build(self): # internal channel structure that groups channels by tango device so # they can be read as a group minimizing this way the network requests self.tango_dev_channels = tg_dev_chs = CaselessDict() self.tango_dev_channels_in_error = 0 self.tango_channels_info = tg_chs_info = CaselessDict() self.tango_channels_info_in_error = 0 self.non_tango_channels = n_tg_chs = CaselessDict() self.cache = cache = {} tg_attr_validator = TangoAttributeNameValidator() for channel_name, channel_data in list(self.channels.items()): cache[channel_name] = None data_source = channel_data['source'] params = tg_attr_validator.getParams(data_source) if params is None: # Handle NON tango channel n_tg_chs[channel_name] = channel_data else: # Handle tango channel dev_name = params['devname'].lower() attr_name = params['_shortattrname'].lower() host, port = params.get('host'), params.get('port') if host is not None and port is not None: dev_name = "tango://{0}:{1}/{2}".format(host, port, dev_name) dev_data = tg_dev_chs.get(dev_name) # technical debt: read Value or ValueRef attribute # ideally the source configuration should include this info # Use DeviceProxy instead of taurus to avoid crashes in Py3 # See: tango-controls/pytango#292 # channel = Device(dev_name) # if (isinstance(channel, ExpChannel) # and channel.isReferable() # and channel_data.get("value_ref_enabled", False)): if (_is_referable(dev_name) and channel_data.get("value_ref_enabled", False)): attr_name += "Ref" if dev_data is None: # Build tango device dev = None try: dev = DeviceProxy(dev_name) except: self.tango_dev_channels_in_error += 1 tg_dev_chs[dev_name] = dev_data = [dev, CaselessDict()] dev, attr_data = dev_data attr_data[attr_name] = channel_data # get attribute configuration attr_info = None if dev is None: self.tango_channels_info_in_error += 1 else: try: tg_attr_info = dev.get_attribute_config_ex(attr_name)[ 0] except: tg_attr_info = \ self._build_empty_tango_attr_info(channel_data) self.tango_channels_info_in_error += 1 attr_info = TangoChannelInfo(channel_data, tg_attr_info) tg_chs_info[channel_name] = dev_name, attr_name, attr_info def _build_empty_tango_attr_info(self, channel_data): ret = PyTango.AttributeInfoEx() ret.name = channel_data['name'] ret.label = channel_data['label'] return ret def prepare(self): # first time? build everything if self.tango_dev_channels is None: return self._build() # prepare missing tango devices if self.tango_dev_channels_in_error > 0: for dev_name, dev_data in list(self.tango_dev_channels.items()): if dev_data[0] is None: try: dev_data[0] = DeviceProxy(dev_name) self.tango_dev_channels_in_error -= 1 except: pass # prepare missing tango attribute configuration if self.tango_channels_info_in_error > 0: for _, attr_data in list(self.tango_channels_info.items()): dev_name, attr_name, attr_info = attr_data if attr_info.has_info(): continue dev = self.tango_dev_channels[dev_name] if dev is None: continue try: tg_attr_info = dev.get_attribute_config_ex(attr_name)[0] attr_info.set_info(tg_attr_info) self.tango_channels_info_in_error -= 1 except: pass def getChannels(self): return self.channel_list def getChannelInfo(self, channel_name): try: return self.tango_channels_info[channel_name] except Exception: channel_name = channel_name.lower() for d_name, a_name, ch_info in \ list(self.tango_channels_info.values()): if ch_info.name.lower() == channel_name: return d_name, a_name, ch_info def getChannelsInfo(self, only_enabled=False): """Returns information about the channels present in the measurement group in a form of dictionary, where key is a channel name and value is a tuple of three elements: - device name - attribute name - attribute information or None if there was an error trying to get the information :param only_enabled: flag to filter out disabled channels :type only_enabled: bool :return: dictionary with channels info :rtype: dict<str, tuple<str, str, TangoChannelInfo>> """ self.prepare() ret = CaselessDict(self.tango_channels_info) ret.update(self.non_tango_channels) for ch_name, (_, _, ch_info) in list(ret.items()): if only_enabled and not ch_info.enabled: ret.pop(ch_name) return ret def getChannelsInfoList(self, only_enabled=False): """Returns information about the channels present in the measurement group in a form of ordered, based on the channel index, list. :param only_enabled: flag to filter out disabled channels :type only_enabled: bool :return: list with channels info :rtype: list<TangoChannelInfo> """ channels_info = self.getChannelsInfo(only_enabled=only_enabled) ret = [] for _, (_, _, ch_info) in list(channels_info.items()): ret.append(ch_info) ret = sorted(ret, key=lambda x: x.index) return ret def getCountersInfoList(self): channels_info = self.getChannelsInfoList() timer_name, idx = self.timer, -1 for i, ch in enumerate(channels_info): if ch['full_name'] == timer_name: idx = i break if idx >= 0: channels_info.pop(idx) return channels_info def getTangoDevChannels(self, only_enabled=False): """Returns Tango channels (attributes) that could be used to read measurement group results in a form of dict where key is a device name and value is a list with two elements: - A device proxy or None if there was an error building it - A dict where keys are attribute names and value is a reference to a dict representing channel data as received in raw data :param only_enabled: flag to filter out disabled channels :type only_enabled: bool :return: dict with Tango channels :rtype: dict<str, list[DeviceProxy, CaselessDict<str, dict>]> """ if not only_enabled: return self.tango_dev_channels tango_dev_channels = {} for dev_name, dev_data in list(self.tango_dev_channels.items()): dev_proxy, attrs = dev_data[0], copy.deepcopy(dev_data[1]) for attr_name, channel_data in list(attrs.items()): if not channel_data["enabled"]: attrs.pop(attr_name) tango_dev_channels[dev_name] = [dev_proxy, attrs] return tango_dev_channels def read(self, parallel=True): if parallel: return self._read_parallel() return self._read() def _read_parallel(self): self.prepare() ret = CaselessDict(self.cache) dev_replies = {} # deposit read requests tango_dev_channels = self.getTangoDevChannels(only_enabled=True) for _, dev_data in list(tango_dev_channels.items()): dev, attrs = dev_data if dev is None: continue try: dev_replies[dev] = dev.read_attributes_asynch( list(attrs.keys())), attrs except Exception: dev_replies[dev] = None, attrs # gather all replies for dev, reply_data in list(dev_replies.items()): reply, attrs = reply_data try: data = dev.read_attributes_reply(reply, 0) for data_item in data: channel_data = attrs[data_item.name] if data_item.has_failed: value = None else: value = data_item.value ret[channel_data['full_name']] = value except Exception: for _, channel_data in list(attrs.items()): ret[channel_data['full_name']] = None return ret def _read(self): self.prepare() ret = CaselessDict(self.cache) tango_dev_channels = self.getTangoDevChannels(only_enabled=True) for _, dev_data in list(tango_dev_channels.items()): dev, attrs = dev_data try: data = dev.read_attributes(list(attrs.keys())) for data_item in data: channel_data = attrs[data_item.name] if data_item.has_failed: value = None else: value = data_item.value ret[channel_data['full_name']] = value except Exception: for _, channel_data in list(attrs.items()): ret[channel_data['full_name']] = None return ret
[docs]class MeasurementGroup(PoolElement): """ Class encapsulating MeasurementGroup functionality.""" def __init__(self, name, **kw): """PoolElement initialization.""" self._configuration = None self._channels = None self._last_integ_time = None self.call__init__(PoolElement, name, **kw) self.__cfg_attr = self.getAttribute('configuration') self.__cfg_attr.addListener(self.on_configuration_changed) self._value_buffer_cb = None self._value_buffer_channels = None codec_name = getattr(sardanacustomsettings, "VALUE_BUFFER_CODEC") self._value_buffer_codec = CodecFactory().getCodec(codec_name) self._value_ref_buffer_cb = None self._value_ref_buffer_channels = None codec_name = getattr(sardanacustomsettings, "VALUE_REF_BUFFER_CODEC") self._value_ref_buffer_codec = CodecFactory().getCodec(codec_name)
[docs] def cleanUp(self): PoolElement.cleanUp(self) f = self.factory() f.removeExistingAttribute(self.__cfg_attr)
def _create_str_tuple(self): channel_names = ", ".join(self.getChannelNames()) return self.getName(), self.getTimerName(), channel_names
[docs] def getConfigurationAttrEG(self): return self._getAttrEG('Configuration')
[docs] def setConfiguration(self, configuration): codec = CodecFactory().getCodec('json') f, data = codec.encode(('', configuration)) self.write_attribute('configuration', data)
def _setConfiguration(self, data): self._configuration = MGConfiguration(self, data)
[docs] def getConfiguration(self, force=False): if force or self._configuration is None: data = self.getConfigurationAttrEG().readValue(force=True) self._setConfiguration(data) return self._configuration
[docs] def on_configuration_changed(self, evt_src, evt_type, evt_value): if evt_type not in CHANGE_EVT_TYPES: return self.info("Configuration changed") self._setConfiguration(evt_value.value)
[docs] def getTimerName(self): return self.getTimer()['name']
[docs] def getTimer(self): cfg = self.getConfiguration() return cfg.channels[cfg.timer]
[docs] def getTimerValue(self): return self.getTimerName()
[docs] def getMonitorName(self): return self.getMonitor()['name']
[docs] def getMonitor(self): cfg = self.getConfiguration() return cfg.channels[cfg.monitor]
[docs] def setTimer(self, timer_name): try: self.getChannel(timer_name) except KeyError: raise Exception("%s does not contain a channel named '%s'" % (str(self), timer_name)) cfg = self.getConfiguration().raw_data cfg['timer'] = timer_name import json self.write_attribute("configuration", json.dumps(cfg))
[docs] def getChannels(self): return self.getConfiguration().getChannels()
[docs] def getCounters(self): cfg = self.getConfiguration() return [c for c in self.getChannels() if c['full_name'] != cfg.timer]
[docs] def getChannelNames(self): return [ch['name'] for ch in self.getChannels()]
[docs] def getCounterNames(self): return [ch['name'] for ch in self.getCounters()]
[docs] def getChannelLabels(self): return [ch['label'] for ch in self.getChannels()]
[docs] def getCounterLabels(self): return [ch['label'] for ch in self.getCounters()]
[docs] def getChannel(self, name): return self.getConfiguration().channels[name]
[docs] def getChannelInfo(self, name): return self.getConfiguration().getChannelInfo(name)
[docs] def getChannelsInfo(self): return self.getConfiguration().getChannelsInfoList()
[docs] def getChannelsEnabledInfo(self): """Returns information about **only enabled** channels present in the measurement group in a form of ordered, based on the channel index, list. :return: list with channels info :rtype: list<TangoChannelInfo> """ return self.getConfiguration().getChannelsInfoList(only_enabled=True)
[docs] def getCountersInfo(self): return self.getConfiguration().getCountersInfoList()
[docs] def getValues(self, parallel=True): return self.getConfiguration().read(parallel=parallel)
[docs] def getValueBuffers(self): value_buffers = [] for channel_info in self.getChannels(): channel = Device(channel_info["full_name"]) value_buffers.append(channel.getValueBuffer()) return value_buffers
[docs] def getIntegrationTime(self): return self._getAttrValue('IntegrationTime')
[docs] def getIntegrationTimeObj(self): return self._getAttrEG('IntegrationTime')
[docs] def setIntegrationTime(self, ctime): self.getIntegrationTimeObj().write(ctime)
[docs] def putIntegrationTime(self, ctime): if self._last_integ_time == ctime: return self._last_integ_time = ctime self.getIntegrationTimeObj().write(ctime)
[docs] def getAcquisitionModeObj(self): return self._getAttrEG('AcquisitionMode')
[docs] def getAcquisitionMode(self): return self._getAttrValue('AcquisitionMode')
[docs] def setAcquisitionMode(self, acqMode): self.getAcquisitionModeObj().write(acqMode)
[docs] def getSynchronizationObj(self): return self._getAttrEG('Synchronization')
[docs] def getSynchronization(self): return self._getAttrValue('Synchronization')
[docs] def setSynchronization(self, synchronization): codec = CodecFactory().getCodec('json') _, data = codec.encode(('', synchronization)) self.getSynchronizationObj().write(data) self._last_integ_time = None
# NbStarts Methods
[docs] def getNbStartsObj(self): return self._getAttrEG('NbStarts')
[docs] def setNbStarts(self, starts): self.getNbStartsObj().write(starts)
[docs] def getNbStarts(self): return self._getAttrValue('NbStarts')
[docs] def getMoveableObj(self): return self._getAttrEG('Moveable')
[docs] def getMoveable(self): return self._getAttrValue('Moveable')
[docs] def getLatencyTimeObj(self): return self._getAttrEG('LatencyTime')
[docs] def getLatencyTime(self): return self._getAttrValue('LatencyTime')
[docs] def setMoveable(self, moveable=None): if moveable is None: moveable = 'None' # Tango attribute is of type DevString self.getMoveableObj().write(moveable)
[docs] def valueBufferChanged(self, channel, value_buffer): """Receive value buffer updates, pre-process them, and call the subscribed callback. :param channel: channel that reports value buffer update :type channel: ExpChannel :param value_buffer: json encoded value buffer update, it contains at least values and indexes :type value_buffer: :obj:`str` """ if value_buffer is None: return _, value_buffer = self._value_buffer_codec.decode(value_buffer) values = value_buffer["value"] if isinstance(values[0], list): np_values = list(map(numpy.array, values)) value_buffer["value"] = np_values self._value_buffer_cb(channel, value_buffer)
[docs] def subscribeValueBuffer(self, cb=None): """Subscribe to channels' value buffer update events. If no callback is passed, the default channel's callback is subscribed which will store the data in the channel's value_buffer attribute. :param cb: callback to be subscribed, None means subscribe the default channel's callback :type cb: callable """ self._value_buffer_channels = [] for channel_info in self.getChannels(): full_name = channel_info["full_name"] value_ref_enabled = channel_info.get("value_ref_enabled", False) # Use DeviceProxy instead of taurus to avoid crashes in Py3 # See: tango-controls/pytango#292 if _is_referable(full_name) and value_ref_enabled: continue channel = Device(full_name) value_buffer_obj = channel.getValueBufferObj() if cb is not None: self._value_buffer_cb = cb value_buffer_obj.subscribeEvent(self.valueBufferChanged, channel, False) else: value_buffer_obj.subscribeEvent(channel.valueBufferChanged, with_first_event=False) self._value_buffer_channels.append(channel)
[docs] def unsubscribeValueBuffer(self, cb=None): """Unsubscribe from channels' value buffer events. If no callback is passed, unsubscribe the channel's default callback. :param cb: callback to be unsubscribed, None means unsubscribe the default channel's callback :type cb: callable """ for channel_info in self.getChannels(): full_name = channel_info["full_name"] value_ref_enabled = channel_info.get("value_ref_enabled", False) # Use DeviceProxy instead of taurus to avoid crashes in Py3 # See: tango-controls/pytango#292 if _is_referable(full_name) and value_ref_enabled: continue channel = Device(full_name) value_buffer_obj = channel.getValueBufferObj() if cb is not None: value_buffer_obj.unsubscribeEvent(self.valueBufferChanged, channel) self._value_buffer_cb = None else: value_buffer_obj.unsubscribeEvent(channel.valueBufferChanged) self._value_buffer_channels = None
[docs] def valueRefBufferChanged(self, channel, value_ref_buffer): """Receive value ref buffer updates, pre-process them, and call the subscribed callback. :param channel: channel that reports value ref buffer update :type channel: ExpChannel :param value_ref_buffer: json encoded value ref buffer update, it contains at least value refs and indexes :type value_ref_buffer: :obj:`str` """ if value_ref_buffer is None: return _, value_ref_buffer = self._value_ref_buffer_codec.decode( value_ref_buffer) self._value_ref_buffer_cb(channel, value_ref_buffer)
[docs] def subscribeValueRefBuffer(self, cb=None): """Subscribe to channels' value ref buffer update events. If no callback is passed, the default channel's callback is subscribed which will store the data in the channel's value_buffer attribute. :param cb: callback to be subscribed, None means subscribe the default channel's callback :type cb: callable """ self._value_ref_buffer_channels = [] for channel_info in self.getChannels(): full_name = channel_info["full_name"] value_ref_enabled = channel_info.get("value_ref_enabled", False) # Use DeviceProxy instead of taurus to avoid crashes in Py3 # See: tango-controls/pytango#292 if not _is_referable(full_name): continue if not value_ref_enabled: continue channel = Device(full_name) value_ref_buffer_obj = channel.getValueRefBufferObj() if cb is not None: self._value_ref_buffer_cb = cb value_ref_buffer_obj.subscribeEvent( self.valueRefBufferChanged, channel, False) else: value_ref_buffer_obj.subscribeEvent( channel.valueRefBufferChanged, with_first_event=False) self._value_ref_buffer_channels.append(channel)
[docs] def unsubscribeValueRefBuffer(self, cb=None): """Unsubscribe from channels' value ref buffer events. If no callback is passed, unsubscribe the channel's default callback. :param cb: callback to be unsubscribed, None means unsubscribe the default channel's callback :type cb: callable """ for channel_info in self.getChannels(): full_name = channel_info["full_name"] value_ref_enabled = channel_info.get("value_ref_enabled", False) # Use DeviceProxy instead of taurus to avoid crashes in Py3 # See: tango-controls/pytango#292 if not _is_referable(full_name): continue if not value_ref_enabled: continue channel = Device(full_name) value_ref_buffer_obj = channel.getValueRefBufferObj() if cb is not None: value_ref_buffer_obj.unsubscribeEvent( self.valueRefBufferChanged, channel) self._value_ref_buffer_cb = None else: value_ref_buffer_obj.unsubscribeEvent( channel.valueRefBufferChanged) self._value_ref_buffer_channels = None
[docs] def enableChannels(self, channels): '''Enable acquisition of the indicated channels. :param channels: (seq<str>) a sequence of strings indicating channel names ''' self._enableChannels(channels, True)
[docs] def disableChannels(self, channels): '''Disable acquisition of the indicated channels. :param channels: (seq<str>) a sequence of strings indicating channel names ''' self._enableChannels(channels, False)
def _enableChannels(self, channels, state): found = {} for channel in channels: found[channel] = False cfg = self.getConfiguration() for channel in cfg.getChannels(): name = channel['name'] if name in channels: channel['enabled'] = state found[name] = True wrong_channels = [] for ch, f in list(found.items()): if f is False: wrong_channels.append(ch) if len(wrong_channels) > 0: msg = 'channels: %s are not present in measurement group' % \ wrong_channels raise Exception(msg) self.setConfiguration(cfg.raw_data) def _start(self, *args, **kwargs): try: self.Start() except DevFailed as e: # TODO: Workaround for CORBA timeout on measurement group start # remove it whenever sardana-org/sardana#93 gets implemented if e.args[-1].reason == "API_DeviceTimedOut": self.error("start timed out, trying to stop") self.stop() self.debug("stopped") raise e
[docs] def prepare(self): self.command_inout("Prepare")
[docs] def count_raw(self, start_time=None): """Raw count and report count values. Simply start and wait until finish, no configuration nor preparation. .. note:: The count_raw method API is partially experimental (value references may be changed to values whenever possible in the future). Backwards incompatible changes may occur if deemed necessary by the core developers. :param start_time: start time of the whole count operation, if not passed a current timestamp will be used :type start_time: :obj:`float` :return: channel names and values (or value references - experimental) :rtype: :obj:`dict` where keys are channel full names and values are channel values (or value references - experimental) """ if start_time is None: start_time = time.time() PoolElement.go(self) state = self.getStateEG().readValue() if state == Fault: msg = "Measurement group ended acquisition with Fault state" raise Exception(msg) values = self.getValues() ret = state, values self._total_go_time = time.time() - start_time return ret
[docs] def go(self, *args, **kwargs): """Count and report count values. Configuration and prepare for measurement, then start and wait until finish. .. note:: The count (go) method API is partially experimental (value references may be changed to values whenever possible in the future). Backwards incompatible changes may occur if deemed necessary by the core developers. :return: channel names and values (or value references - experimental) :rtype: :obj:`dict` where keys are channel full names and values are channel values (or value references - experimental) """ start_time = time.time() cfg = self.getConfiguration() cfg.prepare() integration_time = args[0] if integration_time is None or integration_time == 0: return self.getStateEG().readValue(), self.getValues() self.putIntegrationTime(integration_time) self.setMoveable(None) self.setNbStarts(1) self.prepare() return self.count_raw(start_time)
[docs] def count_continuous(self, synchronization, value_buffer_cb=None): """Execute measurement process according to the given synchronization description. :param synchronization: synchronization description :type synchronization: list of groups with equidistant synchronizations :param value_buffer_cb: callback on value buffer updates :type value_buffer_cb: callable :return: state and eventually value buffers if no callback was passed :rtype: tuple<list<DevState>,<list>> .. todo:: Think of unifying measure with count. .. note:: The measure method has been included in MeasurementGroup class on a provisional basis. Backwards incompatible changes (up to and including removal of the method) may occur if deemed necessary by the core developers. """ start_time = time.time() cfg = self.getConfiguration() cfg.prepare() self.setSynchronization(synchronization) self.subscribeValueBuffer(value_buffer_cb) self.count_raw(start_time) self.unsubscribeValueBuffer(value_buffer_cb) state = self.getStateEG().readValue() if state == Fault: msg = "Measurement group ended acquisition with Fault state" raise Exception(msg) if value_buffer_cb is None: value_buffers = self.getValueBuffers() else: value_buffers = None ret = state, value_buffers self._total_go_time = time.time() - start_time return ret
startCount = PoolElement.start waitCount = PoolElement.waitFinish count = go stopCount = PoolElement.abort stop = PoolElement.stop
[docs]class IORegister(PoolElement): """ Class encapsulating IORegister functionality.""" def __init__(self, name, **kw): """IORegister initialization.""" self.call__init__(PoolElement, name, **kw)
[docs] def getValueObj(self): return self._getAttrEG('value')
[docs] def readValue(self, force=False): return self._getAttrValue('value', force=force)
[docs] def startWriteValue(self, new_value, timeout=None): try: self.getValueObj().write(new_value) self.final_val = new_value except DevFailed as err_traceback: for err in err_traceback.args: if err.reason == 'API_AttrNotAllowed': raise RuntimeError('%s is already chaging' % self) else: raise
[docs] def waitWriteValue(self, timeout=None): pass
[docs] def writeValue(self, new_value, timeout=None): self.startWriteValue(new_value, timeout=timeout) self.waitWriteValue(timeout=timeout) return self.getStateEG().readValue(), self.readValue()
writeIORegister = writeIOR = writeValue readIORegister = readIOR = getValue = readValue
[docs]class Instrument(BaseElement): def __init__(self, **kw): self.__dict__.update(kw)
[docs] def getFullName(self): return self.full_name
[docs] def getParentInstrument(self): return self.getPoolObj().getObj(self.parent_instrument)
[docs] def getParentInstrumentName(self): return self.parent_instrument
[docs] def getChildrenInstruments(self): raise NotImplementedError return self._children
[docs] def getElements(self): raise NotImplementedError return self._elements
[docs] def getType(self): return self.klass
[docs]class Pool(TangoDevice, MoveableSource): """ Class encapsulating device Pool functionality.""" def __init__(self, name, **kw): self.call__init__(TangoDevice, name, **kw) self.call__init__(MoveableSource) self._elements = BaseSardanaElementContainer() self.__elements_attr = self.getAttribute("Elements") self.__elements_attr.addListener(self.on_elements_changed)
[docs] def cleanUp(self): TangoDevice.cleanUp(self) f = self.factory() f.removeExistingAttribute(self.__elements_attr)
[docs] def getObject(self, element_info): elem_type = element_info.getType() data = element_info._data if elem_type in ('ControllerClass', 'ControllerLibrary', 'Instrument'): klass = globals()[elem_type] kwargs = dict(data) kwargs['_pool_data'] = data kwargs['_pool_obj'] = self return klass(**kwargs) obj = Factory().getDevice(element_info.full_name, _pool_obj=self, _pool_data=data) return obj
[docs] def on_elements_changed(self, evt_src, evt_type, evt_value): if evt_type == TaurusEventType.Error: msg = evt_value if isinstance(msg, DevFailed): d = msg.args[0] # skip configuration errors if d.reason == "API_BadConfigurationProperty": return if d.reason in ("API_DeviceNotExported", "API_CantConnectToDevice"): msg = "Pool was shutdown or is inaccessible" else: msg = "{0}: {1}".format(d.reason, d.desc) self.warning("Received elements error event %s", msg) self.debug(evt_value) return elif evt_type not in CHANGE_EVT_TYPES: return try: elems = CodecFactory().decode(evt_value.rvalue) except: self.error("Could not decode element info") self.info("value: '%s'", evt_value.rvalue) self.debug("Details:", exc_info=1) return elements = self.getElementsInfo() for element_data in elems.get('new', ()): element_data['manager'] = self element = BaseSardanaElement(**element_data) elements.addElement(element) for element_data in elems.get('del', ()): element = self.getElementInfo(element_data['full_name']) try: elements.removeElement(element) except: self.warning("Failed to remove %s", element_data) for element_data in elems.get('change', ()): # TODO: element is assigned but not used!! (check) element = self._removeElement(element_data) element = self._addElement(element_data) return elems
def _addElement(self, element_data): element_data['manager'] = self element = BaseSardanaElement(**element_data) self.getElementsInfo().addElement(element) return element def _removeElement(self, element_data): name = element_data['full_name'] element = self.getElementInfo(name) self.getElementsInfo().removeElement(element) return element
[docs] def getElementsInfo(self): return self._elements
[docs] def getElements(self): return self.getElementsInfo().getElements()
[docs] def getElementInfo(self, name): return self.getElementsInfo().getElement(name)
[docs] def getElementNamesOfType(self, elem_type): return self.getElementsInfo().getElementNamesOfType(elem_type)
[docs] def getElementsOfType(self, elem_type): return self.getElementsInfo().getElementsOfType(elem_type)
[docs] def getElementsWithInterface(self, interface): return self.getElementsInfo().getElementsWithInterface(interface)
[docs] def getElementWithInterface(self, elem_name, interface): return self.getElementsInfo().getElementWithInterface(elem_name, interface)
[docs] def getObj(self, name, elem_type=None): if elem_type is None: return self.getElementInfo(name) elif isinstance(elem_type, str): elem_types = elem_type, else: elem_types = elem_type name = name.lower() for e_type in elem_types: elems = self.getElementsOfType(e_type) for elem in list(elems.values()): if elem.name.lower() == name: return elem elem = elems.get(name) if elem is not None: return elem
def __repr__(self): return self.getNormalName() def __str__(self): return repr(self) # -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~- # MoveableSource interface #
[docs] def getMoveable(self, names): """getMoveable(seq<string> names) -> Moveable Returns a moveable object that handles all the moveable items given in names.""" # if simple motor just return it (if the pool has it) if isinstance(names, str): names = names, if len(names) == 1: name = names[0] return self.getObj(name, elem_type=MOVEABLE_TYPES) # find a motor group that contains elements moveable = self.__findMotorGroupWithElems(names) # if none exists create one if moveable is None: mgs = self.getElementsOfType('MotorGroup') i = 1 pid = os.getpid() while True: name = "_mg_ms_{0}_{1}".format(pid, i) exists = False for mg in list(mgs.values()): if mg.name == name: exists = True break if not exists: break i += 1 moveable = self.createMotorGroup(name, names) return moveable
def __findMotorGroupWithElems(self, names): names_lower = list(map(str.lower, names)) len_names = len(names) mgs = self.getElementsOfType('MotorGroup') for mg in list(mgs.values()): mg_elems = mg.elements if len(mg_elems) != len_names: continue for mg_elem, name in zip(mg_elems, names_lower): if mg_elem.lower() != name: break else: return mg # # End of MoveableSource interface # -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~- def _wait_for_element_in_container(self, container, elem_name, timeout=0.5, contains=True): start = time.time() cond = True nap = 0.01 if timeout: nap = timeout / 10 while cond: elem = container.getElement(elem_name) if contains: if elem is not None: return elem else: if elem is None: return True if timeout: dt = time.time() - start if dt > timeout: self.info("Timed out waiting for '%s' in container", elem_name) return time.sleep(nap)
[docs] def createMotorGroup(self, mg_name, elements): params = [mg_name, ] + list(map(str, elements)) self.debug('trying to create motor group for elements: %s', params) self.command_inout('CreateMotorGroup', params) elements_info = self.getElementsInfo() return self._wait_for_element_in_container(elements_info, mg_name)
[docs] def createMeasurementGroup(self, mg_name, elements): params = [mg_name, ] + list(map(str, elements)) self.debug('trying to create measurement group: %s', params) self.command_inout('CreateMeasurementGroup', params) elements_info = self.getElementsInfo() return self._wait_for_element_in_container(elements_info, mg_name)
[docs] def deleteMeasurementGroup(self, name): return self.deleteElement(name)
[docs] def createElement(self, name, ctrl, axis=None): ctrl_type = ctrl.types[0] if axis is None: last_axis = ctrl.getLastUsedAxis() if last_axis is None: axis = str(1) else: axis = str(last_axis + 1) else: axis = str(axis) cmd = "CreateElement" pars = ctrl_type, ctrl.name, axis, name self.command_inout(cmd, pars) elements_info = self.getElementsInfo() return self._wait_for_element_in_container(elements_info, name)
[docs] def renameElement(self, old_name, new_name): self.debug('trying to rename element: %s to: %s', old_name, new_name) self.command_inout('RenameElement', [old_name, new_name]) elements_info = self.getElementsInfo() return self._wait_for_element_in_container(elements_info, new_name, contains=True)
[docs] def deleteElement(self, name): self.debug('trying to delete element: %s', name) self.command_inout('DeleteElement', name) elements_info = self.getElementsInfo() return self._wait_for_element_in_container(elements_info, name, contains=False)
[docs] def createController(self, class_name, name, *props): ctrl_class = self.getObj(class_name, elem_type='ControllerClass') if ctrl_class is None: raise Exception("Controller class %s not found" % class_name) cmd = "CreateController" pars = [ctrl_class.types[0], ctrl_class.file_name, class_name, name] pars.extend(list(map(str, props))) self.command_inout(cmd, pars) elements_info = self.getElementsInfo() return self._wait_for_element_in_container(elements_info, name)
[docs] def deleteController(self, name): return self.deleteElement(name)
[docs] def createInstrument(self, full_name, class_name): self.command_inout("CreateInstrument", [full_name, class_name]) elements_info = self.getElementsInfo() return self._wait_for_element_in_container(elements_info, full_name)
[docs]def registerExtensions(): factory = Factory() factory.registerDeviceClass("Pool", Pool) hw_type_names = [ 'Controller', 'ComChannel', 'Motor', 'PseudoMotor', 'TriggerGate', 'CTExpChannel', 'ZeroDExpChannel', 'OneDExpChannel', 'TwoDExpChannel', 'PseudoCounter', 'IORegister', 'MotorGroup', 'MeasurementGroup'] hw_type_map = [(name, globals()[name]) for name in hw_type_names] for klass_name, klass in hw_type_map: factory.registerDeviceClass(klass_name, klass)
[docs]def unregisterExtensions(): factory = Factory() factory.unregisterDeviceClass("Pool") hw_type_names = [ 'Controller', 'ComChannel', 'Motor', 'PseudoMotor', 'TriggerGate', 'CTExpChannel', 'ZeroDExpChannel', 'OneDExpChannel', 'TwoDExpChannel', 'PseudoCounter', 'IORegister', 'MotorGroup', 'MeasurementGroup'] for klass_name in hw_type_names: factory.unregisterDeviceClass(klass_name)