Added __eq__ and __ne__ methods to the Position class in de; fixed an error in the de function that would occur if the game had not started yet.
157 lines
5.6 KiB
Python
157 lines
5.6 KiB
Python
import json
|
|
import os
|
|
import time
|
|
from typing import Tuple, List, Dict
|
|
|
|
import requests as requests
|
|
import logging
|
|
|
|
# Record layout shared by everything this module logs.
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")

# Stream handler that itself accepts records from DEBUG upwards.
# NOTE: the logger's own level is not set here, so which records actually
# reach the handler still depends on the logging configuration in effect.
ch = logging.StreamHandler()
ch.setFormatter(formatter)
ch.setLevel(logging.DEBUG)

# Module-wide logger; the handler is attached once, at import time.
logger = logging.getLogger("seeding-api")
logger.addHandler(ch)
|
|
|
|
def _with_trailing_slash(url: str) -> str:
    """Return *url* unchanged if it already ends with "/", else with "/" appended.

    Endpoint names are glued onto the base URL by plain concatenation, so a base
    without a trailing slash would produce addresses such as ".../apigetPos".
    """
    return url if url.endswith("/") else url + "/"


# Seconds passed to time.sleep() before a request that answered 408 is retried.
RETRY_TIMEOUT = 0.05

# TODO: rethink how the api url is read
# API_URL (environment) is the server root; "api/" is appended to it.
API_URL = _with_trailing_slash(os.getenv("API_URL", "http://localhost:5000/")) + "api/"

# API_FORCE (environment), when non-empty, replaces the complete API base URL;
# nothing but a missing trailing slash is added to it.
api_override = os.getenv("API_FORCE", "")

if api_override != "":
    print(f"API_URL was set to {API_URL} but was overwritten with {api_override}")
    API_URL = _with_trailing_slash(api_override)

# Fully qualified endpoint URLs, one per API call.
API_URL_GET_ROBOT_STATE = API_URL + "getRobotState"

API_URL_GET_POS = API_URL + "getPos"
API_URL_GET_OP = API_URL + "getOp"
API_URL_GET_GOAL = API_URL + "getGoal"
API_URL_GET_ITEMS = API_URL + "getItems"
API_URL_GET_SCORES = API_URL + "getScores"
|
|
|
|
|
|
class Position:
    """Data structure for holding a position.

    Stores the three values it is given: ``x``, ``y`` and ``degrees``.
    Because ``__eq__`` is defined without ``__hash__``, instances are not
    hashable.
    """

    def __init__(self, x, y, degrees):
        """Store the coordinates and the rotation unchanged.

        :param x: x coordinate
        :param y: y coordinate
        :param degrees: rotation value
        """
        self.x = x
        self.y = y
        self.degrees = degrees

    def __repr__(self):
        """Return the raw, unrounded values in ``{x=..., y=..., degrees=...}`` form."""
        return "{x=%s, y=%s, degrees=%s}" % (self.x, self.y, self.degrees)

    def __str__(self):
        """Return a readable form with every value rounded to 5 decimal places."""
        return f"Position(x={round(self.x, 5)}, y={round(self.y, 5)}, degrees={round(self.degrees, 5)})"

    def __eq__(self, o: object) -> bool:
        """Compare all three fields with another Position.

        :return: ``NotImplemented`` for any other type, so that Python can try
            the reflected comparison on ``o``; ``pos == other`` still evaluates
            to ``False`` when the other side does not handle it either.
        """
        if not isinstance(o, Position):
            return NotImplemented
        return self.x == o.x and self.y == o.y and self.degrees == o.degrees

    def __ne__(self, o: object) -> bool:
        """Return the negation of ``__eq__``, passing ``NotImplemented`` through."""
        outcome = self.__eq__(o)
        if outcome is NotImplemented:
            return NotImplemented
        return not outcome

    @staticmethod
    def position_from_json(json_str: Dict) -> "Position":
        """Build a Position from an already decoded JSON object.

        :param json_str: mapping with the keys ``"x"``, ``"y"`` and ``"degrees"``
            (a dictionary, despite the parameter name)
        :return: the corresponding Position
        :raises KeyError: if one of the three keys is missing
        """
        return Position(json_str["x"], json_str["y"], json_str["degrees"])
