# Copyright (c) 2026, Bloomy Controls, Inc. All rights reserved.
#
# Use of this source code is governed by a BSD-3-clause license that can be
# found in the LICENSE file or at https://opensource.org/license/BSD-3-Clause
from ctypes import *
from ctypes.util import find_library
import os
import platform
# Default base name of the low-level shared library; used as the default
# value of the ``lib`` argument of Bs120xEnet.__init__.
LIB_NAME = "bs120xenet"
# Number of cells; sizes the buffers filled by get_all_cell_voltage and
# get_all_cell_current.
CELL_COUNT = 12
# Number of analog outputs; set_analog_outputs requires exactly this many values.
ANALOG_OUTPUT_COUNT = 2
# Number of analog inputs (not referenced by the code visible in this file).
ANALOG_INPUT_COUNT = 8
# Number of DIO lines; sizes the buffer filled by get_all_dio_state and is the
# length checked by set_dio_states.
DIO_COUNT = 8
# Size of the buffer handed to the library by get_config_value_string.
MAX_CONFIG_BUFFER = 256
def libver_to_str(libver: int) -> str:
    """Format a packed library version number as ``"major.minor.patch"``.

    The version is packed as ``major * 10000 + minor * 100 + patch``.

    Args:
        libver: The packed version number.

    Returns:
        The dotted version string, e.g. ``10203`` becomes ``"1.2.3"``.
    """
    major, remainder = divmod(libver, 10000)
    minor, patch = divmod(remainder, 100)
    return f"{major}.{minor}.{patch}"
class Bs120xOpModeConfig(Structure):
    """Operational mode configuration for the BS120x.

    This is a ctypes structure; the order and types of ``_fields_`` define
    the layout handed to the low-level library by
    ``Bs120xEnet.set_op_mode_config``.
    """

    _fields_ = [
        ("calibration_mode_enable", c_bool),
        ("pid_disable", c_bool),
        ("precision_mode_enable", c_bool),
    ]

    def get_calibration_mode_enable(self) -> bool:
        """Gets the calibration mode enable state."""
        return self.calibration_mode_enable

    def get_pid_disable(self) -> bool:
        """Gets the PID disable state."""
        return self.pid_disable

    def get_precision_mode_enable(self) -> bool:
        """Gets the precision mode enable state."""
        return self.precision_mode_enable

    def set_calibration_mode_enable(self, val: bool) -> None:
        """Sets the calibration mode enable state."""
        self.calibration_mode_enable = val

    def set_pid_disable(self, val: bool) -> None:
        """Sets the PID disable state."""
        self.pid_disable = val

    def set_precision_mode_enable(self, val: bool) -> None:
        """Sets the precision mode enable state."""
        self.precision_mode_enable = val
class Bs120xStatus(Structure):
    """Health and status information for the BS120x.

    This is a ctypes structure filled in by the low-level library in
    ``Bs120xEnet.get_status``; the order and types of ``_fields_`` define
    its layout.
    """

    _fields_ = [
        ("fan_fail_status", c_bool * 4),
        ("inhibit_state", c_bool),
        ("temps", c_float * 3),
    ]

    def get_fan_fail_status(self) -> list[bool]:
        """Returns a list of fan failure statuses. A value of TRUE indicates a fan failure."""
        # Copy out of the ctypes array so callers get a plain Python list.
        return list(self.fan_fail_status)

    def get_inhibit_state(self) -> bool:
        """Gets the state of the inhibit."""
        return self.inhibit_state

    def get_temps(self) -> list[float]:
        """Gets the list of temperatures reported in degrees Celsius."""
        # Copy out of the ctypes array so callers get a plain Python list.
        return list(self.temps)
