import socket
from threading import Lock

import compLib.CompLib_pb2 as CompLib_pb2


class CompLibClient(object):
    """Process-wide client for talking to the compLib server over a socket.

    All state lives on the class: a single shared socket (``SOCKET``) and a
    lock (``LOCK``) that serializes each request/response exchange in
    ``send``. Messages are framed with a one-byte, big-endian length prefix
    in both directions.
    """

    UNIX_SOCKET_PATH = "/tmp/compLib"
    TCP_SOCKET_HOST = ""
    TCP_SOCKET_PORT = 9090
    # Shared connection; None until one of the use_* methods has connected.
    SOCKET = None
    # Held for the full duration of one request/response exchange in send().
    LOCK = Lock()

    @staticmethod
    def _connect(family, address):
        """Open a stream socket, connect it, publish it, start health updates.

        The socket is only stored in ``CompLibClient.SOCKET`` once the
        connection has succeeded; if connecting fails, the socket is closed
        and the exception propagates, leaving ``SOCKET`` untouched so a later
        ``send`` can still attempt its automatic connection.

        :param family: Address family (``socket.AF_UNIX`` or ``socket.AF_INET``).
        :param address: Address passed to ``socket.connect``.
        :raises OSError: If the connection cannot be established.
        """
        sock = socket.socket(family, socket.SOCK_STREAM)
        try:
            sock.connect(address)
        except Exception:
            # Do not leak the descriptor or publish a dead socket.
            sock.close()
            raise
        CompLibClient.SOCKET = sock

        # Imported at call time, as in the original code (presumably to
        # avoid a circular import with compLib.HealthCheck -- TODO confirm).
        from compLib.HealthCheck import HealthUpdater
        HealthUpdater.start()

    @staticmethod
    def _recv_exact(sock, count: int) -> bytes:
        """Read exactly ``count`` bytes from ``sock``.

        ``socket.recv`` may return fewer bytes than requested on a stream
        socket, so keep reading until the full amount has arrived.

        :param sock: Connected stream socket to read from.
        :param count: Number of bytes to read; 0 returns ``b""`` immediately.
        :return: Exactly ``count`` bytes.
        :raises ConnectionError: If the peer closes the connection first.
        """
        chunks = []
        remaining = count
        while remaining > 0:
            chunk = sock.recv(remaining)
            if not chunk:
                raise ConnectionError(
                    "compLib server closed the connection with "
                    f"{remaining} of {count} bytes still expected")
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    @staticmethod
    def use_unix_socket(socket_path="/tmp/compLib"):
        """Connect the shared client socket to a Unix domain socket.

        Records ``socket_path`` in ``UNIX_SOCKET_PATH``, connects, and starts
        the ``HealthUpdater``.

        :param socket_path: Filesystem path of the server's Unix socket.
        :raises OSError: If the connection cannot be established.
        """
        CompLibClient.UNIX_SOCKET_PATH = socket_path
        CompLibClient._connect(socket.AF_UNIX, CompLibClient.UNIX_SOCKET_PATH)

    @staticmethod
    def use_tcp_socket(socket_host, socket_port=TCP_SOCKET_PORT):
        """Connect the shared client socket to a TCP endpoint.

        Records the host and port in ``TCP_SOCKET_HOST`` / ``TCP_SOCKET_PORT``,
        connects, and starts the ``HealthUpdater``.

        :param socket_host: Host name or IP address of the server.
        :param socket_port: TCP port of the server (defaults to 9090).
        :raises OSError: If the connection cannot be established.
        """
        CompLibClient.TCP_SOCKET_HOST = socket_host
        CompLibClient.TCP_SOCKET_PORT = socket_port
        CompLibClient._connect(
            socket.AF_INET,
            (CompLibClient.TCP_SOCKET_HOST, CompLibClient.TCP_SOCKET_PORT))

    @staticmethod
    def send(data: bytes, size: int) -> bytes:
        """Send one length-prefixed request and return the raw response.

        If no connection exists yet, the default Unix socket is connected
        first. The whole exchange runs under ``LOCK`` so that requests from
        different threads are not interleaved on the shared socket.

        :param data: Serialized request payload.
        :param size: Payload length written as the one-byte length prefix.
        :return: The raw response payload (without its length prefix).
        :raises OverflowError: If ``size`` does not fit into one byte.
        :raises ConnectionError: If the server closes the connection before
            the complete response has been received.
        """
        with CompLibClient.LOCK:
            if CompLibClient.SOCKET is None:
                CompLibClient.use_unix_socket()

            sock = CompLibClient.SOCKET
            sock.sendall(size.to_bytes(1, byteorder='big'))
            sock.sendall(data)

            # A bare recv(n) can return short reads or b"" on disconnect;
            # _recv_exact guarantees the full header and body are read.
            response_size = int.from_bytes(
                CompLibClient._recv_exact(sock, 1), byteorder="big")
            response_bytes = CompLibClient._recv_exact(sock, response_size)

            # The result is not acted upon here; see check_response.
            CompLibClient.check_response(response_bytes)

            return response_bytes

    @staticmethod
    def check_response(response_bytes: bytes) -> bool:
        """Parse a response as ``GenericResponse`` and report its status.

        :param response_bytes: Raw response payload returned by the server.
        :return: ``True`` if ``status.successful`` is set, ``False`` otherwise.
        """
        res = CompLib_pb2.GenericResponse()
        res.ParseFromString(response_bytes)

        if res.status.successful:
            return True

        # TODO: Log error message if unsuccessful
        return False