diff --git a/compLib/ADC.py b/compLib/ADC.py deleted file mode 100644 index b4297ab..0000000 --- a/compLib/ADC.py +++ /dev/null @@ -1,38 +0,0 @@ -import smbus - -from compLib.LogstashLogging import Logging - -SINGLE_ENDED = 0x84 -ADDRESS = 0x48 - -bus = smbus.SMBus(1) - - -class ADC: - - @staticmethod - def read(channel) -> float: - """ - Read from adc channel - 0 -> Left IR Sensor - 1 -> Right IR Sensor - 2 -> Battery voltage / 3 - :param channel: Channel between 0 and 2 - :return: voltage - """ - - """Select the Command data from the given provided value above""" - command = SINGLE_ENDED | ((((channel << 2) | (channel >> 1)) & 0x07) << 4) - bus.write_byte(ADDRESS, command) - - while "Koka is great": - value1 = bus.read_byte(ADDRESS) - value2 = bus.read_byte(ADDRESS) - if value1 == value2: - break - - voltage = value1 / 255.0 * 3.3 # calculate the voltage value - voltage = round(voltage, 2) - - Logging.get_logger().debug(f"ADC.read {channel} = {voltage}") - return voltage diff --git a/compLib/Battery.py b/compLib/Battery.py deleted file mode 100644 index 38f43a8..0000000 --- a/compLib/Battery.py +++ /dev/null @@ -1,27 +0,0 @@ -from compLib.ADC import ADC -from compLib.LogstashLogging import Logging - -BATTERY_CHANNEL = 2 -BATTERY_COUNT = 2 -BATTERY_MULTIPLIER = 3 -BATTERY_MIN_VOLTAGE = 3.4 -BATTERY_MAX_VOLTAGE = 4.2 - -adc = ADC() - - -class Battery(object): - """Used to interact with the battery - """ - - @staticmethod - def percent() -> int: - """Get battery percentage - - :return: Percentage between 0 and 100 - :rtype: int - """ - voltage = adc.read(BATTERY_CHANNEL) * BATTERY_MULTIPLIER - result = int(((voltage / BATTERY_COUNT) - BATTERY_MIN_VOLTAGE) / (BATTERY_MAX_VOLTAGE - BATTERY_MIN_VOLTAGE) * 100) - Logging.get_logger().debug(f"Battery.percent = {result}") - return result diff --git a/compLib/Buzzer.py b/compLib/Buzzer.py deleted file mode 100644 index 4b1329d..0000000 --- a/compLib/Buzzer.py +++ /dev/null @@ -1,31 +0,0 @@ -import atexit -import RPi.GPIO as GPIO - -from compLib.LogstashLogging import Logging - -GPIO.setwarnings(False) -Buzzer_Pin = 17 -GPIO.setmode(GPIO.BCM) -GPIO.setup(Buzzer_Pin, GPIO.OUT) - - -class Buzzer: - """Used to interact with the buzzer - """ - - @staticmethod - def set(on: bool): - """Turn the buzzer on / off - - :param on: True if on, False if off - """ - Logging.get_logger().debug(f"Buzzer.set {on}") - - GPIO.output(Buzzer_Pin, on) - - @staticmethod - def exit(): - Buzzer.set(False) - - -atexit.register(Buzzer.exit) diff --git a/compLib/Motor.py b/compLib/Motor.py index 664acf9..f604936 100644 --- a/compLib/Motor.py +++ b/compLib/Motor.py @@ -1,11 +1,7 @@ import atexit -from compLib.PCA9685 import PCA9685 from compLib.LogstashLogging import Logging -pwm = PCA9685(0x40, debug=True) -pwm.setPWMFreq(50) - MOTOR_COUNT = 4 MAX_MOTOR_SPEED = 4095.0 MOTOR_PERCENTAGE_MULT = MAX_MOTOR_SPEED / 100.0 diff --git a/compLib/PCA9685.py b/compLib/PCA9685.py deleted file mode 100644 index 1389e55..0000000 --- a/compLib/PCA9685.py +++ /dev/null @@ -1,76 +0,0 @@ -#!/usr/bin/python - -import math -import time - -import smbus - - -# ============================================================================ -# Raspi PCA9685 16-Channel PWM Servo Driver -# ============================================================================ - -class PCA9685: - # Registers/etc. - __SUBADR1 = 0x02 - __SUBADR2 = 0x03 - __SUBADR3 = 0x04 - __MODE1 = 0x00 - __PRESCALE = 0xFE - __LED0_ON_L = 0x06 - __LED0_ON_H = 0x07 - __LED0_OFF_L = 0x08 - __LED0_OFF_H = 0x09 - __ALLLED_ON_L = 0xFA - __ALLLED_ON_H = 0xFB - __ALLLED_OFF_L = 0xFC - __ALLLED_OFF_H = 0xFD - - def __init__(self, address=0x40, debug=False): - self.bus = smbus.SMBus(1) - self.address = address - self.debug = debug - self.write(self.__MODE1, 0x00) - - def write(self, reg, value): - "Writes an 8-bit value to the specified register/address" - self.bus.write_byte_data(self.address, reg, value) - - def read(self, reg): - "Read an unsigned byte from the I2C device" - result = self.bus.read_byte_data(self.address, reg) - return result - - def setPWMFreq(self, freq): - "Sets the PWM frequency" - prescaleval = 25000000.0 # 25MHz - prescaleval /= 4096.0 # 12-bit - prescaleval /= float(freq) - prescaleval -= 1.0 - - oldmode = self.read(self.__MODE1) - newmode = (oldmode & 0x7F) | 0x10 # sleep - self.write(self.__MODE1, newmode) # go to sleep - self.write(self.__PRESCALE, int(round(prescaleval))) - self.write(self.__MODE1, oldmode) - time.sleep(0.005) - self.write(self.__MODE1, oldmode | 0x80) - - def setPWM(self, channel, on, off): - "Sets a single PWM channel" - self.write(self.__LED0_ON_L + 4 * channel, on & 0xFF) - self.write(self.__LED0_ON_H + 4 * channel, on >> 8) - self.write(self.__LED0_OFF_L + 4 * channel, off & 0xFF) - self.write(self.__LED0_OFF_H + 4 * channel, off >> 8) - - def setMotorPwm(self, channel, duty): - self.setPWM(channel, 0, duty) - - def setServoPulse(self, channel, pulse): - "Sets the Servo Pulse,The PWM frequency must be 50HZ" - pulse = float(pulse) * (4096.0 / 20000.0) # PWM frequency is 50HZ,the period is 20000us - self.setPWM(channel, 0, int(pulse)) - - -if __name__ == '__main__': - pass diff --git a/compLib/Spi.py b/compLib/Spi.py new file mode 100644 index 0000000..227d198 --- /dev/null +++ b/compLib/Spi.py @@ -0,0 +1,62 @@ +import spidev +from threading import Thread, Lock +from enum import Enum + +# from LogstashLogging import logstash_logger + +SPI_BUS = 1 +SPI_DEVICE = 2 +SPI_SPEED = 1000000 +SPI_BUFFER_SIZE = 32 + +spi = spidev.SpiDev() +spi.open(SPI_BUS, SPI_DEVICE) +spi.max_speed_hz = SPI_SPEED +spi.mode = 0 +spi.bits_per_word = 8 + +spi_mutex = Lock() + + +class Spi(object): + + @staticmethod + def transfer(tx_buffer: list): + write_reg = tx_buffer[1] + print(tx_buffer) + spi.xfer(tx_buffer) + + rx_buffer = spi.xfer([0] * 32) + + if rx_buffer[1] != write_reg: + # logstash_logger.error(f"SPI error during write to register {tx_buffer[0]}!") + print(f"SPI error during write to register {write_reg}!") + + return rx_buffer + + @staticmethod + def read(reg: int, length: int): + tx_buf = [0] * SPI_BUFFER_SIZE + + tx_buf[0] = 0 + tx_buf[1] = reg + tx_buf[2] = length + + rx_buf = Spi.transfer(tx_buf) + return int.from_bytes(rx_buf[2:2 + length], byteorder='big', signed=False) + + @staticmethod + def write(reg: int, length: int, value: int): + tx_buf = [0] * SPI_BUFFER_SIZE + + tx_buf[0] = 1 + tx_buf[1] = reg + tx_buf[2] = length + + pos = 3 + for i in value.to_bytes(length, 'big'): + tx_buf[pos] = i + pos += 1 + + rx_buf = Spi.transfer(tx_buf) + return int.from_bytes(rx_buf[2:2 + length], byteorder='big', signed=False) \ No newline at end of file diff --git a/compLib/Ultrasonic.py b/compLib/Ultrasonic.py deleted file mode 100644 index 2ecbf60..0000000 --- a/compLib/Ultrasonic.py +++ /dev/null @@ -1,107 +0,0 @@ -import time -import RPi.GPIO as GPIO - -trigger_pin = 27 -echo_pin = 22 -speed_of_sound_mps = 340 # in m/s - -rising_found = False -rising_time = 0 -falling_found = False -falling_time = 0 - -class Ultrasonic: - - @staticmethod - def __send_trigger(): - GPIO.setup(trigger_pin, GPIO.OUT) - GPIO.output(trigger_pin, False) # Ensure a clean transition - time.sleep(0.000005) - GPIO.output(trigger_pin, True) # Trigger now - time.sleep(0.000010) - GPIO.output(trigger_pin, False) - - @staticmethod - def __signal_start_callback(channel): - global rising_found - global rising_time - rising_found = True - rising_time = time.process_time_ns() - - @staticmethod - def __signal_end_callback(channel): - global falling_found - global falling_time - falling_found = True - falling_time = time.process_time_ns() - - @staticmethod - def __measure_pulse(timeout = 1000000) -> int: - """ Measure the length of a pulse in us. When either the pulse doesn't start in [timeout] us - or the pulse is longer than [timeout] us, 0 is returned. - - :param timeout: Timeout in us - :return: The length of the pulse or 0 on timeout - :rtype: int - """ - - global rising_found - global rising_time - global falling_found - global falling_time - - # Setup pin - GPIO.setup(echo_pin, GPIO.IN) - - # Wait for the rising edge - rising_found = False - GPIO.add_event_detect(echo_pin, GPIO.RISING, callback = Ultrasonic.__signal_start_callback) - start = time.process_time_ns() - while (time.process_time_ns() < start + timeout * 1000 and not rising_found): - pass - - GPIO.remove_event_detect(echo_pin) - if (not rising_found): - return 0 - - # Wait for the falling edge - falling_found = False - GPIO.add_event_detect(echo_pin, GPIO.FALLING, callback = Ultrasonic.__signal_end_callback) - start = time.process_time_ns() - while (time.process_time_ns() < start + timeout * 1000 and not falling_found): - pass - - GPIO.remove_event_detect(echo_pin) - if (not falling_found): - return 0 - - # Finally, return the time difference - return int((falling_time - rising_time) / 1000) - - @staticmethod - def get_distance() -> float: - """ Get the distance from the Ultrasonic sensor in cm. - Returns 'NaN' when the result is invalid. - - :return: The distance in cm or 'NaN' - :rtype: float - """ - GPIO.setmode(GPIO.BCM) - - # Trigger the measurement - Ultrasonic.__send_trigger() - - # The pulse length in us - pulse = Ultrasonic.__measure_pulse() - - if (pulse == 0): - print("invalid") - return float('NaN') - - # Convert the speed of sound from m/s to cm/us for convenience - speed_of_sound_cmpus = speed_of_sound_mps / 10000 - - # Convert us time to cm distance - distance = (pulse / 2) * speed_of_sound_cmpus - - return distance