class Bs120xUnitConfig(Structure):
    """Unit configuration information.

    This is a ctypes structure filled in by the low-level library in
    ``Bs120xEnet.get_unit_config``; the order and types of ``_fields_``
    define its layout. String fields are fixed-size character buffers that
    the getters decode to ``str``.
    """

    _fields_ = [
        ("serial_number", c_char * 128),
        ("firmware_version", c_char * 128),
        ("calibration_date", c_char * 128),
        ("cell_inhibit_enable", c_bool),
        ("ip_address", c_char * 32),
        ("udp_data_port", c_uint16),
        ("udp_data_period", c_uint32),
        ("udp_data_broadcast_enable", c_bool),
        ("box_id", c_uint8),
        ("can_data_period", c_uint32),
    ]

    def get_serial_number(self) -> str:
        """Gets the unit serial number."""
        return self.serial_number.decode()

    def get_firmware_version(self) -> str:
        """Gets the unit firmware version."""
        return self.firmware_version.decode()

    def get_calibration_date(self) -> str:
        """Gets the unit calibration date."""
        return self.calibration_date.decode()

    def get_cell_inhibit_enable(self) -> bool:
        """Gets the state of the cell inhibit enable."""
        return self.cell_inhibit_enable

    def get_ip_address(self) -> str:
        """Gets the unit IP address."""
        return self.ip_address.decode()

    def get_udp_data_port(self) -> int:
        """Gets the UDP data port used by the unit."""
        return self.udp_data_port

    def get_udp_data_period(self) -> int:
        """Gets the interval in milliseconds at which data is broadcast over UDP."""
        return self.udp_data_period

    def get_udp_data_broadcast_enable(self) -> bool:
        """Gets the enable state of the UDP data broadcast."""
        return self.udp_data_broadcast_enable

    def get_box_id(self) -> int:
        """Gets the Box ID of the unit."""
        return self.box_id

    def get_can_data_period(self) -> int:
        """Gets the interval in milliseconds at which CAN data is sent."""
        return self.can_data_period
class Bs120xEnetError(Exception):
    """The client returned an error.

    Raised by ``Bs120xEnet`` when the low-level library reports a negative
    result code, or when the loaded library is too old for a command.
    """
