This commit is contained in:
Konstantin Lampalzer 2022-10-07 16:35:37 +02:00
parent c16a7172e7
commit d4a7d8c0c0
186 changed files with 1 additions and 7056 deletions

312
client/compLib/.gitignore vendored Normal file
View file

@ -0,0 +1,312 @@
# Created by https://www.toptal.com/developers/gitignore/api/macos,python,pycharm
# Edit at https://www.toptal.com/developers/gitignore?templates=macos,python,pycharm
### macOS ###
# General
.DS_Store
.AppleDouble
.LSOverride
# Icon must end with two \r
Icon
# Thumbnails
._*
# Files that might appear in the root of a volume
.DocumentRevisions-V100
.fseventsd
.Spotlight-V100
.TemporaryItems
.Trashes
.VolumeIcon.icns
.com.apple.timemachine.donotpresent
# Directories potentially created on remote AFP share
.AppleDB
.AppleDesktop
Network Trash Folder
Temporary Items
.apdisk
### macOS Patch ###
# iCloud generated files
*.icloud
### PyCharm ###
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
# User-specific stuff
.idea/**/workspace.xml
.idea/**/tasks.xml
.idea/**/usage.statistics.xml
.idea/**/dictionaries
.idea/**/shelf
# AWS User-specific
.idea/**/aws.xml
# Generated files
.idea/**/contentModel.xml
# Sensitive or high-churn files
.idea/**/dataSources/
.idea/**/dataSources.ids
.idea/**/dataSources.local.xml
.idea/**/sqlDataSources.xml
.idea/**/dynamic.xml
.idea/**/uiDesigner.xml
.idea/**/dbnavigator.xml
# Gradle
.idea/**/gradle.xml
.idea/**/libraries
# Gradle and Maven with auto-import
# When using Gradle or Maven with auto-import, you should exclude module files,
# since they will be recreated, and may cause churn. Uncomment if using
# auto-import.
# .idea/artifacts
# .idea/compiler.xml
# .idea/jarRepositories.xml
# .idea/modules.xml
# .idea/*.iml
# .idea/modules
# *.iml
# *.ipr
# CMake
cmake-build-*/
# Mongo Explorer plugin
.idea/**/mongoSettings.xml
# File-based project format
*.iws
# IntelliJ
out/
# mpeltonen/sbt-idea plugin
.idea_modules/
# JIRA plugin
atlassian-ide-plugin.xml
# Cursive Clojure plugin
.idea/replstate.xml
# SonarLint plugin
.idea/sonarlint/
# Crashlytics plugin (for Android Studio and IntelliJ)
com_crashlytics_export_strings.xml
crashlytics.properties
crashlytics-build.properties
fabric.properties
# Editor-based Rest Client
.idea/httpRequests
# Android studio 3.1+ serialized cache file
.idea/caches/build_file_checksums.ser
### PyCharm Patch ###
# Comment Reason: https://github.com/joeblau/gitignore.io/issues/186#issuecomment-215987721
# *.iml
# modules.xml
# .idea/misc.xml
# *.ipr
# Sonarlint plugin
# https://plugins.jetbrains.com/plugin/7973-sonarlint
.idea/**/sonarlint/
# SonarQube Plugin
# https://plugins.jetbrains.com/plugin/7238-sonarqube-community-plugin
.idea/**/sonarIssues.xml
# Markdown Navigator plugin
# https://plugins.jetbrains.com/plugin/7896-markdown-navigator-enhanced
.idea/**/markdown-navigator.xml
.idea/**/markdown-navigator-enh.xml
.idea/**/markdown-navigator/
# Cache file creation bug
# See https://youtrack.jetbrains.com/issue/JBR-2257
.idea/$CACHE_FILE$
# CodeStream plugin
# https://plugins.jetbrains.com/plugin/12206-codestream
.idea/codestream.xml
# Azure Toolkit for IntelliJ plugin
# https://plugins.jetbrains.com/plugin/8053-azure-toolkit-for-intellij
.idea/**/azureSettings.xml
### Python ###
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# End of https://www.toptal.com/developers/gitignore/api/macos,python,pycharm

