Protobuf prototype
This commit is contained in:
parent e277a83c5f
commit 9b567b8c6c
119 changed files with 8599 additions and 0 deletions
@@ -1,97 +0,0 @@
from influxdb_client import InfluxDBClient, Point, WritePrecision
import socket
import uuid
import os
import multiprocessing
import datetime
import requests
import time

CONCURRENCY = 1

HOSTNAME = socket.gethostname()
RUN_TRACE = str(uuid.uuid4())

TOKEN = "aCdxnntw-Zp3agci8Z32lQAR_ep6MhdmWOJG7ObnoikYqe7nKAhFYx1jVGBpipQuId79SC4Jl0J6IBYVqauJyw=="
ORG = "robo4you"
BUCKET = "compAIR"

INFLUX_HOST = "https://influxdb.comp-air.at"

# Extensive logging is on by default; set the EXTENSIVE_LOGGING environment
# variable to anything other than "True" to disable it.
EXTENSIVE_LOGGING = os.getenv("EXTENSIVE_LOGGING", "True") == "True"


influx_client = InfluxDBClient(url=INFLUX_HOST, token=TOKEN)
write_api = influx_client.write_api()

# Points are queued here and written to InfluxDB by the worker processes below.
point_queue = multiprocessing.Queue()
workers = []


class MetricsLogging:
    """Used to send metrics / points to an InfluxDB instance."""

    @staticmethod
    def is_influx_reachable() -> bool:
        """Check if we can send metrics to the database.

        :return: Is reachable?
        :rtype: bool
        """
        try:
            r = requests.get(INFLUX_HOST)
            if r.status_code == 200:
                return True
            else:
                print(f"Could not connect to {INFLUX_HOST} -> ERROR CODE: {r.status_code}!")
                return False
        except requests.exceptions.RequestException:
            print(f"Could not connect to {INFLUX_HOST}!")
            return False

    @staticmethod
    def put(sensorType, value, port):
        """Put a datapoint into a time-series database.

        :param sensorType: A key used to identify a datapoint
        :param value: Value measured by the sensor
        :param port: Port of the sensor which was read
        """
        if EXTENSIVE_LOGGING:
            point = Point(sensorType) \
                .tag("host", HOSTNAME) \
                .tag("runID", RUN_TRACE) \
                .tag("port", port) \
                .field("value", value) \
                .time(datetime.datetime.utcnow(), WritePrecision.MS)
            point_queue.put_nowait(point)

    @staticmethod
    def worker():
        # Consume points until the sentinel value None is received on the queue.
        for job in iter(point_queue.get, None):
            try:
                write_api.write(BUCKET, ORG, job)
                time.sleep(0.1)
            except Exception:
                # A failed write must not crash the worker process.
                pass

    @staticmethod
    def start_workers():
        global EXTENSIVE_LOGGING
        if EXTENSIVE_LOGGING:
            if MetricsLogging.is_influx_reachable():
                for i in range(CONCURRENCY):
                    worker = multiprocessing.Process(target=MetricsLogging.worker, daemon=True)
                    worker.start()
                    workers.append(worker)
            else:
                # Disable metrics entirely if the database is not reachable.
                EXTENSIVE_LOGGING = False


MetricsLogging.start_workers()
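For context, a minimal usage sketch (not part of this commit): it assumes the file above is saved as metrics.py, and the sensor name "distance", port 3, and the random values are made up for illustration. MetricsLogging.put only enqueues the point; the daemon workers started on import perform the actual write to InfluxDB.

# Hypothetical example only: "metrics" is an assumed module name for the file above.
import random
import time

from metrics import MetricsLogging

for _ in range(10):
    reading = random.uniform(5.0, 120.0)             # stand-in for a real sensor value
    MetricsLogging.put("distance", reading, port=3)  # enqueued; a worker writes it to InfluxDB
    time.sleep(1.0)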