This repository has been archived on 2025-06-01. You can view files and clone it, but you cannot make any changes to its state, such as pushing and creating new issues, pull requests or comments.
compLIB/client_s1/compLib/Spi.py
Konstantin Lampalzer 9b567b8c6c Protobuf prototype
2022-03-18 18:11:16 +01:00

222 lines
5.3 KiB
Python

import importlib
from threading import Thread
import multiprocessing
from enum import IntEnum
import time
import sys
import random
from compLib.LogstashLogging import Logging
# `import importlib` alone does not guarantee that the `importlib.util`
# submodule is loaded; import it explicitly so find_spec() below is always
# reachable.
import importlib.util

# Bus and device numbers handed to spidev's open(), and the clock set on it.
SPI_BUS = 1
SPI_DEVICE = 2
SPI_SPEED = 4000000
# Number of words in every frame exchanged over SPI (8 bits per word, see
# the spidev configuration below).
SPI_BUFFER_SIZE = 32
# Polled by Spi.health_check_loop(); Spi.disable_health_check() clears it.
SPI_HEALTH = True

# For development purposes: when the spidev package is not installed the
# module still imports, `spi` stays None and `spi_found` is False.
spi_found = importlib.util.find_spec("spidev") is not None
spi = None
# Serialises the request/response pair performed by Spi.transfer().
spi_lock = multiprocessing.Lock()
# Timestamp of the most recent successful transfer (initialised at import).
last_spi_call = time.time()

if spi_found:
    import spidev

    spi = spidev.SpiDev()
    spi.open(SPI_BUS, SPI_DEVICE)
    spi.max_speed_hz = SPI_SPEED
    spi.mode = 0
    spi.bits_per_word = 8
class Register(IntEnum):
    """Register numbers addressed over SPI.

    Values are plain integer literals. A trailing comma after a value would
    turn it into a 1-tuple, which only works because IntEnum unpacks tuples
    as constructor arguments, so none are used here.

    Multi-byte quantities occupy consecutive registers. The suffixes
    (``_B3``..``_B0``, ``_H``/``_L``) presumably run from most to least
    significant byte -- TODO confirm against the mainboard firmware.
    """

    IDENTIFICATION_MODEL_ID = 1
    IDENTIFICATION_MODEL_REV_MAJOR = 2
    IDENTIFICATION_MODEL_REV_MINOR = 3
    IDENTIFICATION_MODEL_REV_PATCH = 4

    # Motor encoder positions
    MOTOR_1_POS_B3 = 10
    MOTOR_1_POS_B2 = 11
    MOTOR_1_POS_B1 = 12
    MOTOR_1_POS_B0 = 13
    MOTOR_2_POS_B3 = 14
    MOTOR_2_POS_B2 = 15
    MOTOR_2_POS_B1 = 16
    MOTOR_2_POS_B0 = 17
    MOTOR_3_POS_B3 = 18
    MOTOR_3_POS_B2 = 19
    MOTOR_3_POS_B1 = 20
    MOTOR_3_POS_B0 = 21
    MOTOR_4_POS_B3 = 22
    MOTOR_4_POS_B2 = 23
    MOTOR_4_POS_B1 = 24
    MOTOR_4_POS_B0 = 25

    # PWM Control Modes
    PWM_1_CTRL = 26
    PWM_2_CTRL = 27
    PWM_3_CTRL = 28
    PWM_4_CTRL = 29

    # Motor pwm speed
    MOTOR_1_PWM_H = 30
    MOTOR_1_PWM_L = 31
    MOTOR_2_PWM_H = 32
    MOTOR_2_PWM_L = 33
    MOTOR_3_PWM_H = 34
    MOTOR_3_PWM_L = 35
    MOTOR_4_PWM_H = 36
    MOTOR_4_PWM_L = 37

    # Servo goal position
    SERVO_1_PWM_H = 38
    SERVO_1_PWM_L = 39
    SERVO_2_PWM_H = 40
    SERVO_2_PWM_L = 41
    SERVO_3_PWM_H = 42
    SERVO_3_PWM_L = 43
    SERVO_4_PWM_H = 44
    SERVO_4_PWM_L = 45
    SERVO_5_PWM_H = 46
    SERVO_5_PWM_L = 47
    SERVO_6_PWM_H = 48
    SERVO_6_PWM_L = 49
    SERVO_7_PWM_H = 50
    SERVO_7_PWM_L = 51
    SERVO_8_PWM_H = 52
    SERVO_8_PWM_L = 53

    # IR Sensor value
    IR_1_H = 54
    IR_1_L = 55
    IR_2_H = 56
    IR_2_L = 57
    IR_3_H = 58
    IR_3_L = 59
    IR_4_H = 60
    IR_4_L = 61
    IR_5_H = 62
    IR_5_L = 63
    IR_1_LED = 64
    IR_2_LED = 65
    IR_3_LED = 66
    IR_4_LED = 67
    IR_5_LED = 68

    # Display registers: only the first register of each line is named;
    # consecutive lines start 16 registers apart.
    DISPLAY_LINE_1_C0 = 69
    DISPLAY_LINE_2_C0 = 85
    DISPLAY_LINE_3_C0 = 101
    DISPLAY_LINE_4_C0 = 117