70
client/compLib/Api.py Normal file
View file

@ -0,0 +1,70 @@
import json
import logging
import os
from typing import Dict, Tuple, List
import requests
logger = logging.getLogger("seeding-api")
API_URL = os.getenv("API_URL", "http://localhost:5000/") + "api/"
CONF_URL = os.getenv("API_URL", "http://localhost:5000/") + "config/"
api_override = os.getenv("API_FORCE", "")
if api_override != "":
print(f"API_URL was set to {API_URL} but was overwritten with {api_override}")
API_URL = api_override
API_URL_GET_HEU = API_URL + "getHeuballen"
API_URL_GET_LOGISTIC_PLAN = API_URL + "getLogisticPlan"
API_URL_GET_MATERIAL_DELIVERIES = API_URL + "getMaterialDeliveries"
API_URL_GET_ROBOT_STATE = API_URL + "getRobotState"
class Seeding:
"""Class used for communicating with seeding api
"""
@staticmethod
def get_heuballen() -> int:
"""Makes the /api/getHeuballen call to the api.
:return: hueballencode as int.
:rtype: int
"""
res = requests.get(API_URL_GET_HEU)
result = json.loads(res.content)
logger.debug(f"Seeding.get_heuballen = {result}, status code = {res.status_code}")
return result["heuballen"]
@staticmethod
def get_logistic_plan() -> List:
"""Makes the /api/getLogisticPlan call to the api.
:return: Json Object and status code as returned by the api.
:rtype: List
"""
res = requests.get(API_URL_GET_LOGISTIC_PLAN)
result = json.loads(res.content)
logger.debug(f"Seeding.get_logistic_plan = {result}, status code = {res.status_code}")
return result
@staticmethod
def get_material_deliveries() -> List:
"""Makes the /api/getMaterialDeliveries call to the api.
:return: Json Object and status code as returned by the api.
:rtype: List
"""
res = requests.get(API_URL_GET_MATERIAL_DELIVERIES)
result = json.loads(res.content)
logger.debug(f"Seeding.get_material_deliveries = {result}, status code = {res.status_code}")
return result
@staticmethod
def get_robot_state() -> Tuple[Dict, int]:
res = requests.get(API_URL_GET_ROBOT_STATE)
result = json.loads(res.content)
logger.debug(f"Seeding.get_robot_state {result}, status code = {res.status_code}")
return result, res.status_code

View file

@ -0,0 +1,113 @@
syntax = "proto3";
package CompLib;
message Header {
string message_type = 1;
}
message Status {
bool successful = 1;
string error_message = 2;
}
message GenericRequest {
Header header = 1;
}
message GenericResponse {
Header header = 1;
Status status = 2;
}
message EncoderReadPositionsRequest {
Header header = 1;
}
message EncoderReadPositionsResponse {
Header header = 1;
Status status = 2;
repeated int32 positions = 3 [packed = true];
}
message EncoderReadVelocitiesRequest {
Header header = 1;
}
message EncoderReadVelocitiesResponse {
Header header = 1;
Status status = 2;
repeated double velocities = 3 [packed = true];
}
message IRSensorsEnableRequest {
Header header = 1;
}
message IRSensorsDisableRequest {
Header header = 1;
}
message IRSensorsReadAllRequest {
Header header = 1;
}
message IRSensorsReadAllResponse {
Header header = 1;
Status status = 2;
repeated uint32 data = 3 [packed = true];
}
message MotorSetPowerRequest {
uint32 port = 1;
double power = 2;
}
message MotorsSetPowerRequest {
Header header = 1;
repeated MotorSetPowerRequest requests = 2;
}
message MotorSetSpeedRequest {
uint32 port = 1;
double speed = 2;
}
message MotorsSetSpeedRequest {
Header header = 1;
repeated MotorSetSpeedRequest requests = 2;
}
message OdometryReadRequest {
Header header = 1;
}
message OdometryReadResponse {
Header header = 1;
Status status = 2;
double x_position = 3;
double y_position = 4;
double orientation = 5;
}
message DriveDistanceRequest {
Header header = 1;
double distance_m = 2;
double velocity_m_s = 3;
}
message TurnDegreesRequest {
Header header = 1;
double angle_degrees = 2;
double velocity_rad_s = 3;
}
message DriveRequest {
Header header = 1;
double linear_velocity_m_s = 2;
double angular_velocity_rad_s = 3;
}
message HealthUpdateRequest {
Header header = 1;
}

