"""Daemon entry point: battery watchdog plus an optional camera stream.

When run as a script this module:

1. tells systemd that the service is ready,
2. starts a background thread that polls the battery and pulses the buzzer
   while the charge is low (only if ``Battery`` and ``Buzzer`` imported), and
3. if the ``STREAM_RASPI`` environment variable is set to anything other than
   ``"false"``, runs a ``raspivid | gst-launch-1.0`` pipeline that publishes
   to ``rtmp://localhost/live/stream`` and blocks until it exits.
"""
import logging
import os
import threading
import time
from threading import Thread

import systemd.daemon

import LogstashLogging
from LogstashLogging import logstash_logger

# True only when both hardware helper modules could be imported.
RUN_CHECK = False
try:
    from Battery import Battery
    from Buzzer import Buzzer
    RUN_CHECK = True
except Exception as e:
    logstash_logger.error(f"unable to import battery or buzzer in daemon -> {str(e)}")

# Shell pipeline: raspivid writes H.264 to stdout, gst-launch-1.0 muxes it to
# FLV and pushes it to the local RTMP endpoint.
__run = """raspivid -t 0 -b 5000000 -w 1280 -h 720 -fps 30 -n -o - | gst-launch-1.0 fdsrc ! video/x-h264,width=1280,height=720,framerate=30/1,noise-reduction=1,profile=high,stream-format=byte-stream ! h264parse ! queue ! flvmux streamable=true ! rtmpsink location=\"rtmp://localhost/live/stream\""""

# Streaming is enabled by any STREAM_RASPI value other than the literal
# string "false"; an unset variable counts as "false".
STREAM_RASPI = os.getenv("STREAM_RASPI", "false") != "false"


def check_battery():
    """Poll the battery forever and pulse the buzzer while the charge is low.

    Each cycle sleeps 2 seconds and reads ``Battery.percent()``.  When the
    reading lies in ``[-100, 15]`` a warning is logged and the buzzer is
    switched on for one second.  A failed read or a failed buzzer call is
    logged and the loop carries on with the next cycle.  Never returns.
    """
    while True:
        time.sleep(2)
        try:
            battery = Battery.percent()
        except Exception as e:
            logstash_logger.error(f"could not check battery -> {e}")
            # No reading this cycle.  Falling through with a default of 0
            # would land inside the low-battery range and raise a false alarm.
            continue

        if not -100 <= battery <= 15:
            continue

        logstash_logger.warning(f"LOW BATTERY DETECTED: '{battery}'")
        try:
            Buzzer.set(True)
            time.sleep(1)
            Buzzer.set(False)
        except Exception as e:
            # Keep the watchdog thread alive if the buzzer misbehaves.
            logstash_logger.error(f"could not drive buzzer -> {e}")


def _notify_ready():
    """Tell systemd the service is ready, falling back to the raw string form."""
    try:
        systemd.daemon.notify(systemd.daemon.Notification.READY)
    except Exception:
        # NOTE(review): presumably older bindings lack ``Notification`` or
        # reject it -- confirm which exception is actually raised here.
        logstash_logger.warning("Warning, old systemd version detected")
        systemd.daemon.notify('READY=1')


def _start_battery_checker():
    """Start the battery thread; return it, or None if it is not running."""
    if not RUN_CHECK:
        return None
    try:
        checker = threading.Thread(target=check_battery)
        checker.start()
    except Exception as e:
        logstash_logger.error(f"could not start battery checker -> {e}")
        # Returning None keeps the caller from join()-ing an unstarted thread.
        return None
    else:
        logstash_logger.info("starting battery checker - DONE")
        return checker


def main():
    """Run the daemon: notify systemd, start the watchdog, optionally stream."""
    _notify_ready()

    logstash_logger.info("starting battery checker")
    battery_checker = _start_battery_checker()

    if STREAM_RASPI:
        logstash_logger.info("starting gstreamer background process")
        # Blocks until the shell pipeline terminates.
        os.system(__run)
        logstash_logger.error("gstreamer stopped...")
        return

    logstash_logger.info("not starting gstreamer background process and only checking for battery")
    if battery_checker is not None:
        battery_checker.join()
    else:
        logstash_logger.info("battery checker failed to initialize.. sleeping for a day, good night")
        time.sleep(60*60*24)


if __name__ == '__main__':
    main()