Add spi
This commit is contained in:
parent
4bf031a6c7
commit
530ddd8be7
7 changed files with 62 additions and 283 deletions
|
@ -1,38 +0,0 @@
|
|||
import smbus
|
||||
|
||||
from compLib.LogstashLogging import Logging
|
||||
|
||||
SINGLE_ENDED = 0x84
|
||||
ADDRESS = 0x48
|
||||
|
||||
bus = smbus.SMBus(1)
|
||||
|
||||
|
||||
class ADC:
|
||||
|
||||
@staticmethod
|
||||
def read(channel) -> float:
|
||||
"""
|
||||
Read from adc channel
|
||||
0 -> Left IR Sensor
|
||||
1 -> Right IR Sensor
|
||||
2 -> Battery voltage / 3
|
||||
:param channel: Channel between 0 and 2
|
||||
:return: voltage
|
||||
"""
|
||||
|
||||
"""Select the Command data from the given provided value above"""
|
||||
command = SINGLE_ENDED | ((((channel << 2) | (channel >> 1)) & 0x07) << 4)
|
||||
bus.write_byte(ADDRESS, command)
|
||||
|
||||
while "Koka is great":
|
||||
value1 = bus.read_byte(ADDRESS)
|
||||
value2 = bus.read_byte(ADDRESS)
|
||||
if value1 == value2:
|
||||
break
|
||||
|
||||
voltage = value1 / 255.0 * 3.3 # calculate the voltage value
|
||||
voltage = round(voltage, 2)
|
||||
|
||||
Logging.get_logger().debug(f"ADC.read {channel} = {voltage}")
|
||||
return voltage
|
|
@ -1,27 +0,0 @@
|
|||
from compLib.ADC import ADC
|
||||
from compLib.LogstashLogging import Logging
|
||||
|
||||
BATTERY_CHANNEL = 2
|
||||
BATTERY_COUNT = 2
|
||||
BATTERY_MULTIPLIER = 3
|
||||
BATTERY_MIN_VOLTAGE = 3.4
|
||||
BATTERY_MAX_VOLTAGE = 4.2
|
||||
|
||||
adc = ADC()
|
||||
|
||||
|
||||
class Battery(object):
|
||||
"""Used to interact with the battery
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def percent() -> int:
|
||||
"""Get battery percentage
|
||||
|
||||
:return: Percentage between 0 and 100
|
||||
:rtype: int
|
||||
"""
|
||||
voltage = adc.read(BATTERY_CHANNEL) * BATTERY_MULTIPLIER
|
||||
result = int(((voltage / BATTERY_COUNT) - BATTERY_MIN_VOLTAGE) / (BATTERY_MAX_VOLTAGE - BATTERY_MIN_VOLTAGE) * 100)
|
||||
Logging.get_logger().debug(f"Battery.percent = {result}")
|
||||
return result
|
|
@ -1,31 +0,0 @@
|
|||
import atexit
|
||||
import RPi.GPIO as GPIO
|
||||
|
||||
from compLib.LogstashLogging import Logging
|
||||
|
||||
GPIO.setwarnings(False)
|
||||
Buzzer_Pin = 17
|
||||
GPIO.setmode(GPIO.BCM)
|
||||
GPIO.setup(Buzzer_Pin, GPIO.OUT)
|
||||
|
||||
|
||||
class Buzzer:
|
||||
"""Used to interact with the buzzer
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def set(on: bool):
|
||||
"""Turn the buzzer on / off
|
||||
|
||||
:param on: True if on, False if off
|
||||
"""
|
||||
Logging.get_logger().debug(f"Buzzer.set {on}")
|
||||
|
||||
GPIO.output(Buzzer_Pin, on)
|
||||
|
||||
@staticmethod
|
||||
def exit():
|
||||
Buzzer.set(False)
|
||||
|
||||
|
||||
atexit.register(Buzzer.exit)
|
|
@ -1,11 +1,7 @@
|
|||
import atexit
|
||||
|
||||
from compLib.PCA9685 import PCA9685
|
||||
from compLib.LogstashLogging import Logging
|
||||
|
||||
pwm = PCA9685(0x40, debug=True)
|
||||
pwm.setPWMFreq(50)
|
||||
|
||||
MOTOR_COUNT = 4
|
||||
MAX_MOTOR_SPEED = 4095.0
|
||||
MOTOR_PERCENTAGE_MULT = MAX_MOTOR_SPEED / 100.0
|
||||
|
|
|
@ -1,76 +0,0 @@
|
|||
#!/usr/bin/python
|
||||
|
||||
import math
|
||||
import time
|
||||
|
||||
import smbus
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Raspi PCA9685 16-Channel PWM Servo Driver
|
||||
# ============================================================================
|
||||
|
||||
class PCA9685:
|
||||
# Registers/etc.
|
||||
__SUBADR1 = 0x02
|
||||
__SUBADR2 = 0x03
|
||||
__SUBADR3 = 0x04
|
||||
__MODE1 = 0x00
|
||||
__PRESCALE = 0xFE
|
||||
__LED0_ON_L = 0x06
|
||||
__LED0_ON_H = 0x07
|
||||
__LED0_OFF_L = 0x08
|
||||
__LED0_OFF_H = 0x09
|
||||
__ALLLED_ON_L = 0xFA
|
||||
__ALLLED_ON_H = 0xFB
|
||||
__ALLLED_OFF_L = 0xFC
|
||||
__ALLLED_OFF_H = 0xFD
|
||||
|
||||
def __init__(self, address=0x40, debug=False):
|
||||
self.bus = smbus.SMBus(1)
|
||||
self.address = address
|
||||
self.debug = debug
|
||||
self.write(self.__MODE1, 0x00)
|
||||
|
||||
def write(self, reg, value):
|
||||
"Writes an 8-bit value to the specified register/address"
|
||||
self.bus.write_byte_data(self.address, reg, value)
|
||||
|
||||
def read(self, reg):
|
||||
"Read an unsigned byte from the I2C device"
|
||||
result = self.bus.read_byte_data(self.address, reg)
|
||||
return result
|
||||
|
||||
def setPWMFreq(self, freq):
|
||||
"Sets the PWM frequency"
|
||||
prescaleval = 25000000.0 # 25MHz
|
||||
prescaleval /= 4096.0 # 12-bit
|
||||
prescaleval /= float(freq)
|
||||
prescaleval -= 1.0
|
||||
|
||||
oldmode = self.read(self.__MODE1)
|
||||
newmode = (oldmode & 0x7F) | 0x10 # sleep
|
||||
self.write(self.__MODE1, newmode) # go to sleep
|
||||
self.write(self.__PRESCALE, int(round(prescaleval)))
|
||||
self.write(self.__MODE1, oldmode)
|
||||
time.sleep(0.005)
|
||||
self.write(self.__MODE1, oldmode | 0x80)
|
||||
|
||||
def setPWM(self, channel, on, off):
|
||||
"Sets a single PWM channel"
|
||||
self.write(self.__LED0_ON_L + 4 * channel, on & 0xFF)
|
||||
self.write(self.__LED0_ON_H + 4 * channel, on >> 8)
|
||||
self.write(self.__LED0_OFF_L + 4 * channel, off & 0xFF)
|
||||
self.write(self.__LED0_OFF_H + 4 * channel, off >> 8)
|
||||
|
||||
def setMotorPwm(self, channel, duty):
|
||||
self.setPWM(channel, 0, duty)
|
||||
|
||||
def setServoPulse(self, channel, pulse):
|
||||
"Sets the Servo Pulse,The PWM frequency must be 50HZ"
|
||||
pulse = float(pulse) * (4096.0 / 20000.0) # PWM frequency is 50HZ,the period is 20000us
|
||||
self.setPWM(channel, 0, int(pulse))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
pass
|
62
compLib/Spi.py
Normal file
62
compLib/Spi.py
Normal file
|
@ -0,0 +1,62 @@
|
|||
import spidev
|
||||
from threading import Thread, Lock
|
||||
from enum import Enum
|
||||
|
||||
# from LogstashLogging import logstash_logger
|
||||
|
||||
SPI_BUS = 1
|
||||
SPI_DEVICE = 2
|
||||
SPI_SPEED = 1000000
|
||||
SPI_BUFFER_SIZE = 32
|
||||
|
||||
spi = spidev.SpiDev()
|
||||
spi.open(SPI_BUS, SPI_DEVICE)
|
||||
spi.max_speed_hz = SPI_SPEED
|
||||
spi.mode = 0
|
||||
spi.bits_per_word = 8
|
||||
|
||||
spi_mutex = Lock()
|
||||
|
||||
|
||||
class Spi(object):
|
||||
|
||||
@staticmethod
|
||||
def transfer(tx_buffer: list):
|
||||
write_reg = tx_buffer[1]
|
||||
print(tx_buffer)
|
||||
spi.xfer(tx_buffer)
|
||||
|
||||
rx_buffer = spi.xfer([0] * 32)
|
||||
|
||||
if rx_buffer[1] != write_reg:
|
||||
# logstash_logger.error(f"SPI error during write to register {tx_buffer[0]}!")
|
||||
print(f"SPI error during write to register {write_reg}!")
|
||||
|
||||
return rx_buffer
|
||||
|
||||
@staticmethod
|
||||
def read(reg: int, length: int):
|
||||
tx_buf = [0] * SPI_BUFFER_SIZE
|
||||
|
||||
tx_buf[0] = 0
|
||||
tx_buf[1] = reg
|
||||
tx_buf[2] = length
|
||||
|
||||
rx_buf = Spi.transfer(tx_buf)
|
||||
return int.from_bytes(rx_buf[2:2 + length], byteorder='big', signed=False)
|
||||
|
||||
@staticmethod
|
||||
def write(reg: int, length: int, value: int):
|
||||
tx_buf = [0] * SPI_BUFFER_SIZE
|
||||
|
||||
tx_buf[0] = 1
|
||||
tx_buf[1] = reg
|
||||
tx_buf[2] = length
|
||||
|
||||
pos = 3
|
||||
for i in value.to_bytes(length, 'big'):
|
||||
tx_buf[pos] = i
|
||||
pos += 1
|
||||
|
||||
rx_buf = Spi.transfer(tx_buf)
|
||||
return int.from_bytes(rx_buf[2:2 + length], byteorder='big', signed=False)
|
|
@ -1,107 +0,0 @@
|
|||
import time
|
||||
import RPi.GPIO as GPIO
|
||||
|
||||
trigger_pin = 27
|
||||
echo_pin = 22
|
||||
speed_of_sound_mps = 340 # in m/s
|
||||
|
||||
rising_found = False
|
||||
rising_time = 0
|
||||
falling_found = False
|
||||
falling_time = 0
|
||||
|
||||
class Ultrasonic:
|
||||
|
||||
@staticmethod
|
||||
def __send_trigger():
|
||||
GPIO.setup(trigger_pin, GPIO.OUT)
|
||||
GPIO.output(trigger_pin, False) # Ensure a clean transition
|
||||
time.sleep(0.000005)
|
||||
GPIO.output(trigger_pin, True) # Trigger now
|
||||
time.sleep(0.000010)
|
||||
GPIO.output(trigger_pin, False)
|
||||
|
||||
@staticmethod
|
||||
def __signal_start_callback(channel):
|
||||
global rising_found
|
||||
global rising_time
|
||||
rising_found = True
|
||||
rising_time = time.process_time_ns()
|
||||
|
||||
@staticmethod
|
||||
def __signal_end_callback(channel):
|
||||
global falling_found
|
||||
global falling_time
|
||||
falling_found = True
|
||||
falling_time = time.process_time_ns()
|
||||
|
||||
@staticmethod
|
||||
def __measure_pulse(timeout = 1000000) -> int:
|
||||
""" Measure the length of a pulse in us. When either the pulse doesn't start in [timeout] us
|
||||
or the pulse is longer than [timeout] us, 0 is returned.
|
||||
|
||||
:param timeout: Timeout in us
|
||||
:return: The length of the pulse or 0 on timeout
|
||||
:rtype: int
|
||||
"""
|
||||
|
||||
global rising_found
|
||||
global rising_time
|
||||
global falling_found
|
||||
global falling_time
|
||||
|
||||
# Setup pin
|
||||
GPIO.setup(echo_pin, GPIO.IN)
|
||||
|
||||
# Wait for the rising edge
|
||||
rising_found = False
|
||||
GPIO.add_event_detect(echo_pin, GPIO.RISING, callback = Ultrasonic.__signal_start_callback)
|
||||
start = time.process_time_ns()
|
||||
while (time.process_time_ns() < start + timeout * 1000 and not rising_found):
|
||||
pass
|
||||
|
||||
GPIO.remove_event_detect(echo_pin)
|
||||
if (not rising_found):
|
||||
return 0
|
||||
|
||||
# Wait for the falling edge
|
||||
falling_found = False
|
||||
GPIO.add_event_detect(echo_pin, GPIO.FALLING, callback = Ultrasonic.__signal_end_callback)
|
||||
start = time.process_time_ns()
|
||||
while (time.process_time_ns() < start + timeout * 1000 and not falling_found):
|
||||
pass
|
||||
|
||||
GPIO.remove_event_detect(echo_pin)
|
||||
if (not falling_found):
|
||||
return 0
|
||||
|
||||
# Finally, return the time difference
|
||||
return int((falling_time - rising_time) / 1000)
|
||||
|
||||
@staticmethod
|
||||
def get_distance() -> float:
|
||||
""" Get the distance from the Ultrasonic sensor in cm.
|
||||
Returns 'NaN' when the result is invalid.
|
||||
|
||||
:return: The distance in cm or 'NaN'
|
||||
:rtype: float
|
||||
"""
|
||||
GPIO.setmode(GPIO.BCM)
|
||||
|
||||
# Trigger the measurement
|
||||
Ultrasonic.__send_trigger()
|
||||
|
||||
# The pulse length in us
|
||||
pulse = Ultrasonic.__measure_pulse()
|
||||
|
||||
if (pulse == 0):
|
||||
print("invalid")
|
||||
return float('NaN')
|
||||
|
||||
# Convert the speed of sound from m/s to cm/us for convenience
|
||||
speed_of_sound_cmpus = speed_of_sound_mps / 10000
|
||||
|
||||
# Convert us time to cm distance
|
||||
distance = (pulse / 2) * speed_of_sound_cmpus
|
||||
|
||||
return distance
|
Reference in a new issue