started working on seeding gamestate and double elimination api

This commit is contained in:
Joel Klimont 2022-08-28 15:59:11 +02:00
parent d4c49a0a51
commit 72c5af8b56
2 changed files with 145 additions and 0 deletions

View file

@ -0,0 +1,143 @@
import json
import os
import time
from typing import Tuple, List, Dict
import requests as requests
import logging
logger = logging.getLogger("seeding-api")
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
ch.setFormatter(formatter)
logger.addHandler(ch)
RETRY_TIMEOUT = 0.05
# TODO: rethink how the api url is read
API_URL = os.getenv("API_URL", "http://localhost:5000/") + "api/"
api_override = os.getenv("API_FORCE", "")
if api_override != "":
print(f"API_URL was set to {API_URL} but was overwritten with {api_override}")
API_URL = api_override
API_URL_GET_ROBOT_STATE = API_URL + "getRobotState"
API_URL_GET_POS = API_URL + "getPos"
API_URL_GET_OP = API_URL + "getOp"
API_URL_GET_GOAL = API_URL + "getGoal"
API_URL_GET_ITEMS = API_URL + "getItems"
API_URL_GET_SCORES = API_URL + "getScores"
class Position:
"""Datastructure for holding a position
"""
def __init__(self, x, y, degrees):
self.x = x
self.y = y
self.degrees = degrees
def __repr__(self):
return "{x=%s, y=%s, degrees=%s}" % self.x, self.y, self.degrees
def __str__(self):
return f"Position(x={round(self.x, 5)}, y={round(self.y, 5)}, degrees={round(self.degrees, 5)})"
@staticmethod
def position_from_json(json_str: Dict):
return Position(json_str["x"], json_str["y"], json_str["degrees"])
class DoubleElim:
"""Class used for communicating with double elimination api
"""
@staticmethod
def get_position() -> Tuple[Position, int]:
"""Makes the /api/getPos call to the api.
:return: A Position object with robot position
:rtype: Tuple[Position, int]
"""
res = requests.get(API_URL_GET_POS)
if res.status_code == 408:
logger.error(f"DoubleElim.get_position timeout. API={API_URL_GET_POS}")
time.sleep(RETRY_TIMEOUT)
return DoubleElim.get_position()
response = json.loads(res.content)
logger.debug(f"DoubleElim.get_position = {response}, status code = {res.status_code}")
return Position(response["x"], response["y"], response["degrees"]), res.status_code
@staticmethod
def get_opponent() -> Tuple[Position, int]:
"""Makes the /api/getOp call to the api.
:return: A Position object with opponents robot position
:rtype: Tuple[Position, int]
"""
res = requests.get(API_URL_GET_OP)
if res.status_code == 408:
logger.error(f"DoubleElim.get_opponent timeout. API={API_URL_GET_OP}")
time.sleep(RETRY_TIMEOUT)
return DoubleElim.get_opponent()
response = json.loads(res.content)
logger.debug(f"DoubleElim.get_opponent = x:{response}, status code = {res.status_code}")
return Position(response["x"], response["y"], response["degrees"]), res.status_code
@staticmethod
def get_goal() -> Tuple[Position, int]:
"""Makes the /api/getGoal call to the api.
:return: A Position object with x and y coordinates of the goal, rotation is always -1
:rtype: Tuple[Position, int]
"""
res = requests.get(API_URL_GET_GOAL)
if res.status_code == 408:
logger.error(f"DoubleElim.get_goal timeout. API={API_URL_GET_GOAL}")
time.sleep(RETRY_TIMEOUT)
return DoubleElim.get_goal()
response = json.loads(res.content)
logger.debug(f"DoubleElim.get_goal = {response}, status code = {res.status_code}")
return Position(response["x"], response["y"], -1), res.status_code
@staticmethod
def get_items() -> Tuple[List[Dict], int]:
"""Makes the /api/getItems call to the api.
:return: A list will all items currently on the game field. Items are dictionaries that look like: {"id": 0, "x": 0, "y": 0}
:rtype: Tuple[List[Dict], int]
"""
res = requests.get(API_URL_GET_ITEMS)
if res.status_code == 408:
logger.error(f"DoubleElim.get_items timeout. API={API_URL_GET_ITEMS}")
time.sleep(RETRY_TIMEOUT)
return DoubleElim.get_items()
elif res.status_code == 503:
return [], 503
response = json.loads(res.content)
logger.debug(f"DoubleElim.get_items = {response}, status code = {res.status_code}")
return response, res.status_code
@staticmethod
def get_scores() -> Tuple[Dict, int]:
"""Makes the /api/getScores call to the api.
:return: A dictionary with all scores included like: {"self":2,"opponent":0}
:rtype: Tuple[Dict, int]
"""
res = requests.get(API_URL_GET_SCORES)
if res.status_code == 408:
logger.error(f"DoubleElim.get_scores timeout. API={API_URL_GET_SCORES}")
time.sleep(RETRY_TIMEOUT)
return DoubleElim.get_scores()
elif res.status_code == 503:
return {"self": 0, "opponent": 0}, 503
response = json.loads(res.content)
logger.debug(f"DoubleElim.get_scores = {response}, status code = {res.status_code}")
return response, res.status_code

View file

@ -0,0 +1,2 @@
# TODO: add gamestate etc