"""Control the PSLab's I2C bus and devices connected on the bus.
Examples
--------
Set I2C bus speed to 400 kbit/s:
>>> from pslab.bus.i2c import I2CMaster, I2CSlave
>>> bus = I2CMaster()
>>> bus.configure(frequency=4e5)
Scan for connected devices:
>>> bus.scan()
[96, 104]
Connect to the PSLab's built-in DS1307 RTC:
>>> rtc = I2CSlave(address=104)
"""
import logging
from typing import List
import pslab.protocol as CP
from pslab.serial_handler import SerialHandler
from pslab.external.sensorlist import sensors
logger = logging.getLogger(__name__)
[docs]class I2CPrimitive:
"""I2C primitive commands.
Handles all the I2C subcommands coded in pslab-firmware.
Parameters
----------
device : :class:`SerialHandler`, optional
Serial connection to PSLab device. If not provided, a new one will be
created.
"""
_ACK = 0
_READ = 1
_WRITE = 0
def __init__(self, device: SerialHandler = None):
self._device = device if device is not None else SerialHandler()
self._running = False
self._mode = None
def _start(self, address: int, mode: int) -> int:
"""Initiate I2C transfer.
Parameters
----------
address : int
7-bit I2C device address.
mode : {0, 1}
0: write
1: read
Returns
-------
response : int
Response from I2C slave device.
"""
if self._mode == mode:
return self._ACK
self._device.send_byte(CP.I2C_HEADER)
secondary = CP.I2C_START if not self._running else CP.I2C_RESTART
self._device.send_byte(secondary)
self._device.send_byte((address << 1) | mode)
response = self._device.get_ack() >> 4 # ACKSTAT
self._running = True
self._mode = mode
return response
def _restart(self, address: int, mode: int) -> int:
"""Send repeated start.
Reinitiate I2C transfer without stoping.
Parameters
----------
address : int
7-bit I2C device address.
mode : {0, 1}
0: write
1: read
Returns
-------
response : int
Response from I2C slave device.
"""
self._device.send_byte(CP.I2C_HEADER)
self._device.send_byte(CP.I2C_RESTART)
self._device.send_byte((address << 1) | mode)
response = self._device.get_ack() >> 4 # ACKSTAT
self._running = True
self._mode = mode
return response
def _stop(self):
"""Stop I2C transfer."""
if self._running or self._mode:
self._device.send_byte(CP.I2C_HEADER)
self._device.send_byte(CP.I2C_STOP)
self._device.get_ack()
self._running = False
self._mode = None
def _wait(self):
"""Wait for I2C."""
self._device.send_byte(CP.I2C_HEADER)
self._device.send_byte(CP.I2C_WAIT)
self._device.get_ack()
@property
def _status(self):
"""int: Contents of the I2C2STAT register.
bit 15 ACKSTAT: Acknowledge Status
1 = NACK received from slave.
0 = ACK received from slave.
bit 14 TRSTAT: Transmit Status
1 = Master transmit is in progress (8 bits + ACK).
0 = Master transmit is not in progress.
bit 10 BCL: Bus Collision Detect
1 = A bus collision has been detected.
0 = No collision.
bit 7 IWCOL: I2C Write Collision Detect
1 = An attempt to write to the I2C2TRN register failed because
the I2C module is busy.
0 = No collision.
bit 6 I2COV: I2C Receive Overflow Flag
1 = A byte is received while the I2C2RCV register is still
holding the previous byte.
0 = No overflow.
bit 4 P: Stop
1 = Indicates that a Stop bit has been detected last.
0 = Stop bit was not detected last.
bit 3 S: Start
1 = Indicates that a Start (or Repeated Start) bit has been
detected last.
0 = Start bit was not detected last.
bit 1 RBF: Receive Buffer Full Status
1 = Receive completes; the I2C2RCV register is full.
0 = Receive is not complete; the I2C2RCV register is empty.
bit 0 TBF: Transmit Buffer Full Status
1 = Transmit is in progress; the I2C2TRN register is full.
0 = Transmit completes; the I2C2TRN register is empty.
"""
self._device.send_byte(CP.I2C_HEADER)
self._device.send_byte(CP.I2C_STATUS)
status = self._device.get_int()
self._device.get_ack()
return status
def _send_byte(self, data: int) -> int:
"""Send data over I2C.
The I2C bus needs to be initialized and set to the correct slave address first.
It is a primitive I2C method, prefered to use :meth:`I2CSlave.read` and
:meth:`I2CSlave.write`.
Parameters
----------
data : int
An integer that fits in a uint8.
Returns
-------
response : int
Response from I2C slave device.
"""
self._device.send_byte(CP.I2C_HEADER)
self._device.send_byte(CP.I2C_SEND)
self._device.send_byte(data) # data byte
response = self._device.get_ack() >> 4 # ACKSTAT
return response
def _send_byte_burst(self, data: int):
"""Send data over I2C.
The function does not wait for the I2C to finish before returning.
It is used for sending large packets quickly.
The I2C bus needs to be initialized and set to the correct slave address first.
It is a primitive I2C method, prefered to use :meth:`I2CSlave.read` and
:meth:`I2CSlave.write`.
Parameters
----------
data : int
An integer that fits in a uint8.
"""
self._device.send_byte(CP.I2C_HEADER)
self._device.send_byte(CP.I2C_SEND_BURST)
self._device.send_byte(data) # data byte
# No handshake. for the sake of speed.
# e.g. loading a frame buffer onto an I2C display such as ssd1306
def _send(self, bytes_to_write: bytearray):
"""Send data over I2C.
This method uses :meth:`_send_byte_burst` to send all the bytes quickly.
The I2C bus needs to be initialized and set to the correct slave address first.
It is a primitive I2C method, prefered to use :meth:`I2CSlave.read` and
:meth:`I2CSlave.write`.
Parameters
----------
bytes_to_write : bytearray
Send data as a bytearray.
"""
for byte in bytes_to_write:
self._send_byte_burst(byte)
def _write_bulk(self, address: int, bytes_to_write: bytearray):
"""Write data to I2C slave.
It is a primitive I2C method, prefered to use :meth:`I2CSlave.read` and
:meth:`I2CSlave.write`.
Parameters
----------
address : int
7-bit I2C device address.
bytes_to_write : bytearray
Write data as a bytearray.
"""
self._device.send_byte(CP.I2C_HEADER)
self._device.send_byte(CP.I2C_WRITE_BULK)
self._device.send_byte(address)
self._device.send_byte(len(bytes_to_write))
for byte in bytes_to_write:
self._device.send_byte(byte)
self._device.get_ack()
def _read_more(self) -> int:
"""Read data from I2C then send ACK.
The I2C bus needs to be initialized and set to the correct slave address first.
It is a primitive I2C method, prefered to use :meth:`I2CSlave.read` and
:meth:`I2CSlave.write`.
Returns
-------
data : int
A byte interpreted as a uint8.
"""
self._device.send_byte(CP.I2C_HEADER)
self._device.send_byte(CP.I2C_READ_MORE)
data = self._device.get_byte()
self._device.get_ack()
return data
def _read_end(self) -> int:
"""Read data from I2C then send NACK.
The I2C bus needs to be initialized and set to the correct slave address first.
It is a primitive I2C method, prefered to use :meth:`I2CSlave.read` and
:meth:`I2CSlave.write`.
Returns
-------
data : int
A byte interpreted as a uint8.
"""
self._device.send_byte(CP.I2C_HEADER)
self._device.send_byte(CP.I2C_READ_END)
data = self._device.get_byte()
self._device.get_ack()
return data
def _read(self, bytes_to_read: int) -> bytearray:
"""Read data from I2C.
The I2C bus needs to be initialized and set to the correct slave address first.
It is a primitive I2C method, prefered to use :meth:`I2CSlave.read` and
:meth:`I2CSlave.write`.
Parameters
----------
bytes_to_read : int
Number of bytes to read from I2C.
Returns
-------
data : bytearray
Read data as a bytearray.
"""
data = bytearray()
for _ in range(bytes_to_read - 1):
data.append(self._read_more())
data.append(self._read_end())
return data
def _read_bulk(
self, address: int, bytes_to_read: int, register_address: int = 0x0
) -> bytearray:
"""Read data from I2C device.
This method relies on the slave device auto incrementing its internal
pointer after each read. Most devices do, but it is not part of the
I2C standard. Refer to the device's documentation.
It is a primitive I2C method, prefered to use :meth:`I2CSlave.read` and
:meth:`I2CSlave.write`.
Parameters
----------
address : int
7-bit I2C device address.
bytes_to_read : int
Number of bytes to read from slave device.
register_address : int, optional
Slave device internal memory address to read from. The default
value is 0x0.
Returns
-------
data : bytearray
Read data as a bytearray.
"""
self._device.send_byte(CP.I2C_HEADER)
self._device.send_byte(CP.I2C_READ_BULK)
self._device.send_byte(address)
self._device.send_byte(register_address)
self._device.send_byte(bytes_to_read)
data = self._device.read(bytes_to_read)
self._device.get_ack()
return bytearray(data)
[docs]class I2CMaster(I2CPrimitive):
"""I2C bus controller.
Handles slave independent functionality with the I2C port.
Parameters
----------
device : :class:`SerialHandler`, optional
Serial connection to PSLab device. If not provided, a new one will be
created.
"""
_MIN_BRGVAL = 2
_MAX_BRGVAL = 511
# Specs say typical delay is 110 ns to 130 ns; 150 ns from testing.
_SCL_DELAY = 150e-9
def __init__(self, device: SerialHandler = None):
super().__init__(device)
self._device.send_byte(CP.I2C_HEADER)
self._device.send_byte(CP.I2C_INIT)
self._device.get_ack()
self.configure(125e3) # 125 kHz is as low as the PSLab can go.
def _get_i2c_frequency(self, brgval: int) -> float:
return 1 / ((brgval + 2) / CP.CLOCK_RATE + self._SCL_DELAY)
[docs] def scan(self) -> List[int]:
"""Scan I2C port for connected devices.
Returns
-------
addrs : list of int
List of 7-bit addresses on which slave devices replied.
"""
addrs = []
for address in range(1, 128): # 0 is the general call address.
slave = I2CSlave(address, self._device)
if slave.ping():
addrs.append(address)
logger.info(
f"Response from slave on {hex(address)} "
+ f"({sensors.get(address, 'None')})."
)
return addrs
[docs]class I2CSlave(I2CPrimitive):
"""I2C slave device.
Parameters
----------
address : int
7-bit I2C device address.
device : :class:`SerialHandler`, optional
Serial interface for communicating with the PSLab device. If not
provided, a new one will be created.
Attributes
----------
address : int
7-bit I2C device address.
"""
def __init__(
self,
address: int,
device: SerialHandler = None,
):
super().__init__(device)
self.address = address
[docs] def ping(self) -> bool:
"""Ping the I2C slave.
Returns
-------
response : bool
True is slave responded, False otherwise.
"""
response = self._start(self.address, self._READ)
self._stop()
return response == self._ACK
[docs] def read(self, bytes_to_read: int, register_address: int = 0x0) -> bytearray:
"""Read data from I2C device.
This method relies on the slave device auto incrementing its internal
pointer after each read. Most devices do, but it is not part of the
I2C standard. Refer to the device's documentation.
If the slave device does not auto increment, use one of
:meth:`read_byte`, :meth:`read_int`, or :meth:`read_long` instead,
depending on how wide the device registers are.
Parameters
----------
bytes_to_read : int
Number of bytes to read from slave device.
register_address : int, optional
Slave device internal memory address to read from. The default
value is 0x0.
Returns
-------
data : bytearray
Read data as a bytearray.
"""
return self._read_bulk(self.address, bytes_to_read, register_address)
[docs] def read_byte(self, register_address: int = 0x0) -> int:
"""Read a single byte from the I2C slave.
Parameters
----------
register_address : int, optional
Slave device internal memory address to read from. The default
value is 0x0.
Returns
-------
data : int
A byte interpreted as a uint8.
"""
return self.read(1, register_address)[0]
[docs] def read_int(self, register_address: int = 0x0) -> int:
"""Read a two byte value from the I2C slave.
Parameters
----------
register_address : int, optional
Slave device internal memory address to read from. The default
value is 0x0.
Returns
-------
data : int
Two bytes interpreted as a uint16.
"""
data = self.read(2, register_address)
return CP.ShortInt.unpack(data)[0]
[docs] def read_long(self, register_address: int = 0x0) -> int:
"""Read a four byte value from the I2C slave.
Parameters
----------
register_address : int, optional
Slave device internal memory address to read from. The default
value is 0x0.
Returns
-------
data : int
Four bytes interpreted as a uint32.
"""
data = self.read(4, register_address)
return CP.Integer.unpack(data)[0]
[docs] def write(self, bytes_to_write: bytearray, register_address: int = 0x0):
"""Write data to I2C slave.
This method relies on the slave device auto incrementing its internal
pointer after each write. Most devices do, but it is not part of the
I2C standard. Refer to the device's documentation.
If the slave device does not auto increment, use one of
:meth:`write_byte`, :meth:`write_int`, or :meth:`write_long` instead,
depending on how wide the device registers are.
Parameters
----------
bytes_to_write : bytearray
Data to write to the slave.
register_address : int
Slave device internal memory address to write to. The default
value is 0x0.
"""
register_address = bytearray(CP.Byte.pack(register_address))
bytes_to_write = bytearray(bytes_to_write)
self._write_bulk(self.address, register_address + bytes_to_write)
[docs] def write_byte(self, data: int, register_address: int = 0x0):
"""Write a single byte to the I2C slave.
Parameters
----------
data : int
An integer that fits in a uint8.
register_address : int, optional
Slave device internal memory address to write to. The default
value is 0x0.
"""
self.write(CP.Byte.pack(data), register_address)
[docs] def write_int(self, data: int, register_address: int = 0x0):
"""Write a two byte value to the I2C slave.
Parameters
----------
data : int
An integer that fits in a uint16.
register_address : int, optional
Slave device internal memory address to write to. The default
value is 0x0.
"""
self.write(CP.ShortInt.pack(data), register_address)
[docs] def write_long(self, data: int, register_address: int = 0x0):
"""Write a four byte value to the I2C slave.
Parameters
----------
data : int
An integer that fits in a uint32.
register_address : int, optional
Slave device internal memory address to write to. The default
value is 0x0.
"""
self.write(CP.Integer.pack(data), register_address)