Add encoder resetting, SPI health check

This commit is contained in:
Konstantin Lampalzer 2021-08-23 22:13:01 +01:00
parent 63b8f868c9
commit 0f103cb34d
6 changed files with 138 additions and 80 deletions

View file

@ -1,8 +1,9 @@
import spidev
from threading import Thread, Lock
from enum import Enum
from enum import IntEnum
import time
from LogstashLogging import logstash_logger
from compLib.LogstashLogging import logstash_logger
SPI_BUS = 1
SPI_DEVICE = 2
@ -17,67 +18,7 @@ spi.bits_per_word = 8
spi_mutex = Lock()
class Spi(object):
@staticmethod
def transfer(tx_buffer: list):
write_reg = tx_buffer[1]
print(tx_buffer)
spi.xfer(tx_buffer)
rx_buffer = spi.xfer([0] * 32)
if rx_buffer[1] != write_reg:
logstash_logger.error(f"SPI error during write to register {tx_buffer[0]}!")
return rx_buffer
@staticmethod
def read(reg: int, length: int):
tx_buf = [0] * SPI_BUFFER_SIZE
tx_buf[0] = 0
tx_buf[1] = reg
tx_buf[2] = length
rx_buf = Spi.transfer(tx_buf)
return int.from_bytes(rx_buf[2:2 + length], byteorder='big', signed=False)
@staticmethod
def write(reg: int, length: int, value: int):
tx_buf = [0] * SPI_BUFFER_SIZE
tx_buf[0] = 1
tx_buf[1] = reg
tx_buf[2] = length
pos = 3
for i in value.to_bytes(length, 'big'):
tx_buf[pos] = i
pos += 1
rx_buf = Spi.transfer(tx_buf)
return int.from_bytes(rx_buf[2:2 + length], byteorder='big', signed=False)
@staticmethod
def write_array(reg: int, length: int, values: list[int]):
tx_buf = [0] * SPI_BUFFER_SIZE
tx_buf[0] = 1
tx_buf[1] = reg
tx_buf[2] = length
pos = 3
for i in values:
tx_buf[pos] = i
pos += 1
rx_buf = Spi.transfer(tx_buf)
return rx_buf
class Register(Enum):
class Register(IntEnum):
IDENTIFICATION_MODEL_ID = 1,
IDENTIFICATION_MODEL_REV_MAJOR = 2,
IDENTIFICATION_MODEL_REV_MINOR = 3,
@ -147,3 +88,87 @@ class Register(Enum):
DISPLAY_LINE_2_C0 = 79,
DISPLAY_LINE_3_C0 = 95,
DISPLAY_LINE_4_C0 = 111
class Spi(object):
@staticmethod
def transfer(tx_buffer: list):
write_reg = tx_buffer[1]
spi.xfer(tx_buffer)
rx_buffer = spi.xfer([0] * 32)
if rx_buffer[1] != write_reg:
logstash_logger.error(f"SPI error during read/write of register {write_reg}!")
return rx_buffer
@staticmethod
def read(reg: int, length: int):
if not type(reg) is int:
reg = int(reg)
tx_buf = [0] * SPI_BUFFER_SIZE
tx_buf[0] = 0
tx_buf[1] = reg
tx_buf[2] = length
rx_buf = Spi.transfer(tx_buf)
return int.from_bytes(rx_buf[2:2 + length], byteorder='big', signed=False)
@staticmethod
def write(reg: int, length: int, value: int):
if not type(reg) is int:
reg = int(reg)
tx_buf = [0] * SPI_BUFFER_SIZE
tx_buf[0] = 1
tx_buf[1] = reg
tx_buf[2] = length
pos = 3
for i in value.to_bytes(length, 'big'):
tx_buf[pos] = i
pos += 1
rx_buf = Spi.transfer(tx_buf)
return int.from_bytes(rx_buf[2:2 + length], byteorder='big', signed=False)
@staticmethod
def write_array(reg: int, length: int, values):
if not type(reg) is int:
reg = int(reg)
tx_buf = [0] * SPI_BUFFER_SIZE
tx_buf[0] = 1
tx_buf[1] = reg
tx_buf[2] = length
pos = 3
for i in values:
tx_buf[pos] = i
pos += 1
rx_buf = Spi.transfer(tx_buf)
return rx_buf
@staticmethod
def health_check():
if Spi.read(Register.IDENTIFICATION_MODEL_ID, 1) != 3:
logstash_logger.error(f"Unable to read Version! Make sure the mainboard is connected!")
quit()
@staticmethod
def health_check_loop():
while True:
Spi.health_check()
time.sleep(0.5)
health_check_thread = Thread(target=Spi.health_check_loop)
health_check_thread.setDaemon(True)
health_check_thread.start()