View file

@ -0,0 +1,80 @@
import socket
from threading import Lock
import compLib.CompLib_pb2 as CompLib_pb2
class CompLibClient(object):
USE_UNIX_SOCKET = False
UNIX_SOCKET_PATH = "/tmp/compLib"
USE_TCP_SOCKET = False
TCP_SOCKET_HOST = "10.20.5.1"
TCP_SOCKET_PORT = 9090
SOCKET = None
LOCK = Lock()
@staticmethod
def use_unix_socket(socket_path="/tmp/compLib"):
CompLibClient.UNIX_SOCKET_PATH = socket_path
CompLibClient.USE_UNIX_SOCKET = True
CompLibClient.USE_TCP_SOCKET = False
CompLibClient.SOCKET = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
CompLibClient.SOCKET.connect(CompLibClient.UNIX_SOCKET_PATH)
from compLib.HealthCheck import HealthUpdater
HealthUpdater.start()
@staticmethod
def use_tcp_socket(socket_host, socket_port=TCP_SOCKET_PORT):
CompLibClient.TCP_SOCKET_HOST = socket_host
CompLibClient.TCP_SOCKET_PORT = socket_port
CompLibClient.USE_UNIX_SOCKET = False
CompLibClient.USE_TCP_SOCKET = True
CompLibClient.SOCKET = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
CompLibClient.SOCKET.connect((CompLibClient.TCP_SOCKET_HOST, CompLibClient.TCP_SOCKET_PORT))
from compLib.HealthCheck import HealthUpdater
HealthUpdater.start()
@staticmethod
def send(data: bytes, size: int) -> bytes:
with CompLibClient.LOCK:
# if CompLibClient.USE_TCP_SOCKET:
# sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# sock.connect((CompLibClient.TCP_SOCKET_HOST, CompLibClient.TCP_SOCKET_PORT))
#
# elif CompLibClient.USE_UNIX_SOCKET:
# sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
# sock.connect(CompLibClient.UNIX_SOCKET_PATH)
#
# else:
# return bytes(0)
CompLibClient.SOCKET.sendall(size.to_bytes(1, byteorder='big'))
CompLibClient.SOCKET.sendall(data)
response_size_bytes = CompLibClient.SOCKET.recv(1)
response_size = int.from_bytes(response_size_bytes, byteorder="big")
# print(response_size)
response_bytes = CompLibClient.SOCKET.recv(response_size)
# print(response_bytes.hex())
# print(len(response_bytes))
CompLibClient.check_response(response_bytes)
return response_bytes
@staticmethod
def check_response(response_bytes: bytes) -> bool:
# print(f"{response_bytes}")
res = CompLib_pb2.GenericResponse()
res.ParseFromString(response_bytes)
if res.status.successful:
return True
# TODO: Log error message if unsuccessful
return False

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,157 @@
import json
import os
import time
from typing import Tuple, List, Dict
import requests as requests
import logging
logger = logging.getLogger("seeding-api")
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
ch.setFormatter(formatter)
logger.addHandler(ch)
RETRY_TIMEOUT = 0.05
# TODO: rethink how the api url is read
API_URL = os.getenv("API_URL", "http://localhost:5000/") + "api/"
api_override = os.getenv("API_FORCE", "")
if api_override != "":
print(f"API_URL was set to {API_URL} but was overwritten with {api_override}")
API_URL = api_override
API_URL_GET_ROBOT_STATE = API_URL + "getRobotState"
API_URL_GET_POS = API_URL + "getPos"
API_URL_GET_OP = API_URL + "getOp"
API_URL_GET_GOAL = API_URL + "getGoal"
API_URL_GET_ITEMS = API_URL + "getItems"
API_URL_GET_SCORES = API_URL + "getScores"
class Position:
"""Datastructure for holding a position
"""
def __init__(self, x, y, degrees):
self.x = x
self.y = y
self.degrees = degrees
def __repr__(self):
return "{x=%s, y=%s, degrees=%s}" % (self.x, self.y, self.degrees)
def __str__(self):
return f"Position(x={round(self.x, 5)}, y={round(self.y, 5)}, degrees={round(self.degrees, 5)})"
def __eq__(self, o: object) -> bool:
if isinstance(o, Position):
return self.x == o.x and self.y == o.y and self.degrees == o.degrees
return False
def __ne__(self, o: object) -> bool:
return not self.__eq__(o)
@staticmethod
def position_from_json(json_str: Dict):
return Position(json_str["x"], json_str["y"], json_str["degrees"])
class DoubleElim:
"""Class used for communicating with double elimination api
"""
@staticmethod
def get_pos() -> Tuple[Position, int]:
"""Makes the /api/getPos call to the api.
:return: A Position object with robot position
:rtype: Tuple[Position, int]
"""
res = requests.get(API_URL_GET_POS)
if res.status_code == 408:
logger.error(f"DoubleElim.get_position timeout. API={API_URL_GET_POS}")
time.sleep(RETRY_TIMEOUT)
return DoubleElim.get_pos()
elif res.status_code == 503:
return Position(0, 0, -1), 503
response = json.loads(res.content)
logger.debug(f"DoubleElim.get_position = {response}, status code = {res.status_code}")
return Position(response["x"], response["y"], response["degrees"]), res.status_code
@staticmethod
def get_opponent() -> Tuple[Position, int]:
"""Makes the /api/getOp call to the api.
:return: A Position object with opponents robot position
:rtype: Tuple[Position, int]
"""
res = requests.get(API_URL_GET_OP)
if res.status_code == 408:
logger.error(f"DoubleElim.get_opponent timeout. API={API_URL_GET_OP}")
time.sleep(RETRY_TIMEOUT)
return DoubleElim.get_opponent()
elif res.status_code == 503:
return Position(0, 0, -1), 503
response = json.loads(res.content)
logger.debug(f"DoubleElim.get_opponent = x:{response}, status code = {res.status_code}")
return Position(response["x"], response["y"], response["degrees"]), res.status_code
@staticmethod
def get_goal() -> Tuple[Position, int]:
"""Makes the /api/getGoal call to the api.
:return: A Position object with x and y coordinates of the goal, rotation is always -1
:rtype: Tuple[Position, int]
"""
res = requests.get(API_URL_GET_GOAL)
if res.status_code == 408:
logger.error(f"DoubleElim.get_goal timeout. API={API_URL_GET_GOAL}")
time.sleep(RETRY_TIMEOUT)
return DoubleElim.get_goal()
elif res.status_code == 503:
return Position(0, 0, -1), 503
response = json.loads(res.content)
logger.debug(f"DoubleElim.get_goal = {response}, status code = {res.status_code}")
return Position(response["x"], response["y"], -1), res.status_code
@staticmethod
def get_items() -> Tuple[List[Dict], int]:
"""Makes the /api/getItems call to the api.
:return: A list will all items currently on the game field. Items are dictionaries that look like: {"id": 0, "x": 0, "y": 0}
:rtype: Tuple[List[Dict], int]
"""
res = requests.get(API_URL_GET_ITEMS)
if res.status_code == 408:
logger.error(f"DoubleElim.get_items timeout. API={API_URL_GET_ITEMS}")
time.sleep(RETRY_TIMEOUT)
return DoubleElim.get_items()
elif res.status_code == 503:
return [], 503
response = json.loads(res.content)
logger.debug(f"DoubleElim.get_items = {response}, status code = {res.status_code}")
return response, res.status_code
@staticmethod
def get_scores() -> Tuple[Dict, int]:
"""Makes the /api/getScores call to the api.
:return: A dictionary with all scores included like: {"self":2,"opponent":0}
:rtype: Tuple[Dict, int]
"""
res = requests.get(API_URL_GET_SCORES)
if res.status_code == 408:
logger.error(f"DoubleElim.get_scores timeout. API={API_URL_GET_SCORES}")
time.sleep(RETRY_TIMEOUT)
return DoubleElim.get_scores()
elif res.status_code == 503:
return {"self": 0, "opponent": 0}, 503
response = json.loads(res.content)
logger.debug(f"DoubleElim.get_scores = {response}, status code = {res.status_code}")
return response, res.status_code

