#!/usr/bin/env python
##############################################################################
##
# This file is part of Sardana
##
# http://www.sardana-controls.org/
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana. If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################
"""The device pool submodule.
It contains specific part of sardana device pool"""
import collections
__all__ = ["InterruptException", "StopException", "AbortException",
"BaseElement", "ControllerClass", "ControllerLibrary",
"PoolElement", "Controller", "ComChannel", "ExpChannel",
"CTExpChannel", "ZeroDExpChannel", "OneDExpChannel",
"TwoDExpChannel", "PseudoCounter", "Motor", "PseudoMotor",
"MotorGroup", "TriggerGate",
"MeasurementGroup", "IORegister", "Instrument", "Pool",
"registerExtensions", "getChannelConfigs"]
__docformat__ = 'restructuredtext'
import copy
import operator
import os
import sys
import time
import traceback
import weakref
import numpy
import PyTango
from PyTango import DevState, AttrDataFormat, AttrQuality, DevFailed, \
DeviceProxy
from taurus import Factory, Device, Attribute
from taurus.core.taurusbasetypes import TaurusEventType
from taurus.core.tango.tangovalidator import TangoAttributeNameValidator
from taurus.core.util.log import Logger
from taurus.core.util.codecs import CodecFactory
from taurus.core.util.containers import CaselessDict
from taurus.core.util.event import EventGenerator, AttributeEventWait, \
AttributeEventIterator
from taurus.core.tango import TangoDevice, FROM_TANGO_TO_STR_TYPE
from sardana import sardanacustomsettings
from .sardana import BaseSardanaElementContainer, BaseSardanaElement
from .motion import Moveable, MoveableSource
Ready = Standby = DevState.ON
Counting = Acquiring = Moving = DevState.MOVING
Alarm = DevState.ALARM
Fault = DevState.FAULT
CHANGE_EVT_TYPES = TaurusEventType.Change, TaurusEventType.Periodic
MOVEABLE_TYPES = 'Motor', 'PseudoMotor', 'MotorGroup'
QUALITY = {
AttrQuality.ATTR_VALID: 'VALID',
AttrQuality.ATTR_INVALID: 'INVALID',
AttrQuality.ATTR_CHANGING: 'CHANGING',
AttrQuality.ATTR_WARNING: 'WARNING',
AttrQuality.ATTR_ALARM: 'ALARM',
None: 'UNKNOWN'
}
def _is_referable(channel):
# Equivalent to ExpChannel.isReferable.
# Use DeviceProxy instead of taurus to avoid crashes in Py3
# See: tango-controls/pytango#292
if isinstance(channel, str):
channel = DeviceProxy(channel)
return "valueref" in list(map(str.lower, channel.get_attribute_list()))
class InterruptException(Exception):
pass
class StopException(InterruptException):
pass
class AbortException(InterruptException):
pass
[docs]class BaseElement(object):
""" The base class for elements in the Pool (Pool itself, Motor,
ControllerClass, ExpChannel all should inherit from this class directly or
indirectly)
"""
def __repr__(self):
pd = self.getPoolData()
return "{0}({1})".format(pd['type'], pd['full_name'])
def __str__(self):
return self.getName()
[docs] def serialize(self):
return self.getPoolData()
[docs] def str(self, n=0):
"""Returns a sequence of strings representing the object in
'consistent' way.
Default is to return <name>, <controller name>, <axis>
:param n: the number of elements in the tuple."""
if n == 0:
return CodecFactory.encode(('json'), self.serialize())
return self._str_tuple[:n]
def __lt__(self, o):
return self.getPoolData()['full_name'] < o.getPoolData()['full_name']
[docs] def getName(self):
return self.getPoolData()['name']
[docs] def getPoolObj(self):
"""Get reference to this object's Pool."""
return self._pool_obj
[docs] def getPoolData(self):
"""Get reference to this object's Pool data."""
return self._pool_data
[docs]class ControllerClass(BaseElement):
def __init__(self, **kw):
self.__dict__.update(kw)
self.path, self.f_name = os.path.split(self.file_name)
self.lib_name, self.ext = os.path.splitext(self.f_name)
def __repr__(self):
pd = self.getPoolData()
return "ControllerClass({0})".format(pd['full_name'])
[docs] def getSimpleFileName(self):
return self.f_name
[docs] def getFileName(self):
return self.file_name
[docs] def getClassName(self):
return self.getName()
[docs] def getType(self):
return self.getTypes()[0]
[docs] def getTypes(self):
return self.types
[docs] def getLib(self):
return self.f_name
[docs] def getGender(self):
return self.gender
[docs] def getModel(self):
return self.model
[docs] def getOrganization(self):
return self.organization
def __lt__(self, o):
if self.getType() != o.getType():
return self.getType() < o.getType()
if self.getGender() != o.getGender():
return self.getGender() < o.getGender()
return self.getClassName() < o.getClassName()
class ControllerLibrary(BaseElement):
def __init__(self, **kw):
self.__dict__.update(kw)
def getType(self):
return self.getTypes()[0]
def getTypes(self):
return self.type
class TangoAttributeEG(Logger, EventGenerator):
"""An event generator for a 'State' attribute"""
def __init__(self, attr):
self._attr = attr
self.call__init__(Logger, 'EG', attr)
event_name = '%s EG' % (attr.getParentObj().getNormalName())
self.call__init__(EventGenerator, event_name)
self._attr.addListener(self)
def getAttribute(self):
return self._attr
def eventReceived(self, evt_src, evt_type, evt_value):
"""Event handler from Taurus"""
if evt_type not in CHANGE_EVT_TYPES:
return
if evt_value is None:
v = None
else:
v = evt_value.value
EventGenerator.fireEvent(self, v)
def read(self, force=False):
try:
self.last_val = self._attr.read(cache=not force).value
except:
self.error("Read error")
self.debug("Details:", exc_info=1)
self.last_val = None
return EventGenerator.read(self)
def readValue(self, force=False):
r = self.read(force=force)
if r is None:
# do a retry
r = self.read(force=force)
return r
def write(self, value):
self._attr.write(value, with_read=False)
def __getattr__(self, name):
return getattr(self._attr, name)
def reservedOperation(fn):
def new_fn(*args, **kwargs):
self = args[0]
wr = self.getReservedWR()
if wr is not None:
if wr().isStopped():
raise StopException("stopped before calling %s" % fn.__name__)
elif wr().isAborted():
raise AbortException("aborted before calling %s" % fn.__name__)
try:
return fn(*args, **kwargs)
except:
print("Exception occurred in reserved operation:"
" clearing events...")
self._clearEventWait()
raise
return new_fn
def get_pool_for_device(db, device):
server_devs = db.get_device_class_list(device.info().server_id)
for dev_name, klass_name in zip(server_devs[0::2], server_devs[1::2]):
if klass_name == "Pool":
return Device(dev_name)
[docs]class PoolElement(BaseElement, TangoDevice):
"""Base class for a Pool element device."""
def __init__(self, name, **kwargs):
"""PoolElement initialization."""
self._reserved = None
self._evt_wait = None
self.__go_start_time = 0
self.__go_end_time = 0
self.__go_time = 0
self._total_go_time = 0
self.call__init__(TangoDevice, name, **kwargs)
# dict<string, TangoAttributeEG>
# key : the attribute name
# value : the corresponding TangoAttributeEG
self._attrEG = CaselessDict()
# force the creation of a state attribute
self.getStateEG()
def _find_pool_obj(self):
pool = get_pool_for_device(self.getParentObj(), self.getHWObj())
return pool
def _find_pool_data(self):
pool = self._find_pool_obj()
return pool.getElementInfo(self.getFullName())._data
# Override BaseElement.getPoolObj because the reference to pool object may
# not be filled. This reference is filled when the element is obtained
# using Pool.getObject. If one obtain the element directly using Taurus
# e.g. mot = taurus.Device(<mot_name>) it won't be filled. In this case
# look for the pool object using the database information.
[docs] def getPoolObj(self):
try:
return self._pool_obj
except AttributeError:
self._pool_obj = self._find_pool_obj()
return self._pool_obj
# Override BaseElement.getPoolData because the reference to pool data may
# not be filled. This reference is filled when the element is obtained
# using Pool.getPoolData. If one obtain the element directly using Taurus
# e.g. mot = taurus.Device(<mot_name>) it won't be filled. In this case
# look for the pool object and its data using the database information.
[docs] def getPoolData(self):
try:
return self._pool_data
except AttributeError:
self._pool_data = self._find_pool_data()
return self._pool_data
[docs] def cleanUp(self):
TangoDevice.cleanUp(self)
self._reserved = None
f = self.factory()
attr_map = self._attrEG
for attr_name in list(attr_map.keys()):
attrEG = attr_map.pop(attr_name)
attr = attrEG.getAttribute()
attrEG = None
f.removeExistingAttribute(attr)
[docs] def reserve(self, obj):
if obj is None:
self._reserved = None
return
self._reserved = weakref.ref(obj, self._unreserveCB)
def _unreserveCB(self, obj):
self.unreserve()
[docs] def unreserve(self):
self._reserved = None
[docs] def isReserved(self, obj=None):
if obj is None:
return self._reserved is not None
else:
o = self._reserved()
return o == obj
[docs] def getReservedWR(self):
return self._reserved
[docs] def getReserved(self):
if self._reserved is None:
return None
return self._reserved()
[docs] def dump_attributes(self):
attr_names = self.get_attribute_list()
req_id = self.read_attributes_asynch(attr_names)
return self.read_attributes_reply(req_id, 2000)
def _getAttrValue(self, name, force=False):
attrEG = self._getAttrEG(name)
if attrEG is None:
return None
return attrEG.readValue(force=force)
def _getAttrEG(self, name):
attrEG = self.getAttrEG(name)
if attrEG is None:
attrEG = self._createAttribute(name)
return attrEG
def _createAttribute(self, name):
attrObj = self.getAttribute(name)
if attrObj is None:
self.warning("Unable to create attribute %s" % name)
return None, None
attrEG = TangoAttributeEG(attrObj)
self._attrEG[name] = attrEG
return attrEG
def _getEventWait(self):
if self._evt_wait is None:
# create an object that waits for attribute events.
# each time we use it we have to connect and disconnect to an
# attribute
self._evt_wait = AttributeEventWait()
return self._evt_wait
def _clearEventWait(self):
self._evt_wait = None
[docs] def getStateEG(self):
return self._getAttrEG('state')
[docs] def getControllerName(self):
return self.getControllerObj().name
[docs] def getControllerObj(self):
full_ctrl_name = self.getPoolData()['controller']
return self.getPoolObj().getObj(full_ctrl_name, "Controller")
[docs] def getAxis(self):
return self.getPoolData()['axis']
[docs] def getType(self):
return self.getPoolData()['type']
[docs] def waitReady(self, timeout=None):
return self.getStateEG().waitEvent(Moving, equal=False,
timeout=timeout)
[docs] def getAttrEG(self, name):
"""Returns the TangoAttributeEG object"""
return self._attrEG.get(name)
[docs] def getAttrObj(self, name):
"""Returns the taurus.core.tangoattribute.TangoAttribute object"""
attrEG = self._attrEG.get(name)
if attrEG is None:
return None
return attrEG.getAttribute()
[docs] def getInstrumentObj(self):
return self._getAttrEG('instrument')
[docs] def getInstrumentName(self, force=False):
instr_name = self._getAttrValue('instrument', force=force)
if not instr_name:
return ''
# instr_name = instr_name[:instr_name.index('(')]
return instr_name
[docs] def setInstrumentName(self, instr_name):
self.getInstrumentObj().write(instr_name)
[docs] def getInstrument(self):
instr_name = self.getInstrumentName()
if not instr_name:
return None
return self.getPoolObj().getObj("Instrument", instr_name)
[docs] @reservedOperation
def start(self, *args, **kwargs):
evt_wait = self._getEventWait()
evt_wait.connect(self.getAttribute("state"))
evt_wait.lock()
try:
evt_wait.waitEvent(DevState.MOVING, equal=False)
self.__go_time = 0
self.__go_start_time = ts1 = time.time()
self._start(*args, **kwargs)
ts2 = time.time()
evt_wait.waitEvent(DevState.MOVING, after=ts1)
except:
evt_wait.disconnect()
raise
finally:
evt_wait.unlock()
ts2 = evt_wait.getRecordedEvents().get(DevState.MOVING, ts2)
return (ts2,)
[docs] def waitFinish(self, timeout=None, id=None):
"""Wait for the operation to finish
:param timeout: optional timeout (seconds)
:type timeout: float
:param id: id of the opertation returned by start
:type id: tuple(float)
"""
# Due to taurus-org/taurus #573 we need to divide the timeout
# in two intervals
if timeout is not None:
timeout = timeout / 2
if id is not None:
id = id[0]
evt_wait = self._getEventWait()
evt_wait.lock()
try:
evt_wait.waitEvent(DevState.MOVING, after=id, equal=False,
timeout=timeout, retries=1)
finally:
self.__go_end_time = time.time()
self.__go_time = self.__go_end_time - self.__go_start_time
evt_wait.unlock()
evt_wait.disconnect()
[docs] @reservedOperation
def go(self, *args, **kwargs):
self._total_go_time = 0
start_time = time.time()
eid = self.start(*args, **kwargs)
timeout = kwargs.get('timeout')
self.waitFinish(id=eid, timeout=timeout)
self._total_go_time = time.time() - start_time
[docs] def getLastGoTime(self):
"""Returns the time it took for last go operation"""
return self.__go_time
[docs] def getTotalLastGoTime(self):
"""Returns the time it took for last go operation, including dead time
to prepare, wait for events, etc"""
return self._total_go_time
[docs] def abort(self, wait_ready=True, timeout=None):
state = self.getStateEG()
state.lock()
try:
self.command_inout("Abort")
if wait_ready:
self.waitReady(timeout=timeout)
finally:
state.unlock()
[docs] def stop(self, wait_ready=True, timeout=None):
state = self.getStateEG()
state.lock()
try:
self.command_inout("Stop")
if wait_ready:
self.waitReady(timeout=timeout)
finally:
state.unlock()
def _information(self, tab=' '):
indent = "\n" + tab + 10 * ' '
msg = [self.getName() + ":"]
try:
state_value = self.stateObj.read().rvalue
# state_value is DevState enumeration (IntEnum)
state = state_value.name.capitalize()
except DevFailed as df:
if len(df.args):
state = df.args[0].desc
else:
e_info = sys.exc_info()[:2]
state = traceback.format_exception_only(*e_info)
except:
e_info = sys.exc_info()[:2]
state = traceback.format_exception_only(*e_info)
try:
msg.append(tab + " State: " + state)
except TypeError:
msg.append(tab + " State: " + state[0])
try:
e_info = sys.exc_info()[:2]
status = self.status()
status = status.replace('\n', indent)
except DevFailed as df:
if len(df.args):
status = df.args[0].desc
else:
e_info = sys.exc_info()[:2]
status = traceback.format_exception_only(*e_info)
except:
e_info = sys.exc_info()[:2]
status = traceback.format_exception_only(*e_info)
msg.append(tab + " Status: " + status)
return msg
[docs]class Controller(PoolElement):
""" Class encapsulating Controller functionality."""
def __init__(self, name, **kw):
"""PoolElement initialization."""
self.call__init__(PoolElement, name, **kw)
[docs] def getModuleName(self):
return self.getPoolData()['module']
[docs] def getClassName(self):
return self.getPoolData()['klass']
[docs] def getTypes(self):
return self.getPoolData()['types']
[docs] def getMainType(self):
return self.getPoolData()['main_type']
[docs] def addElement(self, elem):
axis = elem.getAxis()
self._elems[axis] = elem
self._last_axis = max(self._last_axis, axis)
[docs] def removeElement(self, elem):
axis = elem.getAxis()
del self._elems[elem.getAxis()]
if axis == self._last_axis:
self._last_axis = max(self._elems)
[docs] def getElementByAxis(self, axis):
pool = self.getPoolObj()
for _, elem in \
list(pool.getElementsOfType(self.getMainType()).items()):
if (elem.controller != self.getFullName() or
elem.getAxis() != axis):
continue
return elem
[docs] def getElementByName(self, name):
pool = self.getPoolObj()
for _, elem in \
list(pool.getElementsOfType(self.getMainType()).items()):
if (elem.controller != self.getFullName() or
elem.getName() != name):
continue
return elem
[docs] def getUsedAxes(self):
"""Return axes in use by this controller
:return: list of axes
:rtype: list<int>
"""
pool = self.getPoolObj()
axes = []
for _, elem in \
list(pool.getElementsOfType(self.getMainType()).items()):
if elem.controller != self.getFullName():
continue
axes.append(elem.getAxis())
return sorted(axes)
[docs] def getUsedAxis(self):
msg = ("getUsedAxis is deprecated since version 2.5.0. ",
"Use getUsedAxes instead.")
self.warning(msg)
self.getUsedAxes()
[docs] def getLastUsedAxis(self):
"""Return the last used axis (the highest axis) in this controller
:return: last used axis
:rtype: int or None
"""
used_axes = self.getUsedAxes()
if len(used_axes) == 0:
return None
return max(used_axes)
def __lt__(self, o):
return self.getName() < o.getName()
class ComChannel(PoolElement):
""" Class encapsulating CommunicationChannel functionality."""
pass
[docs]class ExpChannel(PoolElement):
""" Class encapsulating ExpChannel functionality."""
def __init__(self, name, **kw):
"""ExpChannel initialization."""
self.call__init__(PoolElement, name, **kw)
self._last_integ_time = None
self._last_value_ref_pattern = None
self._last_value_ref_enabled = None
self._value_buffer = {}
self._value_buffer_cb = None
codec_name = getattr(sardanacustomsettings, "VALUE_BUFFER_CODEC")
self._value_buffer_codec = CodecFactory().getCodec(codec_name)
self._value_ref_buffer = {}
self._value_ref_buffer_cb = None
codec_name = getattr(sardanacustomsettings, "VALUE_REF_BUFFER_CODEC")
self._value_ref_buffer_codec = CodecFactory().getCodec(codec_name)
[docs] def isReferable(self):
if "valueref" in list(map(str.lower, self.get_attribute_list())):
return True
return False
[docs] def getIntegrationTime(self):
return self._getAttrValue('IntegrationTime')
[docs] def getIntegrationTimeObj(self):
return self._getAttrEG('IntegrationTime')
[docs] def setIntegrationTime(self, integ_time):
self.getIntegrationTimeObj().write(integ_time)
[docs] def putIntegrationTime(self, integ_time):
if self._last_integ_time == integ_time:
return
self._last_integ_time = integ_time
self.getIntegrationTimeObj().write(integ_time)
[docs] def getValueObj_(self):
"""Retrurns Value attribute event generator object.
:return: Value attribute event generator
:rtype: TangoAttributeEG
..todo:: When support to Taurus 3 will be dropped provide getValueObj.
Taurus 3 TaurusDevice class already uses this name.
"""
return self._getAttrEG('value')
[docs] def getValue(self, force=False):
return self._getAttrValue('value', force=force)
[docs] def getValueBufferObj(self):
return self._getAttrEG('valuebuffer')
[docs] def getValueBuffer(self):
return self._value_buffer
[docs] def valueBufferChanged(self, value_buffer):
if value_buffer is None:
return
_, value_buffer = self._value_buffer_codec.decode(value_buffer)
indexes = value_buffer["index"]
values = value_buffer["value"]
for index, value in zip(indexes, values):
self._value_buffer[index] = value
[docs] def getValueRefObj(self):
"""Return ValueRef attribute event generator object.
:return: ValueRef attribute event generator
:rtype: TangoAttributeEG
"""
return self._getAttrEG('value')
[docs] def getValueRef(self, force=False):
return self._getAttrValue('valueref', force=force)
[docs] def getValueRefBufferObj(self):
return self._getAttrEG('valuerefbuffer')
[docs] def getValueRefBuffer(self):
return self._value_ref_buffer
[docs] def valueBufferRefChanged(self, value_ref_buffer):
if value_ref_buffer is None:
return
_, value_ref_buffer = self._value_ref_buffercodec.decode(
value_ref_buffer)
indexes = value_ref_buffer["index"]
value_refs = value_ref_buffer["value_ref"]
for index, value_ref in zip(indexes, value_refs):
self._value_ref_buffer[index] = value_ref
[docs] def getValueRefPattern(self):
return self._getAttrValue('ValueRefPattern')
[docs] def getValueRefPatternObj(self):
return self._getAttrEG('ValueRefPattern')
[docs] def setValueRefPattern(self, value_ref_pattern):
self.getValueRefPatternObj().write(value_ref_pattern)
[docs] def putValueRefPattern(self, value_ref_pattern):
if self._last_value_ref_pattern == value_ref_pattern:
return
self._last_value_ref_pattern = value_ref_pattern
self.getValueRefPatternObj().write(value_ref_pattern)
[docs] def isValueRefEnabled(self):
return self._getAttrValue('ValueRefEnabled')
[docs] def getValueRefEnabledObj(self):
return self._getAttrEG('ValueRefEnabled')
[docs] def setValueRefEnabled(self, value_ref_enabled):
self.getValueRefEnabledObj().write(value_ref_enabled)
[docs] def putValueRefEnabled(self, value_ref_enabled):
if self._last_value_ref_enabled == value_ref_enabled:
return
self._last_value_ref_enabled = value_ref_enabled
self.getValueRefEnabledObj().write(value_ref_enabled)
def _start(self, *args, **kwargs):
self.Start()
[docs] def go(self, *args, **kwargs):
"""Count and report count result.
Configuration measurement, then start and wait until finish.
.. note::
The count (go) method API is partially experimental (value
references may be changed to values whenever possible in the
future). Backwards incompatible changes may occur if deemed
necessary by the core developers.
:return: state and value (or value reference - experimental)
:rtype: :obj:`tuple`
"""
start_time = time.time()
integration_time = args[0]
self.putIntegrationTime(integration_time)
PoolElement.go(self)
state = self.getStateEG().readValue()
if self.isReferable() and self.isValueRefEnabled():
result = self.getValueRef()
else:
result = self.getValue()
ret = state, result
self._total_go_time = time.time() - start_time
return ret
startCount = PoolElement.start
waitCount = PoolElement.waitFinish
count = go
stopCount = PoolElement.abort
stop = PoolElement.stop
class TimerableExpChannel(ExpChannel):
def getTimer(self):
return self._getAttrValue('Timer')
def getTimerObj(self):
return self._getAttrEG('Timer')
def setTimer(self, timer):
self.getTimerObj().write(timer)
[docs]class CTExpChannel(TimerableExpChannel):
""" Class encapsulating CTExpChannel functionality."""
pass
[docs]class ZeroDExpChannel(ExpChannel):
""" Class encapsulating ZeroDExpChannel functionality."""
pass
[docs]class OneDExpChannel(TimerableExpChannel):
""" Class encapsulating OneDExpChannel functionality."""
pass
[docs]class TwoDExpChannel(TimerableExpChannel):
""" Class encapsulating TwoDExpChannel functionality."""
pass
[docs]class PseudoCounter(ExpChannel):
""" Class encapsulating PseudoCounter functionality."""
pass
[docs]class TriggerGate(PoolElement):
""" Class encapsulating TriggerGate functionality."""
pass
[docs]class Motor(PoolElement, Moveable):
""" Class encapsulating Motor functionality."""
def __init__(self, name, **kw):
"""PoolElement initialization."""
self.call__init__(PoolElement, name, **kw)
self.call__init__(Moveable)
[docs] def getPosition(self, force=False):
return self._getAttrValue('position', force=force)
[docs] def getDialPosition(self, force=False):
return self._getAttrValue('dialposition', force=force)
[docs] def getVelocity(self, force=False):
return self._getAttrValue('velocity', force=force)
[docs] def getAcceleration(self, force=False):
return self._getAttrValue('acceleration', force=force)
[docs] def getDeceleration(self, force=False):
return self._getAttrValue('deceleration', force=force)
[docs] def getBaseRate(self, force=False):
return self._getAttrValue('base_rate', force=force)
[docs] def getBacklash(self, force=False):
return self._getAttrValue('backlash', force=force)
[docs] def getLimitSwitches(self, force=False):
return self._getAttrValue('limit_switches', force=force)
[docs] def getOffset(self, force=False):
return self._getAttrValue('offset', force=force)
[docs] def getStepPerUnit(self, force=False):
return self._getAttrValue('step_per_unit', force=force)
[docs] def getSign(self, force=False):
return self._getAttrValue('Sign', force=force)
[docs] def getSimulationMode(self, force=False):
return self._getAttrValue('SimulationMode', force=force)
[docs] def getPositionObj(self):
return self._getAttrEG('position')
[docs] def getDialPositionObj(self):
return self._getAttrEG('dialposition')
[docs] def getVelocityObj(self):
return self._getAttrEG('velocity')
[docs] def getAccelerationObj(self):
return self._getAttrEG('acceleration')
[docs] def getDecelerationObj(self):
return self._getAttrEG('deceleration')
[docs] def getBaseRateObj(self):
return self._getAttrEG('base_rate')
[docs] def getBacklashObj(self):
return self._getAttrEG('backlash')
[docs] def getLimitSwitchesObj(self):
return self._getAttrEG('limit_switches')
[docs] def getOffsetObj(self):
return self._getAttrEG('offset')
[docs] def getStepPerUnitObj(self):
return self._getAttrEG('step_per_unit')
[docs] def getSimulationModeObj(self):
return self._getAttrEG('step_per_unit')
[docs] def setVelocity(self, value):
return self.getVelocityObj().write(value)
[docs] def setAcceleration(self, value):
return self.getAccelerationObj().write(value)
[docs] def setDeceleration(self, value):
return self.getDecelerationObj().write(value)
[docs] def setBaseRate(self, value):
return self.getBaseRateObj().write(value)
[docs] def setBacklash(self, value):
return self.getBacklashObj().write(value)
[docs] def setOffset(self, value):
return self.getOffsetObj().write(value)
[docs] def setStepPerUnit(self, value):
return self.getStepPerUnitObj().write(value)
[docs] def setSign(self, value):
return self.getSignObj().write(value)
# -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-
# Moveable interface
#
def _start(self, *args, **kwargs):
new_pos = args[0]
if isinstance(new_pos, collections.Sequence):
new_pos = new_pos[0]
try:
self.write_attribute('position', new_pos)
except DevFailed as df:
for err in df.args:
if err.reason == 'API_AttrNotAllowed':
raise RuntimeError('%s is already moving' % self)
else:
raise
self.final_pos = new_pos
[docs] def go(self, *args, **kwargs):
start_time = time.time()
PoolElement.go(self, *args, **kwargs)
ret = self.getStateEG().readValue(), self.readPosition()
self._total_go_time = time.time() - start_time
return ret
startMove = PoolElement.start
waitMove = PoolElement.waitFinish
move = go
getLastMotionTime = PoolElement.getLastGoTime
getTotalLastMotionTime = PoolElement.getTotalLastGoTime
[docs] @reservedOperation
def iterMove(self, new_pos, timeout=None):
if isinstance(new_pos, collections.Sequence):
new_pos = new_pos[0]
state, pos = self.getAttribute("state"), self.getAttribute("position")
evt_wait = self._getEventWait()
evt_wait.connect(state)
evt_wait.lock()
try:
# evt_wait.waitEvent(DevState.MOVING, equal=False)
time_stamp = time.time()
try:
self.getPositionObj().write(new_pos)
except DevFailed as err_traceback:
for err in err_traceback.args:
if err.reason == 'API_AttrNotAllowed':
raise RuntimeError('%s is already moving' % self)
else:
raise
self.final_pos = new_pos
# putting timeout=0.1 and retries=1 is a patch for the case when
# the initial moving event doesn't arrive do to an unknown
# tango/pytango error at the time
evt_wait.waitEvent(DevState.MOVING, time_stamp,
timeout=0.1, retries=1)
finally:
evt_wait.unlock()
evt_wait.disconnect()
evt_iter_wait = AttributeEventIterator(state, pos)
evt_iter_wait.lock()
try:
for evt_data in evt_iter_wait.events():
src, value = evt_data
if src == state and value != DevState.MOVING:
raise StopIteration
yield value
finally:
evt_iter_wait.unlock()
evt_iter_wait.disconnect()
[docs] def readPosition(self, force=False):
return [self.getPosition(force=force)]
[docs] def getMoveableSource(self):
return self.getPoolObj()
[docs] def getSize(self):
return 1
[docs] def getIndex(self, name):
if name.lower() == self.getName().lower():
return 0
return -1
#
# End of Moveable interface
# -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-
def _information(self, tab=' '):
msg = PoolElement._information(self, tab=tab)
try:
position = self.read_attribute("position")
pos = str(position.value)
if position.quality != AttrQuality.ATTR_VALID:
pos += " [" + QUALITY[position.quality] + "]"
except DevFailed as df:
if len(df.args):
pos = df.args[0].desc
else:
e_info = sys.exc_info()[:2]
pos = traceback.format_exception_only(*e_info)
except:
e_info = sys.exc_info()[:2]
pos = traceback.format_exception_only(*e_info)
msg.append(tab + "Position: " + str(pos))
return msg
[docs]class PseudoMotor(PoolElement, Moveable):
""" Class encapsulating PseudoMotor functionality."""
def __init__(self, name, **kw):
"""PoolElement initialization."""
self.call__init__(PoolElement, name, **kw)
self.call__init__(Moveable)
[docs] def getPosition(self, force=False):
return self._getAttrValue('position', force=force)
[docs] def getDialPosition(self, force=False):
return self.getPosition(force=force)
[docs] def getPositionObj(self):
return self._getAttrEG('position')
[docs] def getDialPositionObj(self):
return self.getPositionObj()
# -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-
# Moveable interface
#
def _start(self, *args, **kwargs):
new_pos = args[0]
if isinstance(new_pos, collections.Sequence):
new_pos = new_pos[0]
try:
self.write_attribute('position', new_pos)
except DevFailed as df:
for err in df.args:
if err.reason == 'API_AttrNotAllowed':
raise RuntimeError('%s is already moving' % self)
else:
raise
self.final_pos = new_pos
[docs] def go(self, *args, **kwargs):
start_time = time.time()
PoolElement.go(self, *args, **kwargs)
ret = self.getStateEG().readValue(), self.readPosition()
self._total_go_time = time.time() - start_time
return ret
startMove = PoolElement.start
waitMove = PoolElement.waitFinish
move = go
getLastMotionTime = PoolElement.getLastGoTime
getTotalLastMotionTime = PoolElement.getTotalLastGoTime
[docs] def readPosition(self, force=False):
return [self.getPosition(force=force)]
[docs] def getMoveableSource(self):
return self.getPoolObj()
[docs] def getSize(self):
return 1
[docs] def getIndex(self, name):
if name.lower() == self.getName().lower():
return 0
return -1
#
# End of Moveable interface
# -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-
def _information(self, tab=' '):
msg = PoolElement._information(self, tab=tab)
try:
position = self.read_attribute("position")
pos = str(position.value)
if position.quality != AttrQuality.ATTR_VALID:
pos += " [" + QUALITY[position.quality] + "]"
except DevFailed as df:
if len(df.args):
pos = df.args[0].desc
else:
e_info = sys.exc_info()[:2]
pos = traceback.format_exception_only(*e_info)
except:
e_info = sys.exc_info()[:2]
pos = traceback.format_exception_only(*e_info)
msg.append(tab + "Position: " + str(pos))
return msg
[docs]class MotorGroup(PoolElement, Moveable):
""" Class encapsulating MotorGroup functionality."""
def __init__(self, name, **kw):
"""PoolElement initialization."""
self.call__init__(PoolElement, name, **kw)
self.call__init__(Moveable)
def _create_str_tuple(self):
return 3 * ["TODO"]
[docs] def getMotorNames(self):
return self.getPoolData()['elements']
[docs] def hasMotor(self, name):
motor_names = list(map(str.lower, self.getMotorNames()))
return name.lower() in motor_names
[docs] def getPosition(self, force=False):
return self._getAttrValue('position', force=force)
[docs] def getPositionObj(self):
return self._getAttrEG('position')
# -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-
# Moveable interface
#
def _start(self, *args, **kwargs):
new_pos = args[0]
try:
self.write_attribute('position', new_pos)
except DevFailed as df:
for err in df.args:
if err.reason == 'API_AttrNotAllowed':
raise RuntimeError('%s is already moving' % self)
else:
raise
self.final_pos = new_pos
[docs] def go(self, *args, **kwargs):
start_time = time.time()
PoolElement.go(self, *args, **kwargs)
ret = self.getStateEG().readValue(), self.readPosition()
self._total_go_time = time.time() - start_time
return ret
startMove = PoolElement.start
waitMove = PoolElement.waitFinish
move = go
getLastMotionTime = PoolElement.getLastGoTime
getTotalLastMotionTime = PoolElement.getTotalLastGoTime
[docs] def readPosition(self, force=False):
return self.getPosition(force=force)
[docs] def getMoveableSource(self):
return self.getPoolObj()
[docs] def getSize(self):
return len(self.getMotorNames())
[docs] def getIndex(self, name):
try:
motor_names = list(map(str.lower, self.getMotorNames()))
return motor_names.index(name.lower())
except:
return -1
#
# End of Moveable interface
# -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-
def _information(self, tab=' '):
msg = PoolElement._information(self, tab=tab)
try:
position = self.read_attribute("position")
pos = str(position.value)
if position.quality != AttrQuality.ATTR_VALID:
pos += " [" + QUALITY[position.quality] + "]"
except DevFailed as df:
if len(df.args):
pos = df.args[0].desc
else:
e_info = sys.exc_info()[:2]
pos = traceback.format_exception_only(*e_info)
except:
e_info = sys.exc_info()[:2]
pos = traceback.format_exception_only(*e_info)
msg.append(tab + "Position: " + str(pos))
return msg
class BaseChannelInfo(object):
def __init__(self, data):
# dict<str, obj>
# channel data
self.raw_data = data
self.__dict__.update(data)
class TangoChannelInfo(BaseChannelInfo):
def __init__(self, data, info):
BaseChannelInfo.__init__(self, data)
# PyTango.AttributeInfoEx
self.set_info(info)
def has_info(self):
return self.raw_info is not None
def set_info(self, info):
self.raw_info = info
if info is None:
return
data = self.raw_data
if 'data_type' not in data:
data_type = info.data_type
try:
self.data_type = FROM_TANGO_TO_STR_TYPE[data_type]
except KeyError as e:
# For backwards compatibility:
# starting from Taurus 4.3.0 DevVoid was added to the dict
if data_type == PyTango.DevVoid:
self.data_type = None
else:
raise e
if 'shape' not in data:
shape = ()
if info.data_format == AttrDataFormat.SPECTRUM:
shape = (info.max_dim_x,)
elif info.data_format == AttrDataFormat.IMAGE:
shape = (info.max_dim_x, info.max_dim_y)
self.shape = shape
else:
shape = self.shape
self.shape = list(shape)
def __getattr__(self, name):
if self.has_info():
return getattr(self.raw_info, name)
cls_name = self.__class__.__name__
raise AttributeError("'%s' has no attribute '%s'" % (cls_name, name))
def getChannelConfigs(mgconfig, ctrls=None, sort=True):
'''
gets a list of channel configurations of the controllers of the given
measurement group configuration. It optionally filters to those channels
matching given lists of controller.
:param ctrls: (seq<str> or None) a sequence of strings to filter the
controllers. If None given, all controllers will be used
:param sort: (bool) If True (default) the returned list will be sorted
according to channel index (if given in channeldata) and
then by channelname.
:return: (list<tuple>) A list of channelname,channeldata pairs.
'''
chconfigs = []
if not mgconfig:
return []
for ctrl_name, ctrl_data in list(mgconfig['controllers'].items()):
if ctrls is None or ctrl_name in ctrls:
for ch_name, ch_data in list(ctrl_data['channels'].items()):
ch_data.update({'_controller_name': ctrl_name})
chconfigs.append((ch_name, ch_data))
if sort:
# sort the channel configs by index (primary sort) and then by channel
# name.
# sort by channel_name
chconfigs = sorted(chconfigs, key=lambda c: c[0])
# sort by index (give a very large index for those which don't have it)
chconfigs = sorted(chconfigs, key=lambda c: c[1].get('index', 1e16))
return chconfigs
class MGConfiguration(object):
def __init__(self, mg, data):
self._mg = weakref.ref(mg)
if isinstance(data, str):
data = CodecFactory().decode(('json', data))
self.raw_data = data
self.__dict__.update(data)
# dict<str, dict>
# where key is the channel name and value is the channel data in form
# of a dict as receveid by the MG configuration attribute
self.channels = channels = CaselessDict()
for _, ctrl_data in list(self.controllers.items()):
for channel_name, channel_data in \
list(ctrl_data['channels'].items()):
channels[channel_name] = channel_data
#####################
# @todo: the for-loops above could be replaced by something like:
# self.channels = channels = \
# CaselessDict(getChannelConfigs(data, sort=False))
#####################
# seq<dict> each element is the channel data in form of a dict as
# receveid by the MG configuration attribute. This seq is just a cache
# ordered by channel index in the MG.
self.channel_list = len(channels) * [None]
for channel in list(channels.values()):
self.channel_list[channel['index']] = channel
# dict<str, list[DeviceProxy, CaselessDict<str, dict>]>
# where key is a device name and value is a list with two elements:
# - A device proxy or None if there was an error building it
# - A dict where keys are attribute names and value is a reference to
# a dict representing channel data as received in raw data
self.tango_dev_channels = None
# Number of elements in tango_dev_channels in error (could not build
# DeviceProxy, probably)
self.tango_dev_channels_in_error = 0
# dict<str, tuple<str, str, TangoChannelInfo>>
# where key is a channel name and value is a tuple of three elements:
# - device name
# - attribute name
# - attribute information or None if there was an error trying to get
# the information
self.tango_channels_info = None
# Number of elements in tango_channels_info_in_error in error
# (could not build attribute info, probably)
self.tango_channels_info_in_error = 0
# dict<str, dict>
# where key is a channel name and data is a reference to a dict
# representing channel data as received in raw data
self.non_tango_channels = None
self.initialized = False
def _build(self):
# internal channel structure that groups channels by tango device so
# they can be read as a group minimizing this way the network requests
self.tango_dev_channels = tg_dev_chs = CaselessDict()
self.tango_dev_channels_in_error = 0
self.tango_channels_info = tg_chs_info = CaselessDict()
self.tango_channels_info_in_error = 0
self.non_tango_channels = n_tg_chs = CaselessDict()
self.cache = cache = {}
tg_attr_validator = TangoAttributeNameValidator()
for channel_name, channel_data in list(self.channels.items()):
cache[channel_name] = None
data_source = channel_data['source']
params = tg_attr_validator.getParams(data_source)
if params is None:
# Handle NON tango channel
n_tg_chs[channel_name] = channel_data
else:
# Handle tango channel
dev_name = params['devname'].lower()
attr_name = params['_shortattrname'].lower()
host, port = params.get('host'), params.get('port')
if host is not None and port is not None:
dev_name = "tango://{0}:{1}/{2}".format(host, port,
dev_name)
dev_data = tg_dev_chs.get(dev_name)
# technical debt: read Value or ValueRef attribute
# ideally the source configuration should include this info
# Use DeviceProxy instead of taurus to avoid crashes in Py3
# See: tango-controls/pytango#292
# channel = Device(dev_name)
# if (isinstance(channel, ExpChannel)
# and channel.isReferable()
# and channel_data.get("value_ref_enabled", False)):
if (_is_referable(dev_name)
and channel_data.get("value_ref_enabled", False)):
attr_name += "Ref"
if dev_data is None:
# Build tango device
dev = None
try:
dev = DeviceProxy(dev_name)
except:
self.tango_dev_channels_in_error += 1
tg_dev_chs[dev_name] = dev_data = [dev, CaselessDict()]
dev, attr_data = dev_data
attr_data[attr_name] = channel_data
# get attribute configuration
attr_info = None
if dev is None:
self.tango_channels_info_in_error += 1
else:
try:
tg_attr_info = dev.get_attribute_config_ex(attr_name)[
0]
except:
tg_attr_info = \
self._build_empty_tango_attr_info(channel_data)
self.tango_channels_info_in_error += 1
attr_info = TangoChannelInfo(channel_data, tg_attr_info)
tg_chs_info[channel_name] = dev_name, attr_name, attr_info
def _build_empty_tango_attr_info(self, channel_data):
ret = PyTango.AttributeInfoEx()
ret.name = channel_data['name']
ret.label = channel_data['label']
return ret
def prepare(self):
# first time? build everything
if self.tango_dev_channels is None:
return self._build()
# prepare missing tango devices
if self.tango_dev_channels_in_error > 0:
for dev_name, dev_data in list(self.tango_dev_channels.items()):
if dev_data[0] is None:
try:
dev_data[0] = DeviceProxy(dev_name)
self.tango_dev_channels_in_error -= 1
except:
pass
# prepare missing tango attribute configuration
if self.tango_channels_info_in_error > 0:
for _, attr_data in list(self.tango_channels_info.items()):
dev_name, attr_name, attr_info = attr_data
if attr_info.has_info():
continue
dev = self.tango_dev_channels[dev_name]
if dev is None:
continue
try:
tg_attr_info = dev.get_attribute_config_ex(attr_name)[0]
attr_info.set_info(tg_attr_info)
self.tango_channels_info_in_error -= 1
except:
pass
def getChannels(self):
return self.channel_list
def getChannelInfo(self, channel_name):
try:
return self.tango_channels_info[channel_name]
except Exception:
channel_name = channel_name.lower()
for d_name, a_name, ch_info in \
list(self.tango_channels_info.values()):
if ch_info.name.lower() == channel_name:
return d_name, a_name, ch_info
def getChannelsInfo(self, only_enabled=False):
"""Returns information about the channels present in the measurement
group in a form of dictionary, where key is a channel name and value is
a tuple of three elements:
- device name
- attribute name
- attribute information or None if there was an error trying to get
the information
:param only_enabled: flag to filter out disabled channels
:type only_enabled: bool
:return: dictionary with channels info
:rtype: dict<str, tuple<str, str, TangoChannelInfo>>
"""
self.prepare()
ret = CaselessDict(self.tango_channels_info)
ret.update(self.non_tango_channels)
for ch_name, (_, _, ch_info) in list(ret.items()):
if only_enabled and not ch_info.enabled:
ret.pop(ch_name)
return ret
def getChannelsInfoList(self, only_enabled=False):
"""Returns information about the channels present in the measurement
group in a form of ordered, based on the channel index, list.
:param only_enabled: flag to filter out disabled channels
:type only_enabled: bool
:return: list with channels info
:rtype: list<TangoChannelInfo>
"""
channels_info = self.getChannelsInfo(only_enabled=only_enabled)
ret = []
for _, (_, _, ch_info) in list(channels_info.items()):
ret.append(ch_info)
ret = sorted(ret, key=lambda x: x.index)
return ret
def getCountersInfoList(self):
channels_info = self.getChannelsInfoList()
timer_name, idx = self.timer, -1
for i, ch in enumerate(channels_info):
if ch['full_name'] == timer_name:
idx = i
break
if idx >= 0:
channels_info.pop(idx)
return channels_info
def getTangoDevChannels(self, only_enabled=False):
"""Returns Tango channels (attributes) that could be used to read
measurement group results in a form of dict where key is a device name
and value is a list with two elements:
- A device proxy or None if there was an error building it
- A dict where keys are attribute names and value is a reference to
a dict representing channel data as received in raw data
:param only_enabled: flag to filter out disabled channels
:type only_enabled: bool
:return: dict with Tango channels
:rtype: dict<str, list[DeviceProxy, CaselessDict<str, dict>]>
"""
if not only_enabled:
return self.tango_dev_channels
tango_dev_channels = {}
for dev_name, dev_data in list(self.tango_dev_channels.items()):
dev_proxy, attrs = dev_data[0], copy.deepcopy(dev_data[1])
for attr_name, channel_data in list(attrs.items()):
if not channel_data["enabled"]:
attrs.pop(attr_name)
tango_dev_channels[dev_name] = [dev_proxy, attrs]
return tango_dev_channels
def read(self, parallel=True):
if parallel:
return self._read_parallel()
return self._read()
def _read_parallel(self):
self.prepare()
ret = CaselessDict(self.cache)
dev_replies = {}
# deposit read requests
tango_dev_channels = self.getTangoDevChannels(only_enabled=True)
for _, dev_data in list(tango_dev_channels.items()):
dev, attrs = dev_data
if dev is None:
continue
try:
dev_replies[dev] = dev.read_attributes_asynch(
list(attrs.keys())), attrs
except Exception:
dev_replies[dev] = None, attrs
# gather all replies
for dev, reply_data in list(dev_replies.items()):
reply, attrs = reply_data
try:
data = dev.read_attributes_reply(reply, 0)
for data_item in data:
channel_data = attrs[data_item.name]
if data_item.has_failed:
value = None
else:
value = data_item.value
ret[channel_data['full_name']] = value
except Exception:
for _, channel_data in list(attrs.items()):
ret[channel_data['full_name']] = None
return ret
def _read(self):
self.prepare()
ret = CaselessDict(self.cache)
tango_dev_channels = self.getTangoDevChannels(only_enabled=True)
for _, dev_data in list(tango_dev_channels.items()):
dev, attrs = dev_data
try:
data = dev.read_attributes(list(attrs.keys()))
for data_item in data:
channel_data = attrs[data_item.name]
if data_item.has_failed:
value = None
else:
value = data_item.value
ret[channel_data['full_name']] = value
except Exception:
for _, channel_data in list(attrs.items()):
ret[channel_data['full_name']] = None
return ret
[docs]class MeasurementGroup(PoolElement):
""" Class encapsulating MeasurementGroup functionality."""
def __init__(self, name, **kw):
"""PoolElement initialization."""
self._configuration = None
self._channels = None
self._last_integ_time = None
self.call__init__(PoolElement, name, **kw)
self.__cfg_attr = self.getAttribute('configuration')
self.__cfg_attr.addListener(self.on_configuration_changed)
self._value_buffer_cb = None
self._value_buffer_channels = None
codec_name = getattr(sardanacustomsettings, "VALUE_BUFFER_CODEC")
self._value_buffer_codec = CodecFactory().getCodec(codec_name)
self._value_ref_buffer_cb = None
self._value_ref_buffer_channels = None
codec_name = getattr(sardanacustomsettings, "VALUE_REF_BUFFER_CODEC")
self._value_ref_buffer_codec = CodecFactory().getCodec(codec_name)
[docs] def cleanUp(self):
PoolElement.cleanUp(self)
f = self.factory()
f.removeExistingAttribute(self.__cfg_attr)
def _create_str_tuple(self):
channel_names = ", ".join(self.getChannelNames())
return self.getName(), self.getTimerName(), channel_names
[docs] def getConfigurationAttrEG(self):
return self._getAttrEG('Configuration')
[docs] def setConfiguration(self, configuration):
codec = CodecFactory().getCodec('json')
f, data = codec.encode(('', configuration))
self.write_attribute('configuration', data)
def _setConfiguration(self, data):
self._configuration = MGConfiguration(self, data)
[docs] def getConfiguration(self, force=False):
if force or self._configuration is None:
data = self.getConfigurationAttrEG().readValue(force=True)
self._setConfiguration(data)
return self._configuration
[docs] def on_configuration_changed(self, evt_src, evt_type, evt_value):
if evt_type not in CHANGE_EVT_TYPES:
return
self.info("Configuration changed")
self._setConfiguration(evt_value.value)
[docs] def getTimerName(self):
return self.getTimer()['name']
[docs] def getTimer(self):
cfg = self.getConfiguration()
return cfg.channels[cfg.timer]
[docs] def getTimerValue(self):
return self.getTimerName()
[docs] def getMonitorName(self):
return self.getMonitor()['name']
[docs] def getMonitor(self):
cfg = self.getConfiguration()
return cfg.channels[cfg.monitor]
[docs] def setTimer(self, timer_name):
try:
self.getChannel(timer_name)
except KeyError:
raise Exception("%s does not contain a channel named '%s'"
% (str(self), timer_name))
cfg = self.getConfiguration().raw_data
cfg['timer'] = timer_name
import json
self.write_attribute("configuration", json.dumps(cfg))
[docs] def getChannels(self):
return self.getConfiguration().getChannels()
[docs] def getCounters(self):
cfg = self.getConfiguration()
return [c for c in self.getChannels() if c['full_name'] != cfg.timer]
[docs] def getChannelNames(self):
return [ch['name'] for ch in self.getChannels()]
[docs] def getCounterNames(self):
return [ch['name'] for ch in self.getCounters()]
[docs] def getChannelLabels(self):
return [ch['label'] for ch in self.getChannels()]
[docs] def getCounterLabels(self):
return [ch['label'] for ch in self.getCounters()]
[docs] def getChannel(self, name):
return self.getConfiguration().channels[name]
[docs] def getChannelInfo(self, name):
return self.getConfiguration().getChannelInfo(name)
[docs] def getChannelsInfo(self):
return self.getConfiguration().getChannelsInfoList()
[docs] def getChannelsEnabledInfo(self):
"""Returns information about **only enabled** channels present in the
measurement group in a form of ordered, based on the channel index,
list.
:return: list with channels info
:rtype: list<TangoChannelInfo>
"""
return self.getConfiguration().getChannelsInfoList(only_enabled=True)
[docs] def getCountersInfo(self):
return self.getConfiguration().getCountersInfoList()
[docs] def getValues(self, parallel=True):
return self.getConfiguration().read(parallel=parallel)
[docs] def getValueBuffers(self):
value_buffers = []
for channel_info in self.getChannels():
channel = Device(channel_info["full_name"])
value_buffers.append(channel.getValueBuffer())
return value_buffers
[docs] def getIntegrationTime(self):
return self._getAttrValue('IntegrationTime')
[docs] def getIntegrationTimeObj(self):
return self._getAttrEG('IntegrationTime')
[docs] def setIntegrationTime(self, ctime):
self.getIntegrationTimeObj().write(ctime)
[docs] def putIntegrationTime(self, ctime):
if self._last_integ_time == ctime:
return
self._last_integ_time = ctime
self.getIntegrationTimeObj().write(ctime)
[docs] def getAcquisitionModeObj(self):
return self._getAttrEG('AcquisitionMode')
[docs] def getAcquisitionMode(self):
return self._getAttrValue('AcquisitionMode')
[docs] def setAcquisitionMode(self, acqMode):
self.getAcquisitionModeObj().write(acqMode)
[docs] def getSynchronizationObj(self):
return self._getAttrEG('Synchronization')
[docs] def getSynchronization(self):
return self._getAttrValue('Synchronization')
[docs] def setSynchronization(self, synchronization):
codec = CodecFactory().getCodec('json')
_, data = codec.encode(('', synchronization))
self.getSynchronizationObj().write(data)
self._last_integ_time = None
# NbStarts Methods
[docs] def getNbStartsObj(self):
return self._getAttrEG('NbStarts')
[docs] def setNbStarts(self, starts):
self.getNbStartsObj().write(starts)
[docs] def getNbStarts(self):
return self._getAttrValue('NbStarts')
[docs] def getMoveableObj(self):
return self._getAttrEG('Moveable')
[docs] def getMoveable(self):
return self._getAttrValue('Moveable')
[docs] def getLatencyTimeObj(self):
return self._getAttrEG('LatencyTime')
[docs] def getLatencyTime(self):
return self._getAttrValue('LatencyTime')
[docs] def setMoveable(self, moveable=None):
if moveable is None:
moveable = 'None' # Tango attribute is of type DevString
self.getMoveableObj().write(moveable)
[docs] def valueBufferChanged(self, channel, value_buffer):
"""Receive value buffer updates, pre-process them, and call
the subscribed callback.
:param channel: channel that reports value buffer update
:type channel: ExpChannel
:param value_buffer: json encoded value buffer update, it contains
at least values and indexes
:type value_buffer: :obj:`str`
"""
if value_buffer is None:
return
_, value_buffer = self._value_buffer_codec.decode(value_buffer)
values = value_buffer["value"]
if isinstance(values[0], list):
np_values = list(map(numpy.array, values))
value_buffer["value"] = np_values
self._value_buffer_cb(channel, value_buffer)
[docs] def subscribeValueBuffer(self, cb=None):
"""Subscribe to channels' value buffer update events. If no
callback is passed, the default channel's callback is subscribed which
will store the data in the channel's value_buffer attribute.
:param cb: callback to be subscribed, None means subscribe the default
channel's callback
:type cb: callable
"""
self._value_buffer_channels = []
for channel_info in self.getChannels():
full_name = channel_info["full_name"]
value_ref_enabled = channel_info.get("value_ref_enabled", False)
# Use DeviceProxy instead of taurus to avoid crashes in Py3
# See: tango-controls/pytango#292
if _is_referable(full_name) and value_ref_enabled:
continue
channel = Device(full_name)
value_buffer_obj = channel.getValueBufferObj()
if cb is not None:
self._value_buffer_cb = cb
value_buffer_obj.subscribeEvent(self.valueBufferChanged,
channel, False)
else:
value_buffer_obj.subscribeEvent(channel.valueBufferChanged,
with_first_event=False)
self._value_buffer_channels.append(channel)
[docs] def unsubscribeValueBuffer(self, cb=None):
"""Unsubscribe from channels' value buffer events. If no callback is
passed, unsubscribe the channel's default callback.
:param cb: callback to be unsubscribed, None means unsubscribe the
default channel's callback
:type cb: callable
"""
for channel_info in self.getChannels():
full_name = channel_info["full_name"]
value_ref_enabled = channel_info.get("value_ref_enabled", False)
# Use DeviceProxy instead of taurus to avoid crashes in Py3
# See: tango-controls/pytango#292
if _is_referable(full_name) and value_ref_enabled:
continue
channel = Device(full_name)
value_buffer_obj = channel.getValueBufferObj()
if cb is not None:
value_buffer_obj.unsubscribeEvent(self.valueBufferChanged,
channel)
self._value_buffer_cb = None
else:
value_buffer_obj.unsubscribeEvent(channel.valueBufferChanged)
self._value_buffer_channels = None
[docs] def valueRefBufferChanged(self, channel, value_ref_buffer):
"""Receive value ref buffer updates, pre-process them, and call
the subscribed callback.
:param channel: channel that reports value ref buffer update
:type channel: ExpChannel
:param value_ref_buffer: json encoded value ref buffer update,
it contains at least value refs and indexes
:type value_ref_buffer: :obj:`str`
"""
if value_ref_buffer is None:
return
_, value_ref_buffer = self._value_ref_buffer_codec.decode(
value_ref_buffer)
self._value_ref_buffer_cb(channel, value_ref_buffer)
[docs] def subscribeValueRefBuffer(self, cb=None):
"""Subscribe to channels' value ref buffer update events. If no
callback is passed, the default channel's callback is subscribed which
will store the data in the channel's value_buffer attribute.
:param cb: callback to be subscribed, None means subscribe the default
channel's callback
:type cb: callable
"""
self._value_ref_buffer_channels = []
for channel_info in self.getChannels():
full_name = channel_info["full_name"]
value_ref_enabled = channel_info.get("value_ref_enabled", False)
# Use DeviceProxy instead of taurus to avoid crashes in Py3
# See: tango-controls/pytango#292
if not _is_referable(full_name):
continue
if not value_ref_enabled:
continue
channel = Device(full_name)
value_ref_buffer_obj = channel.getValueRefBufferObj()
if cb is not None:
self._value_ref_buffer_cb = cb
value_ref_buffer_obj.subscribeEvent(
self.valueRefBufferChanged, channel, False)
else:
value_ref_buffer_obj.subscribeEvent(
channel.valueRefBufferChanged, with_first_event=False)
self._value_ref_buffer_channels.append(channel)
[docs] def unsubscribeValueRefBuffer(self, cb=None):
"""Unsubscribe from channels' value ref buffer events. If no
callback is passed, unsubscribe the channel's default callback.
:param cb: callback to be unsubscribed, None means unsubscribe the
default channel's callback
:type cb: callable
"""
for channel_info in self.getChannels():
full_name = channel_info["full_name"]
value_ref_enabled = channel_info.get("value_ref_enabled", False)
# Use DeviceProxy instead of taurus to avoid crashes in Py3
# See: tango-controls/pytango#292
if not _is_referable(full_name):
continue
if not value_ref_enabled:
continue
channel = Device(full_name)
value_ref_buffer_obj = channel.getValueRefBufferObj()
if cb is not None:
value_ref_buffer_obj.unsubscribeEvent(
self.valueRefBufferChanged, channel)
self._value_ref_buffer_cb = None
else:
value_ref_buffer_obj.unsubscribeEvent(
channel.valueRefBufferChanged)
self._value_ref_buffer_channels = None
[docs] def enableChannels(self, channels):
'''Enable acquisition of the indicated channels.
:param channels: (seq<str>) a sequence of strings indicating
channel names
'''
self._enableChannels(channels, True)
[docs] def disableChannels(self, channels):
'''Disable acquisition of the indicated channels.
:param channels: (seq<str>) a sequence of strings indicating
channel names
'''
self._enableChannels(channels, False)
def _enableChannels(self, channels, state):
found = {}
for channel in channels:
found[channel] = False
cfg = self.getConfiguration()
for channel in cfg.getChannels():
name = channel['name']
if name in channels:
channel['enabled'] = state
found[name] = True
wrong_channels = []
for ch, f in list(found.items()):
if f is False:
wrong_channels.append(ch)
if len(wrong_channels) > 0:
msg = 'channels: %s are not present in measurement group' % \
wrong_channels
raise Exception(msg)
self.setConfiguration(cfg.raw_data)
def _start(self, *args, **kwargs):
try:
self.Start()
except DevFailed as e:
# TODO: Workaround for CORBA timeout on measurement group start
# remove it whenever sardana-org/sardana#93 gets implemented
if e.args[-1].reason == "API_DeviceTimedOut":
self.error("start timed out, trying to stop")
self.stop()
self.debug("stopped")
raise e
[docs] def prepare(self):
self.command_inout("Prepare")
[docs] def count_raw(self, start_time=None):
"""Raw count and report count values.
Simply start and wait until finish, no configuration nor preparation.
.. note::
The count_raw method API is partially experimental (value
references may be changed to values whenever possible in the
future). Backwards incompatible changes may occur if deemed
necessary by the core developers.
:param start_time: start time of the whole count operation, if not
passed a current timestamp will be used
:type start_time: :obj:`float`
:return: channel names and values (or value references - experimental)
:rtype: :obj:`dict` where keys are channel full names and values are
channel values (or value references - experimental)
"""
if start_time is None:
start_time = time.time()
PoolElement.go(self)
state = self.getStateEG().readValue()
if state == Fault:
msg = "Measurement group ended acquisition with Fault state"
raise Exception(msg)
values = self.getValues()
ret = state, values
self._total_go_time = time.time() - start_time
return ret
[docs] def go(self, *args, **kwargs):
"""Count and report count values.
Configuration and prepare for measurement, then start and wait until
finish.
.. note::
The count (go) method API is partially experimental (value
references may be changed to values whenever possible in the
future). Backwards incompatible changes may occur if deemed
necessary by the core developers.
:return: channel names and values (or value references - experimental)
:rtype: :obj:`dict` where keys are channel full names and values are
channel values (or value references - experimental)
"""
start_time = time.time()
cfg = self.getConfiguration()
cfg.prepare()
integration_time = args[0]
if integration_time is None or integration_time == 0:
return self.getStateEG().readValue(), self.getValues()
self.putIntegrationTime(integration_time)
self.setMoveable(None)
self.setNbStarts(1)
self.prepare()
return self.count_raw(start_time)
[docs] def count_continuous(self, synchronization, value_buffer_cb=None):
"""Execute measurement process according to the given synchronization
description.
:param synchronization: synchronization description
:type synchronization: list of groups with equidistant synchronizations
:param value_buffer_cb: callback on value buffer updates
:type value_buffer_cb: callable
:return: state and eventually value buffers if no callback was passed
:rtype: tuple<list<DevState>,<list>>
.. todo:: Think of unifying measure with count.
.. note:: The measure method has been included in MeasurementGroup
class on a provisional basis. Backwards incompatible changes
(up to and including removal of the method) may occur if
deemed necessary by the core developers.
"""
start_time = time.time()
cfg = self.getConfiguration()
cfg.prepare()
self.setSynchronization(synchronization)
self.subscribeValueBuffer(value_buffer_cb)
self.count_raw(start_time)
self.unsubscribeValueBuffer(value_buffer_cb)
state = self.getStateEG().readValue()
if state == Fault:
msg = "Measurement group ended acquisition with Fault state"
raise Exception(msg)
if value_buffer_cb is None:
value_buffers = self.getValueBuffers()
else:
value_buffers = None
ret = state, value_buffers
self._total_go_time = time.time() - start_time
return ret
startCount = PoolElement.start
waitCount = PoolElement.waitFinish
count = go
stopCount = PoolElement.abort
stop = PoolElement.stop
[docs]class IORegister(PoolElement):
""" Class encapsulating IORegister functionality."""
def __init__(self, name, **kw):
"""IORegister initialization."""
self.call__init__(PoolElement, name, **kw)
[docs] def getValueObj(self):
return self._getAttrEG('value')
[docs] def readValue(self, force=False):
return self._getAttrValue('value', force=force)
[docs] def startWriteValue(self, new_value, timeout=None):
try:
self.getValueObj().write(new_value)
self.final_val = new_value
except DevFailed as err_traceback:
for err in err_traceback.args:
if err.reason == 'API_AttrNotAllowed':
raise RuntimeError('%s is already chaging' % self)
else:
raise
[docs] def waitWriteValue(self, timeout=None):
pass
[docs] def writeValue(self, new_value, timeout=None):
self.startWriteValue(new_value, timeout=timeout)
self.waitWriteValue(timeout=timeout)
return self.getStateEG().readValue(), self.readValue()
writeIORegister = writeIOR = writeValue
readIORegister = readIOR = getValue = readValue
[docs]class Instrument(BaseElement):
def __init__(self, **kw):
self.__dict__.update(kw)
[docs] def getFullName(self):
return self.full_name
[docs] def getParentInstrument(self):
return self.getPoolObj().getObj(self.parent_instrument)
[docs] def getParentInstrumentName(self):
return self.parent_instrument
[docs] def getChildrenInstruments(self):
raise NotImplementedError
return self._children
[docs] def getElements(self):
raise NotImplementedError
return self._elements
[docs] def getType(self):
return self.klass
[docs]class Pool(TangoDevice, MoveableSource):
""" Class encapsulating device Pool functionality."""
def __init__(self, name, **kw):
self.call__init__(TangoDevice, name, **kw)
self.call__init__(MoveableSource)
self._elements = BaseSardanaElementContainer()
self.__elements_attr = self.getAttribute("Elements")
self.__elements_attr.addListener(self.on_elements_changed)
[docs] def cleanUp(self):
TangoDevice.cleanUp(self)
f = self.factory()
f.removeExistingAttribute(self.__elements_attr)
[docs] def getObject(self, element_info):
elem_type = element_info.getType()
data = element_info._data
if elem_type in ('ControllerClass', 'ControllerLibrary', 'Instrument'):
klass = globals()[elem_type]
kwargs = dict(data)
kwargs['_pool_data'] = data
kwargs['_pool_obj'] = self
return klass(**kwargs)
obj = Factory().getDevice(element_info.full_name, _pool_obj=self,
_pool_data=data)
return obj
[docs] def on_elements_changed(self, evt_src, evt_type, evt_value):
if evt_type == TaurusEventType.Error:
msg = evt_value
if isinstance(msg, DevFailed):
d = msg.args[0]
# skip configuration errors
if d.reason == "API_BadConfigurationProperty":
return
if d.reason in ("API_DeviceNotExported",
"API_CantConnectToDevice"):
msg = "Pool was shutdown or is inaccessible"
else:
msg = "{0}: {1}".format(d.reason, d.desc)
self.warning("Received elements error event %s", msg)
self.debug(evt_value)
return
elif evt_type not in CHANGE_EVT_TYPES:
return
try:
elems = CodecFactory().decode(evt_value.rvalue)
except:
self.error("Could not decode element info")
self.info("value: '%s'", evt_value.rvalue)
self.debug("Details:", exc_info=1)
return
elements = self.getElementsInfo()
for element_data in elems.get('new', ()):
element_data['manager'] = self
element = BaseSardanaElement(**element_data)
elements.addElement(element)
for element_data in elems.get('del', ()):
element = self.getElementInfo(element_data['full_name'])
try:
elements.removeElement(element)
except:
self.warning("Failed to remove %s", element_data)
for element_data in elems.get('change', ()):
# TODO: element is assigned but not used!! (check)
element = self._removeElement(element_data)
element = self._addElement(element_data)
return elems
def _addElement(self, element_data):
element_data['manager'] = self
element = BaseSardanaElement(**element_data)
self.getElementsInfo().addElement(element)
return element
def _removeElement(self, element_data):
name = element_data['full_name']
element = self.getElementInfo(name)
self.getElementsInfo().removeElement(element)
return element
[docs] def getElementsInfo(self):
return self._elements
[docs] def getElements(self):
return self.getElementsInfo().getElements()
[docs] def getElementInfo(self, name):
return self.getElementsInfo().getElement(name)
[docs] def getElementNamesOfType(self, elem_type):
return self.getElementsInfo().getElementNamesOfType(elem_type)
[docs] def getElementsOfType(self, elem_type):
return self.getElementsInfo().getElementsOfType(elem_type)
[docs] def getElementsWithInterface(self, interface):
return self.getElementsInfo().getElementsWithInterface(interface)
[docs] def getElementWithInterface(self, elem_name, interface):
return self.getElementsInfo().getElementWithInterface(elem_name,
interface)
[docs] def getObj(self, name, elem_type=None):
if elem_type is None:
return self.getElementInfo(name)
elif isinstance(elem_type, str):
elem_types = elem_type,
else:
elem_types = elem_type
name = name.lower()
for e_type in elem_types:
elems = self.getElementsOfType(e_type)
for elem in list(elems.values()):
if elem.name.lower() == name:
return elem
elem = elems.get(name)
if elem is not None:
return elem
def __repr__(self):
return self.getNormalName()
def __str__(self):
return repr(self)
# -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-
# MoveableSource interface
#
[docs] def getMoveable(self, names):
"""getMoveable(seq<string> names) -> Moveable
Returns a moveable object that handles all the moveable items given in
names."""
# if simple motor just return it (if the pool has it)
if isinstance(names, str):
names = names,
if len(names) == 1:
name = names[0]
return self.getObj(name, elem_type=MOVEABLE_TYPES)
# find a motor group that contains elements
moveable = self.__findMotorGroupWithElems(names)
# if none exists create one
if moveable is None:
mgs = self.getElementsOfType('MotorGroup')
i = 1
pid = os.getpid()
while True:
name = "_mg_ms_{0}_{1}".format(pid, i)
exists = False
for mg in list(mgs.values()):
if mg.name == name:
exists = True
break
if not exists:
break
i += 1
moveable = self.createMotorGroup(name, names)
return moveable
def __findMotorGroupWithElems(self, names):
names_lower = list(map(str.lower, names))
len_names = len(names)
mgs = self.getElementsOfType('MotorGroup')
for mg in list(mgs.values()):
mg_elems = mg.elements
if len(mg_elems) != len_names:
continue
for mg_elem, name in zip(mg_elems, names_lower):
if mg_elem.lower() != name:
break
else:
return mg
#
# End of MoveableSource interface
# -~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-~-
def _wait_for_element_in_container(self, container, elem_name, timeout=0.5,
contains=True):
start = time.time()
cond = True
nap = 0.01
if timeout:
nap = timeout / 10
while cond:
elem = container.getElement(elem_name)
if contains:
if elem is not None:
return elem
else:
if elem is None:
return True
if timeout:
dt = time.time() - start
if dt > timeout:
self.info("Timed out waiting for '%s' in container",
elem_name)
return
time.sleep(nap)
[docs] def createMotorGroup(self, mg_name, elements):
params = [mg_name, ] + list(map(str, elements))
self.debug('trying to create motor group for elements: %s', params)
self.command_inout('CreateMotorGroup', params)
elements_info = self.getElementsInfo()
return self._wait_for_element_in_container(elements_info, mg_name)
[docs] def createMeasurementGroup(self, mg_name, elements):
params = [mg_name, ] + list(map(str, elements))
self.debug('trying to create measurement group: %s', params)
self.command_inout('CreateMeasurementGroup', params)
elements_info = self.getElementsInfo()
return self._wait_for_element_in_container(elements_info, mg_name)
[docs] def deleteMeasurementGroup(self, name):
return self.deleteElement(name)
[docs] def createElement(self, name, ctrl, axis=None):
ctrl_type = ctrl.types[0]
if axis is None:
last_axis = ctrl.getLastUsedAxis()
if last_axis is None:
axis = str(1)
else:
axis = str(last_axis + 1)
else:
axis = str(axis)
cmd = "CreateElement"
pars = ctrl_type, ctrl.name, axis, name
self.command_inout(cmd, pars)
elements_info = self.getElementsInfo()
return self._wait_for_element_in_container(elements_info, name)
[docs] def renameElement(self, old_name, new_name):
self.debug('trying to rename element: %s to: %s', old_name, new_name)
self.command_inout('RenameElement', [old_name, new_name])
elements_info = self.getElementsInfo()
return self._wait_for_element_in_container(elements_info, new_name,
contains=True)
[docs] def deleteElement(self, name):
self.debug('trying to delete element: %s', name)
self.command_inout('DeleteElement', name)
elements_info = self.getElementsInfo()
return self._wait_for_element_in_container(elements_info, name,
contains=False)
[docs] def createController(self, class_name, name, *props):
ctrl_class = self.getObj(class_name, elem_type='ControllerClass')
if ctrl_class is None:
raise Exception("Controller class %s not found" % class_name)
cmd = "CreateController"
pars = [ctrl_class.types[0], ctrl_class.file_name, class_name, name]
pars.extend(list(map(str, props)))
self.command_inout(cmd, pars)
elements_info = self.getElementsInfo()
return self._wait_for_element_in_container(elements_info, name)
[docs] def deleteController(self, name):
return self.deleteElement(name)
[docs] def createInstrument(self, full_name, class_name):
self.command_inout("CreateInstrument", [full_name, class_name])
elements_info = self.getElementsInfo()
return self._wait_for_element_in_container(elements_info, full_name)
[docs]def registerExtensions():
factory = Factory()
factory.registerDeviceClass("Pool", Pool)
hw_type_names = [
'Controller',
'ComChannel', 'Motor', 'PseudoMotor', 'TriggerGate',
'CTExpChannel', 'ZeroDExpChannel', 'OneDExpChannel', 'TwoDExpChannel',
'PseudoCounter', 'IORegister', 'MotorGroup', 'MeasurementGroup']
hw_type_map = [(name, globals()[name]) for name in hw_type_names]
for klass_name, klass in hw_type_map:
factory.registerDeviceClass(klass_name, klass)
[docs]def unregisterExtensions():
factory = Factory()
factory.unregisterDeviceClass("Pool")
hw_type_names = [
'Controller',
'ComChannel', 'Motor', 'PseudoMotor', 'TriggerGate',
'CTExpChannel', 'ZeroDExpChannel', 'OneDExpChannel', 'TwoDExpChannel',
'PseudoCounter', 'IORegister', 'MotorGroup', 'MeasurementGroup']
for klass_name in hw_type_names:
factory.unregisterDeviceClass(klass_name)