diff --git a/Server/AsyncServer.py b/Server/AsyncServer.py deleted file mode 100644 index 8e4235f..0000000 --- a/Server/AsyncServer.py +++ /dev/null @@ -1,87 +0,0 @@ -from ESock import ESock -import DataTypes -import Logging - -import sys -from Utils import capture_trace -from Utils import is_socket_related_error -import socket -import _thread - - - -class Server: - def __init__(self, host_port_pair, debug=False, compression_level=None): - self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - try: - self.sock.bind(host_port_pair) - except OSError as e: - Logging.error(str(e)) - exit(1) - self.sock.listen(2) - self.handlers = [] - self.debug = debug - self.compression_level = compression_level - - - def run(self, handler, handler_args={}): - self.handler = handler - while 1: - sock, info = self.sock.accept() - if self.compression_level: - sock = ESock(sock, debug=self.debug, compression_level=self.compression_level) - else: - sock = ESock(sock, debug=self.debug) - - handler = self.handler(sock, info, **handler_args) - self.handlers.append(handler) - _thread.start_new_thread(self.controller, (handler, )) - - - def stop(self): - for handler in self.handlers: - self.attempt_graceful_close(handler, handler.sock) - - - def controller(self, handler): - while 1: - try: - data, route = handler.sock.recv() - handler.handle(data, route) - except Exception as e: - if not is_socket_related_error(e): - self.print_trace(handler.sock) - self.attempt_graceful_close(handler, handler.sock) - _thread.exit() - - - def print_trace(self, sock): - Logging.error("An unhandled exception forced the controller for '%s:%d' to terminate." % (sock.address, sock.port)) - capture_trace() - - - def attempt_graceful_close(self, handler, sock): - try: - handler.finish() - except Exception: - self.print_trace(sock) - finally: - if handler in self.handlers: - del self.handlers[self.handlers.index(handler)] - handler.sock.close() - - - class Handler: - def __init__(self, sock, info, **kwargs): - self.sock = sock - self.info = info - self.setup(**kwargs) - - def setup(self, **kwargs): - pass - - def handle(self, data): - pass - - def finish(self): - pass diff --git a/Shared/ESock.py b/Shared/ESock.py deleted file mode 100644 index 8c557c8..0000000 --- a/Shared/ESock.py +++ /dev/null @@ -1,120 +0,0 @@ -import json -import socket -import struct -import gzip -import DataTypes -import Logging -from Utils import capture_trace - -MAX_ROUTE_LENGTH = 16 - -class ConvertFailedError(ValueError): - def __init__(self): - super(ValueError, self).__init__("conversion failed (false data type supplied)") - -def convert_data(data, data_type): - try: - if data_type == DataTypes.str: - data = data.decode() - elif data_type == DataTypes.int: - data = int(data.decode()) - elif data_type == DataTypes.float: - data = float(data.decode()) - elif data_type == DataTypes.json: - data = json.loads(data.decode()) - except Exception: - capture_trace() - raise ConvertFailedError() - return data - -class ESock: - DATA_TYPES = {str : DataTypes.str, dict : DataTypes.json, list : DataTypes.json, bytes : DataTypes.bin, int : DataTypes.int, float : DataTypes.float} - - def __init__(self, sock, debug=False, disconnect_callback=None, compression_level=0): - self._sock = sock - self.address, self.port = self._sock.getpeername() - self.debug = debug - self.disconnect_callback = disconnect_callback - self.compression_level = compression_level - if debug: - Logging.info("compression_level=%i" % compression_level) - - - def __getattr__(self, attr): - if attr == "recv": - return self.recv - elif attr == "send": - return self.send - elif attr == "_ESock__dict": - return self.__eq__ - return getattr(self._sock, attr) - - - def recv(self): - raw_metadata = self._sock.recv(32) - if raw_metadata == b"": - self._sock.close() - raise socket.error("Connection closed") - try: - metadata = struct.unpack("cQ16s", raw_metadata) - except struct.error: - Logging.error("Invalid metadata layout: '%s:%d'" % (self.address, self.port)) - if self.disconnect_callback != None: - self.disconnect_callback() - self._sock.close() - return None, "" - data_type = metadata[0] - data_length = metadata[1] - route = metadata[2].rstrip(b"\x00").decode() - data = b'' - bufsize = 4096 - while len(data) < data_length: - if len(data) + bufsize <= data_length: - packet = self._sock.recv(bufsize) - else: - packet = self._sock.recv(data_length % bufsize) - if not packet: - return None - data += packet - try: - data = convert_data(gzip.decompress(data), data_type) - except ConvertFailedError: - Logging.error("Invalid data type: '%s' ('%s:%d')" % (data_type.decode(), self.address, self.port)) - if self.disconnect_callback != None: - self.disconnect_callback() - self._sock.close() - return None, "" - if self.debug: - data_repr = str(data).replace("\n", " ") - if len(data_repr) > 80: - data_repr = data_repr[:80] + "..." - Logging.info("Received %d-long '%s' on route '%s': %s (%s:%d)" % (len(data), type(data).__name__, route, data_repr, self.address, self.port)) - return data, route - - - def send(self, data, route=""): - length = len(data) - original_data_type = type(data) - if self.debug: - data_repr = str(data).replace("\n", " ") - if len(data_repr) > 80: - data_repr = data_repr[:80] + "..." - Logging.info("Sending %d-long '%s' on route '%s': %s (%s:%d)" % (length, original_data_type.__name__, route, data_repr, self.address, self.port)) - route = route.encode() - if original_data_type in ESock.DATA_TYPES: - data_type = ESock.DATA_TYPES[original_data_type] - else: - data_type = DataTypes.other - if original_data_type is str: - data = data.encode() - elif original_data_type is dict or original_data_type is list: - data = json.dumps(data, separators=(',',':')).encode() - - data = gzip.compress(data, self.compression_level) - self.sendall(struct.pack("cQ16s", data_type, len(data), route) + data) - - - def __eq__(self, other): - if type(other) is ESock: - return self._sock is other._sock - return False