diff --git a/Shared/Highway.py b/Shared/Highway.py new file mode 100644 index 0000000..46ff049 --- /dev/null +++ b/Shared/Highway.py @@ -0,0 +1,214 @@ +import Logging +import gzip +import Routing +import struct +import json +from threading import Lock + +from ws4py.websocket import WebSocket +from ws4py.client.threadedclient import WebSocketClient + +INDEXED_DICT = 5 + +DATA_TYPES = {str : 0, dict : 1, + list : 1, bytes : 2, + int : 3, float : 4, INDEXED_DICT : 5} + + +def reverse_dict(dict): + return {v : k for k, v in dict.items()} + + +REVERSE_DATA_TYPES = reverse_dict(DATA_TYPES) + +INVALID_ROUTE = 1 +INVALID_METADATA_LAYOUT = 2 +INVALID_DATA_TYPE = 3 + +META_ROUTE = "meta" + + + + + +class ConvertFailedError(ValueError): + def __init__(self): + super(ValueError, self).__init__("conversion failed (invalid data type supplied)") + + +class Metadata: + def __init__(self, data_type, m_route): + self.data_type = REVERSE_DATA_TYPES[data_type] + self.m_route = m_route + + + +def create_metadata(data_type, converted_route, indexed_dict=False): + return struct.pack("bh", + DATA_TYPES[data_type] if not indexed_dict else DATA_TYPES[INDEXED_DICT], + converted_route) + + +def prepare_data(data): + original_data_type = type(data) + if original_data_type in DATA_TYPES: + data_type = DATA_TYPES[original_data_type] + else: + data_type = DATA_TYPES[bytes] + if original_data_type is str: + data = data.encode() + elif original_data_type is dict or original_data_type is list: + data = json.dumps(data, separators=(',',':')).encode() + return data, original_data_type + + +def pack_message(data, exchange_route, compression_level, + debug=False, indexed_dict=False): + data, original_data_type = prepare_data(data) + data = gzip.compress(data, compression_level) + """ + if debug: + data_repr = str(data).replace("\n", " ") + if len(data_repr) > 80: + data_repr = data_repr[:80] + "..." + Logging.info("Packaged '%s' on route '%s': %s" % + (original_data_type.__name__, route, data_repr)) + """ + return create_metadata(original_data_type, exchange_route, + indexed_dict=indexed_dict) + data + + + +def parse_metadata(message): + metadata = struct.unpack("bh", message[:4]) + return Metadata(metadata[0], metadata[1]) + + + + +def parse_message(data, data_type): + data = convert_data(gzip.decompress(data[4:]), data_type) + return data + + +def convert_data(data, data_type, debug=False): + try: + if data_type == str: + data = data.decode() + elif data_type == int: + data = int(data.decode()) + elif data_type == float: + data = float(data.decode()) + elif data_type in (dict, list): + data = json.loads(data.decode()) + elif data_type == INDEXED_DICT: + data = json.loads(data.decode()) + indexed_data = {} + for key in data: + indexed_data[int(key)] = data[key] + data = indexed_data + except Exception: + return None + return data + + +class Shared: + def setup(self, routes, compression_level, debug=False): + self.routes = routes + self.compression_level = compression_level + self.address, self.port = self.peer_address + self.debug = debug + + + def opened(self): + self.routes = Routing.create_routes(self.routes, self) + + + def received_message(self, message): + message = message.data + metadata = parse_metadata(message) + try: + route = self.routes[self.exchange_routes[metadata.m_route]] + except KeyError: + self.send(Handler.INVALID_ROUTE, META_ROUTE) + Logging.error("Received message with non-existing route '%d' from '%s:%d'" % ( + metadata.m_route, self.address, self.port)) + return + data = convert_data(message, metadata.data_type) + if self.debug: + data_repr = str(data).replace("\n", " ") + if len(data_repr) > 80: + data_repr = data_repr[:80] + "..." + Logging.info("Received '%s' on route '%s': %s (%s:%d)" % ( + type(data).__name__, route, data_repr, self.address, + self.port)) + self.routes[route].run(data, self) + + + def patched_send(self, data, route, indexed_dict=False): + self.raw_send(pack_message(data, self.reverse_exchange_routes[route], + self.compression_level, debug=self.debug, + indexed_dict=indexed_dict), binary=True) + if self.debug: + data_repr = str(data).replace("\n", " ") + if len(data_repr) > 80: + data_repr = data_repr[:80] + "..." + Logging.info("Sent '%s' on route '%s': %s (%s:%d)" % ( + type(data).__name__, route, data_repr, self.address, + self.port)) + + + + +class Server(Shared, WebSocket): + def setup(self, routes, compression_level, debug=False): + super().setup(routes, compression_level, debug=debug) + # Send replacement can't be done in the parent setup method because + # both client and server use a method from a different module. + self.raw_send = self.send + self.send = self.patched_send + + + + def opened(self): + super().opened() + self.exchange_routes = Routing.create_exchange_map(self.routes) + self.reverse_exchange_routes = reverse_dict(self.exchange_routes) + self.patched_send(self.exchange_routes, META_ROUTE, indexed_dict=True) + Routing.launch_routes(self.routes, self) + + + + + +class Client(WebSocketClient, Shared): + def setup(self, routes, compression_level, debug=False): + super().setup(routes, compression_level, debug=debug) + # Send replacement can't be done in the parent setup method because + # both client and server use a method from a different module. + self.metadata_lock = Lock() + self.raw_send = self.send + self.send = self.patched_send + self._received_message = self.received_message + self.received_message = self.receive_routes + + + def receive_routes(self, message): + self.metadata_lock.acquire() + message = message.data + metadata = parse_metadata(message) + if metadata.m_route != -1: + Logging.error("Invalid route on first message (not meta)") + self.close() + return + data = parse_message(message, metadata.data_type) + if Routing.validate_exchange_map(data): + self.exchange_routes = data + self.reverse_exchange_routes = reverse_dict(self.exchange_routes) + self.received_message = self._received_message + Logging.info("Routes successfully exchanged: %s" % str(self.exchange_routes)) + else: + Logging.error("Invalid exchange map.") + self.close() + return + self.metadata_lock.release()