[docs]
class Bs120xEnet:
"""Client for communicating with a BS120x unit over Ethernet.
Typical usage example:
.. code-block:: python
with Bs120xEnet() as client:
client.connect("192.168.1.105", "192.168.1.101", 54321)
client.set_cell_voltage(0, 1.5)
client.enable_cell(0)
# Allow time for cell to settle
time.sleep(0.1)
print(f"Cell 0 measured voltage: {client.get_cell_voltage(0)} V")
client.disconnect()
"""
def __init__(self, lib: str = LIB_NAME):
"""
Args:
lib: Name of or path to the BS120xEnet DLL. This parameter is optional. if
the DLL is in a discoverable location such as :file:`C:/Windows/System32` or :file:`/usr/lib`,
or if it was installed to the default path by the Windows MSI installer, it will be found
automatically. If it is not automatically found, pass the path to the file to this function.
Raises:
OSError: An error occurred while finding or loading the low-level library.
"""
self.__handle = c_void_p()
load_library_func = cdll.LoadLibrary
if platform.system() == "Windows" and lib is LIB_NAME:
# add the Windows install directory to the lookup path
os.environ['PATH'] += f";{os.environ['ProgramFiles']}/Bloomy Controls/bs120xenet/bin"
lib_path = find_library(lib)
if not lib_path:
raise OSError(f"{lib} library not found")
try:
self.__dll = load_library_func(lib_path)
except OSError:
raise OSError(
f"The Bs120x Ethernet library could not be loaded ({lib_path})"
) from None
self.__lib_version = self.__dll.Bs120xEnet_Version()
def __enter__(self):
self.init()
return self
def __exit__(self, exc_type, exc_value, traceback):
self.cleanup()
def __err_msg(self, err: int) -> str:
"""Get a string describing an error code.
Args:
err: The error code.
Returns:
A message describing the error code.
"""
self.__dll.Bs120xEnet_ErrorMessage.restype = c_char_p
ret = self.__dll.Bs120xEnet_ErrorMessage(err)
return ret.decode()
def __check_err(self, err: int):
"""Check a return value and throw an exception if it's an error.
Args:
err: The error code returned.
Raises:
Bs120xEnetError: The error code is not successful.
"""
if err < 0:
raise Bs120xEnetError(self.__err_msg(err))
def __ensure_ver(self, req_maj: int, req_min: int, req_patch: int):
"""Ensure that the low-level library's version is high enough to support a command.
Args:
req_maj: The required major library version. For example, if v1.2.3 is required,
this value should be 1.
req_min: The required minor library version. For example, if v1.2.3 is required,
this value should be 2.
req_path: The required path library version. For example, if v1.2.3 is required,
this value should be 3.
Raises:
Bs120xEnetError: The required version is not met.
"""
required_ver = req_maj * 10000 + req_min * 100 + req_patch
if self.__lib_version < required_ver:
req_str = libver_to_str(required_ver)
found_str = libver_to_str(self.__lib_version)
raise Bs120xEnetError("Bs120xEnet library is too old! " +
f"Required version {req_str}, " +
f"found version {found_str}.")
[docs]
def init(self):
"""Initialize the client handle.
.. warning::
Should not be called directly! Use a 'with' block instead.
.. code-block:: python
with Bs120xEnet() as client:
...
Raises:
Bs120xEnetError: An error occurred during initialization.
"""
res = self.__dll.Bs120xEnet_Init(byref(self.__handle))
self.__check_err(res)
[docs]
def cleanup(self):
"""Clean up the client handle.
.. warning::
Should not be called directly! Use a 'with' block instead.
.. code-block:: python
with Bs120xEnet() as client:
...
"""
self.__dll.Bs120xEnet_Destroy(byref(self.__handle))
[docs]
def connect(self, interface_ip, device_ip, udp_port, udp_timeout = 1000, tcp_port = 12345):
"""Opens a connection to the BS120x unit.
Args:
interface_ip: The IP address of the local NIC to connect on.
device_ip: The IP address of the BS120x unit to connect to.
udp_port: The UDP data port of the BS120x unit.
udp_timeout: The time in milliseconds to wait to receive data from the BS120x unit. A value of -1 will never time out.
tcp_port: The TCP port to send commands to.
Raises:
Bs120xEnetError: An error occurred while attempting to connect.
"""
res = self.__dll.Bs120xEnet_Connect(self.__handle, interface_ip.encode(),
device_ip.encode(), c_uint16(udp_port),
c_uint32(udp_timeout), c_uint16(tcp_port))
self.__check_err(res)
[docs]
def disconnect(self):
"""Closes TCP and UDP connections to the BS120x unit."""
res = self.__dll.Bs120xEnet_Disconnect(self.__handle)
self.__check_err(res)
[docs]
def set_dio_states(self, output: list[bool], direction: list[bool]):
"""Sets the states and directions of the Bs120x DIO.
Args:
output: list of DIO states. Length must be equal to the number of DIO.
direction: list of DIO directions. Length must be equal to the number of DIO. TRUE configures the DIO as an output and FALSE configures it as an input.
Raises:
Bs120xEnetError: An error occurred while sending the command.
"""
if len(output) != len(direction) != DIO_COUNT:
raise ValueError("Incorrect number of items.")
val_out = (c_bool * len(output))(*output)
val_dir = (c_bool * len(direction))(*direction)
res = self.__dll.Bs120xEnet_SetDIOStates(self.__handle, val_out, c_size_t(len(output)),
val_dir, c_size_t(len(direction)))
self.__check_err(res)
[docs]
def set_analog_outputs(self, voltages: list[float]):
"""Sets the BS120x analog outputs.
Args:
voltages: list of analog output voltages. Length must be equal to the number of analog outputs.
Raises:
Bs120xEnetError: An error occurred while sending the command.
"""
if len(voltages) != ANALOG_OUTPUT_COUNT:
raise ValueError("Incorrect number of items.")
val = (c_float * len(voltages))(*voltages)
res = self.__dll.Bs120xEnet_SetAnalogOutputs(self.__handle, val, c_size_t(len(voltages)))
self.__check_err(res)
[docs]
def set_op_mode_config(self, config: Bs120xOpModeConfig):
"""Configures the BS120x operational mode options.
Args:
config: Bs120xOpModeConfig object containing configuration options.
Raises:
Bs120xEnetError: An error occurred while sending the command.
"""
res = self.__dll.Bs120xEnet_SetOpModeConfig(self.__handle, config)
self.__check_err(res)
[docs]
def set_all_cell_currents(self, isrc: float, isnk: float):
"""Sets the sinking and sourcing current limits for all cells.
Args:
isrc: Sourcing current limit to set in Amps.
isnk: Sinking current limit to set in Amps.
Raises:
Bs120xEnetError: An error occurred while sending the command.
"""
res = self.__dll.Bs120xEnet_SetAllCellCurrents(self.__handle, c_float(isrc), c_float(isnk))
self.__check_err(res)
[docs]
def set_cell_sink_current(self, cell: int, isnk: float):
"""Sets the sinking current limit for a single cell.
Args:
cell: The cell number to set. Cells are 0-indexed.
isnk: Sinking current limit in Amps.
Raises:
Bs120xEnetError: An error occurred while sending the command.
"""
res = self.__dll.Bs120xEnet_SetCellSinkCurrent(self.__handle, c_uint(cell), c_float(isnk))
self.__check_err(res)
[docs]
def set_cell_source_current(self, cell: int, isrc: float):
"""Sets the sourcing current limit for a single cell.
Args:
cell: The cell number to set. Cells are 0-indexed.
isrc: The sourcing current limit in Amps.
Raises:
Bs120xEnetError: An error occurred while sending the command.
"""
res = self.__dll.Bs120xEnet_SetCellSourceCurrent(self.__handle, c_uint(cell), c_float(isrc))
self.__check_err(res)
[docs]
def set_all_cell_voltage(self, voltage: float):
"""Sets the voltage for all cells.
Args:
voltage: The voltage to set.
Raises:
Bs120xEnetError: An error occurred while sending the command.
"""
res = self.__dll.Bs120xEnet_SetAllCellVoltage(self.__handle, c_float(voltage))
self.__check_err(res)
[docs]
def set_cell_voltage(self, cell: int, voltage: float):
"""Sets the voltage for a single cell.
Args:
cell: The cell number to set. Cells are 0-indexed.
voltage: The voltage to set.
Raises:
Bs120xEnetError: An error occurred while sending the command.
"""
res = self.__dll.Bs120xEnet_SetCellVoltage(self.__handle, c_uint(cell), c_float(voltage))
self.__check_err(res)
[docs]
def enable_all_cells(self, enable: bool):
"""Enables or disables all cells.
Args:
enable: A value of TRUE will enable all cells. A value of FALSE will disable all cells.
Raises:
Bs120xEnetError: An error occurred while sending the command.
"""
res = self.__dll.Bs120xEnet_EnableAllCells(self.__handle, c_bool(enable))
self.__check_err(res)
[docs]
def enable_cell(self, cell: int, enable: bool):
"""Enables or disables a single cell.
Args:
cell: The number of the cell to set.
enable: The enable state for the cell. A value of TRUE will enable the cell and a value of FALSE will disable it.
Raises:
Bs120xEnetError: An error occurred while sending the command.
"""
res = self.__dll.Bs120xEnet_EnableCell(self.__handle, c_uint(cell), c_bool(enable))
self.__check_err(res)
[docs]
def get_cell_voltage(self, cell: int) -> float:
"""Gets the voltage for a single cell.
Args:
cell: The number of the cell to read. Cells are 0-indexed.
Returns:
The voltage read at the cell.
Raises:
Bs120xEnetError: An error occurred during readback.
"""
voltage = c_float()
res = self.__dll.Bs120xEnet_GetCellVoltage(self.__handle, c_uint(cell), byref(voltage))
self.__check_err(res)
return voltage.value
[docs]
def get_all_cell_voltage(self) -> list[float]:
"""Gets the voltages for all cells.
Returns:
A list of all cell voltages.
Raises:
Bs120xEnetError: An error occurred during readback.
"""
voltage = (c_float * CELL_COUNT)()
res = self.__dll.Bs120xEnet_GetAllCellVoltage(self.__handle, byref(voltage), c_size_t(CELL_COUNT))
self.__check_err(res)
return voltage[:]
[docs]
def get_cell_current(self, cell: int) -> float:
"""Gets the current for a single cell.
Args:
cell: The number of the cell to read. Cells are 0-indexed.
Returns:
The cell current in Amps.
Raises:
Bs120xEnetError: An error occurred during readback.
"""
current = c_float()
res = self.__dll.Bs120xEnet_GetCellCurrent(self.__handle, c_uint(cell), byref(current))
self.__check_err(res)
return current.value
[docs]
def get_all_cell_current(self) -> list[float]:
"""Gets the currents for all cells.
Returns:
A list of all cell currents in Amps.
Raises:
Bs120xEnetError: An error occurred during readback.
"""
current = (c_float * CELL_COUNT)()
res = self.__dll.Bs120xEnet_GetAllCellCurrent(self.__handle, byref(current), c_size_t(CELL_COUNT))
self.__check_err(res)
return current[:]
[docs]
def get_dio_state(self, dio: int) -> bool:
"""Gets the state of a DIO.
Args:
dio: The index of the DIO to read. DIO are 0-indexed.
Returns:
A Boolean value representing the state of the DIO.
Raises:
Bs120xEnetError: An error occurred during readback.
"""
state = c_bool()
res = self.__dll.Bs120xEnet_GetDIOState(self.__handle, c_uint(dio), byref(state))
self.__check_err(res)
return state.value
[docs]
def get_all_dio_state(self) -> list[bool]:
"""Gets the states of all DIO.
Returns:
A list of all DIO states.
Raises:
Bs120xEnetError: An error occurred during readback.
"""
state = (c_bool * DIO_COUNT)()
res = self.__dll.Bs120xEnet_GetAllDIOState(self.__handle, byref(state), c_size_t(DIO_COUNT))
self.__check_err(res)
return state[:]
[docs]
def get_status(self) -> Bs120xStatus:
"""Gets status information for the unit.
Returns:
A Bs120xStatus object containing health and status information.
Raises:
Bs120xEnetError: An error occurred during readback.
"""
status = Bs120xStatus()
res = self.__dll.Bs120xEnet_GetStatus(self.__handle, byref(status))
self.__check_err(res)
return status
[docs]
def get_unit_config(self) -> Bs120xUnitConfig:
"""Retrieves unit configuration information.
.. note::
query_config must be called prior to calling this method.
Returns:
Bs120xUnitConfig object containing unit configuration values.
Raises:
Bs120xEnetError: An error occured while getting configuration.
"""
config = Bs120xUnitConfig()
res = self.__dll.Bs120xEnet_GetUnitConfig(self.__handle, byref(config))
self.__check_err(res)
return config
[docs]
def reset(self):
"""Resets the BS120x unit. disconnect and connect must be called to re-establish
a connection with the unit.
Raises:
Bs120xEnetError: An error occurred while sending the command.
"""
res = self.__dll.Bs120xEnet_Reset(self.__handle)
self.__check_err(res)
[docs]
def set_ip_address(self, ip: str):
"""Sets the unit's IP address. The unit must be restarted for changes to take
effect.
Args:
ip: The new IP address for the unit. The final octet may not be equal to 1 or 255.
Raises:
Bs120xEnetError: An error occurred while sending the command.
"""
res = self.__dll.Bs120xEnet_SetIPAddress(self.__handle, ip.encode())
self.__check_err(res)
[docs]
def query_config(self):
"""Queries the unit's configuration information.
Raises:
Bs120xEnetError: An error occurred while sending the command.
"""
res = self.__dll.Bs120xEnet_QueryConfig(self.__handle)
self.__check_err(res)
[docs]
def set_config_option(self, opt: str, value):
"""Sets a configuration option by name. Unit must be restarted for changes to take effect.
Args:
opt: The option to set.
value: The value to set the option to. Values may be Boolean, integer, float, or string.
Raises:
Bs120xEnetError: An error occurred while setting the configuration option.
"""
match value:
case bool():
res = self.__dll.Bs120xEnet_SetConfigOptionBool(self.__handle, opt.encode(), c_bool(value))
self.__check_err(res)
case int():
res = self.__dll.Bs120xEnet_SetConfigOptionInt(self.__handle, opt.encode(), c_int(value))
self.__check_err(res)
case str():
res = self.__dll.Bs120xEnet_SetConfigOptionString(self.__handle, opt.encode(), value.encode())
self.__check_err(res)
case float():
res = self.__dll.Bs120xEnet_SetConfigOptionFloat(self.__handle, opt.encode(), c_float(value))
self.__check_err(res)
case _:
raise ValueError("Invalid configuration option type.")
[docs]
def enable_udp_data(self, enable: bool):
"""Enables or disables the UDP data broadcast. Unit must be restarted for changes to take
effect.
Args:
enable: A value of TRUE will enable the UDP data broadcast and a value of FALSE will disable it.
Raises:
Bs120xEnetError: An error occurred while setting the configuration option.
"""
res = self.__dll.Bs120xEnet_EnableUDPData(self.__handle, c_bool(enable))
self.__check_err(res)
[docs]
def set_udp_data_port(self, port: int):
"""Sets the port for the unit to broadcast data on. Unit must be restarted for changes to
take effect
Args:
port: The UDP port to broadcast on.
Raises:
Bs120xEnetError: An error occurred while setting the configuration option.
"""
res = self.__dll.Bs120xEnet_SetUDPDataPort(self.__handle, c_uint16(port))
self.__check_err(res)
[docs]
def set_udp_data_period(self, period: int):
"""Sets the interval at which the unit broadcasts data over UDP. Unit must be restarted
for changes to take effect.
Args:
period: The period in milliseconds at which to broadcast.
Raises:
Bs120xEnetError: An error occurred while setting the configuration option.
"""
res = self.__dll.Bs120xEnet_SetUDPDataPeriod(self.__handle, c_uint32(period))
self.__check_err(res)
[docs]
def set_box_id(self, id: int):
"""Sets the unit's Box ID. Unit must be restarted for changes to take effect.
Args:
id: The new Box ID for the unit. Valid values are 0-15.
Raises:
Bs120xEnetError: An error occurred while setting the configuration option.
"""
res = self.__dll.Bs120xEnet_SetBoxId(self.__handle, c_uint8(id))
self.__check_err(res)
[docs]
def set_can_period(self, period: int):
"""Sets the interval at which the unit transmits data over CAN. Unit must be
restarted for changes to take effect.
Args:
period: The period in milliseconds at which to transmit data.
Raises:
Bs120xEnetError: An error occurred while setting the configuration option.
"""
res = self.__dll.Bs120xEnet_SetCANPeriod(self.__handle, c_uint32(period))
self.__check_err(res)
[docs]
def enable_cell_inhibit(self, enable: bool):
"""Enables or disables the cell inhibit lines.
Args:
enables: A value of TRUE will enable cell inhibit lines and a value of FALSE will disable them.
Raises:
Bs120xEnetError: An error occurred while setting the configuration option.
"""
res = self.__dll.Bs120xEnet_EnableCellInhibit(self.__handle, c_bool(enable))
self.__check_err(res)
[docs]
def get_config_value_bool(self, opt: str) -> bool:
"""Gets a Boolean configuration option value by name.
.. note::
query_config must be called prior to calling this method.
Args:
opt: The configuration option to read.
Returns:
The configuration option value.
Raises:
Bs120xEnetError: An error occurred while reading the configuration option.
"""
val = c_bool()
res = self.__dll.Bs120xEnet_GetConfigValueBool(self.__handle, opt.encode(), byref(val))
self.__check_err(res)
return val.value
[docs]
def get_config_value_int(self, opt: str) -> int:
"""Gets an integer configuration option value by name.
.. note::
query_config must be called prior to calling this method.
Args:
opt: The configuration option to read.
Returns:
The configuration option value.
Raises:
Bs120xEnetError: An error occurred while reading the configuration option.
"""
val = c_int()
res = self.__dll.Bs120xEnet_GetConfigValueInt(self.__handle, opt.encode(), byref(val))
self.__check_err(res)
return val.value
[docs]
def get_config_value_float(self, opt: str) -> float:
"""Getes a floating point configuration option by name.
.. note::
query_config must be called prior to calling this method.
Args:
opt: The configuration option to read.
Returns:
The configuration option value.
Raises:
Bs120xEnetError: An error occurred while reading the configuration option.
"""
val = c_float()
res = self.__dll.Bs120xEnet_GetConfigValueFloat(self.__handle, opt.encode(), byref(val))
self.__check_err(res)
return val.value
[docs]
def get_config_value_string(self, opt: str) -> str:
"""Gets a string configuration option by name.
.. note::
query_config must be called prior to calling this method.
Args:
opt: The configuration option to read.
Returns:
The configuration option value.
Raises:
Bs120xEnetError: An error occurred while reading the configuration option.
"""
val = create_string_buffer(MAX_CONFIG_BUFFER)
res = self.__dll.Bs120xEnet_GetConfigValueString(self.__handle, opt.encode(), val, c_size_t(MAX_CONFIG_BUFFER))
self.__check_err(res)
return val.value.decode()