"""Low-level communication for PSLab.
Example
-------
>>> from pslab.serial_handler import SerialHandler
>>> device = SerialHandler()
>>> version = device.get_version()
>>> device.disconnect()
"""
import logging
import os.path
import platform
import struct
import time
from functools import partial, update_wrapper
from typing import List, Union
import serial
from serial.tools import list_ports
import pslab.protocol as CP
logger = logging.getLogger(__name__)
[docs]def detect():
"""Detect connected PSLab devices.
Returns
-------
devices : dict of str: str
Dictionary containing port name as keys and device version on that
port as values.
"""
regex = []
for vid, pid in zip(SerialHandler._USB_VID, SerialHandler._USB_PID):
regex.append(f"{vid:04x}:{pid:04x}")
regex = "(" + "|".join(regex) + ")"
port_info_generator = list_ports.grep(regex)
pslab_devices = {}
for port_info in port_info_generator:
version = _get_version(port_info.device)
if any(expected in version for expected in ["PSLab", "CSpark"]):
pslab_devices[port_info.device] = version
return pslab_devices
def _get_version(port: str) -> str:
interface = serial.Serial(port=port, baudrate=1e6, timeout=1)
interface.write(CP.COMMON)
interface.write(CP.GET_VERSION)
version = interface.readline()
return version.decode("utf-8")
[docs]class SerialHandler:
"""Provides methods for communicating with the PSLab hardware.
When instantiated, SerialHandler tries to connect to the PSLab. A port can
optionally be specified; otherwise Handler will try to find the correct
port automatically.
Parameters
----------
See :meth:`connect`.
"""
# V5 V6
_USB_VID = [0x04D8, 0x10C4]
_USB_PID = [0x00DF, 0xEA60]
def __init__(
self,
port: str = None,
baudrate: int = 1000000,
timeout: float = 1.0,
):
self._check_udev()
self.version = ""
self._log = b""
self._logging = False
self.interface = serial.Serial()
self.send_byte = partial(self._send, size=1)
update_wrapper(self.send_byte, self._send)
self.send_int = partial(self._send, size=2)
update_wrapper(self.send_int, self._send)
self.get_byte = partial(self._receive, size=1)
update_wrapper(self.get_byte, self._receive)
self.get_int = partial(self._receive, size=2)
update_wrapper(self.get_int, self._receive)
self.get_long = partial(self._receive, size=4)
update_wrapper(self.get_long, self._receive)
self.connect(port=port, baudrate=baudrate, timeout=timeout)
self.connected = self.interface.is_open
@staticmethod
def _check_udev():
if platform.system() == "Linux":
udev_paths = [
"/run/udev/rules.d/",
"/etc/udev/rules.d/",
"/lib/udev/rules.d/",
]
for p in udev_paths:
udev_rules = os.path.join(p, "99-pslab.rules")
if os.path.isfile(udev_rules):
break
else:
e = (
"A udev rule must be installed to access the PSLab. "
+ "Please copy 99-pslab.rules to /etc/udev/rules.d/."
)
raise OSError(e)
@staticmethod
def _list_ports() -> List[str]:
"""Return a list of serial port names."""
return [p.device for p in list_ports.comports()]
[docs] def connect(
self,
port: str = None,
baudrate: int = 1000000,
timeout: float = 1.0,
):
"""Connect to PSLab.
Parameters
----------
port : str, optional
The name of the port to which the PSLab is connected as a string.
On Posix this is a path, e.g. "/dev/ttyACM0". On Windows, it's a
numbered COM port, e.g. "COM5". Will be autodetected if not
specified. If multiple PSLab devices are connected, port must be
specified.
baudrate : int, optional
Symbol rate in bit/s. The default value is 1000000.
timeout : float, optional
Time in seconds to wait before cancelling a read or write command. The
default value is 1.0.
Raises
------
SerialException
If connection could not be established.
RuntimeError
If ultiple devices are connected and no port was specified.
"""
# serial.Serial opens automatically if port is not None.
self.interface = serial.Serial(
port=port,
baudrate=baudrate,
timeout=timeout,
write_timeout=timeout,
)
pslab_devices = detect()
if self.interface.is_open:
# User specified a port.
version = self.get_version()
else:
if len(pslab_devices) == 1:
self.interface.port = list(pslab_devices.keys())[0]
self.interface.open()
version = self.get_version()
elif len(pslab_devices) > 1:
found = ""
for port, version in pslab_devices.items():
found += f"{port}: {version}"
raise RuntimeError(
"Multiple PSLab devices found:\n"
f"{found}"
"Please choose a device by specifying a port."
)
else:
version = ""
if self.interface.port in pslab_devices:
self.version = version
logger.info(f"Connected to {self.version} on {self.interface.port}.")
else:
self.interface.close()
self.version = ""
raise serial.SerialException("Device not found.")
[docs] def disconnect(self):
"""Disconnect from PSLab."""
self.interface.close()
[docs] def reconnect(
self,
port: str = None,
baudrate: int = None,
timeout: float = None,
):
"""Reconnect to PSLab.
Will reuse previous settings (port, baudrate, timeout) unless new ones are
provided.
Parameters
----------
See :meth:`connect`.
"""
self.disconnect()
# Reuse previous settings unless user provided new ones.
baudrate = self.interface.baudrate if baudrate is None else baudrate
port = self.interface.port if port is None else port
timeout = self.interface.timeout if timeout is None else timeout
self.connect(
port=port,
baudrate=baudrate,
timeout=timeout,
)
[docs] def get_version(self) -> str:
"""Query PSLab for its version and return it as a decoded string.
Returns
-------
str
Version string.
"""
self.send_byte(CP.COMMON)
self.send_byte(CP.GET_VERSION)
version = self.interface.readline()
self._write_log(version, "RX")
return version.decode("utf-8")
[docs] def get_ack(self) -> int:
"""Get response code from PSLab.
Returns
-------
int
Response code. Meanings:
0x01 ACK
0x10 I2C ACK
0x20 I2C bus collision
0x10 Radio max retransmits
0x20 Radio not present
0x40 Radio reply timout
"""
response = self.read(1)
if not response:
raise serial.SerialException("Timeout while waiting for ACK.")
ack = CP.Byte.unpack(response)[0]
if not (ack & 0x01):
raise serial.SerialException("Received non ACK byte while waiting for ACK.")
return ack
@staticmethod
def _get_integer_type(size: int) -> struct.Struct:
if size == 1:
return CP.Byte
elif size == 2:
return CP.ShortInt
elif size == 4:
return CP.Integer
else:
raise ValueError("size must be 1, 2, or 4.")
def _send(self, value: Union[bytes, int], size: int):
"""Send a value to the PSLab.
Parameters
----------
value : int
Value to send to PSLab. Must fit in four bytes.
"""
if isinstance(value, bytes):
packet = value
else:
packer = self._get_integer_type(size)
packet = packer.pack(value)
self.write(packet)
def _receive(self, size: int) -> int:
"""Read and unpack data from the serial port.
Returns
-------
int
Unpacked data.
Raises
------
SerialException if too few bytes received.
"""
received = self.read(size)
if len(received) == size:
unpacker = self._get_integer_type(size)
retval = unpacker.unpack(received)[0]
else:
raise serial.SerialException(
f"Requested {size} bytes, got {len(received)}."
)
return retval
[docs] def read(self, number_of_bytes: int) -> bytes:
"""Log incoming bytes.
Wrapper for Serial.read().
Parameters
----------
number_of_bytes : int
Number of bytes to read from the serial port.
Returns
-------
bytes
Bytes read from the serial port.
"""
data = self.interface.read(number_of_bytes)
self._write_log(data, "RX")
return data
[docs] def write(self, data: bytes):
"""Log outgoing bytes.
Wrapper for Serial.write().
Parameters
----------
data : int
Bytes to write to the serial port.
"""
self.interface.write(data)
self._write_log(data, "TX")
def _write_log(self, data: bytes, direction: str):
if self._logging:
self._log += direction.encode() + data + "STOP".encode()
[docs] def wait_for_data(self, timeout: float = 0.2) -> bool:
"""Wait for :timeout: seconds or until there is data in the input buffer.
Parameters
----------
timeout : float, optional
Time in seconds to wait. The default is 0.2.
Returns
-------
bool
True iff the input buffer is not empty.
"""
start_time = time.time()
while time.time() - start_time < timeout:
if self.interface.in_waiting:
return True
time.sleep(0.02)
return False
RECORDED_TRAFFIC = iter([])
"""An iterator returning (request, response) pairs.
The request is checked against data written to the dummy serial port, and if it matches
the response can be read back. Both request and response should be bytes-like.
Intended to be monkey-patched by the calling test module.
"""
[docs]class MockHandler(SerialHandler):
"""Mock implementation of :class:`SerialHandler` for testing.
Parameters
----------
Same as :class:`SerialHandler`.
"""
VERSION = "PSLab vMOCK"
def __init__(
self,
port: str = None,
baudrate: int = 1000000,
timeout: float = 1.0,
):
self._in_buffer = b""
super().__init__(port, baudrate, timeout)
@staticmethod
def _check_udev():
pass
[docs] def connect(
self,
port: str = None,
baudrate: int = 1000000,
timeout: float = 1.0,
):
"""See :meth:`SerialHandler.connect`."""
self.version = self.get_version()
[docs] def disconnect(self):
"""See :meth:`SerialHandler.disconnect`."""
pass
[docs] def reconnect(
self,
port: str = None,
baudrate: int = None,
timeout: float = None,
):
"""See :meth:`SerialHandler.reconnect`."""
pass
[docs] def get_version(self) -> str:
"""Return mock version."""
return self.VERSION
[docs] def read(self, number_of_bytes: int) -> bytes:
"""Mimic the behavior of the serial bus by returning recorded RX traffic.
The returned data depends on how :meth:`write` was called prior to calling
:meth:`read`.
See also :meth:`SerialHandler.read`.
"""
read_bytes = self._in_buffer[:number_of_bytes]
self._in_buffer = self._in_buffer[number_of_bytes:]
return read_bytes
[docs] def write(self, data: bytes):
"""Add recorded RX data to buffer if written data equals recorded TX data.
See also :meth:`SerialHandler.write`.
"""
tx, rx = next(RECORDED_TRAFFIC)
if tx == data:
self._in_buffer += rx
[docs] def wait_for_data(self, timeout: float = 0.2) -> bool:
"""Return True if there is data in buffer, or return False after timeout."""
if self._in_buffer:
return True
else:
time.sleep(timeout)
return False
[docs]class ADCBufferMixin:
"""Mixin for classes that need to read or write to the ADC buffer."""
[docs] def fetch_buffer(self, samples: int, starting_position: int = 0):
"""Fetch a section of the ADC buffer.
Parameters
----------
samples : int
Number of samples to fetch.
starting_position : int, optional
Location in the ADC buffer to start from. By default samples will
be fetched from the beginning of the buffer.
Returns
-------
received : list of int
List of received samples.
"""
self._device.send_byte(CP.COMMON)
self._device.send_byte(CP.RETRIEVE_BUFFER)
self._device.send_int(starting_position)
self._device.send_int(samples)
received = [self._device.get_int() for i in range(samples)]
self._device.get_ack()
return received
[docs] def clear_buffer(self, samples: int, starting_position: int = 0):
"""Clear a section of the ADC buffer.
Parameters
----------
samples : int
Number of samples to clear from the buffer.
starting_position : int, optional
Location in the ADC buffer to start from. By default samples will
be cleared from the beginning of the buffer.
"""
self._device.send_byte(CP.COMMON)
self._device.send_byte(CP.CLEAR_BUFFER)
self._device.send_int(starting_position)
self._device.send_int(samples)
self._device.get_ack()
[docs] def fill_buffer(self, data: List[int], starting_position: int = 0):
"""Fill a section of the ADC buffer with data.
Parameters
----------
data : list of int
Values to write to the ADC buffer.
starting_position : int, optional
Location in the ADC buffer to start from. By default writing will
start at the beginning of the buffer.
"""
self._device.send_byte(CP.COMMON)
self._device.send_byte(CP.FILL_BUFFER)
self._device.send_int(starting_position)
self._device.send_int(len(data))
for value in data:
self._device.send_int(value)
self._device.get_ack()