import sys from typing import Any, Tuple, List # build image is somehow different from raspberry image? opencv-python is installed to a directory which is not in the pythonpath by default.... sys.path.append("/usr/lib/python3.9/site-packages") import logging import os import queue import threading import cv2 from flask import Flask, Response logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO) SERVE_VIDEO = os.getenv("SERVER_SRC", "/live") BUILDING_DOCS = os.getenv("BUILDING_DOCS", "false") HTML = """ Opencv Output

Opencv Output

""" # it would be better to use jinja2 here, but I don't want to blow up the package dependencies... HTML = HTML.replace("{{ VIDEO_DST }}", SERVE_VIDEO) class Marker: def __init__(self, id: int, x: float, y: float): self.id: int = id self.x: float = x self.y: float = y def __str__(self) -> str: return f"Marker ID: {self.id}, position: {self.x} x, {self.y} y" def __repr__(self) -> str: return str({"id": self.id, "x": self.x, "y": self.y}) class Camera: class __Webserver: def __init__(self, camera): self.app = Flask(__name__) self.__camera = camera self.__thread = threading.Thread(target=self.__start_flask, daemon=True) self.__thread.start() @self.app.route("/live") def __video_feed(): """ Define route for serving jpeg stream. :return: Return the response generated along with the specific media. """ return Response(self.__camera._newest_frame_generator(), mimetype="multipart/x-mixed-replace; boundary=frame") @self.app.route("/") def __index(): """ Define route for serving a static http site to view the stream. :return: Static html page where the video stream of Opencv can be viewed. """ return HTML def __start_flask(self): """ Function for running flask server in a thread. :return: """ logging.getLogger("complib-logger").info("starting flask server") self.app.run(host="0.0.0.0", port=9898, debug=True, threaded=True, use_reloader=False) class __NoBufferVideoCapture: def __init__(self, cam): self.cap = cv2.VideoCapture(cam) self.cap.set(3, 640) self.cap.set(4, 480) self.q = queue.Queue(maxsize=3) self.stopped = False self.t = threading.Thread(target=self._reader, daemon=True) self.t.start() def _reader(self): while not self.stopped: ret, frame = self.cap.read() if not ret: continue if self.q.full(): try: self.q.get_nowait() except queue.Empty: pass self.q.put(frame) def read(self): return self.q.get() def stop(self): self.stopped = True self.t.join() def __init__(self): self.__logger = logging.getLogger("complib-logger") self.__logger.info("capturing rtmp stream is disabled in this version") self.__camera_stream = self.__NoBufferVideoCapture(-1) self.__newest_frame = None self.__lock = threading.Lock() self.__webserver = self.__Webserver(self) self.aruco_dict = cv2.aruco.Dictionary_get(cv2.aruco.DICT_6X6_50) self.aruco_params = cv2.aruco.DetectorParameters_create() self.__logger.info("Initialized vision") def get_frame(self) -> Any: """ Die Funktion gibt das neuste Bild, welches die Kamera aufgenommen, hat zurück. :return: Ein "opencv image frame" """ img16 = self.__camera_stream.read() return img16 def detect_markers(self, image) -> Any: """ Funktion um die ArUco Marker in einem Bild zu erkennen. :param image: Bild, welches die Kamera aufgenommen hat. :return: Gibt drei Variablen zurueck. Erstens eine Liste an Postionen der "Ecken" der erkannten Markern. Zweitens eine Liste an IDs der erkannten Markern und dritten noch Debug Informationen (diese können ignoriert werden). """ return cv2.aruco.detectMarkers(image, self.aruco_dict, parameters=self.aruco_params) def detect_markers_midpoint(self, image) -> Tuple[List[Marker], Any]: """ Funktion um die ArUco Marker in einem Bild zu erkennen, einzuzeichnen und den Mittelpunkt der Marker auszurechnen. :param image: Bild, welches die Kamera aufgenommen hat. :return: Gibt zwei Variablen zurueck. Erstens eine Liste an "Markern" und zweitens das Bild mit den eigezeichneten Marken. :rtype: Tuple[List[Marker], Any] """ (corners, ids, rejected) = self.detect_markers(image) self.draw_markers(image, corners, ids) res = [] for i in range(0, len(corners)): x = sum([point[0] for point in corners[i][0]]) / 4 y = sum([point[1] for point in corners[i][0]]) / 4 res.append(Marker(ids[i][0], x, y)) return res, image def draw_markers(self, image, corners, ids) -> Any: """ Zeichnet die erkannten Markern mit ihren IDs in das Bild. :param image: Original Bild, in dem die Marker erkannt wurden. :param corners: List der Positionen der Ecken der erkannten Marker. :param ids: IDs der erkannten Markern. :return: Neues Bild mit den eigezeichneten Markern. """ return cv2.aruco.drawDetectedMarkers(image, corners, ids) def publish_frame(self, image): """ Sendet das Bild, welches der Funktion übergeben wird, an den Webserver, damit es der Nutzer in seinem Browser ansehen kann. :param image: Opencv Bild, welches dem Nutzer angezeigt werden soll. :return: None """ with self.__lock: if image is not None: self.__newest_frame = image.copy() def _newest_frame_generator(self): """ Private generator which is called directly from flask server. :return: Yields image/jpeg encoded frames published from publish_frame function. """ while True: # use a buffer frame to copy the newest frame with lock and then freeing it immediately buffer_frame = None with self.__lock: if self.__newest_frame is None: continue buffer_frame = self.__newest_frame.copy() # encode frame for jpeg stream (flag, encoded_image) = cv2.imencode(".jpg", buffer_frame) # if there was an error try again with the next frame if not flag: continue # else yield encoded frame with mimetype image/jpeg yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + bytearray(encoded_image) + b'\r\n') # for debugging and testing start processing frames and detecting a 6 by 9 calibration chessboard if __name__ == '__main__' and BUILDING_DOCS == "false": camera = Camera() while True: image = camera.get_frame() criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 30, 0.001) # processing # gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) # find the chessboard corners # ret, corners = cv2.findChessboardCorners(gray, (6, 9), None) # cv2.drawChessboardCorners(frame, (6, 9), corners, ret) markers, image = camera.detect_markers_midpoint(image) print(markers) print("-----------------") camera.publish_frame(image)