import os
import logging
import threading
import time

import cv2
from compLib.LogstashLogging import Logging
from flask import Flask, Response

# Source the camera frames are read from.
RTMP_SERVER = os.getenv("RTMP_SERVER", "rtmp://localhost/live/stream")
# URL the index page uses as the image source for the jpeg stream.
SERVE_VIDEO = os.getenv("SERVER_SRC", "/live")
# When set to anything other than "false", nothing is instantiated or started
# on import (no capture device, no web server).
BUILDING_DOCS = os.getenv("BUILDING_DOCS", "false")

app = Flask(__name__)

# Minimal page embedding the jpeg stream. The "{{ VIDEO_DST }}" placeholder is
# substituted below; without it the page would never reference the stream.
HTML = """
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Opencv Output</title>
</head>
<body>
<h1>Opencv Output</h1>
<img src="{{ VIDEO_DST }}">
</body>
</html>
"""

# it would be better to use jinja2 here, but I don't want to blow up the package dependencies...
HTML = HTML.replace("{{ VIDEO_DST }}", SERVE_VIDEO)


class __Streaming:
    """
    Class that handles rtmp streaming for opencv.

    DO NOT CREATE AN INSTANCE OF THIS CLASS YOURSELF!

    This is automatically done when importing this module. Use Vision.Streaming which is
    an instance of this class!

    grab frames -> do your own processing -> publish frame -> view on http server
    """

    def __init__(self):
        """
        Create instance of __Streaming class

        This is done implicitly when importing the vision module and will only fail if you would
        create an object of this class. (There can (SHOULD!) only be one VideoCapture)
        """
        self.__camera_stream = cv2.VideoCapture(RTMP_SERVER)
        # Most recently published frame; None until publish_frame is called.
        self.__newest_frame = None
        # Guards __newest_frame between the publisher and the flask threads.
        self.__lock = threading.Lock()
        # Set once the first frame has been published so that stream
        # consumers can block instead of spinning while nothing is available.
        self.__frame_published = threading.Event()
        Logging.get_logger().info("Initialized vision")

    def get_frame(self):
        """
        Grab the newest frame from the rtmp stream.

        :return: An opencv frame, or None if no frame could be read
        """
        # The success flag is intentionally ignored; a failed read is
        # reported to the caller through the returned frame being None.
        _, frame = self.__camera_stream.read()
        return frame

    def publish_frame(self, image):
        """
        Publish an opencv frame to the http webserver.

        :param image: Opencv frame that will be published. None is ignored.
        :return: None
        """
        if image is None:
            return

        # Copy before taking the lock so the critical section is only a
        # reference assignment.
        frame_copy = image.copy()
        with self.__lock:
            self.__newest_frame = frame_copy
        self.__frame_published.set()

    def _newest_frame_generator(self):
        """
        Private generator which is called directly from flask server.

        :return: Yields image/jpeg encoded frames published from publish_frame function.
        """
        while True:
            # Block until at least one frame exists. Returns immediately once
            # the first frame has been published.
            self.__frame_published.wait()

            # use a buffer frame to copy the newest frame with lock and then freeing it immediately
            with self.__lock:
                if self.__newest_frame is None:
                    continue
                buffer_frame = self.__newest_frame.copy()

            # encode frame for jpeg stream
            (flag, encoded_image) = cv2.imencode(".jpg", buffer_frame)

            # if there was an error try again with the next frame
            if not flag:
                continue

            # else yield encoded frame with mimetype image/jpeg
            yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' +
                   bytearray(encoded_image) + b'\r\n')


Streaming = None
if BUILDING_DOCS == "false":
    # instantiate private class __Streaming
    Streaming = __Streaming()
    Logging.get_logger().info("created instance of streaming class")


@app.route("/live")
def __video_feed():
    """
    Define route for serving jpeg stream.

    :return: Return the response generated along with the specific media.
    """
    return Response(Streaming._newest_frame_generator(),
                    mimetype="multipart/x-mixed-replace; boundary=frame")


@app.route("/")
def __index():
    """
    Define route for serving a static http site to view the stream.

    :return: Static html page
    """
    return HTML


def __start_flask():
    """
    Function for running flask server in a thread.

    :return: None
    """
    Logging.get_logger().info("starting flask server")
    # debug stays off: the server listens on all interfaces and the werkzeug
    # debugger allows arbitrary code execution from the browser.
    app.run(host="0.0.0.0", port=9898, debug=False, threaded=True, use_reloader=False)


if BUILDING_DOCS == "false":
    # start flask service in the background
    __webserver_thread = threading.Thread(target=__start_flask)
    __webserver_thread.start()

# for debugging and testing start processing frames and detecting a 6 by 9 calibration chessboard
if __name__ == '__main__' and BUILDING_DOCS == "false":
    while True:
        frame = Streaming.get_frame()
        if frame is None:
            # No frame available (e.g. stream not up yet): back off briefly
            # instead of crashing in cvtColor.
            time.sleep(0.05)
            continue

        # processing
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # find the chessboard corners
        ret, corners = cv2.findChessboardCorners(gray, (6, 9), None)

        cv2.drawChessboardCorners(frame, (6, 9), corners, ret)
        Streaming.publish_frame(frame)