diff --git a/compLib/Display.py b/compLib/Display.py index d27b49e..3253f16 100644 --- a/compLib/Display.py +++ b/compLib/Display.py @@ -25,13 +25,13 @@ class Display(object): to_write[i] = ord(text[i]) if line == 1: - Spi.write(Register.DISPLAY_LINE_1_C0, CHARS_PER_LINE, to_write) + Spi.write_array(Register.DISPLAY_LINE_1_C0, CHARS_PER_LINE, to_write) elif line == 2: - Spi.write(Register.DISPLAY_LINE_2_C0, CHARS_PER_LINE, to_write) + Spi.write_array(Register.DISPLAY_LINE_2_C0, CHARS_PER_LINE, to_write) elif line == 3: - Spi.write(Register.DISPLAY_LINE_3_C0, CHARS_PER_LINE, to_write) + Spi.write_array(Register.DISPLAY_LINE_3_C0, CHARS_PER_LINE, to_write) elif line == 4: - Spi.write(Register.DISPLAY_LINE_4_C0, CHARS_PER_LINE, to_write) + Spi.write_array(Register.DISPLAY_LINE_4_C0, CHARS_PER_LINE, to_write) @staticmethod def clear(): diff --git a/compLib/Encoder.py b/compLib/Encoder.py index ed61c99..eddc77f 100644 --- a/compLib/Encoder.py +++ b/compLib/Encoder.py @@ -7,13 +7,15 @@ from compLib.Spi import Spi, Register MOTOR_COUNT = 4 +encoder_start_values = [0] * (MOTOR_COUNT + 1) + class Encoder(object): """Class used to read the encoders """ @staticmethod - def read(port: int) -> int: - """Read encoder from a specified port + def read_raw(port: int) -> int: + """Read raw encoder from a specified port. Will not be reset to 0 at start. :param port: Port, which the motor is connected to. 1-4 allowed :raises: IndexError @@ -30,8 +32,32 @@ class Encoder(object): return Spi.read(Register.MOTOR_3_POS_B3, 4) elif port == 4: return Spi.read(Register.MOTOR_4_POS_B3, 4) + + @staticmethod + def read(port: int) -> int: + """Read encoder from a specified port + + :param port: Port, which the motor is connected to. 1-4 allowed + :raises: IndexError + :return: Current encoder position + """ + if port <= 0 or port > MOTOR_COUNT: + raise IndexError("Invalid encoder port specified!") + + return Encoder.read_raw(port) - encoder_start_values[port] @staticmethod - def reset(port: int): - # TODO implement registers and me - pass + def clear(port: int): + """Reset encoder position to 0 + + :param port: Port, which the motor is connected to. 1-4 allowed + :raises: IndexError + """ + if port <= 0 or port > MOTOR_COUNT: + raise IndexError("Invalid encoder port specified!") + + encoder_start_values[port] = Encoder.read_raw(port) + + +for i in range(1, MOTOR_COUNT + 1): + encoder_start_values[i] = Encoder.read_raw(i) \ No newline at end of file diff --git a/compLib/IRSensor.py b/compLib/IRSensor.py index c12e746..01b3abe 100644 --- a/compLib/IRSensor.py +++ b/compLib/IRSensor.py @@ -22,8 +22,10 @@ class IRSensor(object): return Spi.read(Register.IR_3_H, 2) elif sensor == 4: return Spi.read(Register.IR_4_H, 2) - - return 0 + elif sensor == 5: + return Spi.read(Register.IR_5_H, 2) + else: + return 0 @staticmethod def set(sensor: int, on: bool): @@ -38,3 +40,5 @@ class IRSensor(object): Spi.write(Register.IR_3_LED, on) elif sensor == 4: Spi.write(Register.IR_4_LED, on) + elif sensor == 5: + Spi.write(Register.IR_5_LED, on) \ No newline at end of file diff --git a/compLib/Motor.py b/compLib/Motor.py index 45acc33..5eca21b 100644 --- a/compLib/Motor.py +++ b/compLib/Motor.py @@ -1,5 +1,5 @@ import atexit -from enum import Enum +from enum import IntEnum from compLib.LogstashLogging import Logging from compLib.Spi import Spi, Register @@ -9,7 +9,7 @@ MAX_MOTOR_SPEED = 65535 MOTOR_PERCENTAGE_MULT = MAX_MOTOR_SPEED / 100.0 -class MotorMode(Enum): +class MotorMode(IntEnum): COAST = 0, FORWARD = 1, BACKWARD = 2, @@ -34,16 +34,16 @@ class Motor(object): if port == 1: Spi.write(Register.MOTOR_1_PWM_H, 2, pwm) - Spi.write(Register.MOTOR_1_CTRL, 1, mode) + Spi.write(Register.MOTOR_1_CTRL, 1, int(mode)) elif port == 2: Spi.write(Register.MOTOR_2_PWM_H, 2, pwm) - Spi.write(Register.MOTOR_2_CTRL, 1, mode) + Spi.write(Register.MOTOR_2_CTRL, 1, int(mode)) elif port == 3: Spi.write(Register.MOTOR_3_PWM_H, 2, pwm) - Spi.write(Register.MOTOR_3_CTRL, 1, mode) + Spi.write(Register.MOTOR_3_CTRL, 1, int(mode)) elif port == 4: Spi.write(Register.MOTOR_4_PWM_H, 2, pwm) - Spi.write(Register.MOTOR_4_CTRL, 1, mode) + Spi.write(Register.MOTOR_4_CTRL, 1, int(mode)) @staticmethod def power(port: int, percent: float): diff --git a/compLib/Spi.py b/compLib/Spi.py index a458e81..0e53dd2 100644 --- a/compLib/Spi.py +++ b/compLib/Spi.py @@ -1,8 +1,9 @@ import spidev from threading import Thread, Lock -from enum import Enum +from enum import IntEnum +import time -from LogstashLogging import logstash_logger +from compLib.LogstashLogging import logstash_logger SPI_BUS = 1 SPI_DEVICE = 2 @@ -17,67 +18,7 @@ spi.bits_per_word = 8 spi_mutex = Lock() - -class Spi(object): - - @staticmethod - def transfer(tx_buffer: list): - write_reg = tx_buffer[1] - print(tx_buffer) - spi.xfer(tx_buffer) - - rx_buffer = spi.xfer([0] * 32) - - if rx_buffer[1] != write_reg: - logstash_logger.error(f"SPI error during write to register {tx_buffer[0]}!") - - return rx_buffer - - @staticmethod - def read(reg: int, length: int): - tx_buf = [0] * SPI_BUFFER_SIZE - - tx_buf[0] = 0 - tx_buf[1] = reg - tx_buf[2] = length - - rx_buf = Spi.transfer(tx_buf) - return int.from_bytes(rx_buf[2:2 + length], byteorder='big', signed=False) - - @staticmethod - def write(reg: int, length: int, value: int): - tx_buf = [0] * SPI_BUFFER_SIZE - - tx_buf[0] = 1 - tx_buf[1] = reg - tx_buf[2] = length - - pos = 3 - for i in value.to_bytes(length, 'big'): - tx_buf[pos] = i - pos += 1 - - rx_buf = Spi.transfer(tx_buf) - return int.from_bytes(rx_buf[2:2 + length], byteorder='big', signed=False) - - @staticmethod - def write_array(reg: int, length: int, values: list[int]): - tx_buf = [0] * SPI_BUFFER_SIZE - - tx_buf[0] = 1 - tx_buf[1] = reg - tx_buf[2] = length - - pos = 3 - for i in values: - tx_buf[pos] = i - pos += 1 - - rx_buf = Spi.transfer(tx_buf) - return rx_buf - - -class Register(Enum): +class Register(IntEnum): IDENTIFICATION_MODEL_ID = 1, IDENTIFICATION_MODEL_REV_MAJOR = 2, IDENTIFICATION_MODEL_REV_MINOR = 3, @@ -147,3 +88,87 @@ class Register(Enum): DISPLAY_LINE_2_C0 = 79, DISPLAY_LINE_3_C0 = 95, DISPLAY_LINE_4_C0 = 111 + + +class Spi(object): + + @staticmethod + def transfer(tx_buffer: list): + write_reg = tx_buffer[1] + + spi.xfer(tx_buffer) + rx_buffer = spi.xfer([0] * 32) + + if rx_buffer[1] != write_reg: + logstash_logger.error(f"SPI error during read/write of register {write_reg}!") + + return rx_buffer + + @staticmethod + def read(reg: int, length: int): + if not type(reg) is int: + reg = int(reg) + + tx_buf = [0] * SPI_BUFFER_SIZE + + tx_buf[0] = 0 + tx_buf[1] = reg + tx_buf[2] = length + + rx_buf = Spi.transfer(tx_buf) + return int.from_bytes(rx_buf[2:2 + length], byteorder='big', signed=False) + + @staticmethod + def write(reg: int, length: int, value: int): + if not type(reg) is int: + reg = int(reg) + + tx_buf = [0] * SPI_BUFFER_SIZE + + tx_buf[0] = 1 + tx_buf[1] = reg + tx_buf[2] = length + + pos = 3 + for i in value.to_bytes(length, 'big'): + tx_buf[pos] = i + pos += 1 + + rx_buf = Spi.transfer(tx_buf) + return int.from_bytes(rx_buf[2:2 + length], byteorder='big', signed=False) + + @staticmethod + def write_array(reg: int, length: int, values): + if not type(reg) is int: + reg = int(reg) + + tx_buf = [0] * SPI_BUFFER_SIZE + + tx_buf[0] = 1 + tx_buf[1] = reg + tx_buf[2] = length + + pos = 3 + for i in values: + tx_buf[pos] = i + pos += 1 + + rx_buf = Spi.transfer(tx_buf) + return rx_buf + + @staticmethod + def health_check(): + if Spi.read(Register.IDENTIFICATION_MODEL_ID, 1) != 3: + logstash_logger.error(f"Unable to read Version! Make sure the mainboard is connected!") + quit() + + @staticmethod + def health_check_loop(): + while True: + Spi.health_check() + time.sleep(0.5) + + +health_check_thread = Thread(target=Spi.health_check_loop) +health_check_thread.setDaemon(True) +health_check_thread.start() \ No newline at end of file diff --git a/compLib/__init__.py b/compLib/__init__.py index 649f355..bc17135 100644 --- a/compLib/__init__.py +++ b/compLib/__init__.py @@ -1,6 +1,7 @@ __version__ = "0.1.5-1" import compLib.LogstashLogging +import compLib.Spi import logging import apt @@ -14,3 +15,5 @@ try: except Exception as e: compLib.LogstashLogging.Logging.get_logger().error(f"error during checking apt package version -> {str(e)}") print(f"\033[91merror during checking apt package version -> {str(e)}\033[0m\n") + +compLib.Spi.Spi.health_check() \ No newline at end of file