35
client/compLib/Encoder.py Normal file
View file

@ -0,0 +1,35 @@
import compLib.CompLib_pb2 as CompLib_pb2
from compLib.CompLibClient import CompLibClient
class Encoder(object):
"""Class used to read the encoders
"""
@staticmethod
def read_all_positions():
"""Read all encoder positions.
:return: Tuple of all current encoder positions
"""
request = CompLib_pb2.EncoderReadPositionsRequest()
request.header.message_type = request.DESCRIPTOR.full_name
response = CompLib_pb2.EncoderReadPositionsResponse()
response.ParseFromString(CompLibClient.send(request.SerializeToString(), request.ByteSize()))
return tuple(i for i in response.positions)
@staticmethod
def read_all_velocities():
"""Read the velocity of all motors connected.
:return: Tuple of all current motor velocities
"""
request = CompLib_pb2.EncoderReadVelocitiesRequest()
request.header.message_type = request.DESCRIPTOR.full_name
response = CompLib_pb2.EncoderReadVelocitiesResponse()
response.ParseFromString(CompLibClient.send(request.SerializeToString(), request.ByteSize()))
return tuple(i for i in response.velocities)

View file

@ -0,0 +1,23 @@
import threading
import time
import compLib.CompLib_pb2 as CompLib_pb2
from compLib.CompLibClient import CompLibClient
class HealthUpdater(object):
started = False
@staticmethod
def start():
if not HealthUpdater.started:
threading.Thread(target=HealthUpdater.loop, daemon=True).start()
HealthUpdater.started = True
@staticmethod
def loop():
while True:
request = CompLib_pb2.HealthUpdateRequest()
request.header.message_type = request.DESCRIPTOR.full_name
CompLibClient.send(request.SerializeToString(), request.ByteSize())
time.sleep(0.25)

