import json
import logging
import os
import time
from typing import Any, Dict, List, Tuple

import requests as requests

logger = logging.getLogger("seeding-api")
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
ch.setFormatter(formatter)
logger.addHandler(ch)

# Seconds to sleep before repeating a request that was answered with HTTP 408.
RETRY_TIMEOUT = 0.05

# TODO: rethink how the api url is read
API_URL = os.getenv("API_URL", "http://localhost:5000/") + "api/"
# API_FORCE, when set, replaces API_URL verbatim: no "api/" suffix is appended
# to the override, so it must already be the complete base URL.
api_override = os.getenv("API_FORCE", "")
if api_override != "":
    print(f"API_URL was set to {API_URL} but was overwritten with {api_override}")
    API_URL = api_override

API_URL_GET_ROBOT_STATE = API_URL + "getRobotState"
API_URL_GET_POS = API_URL + "getPos"
API_URL_GET_OP = API_URL + "getOp"
API_URL_GET_GOAL = API_URL + "getGoal"
API_URL_GET_ITEMS = API_URL + "getItems"
API_URL_GET_SCORES = API_URL + "getScores"


class Position:
    """Datastructure for holding a position

    Stores the three values it is given as the attributes ``x``, ``y`` and
    ``degrees``. Instances compare equal when all three attributes are equal.
    """

    def __init__(self, x, y, degrees):
        self.x = x
        self.y = y
        self.degrees = degrees

    def __repr__(self):
        return "{x=%s, y=%s, degrees=%s}" % (self.x, self.y, self.degrees)

    def __str__(self):
        # Human-readable form: every component is rounded to 5 decimal places.
        return f"Position(x={round(self.x, 5)}, y={round(self.y, 5)}, degrees={round(self.degrees, 5)})"

    def __eq__(self, o: object) -> bool:
        if isinstance(o, Position):
            return self.x == o.x and self.y == o.y and self.degrees == o.degrees
        return False

    def __ne__(self, o: object) -> bool:
        return not self.__eq__(o)

    # Defining __eq__ without __hash__ already makes instances unhashable;
    # spelling it out documents that this is intended for a mutable value.
    __hash__ = None

    @staticmethod
    def position_from_json(json_str: Dict):
        """Builds a Position from an already decoded JSON object.

        :param json_str: Mapping with the keys "x", "y" and "degrees"
            (despite the name, this is a dictionary, not a string)
        :return: A Position holding the three values
        :raises KeyError: If one of the three keys is missing
        """
        return Position(json_str["x"], json_str["y"], json_str["degrees"])


def _fetch_json(url: str, label: str) -> Tuple[Any, int]:
    """Performs a GET request, retrying for as long as the API answers 408.

    The retry is a loop rather than a recursive call, so an API that keeps
    answering 408 can no longer exhaust the interpreter's recursion limit.

    :param url: Full URL of the endpoint to call
    :param label: Method name used in the "DoubleElim.<label> timeout" log line
    :return: (decoded JSON body, status code), or (None, 503) when the API
        answers 503; in that case the body is not decoded at all
    :rtype: Tuple[Any, int]
    :raises ValueError: If the body of any other response is not valid JSON
        (raised by json.loads)
    """
    while True:
        # NOTE(review): no timeout is passed to requests.get, so a connection
        # that never answers blocks here indefinitely. Adding one would make
        # this raise requests.exceptions.Timeout, which callers would have to
        # handle, so it is left as is.
        res = requests.get(url)
        if res.status_code != 408:
            break
        logger.error(f"DoubleElim.{label} timeout. API={url}")
        time.sleep(RETRY_TIMEOUT)

    if res.status_code == 503:
        return None, 503
    return json.loads(res.content), res.status_code


class DoubleElim:
    """Class used for communicating with double elimination api

    Every method returns a (value, status code) tuple. A 408 response is
    logged and the request repeated after RETRY_TIMEOUT seconds; a 503
    response yields a placeholder value together with the status code 503.
    """

    @staticmethod
    def get_pos() -> Tuple[Position, int]:
        """Makes the /api/getPos call to the api.

        :return: A Position object with robot position and the HTTP status
            code. On a 503 response the position is Position(0, 0, -1).
        :rtype: Tuple[Position, int]
        """
        response, status = _fetch_json(API_URL_GET_POS, "get_position")
        if status == 503:
            return Position(0, 0, -1), 503
        logger.debug(f"DoubleElim.get_position = {response}, status code = {status}")
        return Position.position_from_json(response), status

    @staticmethod
    def get_opponent() -> Tuple[Position, int]:
        """Makes the /api/getOp call to the api.

        :return: A Position object with opponents robot position and the HTTP
            status code. On a 503 response the position is Position(0, 0, -1).
        :rtype: Tuple[Position, int]
        """
        response, status = _fetch_json(API_URL_GET_OP, "get_opponent")
        if status == 503:
            return Position(0, 0, -1), 503
        logger.debug(f"DoubleElim.get_opponent = x:{response}, status code = {status}")
        return Position.position_from_json(response), status

    @staticmethod
    def get_goal() -> Tuple[Position, int]:
        """Makes the /api/getGoal call to the api.

        :return: A Position object with x and y coordinates of the goal,
            rotation is always -1, and the HTTP status code. On a 503
            response the position is Position(0, 0, -1).
        :rtype: Tuple[Position, int]
        """
        response, status = _fetch_json(API_URL_GET_GOAL, "get_goal")
        if status == 503:
            return Position(0, 0, -1), 503
        logger.debug(f"DoubleElim.get_goal = {response}, status code = {status}")
        # Only "x" and "y" are read from the response; degrees is fixed at -1.
        return Position(response["x"], response["y"], -1), status

    @staticmethod
    def get_items() -> Tuple[List[Dict], int]:
        """Makes the /api/getItems call to the api.

        :return: A list with all items currently on the game field and the
            HTTP status code. Items are dictionaries that look like:
            {"id": 0, "x": 0, "y": 0}. On a 503 response the list is empty.
        :rtype: Tuple[List[Dict], int]
        """
        response, status = _fetch_json(API_URL_GET_ITEMS, "get_items")
        if status == 503:
            return [], 503
        logger.debug(f"DoubleElim.get_items = {response}, status code = {status}")
        return response, status

    @staticmethod
    def get_scores() -> Tuple[Dict, int]:
        """Makes the /api/getScores call to the api.

        :return: A dictionary with all scores included like:
            {"self":2,"opponent":0} and the HTTP status code. On a 503
            response the dictionary is {"self": 0, "opponent": 0}.
        :rtype: Tuple[Dict, int]
        """
        response, status = _fetch_json(API_URL_GET_SCORES, "get_scores")
        if status == 503:
            return {"self": 0, "opponent": 0}, 503
        logger.debug(f"DoubleElim.get_scores = {response}, status code = {status}")
        return response, status