# Copyright (c) 2024, Bloomy Controls, Inc. All rights reserved.
#
# Use of this source code is governed by a BSD-3-clause license that can be
# found in the LICENSE file or at https://opensource.org/license/BSD-3-Clause
from ctypes import *
from ctypes.util import find_library
from enum import IntEnum, IntFlag
import os
import platform
LIB_NAME = "absscpi"
CELL_COUNT = 8
ANALOG_OUTPUT_COUNT = 8
ANALOG_INPUT_COUNT = 8
DIGITAL_OUTPUT_COUNT = 4
DIGITAL_INPUT_COUNT = 4
GLOBAL_MODEL_INPUT_COUNT = 8
LOCAL_MODEL_INPUT_COUNT = 8
MODEL_OUTPUT_COUNT = 36
[docs]
class AbsCellFault(IntEnum):
"""ABS cell faulting mode."""
NONE = 0
OPEN_CIRCUIT = 1
SHORT_CIRCUIT = 2
POLARITY = 3
[docs]
class AbsCellSenseRange(IntEnum):
"""ABS cell current sense range."""
AUTO = 0
LOW_1A = 1
HIGH_5A = 2
[docs]
class AbsCellMode(IntEnum):
"""ABS cell operating mode."""
CV = 0
ILIM = 1
[docs]
class AbsDeviceInfo(Structure):
"""Basic information about an ABS."""
_fields_ = [("part_number", c_char * 128),
("serial", c_char * 128),
("version", c_char * 128)]
[docs]
def get_part_number(self) -> str:
"""Get the device part number."""
return self.part_number.decode()
[docs]
def get_serial(self) -> str:
"""Get the device serial number."""
return self.serial.decode()
[docs]
def get_version(self) -> str:
"""Get the device software version."""
return self.version.decode()
[docs]
class AbsEthernetConfig(Structure):
"""ABS Ethernet address configuration."""
_fields_ = [("ip", c_char * 32),
("netmask", c_char * 32)]
[docs]
def get_ip_address(self) -> str:
"""Get the IP address."""
return self.ip.decode()
[docs]
def get_netmask(self) -> str:
"""Get the subnet mask."""
return self.netmask.decode()
[docs]
class AbsModelStatus(IntFlag):
"""Bits used to decode the ABS model status."""
RUNNING = 0x01
LOADED = 0x02
ERRORED = 0x04
[docs]
class AbsModelInfo(Structure):
"""Information about a model."""
_fields_ = [("name", c_char * 256),
("version", c_char * 256)]
[docs]
def get_name(self) -> str:
"""Get the model's name."""
return self.name.decode()
[docs]
def get_version(self) -> str:
"""Get the model's version."""
return self.version.decode()
[docs]
class AbsEthernetDiscoveryResult(Structure):
"""ABS Ethernet discovery result."""
_fields_ = [("ip", c_char * 32),
("serial", c_char * 128)]
[docs]
def get_ip_address(self) -> str:
"""Get the device's IP address."""
return self.ip.decode()
[docs]
def get_serial(self) -> str:
"""Get the device's serial number."""
return self.serial.decode()
[docs]
class AbsSerialDiscoveryResult(Structure):
"""ABS serial discovery result."""
_fields_ = [("id", c_uint8),
("serial", c_char * 128)]
[docs]
def get_id(self) -> int:
"""Get the device's serial ID."""
return self.id.value
[docs]
def get_serial(self) -> str:
"""Get the device's serial number."""
return self.serial.decode()
[docs]
class ScpiClientError(Exception):
"""SCPI client returned an error."""
pass
[docs]
class ScpiClient:
"""Client for communicating with the ABS with SCPI.
This class supports UDP, TCP, RS-485, and UDP multicast. When using serial
or multicast, it can broadcast messages to all units on the bus.
Typical usage example:
.. code-block:: python
with ScpiClient() as client:
client.open_udp("192.168.1.100")
client.set_cell_voltage(0, 2.3)
client.enable_cell(0, True)
# give the cell time to settle
time.sleep(0.005)
print(f"Cell 1 measured voltage: {client.measure_cell_voltage(0)}")
To re-open the connection with the same or a different communication layer,
you can simply call the corresponding ``open_*()`` method at any time.
"""
def __init__(self, lib: str = LIB_NAME):
"""
Args:
lib: Name of or path to the ABS SCPI DLL. This parameter is
optional. If the DLL is in a discoverable location such as
:file:`C:/Windows/System32` or :file:`/usr/lib`, or if it was
installed to the default path by the Windows MSI installer, it
will be found automatically. If it is not automatically found,
pass the path to the file to this function.
Raises:
OSError: An error occurred while finding or loading the low-level
library.
"""
self.__handle = c_void_p()
if platform.system() == "Windows":
load_library_func = windll.LoadLibrary
else:
load_library_func = cdll.LoadLibrary
if platform.system() == "Windows" and lib is LIB_NAME:
# add the Windows install directory to the lookup path
os.environ['PATH'] += f";{os.environ['ProgramFiles']}/Bloomy Controls/absscpi/bin"
lib_path = find_library(lib)
if not lib_path:
raise OSError(f"{lib} library not found")
try:
self.__dll = load_library_func(lib_path)
except OSError:
raise OSError(
f"The SCPI library could not be loaded ({lib_path})"
) from None
def __enter__(self):
self.init()
return self
def __exit__(self, exc_type, exc_value, traceback):
self.cleanup()
def __err_msg(self, err: int) -> str:
"""Get a string describing an error code.
Args:
err: The error code.
Returns:
A message describing the error code.
"""
self.__dll.AbsScpiClient_ErrorMessage.restype = c_char_p
ret = self.__dll.AbsScpiClient_ErrorMessage(err)
return ret.decode()
def __check_err(self, err: int):
"""Check a return value and throw an exception if it's an error.
Args:
err: The error code returned by a driver function.
Raises:
ScpiClientError: The error code is not successful.
"""
if err < 0:
raise ScpiClientError(self.__err_msg(err))
[docs]
def init(self):
"""Initialize the client handle.
.. warning::
Should not be called directly! Use a "with" block instead:
.. code-block:: python
with ScpiClient() as client:
...
Raises:
ScpiClientError: An error occurred during initialization.
"""
res = self.__dll.AbsScpiClient_Init(byref(self.__handle))
self.__check_err(res)
[docs]
def cleanup(self):
"""Cleanup the client handle.
.. warning::
Should not be called directly! Use a "with" block instead:
.. code-block:: python
with ScpiClient() as client:
...
"""
self.__dll.AbsScpiClient_Destroy(byref(self.__handle))
[docs]
def open_udp(self, target_ip: str, interface_ip: str | None = None):
"""Open a UDP connection to the ABS.
Args:
target_ip: Target device IP address.
interface_ip: If present, determines the IP address of the local
interface to bind the socket to. When not provided, any local
address may be bound.
Raises:
ScpiClientError: An error occurred while opening the socket.
"""
res = self.__dll.AbsScpiClient_OpenUdp(
self.__handle, target_ip.encode(),
interface_ip.encode() if interface_ip else None)
self.__check_err(res)
[docs]
def open_tcp(self, target_ip: str):
"""Open a TCP connection to the ABS.
Args:
target_ip: Target device IP address.
Raises:
ScpiClientError: An error occurred while attempting to connect.
"""
res = self.__dll.AbsScpiClient_OpenTcp(
self.__handle, target_ip.encode())
self.__check_err(res)
[docs]
def open_serial(self, port: str, device_id: int):
"""Open a serial connection to the device.
Args:
port: Serial port, such as "COM1" or "/dev/ttyS1."
device_id: Device's serial ID, 0-31, or 32+ to address all units
on the bus.
Raises:
ScpiClientError: An error occurred while opening the port.
"""
if device_id < 0:
raise ValueError(f"device ID out of range: {device_id}")
res = self.__dll.AbsScpiClient_OpenSerial(
self.__handle, port.encode(), c_uint(device_id))
self.__check_err(res)
[docs]
def close(self):
"""Close client connection.
It is not an error if the client is not connected.
Raises:
ScpiClientError: An error occurred while closing the connection.
"""
res = self.__dll.AbsScpiClient_Close(self.__handle)
self.__check_err(res)
[docs]
def open_udp_multicast(self, interface_ip: str):
"""Open a UDP multicast socket for broadcasting to many ABSes.
Args:
interface_ip: IP address of the local NIC to bind to.
Raises:
ScpiClientError: An error occurred while opening the socket.
"""
res = self.__dll.AbsScpiClient_OpenUdpMulticast(
self.__handle, interface_ip.encode())
self.__check_err(res)
[docs]
def set_target_device_id(self, device_id: int):
"""Set the target device ID for communications.
Only applies to RS-485 connections.
Args:
device_id: Target device ID, 0-31, or 32+ to broadcast to all
units on the bus.
Raises:
ScpiClientError: An error occurred while setting the ID.
"""
if device_id < 0:
raise ValueError(f"device ID out of range: {device_id}")
res = self.__dll.AbsScpiClient_SetTargetDeviceId(
self.__handle, c_uint(device_id))
self.__check_err(res)
[docs]
def get_target_device_id(self) -> int:
"""Get the target device ID for communications.
Only relevant for RS-485 connections.
Returns:
The target device's ID.
Raises:
ScpiClientError: An error occurred while getting the ID.
"""
dev_id = c_uint()
res = self.__dll.AbsScpiClient_GetTargetDeviceId(
self.__handle, byref(dev_id))
self.__check_err(res)
return dev_id.value
[docs]
def get_device_info(self) -> AbsDeviceInfo:
"""Query basic device information from the device.
Returns:
The device information.
Raises:
ScpiClientError: An error occurred while opening the port.
"""
info = AbsDeviceInfo()
res = self.__dll.AbsScpiClient_GetDeviceInfo(
self.__handle, byref(info))
self.__check_err(res)
return info
[docs]
def get_device_id(self) -> int:
"""Query the device's serial ID.
Returns:
The device's ID.
Raises:
ScpiClientError: An error occurred querying the device.
"""
dev_id = c_uint8()
res = self.__dll.AbsScpiClient_GetDeviceId(
self.__handle, byref(dev_id))
self.__check_err(res)
return dev_id.value
[docs]
def get_ip_address(self) -> AbsEthernetConfig:
"""Query the device's IP address and subnet mask.
Returns:
The Ethernet configuration of the device.
Raises:
ScpiClientError: An error occurred querying the device.
"""
conf = AbsEthernetConfig()
res = self.__dll.AbsScpiClient_GetIPAddress(
self.__handle, byref(conf))
self.__check_err(res)
return conf
[docs]
def set_ip_address(self, ip: str, netmask: str):
"""Set the device's IP address and subnet mask.
For TCP and UDP connections, you must close and reopen the connection.
This can be achieved by simply calling the corresponding open_*()
method again.
Args:
ip: Desired IPv4 address.
netmask: Desired IPv4 subnet mask.
Raises:
ScpiClientError: An error occurred sending the command.
"""
conf = AbsEthernetConfig()
conf.ip = ip.encode()
conf.netmask = netmask.encode()
res = self.__dll.AbsScpiClient_SetIPAddress(
self.__handle, byref(conf))
self.__check_err(res)
return conf
[docs]
def get_calibration_date(self) -> str:
"""Query the device's calibration date.
Returns:
Device's calibration date.
Raises:
ScpiClientError: An error occurred while executing the query.
"""
buf = create_string_buffer(128)
res = self.__dll.AbsScpiClient_GetCalibrationDate(
self.__handle, byref(buf), c_uint(len(buf)))
self.__check_err(res)
return buf.value.decode()
[docs]
def get_error_count(self) -> int:
"""Query the number of errors in the device's error queue.
Returns:
Number of errors in the error queue.
Raises:
ScpiClientError: An error occurred while executing the query.
"""
count = c_int()
res = self.__dll.AbsScpiClient_GetErrorCount(
self.__handle, byref(count))
self.__check_err(res)
return count.value
[docs]
def get_next_error(self) -> tuple[int, str] | None:
"""Query the next error from the device's error queue.
Returns:
A tuple containing the returned error code and message or None if
the error code was 0 (no error).
Raises:
ScpiClientError: An error occurred while executing the query.
"""
buf = create_string_buffer(256)
code = c_int16()
res = self.__dll.AbsScpiClient_GetNextError(
self.__handle, byref(code), byref(buf), c_uint(len(buf)))
self.__check_err(res)
if code.value == 0:
return None
return (code.value, buf.value.decode())
[docs]
def clear_errors(self):
"""Clear the device's error queue.
Raises:
ScpiClientError: An error occurred while sending the command.
"""
res = self.__dll.AbsScpiClient_ClearErrors(self.__handle)
self.__check_err(res)
[docs]
def get_alarms(self) -> int:
"""Query the alarms raised on the device.
Returns:
The alarms bitmask.
Raises:
ScpiClientError: An error occurred while executing the query.
"""
alarms = c_uint32()
res = self.__dll.AbsScpiClient_GetAlarms(self.__handle, byref(alarms))
self.__check_err(res)
return alarms.value
[docs]
def get_interlock_state(self) -> bool:
"""Query the system interlock state. When in interlock, the system will
be put into its PoR state and cannot be controlled until the interlock
is lifted.
Returns:
The interlock state.
Raises:
ScpiClientError: An error occurred while executing the query.
"""
state = c_bool()
res = self.__dll.AbsScpiClient_GetInterlockState(
self.__handle, byref(state))
self.__check_err(res)
return state.value
[docs]
def assert_soft_interlock(self):
"""Assert the software interlock (a recoverable alarm).
The interlock may be cleared using the clear_recoverable_alarms()
method.
Raises:
ScpiClientError: An error occurred while sending the command.
"""
res = self.__dll.AbsScpiClient_AssertSoftwareInterlock(self.__handle)
self.__check_err(res)
[docs]
def clear_recoverable_alarms(self):
"""Clear any recoverable alarms raised on the unit (including software
interlock).
Raises:
ScpiClientError: An error occurred while sending the command.
"""
res = self.__dll.AbsScpiClient_ClearRecoverableAlarms(self.__handle)
self.__check_err(res)
[docs]
def reboot(self):
"""Reboot the device, resetting it to its POR state.
Rebooting takes about 8-10 seconds. If using TCP, you'll need to re-open
the connection after the device has rebooted.
Raises:
ScpiClientError: An error occurred while sending the command.
"""
self.__check_err(self.__dll.AbsScpiClient_Reboot(self.__handle))
[docs]
def enable_cell(self, cell: int, en: bool):
"""Enable or disable a single cell.
Args:
cell: Target cell index, 0-7.
en: Whether to enable the cell.
Raises:
ScpiClientError: An error occurred while enabling the cell.
"""
res = self.__dll.AbsScpiClient_EnableCell(
self.__handle, c_uint(cell), c_bool(en))
self.__check_err(res)
[docs]
def enable_all_cells(self, en: list[bool]):
"""Enable or disable many cells.
Args:
en: List of cell enable states. Must not be longer than the total
cell count.
Raises:
ScpiClientError: An error occurred while sending the command.
"""
if len(en) > CELL_COUNT:
raise ValueError("too many inputs")
elif len(en) == 0:
return
cells_on = 0
cells_off = 0
for i in range(len(en)):
if en[i]:
cells_on |= 1 << i
else:
cells_off |= 1 << i
if cells_on != 0:
res = self.__dll.AbsScpiClient_EnableCellsMasked(
self.__handle, c_uint(cells_on), True)
self.__check_err(res)
if cells_off != 0:
res = self.__dll.AbsScpiClient_EnableCellsMasked(
self.__handle, c_uint(cells_off), False)
self.__check_err(res)
[docs]
def get_cell_enabled(self, cell: int) -> bool:
"""Query the enable state of a single cell.
Args:
cell: Target cell index, 0-7.
Returns:
Whether the cell is enabled.
Raises:
ScpiClientError: An error occurred while executing the query.
"""
en = c_bool()
res = self.__dll.AbsScpiClient_GetCellEnabled(
self.__handle, c_uint(cell), byref(en))
self.__check_err(res)
return en.value
[docs]
def get_all_cells_enabled(self) -> list[bool]:
"""Query the enable state of all cells.
Returns:
List of cell enable states, one per cell.
Raises:
ScpiClientError: An error occurred while executing the query.
"""
states = c_uint()
res = self.__dll.AbsScpiClient_GetCellsEnabledMasked(
self.__handle, byref(states))
self.__check_err(res)
state_list = [False] * CELL_COUNT
for i in range(CELL_COUNT):
if (states.value & (1 << i)) != 0:
state_list[i] = True
return state_list
[docs]
def set_cell_voltage(self, cell: int, voltage: float):
"""Set a single cell's target voltage.
Args:
cell: Target cell index, 0-7.
voltage: Cell voltage.
Raises:
ScpiClientError: An error occurred while sending the command.
"""
res = self.__dll.AbsScpiClient_SetCellVoltage(
self.__handle, c_uint(cell), c_float(voltage))
self.__check_err(res)
[docs]
def set_all_cell_voltages(self, voltages: list[float]):
"""Set all cells' voltages.
Args:
voltages: Array of cell voltages. Must not be empty or longer than
the total cell count.
Raises:
ScpiClientError: An error occurred while sending the command.
"""
vals = (c_float * len(voltages))(*voltages)
res = self.__dll.AbsScpiClient_SetAllCellVoltages(
self.__handle, byref(vals), c_uint(len(voltages)))
self.__check_err(res)
[docs]
def get_cell_voltage_target(self, cell: int) -> float:
"""Query a single cell's target voltage.
Args:
cell: Target cell index, 0-7.
Returns:
The cell's target voltage.
Raises:
ScpiClientError: An error occurred while executing the query.
"""
voltage = c_float()
res = self.__dll.AbsScpiClient_GetCellVoltageTarget(
self.__handle, c_uint(cell), byref(voltage))
self.__check_err(res)
return voltage.value
[docs]
def get_all_cell_voltage_targets(self) -> list[float]:
"""Query all cells' target voltages.
Returns:
An array of voltages, one per cell.
Raises:
ScpiClientError: An error occurred while executing the query.
"""
voltages = (c_float * CELL_COUNT)()
res = self.__dll.AbsScpiClient_GetAllCellVoltageTargets(
self.__handle, byref(voltages), c_uint(CELL_COUNT))
self.__check_err(res)
return voltages[:]
[docs]
def set_cell_sourcing(self, cell: int, limit: float):
"""Set a single cell's current sourcing limit.
Args:
cell: Target cell index, 0-7.
limit: Sourcing limit.
Raises:
ScpiClientError: An error occurred while sending the command.
"""
res = self.__dll.AbsScpiClient_SetCellSourcing(
self.__handle, c_uint(cell), c_float(limit))
self.__check_err(res)
[docs]
def set_all_cell_sourcing(self, limits: list[float]):
"""Set all cells' current sourcing limits.
Args:
limits: Array of current limits. Must not be empty or longer than
the total cell count.
Raises:
ScpiClientError: An error occurred while sending the command.
"""
vals = (c_float * len(limits))(*limits)
res = self.__dll.AbsScpiClient_SetAllCellSourcing(
self.__handle, byref(vals), c_uint(len(limits)))
self.__check_err(res)
[docs]
def get_cell_sourcing_limit(self, cell: int) -> float:
"""Query a single cell's current sourcing limit.
Args:
cell: Target cell index, 0-7.
Returns:
The cell's current sourcing limit.
Raises:
ScpiClientError: An error occurred while executing the query.
"""
limit = c_float()
res = self.__dll.AbsScpiClient_GetCellSourcingLimit(
self.__handle, c_uint(cell), byref(limit))
self.__check_err(res)
return limit.value
[docs]
def get_all_cell_sourcing_limits(self) -> list[float]:
"""Query all cells' current sourcing limits.
Returns:
An array of current sourcing limits, one per cell.
Raises:
ScpiClientError: An error occurred while executing the query.
"""
limits = (c_float * CELL_COUNT)()
res = self.__dll.AbsScpiClient_GetAllCellSourcingLimits(
self.__handle, byref(limits), c_uint(CELL_COUNT))
self.__check_err(res)
return limits[:]
[docs]
def set_cell_sinking(self, cell: int, limit: float):
"""Set a single cell's current sinking limit.
Args:
cell: Target cell index, 0-7.
limit: Sinking limit.
Raises:
ScpiClientError: An error occurred while sending the command.
"""
res = self.__dll.AbsScpiClient_SetCellSinking(
self.__handle, c_uint(cell), c_float(limit))
self.__check_err(res)
[docs]
def set_all_cell_sinking(self, limits: list[float]):
"""Set all cells' current sinking limits.
Args:
limits: Array of current limits. Must not be empty or longer than
the total cell count.
Raises:
ScpiClientError: An error occurred while sending the command.
"""
vals = (c_float * len(limits))(*limits)
res = self.__dll.AbsScpiClient_SetAllCellSinking(
self.__handle, byref(vals), c_uint(len(limits)))
self.__check_err(res)
[docs]
def get_cell_sinking_limit(self, cell: int) -> float:
"""Query a single cell's current sinking limit.
Args:
cell: Target cell index, 0-7.
Returns:
The cell's current sinking limit.
Raises:
ScpiClientError: An error occurred while executing the query.
"""
limit = c_float()
res = self.__dll.AbsScpiClient_GetCellSinkingLimit(
self.__handle, c_uint(cell), byref(limit))
self.__check_err(res)
return limit.value
[docs]
def get_all_cell_sinking_limits(self) -> list[float]:
"""Query all cells' current sinking limits.
Returns:
An array of current sinking limits, one per cell.
Raises:
ScpiClientError: An error occurred while executing the query.
"""
limits = (c_float * CELL_COUNT)()
res = self.__dll.AbsScpiClient_GetAllCellSinkingLimits(
self.__handle, byref(limits), c_uint(CELL_COUNT))
self.__check_err(res)
return limits[:]
[docs]
def set_cell_fault(self, cell: int, fault: AbsCellFault):
"""Set a single cell's faulting state.
Args:
cell: Target cell index, 0-7.
fault: Fault state.
Raises:
ScpiClientError: An error occurred while sending the command.
"""
res = self.__dll.AbsScpiClient_SetCellFault(
self.__handle, c_uint(cell), c_int(fault.value))
self.__check_err(res)
[docs]
def set_all_cell_faults(self, faults: list[AbsCellFault]):
"""Set all cells' faulting states.
Args:
faults: Array of fault states. Must not be empty or longer than the
total cell count.
Raises:
ScpiClientError: An error occurred while sending the command.
"""
vals = (c_int * len(faults))(*faults)
res = self.__dll.AbsScpiClient_SetAllCellFaults(
self.__handle, byref(vals), c_uint(len(faults)))
self.__check_err(res)
[docs]
def get_cell_fault(self, cell: int) -> AbsCellFault:
"""Query a single cell's faulting state.
Args:
cell: Target cell index, 0-7.
Returns:
The cell's faulting state.
Raises:
ScpiClientError: An error occurred while executing the query.
"""
state = c_int()
res = self.__dll.AbsScpiClient_GetCellFault(
self.__handle, c_uint(cell), byref(state))
self.__check_err(res)
return AbsCellFault(state.value)
[docs]
def get_all_cell_faults(self) -> list[AbsCellFault]:
"""Query all cells' faulting states.
Returns:
An array of faulting states, one per cell.
Raises:
ScpiClientError: An error occurred while executing the query.
"""
states = (c_int * CELL_COUNT)()
res = self.__dll.AbsScpiClient_GetAllCellFaults(
self.__handle, byref(states), c_uint(CELL_COUNT))
self.__check_err(res)
return [AbsCellFault(state) for state in states]
[docs]
def set_cell_sense_range(self, cell: int, range_: AbsCellSenseRange):
"""Set a single cell's current sense range.
For most applications, changing this setting manually is unnecessary.
By default, the cell will choose the appropriate sense range based on
its sourcing and sinking current limits.
Args:
cell: Target cell index, 0-7.
range_: Sense range.
Raises:
ScpiClientError: An error occurred while sending the command.
"""
res = self.__dll.AbsScpiClient_SetCellSenseRange(
self.__handle, c_uint(cell), c_int(range_.value))
self.__check_err(res)
[docs]
def set_all_cell_sense_ranges(self, ranges: list[AbsCellSenseRange]):
"""Set all cells' current sense ranges.
For most applications, changing this setting manually is unnecessary.
By default, the cells will choose the appropriate sense range based on
their sourcing and sinking current limits.
Args:
ranges: Array of sense ranges. Must not be empty or longer than the
total cell count.
Raises:
ScpiClientError: An error occurred while sending the command.
"""
vals = (c_int * len(ranges))(*ranges)
res = self.__dll.AbsScpiClient_SetAllCellSenseRanges(
self.__handle, byref(vals), c_uint(len(ranges)))
self.__check_err(res)
[docs]
def get_cell_sense_range(self, cell: int) -> AbsCellSenseRange:
"""Query a single cell's current sense range.
Args:
cell: Target cell index, 0-7.
Returns:
The cell's current sense range.
Raises:
ScpiClientError: An error occurred while executing the query.
"""
range_ = c_int()
res = self.__dll.AbsScpiClient_GetCellSenseRange(
self.__handle, c_uint(cell), byref(range_))
self.__check_err(res)
return AbsCellSenseRange(range_.value)
[docs]
def get_all_cell_sense_ranges(self) -> list[AbsCellSenseRange]:
"""Query all cells' current sense ranges.
Returns:
An array of sense ranges, one per cell.
Raises:
ScpiClientError: An error occurred while executing the query.
"""
ranges = (c_int * CELL_COUNT)()
res = self.__dll.AbsScpiClient_GetAllCellSenseRanges(
self.__handle, byref(ranges), c_uint(CELL_COUNT))
self.__check_err(res)
return [AbsCellSenseRange(r) for r in ranges]
[docs]
def enable_cell_noise_filter(self, en: bool):
"""Enable or disable the cell 50/60Hz noise filter.
This mode filters 50/60Hz noise and increases cell measurement accuracy,
but decreases the cell control rate to 10Hz.
Args:
en: Desired filter state.
Raises:
ScpiClientError: An error occurred while sending the command.
"""
res = self.__dll.AbsScpiClient_EnableCellNoiseFilter(
self.__handle, c_bool(en))
self.__check_err(res)
[docs]
def get_cell_noise_filter_enabled(self) -> bool:
"""Query the enable state of the cell 50/60Hz noise filter.
Returns:
The state of the noise filter.
Raises:
ScpiClientError: An error occurred while executing the query.
"""
en = c_bool()
res = self.__dll.AbsScpiClient_GetCellNoiseFilterEnabled(
self.__handle, byref(en))
self.__check_err(res)
return en.value
[docs]
def measure_cell_voltage(self, cell: int) -> float:
"""Measure a single cell's voltage.
Args:
cell: Target cell index, 0-7.
Returns:
Measured cell voltage.
Raises:
ScpiClientError: An error occurred while executing the query.
"""
voltage = c_float()
res = self.__dll.AbsScpiClient_MeasureCellVoltage(
self.__handle, c_uint(cell), byref(voltage))
self.__check_err(res)
return voltage.value
[docs]
def measure_all_cell_voltages(self) -> list[float]:
"""Measure all cell voltages.
Returns:
Array of voltages, one per cell.
Raises:
ScpiClientError: An error occurred while executing the query.
"""
voltages = (c_float * CELL_COUNT)()
res = self.__dll.AbsScpiClient_MeasureAllCellVoltages(
self.__handle, byref(voltages), c_uint(CELL_COUNT))
self.__check_err(res)
return voltages[:]
[docs]
def measure_cell_current(self, cell: int) -> float:
"""Measure a single cell's current.
Args:
cell: Target cell index, 0-7.
Returns:
Measured cell current.
Raises:
ScpiClientError: An error occurred while executing the query.
"""
current = c_float()
res = self.__dll.AbsScpiClient_MeasureCellCurrent(
self.__handle, c_uint(cell), byref(current))
self.__check_err(res)
return current.value
[docs]
def measure_all_cell_currents(self) -> list[float]:
"""Measure all cell currents.
Returns:
Array of currents, one per cell.
Raises:
ScpiClientError: An error occurred while executing the query.
"""
currents = (c_float * CELL_COUNT)()
res = self.__dll.AbsScpiClient_MeasureAllCellCurrents(
self.__handle, byref(currents), c_uint(CELL_COUNT))
self.__check_err(res)
return currents[:]
[docs]
def get_cell_operating_mode(self, cell: int) -> AbsCellMode:
"""Query a single cell's operating mode (constant voltage or current
limited).
Args:
cell: Target cell index, 0-7.
Returns:
The cell's operating mode.
Raises:
ScpiClientError: An error occurred while executing the query.
"""
mode = c_int()
res = self.__dll.AbsScpiClient_GetCellOperatingMode(
self.__handle, c_uint(cell), byref(mode))
self.__check_err(res)
return AbsCellMode(mode.value)
[docs]
def get_all_cell_operating_modes(self) -> list[AbsCellMode]:
"""Query all cells' operating modes (constant voltage or current
limited).
Returns:
An array of cell operating modes, one per cell.
Raises:
ScpiClientError: An error occurred while executing the query.
"""
modes = (c_int * CELL_COUNT)()
res = self.__dll.AbsScpiClient_GetAllCellOperatingModes(
self.__handle, byref(modes), c_uint(CELL_COUNT))
self.__check_err(res)
return [AbsCellMode(m) for m in modes]
[docs]
def set_analog_output(self, channel: int, voltage: float):
"""Set a single analog output voltage.
Args:
channel: Target channel index, 0-7.
voltage: Target voltage.
Raises:
ScpiClientError: An error occurred while executing the command.
"""
res = self.__dll.AbsScpiClient_SetAnalogOutput(
self.__handle, c_uint(channel), c_float(voltage))
self.__check_err(res)
[docs]
def set_all_analog_outputs(self, voltages: list[float]):
"""Set all analog output voltages.
Args:
voltages: An array of voltages, one per channel. Must not be empty
or longer than the total channel count.
Raises:
ScpiClientError: An error occurred while executing the command.
"""
vals = (c_float * len(voltages))(*voltages)
res = self.__dll.AbsScpiClient_SetAllAnalogOutputs(
self.__handle, byref(vals), c_uint(len(voltages)))
self.__check_err(res)
[docs]
def get_analog_output(self, channel: int) -> float:
"""Query an analog output's set point.
Args:
channel: Target channel index, 0-7.
Returns:
The analog output's voltage.
Raises:
ScpiClientError: An error occurred while executing the query.
"""
voltage = c_float()
res = self.__dll.AbsScpiClient_GetAnalogOutput(
self.__handle, c_uint(channel), byref(voltage))
self.__check_err(res)
return voltage.value
[docs]
def get_all_analog_outputs(self) -> list[float]:
"""Query all analog output voltages.
Returns:
An array of voltages, one per channel.
Raises:
ScpiClientError: An error occurred while executing the query.
"""
voltages = (c_float * ANALOG_OUTPUT_COUNT)()
res = self.__dll.AbsScpiClient_GetAllAnalogOutputs(
self.__handle, byref(voltages), c_uint(ANALOG_OUTPUT_COUNT))
self.__check_err(res)
return voltages[:]
[docs]
def set_digital_output(self, channel: int, level: bool):
"""Set a single digital output.
Args:
channel: Target channel index, 0-3.
level: Desired output level.
Raises:
ScpiClientError: An error occurred while sending the command.
"""
res = self.__dll.AbsScpiClient_SetDigitalOutput(
self.__handle, c_uint(channel), c_bool(level))
self.__check_err(res)
[docs]
def set_all_digital_outputs(self, levels: list[bool]):
"""Set all digital outputs.
Args:
levels: An array of output levels, one per channel. Must not be
longer than the total channel count.
Raises:
ScpiClientError: An error occurred while executing the query.
"""
if len(levels) > DIGITAL_OUTPUT_COUNT:
raise ValueError("too many inputs")
elif len(levels) == 0:
return
mask = 0
for i in range(len(levels)):
if levels[i]:
mask |= (1 << i)
res = self.__dll.AbsScpiClient_SetAllDigitalOutputs(
self.__handle, c_uint(mask))
self.__check_err(res)
[docs]
def get_digital_output(self, channel: int) -> bool:
"""Query the state of a single digital output.
Args:
channel: Target channel index, 0-3.
Returns:
The state of the digital output.
Raises:
ScpiClientError: An error occurred while executing the query.
"""
state = c_bool()
res = self.__dll.AbsScpiClient_GetDigitalOutput(
self.__handle, c_uint(channel), byref(state))
self.__check_err(res)
return state.value
[docs]
def get_all_digital_outputs(self) -> list[bool]:
"""Query the states of all digital outputs.
Returns:
An array of states, one per output.
Raises:
ScpiClientError: An error occurred while executing the query.
"""
mask = c_uint()
res = self.__dll.AbsScpiClient_GetAllDigitalOutputs(
self.__handle, byref(mask))
self.__check_err(res)
m = mask.value
return [(m & (1 << i)) != 0 for i in range(DIGITAL_OUTPUT_COUNT)]
[docs]
def get_model_status(self) -> AbsModelStatus:
"""Query the model status bits.
Returns:
Model status.
Raises:
ScpiClientError: An error occurred while executing the query.
"""
val = c_uint8()
res = self.__dll.AbsScpiClient_GetModelStatus(self.__handle, byref(val))
self.__check_err(res)
return AbsModelStatus(val.value)
[docs]
def load_model(self):
"""Load the model configuration on the device.
Raises:
ScpiClientError: An error occurred while sending the command.
"""
self.__check_err(self.__dll.AbsScpiClient_LoadModel(self.__handle))
[docs]
def start_model(self):
"""Start modeling.
Raises:
ScpiClientError: An error occurred while sending the command.
"""
self.__check_err(self.__dll.AbsScpiClient_StartModel(self.__handle))
[docs]
def stop_model(self):
"""Stop modeling.
Raises:
ScpiClientError: An error occurred while sending the command.
"""
self.__check_err(self.__dll.AbsScpiClient_StopModel(self.__handle))
[docs]
def unload_model(self):
"""Unload the model configuration.
Raises:
ScpiClientError: An error occurred while sending the command.
"""
self.__check_err(self.__dll.AbsScpiClient_UnloadModel(self.__handle))
[docs]
def get_model_info(self) -> AbsModelInfo:
"""Query information about the model.
Returns:
Model information.
Raises:
ScpiClientError: An error occurred while executing the query.
"""
info = AbsModelInfo()
res = self.__dll.AbsScpiClient_GetModelInfo(self.__handle, byref(info))
self.__check_err(res)
return info
[docs]
def get_model_output(self, index: int) -> float:
"""Query a single model output.
Args:
index: Output index, 0-35.
Returns:
The model output.
Raises:
ScpiClientError: An error occurred while executing the query.
"""
val = c_float()
res = self.__dll.AbsScpiClient_GetModelOutput(
self.__handle, c_uint(index), byref(val))
self.__check_err(res)
return val.value
[docs]
def get_all_model_outputs(self) -> list[float]:
"""Query all model outputs.
Returns:
A list of all model outputs.
Raises:
ScpiClientError: An error occurred while executing the query.
"""
values = (c_float * MODEL_OUTPUT_COUNT)()
res = self.__dll.AbsScpiClient_GetAllModelOutputs(
self.__handle, byref(values), c_uint(MODEL_OUTPUT_COUNT))
self.__check_err(res)
return values[:]
[docs]
def multicast_discovery(
self,
interface_ip: str,
) -> list[AbsEthernetDiscoveryResult]:
"""Use UDP multicast to discover ABSes on the network.
This function does not require the ScpiClient to be initialized or
connected.
Args:
interface_ip: IP address of the local NIC to bind to.
Returns:
List of discovered devices.
Raises:
ScpiClientError: An error occurred during discovery.
"""
results = (AbsEthernetDiscoveryResult * 64)()
count = c_uint(len(results))
res = self.__dll.AbsScpiClient_MulticastDiscovery(
interface_ip.encode(), byref(results), byref(count))
self.__check_err(res)
return results[:count.value]
[docs]
def serial_discovery(
self,
port: str,
first_id: int = 0,
last_id: int = 31,
) -> list[AbsSerialDiscoveryResult]:
"""Use RS-485 to discover ABSes on the bus.
This function requires that the ScpiClient *not* be connected over
serial! This will interfere with opening the serial port.
Args:
port: Serial port to use, such as COM1 or /dev/ttyS0.
first_id: First serial ID to check, 0-31.
last_id: Last serial ID to check (inclusive), 0-31. Must not be
less than first_id.
Returns:
List of discovered devices.
Raises:
ScpiClientError: An error occurred during discovery.
"""
if last_id < 0 or last_id > 31 or first_id < 0 or first_id > 31:
raise ValueError("invalid ID")
elif last_id < first_id:
raise ValueError("last ID cannot be less than first ID")
count = c_uint(last_id - first_id + 1)
results = (AbsSerialDiscoveryResult * count.value)()
res = self.__dll.AbsScpiClient_SerialDiscovery(
port.encode(), c_uint8(first_id), c_uint8(last_id),
byref(results), byref(count))
self.__check_err(res)
return results[:count.value]