View file

@ -0,0 +1,40 @@
import compLib.CompLib_pb2 as CompLib_pb2
from compLib.CompLibClient import CompLibClient
class IRSensor(object):
"""Access the different IR Sensors of the robot
"""
@staticmethod
def read_all():
"""Read all IR sensors at once.
:return: Array of all current ir sensors
"""
request = CompLib_pb2.IRSensorsReadAllRequest()
request.header.message_type = request.DESCRIPTOR.full_name
response = CompLib_pb2.IRSensorsReadAllResponse()
response.ParseFromString(CompLibClient.send(request.SerializeToString(), request.ByteSize()))
return [i for i in response.data]
@staticmethod
def enable():
"""Turn on all IR emitters
"""
request = CompLib_pb2.IRSensorsEnableRequest()
request.header.message_type = request.DESCRIPTOR.full_name
CompLibClient.send(request.SerializeToString(), request.ByteSize())
@staticmethod
def disable():
"""Turn off all IR emitters
"""
request = CompLib_pb2.IRSensorsDisableRequest()
request.header.message_type = request.DESCRIPTOR.full_name
CompLibClient.send(request.SerializeToString(), request.ByteSize())

117
client/compLib/Motor.py Normal file
View file

@ -0,0 +1,117 @@
import compLib.CompLib_pb2 as CompLib_pb2
from compLib.CompLibClient import CompLibClient
MOTOR_COUNT = 4
class Motor(object):
"""Class used to control the motors
"""
@staticmethod
def power(port: int, percent: float):
"""Set specified motor to percentage power
:param port: Port, which the motor is connected to. 0-3
:param percent: Percentage of max speed. between -100 and 100
:raises: IndexError
"""
if port < 0 or port >= MOTOR_COUNT:
raise IndexError("Invalid Motor port specified!")
if percent < -100 or percent > 100:
raise IndexError("Invalid Motor speed specified! Speed is between -100 and 100 percent!")
request = CompLib_pb2.MotorsSetPowerRequest()
request.header.message_type = request.DESCRIPTOR.full_name
request.port = port
request.power = percent
CompLibClient.send(request.SerializeToString(), request.ByteSize())
@staticmethod
def multiple_power(*arguments: tuple[int, float]):
"""Set specified motors to percentage power
:param arguments: tuples of port, percentage
:raises: IndexError
"""
request = CompLib_pb2.MotorsSetPowerRequest()
request.header.message_type = request.DESCRIPTOR.full_name
for port, percent in arguments:
if port < 0 or port >= MOTOR_COUNT:
raise IndexError("Invalid Motor port specified!")
if percent < -100 or percent > 100:
raise IndexError("Invalid Motor speed specified! Speed is between -100 and 100 percent!")
inner_request = CompLib_pb2.MotorSetPowerRequest()
inner_request.port = port
inner_request.power = percent
request.requests.append(inner_request)
CompLibClient.send(request.SerializeToString(), request.ByteSize())
@staticmethod
def speed(port: int, speed: float):
"""Set specified motor to percentage power
:param port: Port, which the motor is connected to. 0-3
:param speed: Speed at which a motor should turn in RPM
:raises: IndexError
"""
if port < 0 or port >= MOTOR_COUNT:
raise IndexError("Invalid Motor port specified!")
request = CompLib_pb2.MotorsSetSpeedRequest()
request.header.message_type = request.DESCRIPTOR.full_name
request.port = port
request.speed = speed
CompLibClient.send(request.SerializeToString(), request.ByteSize())
@staticmethod
def multiple_speed(*arguments: tuple[int, float]):
"""Set specified motor to percentage power
:param arguments: tuples of port, speed in rpm
:raises: IndexError
"""
request = CompLib_pb2.MotorsSetSpeedRequest()
request.header.message_type = request.DESCRIPTOR.full_name
for port, speed in arguments:
if port < 0 or port >= MOTOR_COUNT:
raise IndexError("Invalid Motor port specified!")
inner_request = CompLib_pb2.MotorSetSpeedRequest()
inner_request.port = port
inner_request.speed = speed
request.requests.append(inner_request)
CompLibClient.send(request.SerializeToString(), request.ByteSize())
# @staticmethod
# def all_off():
# """
# Turns of all motors
# """
# Logging.get_logger().debug(f"Motor.all_off")
#
# for i in range(1, MOTOR_COUNT + 1):
# Motor.active_break(i)
# @staticmethod
# def active_break(port: int):
# """
# Actively break with a specific motor
#
# :param port: Port, which the motor is connected to. 1-4
# """
# Motor.pwm(port, 0, MotorMode.BREAK)

