from influxdb_client import InfluxDBClient, Point, WritePrecision import socket import uuid import os import multiprocessing import datetime import requests import time CONCURRENCY = 1 HOSTNAME = socket.gethostname() RUN_TRACE = str(uuid.uuid4()) TOKEN = "aCdxnntw-Zp3agci8Z32lQAR_ep6MhdmWOJG7ObnoikYqe7nKAhFYx1jVGBpipQuId79SC4Jl0J6IBYVqauJyw==" ORG = "robo4you" BUCKET = "compAIR" INFLUX_HOST = "https://influxdb.comp-air.at" EXTENSIVE_LOGGING = os.getenv("EXTENSIVE_LOGGING", "True") if EXTENSIVE_LOGGING == "True": EXTENSIVE_LOGGING = True else: EXTENSIVE_LOGGING = False influx_client = InfluxDBClient(url=INFLUX_HOST, token=TOKEN) write_api = influx_client.write_api() point_queue = multiprocessing.Queue() workers = [] class MetricsLogging(): """Used to send metrics / points to a influxdb """ @staticmethod def is_influx_reachable() -> bool: """Check if we can send metrics to the database :return: Is reachable? :rtype: bool """ try: r = requests.get(INFLUX_HOST) if r.status_code == 200: return True else: print(f"Could not connect to {INFLUX_HOST} -> ERROR CODE: {r.status_code}!") return False except requests.exceptions.RequestException as identifier: print(f"Could not connect to {INFLUX_HOST}!") @staticmethod def put(sensorType, value, port): """Put a datapoint into a time-series-database :param sensorType: A key used to identify a datapoint :param value: Value measured by the sensor :param port: Port of the sensor which was read """ if EXTENSIVE_LOGGING: point = Point(sensorType) \ .tag("host", HOSTNAME) \ .tag("runID", RUN_TRACE) \ .tag("port", port) \ .field("value", value) \ .time(datetime.datetime.utcnow(), WritePrecision.MS) point_queue.put_nowait(point) @staticmethod def worker(): point = object() for job in iter(point_queue.get, point): try: write_api.write(BUCKET, ORG, point) time.sleep(0.1) except Exception as e: pass finally: pass @staticmethod def start_workers(): global EXTENSIVE_LOGGING if EXTENSIVE_LOGGING: if MetricsLogging.is_influx_reachable(): for i in range(CONCURRENCY): worker = multiprocessing.Process(target=MetricsLogging.worker, daemon=True) worker.start() workers.append(worker) else: EXTENSIVE_LOGGING = False MetricsLogging.start_workers()