|
|
|
|
|
|
class DoubleElim:
    """Class used for communicating with double elimination api

    Every public call returns a ``(value, status_code)`` tuple. A 408 answer is
    retried after ``RETRY_TIMEOUT`` seconds until a different status arrives;
    a 503 answer yields a fixed placeholder value together with 503
    (presumably the game has not started yet - confirm against the server).
    """

    @staticmethod
    def _request_json(url: str, label: str) -> Tuple[object, int]:
        """GET *url* and decode the JSON body, retrying for as long as the API answers 408.

        The retry is a loop rather than a recursive call, so a long series of
        timeouts cannot exhaust the interpreter's recursion limit.

        :param url: full endpoint URL
        :param label: method name used in the timeout log message
        :return: ``(decoded_body, status_code)``; ``(None, 503)`` when the API
            answers 503, in which case the body is not parsed
        """
        while True:
            res = requests.get(url)
            if res.status_code == 408:
                logger.error(f"DoubleElim.{label} timeout. API={url}")
                time.sleep(RETRY_TIMEOUT)
                continue
            if res.status_code == 503:
                return None, 503
            return json.loads(res.content), res.status_code

    @staticmethod
    def get_pos() -> Tuple[Position, int]:
        """Makes the /api/getPos call to the api.

        :return: A Position object with robot position and the HTTP status code;
            ``Position(0, 0, -1)`` with 503 when the API answers 503
        :rtype: Tuple[Position, int]
        """
        response, status = DoubleElim._request_json(API_URL_GET_POS, "get_position")
        if status == 503:
            return Position(0, 0, -1), 503

        logger.debug(f"DoubleElim.get_position = {response}, status code = {status}")
        return Position(response["x"], response["y"], response["degrees"]), status

    @staticmethod
    def get_opponent() -> Tuple[Position, int]:
        """Makes the /api/getOp call to the api.

        :return: A Position object with opponents robot position and the HTTP
            status code; ``Position(0, 0, -1)`` with 503 when the API answers 503
        :rtype: Tuple[Position, int]
        """
        response, status = DoubleElim._request_json(API_URL_GET_OP, "get_opponent")
        if status == 503:
            return Position(0, 0, -1), 503

        logger.debug(f"DoubleElim.get_opponent = x:{response}, status code = {status}")
        return Position(response["x"], response["y"], response["degrees"]), status

    @staticmethod
    def get_goal() -> Tuple[Position, int]:
        """Makes the /api/getGoal call to the api.

        :return: A Position object with x and y coordinates of the goal, rotation
            is always -1, and the HTTP status code; ``Position(0, 0, -1)`` with
            503 when the API answers 503
        :rtype: Tuple[Position, int]
        """
        response, status = DoubleElim._request_json(API_URL_GET_GOAL, "get_goal")
        if status == 503:
            return Position(0, 0, -1), 503

        logger.debug(f"DoubleElim.get_goal = {response}, status code = {status}")
        return Position(response["x"], response["y"], -1), status

    @staticmethod
    def get_items() -> Tuple[List[Dict], int]:
        """Makes the /api/getItems call to the api.

        :return: A list with all items currently on the game field and the HTTP
            status code. Items are dictionaries that look like:
            {"id": 0, "x": 0, "y": 0}. An empty list with 503 when the API
            answers 503
        :rtype: Tuple[List[Dict], int]
        """
        response, status = DoubleElim._request_json(API_URL_GET_ITEMS, "get_items")
        if status == 503:
            return [], 503

        logger.debug(f"DoubleElim.get_items = {response}, status code = {status}")
        return response, status

    @staticmethod
    def get_scores() -> Tuple[Dict, int]:
        """Makes the /api/getScores call to the api.

        :return: A dictionary with all scores included like:
            {"self":2,"opponent":0}, and the HTTP status code;
            ``{"self": 0, "opponent": 0}`` with 503 when the API answers 503
        :rtype: Tuple[Dict, int]
        """
        response, status = DoubleElim._request_json(API_URL_GET_SCORES, "get_scores")
        if status == 503:
            return {"self": 0, "opponent": 0}, 503

        logger.debug(f"DoubleElim.get_scores = {response}, status code = {status}")
        return response, status
|