View file

@ -0,0 +1,57 @@
import compLib.CompLib_pb2 as CompLib_pb2
from compLib.CompLibClient import CompLibClient
class Movement(object):
"""High level class to control movement of the robot
"""
@staticmethod
def drive_distance(distance: float, speed: float):
"""
Drive a given distance with a certain speed.
Positive distance and speed with result in forward motion. Everything else will move backwards.
:param distance: Distance in meters
:param speed: Speed in meters per second
:return: None
"""
request = CompLib_pb2.DriveDistanceRequest()
request.header.message_type = request.DESCRIPTOR.full_name
request.distance_m = distance
request.velocity_m_s = speed
response = CompLib_pb2.GenericResponse()
response.ParseFromString(CompLibClient.send(request.SerializeToString(), request.ByteSize()))
@staticmethod
def turn_degrees(degrees: float, speed: float):
"""
Turn specified degrees with a given speed.
Positive degrees and speed with result in counter-clockwise motion. Everything else will be clockwise
:param degrees: Degrees between -180 and 180
:param speed: Speed in radians per second
:return: None
"""
request = CompLib_pb2.TurnDegreesRequest()
request.header.message_type = request.DESCRIPTOR.full_name
request.angle_degrees = degrees
request.velocity_rad_s = speed
response = CompLib_pb2.GenericResponse()
response.ParseFromString(CompLibClient.send(request.SerializeToString(), request.ByteSize()))
@staticmethod
def drive(linear: float, angular: float):
"""
Non-blocking way to perform a linear and angular motion at the same time.
:param linear: Linear speed in meters per second
:param angular: Angular speed in radians per second
:return: None
"""
request = CompLib_pb2.DriveDistanceRequest()
request.header.message_type = request.DESCRIPTOR.full_name
request.linear_velocity_m_s = linear
request.angular_velocity_rad_s = angular
response = CompLib_pb2.GenericResponse()
response.ParseFromString(CompLibClient.send(request.SerializeToString(), request.ByteSize()))

