This repository has been archived on 2025-06-01. You can view files and clone it, but you cannot make any changes to it's state, such as pushing and creating new issues, pull requests or comments.
compLIB/client/compLib/Camera.py
Joel Klimont f03df1b3b3 updated opencv documentation (camera module)
added enabling of legacy support for the camera module to the postinstall script
2022-11-18 17:19:40 +01:00

221 lines
7.8 KiB
Python

import sys
from typing import Any
# build image is somehow different from raspberry image? opencv-python is installed to a directory which is not in the pythonpath by default....
sys.path.append("/usr/lib/python3.9/site-packages")
import logging
import os
import queue
import threading
import cv2
from flask import Flask, Response
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
SERVE_VIDEO = os.getenv("SERVER_SRC", "/live")
BUILDING_DOCS = os.getenv("BUILDING_DOCS", "false")
HTML = """
<html>
<head>
<title>Opencv Output</title>
</head>
<body>
<h1>Opencv Output</h1>
<img src="{{ VIDEO_DST }}">
</body>
</html>
"""
# it would be better to use jinja2 here, but I don't want to blow up the package dependencies...
HTML = HTML.replace("{{ VIDEO_DST }}", SERVE_VIDEO)
class Marker:
def __init__(self, id: int, x: float, y: float):
self.id: int = id
self.x: float = x
self.y: float = y
def __str__(self) -> str:
return f"Marker ID: {self.id}, position: {self.x} x, {self.y} y"
def __repr__(self) -> str:
return str({"id": self.id, "x": self.x, "y": self.y})
class Camera:
class __Webserver:
def __init__(self, camera):
self.app = Flask(__name__)
self.__camera = camera
self.__thread = threading.Thread(target=self.__start_flask, daemon=True)
self.__thread.start()
@self.app.route("/live")
def __video_feed():
"""
Define route for serving jpeg stream.
:return: Return the response generated along with the specific media.
"""
return Response(self.__camera._newest_frame_generator(),
mimetype="multipart/x-mixed-replace; boundary=frame")
@self.app.route("/")
def __index():
"""
Define route for serving a static http site to view the stream.
:return: Static html page
"""
return HTML
def __start_flask(self):
"""
Function for running flask server in a thread.
:return:
"""
logging.getLogger("complib-logger").info("starting flask server")
self.app.run(host="0.0.0.0", port=9898, debug=True, threaded=True, use_reloader=False)
class __NoBufferVideoCapture:
def __init__(self, cam):
self.cap = cv2.VideoCapture(cam)
self.cap.set(3, 640)
self.cap.set(4, 480)
self.q = queue.Queue(maxsize=3)
self.stopped = False
self.t = threading.Thread(target=self._reader, daemon=True)
self.t.start()
def _reader(self):
while not self.stopped:
ret, frame = self.cap.read()
if not ret:
continue
if self.q.full():
try:
self.q.get_nowait()
except queue.Empty:
pass
self.q.put(frame)
def read(self):
return self.q.get()
def stop(self):
self.stopped = True
self.t.join()
def __init__(self):
self.__logger = logging.getLogger("complib-logger")
self.__logger.info("capturing rtmp stream is disabled in this version")
self.__camera_stream = self.__NoBufferVideoCapture(-1)
self.__newest_frame = None
self.__lock = threading.Lock()
self.__webserver = self.__Webserver(self)
self.aruco_dict = cv2.aruco.Dictionary_get(cv2.aruco.DICT_6X6_50)
self.aruco_params = cv2.aruco.DetectorParameters_create()
self.__logger.info("Initialized vision")
def get_frame(self):
"""
Die Funktion das neuste Bild, welches die Kamera aufgenommen hat zurück.
:return: Ein "opencv image frame"
"""
img16 = self.__camera_stream.read()
return img16
def detect_markers(self, image):
"""
Funktion um die ArUco Marker in einem Bild zu erkennen.
:param image: Bild, welches die Kamera aufgenommen hat.
:return: Gibt drei Variablen zurueck. Erstens eine Liste an Postionen der "Ecken" der erkannten Markern. Zweitens eine Liste an IDs der erkannten Markern und dritten noch Debug Informationen (diese können ignoriert werden).
"""
return cv2.aruco.detectMarkers(image, self.aruco_dict, parameters=self.aruco_params)
def detect_markers_midpoint(self, image) -> tuple[list[Marker], Any]:
"""
Funktion um die ArUco Marker in einem Bild zu erkennen, einzuzeichnen und den Mittelpunkt der Marker auszurechnen.
:param image: Bild, welches die Kamera aufgenommen hat.
:return: Gibt zwei Variablen zurueck. Erstens eine Liste an "Markern" und zweitens das Bild mit den eigezeichneten Marken.
"""
(corners, ids, rejected) = self.detect_markers(image)
self.draw_markers(image, corners, ids)
res = []
for i in range(0, len(corners)):
x = sum([point[0] for point in corners[i][0]]) / 4
y = sum([point[1] for point in corners[i][0]]) / 4
res.append(Marker(ids[i][0], x, y))
return res, image
def draw_markers(self, image, corners, ids):
"""
Zeichnet die erkannten Markern mit ihren IDs in das Bild.
:param image: Original Bild, in dem die Marker erkannt wurden.
:param corners: List der Positionen der Ecken der erkannten Marker.
:param ids: IDs der erkannten Markern.
:return: Neues Bild mit den eigezeichneten Markern.
"""
return cv2.aruco.drawDetectedMarkers(image, corners, ids)
def publish_frame(self, image):
"""
Sendet das Bild, welches der Funktion übergeben wird, an den Webserver, damit es der Nutzer in seinem Browser ansehen kann.
:param image: Opencv Bild, welches dem Nutzer angezeigt werden soll.
:return: None
"""
with self.__lock:
if image is not None:
self.__newest_frame = image.copy()
def _newest_frame_generator(self):
"""
Private generator which is called directly from flask server.
:return: Yields image/jpeg encoded frames published from publish_frame function.
"""
while True:
# use a buffer frame to copy the newest frame with lock and then freeing it immediately
buffer_frame = None
with self.__lock:
if self.__newest_frame is None:
continue
buffer_frame = self.__newest_frame.copy()
# encode frame for jpeg stream
(flag, encoded_image) = cv2.imencode(".jpg", buffer_frame)
# if there was an error try again with the next frame
if not flag:
continue
# else yield encoded frame with mimetype image/jpeg
yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' +
bytearray(encoded_image) + b'\r\n')
# for debugging and testing start processing frames and detecting a 6 by 9 calibration chessboard
if __name__ == '__main__' and BUILDING_DOCS == "false":
camera = Camera()
while True:
image = camera.get_frame()
criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 30, 0.001)
# processing
# gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
# find the chessboard corners
# ret, corners = cv2.findChessboardCorners(gray, (6, 9), None)
# cv2.drawChessboardCorners(frame, (6, 9), corners, ret)
markers, image = camera.detect_markers_midpoint(image)
print(markers)
print("-----------------")
camera.publish_frame(image)