changed api client according to specification

bug fixes in VisionDaemon
This commit is contained in:
Joel 2021-02-10 03:22:55 +01:00
parent ce6b06544a
commit 5e46e8068a
No known key found for this signature in database
GPG key ID: BDDDBECD0808290E
6 changed files with 143 additions and 63 deletions

View file

@ -34,7 +34,7 @@ fpm -s python --python-bin python3 --python-pip pip3 --python-package-name-prefi
-d "python3-pigpio" \ -d "python3-pigpio" \
-d "python3-numpy" \ -d "python3-numpy" \
-d "ffmpeg" \ -d "ffmpeg" \
-v 0.0.4-19 -t deb setup.py -v 0.0.4-20 -t deb setup.py
# --deb-changelog changelog \ # --deb-changelog changelog \
# --deb-upstream-changelog changelog \ # --deb-upstream-changelog changelog \

View file

@ -1,3 +1,5 @@
from typing import Dict, Tuple
import requests import requests
import json import json
import os import os
@ -19,37 +21,39 @@ class Seeding:
""" """
@staticmethod @staticmethod
def get_park() -> int: def get_park() -> Tuple[Dict, int]:
"""Get a parkingsapce from the api. """Get a parkingsapce from the api.
:return: An int between 0 and 3 :return: Json Object and status code as returned by the api.
:rtype: int :rtype: Tuple[Dict, int]
""" """
result = json.loads(requests.get(API_URL_GET_PARK).content)["id"] res = requests.get(API_URL_GET_PARK)
Logging.get_logger().debug(f"Seeding.get_park = {result}") result = json.loads(res.content)
return result Logging.get_logger().debug(f"Seeding.get_park = {result}, status code = {res.status_code}")
return result, res.status_code
@staticmethod @staticmethod
def pay_park() -> bool: def pay_park() -> int:
"""Pay for parking. """Pay for parking.
:return: True if successful, False if not successful :return: Status code as returned by the api.
:rtype: bool :rtype: int
""" """
result = requests.get(API_URL_PAY_PARK).status_code == 200 result = requests.get(API_URL_PAY_PARK).status_code
Logging.get_logger().debug(f"Seeding.pay_park = {result}") Logging.get_logger().debug(f"Seeding.pay_park = {result}")
return result return result
@staticmethod @staticmethod
def simon_says() -> int: def simon_says() -> Tuple[Dict, int]:
"""Get next simon says zone from the api. """Get next simon says zone from the api.
:return: An int between 0 and 3 or -1 after making this request 5 times. :return: Json Object and status code as returned by the api.
:rtype: int :rtype: Tuple[Dict, int]
""" """
result = json.loads(requests.get(API_URL_SIMON_SAYS).content)["id"] res = requests.get(API_URL_SIMON_SAYS)
Logging.get_logger().debug(f"Seeding.simon_says = {result}") result = json.loads(res.content)
return result Logging.get_logger().debug(f"Seeding.simon_says = {result}, status code = {res.status_code}")
return result, res.status_code
class Position: class Position:

View file

@ -8,11 +8,13 @@ from LogstashLogging import logstash_logger
from threading import Thread from threading import Thread
import logging import logging
RUN_CHECK = False
try: try:
from Battery import Battery from Battery import Battery
from Buzzer import Buzzer from Buzzer import Buzzer
RUN_CHECK = True
except Exception as e: except Exception as e:
logstash_logger.error("unable to import battery or buzzer in daemon") logstash_logger.error(f"unable to import battery or buzzer in daemon -> {str(e)}")
__run = """raspivid -t 0 -b 5000000 -w 1280 -h 720 -fps 30 -n -o - | gst-launch-1.0 fdsrc ! video/x-h264,width=1280,height=720,framerate=30/1,noise-reduction=1,profile=high,stream-format=byte-stream ! h264parse ! queue ! flvmux streamable=true ! rtmpsink location=\"rtmp://localhost/live/stream\"""" __run = """raspivid -t 0 -b 5000000 -w 1280 -h 720 -fps 30 -n -o - | gst-launch-1.0 fdsrc ! video/x-h264,width=1280,height=720,framerate=30/1,noise-reduction=1,profile=high,stream-format=byte-stream ! h264parse ! queue ! flvmux streamable=true ! rtmpsink location=\"rtmp://localhost/live/stream\""""
@ -43,12 +45,13 @@ if __name__ == '__main__':
logstash_logger.info("starting battery checker") logstash_logger.info("starting battery checker")
battery_checker = None battery_checker = None
try: if RUN_CHECK:
battery_checker = threading.Thread(target=check_battery) try:
battery_checker.start() battery_checker = threading.Thread(target=check_battery)
logstash_logger.info("starting battery checker - DONE") battery_checker.start()
except Exception as e: logstash_logger.info("starting battery checker - DONE")
logstash_logger.error(f"could not start battery checker -> {str(e)}") except Exception as e:
logstash_logger.error(f"could not start battery checker -> {str(e)}")
if STREAM_RASPI: if STREAM_RASPI:
logstash_logger.info("starting gstreamer background process") logstash_logger.info("starting gstreamer background process")
@ -59,5 +62,5 @@ if __name__ == '__main__':
if battery_checker is not None: if battery_checker is not None:
battery_checker.join() battery_checker.join()
else: else:
logstash_logger.info("battery checker failed to initialize.. sleeping for a day") logstash_logger.info("battery checker failed to initialize.. sleeping for a day, good night")
time.sleep(60*60*24) time.sleep(60*60*24)

View file

@ -1,4 +1,4 @@
__version__ = "0.0.4-19" __version__ = "0.0.4-20"
import compLib.LogstashLogging import compLib.LogstashLogging
import logging import logging

