151 lines
5.4 KiB
Python
151 lines
5.4 KiB
Python
import json
|
|
import os
|
|
import time
|
|
from typing import Tuple, List, Dict
|
|
|
|
import requests
|
|
import logging
|
|
|
|
logger = logging.getLogger("complib-logger")
|
|
|
|
RETRY_TIMEOUT = 0.05
|
|
|
|
# TODO: rethink how the api url is read
|
|
API_URL = os.getenv("API_URL", "http://localhost:5000/") + "api/"
|
|
|
|
api_override = os.getenv("API_FORCE", "")
|
|
|
|
if api_override != "":
|
|
logger.warning(f"API_URL was set to {API_URL} but was overwritten with {api_override}")
|
|
API_URL = api_override
|
|
|
|
API_URL_GET_ROBOT_STATE = API_URL + "getRobotState"
|
|
|
|
API_URL_GET_POS = API_URL + "getPos"
|
|
API_URL_GET_OP = API_URL + "getOp"
|
|
API_URL_GET_GOAL = API_URL + "getGoal"
|
|
API_URL_GET_ITEMS = API_URL + "getItems"
|
|
API_URL_GET_SCORES = API_URL + "getScores"
|
|
|
|
|
|
class Position:
|
|
"""Datastructure for holding a position
|
|
"""
|
|
|
|
def __init__(self, x, y, degrees):
|
|
self.x = x
|
|
self.y = y
|
|
self.degrees = degrees
|
|
|
|
def __repr__(self):
|
|
return "{x=%s, y=%s, degrees=%s}" % (self.x, self.y, self.degrees)
|
|
|
|
def __str__(self):
|
|
return f"Position(x={round(self.x, 5)}, y={round(self.y, 5)}, degrees={round(self.degrees, 5)})"
|
|
|
|
def __eq__(self, o: object) -> bool:
|
|
if isinstance(o, Position):
|
|
return self.x == o.x and self.y == o.y and self.degrees == o.degrees
|
|
return False
|
|
|
|
def __ne__(self, o: object) -> bool:
|
|
return not self.__eq__(o)
|
|
|
|
@staticmethod
|
|
def position_from_json(json_str: Dict):
|
|
return Position(json_str["x"], json_str["y"], json_str["degrees"])
|
|
|
|
|
|
class DoubleElim:
|
|
"""Class used for communicating with double elimination api
|
|
"""
|
|
|
|
@staticmethod
|
|
def get_pos() -> Tuple[Position, int]:
|
|
"""Makes the /api/getPos call to the api.
|
|
:return: A Position object with robot position
|
|
:rtype: Tuple[Position, int]
|
|
"""
|
|
res = requests.get(API_URL_GET_POS)
|
|
if res.status_code == 408:
|
|
logger.error(f"DoubleElim.get_position timeout. API={API_URL_GET_POS}")
|
|
time.sleep(RETRY_TIMEOUT)
|
|
return DoubleElim.get_pos()
|
|
elif res.status_code == 503:
|
|
return Position(0, 0, -1), 503
|
|
|
|
response = json.loads(res.content)
|
|
logger.debug(f"DoubleElim.get_position = {response}, status code = {res.status_code}")
|
|
return Position(response["x"], response["y"], response["degrees"]), res.status_code
|
|
|
|
@staticmethod
|
|
def get_opponent() -> Tuple[Position, int]:
|
|
"""Makes the /api/getOp call to the api.
|
|
:return: A Position object with opponents robot position
|
|
:rtype: Tuple[Position, int]
|
|
"""
|
|
res = requests.get(API_URL_GET_OP)
|
|
if res.status_code == 408:
|
|
logger.error(f"DoubleElim.get_opponent timeout. API={API_URL_GET_OP}")
|
|
time.sleep(RETRY_TIMEOUT)
|
|
return DoubleElim.get_opponent()
|
|
elif res.status_code == 503:
|
|
return Position(0, 0, -1), 503
|
|
|
|
response = json.loads(res.content)
|
|
logger.debug(f"DoubleElim.get_opponent = x:{response}, status code = {res.status_code}")
|
|
return Position(response["x"], response["y"], response["degrees"]), res.status_code
|
|
|
|
@staticmethod
|
|
def get_goal() -> Tuple[Position, int]:
|
|
"""Makes the /api/getGoal call to the api.
|
|
:return: A Position object with x and y coordinates of the goal, rotation is always -1
|
|
:rtype: Tuple[Position, int]
|
|
"""
|
|
res = requests.get(API_URL_GET_GOAL)
|
|
if res.status_code == 408:
|
|
logger.error(f"DoubleElim.get_goal timeout. API={API_URL_GET_GOAL}")
|
|
time.sleep(RETRY_TIMEOUT)
|
|
return DoubleElim.get_goal()
|
|
elif res.status_code == 503:
|
|
return Position(0, 0, -1), 503
|
|
|
|
response = json.loads(res.content)
|
|
logger.debug(f"DoubleElim.get_goal = {response}, status code = {res.status_code}")
|
|
return Position(response["x"], response["y"], -1), res.status_code
|
|
|
|
@staticmethod
|
|
def get_items() -> Tuple[List[Dict], int]:
|
|
"""Makes the /api/getItems call to the api.
|
|
:return: A list will all items currently on the game field. Items are dictionaries that look like: {"id": 0, "x": 0, "y": 0}
|
|
:rtype: Tuple[List[Dict], int]
|
|
"""
|
|
res = requests.get(API_URL_GET_ITEMS)
|
|
if res.status_code == 408:
|
|
logger.error(f"DoubleElim.get_items timeout. API={API_URL_GET_ITEMS}")
|
|
time.sleep(RETRY_TIMEOUT)
|
|
return DoubleElim.get_items()
|
|
elif res.status_code == 503:
|
|
return [], 503
|
|
|
|
response = json.loads(res.content)
|
|
logger.debug(f"DoubleElim.get_items = {response}, status code = {res.status_code}")
|
|
return response, res.status_code
|
|
|
|
@staticmethod
|
|
def get_scores() -> Tuple[Dict, int]:
|
|
"""Makes the /api/getScores call to the api.
|
|
:return: A dictionary with all scores included like: {"self":2,"opponent":0}
|
|
:rtype: Tuple[Dict, int]
|
|
"""
|
|
res = requests.get(API_URL_GET_SCORES)
|
|
if res.status_code == 408:
|
|
logger.error(f"DoubleElim.get_scores timeout. API={API_URL_GET_SCORES}")
|
|
time.sleep(RETRY_TIMEOUT)
|
|
return DoubleElim.get_scores()
|
|
elif res.status_code == 503:
|
|
return {"self": 0, "opponent": 0}, 503
|
|
|
|
response = json.loads(res.content)
|
|
logger.debug(f"DoubleElim.get_scores = {response}, status code = {res.status_code}")
|
|
return response, res.status_code
|