This repository has been archived on 2025-06-01. You can view files and clone it, but you cannot make any changes to its state, such as pushing and creating new issues, pull requests or comments.
compLIB/compLib/IPService.py
2022-02-13 17:50:06 +00:00

116 lines
3.9 KiB
Python

import datetime
import os
import socket
import threading
import time
import systemd.daemon
from compLib.Lock import Lock
try:
    from compLib.LogstashLogging import Logging
except Exception as e:
    # compLib.LogstashLogging could not be imported: provide a stand-in object
    # with the same ``get_logger()`` accessor the rest of this file calls.
    import logging

    class Logger():
        """Fallback wrapper around a standard-library logger."""

        def __init__(self):
            # Loggers must be obtained via logging.getLogger(); instantiating
            # logging.Logger directly bypasses the logging manager/hierarchy.
            self.logger = logging.getLogger('compApi background')

        def get_logger(self):
            """Return the wrapped ``logging.Logger`` instance."""
            return self.logger

    Logging = Logger()
    print(f"Could not import compLib.LogstashLogging: {str(e)}")
    Logging.get_logger().error(f"Could not import compLib.LogstashLogging: {str(e)}")
print("after basic imports")
# The IP output needs the display, the SPI module and the library version;
# it is only enabled when all three imports succeed.
RUN_IP_CHECK = False
try:
    from compLib.Display import Display
    from compLib.Spi import Spi
    from compLib import __version__
except Exception as e:
    import_failure = f"Could not import display or spi for ip output {str(e)}"
    print(import_failure)
    Logging.get_logger().warning(import_failure)
else:
    RUN_IP_CHECK = True
print("After display and Spi import")

# Shell pipeline: raspivid output is piped into gst-launch-1.0, which
# publishes the stream to rtmp://localhost/live/stream.
__run = """raspivid -t 0 -b 5000000 -w 1280 -h 720 -fps 30 -n -o - | gst-launch-1.0 fdsrc ! video/x-h264,width=1280,height=720,framerate=30/1,noise-reduction=1,profile=high,stream-format=byte-stream ! h264parse ! queue ! flvmux streamable=true ! rtmpsink location=\"rtmp://localhost/live/stream\""""

# STREAM_RASPI is on for any value other than the literal "false" (default off).
STREAM_RASPI = os.getenv("STREAM_RASPI", "false") != "false"
# IP_OUTPUT is on only for the literal "true" (default on).
IP_OUTPUT = os.getenv("IP_OUTPUT", "true") == "true"
def get_ip():
    """Return the local IPv4 address used for outgoing traffic.

    A UDP socket is "connected" to 10.255.255.255:1 and the socket's own
    address is read back via ``getsockname()``.

    Returns:
        The local address as a string, or ``'Not connected'`` when the
        connect/lookup fails.
    """
    # The context manager closes the socket on every path.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        try:
            s.connect(('10.255.255.255', 1))
            return s.getsockname()[0]
        except OSError as e:
            # The exception must be bound here: the message below formats it,
            # and an unbound name would raise NameError instead of falling back.
            print(f"Error could not query ip: {e}")
            return 'Not connected'
def write_ip_to_screen():
    """Periodically write library version, firmware version, IP and time to the display.

    Loops for as long as the ``IP_OUTPUT`` environment variable is "true"
    (the default). Each pass that finds ``Lock`` unlocked takes the lock,
    writes display lines 2, 3, 4 and 1, and releases the lock again.
    Any exception is printed and the loop continues after a short pause.
    """
    while os.getenv("IP_OUTPUT", "true") == "true":
        try:
            if not Lock.is_locked():
                Lock.lock()
                try:
                    ip = str(get_ip())
                    print(f"writing {ip} to display")
                    Display.write(2, f"LIB: V{__version__}")
                    Display.write(3, f"FW: V{Spi.get_version()}")
                    Display.write(4, f"IP: {ip}")
                    Display.write(1, datetime.datetime.now().strftime("%b %d %H:%M:%S"))
                finally:
                    # Always release: a failed write must not leave the lock
                    # held, or every later pass would skip the update.
                    Lock.unlock()
            # NOTE(review): the source's indentation was lost; the sleep is
            # assumed to run on every pass (also when locked) — confirm.
            time.sleep(10)
        except Exception as e:
            print(f"Exception in write ip thread: {e}")
            time.sleep(5)
if __name__ == '__main__':
    # Signal readiness to systemd; fall back to the raw 'READY=1' string when
    # the Notification enum is unavailable.
    try:
        systemd.daemon.notify(systemd.daemon.Notification.READY)
    except Exception:
        # Not a bare except: KeyboardInterrupt/SystemExit must still propagate.
        Logging.get_logger().warning("Warning, old systemd version detected")
        systemd.daemon.notify('READY=1')

    # Thread running write_ip_to_screen, or None when it was not started.
    ip_output = None
    if RUN_IP_CHECK and IP_OUTPUT:
        print("starting ip output")
        Logging.get_logger().info("starting ip output")
        # Clear the lock before the writer thread starts checking it.
        Lock.unlock()
        try:
            Spi.disable_health_check()
            ip_output = threading.Thread(target=write_ip_to_screen)
            ip_output.start()
            print("starting ip output - DONE")
            Logging.get_logger().info("starting ip output - DONE")
        except Exception as e:
            print(f"could not start ip output -> {str(e)}")
            Logging.get_logger().error(f"could not start ip output -> {str(e)}")

    if STREAM_RASPI:
        print("starting gstreamer background process")
        Logging.get_logger().info("starting gstreamer background process")
        # os.system blocks until the shell pipeline exits.
        os.system(__run)
        print("gstreamer stopped...")
        Logging.get_logger().error("gstreamer stopped...")
    else:
        print("not starting gstreamer background process")
        Logging.get_logger().info("not starting gstreamer background process")
        # NOTE(review): the source's indentation was lost; the join/sleep
        # fallback is assumed to belong to this non-streaming branch — confirm.
        if ip_output is not None:
            ip_output.join()
        else:
            print("ip display output failed to initialize.. sleeping for a day, good night")
            Logging.get_logger().info("ip display output failed to initialize.. sleeping for a day, good night")
            time.sleep(60 * 60 * 24)