89
client/compLib/Seeding.py Normal file
View file

@ -0,0 +1,89 @@
import numpy as np
# TODO: if set to competition mode, get the seed from the api
def set_random_seed(seed: int):
np.random.seed(seed)
def get_random_number(min: int, max: int):
return np.random.randint(256 ** 4, dtype='<u4', size=1)[0] % (max - min + 1) + min
class Gamestate:
def __str__(self) -> str:
return f"""Seed: {self.seed}
Heu Color: {self.heu_color}
Material Pairs: {self.material_pairs}
Material Zones: {self.materials}
Logistic Plan: {self.logistic_plan}
Logistic Centers: {self.logistic_center}"""
def __init__(self, seed: int):
self.seed = seed
set_random_seed(seed)
self.heu_color = get_random_number(1, 2)
self.materials = [0, 0, 0, 0]
self.material_pairs = []
for i in range(0, 4):
num1 = get_random_number(0, 3)
self.material_pairs.append([num1, num1])
while self.material_pairs[i][1] == num1:
self.material_pairs[i][1] = get_random_number(0, 3)
flat = [item for sublist in self.material_pairs for item in sublist]
for i in range(0, 4):
self.materials[i] = flat.count(i)
self.logistic_plan = [0 for i in range(0, 21)]
self.logistic_center = [[0, 0, 0, 0] for i in range(0, 4)]
visited = [5, 5, 5, 5]
def __logistic_plan_generator(i: int):
drive_to = get_random_number(0, 3)
for j in range(0, 4):
drive_to = (drive_to + j) % 4
if visited[drive_to] <= 0 or drive_to == self.logistic_plan[i - 1]:
continue
self.logistic_plan[i] = drive_to
visited[drive_to] -= 1
finished = True
for k in visited:
if k != 0:
finished = False
if finished and drive_to == 2:
visited[drive_to] += 1
continue
if finished:
return True
if i < len(self.logistic_plan):
if __logistic_plan_generator(i + 1):
return True
visited[drive_to] += 1
return False
self.logistic_plan[0] = 2
visited[2] -= 1
_ = __logistic_plan_generator(1)
self.logistic_plan[-1] = 2
for i in range(0, len(self.logistic_plan) - 1):
self.logistic_center[self.logistic_plan[i]][self.logistic_plan[i + 1]] += 1
self.logistic_plan = [x + 10 for x in self.logistic_plan]
def get_heuballen(self) -> int:
return self.heu_color
def get_logistic_plan(self) -> []:
return self.logistic_plan
def get_material_deliveries(self) -> [[]]:
return self.material_pairs

View file