This repository has been archived on 2025-06-01. You can view files and clone it, but you cannot make any changes to it's state, such as pushing and creating new issues, pull requests or comments.
compLIB/compLib/VisionDaemon.py
2021-10-01 00:24:08 +02:00

72 lines
2.4 KiB
Python

import os
import threading
import time
import systemd.daemon
import LogstashLogging
from LogstashLogging import logstash_logger
from threading import Thread
import logging
import socket
RUN_IP_CHECK = False
try:
from compLib.Display import Display
except Exception as e:
logstash_logger.warning(f"Could not import display for battery output {str(e)}")
__run = """raspivid -t 0 -b 5000000 -w 1280 -h 720 -fps 30 -n -o - | gst-launch-1.0 fdsrc ! video/x-h264,width=1280,height=720,framerate=30/1,noise-reduction=1,profile=high,stream-format=byte-stream ! h264parse ! queue ! flvmux streamable=true ! rtmpsink location=\"rtmp://localhost/live/stream\""""
STREAM_RASPI = False if os.getenv("STREAM_RASPI", "false") == "false" else True
IP_OUTPUT = False if os.getenv("IP_OUTPUT", "false") == "false" else True
def get_ip():
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
s.connect(('10.255.255.255', 1))
IP = s.getsockname()[0]
except Exception:
IP = '127.0.0.1'
finally:
s.close()
return IP
def write_ip_to_screen():
while not os.getenv("IP_OUTPUT", "false") == "false":
ip = str(get_ip())
logstash_logger.info(f"writing {ip} to display")
Display.write(1, ip)
time.sleep(60)
if __name__ == '__main__':
try:
systemd.daemon.notify(systemd.daemon.Notification.READY)
except:
logstash_logger.warning("Warning, old systemd version detected")
systemd.daemon.notify('READY=1')
logstash_logger.info("starting ip output")
ip_output = None
if RUN_IP_CHECK and IP_OUTPUT:
try:
ip_output = threading.Thread(target=write_ip_to_screen)
ip_output.start()
logstash_logger.info("starting battery checker - DONE")
except Exception as e:
logstash_logger.error(f"could not start battery checker -> {str(e)}")
if STREAM_RASPI:
logstash_logger.info("starting gstreamer background process")
os.system(__run)
logstash_logger.error("gstreamer stopped...")
else:
logstash_logger.info("not starting gstreamer background process and only checking for battery")
if ip_output is not None:
ip_output.join()
else:
logstash_logger.info("ip display output failed to initialize.. sleeping for a day, good night")
time.sleep(60*60*24)