import spidev from threading import Thread, Lock from enum import IntEnum import time import sys from compLib.LogstashLogging import Logging SPI_BUS = 1 SPI_DEVICE = 2 SPI_SPEED = 1000000 SPI_BUFFER_SIZE = 32 spi = spidev.SpiDev() spi.open(SPI_BUS, SPI_DEVICE) spi.max_speed_hz = SPI_SPEED spi.mode = 0 spi.bits_per_word = 8 spi_mutex = Lock() class Register(IntEnum): IDENTIFICATION_MODEL_ID = 1, IDENTIFICATION_MODEL_REV_MAJOR = 2, IDENTIFICATION_MODEL_REV_MINOR = 3, IDENTIFICATION_MODEL_REV_PATCH = 4, # Motor encoder positions MOTOR_1_POS_B3 = 10, MOTOR_1_POS_B2 = 11, MOTOR_1_POS_B1 = 12, MOTOR_1_POS_B0 = 13, MOTOR_2_POS_B3 = 14, MOTOR_2_POS_B2 = 15, MOTOR_2_POS_B1 = 16, MOTOR_2_POS_B0 = 17, MOTOR_3_POS_B3 = 18, MOTOR_3_POS_B2 = 19, MOTOR_3_POS_B1 = 20, MOTOR_3_POS_B0 = 21, MOTOR_4_POS_B3 = 22, MOTOR_4_POS_B2 = 23, MOTOR_4_POS_B1 = 24, MOTOR_4_POS_B0 = 25, # PWM Control Modes PWM_1_CTRL = 26, PWM_2_CTRL = 27, PWM_3_CTRL = 28, PWM_4_CTRL = 29, # Motor pwm speed MOTOR_1_PWM_H = 30, MOTOR_1_PWM_L = 31, MOTOR_2_PWM_H = 32, MOTOR_2_PWM_L = 33, MOTOR_3_PWM_H = 34, MOTOR_3_PWM_L = 35, MOTOR_4_PWM_H = 36, MOTOR_4_PWM_L = 37, # Servo goal position SERVO_1_PWM_H = 38, SERVO_1_PWM_L = 39, SERVO_2_PWM_H = 40, SERVO_2_PWM_L = 41, SERVO_3_PWM_H = 42, SERVO_3_PWM_L = 43, SERVO_4_PWM_H = 44, SERVO_4_PWM_L = 45, SERVO_5_PWM_H = 46, SERVO_5_PWM_L = 47, SERVO_6_PWM_H = 48, SERVO_6_PWM_L = 49, SERVO_7_PWM_H = 50, SERVO_7_PWM_L = 51, SERVO_8_PWM_H = 52, SERVO_8_PWM_L = 53, # IR Sensor value IR_1_H = 54, IR_1_L = 55, IR_2_H = 56, IR_2_L = 57, IR_3_H = 58, IR_3_L = 59, IR_4_H = 60, IR_4_L = 61, IR_5_H = 62, IR_5_L = 63, IR_1_LED = 64, IR_2_LED = 65, IR_3_LED = 66, IR_4_LED = 67, IR_5_LED = 68, # Display registers DISPLAY_LINE_1_C0 = 69, DISPLAY_LINE_2_C0 = 85, DISPLAY_LINE_3_C0 = 101, DISPLAY_LINE_4_C0 = 117 class Spi(object): @staticmethod def transfer(tx_buffer: list): write_reg = tx_buffer[1] spi.xfer(tx_buffer) rx_buffer = spi.xfer([0] * 32) if rx_buffer[1] != write_reg: Logging.get_logger().error(f"SPI error during read/write of register {write_reg}!") return rx_buffer @staticmethod def read(reg: int, length: int): if not type(reg) is int: reg = int(reg) tx_buf = [0] * SPI_BUFFER_SIZE tx_buf[0] = 0 tx_buf[1] = reg tx_buf[2] = length rx_buf = Spi.transfer(tx_buf) return int.from_bytes(rx_buf[2:2 + length], byteorder='big', signed=False) @staticmethod def write(reg: int, length: int, value: int): if not type(reg) is int: reg = int(reg) tx_buf = [0] * SPI_BUFFER_SIZE tx_buf[0] = 1 tx_buf[1] = reg tx_buf[2] = length pos = 3 for i in value.to_bytes(length, 'big'): tx_buf[pos] = i pos += 1 rx_buf = Spi.transfer(tx_buf) return int.from_bytes(rx_buf[2:2 + length], byteorder='big', signed=False) @staticmethod def write_array(reg: int, length: int, values): if not type(reg) is int: reg = int(reg) tx_buf = [0] * SPI_BUFFER_SIZE tx_buf[0] = 1 tx_buf[1] = reg tx_buf[2] = length pos = 3 for i in values: tx_buf[pos] = i pos += 1 rx_buf = Spi.transfer(tx_buf) return rx_buf @staticmethod def health_check(): if Spi.read(Register.IDENTIFICATION_MODEL_ID, 1) != 3: Logging.get_logger().error(f"Unable to read Version! Make sure the mainboard is connected!") print("Unable to read Version! Make sure the mainboard is connected!") sys.exit() @staticmethod def health_check_loop(): while True: Spi.health_check() time.sleep(0.5) health_check_thread = Thread(target=Spi.health_check_loop) health_check_thread.setDaemon(True) health_check_thread.start()