class Spi(object):
    """Static helpers for register-oriented SPI transfers.

    Every request is a frame of ``SPI_BUFFER_SIZE`` words laid out as
    ``[command, register, length, payload...]`` where ``command`` is 0 for a
    read and 1 for a write. The reply is fetched with a second, all-zero
    transfer; its payload is read starting at index 2.
    """

    # First word of a request frame.
    _CMD_READ = 0
    _CMD_WRITE = 1
    # Value health_check() expects in IDENTIFICATION_MODEL_ID.
    _EXPECTED_MODEL_ID = 3

    @staticmethod
    def _build_frame(command: int, reg: int, length: int, payload=()):
        """Return a zero-padded request frame for ``command`` on ``reg``.

        ``reg`` may be a ``Register`` member or anything ``int()`` accepts.
        A payload that does not fit behind the three header words raises
        ``IndexError``.
        """
        frame = [0] * SPI_BUFFER_SIZE
        frame[0] = command
        frame[1] = int(reg)
        frame[2] = length
        for offset, word in enumerate(payload, start=3):
            frame[offset] = word
        return frame

    @staticmethod
    def transfer(tx_buffer: list):
        """Send ``tx_buffer`` and return the reply frame.

        Without spidev (development machines) nothing is sent and an
        all-zero frame of ``SPI_BUFFER_SIZE`` words is returned.

        A reply is accepted when its index 1 echoes the requested register
        (``tx_buffer[1]``). Otherwise the error is logged and the request is
        repeated after a random pause of up to 0.1 s. There is no retry
        limit: a bus that never answers correctly keeps this call looping.

        :param tx_buffer: request frame; it is not modified.
        :return: the reply frame as a list.
        """
        global last_spi_call

        if not spi_found:
            return [0] * SPI_BUFFER_SIZE

        write_reg = tx_buffer[1]
        request = tx_buffer.copy()

        while True:
            with spi_lock:
                # A fresh copy goes out on every attempt so that neither the
                # caller's list nor the retry template can be altered by
                # the driver.
                spi.xfer(request.copy())
                rx_buffer = spi.xfer([0] * SPI_BUFFER_SIZE)

            if rx_buffer[1] == write_reg:
                break

            Logging.get_logger().error(f"Warning! SPI error during read/write of register {write_reg}! Retrying automagically")
            # The pause happens outside the lock so other callers can use
            # the bus in the meantime.
            time.sleep(random.uniform(0.0, 0.1))

        last_spi_call = time.time()
        return rx_buffer

    @staticmethod
    def read(reg: int, length: int):
        """Read ``length`` words starting at ``reg`` as one unsigned big-endian int."""
        return int.from_bytes(Spi.read_array(reg, length), byteorder='big', signed=False)

    @staticmethod
    def read_array(reg: int, length: int):
        """Read ``length`` words starting at ``reg`` and return them as a list."""
        rx_buf = Spi.transfer(Spi._build_frame(Spi._CMD_READ, reg, length))
        return rx_buf[2:2 + length]

    @staticmethod
    def write(reg: int, length: int, value: int):
        """Write ``value`` as ``length`` big-endian bytes starting at ``reg``.

        :return: the ``length`` reply words after the header, decoded as an
            unsigned big-endian int.
        :raises OverflowError: if ``value`` does not fit in ``length`` bytes.
        """
        payload = value.to_bytes(length, 'big')
        rx_buf = Spi.transfer(Spi._build_frame(Spi._CMD_WRITE, reg, length, payload))
        return int.from_bytes(rx_buf[2:2 + length], byteorder='big', signed=False)

    @staticmethod
    def write_array(reg: int, length: int, values):
        """Write the words in ``values`` starting at ``reg``.

        ``length`` is sent in the frame header as given; it is not checked
        against ``len(values)``.

        :return: the complete reply frame.
        """
        return Spi.transfer(Spi._build_frame(Spi._CMD_WRITE, reg, length, values))

    @staticmethod
    def health_check():
        """Log and print an error unless the model ID register reads 3."""
        if Spi.read(Register.IDENTIFICATION_MODEL_ID, 1) != Spi._EXPECTED_MODEL_ID:
            message = "Unable to read Version! Make sure the mainboard is connected!"
            Logging.get_logger().error(message)
            print(message)

    @staticmethod
    def start_health_check_loop():
        """Run health_check_loop() in a background daemon thread."""
        # daemon=True replaces the deprecated Thread.setDaemon(True).
        health_check_thread = Thread(target=Spi.health_check_loop, daemon=True)
        health_check_thread.start()

    @staticmethod
    def disable_health_check():
        """Clear SPI_HEALTH so that health_check_loop() stops."""
        global SPI_HEALTH
        SPI_HEALTH = False

    @staticmethod
    def health_check_loop():
        """Poll every 0.1 s and run health_check() after 0.5 s without a transfer.

        Returns once SPI_HEALTH is false.
        """
        while SPI_HEALTH:
            if last_spi_call + 0.5 < time.time():
                Spi.health_check()
            time.sleep(0.1)

    @staticmethod
    def get_version():
        """Return the revision registers formatted as ``"major.minor.patch"``."""
        major = Spi.read(Register.IDENTIFICATION_MODEL_REV_MAJOR, 1)
        minor = Spi.read(Register.IDENTIFICATION_MODEL_REV_MINOR, 1)
        patch = Spi.read(Register.IDENTIFICATION_MODEL_REV_PATCH, 1)
        return f"{major}.{minor}.{patch}"