View file

@ -12,7 +12,7 @@ else:
setuptools.setup( setuptools.setup(
name="complib", name="complib",
version="0.0.4-19", version="0.0.4-20",
author="F-WuTs", author="F-WuTs",
author_email="--", author_email="--",
description="", description="",

145
test.py
View file

@ -20,68 +20,141 @@ from multiprocessing import Process
from compLib import LogstashLogging from compLib import LogstashLogging
from compLib import Api from compLib import Api
class MyTestCase(unittest.TestCase):
def test_get_park(self):
ret = Api.Seeding.get_park()
assert type(ret) is int
got = []
while len(got) != 4:
ret = Api.Seeding.get_park()
assert 0 <= ret < 4
if ret not in got:
got.append(ret)
assert True def change_api_state(park_id: int = 0, in_get_park: bool = False, was_in_park: bool = False,
simon_says_ids: list = [0, 0, 0, 0, 0],
in_simon_zone: bool = False):
data = {
"parkId": park_id,
"inGetPark": in_get_park,
"wasInPark": was_in_park,
"simonSaysIDs": simon_says_ids,
"inSimonZone": in_simon_zone
}
requests.post("http://localhost:5000/api/api/updateState", json=data)
class TestApiServer(unittest.TestCase):
def test_get_park(self):
ret, code = Api.Seeding.get_park()
id = ret["id"]
assert type(ret) is dict
assert type(id) is int
assert id == -1
assert type(code) is int
assert code == 403
change_api_state(in_get_park=True)
ret, code = Api.Seeding.get_park()
id = ret["id"]
assert type(ret) is dict
assert type(id) is int
assert 0 <= id < 4
assert type(code) is int
assert code == 200
def test_pay_park(self): def test_pay_park(self):
ret = Api.Seeding.pay_park() ret = Api.Seeding.pay_park()
assert type(ret) is bool assert type(ret) is int
assert ret == 402
change_api_state(was_in_park=True)
ret = Api.Seeding.pay_park()
assert type(ret) is int
assert ret == 402
change_api_state(was_in_park=True, in_get_park=True)
ret = Api.Seeding.pay_park()
assert type(ret) is int
assert ret == 201
def test_simon_says(self): def test_simon_says(self):
ret = Api.Seeding.simon_says() change_api_state(in_simon_zone=False)
assert type(ret) is int ret, code = Api.Seeding.simon_says()
got = [] id = ret["id"]
while len(got) != 4: assert type(ret) is dict
ret = Api.Seeding.simon_says() assert type(id) is int
if ret != -1: assert type(code) is int
assert 0 <= ret < 4 assert code == 403
if ret not in got: assert id == -1
got.append(ret)
else:
self.resetApi()
assert True got = []
while len(got) != 5:
change_api_state(in_simon_zone=True)
ret, code = Api.Seeding.simon_says()
id = ret["id"]
assert type(ret) is dict
assert type(id) is int
assert type(code) is int
assert code == 200
assert id <= 0 < 4
got.append(ret)
print(got)
change_api_state(in_simon_zone=True)
ret, code = Api.Seeding.simon_says()
id = ret["id"]
assert type(ret) is dict
assert type(id) is int
assert type(code) is int
assert code == 200
assert id == -1
change_api_state(in_simon_zone=False)
ret, code = Api.Seeding.simon_says()
id = ret["id"]
assert type(ret) is dict
assert type(id) is int
assert type(code) is int
assert code == 403
assert id == -1
def test_simon_says_iterations(self): def test_simon_says_iterations(self):
for i in range(0, 4): for i in range(0, 5):
ret = Api.Seeding.simon_says() change_api_state(in_simon_zone=True)
assert type(ret) is int ret, status = Api.Seeding.simon_says()
assert 0 <= ret < 4 assert type(ret) is dict
assert type(status) is int
assert type(ret["id"]) is int
assert 0 <= ret["id"] < 4
# after 4 iterations the api should only return -1 from now # after 4 iterations the api should only return -1 from now
for i in range(0, 10): for i in range(0, 10):
assert Api.Seeding.simon_says() == -1 assert Api.Seeding.simon_says()[0]["id"] == -1
# after api reset this test should work again # after api reset this test should work again
self.resetApi() self.resetApi()
for i in range(0, 4): for i in range(0, 4):
ret = Api.Seeding.simon_says() change_api_state(in_simon_zone=True)
assert type(ret) is int ret, status = Api.Seeding.simon_says()
assert 0 <= ret < 4 assert type(ret) is dict
assert type(status) is int
assert type(ret["id"]) is int
assert 0 <= ret["id"] < 4
for i in range(0, 10): for i in range(0, 10):
assert Api.Seeding.simon_says() == -1 assert Api.Seeding.simon_says()[0]["id"] == -1
def test_simon_says_non_reapet(self): def test_simon_says_non_repeat(self):
""" """
Note: this test will fail if testing against the local api as the
configuration is to return zeros if not otherwise specified.
Checks if simons says does not send the robot to the Checks if simons says does not send the robot to the
same position again. same position again.
""" """
last_pos = Api.Seeding.simon_says() last_pos, status = Api.Seeding.simon_says()
last_pos = last_pos["id"]
for i in range(0, 100): for i in range(0, 100):
next_pos = Api.Seeding.simon_says() change_api_state(in_simon_zone=True)
next_pos, status = Api.Seeding.simon_says()
next_pos = next_pos["id"]
if next_pos == -1: if next_pos == -1:
last_pos = -1 # state is reset, so reset here as well last_pos = -1 # state is reset, so reset here as well
self.resetApi() self.resetApi()